diff --git a/CAP.sln b/CAP.sln
index 8f4620b..db7190d 100644
--- a/CAP.sln
+++ b/CAP.sln
@@ -84,6 +84,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.Pulsar.InMemory", "s
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.RabbitMQ.SqlServer.DispatcherPerGroup", "samples\Sample.RabbitMQ.SqlServer.DispatcherPerGroup\Sample.RabbitMQ.SqlServer.DispatcherPerGroup.csproj", "{DCDF58E8-F823-4F04-9F8C-E8076DC16A68}"
EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.OpenTelemetry", "src\DotNetCore.CAP.OpenTelemetry\DotNetCore.CAP.OpenTelemetry.csproj", "{83DDB126-A00B-4064-86E7-568322CA67EC}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -202,6 +204,10 @@ Global
{DCDF58E8-F823-4F04-9F8C-E8076DC16A68}.Debug|Any CPU.Build.0 = Debug|Any CPU
{DCDF58E8-F823-4F04-9F8C-E8076DC16A68}.Release|Any CPU.ActiveCfg = Release|Any CPU
{DCDF58E8-F823-4F04-9F8C-E8076DC16A68}.Release|Any CPU.Build.0 = Release|Any CPU
+ {83DDB126-A00B-4064-86E7-568322CA67EC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {83DDB126-A00B-4064-86E7-568322CA67EC}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {83DDB126-A00B-4064-86E7-568322CA67EC}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {83DDB126-A00B-4064-86E7-568322CA67EC}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -235,6 +241,7 @@ Global
{AB7A10CB-2C7E-49CE-AA21-893772FF6546} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{B1D95CCD-0123-41D4-8CCB-9F834ED8D5C5} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{DCDF58E8-F823-4F04-9F8C-E8076DC16A68} = {3A6B6931-A123-477A-9469-8B468B5385AF}
+ {83DDB126-A00B-4064-86E7-568322CA67EC} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {2E70565D-94CF-40B4-BFE1-AC18D5F736AB}
diff --git a/docs/content/img/opentelemetry.png b/docs/content/img/opentelemetry.png
new file mode 100644
index 0000000..87fd0ae
Binary files /dev/null and b/docs/content/img/opentelemetry.png differ
diff --git a/docs/content/user-guide/en/monitoring/opentelemetry.md b/docs/content/user-guide/en/monitoring/opentelemetry.md
new file mode 100644
index 0000000..b4be2d2
--- /dev/null
+++ b/docs/content/user-guide/en/monitoring/opentelemetry.md
@@ -0,0 +1,42 @@
+# OpenTelemetry
+
+https://opentelemetry.io/
+
+OpenTelemetry is a collection of tools, APIs, and SDKs. Use it to instrument, generate, collect, and export telemetry data (metrics, logs, and traces) to help you analyze your software’s performance and behavior.
+
+## Integration
+
+You can find it [here](https://opentelemetry.io/docs/instrumentation/net/getting-started/) about how to use OpenTelemetry in console applications or ASP.NET Core, at here we mainly describe how to tracing CAP data to OpenTelemetry.
+
+### Configuration
+
+Install the CAP OpenTelemetry package into the project.
+
+```C#
+dotnet add package DotNetCore.Cap.OpenTelemetry
+```
+
+The OpenTelemetry data comes from [diagnostics](diagnostics.md), add the configuration to enable data collection.
+
+```C#
+services.AddCap(x =>
+{
+ //***
+ x.UseOpenTelemetry(); // <-- Add this line
+});
+
+```
+
+Add the instrumentation of CAP to the configuration of OpenTelemetry.
+
+```C#
+services.AddOpenTelemetryTracing((builder) => builder
+ .AddAspNetCoreInstrumentation()
+ .AddCapInstrumentation() // <-- Add this line
+ .AddZipkinExporter()
+);
+```
+
+Here is a diagram of CAP's tracking data in Zipkin:
+
+
\ No newline at end of file
diff --git a/docs/content/user-guide/zh/monitoring/opentelemetry.md b/docs/content/user-guide/zh/monitoring/opentelemetry.md
new file mode 100644
index 0000000..da3b3e7
--- /dev/null
+++ b/docs/content/user-guide/zh/monitoring/opentelemetry.md
@@ -0,0 +1,45 @@
+# OpenTelemetry
+
+https://opentelemetry.io/
+
+OpenTelemetry是工具、api和sdk的集合。 使用它来检测、生成、收集和导出遥测数据(度量、日志和跟踪),以帮助您分析软件的性能和行为。
+
+## 集成
+
+You can find it [here](https://opentelemetry.io/docs/instrumentation/net/getting-started/) about how to use OpenTelemetry in console applications or ASP.NET Core, at here we mainly describe how to tracing CAP data to OpenTelemetry.
+
+你可以在[这里](https://opentelemetry.io/docs/instrumentation/net/getting-started/)找到关于如何在控制台应用或ASP.NET Core 中使用OpenTelemetry。
+在这里我们主要描述如何将CAP集成到OpenTelemetry中。
+
+### 配置
+
+安装CAP的OpenTelemetry包到项目中。
+
+```C#
+dotnet add package DotNetCore.Cap.OpenTelemetry
+```
+
+OpenTelemetry 的跟踪数据来自于[Diagnostics](diagnostics.md)发送的诊断数据,使用下面的配置行来启用收集数据。
+
+```C#
+services.AddCap(x =>
+{
+ //***
+ x.UseOpenTelemetry(); // <-- Add this line
+});
+
+```
+
+添加 CAP Instrumentation 到 OpenTelemetry的扩展配置中。
+
+```C#
+services.AddOpenTelemetryTracing((builder) => builder
+ .AddAspNetCoreInstrumentation()
+ .AddCapInstrumentation() // <-- Add this line
+ .AddZipkinExporter()
+);
+```
+
+以下是CAP的跟踪数据在 Zipkin 中的一个示意图:
+
+
\ No newline at end of file
diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml
index 8dd3daf..52aee2a 100644
--- a/docs/mkdocs.yml
+++ b/docs/mkdocs.yml
@@ -113,6 +113,7 @@ nav:
- Consul: user-guide/en/monitoring/consul.md
- Dashboard: user-guide/en/monitoring/dashboard.md
- Diagnostics: user-guide/en/monitoring/diagnostics.md
+ - OpenTelemetry: user-guide/en/monitoring/opentelemetry.md
- Samples:
- Github: user-guide/en/samples/github.md
- eShopOnContainers: user-guide/en/samples/eshoponcontainers.md
@@ -149,8 +150,8 @@ nav:
- 监控:
- Consul: user-guide/zh/monitoring/consul.md
- Dashboard: user-guide/zh/monitoring/dashboard.md
- - 性能追踪: user-guide/zh/monitoring/diagnostics.md
- - 健康检查: user-guide/zh/monitoring/health-checks.md
+ - Diagnostics: user-guide/zh/monitoring/diagnostics.md
+ - OpenTelemetry: user-guide/zh/monitoring/opentelemetry.md
- 示例:
- Castle DynamicProxy: user-guide/zh/samples/castle.dynamicproxy.md
- Github: user-guide/zh/samples/github.md
diff --git a/src/DotNetCore.CAP.OpenTelemetry/CAP.Options.Extension.cs b/src/DotNetCore.CAP.OpenTelemetry/CAP.Options.Extension.cs
new file mode 100644
index 0000000..69e939f
--- /dev/null
+++ b/src/DotNetCore.CAP.OpenTelemetry/CAP.Options.Extension.cs
@@ -0,0 +1,55 @@
+// Copyright (c) .NET Core Community. All rights reserved.
+// Licensed under the MIT License. See License.txt in the project root for license information.
+
+using System;
+using DotNetCore.CAP.Internal;
+using DotNetCore.CAP.OpenTelemetry;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.DependencyInjection.Extensions;
+using OpenTelemetry.Resources;
+using OpenTelemetry.Trace;
+
+// ReSharper disable once CheckNamespace
+namespace DotNetCore.CAP
+{
+ internal class OpenTelemetryCapOptionsExtension : ICapOptionsExtension
+ {
+ public void AddServices(IServiceCollection services)
+ {
+ services.AddSingleton();
+ services.AddSingleton();
+ services.TryAddEnumerable(ServiceDescriptor.Singleton());
+ }
+ }
+
+ public static class CapOptionsExtensions
+ {
+ public static CapOptions UseOpenTelemetry(this CapOptions options)
+ {
+ options.RegisterExtension(new OpenTelemetryCapOptionsExtension());
+
+ return options;
+ }
+ }
+
+ public static class TracerProviderBuilderExtensions
+ {
+ ///
+ /// Enables the message eventing data collection for CAP.
+ ///
+ /// being configured.
+ /// The instance of to chain the calls.
+ public static TracerProviderBuilder AddCapInstrumentation(this TracerProviderBuilder builder)
+ {
+ if (builder == null)
+ {
+ throw new ArgumentNullException(nameof(builder));
+ }
+
+ builder.AddSource("DotNetCore.CAP.OpenTelemetry")
+ .SetResourceBuilder(ResourceBuilder.CreateDefault().AddService("CAP"));
+
+ return builder;
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP.OpenTelemetry/DiagnosticObserver.cs b/src/DotNetCore.CAP.OpenTelemetry/DiagnosticObserver.cs
new file mode 100644
index 0000000..0a63df6
--- /dev/null
+++ b/src/DotNetCore.CAP.OpenTelemetry/DiagnosticObserver.cs
@@ -0,0 +1,244 @@
+// Copyright (c) .NET Core Community. All rights reserved.
+// Licensed under the MIT License. See License.txt in the project root for license information.
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Linq;
+using DotNetCore.CAP.Diagnostics;
+using DotNetCore.CAP.Messages;
+using OpenTelemetry;
+using OpenTelemetry.Context.Propagation;
+using OpenTelemetry.Trace;
+using CapEvents = DotNetCore.CAP.Diagnostics.CapDiagnosticListenerNames;
+
+namespace DotNetCore.CAP.OpenTelemetry
+{
+ public class CapDiagnosticObserver : IObserver>
+ {
+ private static readonly ActivitySource ActivitySource = new("DotNetCore.CAP.OpenTelemetry", "1.0.0");
+ private static readonly TextMapPropagator Propagator = new TraceContextPropagator();
+
+ private readonly ConcurrentDictionary _contexts = new();
+
+ private const string OperateNamePrefix = "CAP/";
+ private const string ProducerOperateNameSuffix = "/Publisher";
+ private const string ConsumerOperateNameSuffix = "/Subscriber";
+
+ public void OnCompleted()
+ {
+ }
+
+ public void OnError(Exception error)
+ {
+ }
+
+ public void OnNext(KeyValuePair evt)
+ {
+ switch (evt.Key)
+ {
+ case CapEvents.BeforePublishMessageStore:
+ {
+ var eventData = (CapEventDataPubStore)evt.Value!;
+ var useParent = false;
+ if (Activity.Current != null)
+ {
+ if (_contexts.TryAdd(eventData.Message.GetId(), Activity.Current.Context))
+ {
+ useParent = true;
+ }
+ }
+ var activity = ActivitySource.StartActivity("Event Persistence: " + eventData.Operation);
+ if (activity != null)
+ {
+ activity.SetTag("message.name", eventData.Operation);
+ activity.AddEvent(new ActivityEvent("CAP message persistence start...",
+ DateTimeOffset.FromUnixTimeMilliseconds(eventData.OperationTimestamp!.Value)));
+
+ if (!useParent)
+ {
+ _contexts[eventData.Message.GetId()] = Activity.Current!.Context;
+ }
+ }
+ }
+ break;
+ case CapEvents.AfterPublishMessageStore:
+ {
+ var eventData = (CapEventDataPubStore)evt.Value!;
+
+ Activity.Current?.AddEvent(new ActivityEvent("CAP message persistence succeeded!",
+ DateTimeOffset.FromUnixTimeMilliseconds(eventData.OperationTimestamp!.Value),
+ new ActivityTagsCollection { new("cap.persistence.duration", eventData.ElapsedTimeMs) })
+ );
+
+ Activity.Current?.Stop();
+ }
+ break;
+ case CapEvents.ErrorPublishMessageStore:
+ {
+ var eventData = (CapEventDataPubStore)evt.Value!;
+ if (Activity.Current is { } activity)
+ {
+ var exception = eventData.Exception!;
+ activity.SetStatus(Status.Error.WithDescription(exception.Message));
+ activity.RecordException(exception);
+ activity.Stop();
+ }
+ }
+ break;
+ case CapEvents.BeforePublish:
+ {
+ var eventData = (CapEventDataPubSend)evt.Value!;
+ _contexts.TryRemove(eventData.TransportMessage.GetId(), out var context);
+ var activity = ActivitySource.StartActivity(OperateNamePrefix + eventData.Operation + ProducerOperateNameSuffix, ActivityKind.Producer, context);
+ if (activity != null)
+ {
+ activity.SetTag("messaging.system", eventData.BrokerAddress.Name);
+ activity.SetTag("messaging.destination", eventData.Operation);
+ activity.SetTag("messaging.destination_kind", "topic");
+ activity.SetTag("messaging.url", eventData.BrokerAddress.Endpoint!.Replace("-1", "5672"));
+ activity.SetTag("messaging.message_id", eventData.TransportMessage.GetId());
+ activity.SetTag("messaging.message_payload_size_bytes", eventData.TransportMessage.Body?.Length);
+
+ activity.AddEvent(new ActivityEvent("Message publishing start...",
+ DateTimeOffset.FromUnixTimeMilliseconds(eventData.OperationTimestamp!.Value)));
+
+ Propagator.Inject(new PropagationContext(activity.Context, Baggage.Current), eventData.TransportMessage,
+ (msg, key, value) =>
+ {
+ msg.Headers[key] = value;
+ });
+ }
+ }
+ break;
+ case CapEvents.AfterPublish:
+ {
+ var eventData = (CapEventDataPubSend)evt.Value!;
+ if (Activity.Current is { } activity)
+ {
+ activity.AddEvent(new ActivityEvent("Message publishing succeeded!",
+ DateTimeOffset.FromUnixTimeMilliseconds(eventData.OperationTimestamp!.Value),
+ new ActivityTagsCollection { new("cap.send.duration", eventData.ElapsedTimeMs) })
+ );
+ activity.Stop();
+ }
+ }
+ break;
+ case CapEvents.ErrorPublish:
+ {
+ var eventData = (CapEventDataPubSend)evt.Value!;
+ if (Activity.Current is { } activity)
+ {
+ var exception = eventData.Exception!;
+ activity.SetStatus(Status.Error.WithDescription(exception.Message));
+ activity.RecordException(exception);
+ activity.Stop();
+ }
+ }
+ break;
+ case CapEvents.BeforeConsume:
+ {
+ var eventData = (CapEventDataSubStore)evt.Value!;
+ var parentContext = Propagator.Extract(default, eventData.TransportMessage, (msg, key) =>
+ {
+ if (msg.Headers.TryGetValue(key, out string? value))
+ {
+ return new[] { value };
+ }
+ return Enumerable.Empty();
+ });
+
+ var activity = ActivitySource.StartActivity(OperateNamePrefix + eventData.Operation + ConsumerOperateNameSuffix,
+ ActivityKind.Consumer,
+ parentContext.ActivityContext);
+
+ if (activity != null)
+ {
+ activity.SetTag("messaging.system", eventData.BrokerAddress.Name);
+ activity.SetTag("messaging.destination", eventData.Operation);
+ activity.SetTag("messaging.destination_kind", "topic");
+ activity.SetTag("messaging.url", eventData.BrokerAddress.Endpoint!.Replace("-1", "5672"));
+ activity.SetTag("messaging.message_id", eventData.TransportMessage.GetId());
+ activity.SetTag("messaging.message_payload_size_bytes", eventData.TransportMessage.Body?.Length);
+
+ activity.AddEvent(new ActivityEvent("CAP message persistence start...",
+ DateTimeOffset.FromUnixTimeMilliseconds(eventData.OperationTimestamp!.Value)));
+
+ _contexts[eventData.TransportMessage.GetId() + eventData.TransportMessage.GetGroup()] = activity.Context;
+ }
+ }
+ break;
+ case CapEvents.AfterConsume:
+ {
+ var eventData = (CapEventDataSubStore)evt.Value!;
+ if (Activity.Current is { } activity)
+ {
+ activity.AddEvent(new ActivityEvent("CAP message persistence succeeded!",
+ DateTimeOffset.FromUnixTimeMilliseconds(eventData.OperationTimestamp!.Value),
+ new ActivityTagsCollection { new("cap.receive.duration", eventData.ElapsedTimeMs) }));
+
+ activity.Stop();
+ }
+ }
+ break;
+ case CapEvents.ErrorConsume:
+ {
+ var eventData = (CapEventDataSubStore)evt.Value!;
+ if (Activity.Current is { } activity)
+ {
+ var exception = eventData.Exception!;
+ activity.SetStatus(Status.Error.WithDescription(exception.Message));
+
+ activity.RecordException(exception);
+
+ activity.Stop();
+ }
+ }
+ break;
+ case CapEvents.BeforeSubscriberInvoke:
+ {
+ var eventData = (CapEventDataSubExecute)evt.Value!;
+ _contexts.TryRemove(eventData.Message.GetId() + eventData.Message.GetGroup(), out var context);
+ var activity = ActivitySource.StartActivity("Subscriber Invoke: " + eventData.MethodInfo!.Name, ActivityKind.Internal,
+ context);
+ if (activity != null)
+ {
+ activity.SetTag("messaging.operation", "process");
+ activity.SetTag("code.function", eventData.MethodInfo.Name);
+
+ activity.AddEvent(new ActivityEvent("Begin invoke the subscriber:" + eventData.MethodInfo.Name,
+ DateTimeOffset.FromUnixTimeMilliseconds(eventData.OperationTimestamp!.Value)));
+ }
+ }
+ break;
+ case CapEvents.AfterSubscriberInvoke:
+ {
+ var eventData = (CapEventDataSubExecute)evt.Value!;
+ if (Activity.Current is { } activity)
+ {
+ activity.AddEvent(new ActivityEvent("Subscriber invoke succeeded!",
+ DateTimeOffset.FromUnixTimeMilliseconds(eventData.OperationTimestamp!.Value),
+ new ActivityTagsCollection { new("cap.invoke.duration", eventData.ElapsedTimeMs) }));
+
+ activity.Stop();
+ }
+
+ }
+ break;
+ case CapEvents.ErrorSubscriberInvoke:
+ {
+ var eventData = (CapEventDataSubExecute)evt.Value!;
+ if (Activity.Current is { } activity)
+ {
+ var exception = eventData.Exception!;
+ activity.SetStatus(Status.Error.WithDescription(exception.Message));
+ activity.RecordException(exception);
+ activity.Stop();
+ }
+ }
+ break;
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP.OpenTelemetry/DiagnosticProcessorObserver.cs b/src/DotNetCore.CAP.OpenTelemetry/DiagnosticProcessorObserver.cs
new file mode 100644
index 0000000..a98897c
--- /dev/null
+++ b/src/DotNetCore.CAP.OpenTelemetry/DiagnosticProcessorObserver.cs
@@ -0,0 +1,40 @@
+// Copyright (c) .NET Core Community. All rights reserved.
+// Licensed under the MIT License. See License.txt in the project root for license information.
+
+using System;
+using System.Diagnostics;
+using DotNetCore.CAP.Diagnostics;
+using Microsoft.Extensions.Logging;
+
+namespace DotNetCore.CAP.OpenTelemetry
+{
+ public class CapDiagnosticProcessorObserver : IObserver
+ {
+ private readonly ILogger _logger;
+ private readonly CapDiagnosticObserver _capObserver;
+ public const string DiagnosticListenerName = CapDiagnosticListenerNames.DiagnosticListenerName;
+
+ public CapDiagnosticProcessorObserver(ILogger logger, CapDiagnosticObserver capObserver)
+ {
+ _logger = logger;
+ _capObserver = capObserver;
+ }
+
+ public void OnCompleted()
+ {
+ }
+
+ public void OnError(Exception error)
+ {
+ }
+
+ public void OnNext(DiagnosticListener listener)
+ {
+ if (listener.Name == DiagnosticListenerName)
+ {
+ listener.Subscribe(_capObserver);
+ _logger.LogInformation($"Loaded diagnostic listener [{DiagnosticListenerName}].");
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP.OpenTelemetry/DotNetCore.CAP.OpenTelemetry.csproj b/src/DotNetCore.CAP.OpenTelemetry/DotNetCore.CAP.OpenTelemetry.csproj
new file mode 100644
index 0000000..4592514
--- /dev/null
+++ b/src/DotNetCore.CAP.OpenTelemetry/DotNetCore.CAP.OpenTelemetry.csproj
@@ -0,0 +1,19 @@
+
+
+
+ netstandard2.1
+ enable
+ CAP instrumentation for OpenTelemetry .NET
+ $(PackageTags);distributed-tracing
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/DotNetCore.CAP.OpenTelemetry/IProcessingServer.DiagnosticRegister.cs b/src/DotNetCore.CAP.OpenTelemetry/IProcessingServer.DiagnosticRegister.cs
new file mode 100644
index 0000000..35af64f
--- /dev/null
+++ b/src/DotNetCore.CAP.OpenTelemetry/IProcessingServer.DiagnosticRegister.cs
@@ -0,0 +1,34 @@
+// Copyright (c) .NET Core Community. All rights reserved.
+// Licensed under the MIT License. See License.txt in the project root for license information.
+
+using System.Diagnostics;
+using System.Threading;
+using DotNetCore.CAP.Internal;
+
+namespace DotNetCore.CAP.OpenTelemetry
+{
+ public class OpenTelemetryDiagnosticRegister : IProcessingServer
+ {
+ private readonly CapDiagnosticProcessorObserver _diagnosticProcessorObserver;
+
+ public OpenTelemetryDiagnosticRegister(CapDiagnosticProcessorObserver diagnosticProcessorObserver)
+ {
+ _diagnosticProcessorObserver = diagnosticProcessorObserver;
+ }
+
+ public void Dispose()
+ {
+
+ }
+
+ public void Pulse()
+ {
+
+ }
+
+ public void Start(CancellationToken stoppingToken)
+ {
+ DiagnosticListener.AllListeners.Subscribe(_diagnosticProcessorObserver);
+ }
+ }
+}