什么是有状态流式处理?

有状态结构化流式处理查询需要对中间状态信息进行增量更新,而无状态结构化流式处理查询仅跟踪有关从源到接收器已处理哪些行的信息。

有状态操作包括流式聚合、流式 dropDuplicates、流到流连接和自定义有状态应用程序。

有状态结构化流式处理查询所需的中间状态信息可能会导致意外延迟和生产问题(如果配置有误)。

在 Databricks Runtime 13.3 LTS 及更高版本中,可以使用 RocksDB 启用更改日志检查点,以降低结构化流工作负载的检查点持续时间和端到端延迟。 Databricks 建议为所有结构化流式处理有状态查询启用更改日志检查点。 请参阅启用更改日志检查点

优化有状态结构化流式处理查询

管理有状态结构化流式处理查询的中间状态信息有助于防止意外的延迟和生产问题。

Databricks 建议:

  • 使用计算优化的实例作为工作器。
  • 将无序分区的数量设置为群集中的核心数的 1-2 倍。
  • 在 SparkSession 中,将 spark.sql.streaming.noDataMicroBatches.enabled 配置设置为 false。 这可阻止流式微批处理引擎处理不包含数据的微批处理。 另请注意,将此配置设置为 false 可能导致有状态操作,这些操作使用水印或处理时间超时,以便直到新数据到达后才进行数据输出,而不是立即输出。

Databricks 建议将 RocksDB 与 changelog 检查点配合使用来管理有状态流的状态。 请参阅在 Azure Databricks 上配置 RocksDB 状态存储

注意

无法在两次查询重启之间更改状态管理方案。 如果查询已使用默认管理启动,则必须使用新的检查点位置从头开始重启查询以更改状态存储。

在结构化流式处理中使用多个有状态运算符

在 Databricks Runtime 13.3 LTS 及更高版本中,Azure Databricks 为结构化流式处理工作负载中的有状态运算符提供高级支持。 现在可以将多个有状态运算符链接在一起,这意味着可以将操作的输出(如开窗聚合)馈送到另一个有状态操作(如联接)。

在 Databricks Runtime 16.2 及更高版本中,可以在具有多个有状态运算符的工作负载中使用 transformWithState。 请参阅 构建自定义有状态应用程序。 以下示例演示了可以使用的几种模式。

重要

使用多个有状态运算符时存在以下限制:

  • 不支持旧的自定义有状态运算符(FlatMapGroupWithStateapplyInPandasWithState)。
  • 仅支持追加输出模式。

链接的时间窗口聚合

Python

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()

# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
    window(window_time(windowedCounts.window), "1 hour"),
    windowedCounts.word
).count()

Scala

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()

// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
  window($"window", "1 hour"),
  $"word"
).count()

两个不同流中的时间窗口聚合,后跟流间窗口联接

Python

clicksWindow = clicksWithWatermark.groupBy(
  clicksWithWatermark.clickAdId,
  window(clicksWithWatermark.clickTime, "1 hour")
).count()

impressionsWindow = impressionsWithWatermark.groupBy(
  impressionsWithWatermark.impressionAdId,
  window(impressionsWithWatermark.impressionTime, "1 hour")
).count()

clicksWindow.join(impressionsWindow, "window", "inner")

Scala编程语言

val clicksWindow = clicksWithWatermark
  .groupBy(window("clickTime", "1 hour"))
  .count()

val impressionsWindow = impressionsWithWatermark
  .groupBy(window("impressionTime", "1 hour"))
  .count()

clicksWindow.join(impressionsWindow, "window", "inner")

流间时间间隔联接,后跟时间窗口聚合

Python

joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined.groupBy(
  joined.clickAdId,
  window(joined.clickTime, "1 hour")
).count()

Scala

val joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
  """),
  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined
  .groupBy($"clickAdId", window($"clickTime", "1 hour"))
  .count()

结构化流式处理的状态重新平衡

默认情况下,为 Delta Live Tables 中的所有流式处理工作负载启用状态平衡调整。 在 Databricks Runtime 11.3 LTS 及更高版本中,可以在 Spark 群集配置中设置以下配置选项,以启用状态再平衡:

spark.sql.streaming.statefulOperator.stateRebalancing.enabled true

状态重新平衡有利于经历群集大小重设事件的有状态结构化流式处理管道。 无论是否更改群集大小,无状态流式处理操作都不会受益。

注意

计算自动缩放在缩减结构化流式处理工作负载的群集大小方面存在限制。 Databricks 建议将增量实时表与增强型自动缩放用于流式处理工作负载。 请参阅使用增强型自动缩放来优化增量实时表管道的群集利用率

群集大小调整事件会触发状态重新平衡。 在再平衡事件期间,当状态从云存储加载到新执行程序时,微批处理可能具有较高的延迟。