使用 SQL 开发管道代码

增量实时表引入多个新的 SQL 关键字和函数,用于在管道中定义具体化视图和流式处理表。 SQL 对开发管道的支持基于 Spark SQL 的基础知识,并添加对结构化流式处理功能的支持。

熟悉 PySpark DataFrame 的用户可能更喜欢使用 Python 开发管道代码。 Python 支持更广泛的测试和操作,这些测试和操作难以通过 SQL 实现,例如元编程操作。 请参阅使用 Python 开发管道代码

有关增量实时表 SQL 语法的完整参考,请参阅增量实时表 SQL 语言参考

用于管道开发的 SQL 基础知识

创建增量实时表数据集的 SQL 代码使用 CREATE OR REFRESH 语法,以根据查询结果定义具体化视图和流式处理表。

STREAM 关键字指示是否应使用流式处理语义读取 SELECT 子句中引用的数据源。

增量实时表源代码与 SQL 脚本完全不同:增量实时表在管道中配置的所有源代码文件中评估所有数据集定义,并在运行任何查询之前生成数据流图。 笔记本或脚本中显示的查询顺序不会定义执行顺序。

使用 SQL 创建具体化视图。

以下代码示例演示了使用 SQL 创建具体化视图的基本语法:

CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;

使用 SQL 创建流式处理表

以下代码示例演示了使用 SQL 创建流式处理表的基本语法:

注意

并非所有数据源都支持流式读取,某些数据源应始终使用流式处理语义进行处理。

CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;

从对象存储加载数据

增量实时表支持从 Azure Databricks 所支持的所有格式加载数据。 请参阅数据格式选项

注意

这些示例使用自动装载到工作区中 /databricks-datasets 下的数据。 Databricks 建议使用卷路径或云 URI,以引用存储在云对象存储中的数据。 请参阅什么是 Unity Catalog 卷?

在针对存储在云对象存储中的数据配置增量引入工作负荷时,Databricks 建议使用自动加载程序和流式处理表。 请参阅什么是自动加载程序?

SQL 使用 read_files 函数调用自动加载程序功能。 还必须使用 STREAM 关键字,以配置带有 read_files 的流式读取。

以下示例使用自动加载程序从 JSON 文件创建流式处理表:

CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

read_files 函数还支持批处理语义以创建具体化视图。 以下示例使用批处理语义读取 JSON 目录并创建具体化视图:

CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders");

使用预期以验证数据

可使用预期以设置和强制执行数据质量约束。 请参阅使用 Delta Live Tables 管理数据质量

以下代码定义名为 valid_data 的预期,该预期可删除数据引入过程中为 null 的记录:

CREATE OR REFRESH STREAMING TABLE orders_valid(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

查询管道中定义的具体化视图和流式处理表

使用 LIVE 架构查询管道中定义的其他具体化视图和流式处理表。

以下示例定义四种数据集:

  • 名为 orders 的流式处理表,可加载 JSON 数据。
  • 名为 customers 的具体化视图,可加载 CSV 数据。
  • 名为 customer_orders 的具体化视图,可连接 orderscustomers 数据集的记录,将顺序时间戳强制转换为日期,以及选择 customer_idorder_numberstateorder_date字段。
  • 名为 daily_orders_by_state 的具体化视图,可聚合每种状态的每日订单数。
CREATE OR REFRESH STREAMING TABLE orders(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");

CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
  c.customer_id,
  o.order_number,
  c.state,
  date(timestamp(int(o.order_datetime))) order_date
FROM LIVE.orders o
INNER JOIN LIVE.customers c
ON o.customer_id = c.customer_id;

CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM LIVE.customer_orders
GROUP BY state, order_date;