APPLY CHANGES API:使用增量实时表简化变更数据捕获

增量实时表使用APPLY CHANGESAPPLY CHANGES FROM SNAPSHOT API 简化了变更数据捕获 (CDC)。 使用的接口取决于更改数据源:

  • 使用APPLY CHANGES处理更改数据源 (CDF) 的更改。
  • 使用APPLY CHANGES FROM SNAPSHOT(公共预览版)处理数据库快照中的更改。

以前,通常使用 MERGE INTO 语句处理 Azure Databricks 上的 CDC 记录。 但是,MERGE INTO可能会由于无序记录而生成不正确的结果,或者需要复杂的逻辑来重新排序记录。

增量实时表 SQL 和 Python 接口支持APPLY CHANGES API。 增量实时表 Python 接口支持APPLY CHANGES FROM SNAPSHOT API。

APPLY CHANGESAPPLY CHANGES FROM SNAPSHOT都支持使用 SCD 类型 1 和类型 2 更新表:

  • 使用 SCD 类型 1 直接更新记录。 不保留更新记录的历史记录。
  • 使用 SCD 类型 2 保留有关所有更新或者对指定列集的更新的记录的历史记录。

有关语法和其他参考,请参阅:

注意

本文介绍如何根据源数据中的更改来更新增量实时表管道中的表。 若要了解如何记录和查询 Delta 表的行级更改信息,请参阅在 Azure Databricks 上使用 Delta Lake 更改数据馈送

要求

若要使用 CDC API,必须将管道配置为使用增量实时表 ProAdvanced 版本

如何使用APPLY CHANGES API 实现 CDC?

增量实时表中的 APPLY CHANGES API 可自动处理无序记录,从而确保正确处理 CDC 记录,而且无需开发复杂的逻辑来处理无序记录。 必须在源数据中指定一列作为记录排序依据,增量实时表将此顺序解释为源数据正确排序的单调递增表示形式。 增量实时表会自动处理不按顺序到达的数据。 对于 SCD 类型 2 的更改,增量实时表会将相应的顺序值传播到目标表的 __START_AT__END_AT 列。 每个顺序值的每个键应该有一个非重复更新,不支持 NULL 顺序值。

要使用APPLY CHANGES执行 CDC 处理,请先创建流式表,然后使用 SQL 中的APPLY CHANGES INTO语句或 Python 中的apply_changes()函数指定更改源的源、键和排序。 若要创建目标流式表,请使用 SQL 中的 CREATE OR REFRESH STREAMING TABLE 语句或 Python 中的 create_streaming_table() 函数。 请参阅SCD 类型 1 和类型 2 处理示例。

有关语法详细信息,请参阅增量实时表SQL 参考Python 参考

如何使用APPLY CHANGES FROM SNAPSHOT API 实现 CDC?

重要

APPLY CHANGES FROM SNAPSHOT API 为公共预览版

APPLY CHANGES FROM SNAPSHOT是声明性 API,它比较一系列有序快照以有效地确定源数据的更改,然后运行对快照中的记录进行 CDC 处理所需的处理。 仅增量实时表 Python 接口支持APPLY CHANGES FROM SNAPSHOT

APPLY CHANGES FROM SNAPSHOT支持从多个源类型引入快照:

  • 使用定期快照引入从现有表或视图中引入快照。 APPLY CHANGES FROM SNAPSHOT提供了简单的简化界面,用于支持定期从现有数据库对象引入快照。 每次管道更新都会引入新快照,引入时间用作快照版本。 在连续模式下运行管道时,会按照包含 APPLY CHANGES FROM SNAPSHOT 处理的流的触发器间隔设置确定的周期,使用每个管道更新引入多个快照。
  • 使用历史快照引入来处理包含数据库快照的文件,例如从 Oracle 或 MySQL 数据库或数据仓库生成的快照。

要使用APPLY CHANGES FROM SNAPSHOT从任何源类型执行 CDC 处理,请先创建流式表,然后使用 Python 中的apply_changes_from_snapshot()函数指定实现处理所需的快照、键和其他参数。 请参阅定期快照引入历史快照引入示例。

传递给 API 的快照必须按版本按升序排列。 如果增量实时表检测到无序快照,则会引发错误。

有关语法详细信息,请参阅增量实时表Python 参考

限制

  • 不能将APPLY CHANGESAPPLY CHANGES FROM SNAPSHOT查询的目标用作流式表的源。 从APPLY CHANGESAPPLY CHANGES FROM SNAPSHOT查询的目标读取的表必须是具体化视图。
  • 用于排序的列必须是可排序数据类型。

示例:使用 CDF 源数据处理 SCD 类型 1 和 SCD 类型 2

以下部分提供了增量实时表 SCD 类型 1 和类型 2 查询的示例,这些查询基于更改数据源中的源事件更新目标表:

  1. 新建用户记录。
  2. 删除用户记录。
  3. 更新用户记录。 在 SCD 类型 1 示例中,最后的 UPDATE 操作延迟到达并从目标表中删除,展示了无序事件的处理。

以下所有示例假设你知道如何配置和更新增量实时表管道。 请参阅教程:运行第一个增量实时表管道

若要运行这些示例,必须首先创建一个示例数据集。 请参阅生成测试数据

下面是这些示例的输入记录:

userId name city operation sequenceNum
124 Raul 瓦哈卡 INSERT 1
123 Isabel Monterrey INSERT 1
125 Mercedes Tijuana INSERT 2
126 Lily Cancun INSERT 2
123 Null null DELETE 6
125 Mercedes Guadalajara UPDATE 6
125 Mercedes Mexicali UPDATE 5
123 Isabel 奇瓦瓦 UPDATE 5

如果你取消注释示例数据中的最后一行,则会插入以下记录用于指定记录的截断位置:

userId name city operation sequenceNum
null Null null TRUNCATE 3

注意

以下所有示例都包含用于指定DELETETRUNCATE操作的选项,但其中的每个选项都是可选的。

处理 SCD 类型 1 更新

以下示例演示了如何处理 SCD 类型 1 更新:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  apply_as_truncates = expr("operation = 'TRUNCATE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
APPLY AS TRUNCATE WHEN
  operation = "TRUNCATE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 1;

运行 SCD 类型 1 示例后,目标表包含以下记录:

userId name city
124 Raul 瓦哈卡
125 Mercedes Guadalajara
126 Lily Cancun

使用附加的 TRUNCATE 记录运行 SCD 类型 1 示例后,由于 sequenceNum=3 处的 TRUNCATE 操作,124126 的记录将被截断,并且目标表包含以下记录:

userId name city
125 Mercedes Guadalajara

处理 SCD 类型 2 更新

以下示例演示了如何处理 SCD 类型 2 更新:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2"
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2;

运行 SCD 类型 2 示例后,目标表包含以下记录:

userId name city __START_AT __END_AT
123 Isabel Monterrey 1 5
123 Isabel 奇瓦瓦 5 6
124 Raul 瓦哈卡 1 null
125 Mercedes Tijuana 2 5
125 Mercedes Mexicali 5 6
125 Mercedes Guadalajara 6 null
126 Lily Cancun 2 Null

SCD 类型 2 查询还可以指定一个输出列的子集,以便跟踪目标表中的历史记录。 对其他列的更改都就地更新,而不是生成新的历史记录。 以下示例演示了如何从跟踪中排除 city 列:

以下示例演示如何通过 SCD 类型 2 使用跟踪历史记录:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2",
  track_history_except_column_list = ["city"]
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2
TRACK HISTORY ON * EXCEPT
  (city)

在没有额外 TRUNCATE 记录的情况下运行此示例后,目标表包含以下记录:

userId name city __START_AT __END_AT
123 Isabel 奇瓦瓦 1 6
124 Raul 瓦哈卡 1 null
125 Mercedes Guadalajara 2 null
126 Lily Cancun 2 null

生成测试数据

以下代码用于生成可在本教程的示例查询中使用的示例数据集。 假设你拥有新建架构和新表的适当凭据,则可以使用笔记本或 Databricks SQL 运行这些语句。 以下代码不能作为增量实时表管道的一部分运行:

CREATE SCHEMA IF NOT EXISTS cdc_data;

CREATE TABLE
  cdc_data.users
AS SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city,
  col4 AS operation,
  col5 AS sequenceNum
FROM (
  VALUES
  -- Initial load.
  (124, "Raul",     "Oaxaca",      "INSERT", 1),
  (123, "Isabel",   "Monterrey",   "INSERT", 1),
  -- New users.
  (125, "Mercedes", "Tijuana",     "INSERT", 2),
  (126, "Lily",     "Cancun",      "INSERT", 2),
  -- Isabel is removed from the system and Mercedes moved to Guadalajara.
  (123, null,       null,          "DELETE", 6),
  (125, "Mercedes", "Guadalajara", "UPDATE", 6),
  -- This batch of updates arrived out of order. The above batch at sequenceNum 5 will be the final state.
  (125, "Mercedes", "Mexicali",    "UPDATE", 5),
  (123, "Isabel",   "Chihuahua",   "UPDATE", 5)
  -- Uncomment to test TRUNCATE.
  -- ,(null, null,      null,          "TRUNCATE", 3)
);

示例:定期快照处理

以下示例演示了 SCD 类型 2 处理,该处理引入存储在mycatalog.myschema.mytable的表的快照。 处理结果写入名为target的表。

mycatalog.myschema.mytable 时间戳 2024-01-01 00:00:00 的记录

密钥
1 a1
2 a2

mycatalog.myschema.mytable 时间戳 2024-01-01 12:00:00 的记录

密钥
2 b2
3 a3
import dlt

@dlt.view(name="source")
def source():
 return spark.read.table("mycatalog.myschema.mytable")

dlt.create_streaming_table("target")

dlt.apply_changes_from_snapshot(
 target="target",
 source="source",
 keys=["key"],
 stored_as_scd_type=2
)

处理快照后,目标表包含以下记录:

密钥 __START_AT __END_AT
1 a1 2024-01-01 00:00:00 2024-01-01 12:00:00
2 a2 2024-01-01 00:00:00 2024-01-01 12:00:00
2 b2 2024-01-01 12:00:00 Null
3 a3 2024-01-01 12:00:00 Null

示例:历史快照处理

以下示例演示了 SCD 类型 2 处理,该处理基于存储在云存储系统中的两个快照中的源事件更新目标表:

存储在 /<PATH>/filename1.csvtimestamp 上的快照

密钥 TrackingColumn NonTrackingColumn
1 a1 b1
2 a2 b2
4 a4 b4

存储在 /<PATH>/filename2.csvtimestamp + 5 上的快照

密钥 TrackingColumn NonTrackingColumn
2 a2_new b2
3 a3 b3
4 a4 b4_new

以下代码示例演示了如何使用以下快照处理 SCD 类型 2 更新:

import dlt

def exist(file_name):
  # Storage system-dependent function that returns true if file_name exists, false otherwise

# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
  latest_snapshot_version = latest_snapshot_version or 0
  next_version = latest_snapshot_version + 1
  file_name = "dir_path/filename_" + next_version + ".csv"
  if (exist(file_name)):
    return (spark.read.load(file_name), next_version)
   else:
     # No snapshot available
     return None

dlt.create_streaming_live_table("target")

dlt.apply_changes_from_snapshot(
  target = "target",
  source = next_snapshot_and_version,
  keys = ["Key"],
  stored_as_scd_type = 2,
  track_history_column_list = ["TrackingCol"]
)

处理快照后,目标表包含以下记录:

密钥 TrackingColumn NonTrackingColumn __START_AT __END_AT
1 a1 b1 1 2
2 a2 b2 1 2
2 a2_new b2 2 null
3 a3 b3 2 null
4 a4 b4_new 1 Null

在目标流式表中添加、更改或删除数据

如果管道将表发布到 Unity Catalog,则可以使用数据操作语言 (DML) 声明(包括插入、更新、删除与合并声明)来修改由 APPLY CHANGES INTO 声明创建的目标流式表。

注意

  • 不支持 DML 声明修改流式表的表架构。 确保 DML 语句不会尝试修改表架构。
  • 只能在使用 Databricks Runtime 13.3 LTS 及更高版本的 Unity Catalog 共享群集或 SQL 仓库中运行用于更新流式表的 DML 语句。
  • 由于流式传输要求仅追加数据源,因此如果你的处理需要从包含更改的源流式传输表进行流式传输(例如,通过 DML 语句),请在读取源流式传输表时设置 skipChangeCommits 标志。 设置 skipChangeCommits 后,将会忽略删除或修改源表上记录的事务。 如果处理不需要流式表,则可以使用具体化视图(没有仅追加限制)作为目标表。

由于增量实时表使用指定的 SEQUENCE BY 列并将适当的排序值传播到(供 SCD 类型 2 使用的)目标表的 __START_AT__END_AT 列,因此必须确保 DML 声明使用这些列的有效值,确保记录的顺序正确。 请参阅如何使用 APPLY CHANGES API 实现 CDC?

有关如何使用 DML 声明处理流式表的详细信息,请参阅在流式表中添加、更改或删除数据

以下示例插入一个起始序列为 5 的活动记录:

INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);

APPLY CHANGES 目标表读取更改数据馈送

在 Databricks Runtime 15.2 及更高版本中,可以从流式表中读取更改数据馈送,该表是 APPLY CHANGESAPPLY CHANGES FROM SNAPSHOT 查询的目标,读取方式与从其他 Delta 表中读取更改数据馈送的方式相同。 从目标流式表中读取更改数据馈送需要满足以下条件:

  • 目标流式表必须发布到 Unity Catalog。 请参阅将 Unity Catalog 与 Delta Live Tables 管道配合使用
  • 若要从目标流式表中读取更改数据馈送,必须使用 Databricks Runtime 15.2 或更高版本。 若要在另一个增量实时表管道中读取更改数据馈送,必须将管道配置为使用 Databricks Runtime 15.2 或更高版本。

可以从在 Delta 实时表管道中创建的目标流式表中读取更改数据馈送,读取方式与从其他 Delta 表中读取更改数据馈送的方式相同。 若要详细了解如何使用 Delta 更改数据馈送功能,包括 Python 和 SQL 中的示例,请参阅在 Azure Databricks 上使用 Delta Lake 更改数据馈送

注意

更改数据馈送记录包括标识更改事件类型的元数据。 在表中更新记录时,关联的更改记录的元数据通常包括设置为 update_preimageupdate_postimage 事件的 _change_type 值。

但是,如果对包含更改主键值的目标流式表进行更新,则 _change_type 值会有所不同。 当更改包括对主键的更新时,_change_type 元数据字段将设置为 insertdelete 事件。 如果对具有 UPDATEMERGE 语句的某个密钥字段进行手动更新,或者对于 SCD 类型 2 表,当 __start_at 字段更改以反映较早的起始序列值时,可能会更改主键。

APPLY CHANGES 查询确定主键值,SCD 类型 1 和 SCD 类型 2 处理分别对应不同的值:

  • 对于 SCD 类型 1 处理和增量实时表 Python 接口,主键是 apply_changes() 函数中 keys 参数的值。 对于增量实时表 SQL 接口,主键是由 APPLY CHANGES INTO 语句中的 KEYS 子句定义的列。
  • 对于 SCD 类型 2,主键是 keys 参数或 KEYS 子句以及 coalesce(__START_AT, __END_AT) 操作的返回值,其中 __START_AT__END_AT 分别是目标流式表中的相应列。

获取有关由 Delta Live Tables CDC 查询处理的记录的数据

注意

以下指标仅通过 APPLY CHANGES 查询(而不是 APPLY CHANGES FROM SNAPSHOT 查询)捕获。

以下指标由 APPLY CHANGES 查询捕获:

  • num_upserted_rows:在更新期间更新插入数据集的输出行数。
  • num_deleted_rows:在更新期间从数据集中删除的现有输出行数。

对于 apply changes 查询,不会捕获作为非 CDC 流输出的 num_output_rows 指标。

哪些数据对象用于增量实时表 CDC 处理?

注意:以下数据结构仅适用于APPLY CHANGES处理,而不适用于APPLY CHANGES FROM SNAPSHOT处理。

在 Hive 元存储中声明目标表时,会创建两个数据结构:

  • 一个使用分配给目标表的名称的视图。
  • 一个由增量实时表用来管理 CDC 处理的内部支持表。 此表的命名方式是在目标表名称的前面加上 __apply_changes_storage_

例如,如果你声明一个名为 dlt_cdc_target 的目标表,则会在元存储中看到一个名为 dlt_cdc_target 的视图和一个名为 __apply_changes_storage_dlt_cdc_target 的表。 创建视图后,增量实时表可以筛选出处理无序数据所需的额外信息(例如逻辑删除和版本)。 若要查看处理后的数据,请查询目标视图。 由于表 __apply_changes_storage_ 的架构可能会更改以支持将来的功能或增强功能,因此不应查询表以供生产使用。 如果手动在表中添加数据,则认为记录出现在其他更改之前,因为缺少版本列。

如果管道发布到 Unity Catalog,则用户无法访问内部支持表。