@@ -16,18 +16,7 @@ Install the CAP OpenTelemetry package into the project. | |||||
dotnet add package DotNetCore.Cap.OpenTelemetry | dotnet add package DotNetCore.Cap.OpenTelemetry | ||||
``` | ``` | ||||
The OpenTelemetry data comes from [diagnostics](diagnostics.md), add the configuration to enable data collection. | |||||
```C# | |||||
services.AddCap(x => | |||||
{ | |||||
//*** | |||||
x.UseOpenTelemetry(); // <-- Add this line | |||||
}); | |||||
``` | |||||
Add the instrumentation of CAP to the configuration of OpenTelemetry. | |||||
The OpenTelemetry data comes from [diagnostics](diagnostics.md), add the instrumentation of CAP to the configuration of OpenTelemetry. | |||||
```C# | ```C# | ||||
services.AddOpenTelemetryTracing((builder) => builder | services.AddOpenTelemetryTracing((builder) => builder | ||||
@@ -6,8 +6,6 @@ OpenTelemetry是工具、api和sdk的集合。 使用它来检测、生成、收 | |||||
## 集成 | ## 集成 | ||||
You can find it [here](https://opentelemetry.io/docs/instrumentation/net/getting-started/) about how to use OpenTelemetry in console applications or ASP.NET Core, at here we mainly describe how to tracing CAP data to OpenTelemetry. | |||||
你可以在[这里](https://opentelemetry.io/docs/instrumentation/net/getting-started/)找到关于如何在控制台应用或ASP.NET Core 中使用OpenTelemetry。 | 你可以在[这里](https://opentelemetry.io/docs/instrumentation/net/getting-started/)找到关于如何在控制台应用或ASP.NET Core 中使用OpenTelemetry。 | ||||
在这里我们主要描述如何将CAP集成到OpenTelemetry中。 | 在这里我们主要描述如何将CAP集成到OpenTelemetry中。 | ||||
@@ -19,23 +17,12 @@ You can find it [here](https://opentelemetry.io/docs/instrumentation/net/getting | |||||
dotnet add package DotNetCore.Cap.OpenTelemetry | dotnet add package DotNetCore.Cap.OpenTelemetry | ||||
``` | ``` | ||||
OpenTelemetry 的跟踪数据来自于[Diagnostics](diagnostics.md)发送的诊断数据,使用下面的配置行来启用收集数据。 | |||||
```C# | |||||
services.AddCap(x => | |||||
{ | |||||
//*** | |||||
x.UseOpenTelemetry(); // <-- Add this line | |||||
}); | |||||
``` | |||||
添加 CAP Instrumentation 到 OpenTelemetry的扩展配置中。 | |||||
OpenTelemetry 的跟踪数据来自于[Diagnostics](diagnostics.md)发送的诊断数据,添加 CAP Instrumentation 到 OpenTelemetry的扩展配置中会进行自动收集。 | |||||
```C# | ```C# | ||||
services.AddOpenTelemetryTracing((builder) => builder | services.AddOpenTelemetryTracing((builder) => builder | ||||
.AddAspNetCoreInstrumentation() | .AddAspNetCoreInstrumentation() | ||||
.AddCapInstrumentation() // <-- Add this line | |||||
.AddCapInstrumentation() // <-- 添加这行 | |||||
.AddZipkinExporter() | .AddZipkinExporter() | ||||
); | ); | ||||
``` | ``` | ||||
@@ -1,55 +0,0 @@ | |||||
// Copyright (c) .NET Core Community. All rights reserved. | |||||
// Licensed under the MIT License. See License.txt in the project root for license information. | |||||
using System; | |||||
using DotNetCore.CAP.Internal; | |||||
using DotNetCore.CAP.OpenTelemetry; | |||||
using Microsoft.Extensions.DependencyInjection; | |||||
using Microsoft.Extensions.DependencyInjection.Extensions; | |||||
using OpenTelemetry.Resources; | |||||
using OpenTelemetry.Trace; | |||||
// ReSharper disable once CheckNamespace | |||||
namespace DotNetCore.CAP | |||||
{ | |||||
internal class OpenTelemetryCapOptionsExtension : ICapOptionsExtension | |||||
{ | |||||
public void AddServices(IServiceCollection services) | |||||
{ | |||||
services.AddSingleton<CapDiagnosticObserver>(); | |||||
services.AddSingleton<CapDiagnosticProcessorObserver>(); | |||||
services.TryAddEnumerable(ServiceDescriptor.Singleton<IProcessingServer, OpenTelemetryDiagnosticRegister>()); | |||||
} | |||||
} | |||||
public static class CapOptionsExtensions | |||||
{ | |||||
public static CapOptions UseOpenTelemetry(this CapOptions options) | |||||
{ | |||||
options.RegisterExtension(new OpenTelemetryCapOptionsExtension()); | |||||
return options; | |||||
} | |||||
} | |||||
public static class TracerProviderBuilderExtensions | |||||
{ | |||||
/// <summary> | |||||
/// Enables the message eventing data collection for CAP. | |||||
/// </summary> | |||||
/// <param name="builder"><see cref="TracerProviderBuilder"/> being configured.</param> | |||||
/// <returns>The instance of <see cref="TracerProviderBuilder"/> to chain the calls.</returns> | |||||
public static TracerProviderBuilder AddCapInstrumentation(this TracerProviderBuilder builder) | |||||
{ | |||||
if (builder == null) | |||||
{ | |||||
throw new ArgumentNullException(nameof(builder)); | |||||
} | |||||
builder.AddSource("DotNetCore.CAP.OpenTelemetry") | |||||
.SetResourceBuilder(ResourceBuilder.CreateDefault().AddService("CAP")); | |||||
return builder; | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,27 @@ | |||||
// Copyright (c) .NET Core Community. All rights reserved. | |||||
// Licensed under the MIT License. See License.txt in the project root for license information. | |||||
using System; | |||||
namespace DotNetCore.CAP.OpenTelemetry | |||||
{ | |||||
/// <summary> | |||||
/// CAP instrumentation. | |||||
/// </summary> | |||||
internal class CapInstrumentation : IDisposable | |||||
{ | |||||
private readonly DiagnosticSourceSubscriber? _diagnosticSourceSubscriber; | |||||
public CapInstrumentation(DiagnosticListener diagnosticListener) | |||||
{ | |||||
_diagnosticSourceSubscriber = new DiagnosticSourceSubscriber(diagnosticListener, null); | |||||
_diagnosticSourceSubscriber.Subscribe(); | |||||
} | |||||
/// <inheritdoc/> | |||||
public void Dispose() | |||||
{ | |||||
_diagnosticSourceSubscriber?.Dispose(); | |||||
} | |||||
} | |||||
} |
@@ -15,9 +15,10 @@ using CapEvents = DotNetCore.CAP.Diagnostics.CapDiagnosticListenerNames; | |||||
namespace DotNetCore.CAP.OpenTelemetry | namespace DotNetCore.CAP.OpenTelemetry | ||||
{ | { | ||||
public class CapDiagnosticObserver : IObserver<KeyValuePair<string, object?>> | |||||
internal class DiagnosticListener : IObserver<KeyValuePair<string, object?>> | |||||
{ | { | ||||
private static readonly ActivitySource ActivitySource = new("DotNetCore.CAP.OpenTelemetry", "1.0.0"); | |||||
public const string SourceName = "DotNetCore.CAP.OpenTelemetry"; | |||||
private static readonly ActivitySource ActivitySource = new(SourceName, "1.0.0"); | |||||
private static readonly TextMapPropagator Propagator = new TraceContextPropagator(); | private static readonly TextMapPropagator Propagator = new TraceContextPropagator(); | ||||
private readonly ConcurrentDictionary<string, ActivityContext> _contexts = new(); | private readonly ConcurrentDictionary<string, ActivityContext> _contexts = new(); | ||||
@@ -25,7 +26,7 @@ namespace DotNetCore.CAP.OpenTelemetry | |||||
private const string OperateNamePrefix = "CAP/"; | private const string OperateNamePrefix = "CAP/"; | ||||
private const string ProducerOperateNameSuffix = "/Publisher"; | private const string ProducerOperateNameSuffix = "/Publisher"; | ||||
private const string ConsumerOperateNameSuffix = "/Subscriber"; | private const string ConsumerOperateNameSuffix = "/Subscriber"; | ||||
public void OnCompleted() | public void OnCompleted() | ||||
{ | { | ||||
} | } | ||||
@@ -189,9 +190,7 @@ namespace DotNetCore.CAP.OpenTelemetry | |||||
{ | { | ||||
var exception = eventData.Exception!; | var exception = eventData.Exception!; | ||||
activity.SetStatus(Status.Error.WithDescription(exception.Message)); | activity.SetStatus(Status.Error.WithDescription(exception.Message)); | ||||
activity.RecordException(exception); | activity.RecordException(exception); | ||||
activity.Stop(); | activity.Stop(); | ||||
} | } | ||||
} | } | ||||
@@ -223,7 +222,6 @@ namespace DotNetCore.CAP.OpenTelemetry | |||||
activity.Stop(); | activity.Stop(); | ||||
} | } | ||||
} | } | ||||
break; | break; | ||||
case CapEvents.ErrorSubscriberInvoke: | case CapEvents.ErrorSubscriberInvoke: |
@@ -1,40 +0,0 @@ | |||||
// Copyright (c) .NET Core Community. All rights reserved. | |||||
// Licensed under the MIT License. See License.txt in the project root for license information. | |||||
using System; | |||||
using System.Diagnostics; | |||||
using DotNetCore.CAP.Diagnostics; | |||||
using Microsoft.Extensions.Logging; | |||||
namespace DotNetCore.CAP.OpenTelemetry | |||||
{ | |||||
public class CapDiagnosticProcessorObserver : IObserver<DiagnosticListener> | |||||
{ | |||||
private readonly ILogger _logger; | |||||
private readonly CapDiagnosticObserver _capObserver; | |||||
public const string DiagnosticListenerName = CapDiagnosticListenerNames.DiagnosticListenerName; | |||||
public CapDiagnosticProcessorObserver(ILogger<CapDiagnosticProcessorObserver> logger, CapDiagnosticObserver capObserver) | |||||
{ | |||||
_logger = logger; | |||||
_capObserver = capObserver; | |||||
} | |||||
public void OnCompleted() | |||||
{ | |||||
} | |||||
public void OnError(Exception error) | |||||
{ | |||||
} | |||||
public void OnNext(DiagnosticListener listener) | |||||
{ | |||||
if (listener.Name == DiagnosticListenerName) | |||||
{ | |||||
listener.Subscribe(_capObserver); | |||||
_logger.LogInformation($"Loaded diagnostic listener [{DiagnosticListenerName}]."); | |||||
} | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,97 @@ | |||||
// Copyright (c) .NET Core Community. All rights reserved. | |||||
// Licensed under the MIT License. See License.txt in the project root for license information. | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Threading; | |||||
using DotNetCore.CAP.Diagnostics; | |||||
namespace DotNetCore.CAP.OpenTelemetry | |||||
{ | |||||
internal class DiagnosticSourceSubscriber : IDisposable, IObserver<System.Diagnostics.DiagnosticListener> | |||||
{ | |||||
private readonly List<IDisposable> _listenerSubscriptions; | |||||
private readonly Func<string, DiagnosticListener> _handlerFactory; | |||||
private readonly Func<System.Diagnostics.DiagnosticListener, bool> _diagnosticSourceFilter; | |||||
private readonly Func<string, object?, object?, bool>? _isEnabledFilter; | |||||
private long _disposed; | |||||
private IDisposable? _allSourcesSubscription; | |||||
public DiagnosticSourceSubscriber( | |||||
DiagnosticListener handler, | |||||
Func<string, object?, object?, bool>? isEnabledFilter) | |||||
: this(_ => handler, | |||||
value => CapDiagnosticListenerNames.DiagnosticListenerName == value.Name, | |||||
isEnabledFilter) | |||||
{ | |||||
} | |||||
public DiagnosticSourceSubscriber( | |||||
Func<string, DiagnosticListener> handlerFactory, | |||||
Func<System.Diagnostics.DiagnosticListener, bool> diagnosticSourceFilter, | |||||
Func<string, object?, object?, bool>? isEnabledFilter) | |||||
{ | |||||
_listenerSubscriptions = new List<IDisposable>(); | |||||
_handlerFactory = handlerFactory ?? throw new ArgumentNullException(nameof(handlerFactory)); | |||||
_diagnosticSourceFilter = diagnosticSourceFilter; | |||||
_isEnabledFilter = isEnabledFilter; | |||||
} | |||||
public void Subscribe() | |||||
{ | |||||
_allSourcesSubscription ??= System.Diagnostics.DiagnosticListener.AllListeners.Subscribe(this); | |||||
} | |||||
public void OnNext(System.Diagnostics.DiagnosticListener value) | |||||
{ | |||||
if ((Interlocked.Read(ref _disposed) == 0) && _diagnosticSourceFilter(value)) | |||||
{ | |||||
var handler = _handlerFactory(value.Name); | |||||
var subscription = _isEnabledFilter == null ? | |||||
value.Subscribe(handler) : | |||||
value.Subscribe(handler, _isEnabledFilter); | |||||
lock (_listenerSubscriptions) | |||||
{ | |||||
_listenerSubscriptions.Add(subscription); | |||||
} | |||||
} | |||||
} | |||||
public void OnCompleted() | |||||
{ | |||||
} | |||||
public void OnError(Exception error) | |||||
{ | |||||
} | |||||
/// <inheritdoc/> | |||||
public void Dispose() | |||||
{ | |||||
Dispose(true); | |||||
GC.SuppressFinalize(this); | |||||
} | |||||
protected virtual void Dispose(bool disposing) | |||||
{ | |||||
if (Interlocked.CompareExchange(ref _disposed, 1, 0) == 1) | |||||
{ | |||||
return; | |||||
} | |||||
lock (_listenerSubscriptions) | |||||
{ | |||||
foreach (var listenerSubscription in _listenerSubscriptions) | |||||
{ | |||||
listenerSubscription?.Dispose(); | |||||
} | |||||
_listenerSubscriptions.Clear(); | |||||
} | |||||
_allSourcesSubscription?.Dispose(); | |||||
_allSourcesSubscription = null; | |||||
} | |||||
} | |||||
} |
@@ -4,14 +4,13 @@ | |||||
<TargetFramework>netstandard2.1</TargetFramework> | <TargetFramework>netstandard2.1</TargetFramework> | ||||
<Nullable>enable</Nullable> | <Nullable>enable</Nullable> | ||||
<Description>CAP instrumentation for OpenTelemetry .NET</Description> | <Description>CAP instrumentation for OpenTelemetry .NET</Description> | ||||
<PackageTags>$(PackageTags);distributed-tracing</PackageTags> | |||||
<PackageTags>$(PackageTags);distributed-tracing;opentelemetry;</PackageTags> | |||||
</PropertyGroup> | </PropertyGroup> | ||||
<ItemGroup> | <ItemGroup> | ||||
<PackageReference Include="OpenTelemetry" Version="1.2.0-rc1" /> | <PackageReference Include="OpenTelemetry" Version="1.2.0-rc1" /> | ||||
</ItemGroup> | </ItemGroup> | ||||
<ItemGroup> | <ItemGroup> | ||||
<ProjectReference Include="..\DotNetCore.CAP\DotNetCore.CAP.csproj" /> | <ProjectReference Include="..\DotNetCore.CAP\DotNetCore.CAP.csproj" /> | ||||
</ItemGroup> | </ItemGroup> | ||||
@@ -1,34 +0,0 @@ | |||||
// Copyright (c) .NET Core Community. All rights reserved. | |||||
// Licensed under the MIT License. See License.txt in the project root for license information. | |||||
using System.Diagnostics; | |||||
using System.Threading; | |||||
using DotNetCore.CAP.Internal; | |||||
namespace DotNetCore.CAP.OpenTelemetry | |||||
{ | |||||
public class OpenTelemetryDiagnosticRegister : IProcessingServer | |||||
{ | |||||
private readonly CapDiagnosticProcessorObserver _diagnosticProcessorObserver; | |||||
public OpenTelemetryDiagnosticRegister(CapDiagnosticProcessorObserver diagnosticProcessorObserver) | |||||
{ | |||||
_diagnosticProcessorObserver = diagnosticProcessorObserver; | |||||
} | |||||
public void Dispose() | |||||
{ | |||||
} | |||||
public void Pulse() | |||||
{ | |||||
} | |||||
public void Start(CancellationToken stoppingToken) | |||||
{ | |||||
DiagnosticListener.AllListeners.Subscribe(_diagnosticProcessorObserver); | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,31 @@ | |||||
// Copyright (c) .NET Core Community. All rights reserved. | |||||
// Licensed under the MIT License. See License.txt in the project root for license information. | |||||
using System; | |||||
using DotNetCore.CAP.OpenTelemetry; | |||||
// ReSharper disable once CheckNamespace | |||||
namespace OpenTelemetry.Trace | |||||
{ | |||||
public static class TracerProviderBuilderExtensions | |||||
{ | |||||
/// <summary> | |||||
/// Enables the message eventing data collection for CAP. | |||||
/// </summary> | |||||
/// <param name="builder"><see cref="TracerProviderBuilder"/> being configured.</param> | |||||
/// <returns>The instance of <see cref="TracerProviderBuilder"/> to chain the calls.</returns> | |||||
public static TracerProviderBuilder AddCapInstrumentation(this TracerProviderBuilder builder) | |||||
{ | |||||
if (builder == null) | |||||
{ | |||||
throw new ArgumentNullException(nameof(builder)); | |||||
} | |||||
builder.AddSource(DiagnosticListener.SourceName); | |||||
var instrumentation = new CapInstrumentation(new DiagnosticListener()); | |||||
return builder.AddInstrumentation(() => instrumentation); | |||||
} | |||||
} | |||||
} |
@@ -39,7 +39,7 @@ namespace DotNetCore.CAP.RabbitMQ | |||||
public event EventHandler<LogMessageEventArgs>? OnLog; | public event EventHandler<LogMessageEventArgs>? OnLog; | ||||
public BrokerAddress BrokerAddress => new("RabbitMQ", _rabbitMQOptions.HostName); | |||||
public BrokerAddress BrokerAddress => new("RabbitMQ", $"{_rabbitMQOptions.HostName}:{_rabbitMQOptions.Port}"); | |||||
public void Subscribe(IEnumerable<string> topics) | public void Subscribe(IEnumerable<string> topics) | ||||
{ | { | ||||
@@ -96,7 +96,6 @@ namespace DotNetCore.CAP.RabbitMQ | |||||
public void Dispose() | public void Dispose() | ||||
{ | { | ||||
_channel?.Dispose(); | _channel?.Dispose(); | ||||
//The connection should not be closed here, because the connection is still in use elsewhere. | //The connection should not be closed here, because the connection is still in use elsewhere. | ||||
//_connection?.Dispose(); | //_connection?.Dispose(); | ||||