将任务添加到 Databricks 资产捆绑包中的作业

本文提供了可在 Databricks 资产包中向 Azure Databricks 作业添加的各种类型任务的示例。 请参阅什么是 Databricks 资产捆绑包?

大多数作业任务类型在其支持的设置中都具有特定于任务的参数,但你也可以定义传递给任务的作业参数。 作业参数支持动态值引用,从而支持在任务之间传递特定于作业运行的值。 请参阅什么是动态值引用?

注意

可以替代作业任务设置。 请参阅替代 Databricks 资产捆绑包中的作业任务设置

提示

若要使用 Databricks CLI 快速生成现有作业的资源配置,可以使用 bundle generate job 命令。 请参阅捆绑包命令

笔记本任务

使用此任务运行笔记本。

以下示例将笔记本任务添加到作业,并设置名为 my_job_run_id 的作业参数。 要部署的笔记本的路径是相对于声明了此任务的配置文件而言。 该任务从其在 Azure Databricks 工作区中的部署位置获取笔记本。

resources:
  jobs:
    my-notebook-job:
      name: my-notebook-job
      tasks:
        - task_key: my-notebook-task
          notebook_task:
            notebook_path: ./my-notebook.ipynb
      parameters:
        - name: my_job_run_id
          default: "{{job.run_id}}"

有关你可以为此任务设置的其他映射,请参阅创建作业操作的请求有效负载中的 tasks > notebook_task,该有效负载在 REST API 参考的 POST /api/2.1/jobs/create 中定义,以 YAML 格式表示。 请参阅作业的笔记本任务

If/else 条件任务

你可以通过 condition_task 向作业添加具有 if/else 条件逻辑的任务。 该任务评估可用于控制其他任务执行的条件。 条件任务不需要群集来执行,也不支持重试或通知。 有关 if/else 任务的详细信息,请参阅使用 If/else 任务向作业添加分支逻辑

以下示例包含条件任务和笔记本任务,其中笔记本任务仅在作业修复数小于 5 时执行。

resources:
  jobs:
    my-job:
      name: my-job
      tasks:
        - task_key: condition_task
          condition_task:
            op: LESS_THAN
            left: "{{job.repair_count}}"
            right: "5"
        - task_key: notebook_task
          depends_on:
            - task_key: condition_task
              outcome: "true"
          notebook_task:
            notebook_path: ../src/notebook.ipynb

有关你可以为此任务设置的其他映射,请参阅创建作业操作的请求有效负载中的 tasks > condition_task,该有效负载在 REST API 参考的 POST /api/2.1/jobs/create 中定义,以 YAML 格式表示。

For each 任务

你可以通过 for_each_task 向作业添加包含 for each 循环的任务。 该任务为提供的每个输入执行嵌套任务。 有关 for_each_task 的详细信息,请参阅在循环中运行参数化 Azure Databricks 作业任务

以下示例向作业添加一个 for_each_task,在该作业中循环访问另一个任务的值并将其处理。

resources:
  jobs:
    my_job:
      name: my_job
      tasks:
        - task_key: generate_countries_list
          notebook_task:
            notebook_path: ../src/generate_countries_list.ipnyb
        - task_key: process_countries
          depends_on:
            - task_key: generate_countries_list
          for_each_task:
            inputs: "{{tasks.generate_countries_list.values.countries}}"
            task:
              task_key: process_countries_iteration
              notebook_task:
                notebook_path: ../src/process_countries_notebook.ipnyb

有关你可以为此任务设置的其他映射,请参阅创建作业操作的请求有效负载中的 tasks > for_each_task,该有效负载在 REST API 参考的 POST /api/2.1/jobs/create 中定义,以 YAML 格式表示。

Python 脚本任务

使用此任务运行 Python 文件。

以下示例向作业添加 Python 脚本任务。 要部署的 Python 文件的路径是相对于声明了此任务的配置文件而言。 该任务从其在 Azure Databricks 工作区中的部署位置获取 Python 文件。

resources:
  jobs:
    my-python-script-job:
      name: my-python-script-job

      tasks:
        - task_key: my-python-script-task
          spark_python_task:
            python_file: ./my-script.py

有关你可以为此任务设置的其他映射,请参阅创建作业操作的请求有效负载中的 tasks > spark_python_task,该有效负载在 REST API 参考的 POST /api/2.1/jobs/create 中定义,以 YAML 格式表示。 另请参阅作业的 Python 脚本任务

Python wheel 任务

使用此任务可运行 Python wheel 文件。

以下示例向作业添加 Python wheel 任务。 要部署的 Python wheel 文件的路径是相对于声明了此任务的配置文件而言。 请参阅 Databricks 资产捆绑包的库依赖项

resources:
  jobs:
    my-python-wheel-job:
      name: my-python-wheel-job
      tasks:
        - task_key: my-python-wheel-task
          python_wheel_task:
            entry_point: run
            package_name: my_package
          libraries:
            - whl: ./my_package/dist/my_package-*.whl

有关你可以为此任务设置的其他映射,请参阅创建作业操作的请求有效负载中的 tasks > python_wheel_task,该有效负载在 REST API 参考的 POST /api/2.1/jobs/create 中定义,以 YAML 格式表示。 另请参阅使用 Databricks 资产捆绑包开发 Python Wheel 文件作业的 Python Wheel 任务

JAR 任务

使用此任务运行 JAR。 可以引用本地 JAR 库或工作区、Unity Catalog 卷或外部云存储位置中的 JAR 库。 请参阅 Databricks 资产捆绑包的库依赖项

以下示例向作业添加 JAR 任务。 JAR 的路径是相对于指定卷的位置而言。

resources:
  jobs:
    my-jar-job:
      name: my-jar-job
      tasks:
        - task_key: my-jar-task
          spark_jar_task:
            main_class_name: org.example.com.Main
          libraries:
            - jar: /Volumes/main/default/my-volume/my-project-0.1.0-SNAPSHOT.jar

有关你可以为此任务设置的其他映射,请参阅创建作业操作的请求有效负载中的 tasks > spark_jar_task,该有效负载在 REST API 参考的 POST /api/2.1/jobs/create 中定义,以 YAML 格式表示。 请参阅作业的 JAR 任务

SQL 文件任务

使用此任务运行位于工作区或远程 Git 存储库中的 SQL 文件。

以下示例向作业添加 SQL 文件任务。 此 SQL 文件任务使用指定的 SQL 仓库来运行指定的 SQL 文件。

resources:
  jobs:
    my-sql-file-job:
      name: my-sql-file-job
      tasks:
        - task_key: my-sql-file-task
          sql_task:
            file:
              path: /Users/someone@example.com/hello-world.sql
              source: WORKSPACE
            warehouse_id: 1a111111a1111aa1

若要获取 SQL 仓库的 ID,请打开 SQL 仓库的设置页,然后复制“概述”选项卡上“名称”字段中仓库名称后面括号中的 ID

有关你可以为此任务设置的其他映射,请参阅创建作业操作的请求有效负载中的 tasks > sql_task > file,该有效负载在 REST API 参考的 POST /api/2.1/jobs/create 中定义,以 YAML 格式表示。 请参阅作业的 SQL 任务

增量实时表管道任务

使用此任务运行增量实时表管道。 请参阅什么是增量实时表?

以下示例向作业添加增量实时表管道任务。 此增量实时表管道任务运行指定的管道。

resources:
  jobs:
    my-pipeline-job:
      name: my-pipeline-job
      tasks:
        - task_key: my-pipeline-task
          pipeline_task:
            pipeline_id: 11111111-1111-1111-1111-111111111111

可以通过以下方法来查找管道的 ID:在工作区中打开管道,并在管道设置页的“管道详细信息”选项卡上复制“管道 ID”值。

有关你可以为此任务设置的其他映射,请参阅创建作业操作的请求有效负载中的 tasks > pipeline_task,该有效负载在 REST API 参考的 POST /api/2.1/jobs/create 中定义,以 YAML 格式表示。 请参阅作业的 Delta Live Tables 管道任务

dbt 任务

使用此任务运行一个或多个 dbt 命令。 请参阅连接到 dbt Cloud

以下示例向作业添加 dbt 任务。 此 dbt 任务使用指定的 SQL 仓库来运行指定的 dbt 命令。

resources:
  jobs:
    my-dbt-job:
      name: my-dbt-job
      tasks:
        - task_key: my-dbt-task
          dbt_task:
            commands:
              - "dbt deps"
              - "dbt seed"
              - "dbt run"
            project_directory: /Users/someone@example.com/Testing
            warehouse_id: 1a111111a1111aa1
          libraries:
            - pypi:
                package: "dbt-databricks>=1.0.0,<2.0.0"

若要获取 SQL 仓库的 ID,请打开 SQL 仓库的设置页,然后复制“概述”选项卡上“名称”字段中仓库名称后面括号中的 ID

有关你可以为此任务设置的其他映射,请参阅创建作业操作的请求有效负载中的 tasks > dbt_task,该有效负载在 REST API 参考的 POST /api/2.1/jobs/create 中定义,以 YAML 格式表示。 请参阅作业的 dbt 任务

Databricks 资产捆绑包还包含一个 dbt-sql 项目模板,可用于定义具有 dbt 任务的作业,以及已部署 dbt 作业的 dbt 配置文件。 有关 Databricks 资产捆绑包模板的信息,请参阅使用默认捆绑包模板

运行作业任务

使用此任务运行另一个作业。

以下示例在第二个作业中包含了运行第一个作业的运行作业任务。

resources:
  jobs:
    my-first-job:
      name: my-first-job
      tasks:
        - task_key: my-first-job-task
          new_cluster:
            spark_version: "13.3.x-scala2.12"
            node_type_id: "i3.xlarge"
            num_workers: 2
          notebook_task:
            notebook_path: ./src/test.py
    my_second_job:
      name: my-second-job
      tasks:
        - task_key: my-second-job-task
          run_job_task:
            job_id: ${resources.jobs.my-first-job.id}

此示例使用替换来检索要运行的作业的 ID。 若要从 UI 获取作业的 ID,请在工作区中打开作业,并从作业“设置”页的“作业详细信息”选项卡中的“作业 ID”值中复制该 ID

有关你可以为此任务设置的其他映射,请参阅创建作业操作的请求有效负载中的 tasks > run_job_task,该有效负载在 REST API 参考的 POST /api/2.1/jobs/create 中定义,以 YAML 格式表示。