@@ -5,6 +5,7 @@ using System; | |||
using System.Diagnostics; | |||
using System.Text; | |||
using System.Threading.Tasks; | |||
using DotNetCore.CAP.Diagnostics; | |||
using DotNetCore.CAP.Internal; | |||
using DotNetCore.CAP.Processor.States; | |||
using Microsoft.Extensions.Logging; | |||
@@ -13,24 +14,32 @@ namespace DotNetCore.CAP.Kafka | |||
{ | |||
internal class KafkaPublishMessageSender : BasePublishMessageSender | |||
{ | |||
private readonly ConnectionPool _connectionPool; | |||
private readonly IConnectionPool _connectionPool; | |||
private readonly ILogger _logger; | |||
private readonly string _serversAddress; | |||
public KafkaPublishMessageSender( | |||
CapOptions options, IStateChanger stateChanger, IStorageConnection connection, | |||
ConnectionPool connectionPool, ILogger<KafkaPublishMessageSender> logger) | |||
IConnectionPool connectionPool, ILogger<KafkaPublishMessageSender> logger) | |||
: base(logger, options, connection, stateChanger) | |||
{ | |||
_logger = logger; | |||
_connectionPool = connectionPool; | |||
_serversAddress = _connectionPool.ServersAddress; | |||
} | |||
public override async Task<OperateResult> PublishAsync(string keyName, string content) | |||
{ | |||
var startTime = DateTimeOffset.UtcNow; | |||
var stopwatch = Stopwatch.StartNew(); | |||
Guid operationId = Guid.Empty; | |||
var producer = _connectionPool.Rent(); | |||
try | |||
{ | |||
operationId = s_diagnosticListener.WritePublishBefore(keyName, content, _serversAddress); | |||
var contentBytes = Encoding.UTF8.GetBytes(content); | |||
var message = await producer.ProduceAsync(keyName, null, contentBytes); | |||
@@ -43,13 +52,18 @@ namespace DotNetCore.CAP.Kafka | |||
}); | |||
} | |||
s_diagnosticListener.WritePublishAfter(operationId, message.Topic, content, _serversAddress, startTime, stopwatch.Elapsed); | |||
_logger.LogDebug($"kafka topic message [{keyName}] has been published."); | |||
return OperateResult.Success; | |||
} | |||
catch (Exception ex) | |||
{ | |||
s_diagnosticListener.WritePublishError(operationId, keyName, content, _serversAddress, ex, startTime, stopwatch.Elapsed); | |||
var wapperEx = new PublisherSentFailedException(ex.Message, ex); | |||
return OperateResult.Failed(wapperEx); | |||
} | |||
finally | |||
@@ -61,5 +75,6 @@ namespace DotNetCore.CAP.Kafka | |||
} | |||
} | |||
} | |||
} | |||
} |
@@ -5,8 +5,8 @@ using System; | |||
using System.Data; | |||
using System.Diagnostics; | |||
using System.Threading.Tasks; | |||
using DotNetCore.CAP.Diagnostics; | |||
using DotNetCore.CAP.Infrastructure; | |||
using DotNetCore.CAP.Internal; | |||
using DotNetCore.CAP.Models; | |||
using Microsoft.Extensions.Logging; | |||
@@ -152,16 +152,19 @@ namespace DotNetCore.CAP.Abstractions | |||
private async Task PublishWithTransAsync<T>(string name, T contentObj, string callbackName = null) | |||
{ | |||
try | |||
Guid operationId = default(Guid); | |||
var content = Serialize(contentObj, callbackName); | |||
var message = new CapPublishedMessage | |||
{ | |||
var content = Serialize(contentObj, callbackName); | |||
Name = name, | |||
Content = content, | |||
StatusName = StatusName.Scheduled | |||
}; | |||
var message = new CapPublishedMessage | |||
{ | |||
Name = name, | |||
Content = content, | |||
StatusName = StatusName.Scheduled | |||
}; | |||
try | |||
{ | |||
operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message); | |||
var id = await ExecuteAsync(DbConnection, DbTransaction, message); | |||
@@ -170,6 +173,7 @@ namespace DotNetCore.CAP.Abstractions | |||
if (id > 0) | |||
{ | |||
_logger.LogInformation($"message [{message}] has been persisted in the database."); | |||
s_diagnosticListener.WritePublishMessageStoreAfter(operationId, message); | |||
message.Id = id; | |||
@@ -179,6 +183,7 @@ namespace DotNetCore.CAP.Abstractions | |||
catch (Exception e) | |||
{ | |||
_logger.LogError("An exception was occurred when publish message. exception message:" + e.Message, e); | |||
s_diagnosticListener.WritePublishMessageStoreError(operationId, message, e); | |||
Console.WriteLine(e); | |||
throw; | |||
} | |||
@@ -6,6 +6,7 @@ using System.Diagnostics; | |||
using System.Linq; | |||
using System.Threading; | |||
using System.Threading.Tasks; | |||
using DotNetCore.CAP.Diagnostics; | |||
using DotNetCore.CAP.Infrastructure; | |||
using DotNetCore.CAP.Internal; | |||
using DotNetCore.CAP.Models; | |||
@@ -99,7 +100,9 @@ namespace DotNetCore.CAP | |||
{ | |||
client.OnMessageReceived += (sender, messageContext) => | |||
{ | |||
Guid operationId = default(Guid); | |||
var startTime = DateTimeOffset.UtcNow; | |||
var stopwatch = Stopwatch.StartNew(); | |||
var operationId = Guid.Empty; | |||
var receivedMessage = new CapReceivedMessage(messageContext) | |||
{ | |||
@@ -108,13 +111,22 @@ namespace DotNetCore.CAP | |||
try | |||
{ | |||
operationId = s_diagnosticListener.WriteReceiveMessageStoreBefore(receivedMessage); | |||
operationId = s_diagnosticListener.WriteReceiveMessageStoreBefore( | |||
messageContext.Name, | |||
messageContext.Content, | |||
messageContext.Group); | |||
StoreMessage(receivedMessage); | |||
client.Commit(); | |||
s_diagnosticListener.WriteReceiveMessageStoreAfter(operationId, receivedMessage); | |||
s_diagnosticListener.WriteReceiveMessageStoreAfter( | |||
operationId, | |||
messageContext.Name, | |||
messageContext.Content, | |||
messageContext.Group, | |||
startTime, | |||
stopwatch.Elapsed); | |||
_dispatcher.EnqueueToExecute(receivedMessage); | |||
} | |||
@@ -124,7 +136,13 @@ namespace DotNetCore.CAP | |||
client.Reject(); | |||
s_diagnosticListener.WriteReceiveMessageStoreError(operationId, receivedMessage, e); | |||
s_diagnosticListener.WriteReceiveMessageStoreError(operationId, | |||
messageContext.Name, | |||
messageContext.Content, | |||
messageContext.Group, | |||
e, | |||
startTime, | |||
stopwatch.Elapsed); | |||
} | |||
}; | |||
@@ -4,6 +4,7 @@ | |||
using System; | |||
using System.Diagnostics; | |||
using System.Threading.Tasks; | |||
using DotNetCore.CAP.Diagnostics; | |||
using DotNetCore.CAP.Infrastructure; | |||
using DotNetCore.CAP.Internal; | |||
using DotNetCore.CAP.Models; | |||
@@ -22,7 +23,7 @@ namespace DotNetCore.CAP | |||
// diagnostics listener | |||
// ReSharper disable once InconsistentNaming | |||
private static readonly DiagnosticListener s_diagnosticListener = | |||
protected static readonly DiagnosticListener s_diagnosticListener = | |||
new DiagnosticListener(CapDiagnosticListenerExtensions.DiagnosticListenerName); | |||
protected BasePublishMessageSender( | |||
@@ -42,7 +43,6 @@ namespace DotNetCore.CAP | |||
public async Task<OperateResult> SendAsync(CapPublishedMessage message) | |||
{ | |||
var sp = Stopwatch.StartNew(); | |||
var operationId = s_diagnosticListener.WritePublishBefore(message); | |||
var result = await PublishAsync(message.Name, message.Content); | |||
@@ -52,14 +52,12 @@ namespace DotNetCore.CAP | |||
{ | |||
await SetSuccessfulState(message); | |||
s_diagnosticListener.WritePublishAfter(operationId, message); | |||
_logger.MessageHasBeenSent(sp.Elapsed.TotalSeconds); | |||
return OperateResult.Success; | |||
} | |||
else | |||
{ | |||
s_diagnosticListener.WritePublishError(operationId, message, result.Exception); | |||
_logger.MessagePublishException(message.Id, result.Exception); | |||
await SetFailedState(message, result.Exception, out bool stillRetry); | |||
@@ -4,6 +4,7 @@ | |||
using System; | |||
using System.Diagnostics; | |||
using System.Threading.Tasks; | |||
using DotNetCore.CAP.Diagnostics; | |||
using DotNetCore.CAP.Infrastructure; | |||
using DotNetCore.CAP.Internal; | |||
using DotNetCore.CAP.Models; | |||
@@ -144,7 +145,10 @@ namespace DotNetCore.CAP | |||
throw new SubscriberNotFoundException(error); | |||
} | |||
Guid operationId = default(Guid); | |||
var startTime = DateTimeOffset.UtcNow; | |||
var stopwatch = Stopwatch.StartNew(); | |||
var operationId = Guid.Empty; | |||
var consumerContext = new ConsumerContext(executor, receivedMessage.ToMessageContext()); | |||
try | |||
@@ -153,7 +157,7 @@ namespace DotNetCore.CAP | |||
var ret = await Invoker.InvokeAsync(consumerContext); | |||
s_diagnosticListener.WriteConsumerInvokeAfter(operationId,consumerContext); | |||
s_diagnosticListener.WriteConsumerInvokeAfter(operationId, consumerContext, startTime, stopwatch.Elapsed); | |||
if (!string.IsNullOrEmpty(ret.CallbackName)) | |||
{ | |||
@@ -162,7 +166,7 @@ namespace DotNetCore.CAP | |||
} | |||
catch (Exception ex) | |||
{ | |||
s_diagnosticListener.WriteConsumerInvokeError(operationId, consumerContext, ex); | |||
s_diagnosticListener.WriteConsumerInvokeError(operationId, consumerContext, ex, startTime, stopwatch.Elapsed); | |||
throw new SubscriberExecutionFailedException(ex.Message, ex); | |||
} | |||