Delta Live Tables SQL 语言参考

本文提供了有关 Delta Live Tables SQL 编程接口的详细信息。

可以在 SQL 查询中使用 Python 用户定义函数 (UDF),但必须先在 Python 文件中定义这些 UDF,然后再在 SQL 源文件中调用它们。 请参阅用户定义标量函数 - Python

限制

不支持 PIVOT 子句。 Spark 中的 pivot 操作需要预先加载输入数据以计算输出架构。 Delta Live Tables 不支持此功能。

创建 Delta Live Tables 具体化视图或流式表

注意

  • 弃用用于创建具体化视图的 CREATE OR REFRESH LIVE TABLE 语法。 相反,使用 CREATE OR REFRESH MATERIALIZED VIEW
  • 要使用 CLUSTER BY 子句启用 liquid 聚类,必须将管道配置为使用预览频道

在声明流式表或具体化视图时,可以使用相同的基本 SQL 语法。

使用 SQL 声明增量实时表具体化视图

下面介绍了使用 SQL 声明增量实时表中的具体化视图的语法:

CREATE OR REFRESH MATERIALIZED VIEW view_name [CLUSTER BY (col_name1, col_name2, ... )]
  [(
    [
    col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
    [ table_constraint ] [, ...]
  )]
  [USING DELTA]
  [PARTITIONED BY (col_name1, col_name2, ... )]
  [LOCATION path]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
  [ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
  AS select_statement

使用 SQL 声明增量实时表流式处理表

只能使用针对流式处理源读取的查询来声明流式表。 Databricks 建议使用自动加载程序从云对象存储中流式引入文件。 请参阅自动加载程序 SQL 语法

将管道中的其他表或视图指定为流式处理源时,必须在数据集名称周围包含 STREAM() 函数。

下面介绍了使用 SQL 在增量实时表中声明流式处理表的语法:

CREATE OR REFRESH [TEMPORARY] STREAMING TABLE table_name [CLUSTER BY (col_name1, col_name2, ... )]
  [(
    [
    col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
    [ table_constraint ] [, ...]
  )]
  [USING DELTA]
  [PARTITIONED BY (col_name1, col_name2, ... )]
  [LOCATION path]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
  [ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
  AS select_statement

创建 Delta Live Tables 视图

下面介绍了使用 SQL 声明视图的语法:

CREATE TEMPORARY [STREAMING] LIVE VIEW view_name
  [(
    [
    col_name1 [ COMMENT col_comment1 ],
    col_name2 [ COMMENT col_comment2 ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
  )]
  [COMMENT view_comment]
  AS select_statement

自动加载程序 SQL 语法

下面介绍了在 SQL 中使用自动加载程序的语法:

CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
  FROM read_files(
    "<file-path>",
    "<file-format>",
    map(
      "<option-key>", "<option_value",
      "<option-key>", "<option_value",
      ...
    )
  )

可以将支持的格式选项用于自动加载程序。 使用 map() 函数,可将选项传递给 read_files() 方法。 选项是键值对,其中键和值是字符串。 有关支持格式和选项的详细信息,请参阅文件格式选项

示例:定义表

你可以通过以下方式创建数据集:从外部数据源或管道中定义的数据集读取数据。 若要从内部数据集读取数据,请在数据集名称前追加 LIVE 关键字。 以下示例定义了两个不同的数据集:一个将 JSON 文件作为输入源的 taxi_raw 表,一个将 taxi_raw 表作为输入的 filtered_data 表:

CREATE OR REFRESH MATERIALIZED VIEW taxi_raw
AS SELECT * FROM json.`/databricks-datasets/nyctaxi/sample/json/`

CREATE OR REFRESH MATERIALIZED VIEW filtered_data
AS SELECT
  ...
FROM LIVE.taxi_raw

示例:从流式处理源读取

若要从流式处理源(例如自动加载程序或内部数据集)读取数据,请定义 STREAMING 表:

CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(LIVE.customers_bronze)

有关对数据进行流式处理的详细信息,请参阅使用 Delta Live Tables 转换数据

控制表的具体化方式

表还提供对其具体化的额外控制:

  • 指定如何使用 PARTITIONED BY 对表进行分区。 可以使用分区来加快查询速度。
  • 可以使用 TBLPROPERTIES 来设置表属性。 请参阅 Delta Live Tables 表属性
  • 使用 LOCATION 设置来设置存储位置。 默认情况下,如果未设置 LOCATION,表数据会存储在管道存储位置。
  • 可在架构定义中使用生成的列。 请参阅示例:指定架构和分区列

注意

对于小于 1 TB 的表,Databricks 建议让增量实时表控制数据组织方式。 除非您希望表增大到超过 1 TB,否则 Databricks 建议您不指定分区列。

示例:指定架构和分区列

你可以在定义表时指定架构。 以下示例指定目标表的架构,包括使用 Delta Lake 生成的列和为表定义分区列:

CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) PARTITIONED BY (order_day_of_week)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

默认情况下,如果未指定架构,则增量实时表将从 table 定义推断架构。

示例:定义表约束

注意

对表约束的增量实时表支持目前以公共预览版提供。 若要定义表约束,管道必须是启用了 Unity 目录的管道,并配置为使用 preview 通道。

指定架构时,可以定义主键和外键。 约束具备信息性,系统不会强制执行。 请参阅 SQL 语言参考中的 CONSTRAINT 子句

以下示例定义具有主键和外键约束的表:

CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING NOT NULL PRIMARY KEY,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
  CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

参数化通过 SQL 声明表或视图时使用的值

使用 SET 在声明表或视图的查询中指定配置值,包括 Spark 配置。 在 SET 语句有权访问已定义的值之后,在笔记本中定义的任何表或视图。 对 SET 语句之后的任何表或视图执行 Spark 查询时,会使用通过 SET 语句指定的任何 Spark 配置。 若要在查询中读取配置值,请使用字符串内插语法 ${}。 下面的示例设置名为 startDate 的配置值,并在查询中使用该值:

SET startDate='2020-01-01';

CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}

若要指定多个配置值,请对每个值使用单独的 SET 语句。

示例:定义行筛选器和列掩码

重要

行筛选器和列掩码为公共预览版

若要创建具有行筛选器和列掩码的具体化视图或流式处理表,请使用 ROW FILTER 子句MASK 子句。 以下示例演示如何定义具有行筛选器和列掩码的具体化视图和流式处理表:

CREATE OR REFRESH STREAMING TABLE customers_silver (
  id int COMMENT 'This is the customer ID',
  name string,
  region string,
  ssn string MASK catalog.schema.ssn_mask_fn COMMENT 'SSN masked for privacy'
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT * FROM STREAM(LIVE.customers_bronze)

CREATE OR REFRESH MATERIALIZED VIEW sales (
  customer_id STRING MASK catalog.schema.customer_id_mask_fn,
  customer_name STRING,
  number_of_line_items STRING COMMENT 'Number of items in the order',
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
)
COMMENT "Raw data on sales"
WITH ROW FILTER catalog.schema.order_number_filter_fn ON (order_number)
AS SELECT * FROM LIVE.sales_bronze

有关行筛选器和列掩码的详细信息,请参阅发布具有行筛选器和列掩码的表

SQL 属性

注意

要使用 CLUSTER BY 子句启用 liquid 聚类,必须将管道配置为使用预览频道

CREATE TABLE 或 VIEW
TEMPORARY

创建表,但不发布表的元数据。 TEMPORARY 子句指示 Delta Live Tables 创建可用于管道但不应在管道外部访问的表。 为了缩短处理时间,临时表会在创建它的管道的生存期内持久保留,而不仅仅是一次更新。
STREAMING

创建一个表,该表将输入数据集作为流读取。 输入数据集必须是流式处理数据源,例如自动加载程序或 STREAMING 表。
CLUSTER BY

在表上启用 Liquid 聚类并定义要用作聚类键的列。

请参阅对 Delta 表使用 liquid 聚类分析
PARTITIONED BY

包含一列或多列的可选列表,用于对表进行分区。
LOCATION

表数据的可选存储位置。 如果未设置,系统将默认为管道存储位置。
COMMENT

表的可选说明。
column_constraint

列的可选信息性主键或外键约束。
MASK clause(公共预览版)

添加列掩码函数以对敏感数据进行匿名化处理。 该列的将来查询将返回被评估函数的结果,而不是该列的原始值。 这对于精细访问控制非常有用,因为该函数可以检查用户的身份和组成员资格以确定是否编辑该值。

请参阅列掩码子句
table_constraint

表的可选信息性主键或外键约束。
TBLPROPERTIES

表的表属性可选列表。
WITH ROW FILTER clause(公共预览版)

向表中添加行筛选器函数。 将来对该表的查询会收到函数计算结果为 TRUE 的行的子集。 这对于精细的访问控制很有用,因为它允许函数检查调用用户的标识和组成员身份以决定是否筛选某些行。

请参阅 ROW FILTER 子句
select_statement

一个 Delta Live Tables 查询,用于定义表的数据集。
CONSTRAINT 子句
EXPECT expectation_name

定义数据质量约束 expectation_name。 如果未定义 ON VIOLATION 约束,则将违反约束的行添加到目标数据集。
ON VIOLATION

对失败的行执行的可选操作:

- FAIL UPDATE:立即停止管道执行。
- DROP ROW:删除记录并继续处理。

在 Delta Live Tables 中使用 SQL 进行的变更数据捕获

通过 APPLY CHANGES INTO 语句使用 Delta Live Tables CDC 功能,如下所述:

CREATE OR REFRESH STREAMING TABLE table_name;

APPLY CHANGES INTO LIVE.table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

使用与非 APPLY CHANGES 查询相同的 CONSTRAINT 子句为 APPLY CHANGES 目标定义数据质量约束。 请参阅使用 Delta Live Tables 管理数据质量

注意

INSERTUPDATE 事件的默认行为是从源更新插入 CDC 事件:更新目标表中与指定键匹配的任何行或在目标表中不存在匹配记录时插入新行。 可以使用 APPLY AS DELETE WHEN 条件指定对 DELETE 事件的处理。

重要

必须声明一个要向其应用更改的目标流式表。 可以选择为目标表指定架构。 在指定 APPLY CHANGES 目标表的架构时,还必须包含具有与 sequence_by 字段相同数据类型的 __START_AT__END_AT 列。

请参阅 APPLY CHANGES API:简化 Delta Live Tables 中的变更数据捕获

子句
KEYS

唯一标识源数据中的行的列或列组合。 这用于标识哪些 CDC 事件适用于目标表中的特定记录。

若要定义列的组合,请使用以逗号分隔的列列表。

该语句是必需的。
IGNORE NULL UPDATES

允许引入包含目标列子集的更新。 当 CDC 事件匹配现有行并指定 IGNORE NULL UPDATES 时,具有 null 的列将在目标中保留其现有值。 这也适用于值为 null 的嵌套列。

此子句是可选的。

默认设置是用 null 值覆盖现有列。
APPLY AS DELETE WHEN

指定何时应将 CDC 事件视为 DELETE 而不是更新插入。 为了处理乱序数据,被删除的行被暂时保留为基础 Delta 表中的无效标记,并在元存储中创建一个视图来筛选掉这些无效标记。 保留间隔可以配置为
pipelines.cdc.tombstoneGCThresholdInSeconds 表属性

此子句是可选的。
APPLY AS TRUNCATE WHEN

指定何时应将 CDC 事件视为完整表 TRUNCATE。 由于此子句会触发目标表的完全截断,因此应仅将其用于需要此功能的特定用例。

仅 SCD 类型 1 支持 APPLY AS TRUNCATE WHEN 子句。 SCD 类型 2 不支持截断操作。

此子句是可选的。
SEQUENCE BY

指定源数据中 CDC 事件的逻辑顺序的列名。 增量实时表使用此排序来处理乱序到达的更改事件。

指定的列必须是可排序的数据类型。

该语句是必需的。
COLUMNS

指定要包含在目标表中的列子集。 可以:

- 指定要包含的完整列列表:COLUMNS (userId, name, city)
- 指定要排除的列列表:COLUMNS * EXCEPT (operation, sequenceNum)

此子句是可选的。

当未指定 COLUMNS 子句时,默认设置是包含目标表中的所有列。
STORED AS

将记录存储为 SCD 类型 1 还是 SCD 类型 2。

此子句是可选的。

默认值为 SCD 类型 1。
TRACK HISTORY ON

指定输出列的子集,以便在这些指定列发生任何更改时生成历史记录。 可以:

- 指定要跟踪的完整列列表:COLUMNS (userId, name, city)
- 指定要从跟踪中排除的列列表:COLUMNS * EXCEPT (operation, sequenceNum)

此子句是可选的。 默认设置为当发生任何更改时跟踪所有输出列的历史记录,等效于 TRACK HISTORY ON *