有状态查询的异步状态检查点
注意
在 Databricks Runtime 10.4 LTS 及更高版本中可用。
异步状态检查点为流式处理查询维护恰好一次的保证,但可以为在状态更新上出现瓶颈的一些结构化流式处理有状态工作负载减少总体延迟。 这通过在完成上一个微批处理的计算后立即开始处理下一个微批处理来实现,无需等待状态检查点完成。 下表比较了同步检查点和异步检查点的权衡:
特征 | 同步检查点 | 异步检查点 |
---|---|---|
延迟 | 每个微批处理的延迟更高。 | 减少延迟,因为微批处理可能会重叠。 |
重启 | 快速恢复,因为只需重新运行最后一批。 | 重启延迟较高,因为可能需要重新运行多个微批处理。 |
以下是可能从异步状态检查点中受益的流式处理作业的特征:
- 作业有一个或多个有状态操作(例如,聚合、
flatMapGroupsWithState
、mapGroupsWithState
、流之间的联接) - 状态检查点延迟是导致整体批处理执行延迟的主要因素之一。 此信息可以在 StreamingQueryProgress 事件中找到。 这些事件也可在 Spark 驱动程序上的 log4j 日志中找到。 下面是流式处理查询进度的示例,以及如何查找状态检查点对总体批处理执行延迟的影响。
-
{ "id" : "2e3495a2-de2c-4a6a-9a8e-f6d4c4796f19", "runId" : "e36e9d7e-d2b1-4a43-b0b3-e875e767e1fe", "...", "batchId" : 0, "durationMs" : { "...", "triggerExecution" : 547730, "..." }, "stateOperators" : [ { "...", "commitTimeMs" : 3186626, "numShufflePartitions" : 64, "..." }] }
上述查询进度事件的状态检查点延迟分析
- 批处理持续时间 (
durationMs.triggerDuration
) 约为 547 秒。 - 状态存储提交延迟时间 (
stateOperations[0].commitTimeMs
) 大约为 3186 秒。 提交延迟时间是所有包含状态存储的任务的总和。 在此例中共有 64 个这样的任务 (stateOperators[0].numShufflePartitions
)。 - 每个包含状态运算符的任务平均花费 50 秒(3186/64)用于检查点。 这导致了批处理持续时间的额外延迟。 假设所有 64 个任务并发运行,则检查点步骤占据了大约 9%(50 秒/547 秒)的批处理持续时间。 当并发运行的最大任务数小于 64 时,这一百分比会更高。
- 批处理持续时间 (
-
启用异步状态检查点
必须为异步状态检查点使用基于 RocksDB 的状态存储。 设置以下配置:
spark.conf.set(
"spark.databricks.streaming.statefulOperator.asyncCheckpoint.enabled",
"true"
)
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass",
"com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
)
异步检查点的限制和要求
注意
计算自动缩放在缩减结构化流式处理工作负载的群集大小方面存在限制。 Databricks 建议将增量实时表与增强式自动缩放用于流式处理工作负载。 请参阅使用增强型自动缩放来优化增量实时表管道的群集利用率。
- 在任何一个或多个存储内,异步检查点中的任何故障都会导致查询失败。 在同步检查点模式下,检查点作为任务的一部分执行,Spark 在查询失败之前会多次重试任务。 异步状态检查点不提供此机制。 Databricks 建议使用连续作业以在作业失败时自动重试。 请参阅连续运行作业。
- 当状态存储位置在微批处理执行之间未更改时,异步检查点最有效。 群集大小重设与异步状态检查可能无法正常结合使用,因为在出现群集大小重设事件时,状态存储实例可能会在添加或删除节点时重新分布。
- 仅在 RocksDB 状态存储提供程序实现中支持异步状态检查点。 默认的内存中状态存储实现不支持异步状态检查点。