Browse Source

Add open telemetry support.

master
Savorboard 2 years ago
parent
commit
acd5865ce0
4 changed files with 98 additions and 58 deletions
  1. +7
    -0
      CAP.sln
  2. +25
    -0
      src/DotNetCore.CAP.OpenTelemetry/CAP.Options.Extension.cs
  3. +55
    -57
      src/DotNetCore.CAP.OpenTelemetry/DiagnosticObserver.cs
  4. +11
    -1
      src/DotNetCore.CAP.OpenTelemetry/DiagnosticProcessorObserver.cs

+ 7
- 0
CAP.sln View File

@@ -84,6 +84,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.Pulsar.InMemory", "s
EndProject EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.RabbitMQ.SqlServer.DispatcherPerGroup", "samples\Sample.RabbitMQ.SqlServer.DispatcherPerGroup\Sample.RabbitMQ.SqlServer.DispatcherPerGroup.csproj", "{DCDF58E8-F823-4F04-9F8C-E8076DC16A68}" Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.RabbitMQ.SqlServer.DispatcherPerGroup", "samples\Sample.RabbitMQ.SqlServer.DispatcherPerGroup\Sample.RabbitMQ.SqlServer.DispatcherPerGroup.csproj", "{DCDF58E8-F823-4F04-9F8C-E8076DC16A68}"
EndProject EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.OpenTelemetry", "src\DotNetCore.CAP.OpenTelemetry\DotNetCore.CAP.OpenTelemetry.csproj", "{83DDB126-A00B-4064-86E7-568322CA67EC}"
EndProject
Global Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU Debug|Any CPU = Debug|Any CPU
@@ -202,6 +204,10 @@ Global
{DCDF58E8-F823-4F04-9F8C-E8076DC16A68}.Debug|Any CPU.Build.0 = Debug|Any CPU {DCDF58E8-F823-4F04-9F8C-E8076DC16A68}.Debug|Any CPU.Build.0 = Debug|Any CPU
{DCDF58E8-F823-4F04-9F8C-E8076DC16A68}.Release|Any CPU.ActiveCfg = Release|Any CPU {DCDF58E8-F823-4F04-9F8C-E8076DC16A68}.Release|Any CPU.ActiveCfg = Release|Any CPU
{DCDF58E8-F823-4F04-9F8C-E8076DC16A68}.Release|Any CPU.Build.0 = Release|Any CPU {DCDF58E8-F823-4F04-9F8C-E8076DC16A68}.Release|Any CPU.Build.0 = Release|Any CPU
{83DDB126-A00B-4064-86E7-568322CA67EC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{83DDB126-A00B-4064-86E7-568322CA67EC}.Debug|Any CPU.Build.0 = Debug|Any CPU
{83DDB126-A00B-4064-86E7-568322CA67EC}.Release|Any CPU.ActiveCfg = Release|Any CPU
{83DDB126-A00B-4064-86E7-568322CA67EC}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection EndGlobalSection
GlobalSection(SolutionProperties) = preSolution GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE HideSolutionNode = FALSE
@@ -235,6 +241,7 @@ Global
{AB7A10CB-2C7E-49CE-AA21-893772FF6546} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4} {AB7A10CB-2C7E-49CE-AA21-893772FF6546} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{B1D95CCD-0123-41D4-8CCB-9F834ED8D5C5} = {3A6B6931-A123-477A-9469-8B468B5385AF} {B1D95CCD-0123-41D4-8CCB-9F834ED8D5C5} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{DCDF58E8-F823-4F04-9F8C-E8076DC16A68} = {3A6B6931-A123-477A-9469-8B468B5385AF} {DCDF58E8-F823-4F04-9F8C-E8076DC16A68} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{83DDB126-A00B-4064-86E7-568322CA67EC} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
EndGlobalSection EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {2E70565D-94CF-40B4-BFE1-AC18D5F736AB} SolutionGuid = {2E70565D-94CF-40B4-BFE1-AC18D5F736AB}


src/DotNetCore.CAP.OpenTelemetry/CAP.OpenTelemetryCapOptionsExtension.cs → src/DotNetCore.CAP.OpenTelemetry/CAP.Options.Extension.cs View File

@@ -1,10 +1,13 @@
// Copyright (c) .NET Core Community. All rights reserved. // Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information. // Licensed under the MIT License. See License.txt in the project root for license information.


using System;
using DotNetCore.CAP.Internal; using DotNetCore.CAP.Internal;
using DotNetCore.CAP.OpenTelemetry; using DotNetCore.CAP.OpenTelemetry;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions; using Microsoft.Extensions.DependencyInjection.Extensions;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;


// ReSharper disable once CheckNamespace // ReSharper disable once CheckNamespace
namespace DotNetCore.CAP namespace DotNetCore.CAP
@@ -13,6 +16,7 @@ namespace DotNetCore.CAP
{ {
public void AddServices(IServiceCollection services) public void AddServices(IServiceCollection services)
{ {
services.AddSingleton<CapDiagnosticObserver>();
services.AddSingleton<CapDiagnosticProcessorObserver>(); services.AddSingleton<CapDiagnosticProcessorObserver>();
services.TryAddEnumerable(ServiceDescriptor.Singleton<IProcessingServer, OpenTelemetryDiagnosticRegister>()); services.TryAddEnumerable(ServiceDescriptor.Singleton<IProcessingServer, OpenTelemetryDiagnosticRegister>());
} }
@@ -27,4 +31,25 @@ namespace DotNetCore.CAP
return options; return options;
} }
} }

public static class TracerProviderBuilderExtensions
{
/// <summary>
/// Enables the message eventing data collection for CAP.
/// </summary>
/// <param name="builder"><see cref="TracerProviderBuilder"/> being configured.</param>
/// <returns>The instance of <see cref="TracerProviderBuilder"/> to chain the calls.</returns>
public static TracerProviderBuilder AddCapInstrumentation(this TracerProviderBuilder builder)
{
if (builder == null)
{
throw new ArgumentNullException(nameof(builder));
}

builder.AddSource("DotNetCore.CAP.OpenTelemetry")
.SetResourceBuilder(ResourceBuilder.CreateDefault().AddService("CAP"));

return builder;
}
}
} }

+ 55
- 57
src/DotNetCore.CAP.OpenTelemetry/DiagnosticObserver.cs View File

@@ -12,16 +12,15 @@ using OpenTelemetry;
using OpenTelemetry.Context.Propagation; using OpenTelemetry.Context.Propagation;
using OpenTelemetry.Trace; using OpenTelemetry.Trace;
using CapEvents = DotNetCore.CAP.Diagnostics.CapDiagnosticListenerNames; using CapEvents = DotNetCore.CAP.Diagnostics.CapDiagnosticListenerNames;
using Status = OpenTelemetry.Trace.Status;


namespace DotNetCore.CAP.OpenTelemetry namespace DotNetCore.CAP.OpenTelemetry
{ {
internal class CapDiagnosticObserver : IObserver<KeyValuePair<string, object?>>
public class CapDiagnosticObserver : IObserver<KeyValuePair<string, object?>>
{ {
private static readonly ActivitySource ActivitySource = new("DotNetCore.CAP.OpenTelemetry");
private static readonly ActivitySource ActivitySource = new("DotNetCore.CAP.OpenTelemetry", "1.0.0");
private static readonly TextMapPropagator Propagator = new TraceContextPropagator(); private static readonly TextMapPropagator Propagator = new TraceContextPropagator();


private readonly ConcurrentDictionary<string, Activity> _contexts = new();
private readonly ConcurrentDictionary<string, ActivityContext> _contexts = new();


private const string OperateNamePrefix = "CAP/"; private const string OperateNamePrefix = "CAP/";
private const string ProducerOperateNameSuffix = "/Publisher"; private const string ProducerOperateNameSuffix = "/Publisher";
@@ -42,49 +41,57 @@ namespace DotNetCore.CAP.OpenTelemetry
case CapEvents.BeforePublishMessageStore: case CapEvents.BeforePublishMessageStore:
{ {
var eventData = (CapEventDataPubStore)evt.Value!; var eventData = (CapEventDataPubStore)evt.Value!;
var useParent = false;
if (Activity.Current != null)
{
if (_contexts.TryAdd(eventData.Message.GetId(), Activity.Current.Context))
{
useParent = true;
}
}
var activity = ActivitySource.StartActivity("Event Persistence: " + eventData.Operation); var activity = ActivitySource.StartActivity("Event Persistence: " + eventData.Operation);
if (activity != null) if (activity != null)
{ {
activity.SetTag("component", "CAP");
activity.SetTag("message.name", eventData.Operation); activity.SetTag("message.name", eventData.Operation);
activity.AddEvent(new ActivityEvent("CAP message persistence start...", activity.AddEvent(new ActivityEvent("CAP message persistence start...",
DateTimeOffset.FromUnixTimeMilliseconds(eventData.OperationTimestamp!.Value))); DateTimeOffset.FromUnixTimeMilliseconds(eventData.OperationTimestamp!.Value)));
_contexts.TryAdd(eventData.Message.GetId(), activity);

if (!useParent)
{
_contexts[eventData.Message.GetId()] = Activity.Current!.Context;
}
} }
} }
break; break;
case CapEvents.AfterPublishMessageStore: case CapEvents.AfterPublishMessageStore:
{ {
var eventData = (CapEventDataPubStore)evt.Value!; var eventData = (CapEventDataPubStore)evt.Value!;
if (_contexts.TryRemove(eventData.Message.GetId(), out Activity activity))
{
activity.AddEvent(new ActivityEvent("CAP message persistence succeeded!",
DateTimeOffset.FromUnixTimeMilliseconds(eventData.OperationTimestamp!.Value),
new ActivityTagsCollection { new("cap.persistence.duration", eventData.ElapsedTimeMs) })
);
activity.Dispose();
}

Activity.Current?.AddEvent(new ActivityEvent("CAP message persistence succeeded!",
DateTimeOffset.FromUnixTimeMilliseconds(eventData.OperationTimestamp!.Value),
new ActivityTagsCollection { new("cap.persistence.duration", eventData.ElapsedTimeMs) })
);

Activity.Current?.Stop();
} }
break; break;
case CapEvents.ErrorPublishMessageStore: case CapEvents.ErrorPublishMessageStore:
{ {
var eventData = (CapEventDataPubStore)evt.Value!; var eventData = (CapEventDataPubStore)evt.Value!;
if (_contexts.TryRemove(eventData.Message.GetId(), out Activity activity))
if (Activity.Current is { } activity)
{ {
var exception = eventData.Exception!; var exception = eventData.Exception!;

activity.SetStatus(Status.Error.WithDescription(exception.Message)); activity.SetStatus(Status.Error.WithDescription(exception.Message));

activity.RecordException(exception); activity.RecordException(exception);

activity.Dispose();
activity.Stop();
} }
} }
break; break;
case CapEvents.BeforePublish: case CapEvents.BeforePublish:
{ {
var eventData = (CapEventDataPubSend)evt.Value!; var eventData = (CapEventDataPubSend)evt.Value!;
var activity = ActivitySource.StartActivity(OperateNamePrefix + eventData.Operation + ProducerOperateNameSuffix, ActivityKind.Producer);
_contexts.TryRemove(eventData.TransportMessage.GetId(), out var context);
var activity = ActivitySource.StartActivity(OperateNamePrefix + eventData.Operation + ProducerOperateNameSuffix, ActivityKind.Producer, context);
if (activity != null) if (activity != null)
{ {
activity.SetTag("messaging.system", eventData.BrokerAddress.Name); activity.SetTag("messaging.system", eventData.BrokerAddress.Name);
@@ -102,56 +109,52 @@ namespace DotNetCore.CAP.OpenTelemetry
{ {
msg.Headers[key] = value; msg.Headers[key] = value;
}); });

_contexts.TryAdd(eventData.TransportMessage.GetId(), activity);
} }
} }
break; break;
case CapEvents.AfterPublish: case CapEvents.AfterPublish:
{ {
var eventData = (CapEventDataPubSend)evt.Value!; var eventData = (CapEventDataPubSend)evt.Value!;
if (_contexts.TryRemove(eventData.TransportMessage.GetId(), out Activity activity))
if (Activity.Current is { } activity)
{ {
activity.AddEvent(new ActivityEvent("Message publishing succeeded!", activity.AddEvent(new ActivityEvent("Message publishing succeeded!",
DateTimeOffset.FromUnixTimeMilliseconds(eventData.OperationTimestamp!.Value), DateTimeOffset.FromUnixTimeMilliseconds(eventData.OperationTimestamp!.Value),
new ActivityTagsCollection { new("cap.send.duration", eventData.ElapsedTimeMs) }) new ActivityTagsCollection { new("cap.send.duration", eventData.ElapsedTimeMs) })
); );
activity.Dispose();
activity.Stop();
} }
} }
break; break;
case CapEvents.ErrorPublish: case CapEvents.ErrorPublish:
{ {
var eventData = (CapEventDataPubSend)evt.Value!; var eventData = (CapEventDataPubSend)evt.Value!;
if (_contexts.TryRemove(eventData.TransportMessage.GetId(), out Activity activity))
if (Activity.Current is { } activity)
{ {
var exception = eventData.Exception!; var exception = eventData.Exception!;
activity.SetStatus(Status.Error.WithDescription(exception.Message)); activity.SetStatus(Status.Error.WithDescription(exception.Message));

activity.RecordException(exception); activity.RecordException(exception);

activity.Dispose();
activity.Stop();
} }
} }
break; break;
case CapEvents.BeforeConsume: case CapEvents.BeforeConsume:
{ {

var eventData = (CapEventDataSubStore)evt.Value!; var eventData = (CapEventDataSubStore)evt.Value!;
var activity = ActivitySource.StartActivity(OperateNamePrefix + eventData.Operation + ConsumerOperateNameSuffix, ActivityKind.Consumer);
if (activity != null)
var parentContext = Propagator.Extract(default, eventData.TransportMessage, (msg, key) =>
{ {
var parentContext = Propagator.Extract(default, eventData.TransportMessage, (msg, key) =>
if (msg.Headers.TryGetValue(key, out string? value))
{ {
if (msg.Headers.TryGetValue(key, out string? value))
{
return new[] { value };
}
return Enumerable.Empty<string>();
});
return new[] { value };
}
return Enumerable.Empty<string>();
});


Baggage.Current = parentContext.Baggage;
var activity = ActivitySource.StartActivity(OperateNamePrefix + eventData.Operation + ConsumerOperateNameSuffix,
ActivityKind.Consumer,
parentContext.ActivityContext);


if (activity != null)
{
activity.SetTag("messaging.system", eventData.BrokerAddress.Name); activity.SetTag("messaging.system", eventData.BrokerAddress.Name);
activity.SetTag("messaging.destination", eventData.Operation); activity.SetTag("messaging.destination", eventData.Operation);
activity.SetTag("messaging.destination_kind", "topic"); activity.SetTag("messaging.destination_kind", "topic");
@@ -162,66 +165,63 @@ namespace DotNetCore.CAP.OpenTelemetry
activity.AddEvent(new ActivityEvent("CAP message persistence start...", activity.AddEvent(new ActivityEvent("CAP message persistence start...",
DateTimeOffset.FromUnixTimeMilliseconds(eventData.OperationTimestamp!.Value))); DateTimeOffset.FromUnixTimeMilliseconds(eventData.OperationTimestamp!.Value)));


_contexts.TryAdd(eventData.TransportMessage.GetId(), activity);

_contexts[eventData.TransportMessage.GetId() + eventData.TransportMessage.GetGroup()] = activity.Context;
} }
} }
break; break;
case CapEvents.AfterConsume: case CapEvents.AfterConsume:
{ {
var eventData = (CapEventDataSubStore)evt.Value!; var eventData = (CapEventDataSubStore)evt.Value!;
if (_contexts.TryRemove(eventData.TransportMessage.GetId(), out Activity activity))
if (Activity.Current is { } activity)
{ {
activity.AddEvent(new ActivityEvent("CAP message persistence succeeded!", activity.AddEvent(new ActivityEvent("CAP message persistence succeeded!",
DateTimeOffset.FromUnixTimeMilliseconds(eventData.OperationTimestamp!.Value), DateTimeOffset.FromUnixTimeMilliseconds(eventData.OperationTimestamp!.Value),
new ActivityTagsCollection { new("cap.receive.duration", eventData.ElapsedTimeMs) })); new ActivityTagsCollection { new("cap.receive.duration", eventData.ElapsedTimeMs) }));


_contexts.TryAdd(eventData.TransportMessage.GetId(), activity);
activity.Stop();
} }
} }
break; break;
case CapEvents.ErrorConsume: case CapEvents.ErrorConsume:
{ {
var eventData = (CapEventDataSubStore)evt.Value!; var eventData = (CapEventDataSubStore)evt.Value!;
if (_contexts.TryRemove(eventData.TransportMessage.GetId(), out Activity activity))
if (Activity.Current is { } activity)
{ {
var exception = eventData.Exception!; var exception = eventData.Exception!;
activity.SetStatus(Status.Error.WithDescription(exception.Message)); activity.SetStatus(Status.Error.WithDescription(exception.Message));


activity.RecordException(exception); activity.RecordException(exception);


activity.Dispose();
activity.Stop();
} }
} }
break; break;
case CapEvents.BeforeSubscriberInvoke: case CapEvents.BeforeSubscriberInvoke:
{ {
var eventData = (CapEventDataSubExecute)evt.Value!; var eventData = (CapEventDataSubExecute)evt.Value!;
var activity = ActivitySource.StartActivity("Subscriber Invoke: " + eventData.MethodInfo!.Name);
_contexts.TryRemove(eventData.Message.GetId() + eventData.Message.GetGroup(), out var context);
var activity = ActivitySource.StartActivity("Subscriber Invoke: " + eventData.MethodInfo!.Name, ActivityKind.Internal,
context);
if (activity != null) if (activity != null)
{ {
activity.SetTag("component", "CAP");
activity.SetTag("messaging.operation", "process"); activity.SetTag("messaging.operation", "process");
activity.SetTag("code.function", eventData.MethodInfo.Name); activity.SetTag("code.function", eventData.MethodInfo.Name);
activity.SetTag("code.namespace", eventData.MethodInfo.GetType().Namespace);


activity.AddEvent(new ActivityEvent("Begin invoke the subscriber:" + eventData.MethodInfo.Name, activity.AddEvent(new ActivityEvent("Begin invoke the subscriber:" + eventData.MethodInfo.Name,
DateTimeOffset.FromUnixTimeMilliseconds(eventData.OperationTimestamp!.Value))); DateTimeOffset.FromUnixTimeMilliseconds(eventData.OperationTimestamp!.Value)));

_contexts.TryAdd(eventData.Message.GetId(), activity);
} }
} }
break; break;
case CapEvents.AfterSubscriberInvoke: case CapEvents.AfterSubscriberInvoke:
{ {
var eventData = (CapEventDataSubExecute)evt.Value!; var eventData = (CapEventDataSubExecute)evt.Value!;
if (_contexts.TryRemove(eventData.Message.GetId(), out Activity activity))
if (Activity.Current is { } activity)
{ {
activity.AddEvent(new ActivityEvent("Subscriber invoke succeeded!", activity.AddEvent(new ActivityEvent("Subscriber invoke succeeded!",
DateTimeOffset.FromUnixTimeMilliseconds(eventData.OperationTimestamp!.Value), DateTimeOffset.FromUnixTimeMilliseconds(eventData.OperationTimestamp!.Value),
new ActivityTagsCollection { new("cap.invoke.duration", eventData.ElapsedTimeMs) })); new ActivityTagsCollection { new("cap.invoke.duration", eventData.ElapsedTimeMs) }));
activity.Dispose();
activity.Stop();
} }


} }
@@ -229,18 +229,16 @@ namespace DotNetCore.CAP.OpenTelemetry
case CapEvents.ErrorSubscriberInvoke: case CapEvents.ErrorSubscriberInvoke:
{ {
var eventData = (CapEventDataSubExecute)evt.Value!; var eventData = (CapEventDataSubExecute)evt.Value!;
if (_contexts.TryRemove(eventData.Message.GetId(), out Activity activity))
if (Activity.Current is { } activity)
{ {
var exception = eventData.Exception!; var exception = eventData.Exception!;
activity.SetStatus(Status.Error.WithDescription(exception.Message)); activity.SetStatus(Status.Error.WithDescription(exception.Message));
activity.RecordException(exception); activity.RecordException(exception);
activity.Dispose();
activity.Stop();
} }
} }
break; break;

} }
} }

} }
} }

+ 11
- 1
src/DotNetCore.CAP.OpenTelemetry/DiagnosticProcessorObserver.cs View File

@@ -4,13 +4,22 @@
using System; using System;
using System.Diagnostics; using System.Diagnostics;
using DotNetCore.CAP.Diagnostics; using DotNetCore.CAP.Diagnostics;
using Microsoft.Extensions.Logging;


namespace DotNetCore.CAP.OpenTelemetry namespace DotNetCore.CAP.OpenTelemetry
{ {
public class CapDiagnosticProcessorObserver : IObserver<DiagnosticListener> public class CapDiagnosticProcessorObserver : IObserver<DiagnosticListener>
{ {
private readonly ILogger _logger;
private readonly CapDiagnosticObserver _capObserver;
public const string DiagnosticListenerName = CapDiagnosticListenerNames.DiagnosticListenerName; public const string DiagnosticListenerName = CapDiagnosticListenerNames.DiagnosticListenerName;


public CapDiagnosticProcessorObserver(ILogger<CapDiagnosticProcessorObserver> logger, CapDiagnosticObserver capObserver)
{
_logger = logger;
_capObserver = capObserver;
}

public void OnCompleted() public void OnCompleted()
{ {
} }
@@ -23,7 +32,8 @@ namespace DotNetCore.CAP.OpenTelemetry
{ {
if (listener.Name == DiagnosticListenerName) if (listener.Name == DiagnosticListenerName)
{ {
listener.Subscribe(new CapDiagnosticObserver());
listener.Subscribe(_capObserver);
_logger.LogInformation($"Loaded diagnostic listener [{DiagnosticListenerName}].");
} }
} }
} }

Loading…
Cancel
Save