什么是有状态流式处理?
有状态结构化流式处理查询需要对中间状态信息进行增量更新,而无状态结构化流式处理查询仅跟踪有关从源到接收器已处理哪些行的信息。
有状态操作包括流式处理聚合、流式处理 dropDuplicates
、流间联接、mapGroupsWithState
和 flatMapGroupsWithState
。
如果配置不正确,有状态结构化流式处理查询所需的中间状态信息可能会导致意外延迟和生产问题。
在 Databricks Runtime 13.3 LTS 及更高版本中,可以使用 RocksDB 启用更改日志检查点,以降低结构化流工作负载的检查点持续时间和端到端延迟。 Databricks 建议为所有“结构化流式处理”有状态查询启用更改日志检查点。 请参阅启用更改日志检查点。
优化有状态结构化流式处理查询
管理有状态结构化流式处理查询的中间状态信息有助于防止意外的延迟和生产问题。
Databricks 建议:
- 使用计算优化的实例作为工作器。
- 将无序分区的数量设置为群集中的核心数的 1-2 倍。
- 在 SparkSession 中,将
spark.sql.streaming.noDataMicroBatches.enabled
配置设置为false
。 这可阻止流式微批处理引擎处理不包含数据的微批处理。 另请注意,将此配置设置为false
可能会导致有状态操作,该操作利用水印或处理时间超时,直到新数据到达时才获取数据输出,而不是立即获取数据输出。
Databricks 建议将 RocksDB 与 changelog 检查点配合使用来管理有状态流的状态。 请参阅在 Azure Databricks 上配置 RocksDB 状态存储。
注意
无法在两次查询重启之间更改状态管理方案。 也就是说,如果使用默认管理启动了某个查询,但不使用新的检查点位置从头启动查询,则无法更改该查询。
在结构化流式处理中使用多个有状态运算符
在 Databricks Runtime 13.3 LTS 及更高版本中,Azure Databricks 为结构化流式处理工作负载中的有状态运算符提供高级支持。 现在可以将多个有状态运算符链接在一起,这意味着可以将操作的输出(如开窗聚合)馈送到另一个有状态操作(如联接)。
以下示例演示了可以使用的几种模式。
重要
使用多个有状态运算符时存在以下限制:
- 不支持
FlatMapGroupWithState
。 - 仅支持追加输出模式。
链接的时间窗口聚合
Python
words = ... # streaming DataFrame of schema { timestamp: Timestamp, word: String }
# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
window(words.timestamp, "10 minutes", "5 minutes"),
words.word
).count()
# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
window(window_time(windowedCounts.window), "1 hour"),
windowedCounts.word
).count()
Scala
import spark.implicits._
val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"word"
).count()
// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
window($"window", "1 hour"),
$"word"
).count()
两个不同流中的时间窗口聚合,后跟流间窗口联接
Python
clicksWindow = clicksWithWatermark.groupBy(
clicksWithWatermark.clickAdId,
window(clicksWithWatermark.clickTime, "1 hour")
).count()
impressionsWindow = impressionsWithWatermark.groupBy(
impressionsWithWatermark.impressionAdId,
window(impressionsWithWatermark.impressionTime, "1 hour")
).count()
clicksWindow.join(impressionsWindow, "window", "inner")
Scala
val clicksWindow = clicksWithWatermark
.groupBy(window("clickTime", "1 hour"))
.count()
val impressionsWindow = impressionsWithWatermark
.groupBy(window("impressionTime", "1 hour"))
.count()
clicksWindow.join(impressionsWindow, "window", "inner")
流间时间间隔联接,后跟时间窗口聚合
Python
joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
"leftOuter" # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined.groupBy(
joined.clickAdId,
window(joined.clickTime, "1 hour")
).count()
Scala
val joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
joinType = "leftOuter" // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined
.groupBy($"clickAdId", window($"clickTime", "1 hour"))
.count()
结构化流的状态重新平衡
默认情况下,为 Delta Live Tables 中的所有流式处理工作负载启用状态重新平衡。 在 Databricks Runtime 11.3 LTS 及更高版本中,可以在 Spark 群集配置中设置以下配置选项,以启用状态再平衡:
spark.sql.streaming.statefulOperator.stateRebalancing.enabled true
状态再平衡有利于经历群集大小重设事件的有状态结构化流式处理管道。 无论是否更改群集大小,无状态流式处理操作都不会受益。
注意
计算自动缩放在缩减结构化流式处理工作负载的群集大小方面存在限制。 Databricks 建议将增量实时表与增强式自动缩放用于流式处理工作负载。 请参阅使用增强型自动缩放来优化增量实时表管道的群集利用率。
群集大小调整事件会导致触发状态重新平衡。 在重新平衡事件期间,当状态从云存储加载到新执行程序时,微批处理可能具有较高的延迟。
为 mapGroupsWithState
指定初始状态
可以使用 flatMapGroupsWithState
或 mapGroupsWithState
为结构化流式处理有状态处理指定用户定义的初始状态。 当在不使用有效检查点的情况下启动有状态流时,这样做就可避免重新处理数据。
def mapGroupsWithState[S: Encoder, U: Encoder](
timeoutConf: GroupStateTimeout,
initialState: KeyValueGroupedDataset[K, S])(
func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]
def flatMapGroupsWithState[S: Encoder, U: Encoder](
outputMode: OutputMode,
timeoutConf: GroupStateTimeout,
initialState: KeyValueGroupedDataset[K, S])(
func: (K, Iterator[V], GroupState[S]) => Iterator[U])
以下示例用例指定 flatMapGroupsWithState
运算符的初始状态:
val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
state.update(new RunningCount(count))
Iterator((key, count.toString))
}
val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
("apple", new RunningCount(1)),
("orange", new RunningCount(2)),
("mango", new RunningCount(5)),
).toDS()
val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)
fruitStream
.groupByKey(x => x)
.flatMapGroupsWithState(Update, GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)
以下示例用例指定 mapGroupsWithState
运算符的初始状态:
val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
state.update(new RunningCount(count))
(key, count.toString)
}
val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
("apple", new RunningCount(1)),
("orange", new RunningCount(2)),
("mango", new RunningCount(5)),
).toDS()
val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)
fruitStream
.groupByKey(x => x)
.mapGroupsWithState(GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)
测试 mapGroupsWithState
更新函数
利用 TestGroupState
API,你可以测试用于 Dataset.groupByKey(...).mapGroupsWithState(...)
和 Dataset.groupByKey(...).flatMapGroupsWithState(...)
的状态更新函数。
状态更新函数使用 GroupState
类型的对象获取先前的状态作为输入。 请参阅 Apache Spark GroupState 参考文档。 例如:
import org.apache.spark.sql.streaming._
import org.apache.spark.api.java.Optional
test("flatMapGroupsWithState's state update function") {
var prevState = TestGroupState.create[UserStatus](
optionalState = Optional.empty[UserStatus],
timeoutConf = GroupStateTimeout.EventTimeTimeout,
batchProcessingTimeMs = 1L,
eventTimeWatermarkMs = Optional.of(1L),
hasTimedOut = false)
val userId: String = ...
val actions: Iterator[UserAction] = ...
assert(!prevState.hasUpdated)
updateState(userId, actions, prevState)
assert(prevState.hasUpdated)
}