@@ -3,7 +3,6 @@ | |||||
using System; | using System; | ||||
using System.Collections.Generic; | using System.Collections.Generic; | ||||
using System.Linq; | |||||
using System.Text; | using System.Text; | ||||
using System.Threading; | using System.Threading; | ||||
using Confluent.Kafka; | using Confluent.Kafka; | ||||
@@ -151,6 +151,8 @@ namespace DotNetCore.CAP.Internal | |||||
{ | { | ||||
client.OnMessageReceived += async (sender, transportMessage) => | client.OnMessageReceived += async (sender, transportMessage) => | ||||
{ | { | ||||
_logger.MessageReceived(transportMessage.GetId(), transportMessage.GetName()); | |||||
_cts.Token.ThrowIfCancellationRequested(); | _cts.Token.ThrowIfCancellationRequested(); | ||||
long? tracingTimestamp = null; | long? tracingTimestamp = null; | ||||
try | try | ||||
@@ -189,7 +191,7 @@ namespace DotNetCore.CAP.Internal | |||||
{ | { | ||||
var content = StringSerializer.Serialize(message); | var content = StringSerializer.Serialize(message); | ||||
await _storage.StoreReceivedExceptionMessageAsync(name, group, content); | await _storage.StoreReceivedExceptionMessageAsync(name, group, content); | ||||
client.Commit(); | client.Commit(); | ||||
TracingAfter(tracingTimestamp, transportMessage, _serverAddress); | TracingAfter(tracingTimestamp, transportMessage, _serverAddress); | ||||
@@ -40,7 +40,12 @@ namespace DotNetCore.CAP.Internal | |||||
logger.LogDebug($"Message published. name: {name}, content:{content}."); | logger.LogDebug($"Message published. name: {name}, content:{content}."); | ||||
} | } | ||||
public static void MessagePublishException(this ILogger logger, long messageId, string reason, Exception ex) | |||||
public static void MessageReceived(this ILogger logger, string messageId, string name) | |||||
{ | |||||
logger.LogDebug($"Received message. id:{messageId}, name: {name}"); | |||||
} | |||||
public static void MessagePublishException(this ILogger logger, string messageId, string reason, Exception ex) | |||||
{ | { | ||||
logger.LogError(ex, $"An exception occured while publishing a message, reason:{reason}. message id:{messageId}"); | logger.LogError(ex, $"An exception occured while publishing a message, reason:{reason}. message id:{messageId}"); | ||||
} | } | ||||
@@ -6,6 +6,7 @@ using System.Collections.Concurrent; | |||||
using System.Threading; | using System.Threading; | ||||
using System.Threading.Tasks; | using System.Threading.Tasks; | ||||
using DotNetCore.CAP.Internal; | using DotNetCore.CAP.Internal; | ||||
using DotNetCore.CAP.Messages; | |||||
using DotNetCore.CAP.Persistence; | using DotNetCore.CAP.Persistence; | ||||
using DotNetCore.CAP.Transport; | using DotNetCore.CAP.Transport; | ||||
using Microsoft.Extensions.Logging; | using Microsoft.Extensions.Logging; | ||||
@@ -67,7 +68,7 @@ namespace DotNetCore.CAP.Processor | |||||
var result = await _sender.SendAsync(message); | var result = await _sender.SendAsync(message); | ||||
if (!result.Succeeded) | if (!result.Succeeded) | ||||
{ | { | ||||
_logger.LogWarning(result.Exception, "Message send failed! -->" + result); | |||||
_logger.MessagePublishException(message.Origin.GetId(),result.ToString(),result.Exception); | |||||
} | } | ||||
} | } | ||||
catch (Exception ex) | catch (Exception ex) | ||||