流式处理和增量引入

Azure Databricks 使用 Apache Spark 结构化流式处理来支持与引入工作负载关联的许多产品,包括:

  • 自动加载程序
  • COPY INTO
  • Delta Live Tables 管道

本文讨论了流式处理和增量批处理语义之间的一些差异,并简要概述了如何为 Databricks 中所需的语义配置引入工作负载。

流式处理和增量批处理引入之间的区别是什么?

可能的引入工作流配置范围从近实时处理到不频繁的增量批处理不等。 这两种模式都使用 Apache Spark 结构化流式处理来支持增量处理,但具有不同的语义。 为简单起见,本文将近乎实时的引入称为流式引入,将更不频繁的增量处理称为增量批处理引入

流式引入

流式处理在数据引入和表更新上下文中是指近实时的数据处理,其中 Azure Databricks 使用始终可用的基础结构将记录从源引入到接收器中。 流式处理工作负载会持续从配置的数据源引入更新,除非发生故障,停止引入。

增量批处理引入

增量批处理引入是指从短期作业的数据源中处理所有新记录的模式。 增量批处理引入通常按计划进行,但也可以手动触发或基于文件到达进行。

增量批处理引入不同于批处理引入,因为它会自动检测数据源中的新记录,并忽略已引入的记录。

使用作业引入

使用 Databricks 作业,你可以编排工作流和计划任务,包括笔记本、库、增量实时表管道和 Databricks SQL 查询。

注意

你可以使用所有 Azure Databricks 计算类型和任务类型来配置增量批处理引入。 仅在经典作业计算和增量实时表的生产环境中支持流式处理引入。

作业具有两种主要操作模式:

  • 如果连续作业遇到故障,它们会自动重试。 此模式适用于流式处理引入。
  • 触发的作业在触发时运行任务。 触发器包括:
    • 按指定计划运行作业的基于时间的触发器。
    • 在文件进入指定位置时运行作业的基于文件的触发器。
    • 其他触发器,例如 REST API 调用、Azure Databricks CLI 命令的执行,或单击工作区 UI 中的“立即运行”按钮。

对于增量批处理工作负载,请使用 AvailableNow 触发器模式配置作业,如下所示:

Python

(df.writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(availableNow=True)
  .toTable("table_name")
)

Scala

import org.apache.spark.sql.streaming.Trigger

df.writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(Trigger.AvailableNow)
  .toTable("table_name")

对于流式处理工作负载,默认的触发器间隔为 processingTime ="500ms"。 以下示例演示如何每 5 秒处理一次微批处理:

Python

(df.writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(processingTime="5 seconds")
  .toTable("table_name")
)

Scala

import org.apache.spark.sql.streaming.Trigger

df.writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(Trigger.ProcessingTime, "5 seconds")
  .toTable("table_name")

重要

如果需要近乎实时的引入语义,请使用经典作业。

使用增量实时表进行引入

与作业类似,增量实时表管道可以在触发模式或连续模式下运行。 对于具有流式处理表的近实时流式处理语义,请使用连续模式。

使用流式表配置来自云对象存储、Apache Kafka、Amazon Kinesis 或 Apache Pulsar 的流式或增量批量引入。