From 39d8fe259d9ff7e773ab6d7d36418484d9c7b8d7 Mon Sep 17 00:00:00 2001 From: Savorboard Date: Thu, 6 Jan 2022 11:27:26 +0800 Subject: [PATCH] Refactoring for OpenTelemetry. --- .../user-guide/en/monitoring/opentelemetry.md | 13 +-- .../user-guide/zh/monitoring/opentelemetry.md | 17 +--- .../CAP.Options.Extension.cs | 55 ----------- .../CapInstrumentation.cs | 27 ++++++ ...osticObserver.cs => DiagnosticListener.cs} | 10 +- .../DiagnosticProcessorObserver.cs | 40 -------- .../DiagnosticSourceSubscriber.cs | 97 +++++++++++++++++++ .../DotNetCore.CAP.OpenTelemetry.csproj | 3 +- .../IProcessingServer.DiagnosticRegister.cs | 34 ------- .../TracerProviderBuilder.Extension.cs | 31 ++++++ .../RabbitMQConsumerClient.cs | 3 +- 11 files changed, 164 insertions(+), 166 deletions(-) delete mode 100644 src/DotNetCore.CAP.OpenTelemetry/CAP.Options.Extension.cs create mode 100644 src/DotNetCore.CAP.OpenTelemetry/CapInstrumentation.cs rename src/DotNetCore.CAP.OpenTelemetry/{DiagnosticObserver.cs => DiagnosticListener.cs} (98%) delete mode 100644 src/DotNetCore.CAP.OpenTelemetry/DiagnosticProcessorObserver.cs create mode 100644 src/DotNetCore.CAP.OpenTelemetry/DiagnosticSourceSubscriber.cs delete mode 100644 src/DotNetCore.CAP.OpenTelemetry/IProcessingServer.DiagnosticRegister.cs create mode 100644 src/DotNetCore.CAP.OpenTelemetry/TracerProviderBuilder.Extension.cs diff --git a/docs/content/user-guide/en/monitoring/opentelemetry.md b/docs/content/user-guide/en/monitoring/opentelemetry.md index b4be2d2..0146f69 100644 --- a/docs/content/user-guide/en/monitoring/opentelemetry.md +++ b/docs/content/user-guide/en/monitoring/opentelemetry.md @@ -16,18 +16,7 @@ Install the CAP OpenTelemetry package into the project. dotnet add package DotNetCore.Cap.OpenTelemetry ``` -The OpenTelemetry data comes from [diagnostics](diagnostics.md), add the configuration to enable data collection. - -```C# -services.AddCap(x => -{ - //*** - x.UseOpenTelemetry(); // <-- Add this line -}); - -``` - -Add the instrumentation of CAP to the configuration of OpenTelemetry. +The OpenTelemetry data comes from [diagnostics](diagnostics.md), add the instrumentation of CAP to the configuration of OpenTelemetry. ```C# services.AddOpenTelemetryTracing((builder) => builder diff --git a/docs/content/user-guide/zh/monitoring/opentelemetry.md b/docs/content/user-guide/zh/monitoring/opentelemetry.md index da3b3e7..ed0084a 100644 --- a/docs/content/user-guide/zh/monitoring/opentelemetry.md +++ b/docs/content/user-guide/zh/monitoring/opentelemetry.md @@ -6,8 +6,6 @@ OpenTelemetry是工具、api和sdk的集合。 使用它来检测、生成、收 ## 集成 -You can find it [here](https://opentelemetry.io/docs/instrumentation/net/getting-started/) about how to use OpenTelemetry in console applications or ASP.NET Core, at here we mainly describe how to tracing CAP data to OpenTelemetry. - 你可以在[这里](https://opentelemetry.io/docs/instrumentation/net/getting-started/)找到关于如何在控制台应用或ASP.NET Core 中使用OpenTelemetry。 在这里我们主要描述如何将CAP集成到OpenTelemetry中。 @@ -19,23 +17,12 @@ You can find it [here](https://opentelemetry.io/docs/instrumentation/net/getting dotnet add package DotNetCore.Cap.OpenTelemetry ``` -OpenTelemetry 的跟踪数据来自于[Diagnostics](diagnostics.md)发送的诊断数据,使用下面的配置行来启用收集数据。 - -```C# -services.AddCap(x => -{ - //*** - x.UseOpenTelemetry(); // <-- Add this line -}); - -``` - -添加 CAP Instrumentation 到 OpenTelemetry的扩展配置中。 +OpenTelemetry 的跟踪数据来自于[Diagnostics](diagnostics.md)发送的诊断数据,添加 CAP Instrumentation 到 OpenTelemetry的扩展配置中会进行自动收集。 ```C# services.AddOpenTelemetryTracing((builder) => builder .AddAspNetCoreInstrumentation() - .AddCapInstrumentation() // <-- Add this line + .AddCapInstrumentation() // <-- 添加这行 .AddZipkinExporter() ); ``` diff --git a/src/DotNetCore.CAP.OpenTelemetry/CAP.Options.Extension.cs b/src/DotNetCore.CAP.OpenTelemetry/CAP.Options.Extension.cs deleted file mode 100644 index 69e939f..0000000 --- a/src/DotNetCore.CAP.OpenTelemetry/CAP.Options.Extension.cs +++ /dev/null @@ -1,55 +0,0 @@ -// Copyright (c) .NET Core Community. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -using System; -using DotNetCore.CAP.Internal; -using DotNetCore.CAP.OpenTelemetry; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.DependencyInjection.Extensions; -using OpenTelemetry.Resources; -using OpenTelemetry.Trace; - -// ReSharper disable once CheckNamespace -namespace DotNetCore.CAP -{ - internal class OpenTelemetryCapOptionsExtension : ICapOptionsExtension - { - public void AddServices(IServiceCollection services) - { - services.AddSingleton(); - services.AddSingleton(); - services.TryAddEnumerable(ServiceDescriptor.Singleton()); - } - } - - public static class CapOptionsExtensions - { - public static CapOptions UseOpenTelemetry(this CapOptions options) - { - options.RegisterExtension(new OpenTelemetryCapOptionsExtension()); - - return options; - } - } - - public static class TracerProviderBuilderExtensions - { - /// - /// Enables the message eventing data collection for CAP. - /// - /// being configured. - /// The instance of to chain the calls. - public static TracerProviderBuilder AddCapInstrumentation(this TracerProviderBuilder builder) - { - if (builder == null) - { - throw new ArgumentNullException(nameof(builder)); - } - - builder.AddSource("DotNetCore.CAP.OpenTelemetry") - .SetResourceBuilder(ResourceBuilder.CreateDefault().AddService("CAP")); - - return builder; - } - } -} \ No newline at end of file diff --git a/src/DotNetCore.CAP.OpenTelemetry/CapInstrumentation.cs b/src/DotNetCore.CAP.OpenTelemetry/CapInstrumentation.cs new file mode 100644 index 0000000..c11eaae --- /dev/null +++ b/src/DotNetCore.CAP.OpenTelemetry/CapInstrumentation.cs @@ -0,0 +1,27 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; + +namespace DotNetCore.CAP.OpenTelemetry +{ + /// + /// CAP instrumentation. + /// + internal class CapInstrumentation : IDisposable + { + private readonly DiagnosticSourceSubscriber? _diagnosticSourceSubscriber; + + public CapInstrumentation(DiagnosticListener diagnosticListener) + { + _diagnosticSourceSubscriber = new DiagnosticSourceSubscriber(diagnosticListener, null); + _diagnosticSourceSubscriber.Subscribe(); + } + + /// + public void Dispose() + { + _diagnosticSourceSubscriber?.Dispose(); + } + } +} diff --git a/src/DotNetCore.CAP.OpenTelemetry/DiagnosticObserver.cs b/src/DotNetCore.CAP.OpenTelemetry/DiagnosticListener.cs similarity index 98% rename from src/DotNetCore.CAP.OpenTelemetry/DiagnosticObserver.cs rename to src/DotNetCore.CAP.OpenTelemetry/DiagnosticListener.cs index 0a63df6..dc5d8f2 100644 --- a/src/DotNetCore.CAP.OpenTelemetry/DiagnosticObserver.cs +++ b/src/DotNetCore.CAP.OpenTelemetry/DiagnosticListener.cs @@ -15,9 +15,10 @@ using CapEvents = DotNetCore.CAP.Diagnostics.CapDiagnosticListenerNames; namespace DotNetCore.CAP.OpenTelemetry { - public class CapDiagnosticObserver : IObserver> + internal class DiagnosticListener : IObserver> { - private static readonly ActivitySource ActivitySource = new("DotNetCore.CAP.OpenTelemetry", "1.0.0"); + public const string SourceName = "DotNetCore.CAP.OpenTelemetry"; + private static readonly ActivitySource ActivitySource = new(SourceName, "1.0.0"); private static readonly TextMapPropagator Propagator = new TraceContextPropagator(); private readonly ConcurrentDictionary _contexts = new(); @@ -25,7 +26,7 @@ namespace DotNetCore.CAP.OpenTelemetry private const string OperateNamePrefix = "CAP/"; private const string ProducerOperateNameSuffix = "/Publisher"; private const string ConsumerOperateNameSuffix = "/Subscriber"; - + public void OnCompleted() { } @@ -189,9 +190,7 @@ namespace DotNetCore.CAP.OpenTelemetry { var exception = eventData.Exception!; activity.SetStatus(Status.Error.WithDescription(exception.Message)); - activity.RecordException(exception); - activity.Stop(); } } @@ -223,7 +222,6 @@ namespace DotNetCore.CAP.OpenTelemetry activity.Stop(); } - } break; case CapEvents.ErrorSubscriberInvoke: diff --git a/src/DotNetCore.CAP.OpenTelemetry/DiagnosticProcessorObserver.cs b/src/DotNetCore.CAP.OpenTelemetry/DiagnosticProcessorObserver.cs deleted file mode 100644 index a98897c..0000000 --- a/src/DotNetCore.CAP.OpenTelemetry/DiagnosticProcessorObserver.cs +++ /dev/null @@ -1,40 +0,0 @@ -// Copyright (c) .NET Core Community. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -using System; -using System.Diagnostics; -using DotNetCore.CAP.Diagnostics; -using Microsoft.Extensions.Logging; - -namespace DotNetCore.CAP.OpenTelemetry -{ - public class CapDiagnosticProcessorObserver : IObserver - { - private readonly ILogger _logger; - private readonly CapDiagnosticObserver _capObserver; - public const string DiagnosticListenerName = CapDiagnosticListenerNames.DiagnosticListenerName; - - public CapDiagnosticProcessorObserver(ILogger logger, CapDiagnosticObserver capObserver) - { - _logger = logger; - _capObserver = capObserver; - } - - public void OnCompleted() - { - } - - public void OnError(Exception error) - { - } - - public void OnNext(DiagnosticListener listener) - { - if (listener.Name == DiagnosticListenerName) - { - listener.Subscribe(_capObserver); - _logger.LogInformation($"Loaded diagnostic listener [{DiagnosticListenerName}]."); - } - } - } -} \ No newline at end of file diff --git a/src/DotNetCore.CAP.OpenTelemetry/DiagnosticSourceSubscriber.cs b/src/DotNetCore.CAP.OpenTelemetry/DiagnosticSourceSubscriber.cs new file mode 100644 index 0000000..4f57a33 --- /dev/null +++ b/src/DotNetCore.CAP.OpenTelemetry/DiagnosticSourceSubscriber.cs @@ -0,0 +1,97 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Threading; +using DotNetCore.CAP.Diagnostics; + +namespace DotNetCore.CAP.OpenTelemetry +{ + internal class DiagnosticSourceSubscriber : IDisposable, IObserver + { + private readonly List _listenerSubscriptions; + private readonly Func _handlerFactory; + private readonly Func _diagnosticSourceFilter; + private readonly Func? _isEnabledFilter; + private long _disposed; + private IDisposable? _allSourcesSubscription; + + public DiagnosticSourceSubscriber( + DiagnosticListener handler, + Func? isEnabledFilter) + : this(_ => handler, + value => CapDiagnosticListenerNames.DiagnosticListenerName == value.Name, + isEnabledFilter) + { + } + + public DiagnosticSourceSubscriber( + Func handlerFactory, + Func diagnosticSourceFilter, + Func? isEnabledFilter) + { + _listenerSubscriptions = new List(); + _handlerFactory = handlerFactory ?? throw new ArgumentNullException(nameof(handlerFactory)); + _diagnosticSourceFilter = diagnosticSourceFilter; + _isEnabledFilter = isEnabledFilter; + } + + public void Subscribe() + { + _allSourcesSubscription ??= System.Diagnostics.DiagnosticListener.AllListeners.Subscribe(this); + } + + public void OnNext(System.Diagnostics.DiagnosticListener value) + { + if ((Interlocked.Read(ref _disposed) == 0) && _diagnosticSourceFilter(value)) + { + var handler = _handlerFactory(value.Name); + var subscription = _isEnabledFilter == null ? + value.Subscribe(handler) : + value.Subscribe(handler, _isEnabledFilter); + + lock (_listenerSubscriptions) + { + _listenerSubscriptions.Add(subscription); + } + } + } + + public void OnCompleted() + { + } + + public void OnError(Exception error) + { + } + + /// + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + protected virtual void Dispose(bool disposing) + { + if (Interlocked.CompareExchange(ref _disposed, 1, 0) == 1) + { + return; + } + + lock (_listenerSubscriptions) + { + foreach (var listenerSubscription in _listenerSubscriptions) + { + listenerSubscription?.Dispose(); + } + + _listenerSubscriptions.Clear(); + } + + _allSourcesSubscription?.Dispose(); + _allSourcesSubscription = null; + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.OpenTelemetry/DotNetCore.CAP.OpenTelemetry.csproj b/src/DotNetCore.CAP.OpenTelemetry/DotNetCore.CAP.OpenTelemetry.csproj index 4592514..1f2a03a 100644 --- a/src/DotNetCore.CAP.OpenTelemetry/DotNetCore.CAP.OpenTelemetry.csproj +++ b/src/DotNetCore.CAP.OpenTelemetry/DotNetCore.CAP.OpenTelemetry.csproj @@ -4,14 +4,13 @@ netstandard2.1 enable CAP instrumentation for OpenTelemetry .NET - $(PackageTags);distributed-tracing + $(PackageTags);distributed-tracing;opentelemetry; - diff --git a/src/DotNetCore.CAP.OpenTelemetry/IProcessingServer.DiagnosticRegister.cs b/src/DotNetCore.CAP.OpenTelemetry/IProcessingServer.DiagnosticRegister.cs deleted file mode 100644 index 35af64f..0000000 --- a/src/DotNetCore.CAP.OpenTelemetry/IProcessingServer.DiagnosticRegister.cs +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright (c) .NET Core Community. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -using System.Diagnostics; -using System.Threading; -using DotNetCore.CAP.Internal; - -namespace DotNetCore.CAP.OpenTelemetry -{ - public class OpenTelemetryDiagnosticRegister : IProcessingServer - { - private readonly CapDiagnosticProcessorObserver _diagnosticProcessorObserver; - - public OpenTelemetryDiagnosticRegister(CapDiagnosticProcessorObserver diagnosticProcessorObserver) - { - _diagnosticProcessorObserver = diagnosticProcessorObserver; - } - - public void Dispose() - { - - } - - public void Pulse() - { - - } - - public void Start(CancellationToken stoppingToken) - { - DiagnosticListener.AllListeners.Subscribe(_diagnosticProcessorObserver); - } - } -} diff --git a/src/DotNetCore.CAP.OpenTelemetry/TracerProviderBuilder.Extension.cs b/src/DotNetCore.CAP.OpenTelemetry/TracerProviderBuilder.Extension.cs new file mode 100644 index 0000000..b4d2af8 --- /dev/null +++ b/src/DotNetCore.CAP.OpenTelemetry/TracerProviderBuilder.Extension.cs @@ -0,0 +1,31 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using DotNetCore.CAP.OpenTelemetry; + +// ReSharper disable once CheckNamespace +namespace OpenTelemetry.Trace +{ + public static class TracerProviderBuilderExtensions + { + /// + /// Enables the message eventing data collection for CAP. + /// + /// being configured. + /// The instance of to chain the calls. + public static TracerProviderBuilder AddCapInstrumentation(this TracerProviderBuilder builder) + { + if (builder == null) + { + throw new ArgumentNullException(nameof(builder)); + } + + builder.AddSource(DiagnosticListener.SourceName); + + var instrumentation = new CapInstrumentation(new DiagnosticListener()); + + return builder.AddInstrumentation(() => instrumentation); + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs b/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs index 548001b..5680f25 100644 --- a/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs +++ b/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs @@ -39,7 +39,7 @@ namespace DotNetCore.CAP.RabbitMQ public event EventHandler? OnLog; - public BrokerAddress BrokerAddress => new("RabbitMQ", _rabbitMQOptions.HostName); + public BrokerAddress BrokerAddress => new("RabbitMQ", $"{_rabbitMQOptions.HostName}:{_rabbitMQOptions.Port}"); public void Subscribe(IEnumerable topics) { @@ -96,7 +96,6 @@ namespace DotNetCore.CAP.RabbitMQ public void Dispose() { - _channel?.Dispose(); //The connection should not be closed here, because the connection is still in use elsewhere. //_connection?.Dispose();