在工作流中运行 Delta Live Tables 管道
可使用 Databricks 作业、Apache Airflow 或 Azure 数据工厂,在数据处理工作流中运行 Delta Live Tables 管道。
作业
可在 Databricks 作业中协调多个任务以实现数据处理工作流。 要在作业中包含 Delta Live Tables 管道,请在创建作业时使用“管道”任务。 请参阅作业的 Delta Live Tables 管道任务。
Apache Airflow
Apache Airflow是一种用于管理和计划数据工作流的开源解决方案。 Airflow 将工作流表示为操作的有向无环图 (DAG)。 在 Python 文件中定义工作流,Airflow 管理计划并执行。 若要了解如何通过 Azure Databricks 安装和使用 Airflow,请参阅使用 Apache Airflow 协调 Azure Databricks 作业。
若要在 Airflow 工作流中运行 Delta Live Tables 管道,请使用 DatabricksSubmitRunOperator。
要求
若要对 Delta Live Tables 使用 Airflow 支持,需要满足以下条件:
- Airflow 版本 2.1.0 或更高版本。
- Databricks 提供程序包版本2.1.0 或更高版本。
示例
下面的示例创建一个 Airflow DAG,该 DAG 使用标识符 8279d543-063c-4d63-9926-dae38e35ce8b
触发 Delta Live Tables 管道的更新:
from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow'
}
with DAG('dlt',
start_date=days_ago(2),
schedule_interval="@once",
default_args=default_args
) as dag:
opr_run_now=DatabricksSubmitRunOperator(
task_id='run_now',
databricks_conn_id='CONNECTION_ID',
pipeline_task={"pipeline_id": "8279d543-063c-4d63-9926-dae38e35ce8b"}
)
将 CONNECTION_ID
替换为与你的工作区的 Airflow 连接 的标识符。
将此示例保存在 airflow/dags
目录中,并使用 Airflow UI 来查看和触发 DAG。 使用 Delta Live Tables UI 查看管道更新的详细信息。
Azure 数据工厂
注意
增量实时表和 Azure 数据工厂均包含用于配置发生故障时的重试次数的选项。 如果在增量实时表管道和调用管道的 Azure 数据工厂活动中配置了重试值,则重试次数是 Azure 数据工厂重试值乘以增量实时表重试值。
例如,如果管道更新失败,增量实时表默认重试更新最多五次。 如果 Azure 数据工厂重试设置为 3,并且增量实时表管道使用默认的 5 次重试,则失败的增量实时表管道最多可以重试 15 次。 为避免管道更新失败时重试尝试过多次,Databricks 建议在配置增量实时表管道或调用管道的 Azure 数据工厂活动时限制重试次数。
若要更改增量实时表管道的重试配置,请在配置管道时使用 pipelines.numUpdateRetryAttempts
设置。
Azure 数据工厂是一项基于云的 ETL 服务,可用于协调数据集成和转换工作流。 Azure 数据工厂直接支持在工作流中运行 Azure Databricks 任务,包括笔记本、JAR 任务和 Python 脚本。 你也可以通过从 Azure 数据工厂 Web 活动 调用 Delta Live Tables API,将管道纳入工作流。 例如,从 Azure 数据工厂触发管道更新:
创建数据工厂或打开现有数据工厂。
创建完成后,打开数据工厂的页面,然后单击“打开 Azure 数据工厂工作室”磁贴。 Azure 数据工厂用户界面随即显示。
通过从 Azure 数据工厂工作室用户界面的“新建”下拉菜单中选择“管道”,创建一个新的 Azure 数据工厂管道。
在“活动”工具箱中,展开“常规”并将“Web”活动拖到管道画布上。 单击“设置”选项卡,输入以下值:
注意
作为安全最佳做法,在使用自动化工具、系统、脚本和应用进行身份验证时,Databricks 建议使用属于服务主体(而不是工作区用户)的个人访问令牌。 若要为服务主体创建令牌,请参阅管理服务主体的令牌。
URL:
替换
<get-workspace-instance>
。将
<pipeline-id>
替换为管道标识符。方法:从下拉菜单中选择“POST”。
标头:单击“+ 新建”。 在“名称”文本框中,输入
Authorization
。 在“值”文本框中,输入Bearer <personal-access-token>
。使用 Azure Databricks 个人访问令牌替换
<personal-access-token>
。正文:若要传递其他请求参数,请输入包含参数的 JSON 文档。 例如,启动更新并重新处理管道的所有数据:
{"full_refresh": "true"}
。 如果没有其他请求参数,请输入空括号 ({}
)。
若要测试 Web 活动,请单击数据工厂 UI 中管道工具栏上的“调试”。 运行的输出和状态(包括错误)显示在 Azure 数据工厂管道的“输出”选项卡中。 使用 Delta Live Tables UI 查看管道更新的详细信息。
提示
常见的工作流要求是在上一任务完成后启动任务。 由于 Delta Live Tables updates
请求是异步的(请求在开始更新后但更新完成之前返回),因此 Azure 数据工厂管道中依赖于 Delta Live Tables 更新的任务必须等待更新完成。 等待更新完成的一个选项是在触发 Delta Live Tables 更新的 Web 活动之后添加一个 Until 活动。 在 Until 活动中:
- 添加 Wait 活动,等待配置的秒数以完成更新。
- 在 Wait 活动之后添加一个 Web 活动,该活动使用 Delta Live Tables update details 请求来获取更新状态。 响应中的
state
字段返回更新的当前状态,包括它是否已完成。 - 使用
state
字段的值来设置 Until 活动的终止条件。 你也可以使用 Set Variable 活动,根据state
值添加管道变量,并将此变量用于终止条件。