Azure 事件中心的 Apache Kafka 事务
本文详细介绍了如何将 Apache Kafka 事务 API 与 Azure 事件中心配合使用。
概述
事件中心提供 Kafka 终结点,现有的 Kafka 客户端应用程序可将该终结点用作运行你自己的 Kafka 群集的替代方案。 事件中心可与许多现有 Kafka 应用程序配合使用。 有关详细信息,请参阅适用于 Apache Kafka 的事件中心。
本文档重点介绍如何无缝地将 Kafka 事务 API 与 Azure 事件中心配合使用。
注意
Kafka Transactions 目前在高级层和专用层中以公共预览版提供。
Apache Kafka 中的事务
在云本机环境中,应用程序必须能够灵活地应对网络中断及命名空间重启和升级。 需要严格处理保证的应用程序必须使用事务框架或 API 来确保执行所有操作,要么不执行任何操作,以便可靠地管理应用程序和数据状态。 如果一组操作失败,则可以原子方式可靠地重试这些操作,以确保正确的处理保证。
注意
当需要以“全部或无”方式处理多个操作时,通常需要事务性保证。
对于所有其他操作,如果特定操作失败,则客户端应用程序默认复原,从而以指数退避方式重试操作。
Apache Kafka 提供事务 API,以确保跨相同或不同的主题/分区集提供此级别的处理保证。
事务适用于以下情况:
- 事务生成者。
- 仅一次处理语义。
事务生成者
事务生成者确保将数据以原子方式写入到不同主题的多个分区。 生成者可以启动事务,写入同一主题或跨不同主题的多个分区,然后提交或中止事务。
为了确保生成者是事务性的,应将 enable.idempotence
设置为 true,以确保数据仅写入一次,从而避免发送端重复数据。 此外,应将 transaction.id
设置为唯一标识生成者。
producerProps.put("enable.idempotence", "true");
producerProps.put("transactional.id", "transactional-producer-1");
KafkaProducer<String, String> producer = new KafkaProducer(producerProps);
初始化生成者后,以下调用可确保生成者以事务生成者身份向中转站注册 -
producer.initTransactions();
然后,生成者必须显式开始事务,按正常方式执行跨不同主题和分区的发送操作,然后使用以下调用提交事务 –
producer.beginTransaction();
/*
Send to multiple topic partitions.
*/
producer.commitTransaction();
如果事务因错误或超时而需要中止,则生成者可以调用 abortTransaction()
方法。
producer.abortTransaction();
仅一次语义
仅一次语义在事务生成者的基础上构建,方法是在生成者的事务范围内添加使用者,以便保证每个记录都仅读取、处理和写入一次。
首先实例化事务生成者 -
producerProps.put("enable.idempotence", "true");
producerProps.put("transactional.id", "transactional-producer-1");
KafkaProducer<K, V> producer = new KafkaProducer(producerProps);
producer.initTransactions();
然后,使用者必须配置为只读取非事务性消息或提交的事务性消息,具体方法是设置以下属性 –
consumerProps.put("isolation.level", "read_committed");
KafkaConsumer <K,V> consumer = new KafkaConsumer<>(consumerProps);
实例化使用者后,可以订阅必须从中读取记录的主题 –
consumer.subscribe(singleton("inputTopic"));
使用者轮询输入主题中的记录后,生成者将开始处理记录并将其写入输出主题的事务范围。 写入记录后,将创建所有分区的已更新偏移量映射。 然后,生成者会在提交事务之前将这个更新后的偏移量映射发送到事务。
在任何情况下,事务都将中止,生成者会以原子方式再次重试处理。
while (true) {
ConsumerRecords records = consumer.poll(Long.Max_VALUE);
producer.beginTransaction();
try {
for (ConsumerRecord record : records) {
/*
Process record as appropriate
*/
// Write to output topic
producer.send(producerRecord("outputTopic", record));
}
/*
Generate the offset map to be committed.
*/
Map <TopicPartition, OffsetAndMetadata> offsetsToCommit = new Hashap<>();
for (TopicPartition partition : records.partitions()) {
// Calculate the offset to commit and populate the map.
offsetsToCommit.put(partition, new OffsetAndMetadata(calculated_offset))
}
// send offsets to transaction and then commit the transaction.
producer.sendOffsetsToTransaction(offsetsToCommit, group);
producer.commitTransaction();
} catch (Exception e)
{
producer.abortTransaction();
}
}
警告
如果事务在 max.transaction.timeout.ms
之前未提交或中止,则事务将自动由事件中心中止。 默认 max.transaction.timeout.ms
由事件中心设置为“15 分钟”,但生成者可以通过在生成者配置属性中设置 transaction.timeout.ms
属性将其替代为较小的值。
迁移指南
如果有想要与 Azure 事件中心一起使用的现有 Kafka 应用程序,请查看 Azure 事件中心的 Kafka 迁移指南以快速启动运行。
后续步骤
若要详细了解事件中心和适用于 Kafka 的事件中心,请参阅以下文章: