将任务添加到 Databricks 资产捆绑包中的作业
本文提供了可在 Databricks 资产包中向 Azure Databricks 作业添加的各种类型任务的示例。 请参阅什么是 Databricks 资产捆绑包?。
大多数作业任务类型在其支持的设置中都具有特定于任务的参数,但你也可以定义传递给任务的作业参数。 作业参数支持动态值引用,从而支持在任务之间传递特定于作业运行的值。 请参阅什么是动态值引用?。
注意
可以替代作业任务设置。 请参阅替代 Databricks 资产捆绑包中的作业任务设置。
提示
若要使用 Databricks CLI 快速生成现有作业的资源配置,可以使用 bundle generate job
命令。 请参阅捆绑包命令。
笔记本任务
使用此任务运行笔记本。
以下示例将笔记本任务添加到作业,并设置名为 my_job_run_id
的作业参数。 要部署的笔记本的路径是相对于声明了此任务的配置文件而言。 该任务从其在 Azure Databricks 工作区中的部署位置获取笔记本。
resources:
jobs:
my-notebook-job:
name: my-notebook-job
tasks:
- task_key: my-notebook-task
notebook_task:
notebook_path: ./my-notebook.ipynb
parameters:
- name: my_job_run_id
default: "{{job.run_id}}"
有关你可以为此任务设置的其他映射,请参阅创建作业操作的请求有效负载中的 tasks > notebook_task
,该有效负载在 REST API 参考的 POST /api/2.1/jobs/create 中定义,以 YAML 格式表示。 请参阅作业的笔记本任务。
If/else 条件任务
你可以通过 condition_task
向作业添加具有 if/else 条件逻辑的任务。 该任务评估可用于控制其他任务执行的条件。 条件任务不需要群集来执行,也不支持重试或通知。 有关 if/else 任务的详细信息,请参阅使用 If/else 任务向作业添加分支逻辑。
以下示例包含条件任务和笔记本任务,其中笔记本任务仅在作业修复数小于 5 时执行。
resources:
jobs:
my-job:
name: my-job
tasks:
- task_key: condition_task
condition_task:
op: LESS_THAN
left: "{{job.repair_count}}"
right: "5"
- task_key: notebook_task
depends_on:
- task_key: condition_task
outcome: "true"
notebook_task:
notebook_path: ../src/notebook.ipynb
有关你可以为此任务设置的其他映射,请参阅创建作业操作的请求有效负载中的 tasks > condition_task
,该有效负载在 REST API 参考的 POST /api/2.1/jobs/create 中定义,以 YAML 格式表示。
For each 任务
你可以通过 for_each_task
向作业添加包含 for each 循环的任务。 该任务为提供的每个输入执行嵌套任务。 有关 for_each_task
的详细信息,请参阅在循环中运行参数化 Azure Databricks 作业任务。
以下示例向作业添加一个 for_each_task
,在该作业中循环访问另一个任务的值并将其处理。
resources:
jobs:
my_job:
name: my_job
tasks:
- task_key: generate_countries_list
notebook_task:
notebook_path: ../src/generate_countries_list.ipnyb
- task_key: process_countries
depends_on:
- task_key: generate_countries_list
for_each_task:
inputs: "{{tasks.generate_countries_list.values.countries}}"
task:
task_key: process_countries_iteration
notebook_task:
notebook_path: ../src/process_countries_notebook.ipnyb
有关你可以为此任务设置的其他映射,请参阅创建作业操作的请求有效负载中的 tasks > for_each_task
,该有效负载在 REST API 参考的 POST /api/2.1/jobs/create 中定义,以 YAML 格式表示。
Python 脚本任务
使用此任务运行 Python 文件。
以下示例向作业添加 Python 脚本任务。 要部署的 Python 文件的路径是相对于声明了此任务的配置文件而言。 该任务从其在 Azure Databricks 工作区中的部署位置获取 Python 文件。
resources:
jobs:
my-python-script-job:
name: my-python-script-job
tasks:
- task_key: my-python-script-task
spark_python_task:
python_file: ./my-script.py
有关你可以为此任务设置的其他映射,请参阅创建作业操作的请求有效负载中的 tasks > spark_python_task
,该有效负载在 REST API 参考的 POST /api/2.1/jobs/create 中定义,以 YAML 格式表示。 另请参阅作业的 Python 脚本任务。
Python wheel 任务
使用此任务可运行 Python wheel 文件。
以下示例向作业添加 Python wheel 任务。 要部署的 Python wheel 文件的路径是相对于声明了此任务的配置文件而言。 请参阅 Databricks 资产捆绑包的库依赖项。
resources:
jobs:
my-python-wheel-job:
name: my-python-wheel-job
tasks:
- task_key: my-python-wheel-task
python_wheel_task:
entry_point: run
package_name: my_package
libraries:
- whl: ./my_package/dist/my_package-*.whl
有关你可以为此任务设置的其他映射,请参阅创建作业操作的请求有效负载中的 tasks > python_wheel_task
,该有效负载在 REST API 参考的 POST /api/2.1/jobs/create 中定义,以 YAML 格式表示。 另请参阅使用 Databricks 资产捆绑包开发 Python Wheel 文件和作业的 Python Wheel 任务。
JAR 任务
使用此任务运行 JAR。 可以引用本地 JAR 库或工作区、Unity Catalog 卷或外部云存储位置中的 JAR 库。 请参阅 Databricks 资产捆绑包的库依赖项。
以下示例向作业添加 JAR 任务。 JAR 的路径是相对于指定卷的位置而言。
resources:
jobs:
my-jar-job:
name: my-jar-job
tasks:
- task_key: my-jar-task
spark_jar_task:
main_class_name: org.example.com.Main
libraries:
- jar: /Volumes/main/default/my-volume/my-project-0.1.0-SNAPSHOT.jar
有关你可以为此任务设置的其他映射,请参阅创建作业操作的请求有效负载中的 tasks > spark_jar_task
,该有效负载在 REST API 参考的 POST /api/2.1/jobs/create 中定义,以 YAML 格式表示。 请参阅作业的 JAR 任务。
SQL 文件任务
使用此任务运行位于工作区或远程 Git 存储库中的 SQL 文件。
以下示例向作业添加 SQL 文件任务。 此 SQL 文件任务使用指定的 SQL 仓库来运行指定的 SQL 文件。
resources:
jobs:
my-sql-file-job:
name: my-sql-file-job
tasks:
- task_key: my-sql-file-task
sql_task:
file:
path: /Users/someone@example.com/hello-world.sql
source: WORKSPACE
warehouse_id: 1a111111a1111aa1
若要获取 SQL 仓库的 ID,请打开 SQL 仓库的设置页,然后复制“概述”选项卡上“名称”字段中仓库名称后面括号中的 ID。
有关你可以为此任务设置的其他映射,请参阅创建作业操作的请求有效负载中的 tasks > sql_task > file
,该有效负载在 REST API 参考的 POST /api/2.1/jobs/create 中定义,以 YAML 格式表示。 请参阅作业的 SQL 任务。
增量实时表管道任务
使用此任务运行增量实时表管道。 请参阅什么是增量实时表?。
以下示例向作业添加增量实时表管道任务。 此增量实时表管道任务运行指定的管道。
resources:
jobs:
my-pipeline-job:
name: my-pipeline-job
tasks:
- task_key: my-pipeline-task
pipeline_task:
pipeline_id: 11111111-1111-1111-1111-111111111111
可以通过以下方法来查找管道的 ID:在工作区中打开管道,并在管道设置页的“管道详细信息”选项卡上复制“管道 ID”值。
有关你可以为此任务设置的其他映射,请参阅创建作业操作的请求有效负载中的 tasks > pipeline_task
,该有效负载在 REST API 参考的 POST /api/2.1/jobs/create 中定义,以 YAML 格式表示。 请参阅作业的 Delta Live Tables 管道任务。
dbt 任务
使用此任务运行一个或多个 dbt 命令。 请参阅连接到 dbt Cloud。
以下示例向作业添加 dbt 任务。 此 dbt 任务使用指定的 SQL 仓库来运行指定的 dbt 命令。
resources:
jobs:
my-dbt-job:
name: my-dbt-job
tasks:
- task_key: my-dbt-task
dbt_task:
commands:
- "dbt deps"
- "dbt seed"
- "dbt run"
project_directory: /Users/someone@example.com/Testing
warehouse_id: 1a111111a1111aa1
libraries:
- pypi:
package: "dbt-databricks>=1.0.0,<2.0.0"
若要获取 SQL 仓库的 ID,请打开 SQL 仓库的设置页,然后复制“概述”选项卡上“名称”字段中仓库名称后面括号中的 ID。
有关你可以为此任务设置的其他映射,请参阅创建作业操作的请求有效负载中的 tasks > dbt_task
,该有效负载在 REST API 参考的 POST /api/2.1/jobs/create 中定义,以 YAML 格式表示。 请参阅作业的 dbt 任务。
Databricks 资产捆绑包还包含一个 dbt-sql
项目模板,可用于定义具有 dbt 任务的作业,以及已部署 dbt 作业的 dbt 配置文件。 有关 Databricks 资产捆绑包模板的信息,请参阅使用默认捆绑包模板。
运行作业任务
使用此任务运行另一个作业。
以下示例在第二个作业中包含了运行第一个作业的运行作业任务。
resources:
jobs:
my-first-job:
name: my-first-job
tasks:
- task_key: my-first-job-task
new_cluster:
spark_version: "13.3.x-scala2.12"
node_type_id: "i3.xlarge"
num_workers: 2
notebook_task:
notebook_path: ./src/test.py
my_second_job:
name: my-second-job
tasks:
- task_key: my-second-job-task
run_job_task:
job_id: ${resources.jobs.my-first-job.id}
此示例使用替换来检索要运行的作业的 ID。 若要从 UI 获取作业的 ID,请在工作区中打开作业,并从作业“设置”页的“作业详细信息”选项卡中的“作业 ID”值中复制该 ID。
有关你可以为此任务设置的其他映射,请参阅创建作业操作的请求有效负载中的 tasks > run_job_task
,该有效负载在 REST API 参考的 POST /api/2.1/jobs/create 中定义,以 YAML 格式表示。