向 Azure 服务总线队列发送消息并从中接收消息 (Java)
在本快速入门中,你会创建一个 Java 应用,以便向 Azure 服务总线队列发送消息,并从中接收消息。
注意
本快速入门分步介绍了一个简单方案,也就是将消息发送到服务总线队列并接收这些消息。 可在 GitHub 上的 Azure SDK for Java 存储库中找到预生成的 Azure 服务总线 Java 示例。
提示
如果你目前在 Spring 应用程序中使用 Azure 服务总线资源,建议将 Spring Cloud Azure 视为替代方法。 Spring Cloud Azure 是一个开源项目,提供 Spring 与 Azure 服务的无缝集成。 若要详细了解 Spring Cloud Azure,并查看使用服务总线的示例,请参阅使用 Azure 服务总线的 Spring Cloud 流。
先决条件
- Azure 订阅。 若要完成本教程,需要一个 Azure 帐户。 你可以激活 MSDN 订阅者权益或注册试用版订阅。
- 安装 Azure SDK for Java。 如果使用 Eclipse,则可安装 Azure Toolkit for Eclipse,其中包含用于 Java 的 Azure SDK。 然后,可将“适用于 Java 的 Microsoft Azure 库”添加到项目 。 如果使用 IntelliJ,请参阅安装 Azure Toolkit for IntelliJ。
在 Azure 门户中创建命名空间
若要开始在 Azure 中使用服务总线消息实体,必须先使用在 Azure 中唯一的名称创建一个命名空间。 命名空间提供了用于应用程序中的服务总线资源(队列、主题等)的范围容器。
创建命名空间:
登录 Azure 门户。
导航到“所有服务”页。
在左侧的导航栏中,从类别列表中选择“集成”,将鼠标悬停在“服务总线”上,然后选择“服务总线”磁贴上的 按钮。+。
在“创建命名空间”页的“基本信息”标记中,执行以下步骤 :
对于“订阅”,请选择要在其中创建命名空间的 Azure 订阅。
对于资源组,选择现有资源组或创建新的资源组。
输入命名空间的名称。 命名空间名称应遵循以下命名约定:
- 该名称在 Azure 中必须唯一。 系统会立即检查该名称是否可用。
- 名称长度最少为 6 个字符,最多为 50 个字符。
- 名称只能包含字母、数字、连字符
-
。 - 名称必须以字母开头,并且必须以字母或数字结尾。
- 名称不以
-sb
或-mgmt
结尾。
对于“位置”,请选择托管该命名空间的区域。
对于“定价层”,请选择命名空间的定价层(“基本”、“标准”或“高级”)。 对于本快速入门,请选择“标准”。
重要
若要使用主题和订阅,请选择“标准”或“高级”。 基本定价层不支持主题/订阅。
如果选择了“高级”定价层,请指定“消息传送单元”数 。 高级层在 CPU 和内存级别提供资源隔离,使每个工作负荷在隔离的环境中运行。 此资源容器称为消息传送单元。 高级命名空间至少具有一个消息传送单元。 可为每个服务总线高级命名空间选择 1、2、4、8 或 16 个消息传送单元。 有关详细信息,请参阅服务总线高级消息传送。
在页面底部选择“查看 + 创建”。
在“查看 + 创建”页上,查看设置,然后选择“创建” 。
资源部署成功后,在部署页上选择“转到资源”。
将会看到服务总线命名空间的主页。
在 Azure 门户中创建队列
在“服务总线命名空间”页上,展开左侧导航菜单上的“实体”,然后在左侧菜单中选择“队列”。
在“队列”页面上,选择工具栏上的“+ 队列”。
输入队列名称,其他值则保留默认值。
现在选择“创建”。
向 Azure 验证应用
本快速入门介绍了连接到 Azure 服务总线的两种方法:无密码和连接字符串方法。 第一个选项展示如何使用 Azure Active Directory 中的安全主体和基于角色的访问控制 (RBAC) 连接到服务总线命名空间。 无需担心代码、配置文件或安全存储(如 Azure Key Vault)中存在硬编码的连接字符串。 第二个选项展示如何使用连接字符串连接到服务总线命名空间。 如果不熟悉 Azure,你可能会感觉连接字符串选项更易于使用。 建议在实际应用程序和生产环境中使用无密码选项。 有关详细信息,请参阅身份验证和授权。 还可以在概述页上阅读有关无密码身份验证的详细信息。
将角色分配给 Azure AD 用户
在本地开发时,请确保连接到 Azure 服务总线的用户帐户具有正确的权限。 你需要拥有 Azure 服务总线数据所有者角色才能发送和接收消息。 若要为自己分配此角色,需要具有“用户访问管理员”角色,或者具有包含 Microsoft.Authorization/roleAssignments/write
操作的其他角色。 可使用 Azure 门户、Azure CLI 或 Azure PowerShell 向用户分配 Azure RBAC 角色。 可在范围概述页上详细了解角色分配的可用范围。
以下示例将 Azure Service Bus Data Owner
角色分配给用户帐户,该角色提供对 Azure 服务总线资源的完全访问权限。 在实际方案中,遵循最小特权原则,仅向用户提供更安全的生产环境所需的最小权限。
适用于 Azure 服务总线的 Azure 内置角色
对于 Azure 服务总线,通过 Azure 门户和 Azure 资源管理 API 对命名空间和所有相关资源的管理已使用 Azure RBAC 模型进行了保护。 Azure 提供以下 Azure 内置角色,用于授予对服务总线命名空间的访问权限:
- Azure 服务总线数据所有者:允许对服务总线命名空间及其实体(队列、主题、订阅和筛选器)进行数据访问。 此角色的成员可以从队列或主题/订阅发送和接收消息。
- Azure 服务总线数据发送者:使用此角色可以为服务总线命名空间及其实体提供发送访问权限。
- Azure 服务总线数据接收者:使用此角色可以为服务总线命名空间及其实体提供接收访问权限。
如果要创建自定义角色,请参阅执行服务总线操作所需的权限。
将 Azure AD 用户添加到 Azure 服务总线所有者角色
将 Azure AD 用户名添加到服务总线命名空间级别的“Azure 服务总线数据所有者”角色。 它将允许在用户帐户上下文中运行的应用将消息发送到队列或主题,并从队列或主题的订阅中接收消息。
重要
在大多数情况下,角色分配在 Azure 中传播需要一两分钟。 在极少数情况下,最多可能需要 8 分钟。 如果在首次运行代码时收到身份验证错误,请稍等片刻再试。
如果未在 Azure 门户中打开“服务总线命名空间”页,请使用主搜索栏或左侧导航找到你的服务总线命名空间。
在概述页面上,从左侧菜单中选择“访问控制(IAM)”。
在“访问控制 (IAM)”页上,选择“角色分配”选项卡。
从顶部菜单中选择“+ 添加”,然后从出现的下拉菜单中选择“添加角色分配”。
使用搜索框将结果筛选为所需角色。 对于此示例,请搜索
Azure Service Bus Data Owner
并选择匹配的结果。 然后选择“下一步” 。在“访问权限分配对象”下,选择“用户、组或服务主体”,然后选择“+ 选择成员”。
在对话框中,搜索 Azure AD 用户名(通常为 user@domain 电子邮件地址),然后选择对话框底部的“选择”。
选择“查看 + 分配”转到最后一页,然后再次选择“查看 + 分配”完成该过程。
向队列发送消息
在本部分中,你将创建一个 Java 控制台项目,并添加代码以将消息发送到之前创建的队列。
创建 Java 控制台项目
使用 Eclipse 或所选工具创建 Java 项目。
配置应用程序以使用服务总线
添加对 Azure Core 和 Azure 服务总线库的引用。
如果使用的是 Eclipse 并创建了 Java 控制台应用程序,请将 Java 项目转换为 Maven:在“包资源管理器”窗口中右键单击该项目,然后选择“配置”->“转换为 Maven 项目”。 然后,将依赖项添加到这两个库,如以下示例所示。
更新 pom.xml
文件以将依赖项添加到 Azure 服务总线和 Azure 标识包。
<dependencies>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-servicebus</artifactId>
<version>7.13.3</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-identity</artifactId>
<version>1.8.0</version>
<scope>compile</scope>
</dependency>
</dependencies>
添加将消息发送到队列的代码
将以下
import
语句添加到 Java 文件的主题中。在类中,定义用于保存连接字符串和队列名称的变量。
在类中添加一个名为
sendMessage
的方法,以向队列发送一条消息。重要
将
NAMESPACENAME
替换为你的服务总线命名空间的名称。static void sendMessage() { // create a token using the default Azure credential DefaultAzureCredential credential = new DefaultAzureCredentialBuilder() .build(); ServiceBusSenderClient senderClient = new ServiceBusClientBuilder() .fullyQualifiedNamespace("NAMESPACENAME.servicebus.chinacloudapi.cn") .credential(credential) .sender() .queueName(queueName) .buildClient(); // send one message to the queue senderClient.sendMessage(new ServiceBusMessage("Hello, World!")); System.out.println("Sent a single message to the queue: " + queueName); }
在类中添加一个名为
createMessages
的方法,以创建消息列表。 通常,可以从应用程序的不同部分获得这些消息。 在这里,我们将创建一个示例消息列表。static List<ServiceBusMessage> createMessages() { // create a list of messages and return it to the caller ServiceBusMessage[] messages = { new ServiceBusMessage("First message"), new ServiceBusMessage("Second message"), new ServiceBusMessage("Third message") }; return Arrays.asList(messages); }
添加一个名为
sendMessageBatch
方法的方法,以将消息发送到你创建的队列。 此方法为队列创建ServiceBusSenderClient
调用createMessages
方法来获取消息列表,准备一个或多个批处理,并将批处理发送到队列。重要
将
NAMESPACENAME
替换为你的服务总线命名空间的名称。static void sendMessageBatch() { // create a token using the default Azure credential DefaultAzureCredential credential = new DefaultAzureCredentialBuilder() .build(); ServiceBusSenderClient senderClient = new ServiceBusClientBuilder() .fullyQualifiedNamespace("NAMESPACENAME.servicebus.chinacloudapi.cn") .credential(credential) .sender() .queueName(queueName) .buildClient(); // Creates an ServiceBusMessageBatch where the ServiceBus. ServiceBusMessageBatch messageBatch = senderClient.createMessageBatch(); // create a list of messages List<ServiceBusMessage> listOfMessages = createMessages(); // We try to add as many messages as a batch can fit based on the maximum size and send to Service Bus when // the batch can hold no more messages. Create a new batch for next set of messages and repeat until all // messages are sent. for (ServiceBusMessage message : listOfMessages) { if (messageBatch.tryAddMessage(message)) { continue; } // The batch is full, so we create a new batch and send the batch. senderClient.sendMessages(messageBatch); System.out.println("Sent a batch of messages to the queue: " + queueName); // create a new batch messageBatch = senderClient.createMessageBatch(); // Add that message that we couldn't before. if (!messageBatch.tryAddMessage(message)) { System.err.printf("Message is too large for an empty batch. Skipping. Max size: %s.", messageBatch.getMaxSizeInBytes()); } } if (messageBatch.getCount() > 0) { senderClient.sendMessages(messageBatch); System.out.println("Sent a batch of messages to the queue: " + queueName); } //close the client senderClient.close(); }
从队列接收消息
在本部分中,你将添加从队列检索消息的代码。
添加名为
receiveMessages
的方法,以从队列检索消息。 此方法通过指定用于处理消息的处理程序和用于处理错误的另一个处理程序来为队列创建ServiceBusProcessorClient
。 然后,它将启动处理器,等待几秒钟,输出接收的消息,然后停止和关闭处理器。重要
- 将
NAMESPACENAME
替换为你的服务总线命名空间的名称。 - 将代码中
QueueTest
中的QueueTest::processMessage
替换为你的类的名称。
// handles received messages static void receiveMessages() throws InterruptedException { DefaultAzureCredential credential = new DefaultAzureCredentialBuilder() .build(); ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder() .fullyQualifiedNamespace("NAMESPACENAME.servicebus.chinacloudapi.cn") .credential(credential) .processor() .queueName(queueName) .processMessage(context -> processMessage(context)) .processError(context -> processError(context)) .buildProcessorClient(); System.out.println("Starting the processor"); processorClient.start(); TimeUnit.SECONDS.sleep(10); System.out.println("Stopping and closing the processor"); processorClient.close(); }
- 将
添加
processMessage
方法以处理从服务总线订阅接收的消息。private static void processMessage(ServiceBusReceivedMessageContext context) { ServiceBusReceivedMessage message = context.getMessage(); System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n", message.getMessageId(), message.getSequenceNumber(), message.getBody()); }
添加
processError
方法以处理错误消息。private static void processError(ServiceBusErrorContext context) { System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n", context.getFullyQualifiedNamespace(), context.getEntityPath()); if (!(context.getException() instanceof ServiceBusException)) { System.out.printf("Non-ServiceBusException occurred: %s%n", context.getException()); return; } ServiceBusException exception = (ServiceBusException) context.getException(); ServiceBusFailureReason reason = exception.getReason(); if (reason == ServiceBusFailureReason.MESSAGING_ENTITY_DISABLED || reason == ServiceBusFailureReason.MESSAGING_ENTITY_NOT_FOUND || reason == ServiceBusFailureReason.UNAUTHORIZED) { System.out.printf("An unrecoverable error occurred. Stopping processing with reason %s: %s%n", reason, exception.getMessage()); } else if (reason == ServiceBusFailureReason.MESSAGE_LOCK_LOST) { System.out.printf("Message lock lost for message: %s%n", context.getException()); } else if (reason == ServiceBusFailureReason.SERVICE_BUSY) { try { // Choosing an arbitrary amount of time to wait until trying again. TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { System.err.println("Unable to sleep for period of time"); } } else { System.out.printf("Error source %s, reason %s, message: %s%n", context.getErrorSource(), reason, context.getException()); } }
更新
main
方法以调用sendMessage
、sendMessageBatch
和receiveMessages
方法,并引发InterruptedException
。public static void main(String[] args) throws InterruptedException { sendMessage(); sendMessageBatch(); receiveMessages(); }
运行应用
如果使用的是 Eclipse,请右键单击该项目,选择“导出”,展开 Java,选择“可运行 JAR 文件”,然后按照步骤创建可运行 JAR 文件。
如果在登录计算机时使用了与添加到 Azure 服务总线数据所有者角色的用户帐户不同的用户帐户,请执行以下步骤。 否则,请跳过此步骤,然后在下一步中继续运行 Jar 文件。
在计算机上安装 Azure CLI。
使用以下 CLI 命令登录到 Azure。 使用添加到 Azure 服务总线数据所有者角色的同一用户帐户。
az cloud set -n AzureChinaCloud az login # az cloud set -n AzureCloud //means return to Public Azure.
使用以下命令运行 Jar 文件。
java -jar <JAR FILE NAME>
控制台窗口中会显示以下输出。
Sent a single message to the queue: myqueue Sent a batch of messages to the queue: myqueue Starting the processor Processing message. Session: 88d961dd801f449e9c3e0f8a5393a527, Sequence #: 1. Contents: Hello, World! Processing message. Session: e90c8d9039ce403bbe1d0ec7038033a0, Sequence #: 2. Contents: First message Processing message. Session: 311a216a560c47d184f9831984e6ac1d, Sequence #: 3. Contents: Second message Processing message. Session: f9a871be07414baf9505f2c3d466c4ab, Sequence #: 4. Contents: Third message Stopping and closing the processor
在 Azure 门户中的服务总线命名空间的“概述”页上,可看到传入和传出消息计数 。 等待一分钟左右,然后刷新页面以查看最新值。
在此“概述”页上选择队列,导航到“服务总线队列”页面 。 还可在此页上看到传入和传出消息计数 。 还可看到其他信息,如队列的当前大小、最大大小和活动消息计数等 。
后续步骤
请参阅以下文档和示例: