向 Azure 服务总线主题发送消息,并从该主题的订阅接收消息 (Java)

在本快速入门中,你将使用 azure-messaging-servicebus package 包编写 Java 代码,以将消息发送到 Azure 服务总线主题,然后从该主题的订阅接收消息。

注意

本快速入门分步说明将一批消息发送到某个服务总线主题并从该主题的订阅接收这些消息这一简单场景。 可在 GitHub 上的 Azure SDK for Java 存储库中找到预生成的 Azure 服务总线 Java 示例。

提示

如果你目前在 Spring 应用程序中使用 Azure 服务总线资源,建议将 Spring Cloud Azure 视为替代方法。 Spring Cloud Azure 是一个开源项目,提供 Spring 与 Azure 服务的无缝集成。 若要详细了解 Spring Cloud Azure,并查看使用服务总线的示例,请参阅使用 Azure 服务总线的 Spring Cloud 流

先决条件

在 Azure 门户中创建命名空间

若要开始在 Azure 中使用服务总线消息实体,必须先使用在 Azure 中唯一的名称创建一个命名空间。 命名空间提供了用于应用程序中的服务总线资源(队列、主题等)的范围容器。

创建命名空间:

  1. 登录 Azure 门户

  2. 导航到“所有服务”

  3. 在左侧的导航栏中,从类别列表中选择“集成”,将鼠标悬停在“服务总线”上,然后选择“服务总线”磁贴上的 按钮。+

    图像显示“创建资源”、“集成”以及菜单中的“服务总线”选择。

  4. 在“创建命名空间”页的“基本信息”标记中,执行以下步骤 :

    1. 对于“订阅”,请选择要在其中创建命名空间的 Azure 订阅。

    2. 对于资源组,选择现有资源组或创建新的资源组。

    3. 输入命名空间的名称。 命名空间名称应遵循以下命名约定:

      • 该名称在 Azure 中必须唯一。 系统会立即检查该名称是否可用。
      • 名称长度最少为 6 个字符,最多为 50 个字符。
      • 名称只能包含字母、数字、连字符 -
      • 名称必须以字母开头,并且必须以字母或数字结尾。
      • 名称不以 -sb-mgmt 结尾。
    4. 对于“位置”,请选择托管该命名空间的区域。

    5. 对于“定价层”,请选择命名空间的定价层(“基本”、“标准”或“高级”)。 对于本快速入门,请选择“标准”。

      重要

      若要使用主题和订阅,请选择“标准”或“高级”。 基本定价层不支持主题/订阅。

      如果选择了“高级”定价层,请指定“消息传送单元”数 。 高级层在 CPU 和内存级别提供资源隔离,使每个工作负荷在隔离的环境中运行。 此资源容器称为消息传送单元。 高级命名空间至少具有一个消息传送单元。 可为每个服务总线高级命名空间选择 1、2、4、8 或 16 个消息传送单元。 有关详细信息,请参阅服务总线高级消息传送

    6. 在页面底部选择“查看 + 创建”。

      图像显示“创建命名空间”页

    7. 在“查看 + 创建”页上,查看设置,然后选择“创建” 。

  5. 资源部署成功后,在部署页上选择“转到资源”。

    图像显示“部署成功”页,其中包括“转到资源”链接。

  6. 将会看到服务总线命名空间的主页。

    图像显示已创建的服务总线命名空间的主页。

使用 Azure 门户创建主题

  1. 在“服务总线命名空间”页上,展开左侧导航菜单上的“实体”,然后在左侧菜单中选择“主题”

  2. 在工具栏中选择“+ 主题”。

  3. 输入主题名称。 将其他选项保留默认值。

  4. 选择“创建”。

    图像显示了“创建主题”页。

创建主题的订阅

  1. 选择在上一部分创建的主题

    图像显示主题列表中选择的主题。

  2. 在“服务总线主题”页面上,选择工具栏上的“+ 订阅” 。

    图像显示“添加订阅”按钮。

  3. 在“创建订阅”页上执行以下步骤:

    1. 对于订阅名称,输入“S1” 。

    2. 对于“最大交付数”,输入“3” 。

    3. 然后,选择“创建”以创建订阅。

      图像显示了“创建订阅”页。

向 Azure 验证应用

本快速入门介绍了连接到 Azure 服务总线的两种方法:无密码和连接字符串方法。 第一个选项展示如何使用 Azure Active Directory 中的安全主体和基于角色的访问控制 (RBAC) 连接到服务总线命名空间。 无需担心代码、配置文件或安全存储(如 Azure Key Vault)中存在硬编码的连接字符串。 第二个选项展示如何使用连接字符串连接到服务总线命名空间。 如果不熟悉 Azure,你可能会感觉连接字符串选项更易于使用。 建议在实际应用程序和生产环境中使用无密码选项。 有关详细信息,请参阅身份验证和授权。 还可以在概述页上阅读有关无密码身份验证的详细信息。

将角色分配给 Azure AD 用户

在本地开发时,请确保连接到 Azure 服务总线的用户帐户具有正确的权限。 你需要拥有 Azure 服务总线数据所有者角色才能发送和接收消息。 若要为自己分配此角色,需要具有“用户访问管理员”角色,或者具有包含 Microsoft.Authorization/roleAssignments/write 操作的其他角色。 可使用 Azure 门户、Azure CLI 或 Azure PowerShell 向用户分配 Azure RBAC 角色。 可在范围概述页上详细了解角色分配的可用范围。

以下示例将 Azure Service Bus Data Owner 角色分配给用户帐户,该角色提供对 Azure 服务总线资源的完全访问权限。 在实际方案中,遵循最小特权原则,仅向用户提供更安全的生产环境所需的最小权限。

适用于 Azure 服务总线的 Azure 内置角色

对于 Azure 服务总线,通过 Azure 门户和 Azure 资源管理 API 对命名空间和所有相关资源的管理已使用 Azure RBAC 模型进行了保护。 Azure 提供以下 Azure 内置角色,用于授予对服务总线命名空间的访问权限:

如果要创建自定义角色,请参阅执行服务总线操作所需的权限

将 Azure AD 用户添加到 Azure 服务总线所有者角色

将 Azure AD 用户名添加到服务总线命名空间级别的“Azure 服务总线数据所有者”角色。 它将允许在用户帐户上下文中运行的应用将消息发送到队列或主题,并从队列或主题的订阅中接收消息。

重要

在大多数情况下,角色分配在 Azure 中传播需要一两分钟。 在极少数情况下,最多可能需要 8 分钟。 如果在首次运行代码时收到身份验证错误,请稍等片刻再试。

  1. 如果未在 Azure 门户中打开“服务总线命名空间”页,请使用主搜索栏或左侧导航找到你的服务总线命名空间。

  2. 在概述页面上,从左侧菜单中选择“访问控制(IAM)”。

  3. 在“访问控制 (IAM)”页上,选择“角色分配”选项卡。

  4. 从顶部菜单中选择“+ 添加”,然后从出现的下拉菜单中选择“添加角色分配”。

    显示如何分配角色的屏幕截图。

  5. 使用搜索框将结果筛选为所需角色。 对于此示例,请搜索 Azure Service Bus Data Owner 并选择匹配的结果。 然后选择“下一步” 。

  6. 在“访问权限分配对象”下,选择“用户、组或服务主体”,然后选择“+ 选择成员”。

  7. 在对话框中,搜索 Azure AD 用户名(通常为 user@domain 电子邮件地址),然后选择对话框底部的“选择”。

  8. 选择“查看 + 分配”转到最后一页,然后再次选择“查看 + 分配”完成该过程。

将消息发送到主题

在本部分中,你将创建一个 Java 控制台项目,并添加代码以将消息发送到创建的主题。

创建 Java 控制台项目

使用 Eclipse 或所选工具创建 Java 项目。

配置应用程序以使用服务总线

添加对 Azure Core 和 Azure 服务总线库的引用。

如果使用的是 Eclipse 并创建了 Java 控制台应用程序,请将 Java 项目转换为 Maven:在“包资源管理器”窗口中右键单击该项目,然后选择“配置”->“转换为 Maven 项目”。 然后,将依赖项添加到这两个库,如以下示例所示。

更新 pom.xml 文件以将依赖项添加到 Azure 服务总线和 Azure 标识包。

    <dependencies>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-messaging-servicebus</artifactId>
            <version>7.13.3</version>
        </dependency>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-identity</artifactId>
            <version>1.8.0</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>

添加将消息发送到主题的代码

  1. 将以下 import 语句添加到 Java 文件的主题中。

    import com.azure.messaging.servicebus.*;
    import com.azure.identity.*;
    
    import java.util.concurrent.TimeUnit;
    import java.util.Arrays;
    import java.util.List;
    
  2. 在类中,定义变量以保存连接字符串(无密码场景不需要)、主题名称和订阅名称。

    static String topicName = "<TOPIC NAME>";
    static String subName = "<SUBSCRIPTION NAME>";
    

    重要

    <TOPIC NAME> 替换为主题名称,并将 <SUBSCRIPTION NAME> 替换为主题订阅的名称。

  3. 在类中添加一个名为 sendMessage 的方法,以向主题发送一条消息。

    重要

    NAMESPACENAME 替换为你的服务总线命名空间的名称。

    static void sendMessage()
    {
        // create a token using the default Azure credential
        DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
                .build();
    
        ServiceBusSenderClient senderClient = new ServiceBusClientBuilder()
                .fullyQualifiedNamespace("NAMESPACENAME.servicebus.chinacloudapi.cn")
                .credential(credential)
                .sender()
                .topicName(topicName)
                .buildClient();
    
        // send one message to the topic
        senderClient.sendMessage(new ServiceBusMessage("Hello, World!"));
        System.out.println("Sent a single message to the topic: " + topicName);
    }
    
    
  4. 在类中添加一个名为 createMessages 的方法,以创建消息列表。 通常,可以从应用程序的不同部分获得这些消息。 在这里,我们将创建一个示例消息列表。

    static List<ServiceBusMessage> createMessages()
    {
        // create a list of messages and return it to the caller
        ServiceBusMessage[] messages = {
                new ServiceBusMessage("First message"),
                new ServiceBusMessage("Second message"),
                new ServiceBusMessage("Third message")
        };
        return Arrays.asList(messages);
    }
    
  5. 添加一个名为 sendMessageBatch 方法的方法,以将消息发送到你创建的主题。 此方法为主题创建 ServiceBusSenderClient,调用 createMessages 方法来获取消息列表,准备一个或多个批处理,并将批处理发送到主题。

    重要

    NAMESPACENAME 替换为你的服务总线命名空间的名称。

    static void sendMessageBatch()
    {
        // create a token using the default Azure credential
        DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
                .build();
    
        ServiceBusSenderClient senderClient = new ServiceBusClientBuilder()
                .fullyQualifiedNamespace("NAMESPACENAME.servicebus.chinacloudapi.cn")
                .credential(credential)
                .sender()
                .topicName(topicName)
                .buildClient();
    
        // Creates an ServiceBusMessageBatch where the ServiceBus.
        ServiceBusMessageBatch messageBatch = senderClient.createMessageBatch();
    
        // create a list of messages
        List<ServiceBusMessage> listOfMessages = createMessages();
    
        // We try to add as many messages as a batch can fit based on the maximum size and send to Service Bus when
        // the batch can hold no more messages. Create a new batch for next set of messages and repeat until all
        // messages are sent.
        for (ServiceBusMessage message : listOfMessages) {
            if (messageBatch.tryAddMessage(message)) {
                continue;
            }
    
            // The batch is full, so we create a new batch and send the batch.
            senderClient.sendMessages(messageBatch);
            System.out.println("Sent a batch of messages to the topic: " + topicName);
    
            // create a new batch
            messageBatch = senderClient.createMessageBatch();
    
            // Add that message that we couldn't before.
            if (!messageBatch.tryAddMessage(message)) {
                System.err.printf("Message is too large for an empty batch. Skipping. Max size: %s.", messageBatch.getMaxSizeInBytes());
            }
        }
    
        if (messageBatch.getCount() > 0) {
            senderClient.sendMessages(messageBatch);
            System.out.println("Sent a batch of messages to the topic: " + topicName);
        }
    
        //close the client
        senderClient.close();
    }
    

从订阅接收消息

在本部分中,你将添加代码以从主题的订阅中检索消息。

  1. 添加名为 receiveMessages 的方法,以从订阅检索消息。 此方法通过指定用于处理消息的处理程序和用于处理错误的另一个处理程序来为订阅创建 ServiceBusProcessorClient。 然后,它将启动处理器,等待几秒钟,输出接收的消息,然后停止和关闭处理器。

    重要

    • NAMESPACENAME 替换为你的服务总线命名空间的名称。
    • 将代码中 ServiceBusTopicTest 中的 ServiceBusTopicTest::processMessage 替换为你的类的名称。
    // handles received messages
    static void receiveMessages() throws InterruptedException
    {
        DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
                .build();
    
        // Create an instance of the processor through the ServiceBusClientBuilder
        ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
            .fullyQualifiedNamespace("NAMESPACENAME.servicebus.chinacloudapi.cn")
            .credential(credential)
            .processor()
            .topicName(topicName)
            .subscriptionName(subName)
            .processMessage(context -> processMessage(context))
            .processError(context -> processError(context))
            .buildProcessorClient();
    
        System.out.println("Starting the processor");
        processorClient.start();
    
        TimeUnit.SECONDS.sleep(10);
        System.out.println("Stopping and closing the processor");
        processorClient.close();
    }
    
  2. 添加 processMessage 方法以处理从服务总线订阅接收的消息。

    private static void processMessage(ServiceBusReceivedMessageContext context) {
        ServiceBusReceivedMessage message = context.getMessage();
        System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n", message.getMessageId(),
            message.getSequenceNumber(), message.getBody());
    }
    
  3. 添加 processError 方法以处理错误消息。

    private static void processError(ServiceBusErrorContext context) {
        System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n",
            context.getFullyQualifiedNamespace(), context.getEntityPath());
    
        if (!(context.getException() instanceof ServiceBusException)) {
            System.out.printf("Non-ServiceBusException occurred: %s%n", context.getException());
            return;
        }
    
        ServiceBusException exception = (ServiceBusException) context.getException();
        ServiceBusFailureReason reason = exception.getReason();
    
        if (reason == ServiceBusFailureReason.MESSAGING_ENTITY_DISABLED
            || reason == ServiceBusFailureReason.MESSAGING_ENTITY_NOT_FOUND
            || reason == ServiceBusFailureReason.UNAUTHORIZED) {
            System.out.printf("An unrecoverable error occurred. Stopping processing with reason %s: %s%n",
                reason, exception.getMessage());
        } else if (reason == ServiceBusFailureReason.MESSAGE_LOCK_LOST) {
            System.out.printf("Message lock lost for message: %s%n", context.getException());
        } else if (reason == ServiceBusFailureReason.SERVICE_BUSY) {
            try {
                // Choosing an arbitrary amount of time to wait until trying again.
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                System.err.println("Unable to sleep for period of time");
            }
        } else {
            System.out.printf("Error source %s, reason %s, message: %s%n", context.getErrorSource(),
                reason, context.getException());
        }
    }
    
  4. 更新 main 方法以调用 sendMessagesendMessageBatchreceiveMessages 方法,并引发 InterruptedException

    public static void main(String[] args) throws InterruptedException {
        sendMessage();
        sendMessageBatch();
        receiveMessages();
    }
    

运行应用

运行程序,可以看到类似于以下输出的输出:

  1. 如果使用的是 Eclipse,请右键单击该项目,选择“导出”,展开 Java,选择“可运行 JAR 文件”,然后按照步骤创建可运行 JAR 文件。

  2. 如果在登录计算机时使用了与添加到 Azure 服务总线数据所有者角色的用户帐户不同的用户帐户,请执行以下步骤。 否则,请跳过此步骤,然后在下一步中继续运行 Jar 文件。

    1. 在计算机上安装 Azure CLI

    2. 使用以下 CLI 命令登录到 Azure。 使用添加到 Azure 服务总线数据所有者角色的同一用户帐户。

      az cloud set -n AzureChinaCloud
      az login
      # az cloud set -n AzureCloud   //means return to Public Azure.
      
  3. 使用以下命令运行 Jar 文件。

    java -jar <JAR FILE NAME>
    
  4. 控制台窗口中会显示以下输出。

    Sent a single message to the topic: mytopic
    Sent a batch of messages to the topic: mytopic
    Starting the processor
    Processing message. Session: e0102f5fbaf646988a2f4b65f7d32385, Sequence #: 1. Contents: Hello, World!
    Processing message. Session: 3e991e232ca248f2bc332caa8034bed9, Sequence #: 2. Contents: First message
    Processing message. Session: 56d3a9ea7df446f8a2944ee72cca4ea0, Sequence #: 3. Contents: Second message
    Processing message. Session: 7bd3bd3e966a40ebbc9b29b082da14bb, Sequence #: 4. Contents: Third message
    

在 Azure 门户中的服务总线命名空间的“概述”页上,可看到传入和传出消息计数 。 等待一分钟左右,然后刷新页面以查看最新值。

传入和传出消息计数

切换到窗格偏中下位置的“主题”选项卡,然后选择主题以查看该主题的“服务总线主题”页 。 在此页上,“消息”图表中应显示四条传入消息和四条传出消息。

传入和传出消息

如果注释掉 main 方法中的 receiveMessages 调用,并再次运行应用,则在“服务总线主题”页上,除 4 条传出消息外,还会看到 8 条传入消息(4 条新消息)。

已更新的主题页

在此页上,如果选择一个订阅,则将转到“服务总线订阅”页。 可以在此页上查看活动消息计数、死信消息计数等。 在此示例中,还有四条活动消息未被接收器接收。

活动消息计数

后续步骤

请参阅以下文档和示例: