通过服务总线消息传递进行分布式跟踪和关联

微服务开发中的常见问题之一是能否通过处理过程中涉及的所有服务跟踪来自客户端的操作。 此能力对于进行调试、性能分析、A/B 测试和其他典型诊断方案相当有用。 此问题的一个组成部分是跟踪工作的逻辑片段。 这包括消息处理结果和延迟,以及外部依赖项调用。 另一个组成部分是这些诊断事件在进程边界外部的关联。

当生成者通过队列发送消息时,此活动通常发生在其他某个逻辑操作的范围内,并由其他客户端或服务启动。 当使用者收到消息时,会继续相同的操作。 生成者与使用者(以及其他处理该操作的服务)也许会发出遥测事件,以跟踪操作流和结果。 若要将此类事件相关联并以端到端的方式跟踪操作,报告遥测数据的每个服务必须为每个事件提供跟踪上下文戳记。 NServiceBus 是一个可以帮助开发人员默认发出所有这些遥测数据的库。

Azure 服务总线消息传递已定义生成者与使用者应该用来传递此类跟踪上下文的有效负载属性。 协议基于 W3C Trace-Context

属性名称 说明
Diagnostic-Id 生成者针对队列发出的外部调用的唯一标识符。 请参阅 W3C Trace-Context traceparent 标头了解有关格式的信息

服务总线 .NET 客户端自动跟踪

ServiceBusProcessor适用于 .NET 的 Azure 消息传送服务总线客户端 类提供了可通过跟踪系统或客户端代码片段挂接的跟踪检测点。 使用检测可以从客户端跟踪对服务总线消息传递服务发出的所有调用。 如果消息处理是使用 ProcessMessageAsyncServiceBusProcessor(消息处理程序模式)完成的,则还会检测消息处理。

使用 Azure Application Insights 进行跟踪

Microsoft Application Insights 提供丰富的性能监视功能,包括自动请求和依赖项跟踪。

请根据项目类型安装 Application Insights SDK:

如果你使用 ProcessMessageAsyncServiceBusProcessor(消息处理程序模式)来处理消息,则还会检测消息处理。 系统会自动跟踪由服务完成的所有服务总线调用,并将其与其他遥测项关联。 否则,请参考以下示例手动进行消息处理跟踪。

跟踪消息处理

async Task ProcessAsync(ProcessMessageEventArgs args)
{
    ServiceBusReceivedMessage message = args.Message;
    if (message.ApplicationProperties.TryGetValue("Diagnostic-Id", out var objectId) && objectId is string diagnosticId)
    {
        var activity = new Activity("ServiceBusProcessor.ProcessMessage");
        activity.SetParentId(diagnosticId);
        // If you're using Microsoft.ApplicationInsights package version 2.6-beta or higher, you should call StartOperation<RequestTelemetry>(activity) instead
        using (var operation = telemetryClient.StartOperation<RequestTelemetry>("Process", activity.RootId, activity.ParentId))
        {
            telemetryClient.TrackTrace("Received message");
            try 
            {
            // process message
            }
            catch (Exception ex)
            {
                telemetryClient.TrackException(ex);
                operation.Telemetry.Success = false;
                throw;
            }

            telemetryClient.TrackTrace("Done");
        }
    }
}

在本示例中,系统针对每个已处理的消息报告请求遥测,并提供时间戳、持续时间和结果(成功)。 遥测功能也会提供一组关联属性。 在消息处理期间报告的嵌套跟踪和异常也带有关联属性的戳记,代表它们是 RequestTelemetry 的“子级”。

如果在消息处理期间对支持的外部组件发出调用,则还会自动跟踪和关联这些调用。 请参阅使用 Application Insights .NET SDK 跟踪自定义操作来了解手动跟踪和关联。

如果除 Application Insights SDK 之外还运行了外部代码,则在查看 Application Insights 日志时,你会发现持续时间延长。

Application Insights 日志中更长的持续时间

这并不意味着接收消息时存在延迟。 在这种情况下,消息已经被接收,因为消息以参数的形式传递给 SDK 代码。 而且,App Insights 日志(进程)中的“名称”标记指示消息正在由外部事件处理代码处理 。 此问题与 Azure 无关。 相反,这些指标指示的是外部代码的效率,前提是已从服务总线接收到消息。

使用 OpenTelemetry 进行跟踪

服务总线 .NET 客户端库版本 7.5.0 及更高版本支持实验模式下的 OpenTelemetry。 有关详细信息,请参阅 .NET SDK 中的分布式跟踪

在没有跟踪系统的情况下进行跟踪

如果跟踪系统不支持自动服务总线调用跟踪,可以考虑将此类支持添加到跟踪系统或应用程序中。 本部分介绍服务总线 .NET 客户端发送的诊断事件。

使用 .NET 跟踪基元 System.Diagnostics.ActivitySystem.Diagnostics.DiagnosticSource 检测服务总线 .NET 客户端。

Activity 充当跟踪上下文,而 DiagnosticSource 是通知机制。

如果没有针对 DiagnosticSource 事件的侦听器,检测会关闭,以避免产生检测成本。 DiagnosticSource 将所有控制权授予侦听器:

  • 侦听器控制要侦听的源和事件
  • 侦听器控制事件速率和采样
  • 事件连同某个有效负载一起发送,该有效负载可提供完整上下文,使你能够在事件发生期间访问和修改 Message 对象

在继续实现之前,请先通过 DiagnosticSource 用户指南熟悉相关流程。

让我们在 ASP.NET Core 应用中针对服务总线事件创建一个侦听器,以通过 Microsoft.Extension.Logger 写入日志。 该侦听器使用 System.Reactive.Core 库来订阅 DiagnosticSource(不使用它也能轻松订阅 DiagnosticSource)

public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory factory, IApplicationLifetime applicationLifetime)
{
    // configuration...

    var serviceBusLogger = factory.CreateLogger("Azure.Messaging.ServiceBus");

    IDisposable innerSubscription = null;
    IDisposable outerSubscription = DiagnosticListener.AllListeners.Subscribe(delegate (DiagnosticListener listener)
    {
        // subscribe to the Service Bus DiagnosticSource
        if (listener.Name == "Azure.Messaging.ServiceBus")
        {
            // receive event from Service Bus DiagnosticSource
            innerSubscription = listener.Subscribe(delegate (KeyValuePair<string, object> evnt)
            {
                // Log operation details once it's done
                if (evnt.Key.EndsWith("Stop"))
                {
                    Activity currentActivity = Activity.Current;
                    serviceBusLogger.LogInformation($"Operation {currentActivity.OperationName} is finished, Duration={currentActivity.Duration}, Id={currentActivity.Id}, StartTime={currentActivity.StartTimeUtc}");
                }
            });
        }
    });

    applicationLifetime.ApplicationStopping.Register(() =>
    {
        outerSubscription?.Dispose();
        innerSubscription?.Dispose();
    });
}

在此示例中,侦听器记录每个服务总线操作的持续时间、结果、唯一标识符和开始时间。

事件

所有事件都会具有以下属性,这些属性符合开放式遥测规范: https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/trace/api.md

  • message_bus.destination - 队列/主题/订阅路径
  • peer.address - 完全限定的命名空间
  • kind - 生成者、使用者或客户端。 发送消息时使用生成者,接收时使用使用者,处置时使用客户端。
  • component - servicebus

所有事件还具有 EntityEndpoint 属性。

  • Entity - 实体(队列、主题等)的名称
  • Endpoint - 服务总线终结点 URL

检测的操作

下面是检测的操作的完整列表:

操作名称 跟踪的 API
ServiceBusSender.Send ServiceBusSender.SendMessageAsync
ServiceBusSender.SendMessagesAsync
ServiceBusSender.Schedule ServiceBusSender.ScheduleMessageAsync
ServiceBusSender.ScheduleMessagesAsync
ServiceBusSender.Cancel ServiceBusSender.CancelScheduledMessageAsync
ServiceBusSender.CancelScheduledMessagesAsync
ServiceBusReceiver.Receive ServiceBusReceiver.ReceiveMessageAsync
ServiceBusReceiver.ReceiveMessagesAsync
ServiceBusReceiver.ReceiveDeferred ServiceBusReceiver.ReceiveDeferredMessagesAsync
ServiceBusReceiver.Peek ServiceBusReceiver.PeekMessageAsync
ServiceBusReceiver.PeekMessagesAsync
ServiceBusReceiver.Abandon ServiceBusReceiver.AbandonMessagesAsync
ServiceBusReceiver.Complete ServiceBusReceiver.CompleteMessagesAsync
ServiceBusReceiver.DeadLetter ServiceBusReceiver.DeadLetterMessagesAsync
ServiceBusReceiver.Defer ServiceBusReceiver.DeferMessagesAsync
ServiceBusReceiver.RenewMessageLock ServiceBusReceiver.RenewMessageLockAsync
ServiceBusSessionReceiver.RenewSessionLock ServiceBusSessionReceiver.RenewSessionLockAsync
ServiceBusSessionReceiver.GetSessionState ServiceBusSessionReceiver.GetSessionStateAsync
ServiceBusSessionReceiver.SetSessionState ServiceBusSessionReceiver.SetSessionStateAsync
ServiceBusProcessor.ProcessMessage ServiceBusProcessor 上的处理器回调集。 ProcessMessageAsync 属性
ServiceBusSessionProcessor.ProcessSessionMessage ServiceBusSessionProcessor 上的处理器回调集。 ProcessMessageAsync 属性

筛选和采样

在某些情况下,我们可能只需记录一部分事件,以减少性能开销或存储耗用量。 可以只记录“Stop”事件(如上述示例所示),或者对特定百分比的事件采样。 使用 DiagnosticSource 可以通过 IsEnabled 谓词实现此目的。 有关详细信息,请参阅 DiagnosticSource 中基于上下文的筛选

可以针对单个操作调用 IsEnabled 多次,以将性能影响降到最低。

按以下顺序调用 IsEnabled

  1. IsEnabled(<OperationName>, string entity, null),例如 IsEnabled("ServiceBusSender.Send", "MyQueue1")。 请注意,末尾没有“Start”或“Stop”。 使用此语句可以筛选出特定的操作或队列。 如果回调方法返回 false,则表示未发送操作的事件。

    • 对于“Process”和“ProcessSession”操作,还会收到 IsEnabled(<OperationName>, string entity, Activity activity) 回调。 使用此回调可根据 activity.Id 或 Tags 属性筛选事件。
  2. IsEnabled(<OperationName>.Start),例如 IsEnabled("ServiceBusSender.Send.Start")。 检查是否应激发“Start”事件。 结果只影响“Start”事件,但进一步的检测不依赖它。

“Stop”事件没有 IsEnabled

如果某个操作结果为异常,则会调用 IsEnabled("ServiceBusSender.Send.Exception")。 只能订阅“Exception”事件并阻止剩余的检测。 在此情况下,仍需处理此类异常。 由于其他检测已被禁用,因此跟踪上下文不会连同消息一起从使用者流向生成者。

也可以使用 IsEnabled 来实现采样策略。 基于 Activity.IdActivity.RootId 的采样可确保所有尝试都获取一致的采样结果(前提是采样内容通过跟踪系统或你自己的代码传播)。

如果同一个源存在多个 DiagnosticSource 侦听器,则只需一个侦听器接受事件,因此无法保证调用 IsEnabled

后续步骤