Add support for OpenTelemetrymaster
@@ -84,6 +84,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.Pulsar.InMemory", "s | |||
EndProject | |||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.RabbitMQ.SqlServer.DispatcherPerGroup", "samples\Sample.RabbitMQ.SqlServer.DispatcherPerGroup\Sample.RabbitMQ.SqlServer.DispatcherPerGroup.csproj", "{DCDF58E8-F823-4F04-9F8C-E8076DC16A68}" | |||
EndProject | |||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.OpenTelemetry", "src\DotNetCore.CAP.OpenTelemetry\DotNetCore.CAP.OpenTelemetry.csproj", "{83DDB126-A00B-4064-86E7-568322CA67EC}" | |||
EndProject | |||
Global | |||
GlobalSection(SolutionConfigurationPlatforms) = preSolution | |||
Debug|Any CPU = Debug|Any CPU | |||
@@ -202,6 +204,10 @@ Global | |||
{DCDF58E8-F823-4F04-9F8C-E8076DC16A68}.Debug|Any CPU.Build.0 = Debug|Any CPU | |||
{DCDF58E8-F823-4F04-9F8C-E8076DC16A68}.Release|Any CPU.ActiveCfg = Release|Any CPU | |||
{DCDF58E8-F823-4F04-9F8C-E8076DC16A68}.Release|Any CPU.Build.0 = Release|Any CPU | |||
{83DDB126-A00B-4064-86E7-568322CA67EC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU | |||
{83DDB126-A00B-4064-86E7-568322CA67EC}.Debug|Any CPU.Build.0 = Debug|Any CPU | |||
{83DDB126-A00B-4064-86E7-568322CA67EC}.Release|Any CPU.ActiveCfg = Release|Any CPU | |||
{83DDB126-A00B-4064-86E7-568322CA67EC}.Release|Any CPU.Build.0 = Release|Any CPU | |||
EndGlobalSection | |||
GlobalSection(SolutionProperties) = preSolution | |||
HideSolutionNode = FALSE | |||
@@ -235,6 +241,7 @@ Global | |||
{AB7A10CB-2C7E-49CE-AA21-893772FF6546} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4} | |||
{B1D95CCD-0123-41D4-8CCB-9F834ED8D5C5} = {3A6B6931-A123-477A-9469-8B468B5385AF} | |||
{DCDF58E8-F823-4F04-9F8C-E8076DC16A68} = {3A6B6931-A123-477A-9469-8B468B5385AF} | |||
{83DDB126-A00B-4064-86E7-568322CA67EC} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4} | |||
EndGlobalSection | |||
GlobalSection(ExtensibilityGlobals) = postSolution | |||
SolutionGuid = {2E70565D-94CF-40B4-BFE1-AC18D5F736AB} | |||
@@ -0,0 +1,42 @@ | |||
# OpenTelemetry | |||
https://opentelemetry.io/ | |||
OpenTelemetry is a collection of tools, APIs, and SDKs. Use it to instrument, generate, collect, and export telemetry data (metrics, logs, and traces) to help you analyze your software’s performance and behavior. | |||
## Integration | |||
You can find it [here](https://opentelemetry.io/docs/instrumentation/net/getting-started/) about how to use OpenTelemetry in console applications or ASP.NET Core, at here we mainly describe how to tracing CAP data to OpenTelemetry. | |||
### Configuration | |||
Install the CAP OpenTelemetry package into the project. | |||
```C# | |||
dotnet add package DotNetCore.Cap.OpenTelemetry | |||
``` | |||
The OpenTelemetry data comes from [diagnostics](diagnostics.md), add the configuration to enable data collection. | |||
```C# | |||
services.AddCap(x => | |||
{ | |||
//*** | |||
x.UseOpenTelemetry(); // <-- Add this line | |||
}); | |||
``` | |||
Add the instrumentation of CAP to the configuration of OpenTelemetry. | |||
```C# | |||
services.AddOpenTelemetryTracing((builder) => builder | |||
.AddAspNetCoreInstrumentation() | |||
.AddCapInstrumentation() // <-- Add this line | |||
.AddZipkinExporter() | |||
); | |||
``` | |||
Here is a diagram of CAP's tracking data in Zipkin: | |||
<img src="/img/opentelemetry.png"> |
@@ -0,0 +1,45 @@ | |||
# OpenTelemetry | |||
https://opentelemetry.io/ | |||
OpenTelemetry是工具、api和sdk的集合。 使用它来检测、生成、收集和导出遥测数据(度量、日志和跟踪),以帮助您分析软件的性能和行为。 | |||
## 集成 | |||
You can find it [here](https://opentelemetry.io/docs/instrumentation/net/getting-started/) about how to use OpenTelemetry in console applications or ASP.NET Core, at here we mainly describe how to tracing CAP data to OpenTelemetry. | |||
你可以在[这里](https://opentelemetry.io/docs/instrumentation/net/getting-started/)找到关于如何在控制台应用或ASP.NET Core 中使用OpenTelemetry。 | |||
在这里我们主要描述如何将CAP集成到OpenTelemetry中。 | |||
### 配置 | |||
安装CAP的OpenTelemetry包到项目中。 | |||
```C# | |||
dotnet add package DotNetCore.Cap.OpenTelemetry | |||
``` | |||
OpenTelemetry 的跟踪数据来自于[Diagnostics](diagnostics.md)发送的诊断数据,使用下面的配置行来启用收集数据。 | |||
```C# | |||
services.AddCap(x => | |||
{ | |||
//*** | |||
x.UseOpenTelemetry(); // <-- Add this line | |||
}); | |||
``` | |||
添加 CAP Instrumentation 到 OpenTelemetry的扩展配置中。 | |||
```C# | |||
services.AddOpenTelemetryTracing((builder) => builder | |||
.AddAspNetCoreInstrumentation() | |||
.AddCapInstrumentation() // <-- Add this line | |||
.AddZipkinExporter() | |||
); | |||
``` | |||
以下是CAP的跟踪数据在 Zipkin 中的一个示意图: | |||
<img src="/img/opentelemetry.png"> |
@@ -113,6 +113,7 @@ nav: | |||
- Consul: user-guide/en/monitoring/consul.md | |||
- Dashboard: user-guide/en/monitoring/dashboard.md | |||
- Diagnostics: user-guide/en/monitoring/diagnostics.md | |||
- OpenTelemetry: user-guide/en/monitoring/opentelemetry.md | |||
- Samples: | |||
- Github: user-guide/en/samples/github.md | |||
- eShopOnContainers: user-guide/en/samples/eshoponcontainers.md | |||
@@ -149,8 +150,8 @@ nav: | |||
- 监控: | |||
- Consul: user-guide/zh/monitoring/consul.md | |||
- Dashboard: user-guide/zh/monitoring/dashboard.md | |||
- 性能追踪: user-guide/zh/monitoring/diagnostics.md | |||
- 健康检查: user-guide/zh/monitoring/health-checks.md | |||
- Diagnostics: user-guide/zh/monitoring/diagnostics.md | |||
- OpenTelemetry: user-guide/zh/monitoring/opentelemetry.md | |||
- 示例: | |||
- Castle DynamicProxy: user-guide/zh/samples/castle.dynamicproxy.md | |||
- Github: user-guide/zh/samples/github.md | |||
@@ -0,0 +1,55 @@ | |||
// Copyright (c) .NET Core Community. All rights reserved. | |||
// Licensed under the MIT License. See License.txt in the project root for license information. | |||
using System; | |||
using DotNetCore.CAP.Internal; | |||
using DotNetCore.CAP.OpenTelemetry; | |||
using Microsoft.Extensions.DependencyInjection; | |||
using Microsoft.Extensions.DependencyInjection.Extensions; | |||
using OpenTelemetry.Resources; | |||
using OpenTelemetry.Trace; | |||
// ReSharper disable once CheckNamespace | |||
namespace DotNetCore.CAP | |||
{ | |||
internal class OpenTelemetryCapOptionsExtension : ICapOptionsExtension | |||
{ | |||
public void AddServices(IServiceCollection services) | |||
{ | |||
services.AddSingleton<CapDiagnosticObserver>(); | |||
services.AddSingleton<CapDiagnosticProcessorObserver>(); | |||
services.TryAddEnumerable(ServiceDescriptor.Singleton<IProcessingServer, OpenTelemetryDiagnosticRegister>()); | |||
} | |||
} | |||
public static class CapOptionsExtensions | |||
{ | |||
public static CapOptions UseOpenTelemetry(this CapOptions options) | |||
{ | |||
options.RegisterExtension(new OpenTelemetryCapOptionsExtension()); | |||
return options; | |||
} | |||
} | |||
public static class TracerProviderBuilderExtensions | |||
{ | |||
/// <summary> | |||
/// Enables the message eventing data collection for CAP. | |||
/// </summary> | |||
/// <param name="builder"><see cref="TracerProviderBuilder"/> being configured.</param> | |||
/// <returns>The instance of <see cref="TracerProviderBuilder"/> to chain the calls.</returns> | |||
public static TracerProviderBuilder AddCapInstrumentation(this TracerProviderBuilder builder) | |||
{ | |||
if (builder == null) | |||
{ | |||
throw new ArgumentNullException(nameof(builder)); | |||
} | |||
builder.AddSource("DotNetCore.CAP.OpenTelemetry") | |||
.SetResourceBuilder(ResourceBuilder.CreateDefault().AddService("CAP")); | |||
return builder; | |||
} | |||
} | |||
} |
@@ -0,0 +1,244 @@ | |||
// Copyright (c) .NET Core Community. All rights reserved. | |||
// Licensed under the MIT License. See License.txt in the project root for license information. | |||
using System; | |||
using System.Collections.Concurrent; | |||
using System.Collections.Generic; | |||
using System.Diagnostics; | |||
using System.Linq; | |||
using DotNetCore.CAP.Diagnostics; | |||
using DotNetCore.CAP.Messages; | |||
using OpenTelemetry; | |||
using OpenTelemetry.Context.Propagation; | |||
using OpenTelemetry.Trace; | |||
using CapEvents = DotNetCore.CAP.Diagnostics.CapDiagnosticListenerNames; | |||
namespace DotNetCore.CAP.OpenTelemetry | |||
{ | |||
public class CapDiagnosticObserver : IObserver<KeyValuePair<string, object?>> | |||
{ | |||
private static readonly ActivitySource ActivitySource = new("DotNetCore.CAP.OpenTelemetry", "1.0.0"); | |||
private static readonly TextMapPropagator Propagator = new TraceContextPropagator(); | |||
private readonly ConcurrentDictionary<string, ActivityContext> _contexts = new(); | |||
private const string OperateNamePrefix = "CAP/"; | |||
private const string ProducerOperateNameSuffix = "/Publisher"; | |||
private const string ConsumerOperateNameSuffix = "/Subscriber"; | |||
public void OnCompleted() | |||
{ | |||
} | |||
public void OnError(Exception error) | |||
{ | |||
} | |||
public void OnNext(KeyValuePair<string, object?> evt) | |||
{ | |||
switch (evt.Key) | |||
{ | |||
case CapEvents.BeforePublishMessageStore: | |||
{ | |||
var eventData = (CapEventDataPubStore)evt.Value!; | |||
var useParent = false; | |||
if (Activity.Current != null) | |||
{ | |||
if (_contexts.TryAdd(eventData.Message.GetId(), Activity.Current.Context)) | |||
{ | |||
useParent = true; | |||
} | |||
} | |||
var activity = ActivitySource.StartActivity("Event Persistence: " + eventData.Operation); | |||
if (activity != null) | |||
{ | |||
activity.SetTag("message.name", eventData.Operation); | |||
activity.AddEvent(new ActivityEvent("CAP message persistence start...", | |||
DateTimeOffset.FromUnixTimeMilliseconds(eventData.OperationTimestamp!.Value))); | |||
if (!useParent) | |||
{ | |||
_contexts[eventData.Message.GetId()] = Activity.Current!.Context; | |||
} | |||
} | |||
} | |||
break; | |||
case CapEvents.AfterPublishMessageStore: | |||
{ | |||
var eventData = (CapEventDataPubStore)evt.Value!; | |||
Activity.Current?.AddEvent(new ActivityEvent("CAP message persistence succeeded!", | |||
DateTimeOffset.FromUnixTimeMilliseconds(eventData.OperationTimestamp!.Value), | |||
new ActivityTagsCollection { new("cap.persistence.duration", eventData.ElapsedTimeMs) }) | |||
); | |||
Activity.Current?.Stop(); | |||
} | |||
break; | |||
case CapEvents.ErrorPublishMessageStore: | |||
{ | |||
var eventData = (CapEventDataPubStore)evt.Value!; | |||
if (Activity.Current is { } activity) | |||
{ | |||
var exception = eventData.Exception!; | |||
activity.SetStatus(Status.Error.WithDescription(exception.Message)); | |||
activity.RecordException(exception); | |||
activity.Stop(); | |||
} | |||
} | |||
break; | |||
case CapEvents.BeforePublish: | |||
{ | |||
var eventData = (CapEventDataPubSend)evt.Value!; | |||
_contexts.TryRemove(eventData.TransportMessage.GetId(), out var context); | |||
var activity = ActivitySource.StartActivity(OperateNamePrefix + eventData.Operation + ProducerOperateNameSuffix, ActivityKind.Producer, context); | |||
if (activity != null) | |||
{ | |||
activity.SetTag("messaging.system", eventData.BrokerAddress.Name); | |||
activity.SetTag("messaging.destination", eventData.Operation); | |||
activity.SetTag("messaging.destination_kind", "topic"); | |||
activity.SetTag("messaging.url", eventData.BrokerAddress.Endpoint!.Replace("-1", "5672")); | |||
activity.SetTag("messaging.message_id", eventData.TransportMessage.GetId()); | |||
activity.SetTag("messaging.message_payload_size_bytes", eventData.TransportMessage.Body?.Length); | |||
activity.AddEvent(new ActivityEvent("Message publishing start...", | |||
DateTimeOffset.FromUnixTimeMilliseconds(eventData.OperationTimestamp!.Value))); | |||
Propagator.Inject(new PropagationContext(activity.Context, Baggage.Current), eventData.TransportMessage, | |||
(msg, key, value) => | |||
{ | |||
msg.Headers[key] = value; | |||
}); | |||
} | |||
} | |||
break; | |||
case CapEvents.AfterPublish: | |||
{ | |||
var eventData = (CapEventDataPubSend)evt.Value!; | |||
if (Activity.Current is { } activity) | |||
{ | |||
activity.AddEvent(new ActivityEvent("Message publishing succeeded!", | |||
DateTimeOffset.FromUnixTimeMilliseconds(eventData.OperationTimestamp!.Value), | |||
new ActivityTagsCollection { new("cap.send.duration", eventData.ElapsedTimeMs) }) | |||
); | |||
activity.Stop(); | |||
} | |||
} | |||
break; | |||
case CapEvents.ErrorPublish: | |||
{ | |||
var eventData = (CapEventDataPubSend)evt.Value!; | |||
if (Activity.Current is { } activity) | |||
{ | |||
var exception = eventData.Exception!; | |||
activity.SetStatus(Status.Error.WithDescription(exception.Message)); | |||
activity.RecordException(exception); | |||
activity.Stop(); | |||
} | |||
} | |||
break; | |||
case CapEvents.BeforeConsume: | |||
{ | |||
var eventData = (CapEventDataSubStore)evt.Value!; | |||
var parentContext = Propagator.Extract(default, eventData.TransportMessage, (msg, key) => | |||
{ | |||
if (msg.Headers.TryGetValue(key, out string? value)) | |||
{ | |||
return new[] { value }; | |||
} | |||
return Enumerable.Empty<string>(); | |||
}); | |||
var activity = ActivitySource.StartActivity(OperateNamePrefix + eventData.Operation + ConsumerOperateNameSuffix, | |||
ActivityKind.Consumer, | |||
parentContext.ActivityContext); | |||
if (activity != null) | |||
{ | |||
activity.SetTag("messaging.system", eventData.BrokerAddress.Name); | |||
activity.SetTag("messaging.destination", eventData.Operation); | |||
activity.SetTag("messaging.destination_kind", "topic"); | |||
activity.SetTag("messaging.url", eventData.BrokerAddress.Endpoint!.Replace("-1", "5672")); | |||
activity.SetTag("messaging.message_id", eventData.TransportMessage.GetId()); | |||
activity.SetTag("messaging.message_payload_size_bytes", eventData.TransportMessage.Body?.Length); | |||
activity.AddEvent(new ActivityEvent("CAP message persistence start...", | |||
DateTimeOffset.FromUnixTimeMilliseconds(eventData.OperationTimestamp!.Value))); | |||
_contexts[eventData.TransportMessage.GetId() + eventData.TransportMessage.GetGroup()] = activity.Context; | |||
} | |||
} | |||
break; | |||
case CapEvents.AfterConsume: | |||
{ | |||
var eventData = (CapEventDataSubStore)evt.Value!; | |||
if (Activity.Current is { } activity) | |||
{ | |||
activity.AddEvent(new ActivityEvent("CAP message persistence succeeded!", | |||
DateTimeOffset.FromUnixTimeMilliseconds(eventData.OperationTimestamp!.Value), | |||
new ActivityTagsCollection { new("cap.receive.duration", eventData.ElapsedTimeMs) })); | |||
activity.Stop(); | |||
} | |||
} | |||
break; | |||
case CapEvents.ErrorConsume: | |||
{ | |||
var eventData = (CapEventDataSubStore)evt.Value!; | |||
if (Activity.Current is { } activity) | |||
{ | |||
var exception = eventData.Exception!; | |||
activity.SetStatus(Status.Error.WithDescription(exception.Message)); | |||
activity.RecordException(exception); | |||
activity.Stop(); | |||
} | |||
} | |||
break; | |||
case CapEvents.BeforeSubscriberInvoke: | |||
{ | |||
var eventData = (CapEventDataSubExecute)evt.Value!; | |||
_contexts.TryRemove(eventData.Message.GetId() + eventData.Message.GetGroup(), out var context); | |||
var activity = ActivitySource.StartActivity("Subscriber Invoke: " + eventData.MethodInfo!.Name, ActivityKind.Internal, | |||
context); | |||
if (activity != null) | |||
{ | |||
activity.SetTag("messaging.operation", "process"); | |||
activity.SetTag("code.function", eventData.MethodInfo.Name); | |||
activity.AddEvent(new ActivityEvent("Begin invoke the subscriber:" + eventData.MethodInfo.Name, | |||
DateTimeOffset.FromUnixTimeMilliseconds(eventData.OperationTimestamp!.Value))); | |||
} | |||
} | |||
break; | |||
case CapEvents.AfterSubscriberInvoke: | |||
{ | |||
var eventData = (CapEventDataSubExecute)evt.Value!; | |||
if (Activity.Current is { } activity) | |||
{ | |||
activity.AddEvent(new ActivityEvent("Subscriber invoke succeeded!", | |||
DateTimeOffset.FromUnixTimeMilliseconds(eventData.OperationTimestamp!.Value), | |||
new ActivityTagsCollection { new("cap.invoke.duration", eventData.ElapsedTimeMs) })); | |||
activity.Stop(); | |||
} | |||
} | |||
break; | |||
case CapEvents.ErrorSubscriberInvoke: | |||
{ | |||
var eventData = (CapEventDataSubExecute)evt.Value!; | |||
if (Activity.Current is { } activity) | |||
{ | |||
var exception = eventData.Exception!; | |||
activity.SetStatus(Status.Error.WithDescription(exception.Message)); | |||
activity.RecordException(exception); | |||
activity.Stop(); | |||
} | |||
} | |||
break; | |||
} | |||
} | |||
} | |||
} |
@@ -0,0 +1,40 @@ | |||
// Copyright (c) .NET Core Community. All rights reserved. | |||
// Licensed under the MIT License. See License.txt in the project root for license information. | |||
using System; | |||
using System.Diagnostics; | |||
using DotNetCore.CAP.Diagnostics; | |||
using Microsoft.Extensions.Logging; | |||
namespace DotNetCore.CAP.OpenTelemetry | |||
{ | |||
public class CapDiagnosticProcessorObserver : IObserver<DiagnosticListener> | |||
{ | |||
private readonly ILogger _logger; | |||
private readonly CapDiagnosticObserver _capObserver; | |||
public const string DiagnosticListenerName = CapDiagnosticListenerNames.DiagnosticListenerName; | |||
public CapDiagnosticProcessorObserver(ILogger<CapDiagnosticProcessorObserver> logger, CapDiagnosticObserver capObserver) | |||
{ | |||
_logger = logger; | |||
_capObserver = capObserver; | |||
} | |||
public void OnCompleted() | |||
{ | |||
} | |||
public void OnError(Exception error) | |||
{ | |||
} | |||
public void OnNext(DiagnosticListener listener) | |||
{ | |||
if (listener.Name == DiagnosticListenerName) | |||
{ | |||
listener.Subscribe(_capObserver); | |||
_logger.LogInformation($"Loaded diagnostic listener [{DiagnosticListenerName}]."); | |||
} | |||
} | |||
} | |||
} |
@@ -0,0 +1,19 @@ | |||
<Project Sdk="Microsoft.NET.Sdk"> | |||
<PropertyGroup> | |||
<TargetFramework>netstandard2.1</TargetFramework> | |||
<Nullable>enable</Nullable> | |||
<Description>CAP instrumentation for OpenTelemetry .NET</Description> | |||
<PackageTags>$(PackageTags);distributed-tracing</PackageTags> | |||
</PropertyGroup> | |||
<ItemGroup> | |||
<PackageReference Include="OpenTelemetry" Version="1.2.0-rc1" /> | |||
</ItemGroup> | |||
<ItemGroup> | |||
<ProjectReference Include="..\DotNetCore.CAP\DotNetCore.CAP.csproj" /> | |||
</ItemGroup> | |||
</Project> |
@@ -0,0 +1,34 @@ | |||
// Copyright (c) .NET Core Community. All rights reserved. | |||
// Licensed under the MIT License. See License.txt in the project root for license information. | |||
using System.Diagnostics; | |||
using System.Threading; | |||
using DotNetCore.CAP.Internal; | |||
namespace DotNetCore.CAP.OpenTelemetry | |||
{ | |||
public class OpenTelemetryDiagnosticRegister : IProcessingServer | |||
{ | |||
private readonly CapDiagnosticProcessorObserver _diagnosticProcessorObserver; | |||
public OpenTelemetryDiagnosticRegister(CapDiagnosticProcessorObserver diagnosticProcessorObserver) | |||
{ | |||
_diagnosticProcessorObserver = diagnosticProcessorObserver; | |||
} | |||
public void Dispose() | |||
{ | |||
} | |||
public void Pulse() | |||
{ | |||
} | |||
public void Start(CancellationToken stoppingToken) | |||
{ | |||
DiagnosticListener.AllListeners.Subscribe(_diagnosticProcessorObserver); | |||
} | |||
} | |||
} |