Azure 事件中心

Azure 事件中心是超大规模的遥测引入服务,可收集、传输和存储数百万的事件。 作为分布式流式处理平台,它为用户提供低延迟和可配置的时间保留,使用户可以将大量遥测数据引入到云中,并使用发布-订阅语义从多个应用程序中读取数据。

本文介绍了如何通过 Azure 事件中心和 Azure Databricks 群集使用结构化流式处理。

注意

Azure 事件中心提供与 Apache Kafka 兼容的终结点,该终结点可与 Databricks Runtime 中提供的结构化流式处理 Kafka 连接器一起使用,以处理来自 Azure 事件中心的消息。 Databricks 建议使用结构化流式处理 Kafka 连接器处理来自 Azure 事件中心的消息。

要求

有关当前的版本支持,请参阅 Azure 事件中心 Spark 连接器项目自述文件中的“最新版本”。

  1. 使用 Maven 坐标 com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.17 在 Azure Databricks 工作区中创建库

    注意

    此连接器会定期更新,并且可能会有最新版本:建议你从 Maven 存储库拉取最新的连接器

  2. 将创建的库安装到群集中。

架构

记录的架构为:

类型
body binary
partition string
offset string
sequenceNumber long
enqueuedTime timestamp
publisher string
partitionKey string
properties map[string,json]

body 始终以字节数组的形式提供。 请使用 cast("string") 显式反序列化 body 列。

配置

本部分介绍了使用事件中心时所需的配置设置。

有关使用 Azure 事件中心配置结构化流式处理的详细指南,请参阅 Microsoft 制定的结构化流式处理和 Azure 事件中心集成指南

有关使用结构化流式处理的详细指导,请参阅 Azure Databricks 上的流式处理

连接字符串

需要使用事件中心连接字符串来连接到事件中心服务。 可以使用 Azure 门户或使用库中的 ConnectionStringBuilder 获取事件中心实例的连接字符串。

Azure 门户

从 Azure 门户获取连接字符串时,它可能有也可能没有 EntityPath 键。 请注意以下几点:

  // Without an entity path
val without = "Endpoint=<endpoint>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"

// With an entity path
val with = "Endpoint=sb://<sample>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<eventhub-name>"

若要连接到事件中心,必须提供 EntityPath。 如果你的连接字符串中没有该路径,请不要担心。 下面的对象将处理此问题:

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder(without)   // defined in the previous code block
  .setEventHubName("<eventhub-name>")
  .build

ConnectionStringBuilder

另外,也可以使用 ConnectionStringBuilder 来创建连接字符串。

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder()
  .setNamespaceName("<namespace-name>")
  .setEventHubName("<eventhub-name>")
  .setSasKeyName("<key-name>")
  .setSasKey("<key>")
  .build

EventHubsConf

与事件中心相关的所有配置都在 EventHubsConf 中进行。 若要创建 EventHubsConf,必须传递连接字符串:

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)

若要详细了解如何获取有效的连接字符串,请参阅连接字符串

有关配置的完整列表,请参阅 EventHubsConf。 若要开始,请参阅下面的部分配置。

选项 默认 查询类型 说明
consumerGroup 字符串 “$Default” 流式处理和批处理 使用者组是整个事件中心的视图。 使用者组使多个消费应用程序都有各自独立的事件流视图,并按自身步调和偏移量独立读取流。 Microsoft 文档中提供了详细信息。
startingPosition EventPosition 流的开头 流式处理和批处理 结构化流式处理作业的起始位置。 有关选项读取顺序的信息,请参阅 startingPositions
maxEventsPerTrigger long partitionCount

1000-
流式处理查询 每个触发器间隔处理的最大事件数的速率限制。 指定的事件总数将按比例分配到不同卷的分区中。

对于每个选项,EventHubsConf 中都存在一个相应的设置。 例如: 。

import org.apache.spark.eventhubs.

val cs = "<your-connection-string>"
val eventHubsConf = EventHubsConf(cs)
  .setConsumerGroup("sample-cg")
  .setMaxEventsPerTrigger(10000)

EventPosition

EventHubsConf 允许用户通过 EventPosition 类指定起始(和结束)位置。 EventPosition 定义事件在事件中心分区中的位置。 位置可以是排队的时间、偏移量、序列号、流的开头或流的末尾。

import org.apache.spark.eventhubs._

EventPosition.fromOffset("246812")          // Specifies offset 246812
EventPosition.fromSequenceNumber(100L)      // Specifies sequence number 100
EventPosition.fromEnqueuedTime(Instant.now) // Any event after the current time
EventPosition.fromStartOfStream             // Specifies from start of stream
EventPosition.fromEndOfStream               // Specifies from end of stream

如果要在特定位置开始(或结束),只需创建正确的 EventPosition 并在 EventHubsConf 中对它进行设置即可:

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromEndOfStream)