diff --git a/src/DotNetCore.CAP.Kafka/IPublishMessageSender.Kafka.cs b/src/DotNetCore.CAP.Kafka/IPublishMessageSender.Kafka.cs index cda5755..3e95be2 100644 --- a/src/DotNetCore.CAP.Kafka/IPublishMessageSender.Kafka.cs +++ b/src/DotNetCore.CAP.Kafka/IPublishMessageSender.Kafka.cs @@ -5,6 +5,7 @@ using System; using System.Diagnostics; using System.Text; using System.Threading.Tasks; +using DotNetCore.CAP.Diagnostics; using DotNetCore.CAP.Internal; using DotNetCore.CAP.Processor.States; using Microsoft.Extensions.Logging; @@ -13,24 +14,32 @@ namespace DotNetCore.CAP.Kafka { internal class KafkaPublishMessageSender : BasePublishMessageSender { - private readonly ConnectionPool _connectionPool; + private readonly IConnectionPool _connectionPool; private readonly ILogger _logger; + private readonly string _serversAddress; public KafkaPublishMessageSender( CapOptions options, IStateChanger stateChanger, IStorageConnection connection, - ConnectionPool connectionPool, ILogger logger) + IConnectionPool connectionPool, ILogger logger) : base(logger, options, connection, stateChanger) { _logger = logger; _connectionPool = connectionPool; + _serversAddress = _connectionPool.ServersAddress; } public override async Task PublishAsync(string keyName, string content) { + var startTime = DateTimeOffset.UtcNow; + var stopwatch = Stopwatch.StartNew(); + Guid operationId = Guid.Empty; + var producer = _connectionPool.Rent(); try { + operationId = s_diagnosticListener.WritePublishBefore(keyName, content, _serversAddress); + var contentBytes = Encoding.UTF8.GetBytes(content); var message = await producer.ProduceAsync(keyName, null, contentBytes); @@ -43,13 +52,18 @@ namespace DotNetCore.CAP.Kafka }); } + s_diagnosticListener.WritePublishAfter(operationId, message.Topic, content, _serversAddress, startTime, stopwatch.Elapsed); + _logger.LogDebug($"kafka topic message [{keyName}] has been published."); return OperateResult.Success; } catch (Exception ex) { + s_diagnosticListener.WritePublishError(operationId, keyName, content, _serversAddress, ex, startTime, stopwatch.Elapsed); + var wapperEx = new PublisherSentFailedException(ex.Message, ex); + return OperateResult.Failed(wapperEx); } finally @@ -61,5 +75,6 @@ namespace DotNetCore.CAP.Kafka } } } + } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs b/src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs index e3167d8..26760b2 100644 --- a/src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs +++ b/src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs @@ -5,8 +5,8 @@ using System; using System.Data; using System.Diagnostics; using System.Threading.Tasks; +using DotNetCore.CAP.Diagnostics; using DotNetCore.CAP.Infrastructure; -using DotNetCore.CAP.Internal; using DotNetCore.CAP.Models; using Microsoft.Extensions.Logging; @@ -152,16 +152,19 @@ namespace DotNetCore.CAP.Abstractions private async Task PublishWithTransAsync(string name, T contentObj, string callbackName = null) { - try + Guid operationId = default(Guid); + var content = Serialize(contentObj, callbackName); + + var message = new CapPublishedMessage { - var content = Serialize(contentObj, callbackName); + Name = name, + Content = content, + StatusName = StatusName.Scheduled + }; - var message = new CapPublishedMessage - { - Name = name, - Content = content, - StatusName = StatusName.Scheduled - }; + try + { + operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message); var id = await ExecuteAsync(DbConnection, DbTransaction, message); @@ -170,6 +173,7 @@ namespace DotNetCore.CAP.Abstractions if (id > 0) { _logger.LogInformation($"message [{message}] has been persisted in the database."); + s_diagnosticListener.WritePublishMessageStoreAfter(operationId, message); message.Id = id; @@ -179,6 +183,7 @@ namespace DotNetCore.CAP.Abstractions catch (Exception e) { _logger.LogError("An exception was occurred when publish message. exception message:" + e.Message, e); + s_diagnosticListener.WritePublishMessageStoreError(operationId, message, e); Console.WriteLine(e); throw; } diff --git a/src/DotNetCore.CAP/IConsumerHandler.Default.cs b/src/DotNetCore.CAP/IConsumerHandler.Default.cs index b9bbd2d..43e34df 100644 --- a/src/DotNetCore.CAP/IConsumerHandler.Default.cs +++ b/src/DotNetCore.CAP/IConsumerHandler.Default.cs @@ -6,6 +6,7 @@ using System.Diagnostics; using System.Linq; using System.Threading; using System.Threading.Tasks; +using DotNetCore.CAP.Diagnostics; using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Internal; using DotNetCore.CAP.Models; @@ -99,7 +100,9 @@ namespace DotNetCore.CAP { client.OnMessageReceived += (sender, messageContext) => { - Guid operationId = default(Guid); + var startTime = DateTimeOffset.UtcNow; + var stopwatch = Stopwatch.StartNew(); + var operationId = Guid.Empty; var receivedMessage = new CapReceivedMessage(messageContext) { @@ -108,13 +111,22 @@ namespace DotNetCore.CAP try { - operationId = s_diagnosticListener.WriteReceiveMessageStoreBefore(receivedMessage); + operationId = s_diagnosticListener.WriteReceiveMessageStoreBefore( + messageContext.Name, + messageContext.Content, + messageContext.Group); StoreMessage(receivedMessage); client.Commit(); - s_diagnosticListener.WriteReceiveMessageStoreAfter(operationId, receivedMessage); + s_diagnosticListener.WriteReceiveMessageStoreAfter( + operationId, + messageContext.Name, + messageContext.Content, + messageContext.Group, + startTime, + stopwatch.Elapsed); _dispatcher.EnqueueToExecute(receivedMessage); } @@ -124,7 +136,13 @@ namespace DotNetCore.CAP client.Reject(); - s_diagnosticListener.WriteReceiveMessageStoreError(operationId, receivedMessage, e); + s_diagnosticListener.WriteReceiveMessageStoreError(operationId, + messageContext.Name, + messageContext.Content, + messageContext.Group, + e, + startTime, + stopwatch.Elapsed); } }; diff --git a/src/DotNetCore.CAP/IPublishMessageSender.Base.cs b/src/DotNetCore.CAP/IPublishMessageSender.Base.cs index 31c1f2a..5b6588d 100644 --- a/src/DotNetCore.CAP/IPublishMessageSender.Base.cs +++ b/src/DotNetCore.CAP/IPublishMessageSender.Base.cs @@ -4,6 +4,7 @@ using System; using System.Diagnostics; using System.Threading.Tasks; +using DotNetCore.CAP.Diagnostics; using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Internal; using DotNetCore.CAP.Models; @@ -22,7 +23,7 @@ namespace DotNetCore.CAP // diagnostics listener // ReSharper disable once InconsistentNaming - private static readonly DiagnosticListener s_diagnosticListener = + protected static readonly DiagnosticListener s_diagnosticListener = new DiagnosticListener(CapDiagnosticListenerExtensions.DiagnosticListenerName); protected BasePublishMessageSender( @@ -42,7 +43,6 @@ namespace DotNetCore.CAP public async Task SendAsync(CapPublishedMessage message) { var sp = Stopwatch.StartNew(); - var operationId = s_diagnosticListener.WritePublishBefore(message); var result = await PublishAsync(message.Name, message.Content); @@ -52,14 +52,12 @@ namespace DotNetCore.CAP { await SetSuccessfulState(message); - s_diagnosticListener.WritePublishAfter(operationId, message); _logger.MessageHasBeenSent(sp.Elapsed.TotalSeconds); return OperateResult.Success; } else { - s_diagnosticListener.WritePublishError(operationId, message, result.Exception); _logger.MessagePublishException(message.Id, result.Exception); await SetFailedState(message, result.Exception, out bool stillRetry); diff --git a/src/DotNetCore.CAP/ISubscribeExecutor.Default.cs b/src/DotNetCore.CAP/ISubscribeExecutor.Default.cs index 9133863..0629852 100644 --- a/src/DotNetCore.CAP/ISubscribeExecutor.Default.cs +++ b/src/DotNetCore.CAP/ISubscribeExecutor.Default.cs @@ -4,6 +4,7 @@ using System; using System.Diagnostics; using System.Threading.Tasks; +using DotNetCore.CAP.Diagnostics; using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Internal; using DotNetCore.CAP.Models; @@ -144,7 +145,10 @@ namespace DotNetCore.CAP throw new SubscriberNotFoundException(error); } - Guid operationId = default(Guid); + var startTime = DateTimeOffset.UtcNow; + var stopwatch = Stopwatch.StartNew(); + var operationId = Guid.Empty; + var consumerContext = new ConsumerContext(executor, receivedMessage.ToMessageContext()); try @@ -153,7 +157,7 @@ namespace DotNetCore.CAP var ret = await Invoker.InvokeAsync(consumerContext); - s_diagnosticListener.WriteConsumerInvokeAfter(operationId,consumerContext); + s_diagnosticListener.WriteConsumerInvokeAfter(operationId, consumerContext, startTime, stopwatch.Elapsed); if (!string.IsNullOrEmpty(ret.CallbackName)) { @@ -162,7 +166,7 @@ namespace DotNetCore.CAP } catch (Exception ex) { - s_diagnosticListener.WriteConsumerInvokeError(operationId, consumerContext, ex); + s_diagnosticListener.WriteConsumerInvokeError(operationId, consumerContext, ex, startTime, stopwatch.Elapsed); throw new SubscriberExecutionFailedException(ex.Message, ex); }