本文提供了 DLT SQL 编程接口的详细信息。
- 有关 Python API 的信息,请参阅 DLT Python 语言参考。
- 有关 SQL 命令的详细信息,请参阅 SQL 语言参考。
可以在 SQL 查询中使用 Python 用户定义函数 (UDF),但必须先在 Python 文件中定义这些 UDF,然后再在 SQL 源文件中调用它们。 请参阅用户定义标量函数 - Python。
局限性
不支持 PIVOT
子句。 Spark 中的 pivot
操作需要预先加载输入数据以计算输出架构。 DLT 不支持此功能。
创建 DLT 具体化视图或流式处理表
注释
弃用用于创建具体化视图的 CREATE OR REFRESH LIVE TABLE
语法。 请改用 CREATE OR REFRESH MATERIALIZED VIEW
。
在声明流式表或具体化视图时,可以使用相同的基本 SQL 语法。
使用 SQL 声明 DLT 具体化视图
下面介绍了使用 SQL 在 DLT 中声明具体化视图的语法:
CREATE OR REFRESH MATERIALIZED VIEW view_name
[(
[
col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
...
]
[
CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
...
]
[ table_constraint ] [, ...]
)]
[USING DELTA]
[PARTITIONED BY (col_name1, col_name2, ... )]
[CLUSTER BY clause]
[LOCATION path]
[COMMENT table_comment]
[TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
[ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
AS select_statement
使用 SQL 声明 DLT 流式处理表
只能使用针对流式处理源读取的查询来声明流式处理表。 Databricks 建议使用自动加载程序从云对象存储中流式引入文件。 请参阅自动加载程序 SQL 语法。
将管道中的其他表或视图指定为流式处理源时,必须在数据集名称周围包含 STREAM()
函数。
下面介绍使用 SQL 在 DLT 中声明流式处理表的语法:
CREATE OR REFRESH [TEMPORARY] STREAMING TABLE table_name
[(
[
col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
...
]
[
CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
...
]
[ table_constraint ] [, ...]
)]
[USING DELTA]
[PARTITIONED BY (col_name1, col_name2, ... )]
[CLUSTER BY clause]
[LOCATION path]
[COMMENT table_comment]
[TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
[ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
AS select_statement
创建 DLT 视图
下面介绍了使用 SQL 声明视图的语法:
CREATE TEMPORARY [STREAMING] LIVE VIEW view_name
[(
[
col_name1 [ COMMENT col_comment1 ],
col_name2 [ COMMENT col_comment2 ],
...
]
[
CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
...
]
)]
[COMMENT view_comment]
AS select_statement
自动加载器 SQL 语法
下面介绍了在 SQL 中使用自动加载程序的语法:
CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
FROM STREAM read_files(
"<file-path>",
[<option-key> => <option_value>, ...]
)
自动加载程序的选项是键值对。 有关支持的格式和选项的详细信息,请参阅 “选项”。
例如:
CREATE OR REFRESH STREAMING TABLE my_table
AS SELECT *
FROM STREAM read_files(
"/Volumes/my_volume/path/to/files/*",
format => "json",
inferColumnTypes => true,
maxFilesPerTrigger => 100,
schemaEvolutionMode => "addNewColumns",
modifiedAfter => "2025-03-11T23:59:59.999+00:00"
)
示例:定义表
你可以通过以下方式创建数据集:从外部数据源或管道中定义的数据集读取数据。 若要从内部数据集中读取,请指定表名称,此名称将使用已配置管道的默认目录和架构设置。 以下示例定义了两个不同的数据集:一个将 JSON 文件作为输入源的 taxi_raw
表,一个将 filtered_data
表作为输入的 taxi_raw
表:
CREATE OR REFRESH MATERIALIZED VIEW taxi_raw
AS SELECT * FROM read_files("/databricks-datasets/nyctaxi/sample/json/")
CREATE OR REFRESH MATERIALIZED VIEW filtered_data
AS SELECT
...
FROM taxi_raw
示例:从流式处理源读取
若要从流式处理源(例如自动加载程序或内部数据集)读取数据,请定义表 STREAMING
:
CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/customers/", format => "csv")
CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)
有关对数据进行流式处理的详细信息,请参阅使用管道转换数据。
从具体化视图或流式处理表中永久删除记录
若要从启用了删除向量的物化视图或流表中永久删除记录(如为了符合 GDPR 要求),必须对对象的底层 Delta 表执行其他操作。
控制表的具体化方式
表还提供对其具体化的额外控制:
- 指定如何使用
CLUSTER BY
对表格进行聚类。 可以使用液体聚类分析来加快查询速度。 请参阅使用液体聚类分析 Delta 表。 - 指定如何使用对表进行
PARTITIONED BY
。 - 可以使用
TBLPROPERTIES
来设置表属性。 请参阅 DLT 表属性。 - 使用
LOCATION
设置来设置存储位置。 默认情况下,如果未设置LOCATION
,表数据会存储在管道存储位置。 - 可在架构定义中使用生成的列。 请参阅 示例:指定架构和集群列。
注释
对于大小小于 1 TB 的表,Databricks 建议让 DLT 控制数据组织。 除非您希望表增大到超过 1 TB,否则 Databricks 建议您不指定分区列。
示例:指定架构和群集列
你可以在定义表时指定架构。 以下示例指定目标表的架构,包括使用 Delta Lake 中由 和 生成的列,并定义表的聚类列:
CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) CLUSTER BY (order_day_of_week, customer_id)
COMMENT "Raw data on sales"
AS SELECT * FROM ...
默认情况下,如果未指定架构,DLT 会从 table
定义推断架构。
示例:指定分区列
您可以选择性地为表指定分区列:
CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) PARTITIONED BY (order_day_of_week)
COMMENT "Raw data on sales"
AS SELECT * FROM ...
Liquid 聚类分析提供灵活的优化解决方案进行聚类分析。 请考虑对 DLT 使用 CLUSTER BY
而不是 PARTITIONED BY
。
示例:定义表约束
注释
对表约束的 DLT 支持以 公开预览提供。 要定义表约束,您的管道必须是启用了 Unity Catalog 的管道,并配置为使用 preview
通道。
指定架构时,可以定义主键和外键。 约束具备信息性,系统不会强制执行。 请参阅 SQL 语言参考中的 CONSTRAINT 子句。
以下示例定义具有主键和外键约束的表:
CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
)
COMMENT "Raw data on sales"
AS SELECT * FROM ...
在使用 SQL 声明表或视图时,对所用值进行参数化
使用 SET
在声明表或视图的查询中指定配置值,包括 Spark 配置。 在 SET
语句有权访问已定义的值之后,在笔记本中定义的任何表或视图。 对 SET 语句之后的任何表或视图执行 Spark 查询时,会使用通过 SET
语句指定的任何 Spark 配置。 若要在查询中读取配置值,请使用字符串内插语法 ${}
。 下面的示例设置名为 startDate
的配置值,并在查询中使用该值:
SET startDate='2020-01-01';
CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}
若要指定多个配置值,请对每个值使用单独的 SET
语句。
示例:定义行筛选器和列掩码
重要
行筛选器和列掩码处于公共预览版。
若要生成具有行筛选器和列掩码的物化视图或流式处理表,请使用 ROW FILTER 子句和 MASK 子句。 以下示例演示如何定义同时具有行筛选器和列掩码的具体化视图和流式处理表:
CREATE OR REFRESH STREAMING TABLE customers_silver (
id int COMMENT 'This is the customer ID',
name string,
region string,
ssn string MASK catalog.schema.ssn_mask_fn COMMENT 'SSN masked for privacy'
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT * FROM STREAM(customers_bronze)
CREATE OR REFRESH MATERIALIZED VIEW sales (
customer_id STRING MASK catalog.schema.customer_id_mask_fn,
customer_name STRING,
number_of_line_items STRING COMMENT 'Number of items in the order',
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
)
COMMENT "Raw data on sales"
WITH ROW FILTER catalog.schema.order_number_filter_fn ON (order_number)
AS SELECT * FROM sales_bronze
有关行筛选器和列掩码的详细信息,请参阅发布具有行筛选器和列掩码的表。
SQL 属性
CREATE TABLE 或 VIEW |
---|
TEMPORARY 创建表,但不发布表的元数据。 TEMPORARY 子句指令 DLT 创建一张供管道使用但不允许在管道外访问的表。 为了缩短处理时间,临时表会在创建它的管道的生命周期内持久存在,而不仅仅是一次更新。 |
STREAMING 创建一个表,该表将输入数据集作为流读取。 输入数据集必须是流式处理数据源,例如 Auto Loader 或 STREAMING 表。 |
CLUSTER BY 对表启用动态聚类,并定义要用作聚类键的列。 请参阅使用液体聚类分析 Delta 表。 |
PARTITIONED BY 包含一列或多列的可选列表,用于对表进行分区。 |
LOCATION 表数据的可选存储位置。 如果未设置,系统将默认为管道存储位置。 |
COMMENT 表的可选说明。 |
column_constraint 列的可选信息性主键或外键约束。 |
MASK clause (公共预览版)添加列掩码函数以对敏感数据进行匿名化处理。 该列的将来查询将返回被评估函数的结果,而不是该列的原始值。 这对于精细访问控制非常有用,因为该函数可以检查用户的身份和组成员资格以确定是否编辑该值。 请参阅 Column mask 子句。 |
table_constraint 表的可选信息性主键或外键约束。 |
TBLPROPERTIES 表的表属性可选列表。 |
WITH ROW FILTER clause (公共预览版)向表中添加行筛选器函数。 将来对该表的查询会收到函数计算结果为 TRUE 的行的子集。 这对于精细的访问控制很有用,因为它允许函数检查调用用户的标识和组成员身份以决定是否筛选某些行。 请参阅 ROW FILTER 条款。 |
select_statement 定义表数据集的 DLT 查询。 |
CONSTRAINT 子句 |
---|
EXPECT expectation_name 定义数据质量约束 expectation_name 。 如果未定义 ON VIOLATION 约束,则将违反约束的行添加到目标数据集。 |
ON VIOLATION 对失败的行执行的可选操作:
|
在 DLT 中使用 SQL 更改数据捕获
通过 APPLY CHANGES INTO
语句使用 DLT CDC 功能,如下所述:
CREATE OR REFRESH STREAMING TABLE table_name;
APPLY CHANGES INTO table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]
使用与非 APPLY CHANGES
查询相同的 CONSTRAINT
子句为 APPLY CHANGES
目标定义数据质量约束。 请参阅通过管道预期管理数据质量。
注释
INSERT
和 UPDATE
事件的默认行为是从源中更新或插入 CDC 事件:更新目标表中与指定键匹配的行,或在目标表中不存在匹配记录时插入新行。 可以使用 DELETE
条件指定对 APPLY AS DELETE WHEN
事件的处理。
重要
必须声明一个要向其应用更改的目标流式处理表。 可以选择为目标表指定架构。 在指定 APPLY CHANGES
目标表的架构时,还必须包含具有与 __START_AT
字段相同数据类型的 __END_AT
和 sequence_by
列。
请参阅 APPLY CHANGES API:使用 DLT 简化变更数据捕获。
子句 |
---|
KEYS 用于唯一标识源数据中的行的列或列组合。 这用于标识哪些 CDC 事件适用于目标表中的特定记录。 若要定义列的组合,请使用以逗号分隔的列列表。 该语句是必需的。 |
IGNORE NULL UPDATES 允许引入包含目标列子集的更新。 当 CDC 事件匹配现有行并指定 IGNORE NULL UPDATES 时,具有 null 的列将在目标中保留其现有值。 这也适用于值为 null 的嵌套列。此子句是可选的。 默认设置是用 null 值覆盖现有列。 |
APPLY AS DELETE WHEN 指定何时应将 CDC 事件视为 DELETE 而不是更新插入。 为了处理无序数据,已删除的行将暂时保留为基础增量表中的逻辑删除,并在元存储中创建一个用于筛选出这些逻辑删除的视图。 可通过以下方式配置保留间隔pipelines.cdc.tombstoneGCThresholdInSeconds 表属性。此子句是可选的。 |
APPLY AS TRUNCATE WHEN 指定何时应将 CDC 事件视为完整表 TRUNCATE 。 由于此子句会触发目标表的完全截断,因此应仅将其用于需要此功能的特定用例。仅支持为 SCD 类型 1 使用 APPLY AS TRUNCATE WHEN 子句。 SCD 类型 2 不支持截断操作。此子句是可选的。 |
SEQUENCE BY 用于指定源数据中 CDC 事件的逻辑顺序的列名。 DLT 使用此排序来处理以无序方式到达的更改事件。 指定的列必须是可排序的数据类型。 该语句是必需的。 |
COLUMNS 指定要包含在目标表中的列子集。 您可以选择:
此子句是可选的。 当未指定 COLUMNS 子句时,默认设置是包含目标表中的所有列。 |
STORED AS 将记录存储为 SCD 类型 1 还是 SCD 类型 2。 此子句是可选的。 默认值为 SCD 类型 1。 |
TRACK HISTORY ON 指定输出列的子集,以便在这些指定列发生任何更改时生成历史记录。 您可以选择:
此子句是可选的。 默认设置为当发生任何更改时跟踪所有输出列的历史记录,等效于 TRACK HISTORY ON * 。 |