Azure 事件中心的 Apache Kafka 事务

本文详细介绍了如何将 Apache Kafka 事务 API 与 Azure 事件中心配合使用。

概述

事件中心提供 Kafka 终结点,现有的 Kafka 客户端应用程序可将该终结点用作运行你自己的 Kafka 群集的替代方案。 事件中心可与许多现有 Kafka 应用程序配合使用。 有关详细信息,请参阅适用于 Apache Kafka 的事件中心

本文档重点介绍如何无缝地将 Kafka 事务 API 与 Azure 事件中心配合使用。

注意

Kafka Transactions 目前在高级层和专用层中以公共预览版提供。

Apache Kafka 中的事务

在云本机环境中,应用程序必须能够灵活地应对网络中断及命名空间重启和升级。 需要严格处理保证的应用程序必须使用事务框架或 API 来确保执行所有操作,要么不执行任何操作,以便可靠地管理应用程序和数据状态。 如果一组操作失败,则可以原子方式可靠地重试这些操作,以确保正确的处理保证。

注意

当需要以“全部或无”方式处理多个操作时,通常需要事务性保证。

对于所有其他操作,如果特定操作失败,则客户端应用程序默认复原,从而以指数退避方式重试操作。

Apache Kafka 提供事务 API,以确保跨相同或不同的主题/分区集提供此级别的处理保证。

事务适用于以下情况:

  • 事务生成者。
  • 仅一次处理语义。

事务生成者

事务生成者确保将数据以原子方式写入到不同主题的多个分区。 生成者可以启动事务,写入同一主题或跨不同主题的多个分区,然后提交或中止事务。

为了确保生成者是事务性的,应将 enable.idempotence 设置为 true,以确保数据仅写入一次,从而避免发送端重复数据。 此外,应将 transaction.id 设置为唯一标识生成者。

    producerProps.put("enable.idempotence", "true");
    producerProps.put("transactional.id", "transactional-producer-1");
    KafkaProducer<String, String> producer = new KafkaProducer(producerProps);

初始化生成者后,以下调用可确保生成者以事务生成者身份向中转站注册 -

    producer.initTransactions();

然后,生成者必须显式开始事务,按正常方式执行跨不同主题和分区的发送操作,然后使用以下调用提交事务 –

    producer.beginTransaction();
	/*
        Send to multiple topic partitions.
    */
    producer.commitTransaction();

如果事务因错误或超时而需要中止,则生成者可以调用 abortTransaction() 方法。

	producer.abortTransaction();

仅一次语义

仅一次语义在事务生成者的基础上构建,方法是在生成者的事务范围内添加使用者,以便保证每个记录都仅读取、处理和写入一次

首先实例化事务生成者 -


    producerProps.put("enable.idempotence", "true");
    producerProps.put("transactional.id", "transactional-producer-1");
    KafkaProducer<K, V> producer = new KafkaProducer(producerProps);

    producer.initTransactions();

然后,使用者必须配置为只读取非事务性消息或提交的事务性消息,具体方法是设置以下属性 –


	consumerProps.put("isolation.level", "read_committed");
	KafkaConsumer <K,V> consumer = new KafkaConsumer<>(consumerProps);

实例化使用者后,可以订阅必须从中读取记录的主题 –


    consumer.subscribe(singleton("inputTopic"));

使用者轮询输入主题中的记录后,生成者将开始处理记录并将其写入输出主题的事务范围。 写入记录后,将创建所有分区的已更新偏移量映射。 然后,生成者会在提交事务之前将这个更新后的偏移量映射发送到事务。

在任何情况下,事务都将中止,生成者会以原子方式再次重试处理。

	while (true) {
		ConsumerRecords records = consumer.poll(Long.Max_VALUE);
		producer.beginTransaction();
        try {
    		for (ConsumerRecord record : records) {
    			/*
                    Process record as appropriate
                */
                // Write to output topic
    	        producer.send(producerRecord("outputTopic", record));
    		}

            /*
                Generate the offset map to be committed.
            */
            Map <TopicPartition, OffsetAndMetadata> offsetsToCommit = new Hashap<>();
            for (TopicPartition partition : records.partitions()) {
                // Calculate the offset to commit and populate the map.
                offsetsToCommit.put(partition, new OffsetAndMetadata(calculated_offset))
            }

            // send offsets to transaction and then commit the transaction.
    		producer.sendOffsetsToTransaction(offsetsToCommit, group);
    		producer.commitTransaction();
        } catch (Exception e)
        {
            producer.abortTransaction();
        }
	}

警告

如果事务在 max.transaction.timeout.ms 之前未提交或中止,则事务将自动由事件中心中止。 默认 max.transaction.timeout.ms 由事件中心设置为“15 分钟”,但生成者可以通过在生成者配置属性中设置 transaction.timeout.ms 属性将其替代为较小的值。

迁移指南

如果有想要与 Azure 事件中心一起使用的现有 Kafka 应用程序,请查看 Azure 事件中心的 Kafka 迁移指南以快速启动运行。

后续步骤

若要详细了解事件中心和适用于 Kafka 的事件中心,请参阅以下文章: