教程:Delta Lake

本教程介绍 Azure Databricks 上的常见 Delta Lake 操作,包括:

可以从附加到 Azure Databricks 计算资源(例如群集)的笔记本内部运行本文中的示例 Python、Scala 和 SQL 代码。 还可以从与 Databricks SQL 中的 SQL 仓库关联的查询中运行本文中的 SQL 代码。

准备源数据

本教程依赖于名为 People 10 M 的数据集。其中包含 1000 万条虚构记录,这些记录保存了有关人员的事实,例如名字和姓氏、出生日期和工资。 本教程假设此数据集位于与目标 Azure Databricks 工作区关联的 Unity Catalog 中。

若要获取本教程的 People 10 M 数据集,请执行以下操作:

  1. 转到 Kaggle 中的 People 10 M 页面。
  2. 单击“下载”,将名为 archive.zip 的文件下载到本地计算机。
  3. 提取 archive.zip 文件中名为 export.csv 的文件。 export.csv 文件包含本教程的数据。

若要将 export.csv 文件上传到卷,请执行以下操作:

  1. 在边栏上,单击“目录”
  2. 在“目录资源管理器”中,浏览并打开要将 export.csv 文件上传到的卷。
  3. 单击“上传到此卷”。
  4. 在本地计算机上拖放或者浏览并选择 export.csv 文件。
  5. 单击“上载” 。

在以下代码示例中,请将 /Volumes/main/default/my-volume/export.csv 替换为目标卷中 export.csv 文件的路径。

创建表

默认情况下,在 Azure Databricks 上创建的所有表都使用 Delta Lake。 Databricks 建议使用 Unity Catalog 托管表。

在前面的代码示例和以下代码示例中,请将表名 main.default.people_10m 替换为 Unity Catalog 中的目标三部分目录、架构和表名。

注意

Delta Lake 是 Azure Databricks 所有读取、写入和表创建命令的默认值。

Python

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("middleName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("birthDate", TimestampType(), True),
  StructField("ssn", StringType(), True),
  StructField("salary", IntegerType(), True)
])

df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/main/default/my-volume/export.csv")

# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()

# If you know the table does not already exist, you can call this instead:
# df.saveAsTable("main.default.people_10m")

Scala

import org.apache.spark.sql.types._

val schema = StructType(Array(
  StructField("id", IntegerType, nullable = true),
  StructField("firstName", StringType, nullable = true),
  StructField("middleName", StringType, nullable = true),
  StructField("lastName", StringType, nullable = true),
  StructField("gender", StringType, nullable = true),
  StructField("birthDate", TimestampType, nullable = true),
  StructField("ssn", StringType, nullable = true),
  StructField("salary", IntegerType, nullable = true)
))

val df = spark.read.format("csv").option("header", "true").schema(schema).load("/Volumes/main/default/my-volume/export.csv")

// Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()

// If you know that the table doesn't exist, call this instead:
// df.saveAsTable("main.default.people_10m")

SQL

CREATE OR REPLACE TABLE main.default.people_10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
);

COPY INTO main.default.people_10m
FROM '/Volumes/main/default/my-volume/export.csv'
FILEFORMAT = CSV
FORMAT_OPTIONS ( 'header' = 'true', 'inferSchema' = 'true' );

上述操作将创建新的托管表。 有关创建 Delta 表时的可用选项的信息,请参阅 CREATE TABLE

在 Databricks Runtime 13.3 LTS 及更高版本中,可以使用 CREATE TABLE LIKE 创建一个新的空 Delta 表,该表会复制源 Delta 表的架构和表属性。 这在将表从开发环境提升到生产环境时特别有用,如以下代码示例所示:

CREATE TABLE main.default.people_10m_prod LIKE main.default.people_10m

若要创建空表,还可以使用 Delta Lake 中适用于 PythonScalaDeltaTableBuilder API。 与等效的 DataFrameWriter API 相比,这些 API 可以更轻松地指定其他信息,例如列注释、表属性和生成的列

重要

此功能目前以公共预览版提供。

Python

DeltaTable.createIfNotExists(spark)
  .tableName("main.default.people_10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

Scala

DeltaTable.createOrReplace(spark)
  .tableName("main.default.people_10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

在表中更新插入

若要将一组更新和插入操作合并到现有的 Delta 表中,请使用适用于 PythonScalaDeltaTable.merge 方法,以及适用于 SQL 的 MERGE INTO 语句。 例如,以下示例从源表中获取数据并将其合并到目标 Delta 表中。 如果两个表中有一个匹配行,Delta Lake 会使用给定的表达式更新数据列。 如果没有匹配行,Delta Lake 会添加一个新行。 此操作称为“upsert”。

Python

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from datetime import date

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("middleName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("birthDate", DateType(), True),
  StructField("ssn", StringType(), True),
  StructField("salary", IntegerType(), True)
])

data = [
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', date.fromisoformat('1992-09-17'), '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', date.fromisoformat('1984-05-22'), '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', date.fromisoformat('1968-07-22'), '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', date.fromisoformat('1978-01-14'), '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', date.fromisoformat('1982-10-29'), '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', date.fromisoformat('1981-06-25'), '567-89-0123', 89900)
]

people_10m_updates = spark.createDataFrame(data, schema)
people_10m_updates.createTempView("people_10m_updates")

# ...

from delta.tables import DeltaTable

deltaTable = DeltaTable.forName(spark, 'main.default.people_10m')

(deltaTable.alias("people_10m")
  .merge(
    people_10m_updates.alias("people_10m_updates"),
    "people_10m.id = people_10m_updates.id")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .execute()
)

Scala

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.sql.Timestamp

val schema = StructType(Array(
  StructField("id", IntegerType, nullable = true),
  StructField("firstName", StringType, nullable = true),
  StructField("middleName", StringType, nullable = true),
  StructField("lastName", StringType, nullable = true),
  StructField("gender", StringType, nullable = true),
  StructField("birthDate", TimestampType, nullable = true),
  StructField("ssn", StringType, nullable = true),
  StructField("salary", IntegerType, nullable = true)
))

val data = Seq(
  Row(9999998, "Billy", "Tommie", "Luppitt", "M", Timestamp.valueOf("1992-09-17 00:00:00"), "953-38-9452", 55250),
  Row(9999999, "Elias", "Cyril", "Leadbetter", "M", Timestamp.valueOf("1984-05-22 00:00:00"), "906-51-2137", 48500),
  Row(10000000, "Joshua", "Chas", "Broggio", "M", Timestamp.valueOf("1968-07-22 00:00:00"), "988-61-6247", 90000),
  Row(20000001, "John", "", "Doe", "M", Timestamp.valueOf("1978-01-14 00:00:00"), "345-67-8901", 55500),
  Row(20000002, "Mary", "", "Smith", "F", Timestamp.valueOf("1982-10-29 00:00:00"), "456-78-9012", 98250),
  Row(20000003, "Jane", "", "Doe", "F", Timestamp.valueOf("1981-06-25 00:00:00"), "567-89-0123", 89900)
)

val people_10m_updates = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
people_10m_updates.createOrReplaceTempView("people_10m_updates")

// ...

import io.delta.tables.DeltaTable

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

deltaTable.as("people_10m")
  .merge(
    people_10m_updates.as("people_10m_updates"),
    "people_10m.id = people_10m_updates.id"
  )
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .execute()

SQL

CREATE OR REPLACE TEMP VIEW people_10m_updates (
  id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);

MERGE INTO people_10m
USING people_10m_updates
ON people_10m.id = people_10m_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

在 SQL 中,如果指定 *,则会在目标表中更新或插入所有列,并假设源表具有与目标表相同的列。 如果目标表没有相同的列,则查询将引发分析错误。

执行插入操作时必须为表中的每个列指定一个值(例如,当现有数据集中没有匹配行时,必须这样做)。 但是,你不需要更新所有值。

若要查看结果,请查询表。

Python

df = spark.read.table("main.default.people_10m")
df_filtered = df.filter(df["id"] >= 9999998)
display(df_filtered)

Scala

val df = spark.read.table("main.default.people_10m")
val df_filtered = df.filter($"id" >= 9999998)
display(df_filtered)

SQL

SELECT * FROM main.default.people_10m WHERE id >= 9999998

读取表

可以按表名或表路径访问 Delta 表中的数据,如以下示例中所示:

Python

people_df = spark.read.table("main.default.people_10m")
display(people_df)

Scala

val people_df = spark.read.table("main.default.people_10m")
display(people_df)

SQL

SELECT * FROM main.default.people_10m;

写入到表

Delta Lake 使用标准语法将数据写入表中。

若要以原子方式将新数据添加到现有 Delta 表,请使用追加模式,如以下示例中所示:

Python

df.write.mode("append").saveAsTable("main.default.people_10m")

Scala

df.write.mode("append").saveAsTable("main.default.people_10m")

SQL

INSERT INTO main.default.people_10m SELECT * FROM main.default.more_people

若要替换表中的所有数据,请使用覆盖模式,如以下示例中所示:

Python

df.write.mode("overwrite").saveAsTable("main.default.people_10m")

Scala

df.write.mode("overwrite").saveAsTable("main.default.people_10m")

SQL

INSERT OVERWRITE TABLE main.default.people_10m SELECT * FROM main.default.more_people

更新表

可以在 Delta 表中更新与谓词匹配的数据。 例如,在示例 people_10m 表中,若要将 gender 列中的缩写从 MF 更改为 MaleFemale,可运行以下命令:

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
  condition = "gender = 'F'",
  set = { "gender": "'Female'" }
)

# Declare the predicate by using Spark SQL functions.
deltaTable.update(
  condition = col('gender') == 'M',
  set = { 'gender': lit('Male') }
)

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
  "gender = 'F'",
  Map("gender" -> "'Female'")
)

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
  col("gender") === "M",
  Map("gender" -> lit("Male")));

SQL

UPDATE main.default.people_10m SET gender = 'Female' WHERE gender = 'F';
UPDATE main.default.people_10m SET gender = 'Male' WHERE gender = 'M';

从表中删除

可以从 Delta 表中删除与谓词匹配的数据。 例如,在示例 people_10m 表中,若要删除与 1955 之前的 birthDate 列中具有某个值的人员对应的所有行,可运行以下命令:

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")

SQL

DELETE FROM main.default.people_10m WHERE birthDate < '1955-01-01'

重要

删除操作会从最新版本的 Delta 表中删除数据,但是直到显式删除旧的版本后才能从物理存储中删除数据。 有关详细信息,请参阅清空

显示表历史记录

若要查看表的历史记录,请使用适用于 PythonScalaDeltaTable.history 方法以及适用于 SQL 中的 DESCRIBE HISTORY 语句,该语句提供对表进行的每次写入的出处信息,包括表版本、操作、用户等。

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())

SQL

DESCRIBE HISTORY main.default.people_10m

查询表的旧版本(按时间顺序查看)

Delta Lake 按时间顺序查看允许你查询 Delta 表的旧快照。

若要查询较早版本的表,请指定表的版本或时间戳。 例如,若要从前面的历史记录中查询版本 0 或时间戳 2024-05-15T22:43:15.000+00:00Z,请使用以下命令:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaHistory = deltaTable.history()

display(deltaHistory.where("version == 0"))
# Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
val deltaHistory = deltaTable.history()

display(deltaHistory.where("version == 0"))
// Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))

SQL

SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'

对于时间戳,仅接受日期或时间戳字符串,例如 "2024-05-15T22:43:15.000+00:00""2024-05-15 22:43:15"

使用 DataFrameReader 选项,可以从固定到表的特定版本或时间戳的 Delta 表创建数据帧,例如:

Python

df = spark.read.option('versionAsOf', 0).table("main.default.people_10m")
# Or:
df = spark.read.option('timestampAsOf', '2024-05-15T22:43:15.000+00:00').table("main.default.people_10m")

display(df)

Scala

val df = spark.read.option("versionAsOf", 0).table("main.default.people_10m")
// Or:
val df = spark.read.option("timestampAsOf", "2024-05-15T22:43:15.000+00:00").table("main.default.people_10m")

display(df)

SQL

SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2024-05-15T22:43:15.000+00:00'

有关详细信息,请参阅使用 Delta Lake 表历史记录

优化表

对表执行多个更改后,可能会有很多小文件。 为了提高读取查询的速度,可以使用优化操作将小文件折叠为较大的文件:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()

SQL

OPTIMIZE main.default.people_10m

按列进行 Z 排序

为了进一步提高读取性能,你可以通过 Z 排序将相关的信息放置在同一组文件中。 Delta Lake 数据跳过算法使用此并置来大幅减少需要读取的数据量。 若要对数据进行 Z 排序,需要在 Z 排序依据操作中指定要排序的列。 例如,若要按 gender 进行并置,请运行:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")

SQL

OPTIMIZE main.default.people_10m
ZORDER BY (gender)

有关运行优化操作时可用的完整选项集,请参阅优化数据文件布局

使用 VACUUM 清理快照

Delta Lake 为读取提供快照隔离,这意味着即使其他用户或作业正在查询表,也可以安全地运行优化操作。 不过,最终你应该清除旧快照。 可以通过运行 vacuum 操作来实现此目的:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()

SQL

VACUUM main.default.people_10m

有关有效使用 vacuum 操作的详细信息,请参阅使用 vacuum 删除未使用的数据文件