使用子协议创建可靠的 Websocket
当 Websocket 客户端连接由于间歇性网络问题而断开时,消息可能会丢失。 在发布/订阅系统中,发布服务器与订阅服务器是分离的,因此发布服务器可能无法检测到订阅服务器连接断开或消息丢失。 客户端克服间歇性网络问题并保持可靠的消息传送至关重要。 若要实现此目的,可以在可靠 Azure Web PubSub 子协议的帮助下创建可靠的 Websocket 客户端。
可靠协议
Web PubSub 服务支持两个可靠子协议:json.reliable.webpubsub.azure.v1
和 protobuf.reliable.webpubsub.azure.v1
。 客户端必须与子协议的发布服务器、订阅服务器和恢复部分保持一致才能实现可靠性。 不正确实现子协议可能导致消息传送无法按预期工作,或者服务由于协议冲突而终止客户端。
简单方法 - 使用客户端 SDK
创建可靠客户端的最简单方法是使用客户端 SDK。 客户端 SDK 实现 Web PubSub 客户端规范,默认使用 json.reliable.webpubsub.azure.v1
。 如需快速入门,请参阅将 PubSub 与客户端 SDK 配合使用。
困难方法 - 手动实现
以下教程可指导你完成实现 Web PubSub 客户端规范的重要部分。 本指南不适用于既想要了解快速入门知识,又想要了解可靠性实现原则的读者。 如需快速入门,请使用客户端 SDK。
初始化
若要使用可靠的子协议,必须在构造 Websocket 连接时设置子协议。 在 JavaScript 中,可使用以下代码:
使用 Json 可靠子协议:
var pubsub = new WebSocket( "wss://test.webpubsub.azure.cn/client/hubs/hub1", "json.reliable.webpubsub.azure.v1" );
使用 Protobuf 可靠子协议:
var pubsub = new WebSocket( "wss://test.webpubsub.azure.cn/client/hubs/hub1", "protobuf.reliable.webpubsub.azure.v1" );
连接恢复
连接恢复是实现可靠性的基础,在使用 json.reliable.webpubsub.azure.v1
和 protobuf.reliable.webpubsub.azure.v1
协议时必须实现重新连接。
Websocket 连接依赖于 TCP。 当连接未断开时,消息不会丢失,而是按顺序传送。 为了防止因连接断开而丢失消息,Web PubSub 服务将保留连接状态信息,包括组和消息信息。 此信息用于在连接恢复时还原客户端
当客户端使用可靠子协议重新连接到服务时,客户端将收到包含 connectionId
和 reconnectionToken
的 Connected
消息。 connectionId
标识服务中连接的会话。
{
"type": "system",
"event": "connected",
"connectionId": "<connection_id>",
"reconnectionToken": "<reconnection_token>"
}
WebSocket 连接断开后,客户端应尝试使用相同的 connectionId
重新连接以还原同一会话。 客户端不需要与服务器进行协商并获取 access_token
。 在恢复连接时,客户端应使用服务主机名、connection_id
和 reconnection_token
直接向服务发出 WebSocket 连接请求:
wss://<service-endpoint>/client/hubs/<hub>?awps_connection_id=<connection_id>&awps_reconnection_token=<reconnection_token>
如果网络问题尚未得到解决,连接恢复可能会失败。 客户端应持续重试重新连接,直到:
- Websocket 连接关闭并出现状态代码 1008。 该状态代码表示已从服务中删除了此 connectionId。
- 恢复失败的持续时间超过 1 分钟。
Publisher
将事件发送到事件处理程序或者将消息发布到其他客户端的客户端称为发布服务器。 发布服务器应在消息中设置 ackId
,以接收来自 Web PubSub 服务的确认,其中指出消息发布是否成功。
ackId
是消息的标识符,每个新消息应使用唯一 ID。 重新发送消息时应使用原始 ackId
。
示例组发送消息:
{
"type": "sendToGroup",
"group": "group1",
"dataType": "text",
"data": "text data",
"ackId": 1
}
示例确认响应:
{
"type": "ack",
"ackId": 1,
"success": true
}
如果 Web PubSub 服务返回带有 success: true
的确认响应,则表示该消息已由服务处理,并且客户端可以预期该消息将传递给所有订阅服务器。
当服务遇到暂时性内部错误并且无法将消息发送到订阅服务器时,发布服务器将收到带有 success: false
的确认。 发布服务器应读取错误,以确定是否要重新发送消息。 如果重新发送消息,则应使用相同的 ackId
。
{
"type": "ack",
"ackId": 1,
"success": false,
"error": {
"name": "InternalServerError",
"message": "Internal server error"
}
}
如果服务的确认响应因 WebSocket 连接断开而丢失,发布服务器应在恢复后使用相同的 ackId
重新发送消息。 如果服务以前处理了消息,它将发送包含 Duplicate
错误的确认。 发布服务器应停止重新发送此消息。
{
"type": "ack",
"ackId": 1,
"success": false,
"error": {
"name": "Duplicate",
"message": "Message with ack-id: 1 has been processed"
}
}
订阅者
从事件处理程序或发布服务器接收消息的客户端称为订阅服务器。 当连接因网络问题而断开时,Web PubSub 服务不知道已向订阅服务器发送了多少消息。 为了确定订阅服务器收到的最后一条消息,服务将发送包含 sequenceId
的数据消息。 订阅服务器使用序列确认消息做出响应:
示例序列确认:
{
"type": "sequenceAck",
"sequenceId": 1
}
sequenceId
是连接 ID 会话中的 uint64 递增编号。 订阅服务器应记录它收到的最大 sequenceId
,仅接受具有较大 sequenceId
的消息,并删除具有较小或相等 sequenceId
的消息。 订阅服务器应使用它记录的最大 sequenceId
进行确认,以便服务可以跳过重新传送已由订阅服务器收到的消息的操作。 例如,如果订阅服务器使用具有 sequenceId: 5
的 sequenceAck
做出响应,则服务仅重新发送 sequenceId
大于 5 的消息。
所有消息均按顺序传送到订阅服务器,直到 WebSocket 连接断开。 根据 sequenceId
,服务知道订阅服务器在一个会话中通过 WebSocket 连接收到了多少消息。 在 WebSocket 连接断开后,服务将重新传送未由订阅服务器确认的消息。 服务将存储有限数量的未确认消息。 当消息数超过限制时,服务将关闭 WebSocket 连接并删除会话。 因此,订阅服务器应尽快确认 sequenceId
。