diff --git a/CAP.sln b/CAP.sln index 8f4620b..db7190d 100644 --- a/CAP.sln +++ b/CAP.sln @@ -84,6 +84,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.Pulsar.InMemory", "s EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.RabbitMQ.SqlServer.DispatcherPerGroup", "samples\Sample.RabbitMQ.SqlServer.DispatcherPerGroup\Sample.RabbitMQ.SqlServer.DispatcherPerGroup.csproj", "{DCDF58E8-F823-4F04-9F8C-E8076DC16A68}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.OpenTelemetry", "src\DotNetCore.CAP.OpenTelemetry\DotNetCore.CAP.OpenTelemetry.csproj", "{83DDB126-A00B-4064-86E7-568322CA67EC}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -202,6 +204,10 @@ Global {DCDF58E8-F823-4F04-9F8C-E8076DC16A68}.Debug|Any CPU.Build.0 = Debug|Any CPU {DCDF58E8-F823-4F04-9F8C-E8076DC16A68}.Release|Any CPU.ActiveCfg = Release|Any CPU {DCDF58E8-F823-4F04-9F8C-E8076DC16A68}.Release|Any CPU.Build.0 = Release|Any CPU + {83DDB126-A00B-4064-86E7-568322CA67EC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {83DDB126-A00B-4064-86E7-568322CA67EC}.Debug|Any CPU.Build.0 = Debug|Any CPU + {83DDB126-A00B-4064-86E7-568322CA67EC}.Release|Any CPU.ActiveCfg = Release|Any CPU + {83DDB126-A00B-4064-86E7-568322CA67EC}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -235,6 +241,7 @@ Global {AB7A10CB-2C7E-49CE-AA21-893772FF6546} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4} {B1D95CCD-0123-41D4-8CCB-9F834ED8D5C5} = {3A6B6931-A123-477A-9469-8B468B5385AF} {DCDF58E8-F823-4F04-9F8C-E8076DC16A68} = {3A6B6931-A123-477A-9469-8B468B5385AF} + {83DDB126-A00B-4064-86E7-568322CA67EC} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {2E70565D-94CF-40B4-BFE1-AC18D5F736AB} diff --git a/src/DotNetCore.CAP.OpenTelemetry/CAP.OpenTelemetryCapOptionsExtension.cs b/src/DotNetCore.CAP.OpenTelemetry/CAP.Options.Extension.cs similarity index 52% rename from src/DotNetCore.CAP.OpenTelemetry/CAP.OpenTelemetryCapOptionsExtension.cs rename to src/DotNetCore.CAP.OpenTelemetry/CAP.Options.Extension.cs index ca11e3d..69e939f 100644 --- a/src/DotNetCore.CAP.OpenTelemetry/CAP.OpenTelemetryCapOptionsExtension.cs +++ b/src/DotNetCore.CAP.OpenTelemetry/CAP.Options.Extension.cs @@ -1,10 +1,13 @@ // Copyright (c) .NET Core Community. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. +using System; using DotNetCore.CAP.Internal; using DotNetCore.CAP.OpenTelemetry; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; +using OpenTelemetry.Resources; +using OpenTelemetry.Trace; // ReSharper disable once CheckNamespace namespace DotNetCore.CAP @@ -13,6 +16,7 @@ namespace DotNetCore.CAP { public void AddServices(IServiceCollection services) { + services.AddSingleton(); services.AddSingleton(); services.TryAddEnumerable(ServiceDescriptor.Singleton()); } @@ -27,4 +31,25 @@ namespace DotNetCore.CAP return options; } } + + public static class TracerProviderBuilderExtensions + { + /// + /// Enables the message eventing data collection for CAP. + /// + /// being configured. + /// The instance of to chain the calls. + public static TracerProviderBuilder AddCapInstrumentation(this TracerProviderBuilder builder) + { + if (builder == null) + { + throw new ArgumentNullException(nameof(builder)); + } + + builder.AddSource("DotNetCore.CAP.OpenTelemetry") + .SetResourceBuilder(ResourceBuilder.CreateDefault().AddService("CAP")); + + return builder; + } + } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.OpenTelemetry/DiagnosticObserver.cs b/src/DotNetCore.CAP.OpenTelemetry/DiagnosticObserver.cs index a115e14..0a63df6 100644 --- a/src/DotNetCore.CAP.OpenTelemetry/DiagnosticObserver.cs +++ b/src/DotNetCore.CAP.OpenTelemetry/DiagnosticObserver.cs @@ -12,16 +12,15 @@ using OpenTelemetry; using OpenTelemetry.Context.Propagation; using OpenTelemetry.Trace; using CapEvents = DotNetCore.CAP.Diagnostics.CapDiagnosticListenerNames; -using Status = OpenTelemetry.Trace.Status; namespace DotNetCore.CAP.OpenTelemetry { - internal class CapDiagnosticObserver : IObserver> + public class CapDiagnosticObserver : IObserver> { - private static readonly ActivitySource ActivitySource = new("DotNetCore.CAP.OpenTelemetry"); + private static readonly ActivitySource ActivitySource = new("DotNetCore.CAP.OpenTelemetry", "1.0.0"); private static readonly TextMapPropagator Propagator = new TraceContextPropagator(); - private readonly ConcurrentDictionary _contexts = new(); + private readonly ConcurrentDictionary _contexts = new(); private const string OperateNamePrefix = "CAP/"; private const string ProducerOperateNameSuffix = "/Publisher"; @@ -42,49 +41,57 @@ namespace DotNetCore.CAP.OpenTelemetry case CapEvents.BeforePublishMessageStore: { var eventData = (CapEventDataPubStore)evt.Value!; + var useParent = false; + if (Activity.Current != null) + { + if (_contexts.TryAdd(eventData.Message.GetId(), Activity.Current.Context)) + { + useParent = true; + } + } var activity = ActivitySource.StartActivity("Event Persistence: " + eventData.Operation); if (activity != null) { - activity.SetTag("component", "CAP"); activity.SetTag("message.name", eventData.Operation); activity.AddEvent(new ActivityEvent("CAP message persistence start...", DateTimeOffset.FromUnixTimeMilliseconds(eventData.OperationTimestamp!.Value))); - _contexts.TryAdd(eventData.Message.GetId(), activity); + + if (!useParent) + { + _contexts[eventData.Message.GetId()] = Activity.Current!.Context; + } } } break; case CapEvents.AfterPublishMessageStore: { var eventData = (CapEventDataPubStore)evt.Value!; - if (_contexts.TryRemove(eventData.Message.GetId(), out Activity activity)) - { - activity.AddEvent(new ActivityEvent("CAP message persistence succeeded!", - DateTimeOffset.FromUnixTimeMilliseconds(eventData.OperationTimestamp!.Value), - new ActivityTagsCollection { new("cap.persistence.duration", eventData.ElapsedTimeMs) }) - ); - activity.Dispose(); - } + + Activity.Current?.AddEvent(new ActivityEvent("CAP message persistence succeeded!", + DateTimeOffset.FromUnixTimeMilliseconds(eventData.OperationTimestamp!.Value), + new ActivityTagsCollection { new("cap.persistence.duration", eventData.ElapsedTimeMs) }) + ); + + Activity.Current?.Stop(); } break; case CapEvents.ErrorPublishMessageStore: { var eventData = (CapEventDataPubStore)evt.Value!; - if (_contexts.TryRemove(eventData.Message.GetId(), out Activity activity)) + if (Activity.Current is { } activity) { var exception = eventData.Exception!; - activity.SetStatus(Status.Error.WithDescription(exception.Message)); - activity.RecordException(exception); - - activity.Dispose(); + activity.Stop(); } } break; case CapEvents.BeforePublish: { var eventData = (CapEventDataPubSend)evt.Value!; - var activity = ActivitySource.StartActivity(OperateNamePrefix + eventData.Operation + ProducerOperateNameSuffix, ActivityKind.Producer); + _contexts.TryRemove(eventData.TransportMessage.GetId(), out var context); + var activity = ActivitySource.StartActivity(OperateNamePrefix + eventData.Operation + ProducerOperateNameSuffix, ActivityKind.Producer, context); if (activity != null) { activity.SetTag("messaging.system", eventData.BrokerAddress.Name); @@ -102,56 +109,52 @@ namespace DotNetCore.CAP.OpenTelemetry { msg.Headers[key] = value; }); - - _contexts.TryAdd(eventData.TransportMessage.GetId(), activity); } } break; case CapEvents.AfterPublish: { var eventData = (CapEventDataPubSend)evt.Value!; - if (_contexts.TryRemove(eventData.TransportMessage.GetId(), out Activity activity)) + if (Activity.Current is { } activity) { activity.AddEvent(new ActivityEvent("Message publishing succeeded!", DateTimeOffset.FromUnixTimeMilliseconds(eventData.OperationTimestamp!.Value), new ActivityTagsCollection { new("cap.send.duration", eventData.ElapsedTimeMs) }) ); - activity.Dispose(); + activity.Stop(); } } break; case CapEvents.ErrorPublish: { var eventData = (CapEventDataPubSend)evt.Value!; - if (_contexts.TryRemove(eventData.TransportMessage.GetId(), out Activity activity)) + if (Activity.Current is { } activity) { var exception = eventData.Exception!; activity.SetStatus(Status.Error.WithDescription(exception.Message)); - activity.RecordException(exception); - - activity.Dispose(); + activity.Stop(); } } break; case CapEvents.BeforeConsume: { - var eventData = (CapEventDataSubStore)evt.Value!; - var activity = ActivitySource.StartActivity(OperateNamePrefix + eventData.Operation + ConsumerOperateNameSuffix, ActivityKind.Consumer); - if (activity != null) + var parentContext = Propagator.Extract(default, eventData.TransportMessage, (msg, key) => { - var parentContext = Propagator.Extract(default, eventData.TransportMessage, (msg, key) => + if (msg.Headers.TryGetValue(key, out string? value)) { - if (msg.Headers.TryGetValue(key, out string? value)) - { - return new[] { value }; - } - return Enumerable.Empty(); - }); + return new[] { value }; + } + return Enumerable.Empty(); + }); - Baggage.Current = parentContext.Baggage; + var activity = ActivitySource.StartActivity(OperateNamePrefix + eventData.Operation + ConsumerOperateNameSuffix, + ActivityKind.Consumer, + parentContext.ActivityContext); + if (activity != null) + { activity.SetTag("messaging.system", eventData.BrokerAddress.Name); activity.SetTag("messaging.destination", eventData.Operation); activity.SetTag("messaging.destination_kind", "topic"); @@ -162,66 +165,63 @@ namespace DotNetCore.CAP.OpenTelemetry activity.AddEvent(new ActivityEvent("CAP message persistence start...", DateTimeOffset.FromUnixTimeMilliseconds(eventData.OperationTimestamp!.Value))); - _contexts.TryAdd(eventData.TransportMessage.GetId(), activity); - + _contexts[eventData.TransportMessage.GetId() + eventData.TransportMessage.GetGroup()] = activity.Context; } } break; case CapEvents.AfterConsume: { var eventData = (CapEventDataSubStore)evt.Value!; - if (_contexts.TryRemove(eventData.TransportMessage.GetId(), out Activity activity)) + if (Activity.Current is { } activity) { activity.AddEvent(new ActivityEvent("CAP message persistence succeeded!", DateTimeOffset.FromUnixTimeMilliseconds(eventData.OperationTimestamp!.Value), new ActivityTagsCollection { new("cap.receive.duration", eventData.ElapsedTimeMs) })); - _contexts.TryAdd(eventData.TransportMessage.GetId(), activity); + activity.Stop(); } } break; case CapEvents.ErrorConsume: { var eventData = (CapEventDataSubStore)evt.Value!; - if (_contexts.TryRemove(eventData.TransportMessage.GetId(), out Activity activity)) + if (Activity.Current is { } activity) { var exception = eventData.Exception!; activity.SetStatus(Status.Error.WithDescription(exception.Message)); activity.RecordException(exception); - activity.Dispose(); + activity.Stop(); } } break; case CapEvents.BeforeSubscriberInvoke: { var eventData = (CapEventDataSubExecute)evt.Value!; - var activity = ActivitySource.StartActivity("Subscriber Invoke: " + eventData.MethodInfo!.Name); + _contexts.TryRemove(eventData.Message.GetId() + eventData.Message.GetGroup(), out var context); + var activity = ActivitySource.StartActivity("Subscriber Invoke: " + eventData.MethodInfo!.Name, ActivityKind.Internal, + context); if (activity != null) { - activity.SetTag("component", "CAP"); activity.SetTag("messaging.operation", "process"); activity.SetTag("code.function", eventData.MethodInfo.Name); - activity.SetTag("code.namespace", eventData.MethodInfo.GetType().Namespace); activity.AddEvent(new ActivityEvent("Begin invoke the subscriber:" + eventData.MethodInfo.Name, DateTimeOffset.FromUnixTimeMilliseconds(eventData.OperationTimestamp!.Value))); - - _contexts.TryAdd(eventData.Message.GetId(), activity); } } break; case CapEvents.AfterSubscriberInvoke: { var eventData = (CapEventDataSubExecute)evt.Value!; - if (_contexts.TryRemove(eventData.Message.GetId(), out Activity activity)) + if (Activity.Current is { } activity) { activity.AddEvent(new ActivityEvent("Subscriber invoke succeeded!", DateTimeOffset.FromUnixTimeMilliseconds(eventData.OperationTimestamp!.Value), new ActivityTagsCollection { new("cap.invoke.duration", eventData.ElapsedTimeMs) })); - - activity.Dispose(); + + activity.Stop(); } } @@ -229,18 +229,16 @@ namespace DotNetCore.CAP.OpenTelemetry case CapEvents.ErrorSubscriberInvoke: { var eventData = (CapEventDataSubExecute)evt.Value!; - if (_contexts.TryRemove(eventData.Message.GetId(), out Activity activity)) + if (Activity.Current is { } activity) { var exception = eventData.Exception!; activity.SetStatus(Status.Error.WithDescription(exception.Message)); activity.RecordException(exception); - activity.Dispose(); + activity.Stop(); } } break; - } } - } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.OpenTelemetry/DiagnosticProcessorObserver.cs b/src/DotNetCore.CAP.OpenTelemetry/DiagnosticProcessorObserver.cs index aa72367..a98897c 100644 --- a/src/DotNetCore.CAP.OpenTelemetry/DiagnosticProcessorObserver.cs +++ b/src/DotNetCore.CAP.OpenTelemetry/DiagnosticProcessorObserver.cs @@ -4,13 +4,22 @@ using System; using System.Diagnostics; using DotNetCore.CAP.Diagnostics; +using Microsoft.Extensions.Logging; namespace DotNetCore.CAP.OpenTelemetry { public class CapDiagnosticProcessorObserver : IObserver { + private readonly ILogger _logger; + private readonly CapDiagnosticObserver _capObserver; public const string DiagnosticListenerName = CapDiagnosticListenerNames.DiagnosticListenerName; + public CapDiagnosticProcessorObserver(ILogger logger, CapDiagnosticObserver capObserver) + { + _logger = logger; + _capObserver = capObserver; + } + public void OnCompleted() { } @@ -23,7 +32,8 @@ namespace DotNetCore.CAP.OpenTelemetry { if (listener.Name == DiagnosticListenerName) { - listener.Subscribe(new CapDiagnosticObserver()); + listener.Subscribe(_capObserver); + _logger.LogInformation($"Loaded diagnostic listener [{DiagnosticListenerName}]."); } } }