From 2f4b65b457d2d8608142863c2685c5256d826e11 Mon Sep 17 00:00:00 2001 From: Savorboard Date: Wed, 4 Dec 2019 17:58:20 +0800 Subject: [PATCH] Refactoring --- src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs | 1 - src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs | 4 +++- src/DotNetCore.CAP/Internal/LoggerExtensions.cs | 7 ++++++- src/DotNetCore.CAP/Processor/IDispatcher.Default.cs | 3 ++- 4 files changed, 11 insertions(+), 4 deletions(-) diff --git a/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs b/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs index 26547a0..eeecd80 100644 --- a/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs +++ b/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs @@ -3,7 +3,6 @@ using System; using System.Collections.Generic; -using System.Linq; using System.Text; using System.Threading; using Confluent.Kafka; diff --git a/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs b/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs index cd7a648..d5e2fa2 100644 --- a/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs +++ b/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs @@ -151,6 +151,8 @@ namespace DotNetCore.CAP.Internal { client.OnMessageReceived += async (sender, transportMessage) => { + _logger.MessageReceived(transportMessage.GetId(), transportMessage.GetName()); + _cts.Token.ThrowIfCancellationRequested(); long? tracingTimestamp = null; try @@ -189,7 +191,7 @@ namespace DotNetCore.CAP.Internal { var content = StringSerializer.Serialize(message); await _storage.StoreReceivedExceptionMessageAsync(name, group, content); - + client.Commit(); TracingAfter(tracingTimestamp, transportMessage, _serverAddress); diff --git a/src/DotNetCore.CAP/Internal/LoggerExtensions.cs b/src/DotNetCore.CAP/Internal/LoggerExtensions.cs index 025c207..e3722c4 100644 --- a/src/DotNetCore.CAP/Internal/LoggerExtensions.cs +++ b/src/DotNetCore.CAP/Internal/LoggerExtensions.cs @@ -40,7 +40,12 @@ namespace DotNetCore.CAP.Internal logger.LogDebug($"Message published. name: {name}, content:{content}."); } - public static void MessagePublishException(this ILogger logger, long messageId, string reason, Exception ex) + public static void MessageReceived(this ILogger logger, string messageId, string name) + { + logger.LogDebug($"Received message. id:{messageId}, name: {name}"); + } + + public static void MessagePublishException(this ILogger logger, string messageId, string reason, Exception ex) { logger.LogError(ex, $"An exception occured while publishing a message, reason:{reason}. message id:{messageId}"); } diff --git a/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs b/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs index f75db71..b404f64 100644 --- a/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs +++ b/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs @@ -6,6 +6,7 @@ using System.Collections.Concurrent; using System.Threading; using System.Threading.Tasks; using DotNetCore.CAP.Internal; +using DotNetCore.CAP.Messages; using DotNetCore.CAP.Persistence; using DotNetCore.CAP.Transport; using Microsoft.Extensions.Logging; @@ -67,7 +68,7 @@ namespace DotNetCore.CAP.Processor var result = await _sender.SendAsync(message); if (!result.Succeeded) { - _logger.LogWarning(result.Exception, "Message send failed! -->" + result); + _logger.MessagePublishException(message.Origin.GetId(),result.ToString(),result.Exception); } } catch (Exception ex)