diff --git a/src/DotNetCore.CAP/Internal/DefaultModelBinder.cs b/src/DotNetCore.CAP/Internal/DefaultModelBinder.cs index d162ccc..3d74047 100644 --- a/src/DotNetCore.CAP/Internal/DefaultModelBinder.cs +++ b/src/DotNetCore.CAP/Internal/DefaultModelBinder.cs @@ -3,7 +3,7 @@ using System.Linq.Expressions; using System.Reflection; using System.Threading.Tasks; using DotNetCore.CAP.Abstractions.ModelBinding; -using Newtonsoft.Json; +using DotNetCore.CAP.Infrastructure; namespace DotNetCore.CAP.Internal { @@ -17,8 +17,8 @@ namespace DotNetCore.CAP.Internal { bindingContext.Model = CreateModel(bindingContext); } - - bindingContext.Result = JsonConvert.DeserializeObject(bindingContext.Values, bindingContext.ModelType); + + bindingContext.Result = Helper.FromJson(bindingContext.Values, bindingContext.ModelType); return Task.CompletedTask; } diff --git a/src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs b/src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs index 7d9c91e..921f3fd 100644 --- a/src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs +++ b/src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs @@ -11,9 +11,9 @@ namespace DotNetCore.CAP.Internal { protected readonly ILogger _logger; protected readonly IServiceProvider _serviceProvider; + protected readonly ConsumerContext _consumerContext; private readonly IModelBinder _modelBinder; private readonly ObjectMethodExecutor _executor; - protected readonly ConsumerContext _consumerContext; public DefaultConsumerInvoker(ILogger logger, IServiceProvider serviceProvider, @@ -30,42 +30,29 @@ namespace DotNetCore.CAP.Internal public Task InvokeAsync() { - try + using (_logger.BeginScope("consumer invoker begin")) { - using (_logger.BeginScope("consumer invoker begin")) - { - _logger.LogDebug("Executing consumer Topic: {0}", _consumerContext.ConsumerDescriptor.Attribute); + _logger.LogDebug("Executing consumer Topic: {0}", _consumerContext.ConsumerDescriptor.MethodInfo.Name); - try - { - var obj = ActivatorUtilities.GetServiceOrCreateInstance(_serviceProvider, _consumerContext.ConsumerDescriptor.ImplTypeInfo.AsType()); + var obj = ActivatorUtilities.GetServiceOrCreateInstance(_serviceProvider, _consumerContext.ConsumerDescriptor.ImplTypeInfo.AsType()); - var value = _consumerContext.DeliverMessage.Content; + var value = _consumerContext.DeliverMessage.Content; - if (_executor.MethodParameters.Length > 0) - { - var firstParameter = _executor.MethodParameters[0]; + if (_executor.MethodParameters.Length > 0) + { + var firstParameter = _executor.MethodParameters[0]; - var bindingContext = ModelBindingContext.CreateBindingContext(value, - firstParameter.Name, firstParameter.ParameterType); + var bindingContext = ModelBindingContext.CreateBindingContext(value, + firstParameter.Name, firstParameter.ParameterType); - _modelBinder.BindModelAsync(bindingContext); - _executor.Execute(obj, bindingContext.Result); - } - else - { - _executor.Execute(obj); - } - return Task.CompletedTask; - } - finally - { - _logger.LogDebug("Executed consumer method ."); - } + _modelBinder.BindModelAsync(bindingContext); + _executor.Execute(obj, bindingContext.Result); } - } - finally - { + else + { + _executor.Execute(obj); + } + return Task.CompletedTask; } } } diff --git a/src/DotNetCore.CAP/Job/IJob.CapJob.cs b/src/DotNetCore.CAP/Job/IJob.CapJob.cs index 8c06a66..aa39683 100644 --- a/src/DotNetCore.CAP/Job/IJob.CapJob.cs +++ b/src/DotNetCore.CAP/Job/IJob.CapJob.cs @@ -41,15 +41,23 @@ namespace DotNetCore.CAP.Job var nextReceivedMessage = await messageStore.GetNextReceivedMessageToBeExcuted(); if (nextReceivedMessage != null) { - var executeDescriptor = matchs[nextReceivedMessage.KeyName]; - var consumerContext = new ConsumerContext(executeDescriptor, nextReceivedMessage); - var invoker = _consumerInvokerFactory.CreateInvoker(consumerContext); + try + { + var executeDescriptor = matchs[nextReceivedMessage.KeyName]; + var consumerContext = new ConsumerContext(executeDescriptor, nextReceivedMessage); + var invoker = _consumerInvokerFactory.CreateInvoker(consumerContext); - await messageStore.ChangeReceivedMessageStateAsync(nextReceivedMessage, StatusName.Processing); + await messageStore.ChangeReceivedMessageStateAsync(nextReceivedMessage, StatusName.Processing); - await invoker.InvokeAsync(); + await invoker.InvokeAsync(); + + await messageStore.ChangeReceivedMessageStateAsync(nextReceivedMessage, StatusName.Succeeded); + } + catch (Exception ex) + { + _logger.ReceivedMessageRetryExecutingFailed(nextReceivedMessage.KeyName, ex); + } - await messageStore.ChangeReceivedMessageStateAsync(nextReceivedMessage,StatusName.Succeeded); } } } diff --git a/src/DotNetCore.CAP/LoggerExtensions.cs b/src/DotNetCore.CAP/LoggerExtensions.cs index 9cff8d7..bbdbd2c 100644 --- a/src/DotNetCore.CAP/LoggerExtensions.cs +++ b/src/DotNetCore.CAP/LoggerExtensions.cs @@ -21,6 +21,7 @@ namespace DotNetCore.CAP private static Action _enqueuingSentMessage; private static Action _enqueuingReceivdeMessage; private static Action _executingConsumerMethod; + private static Action _receivedMessageRetryExecuting; static LoggerExtensions() { @@ -78,6 +79,11 @@ namespace DotNetCore.CAP LogLevel.Error, 5, "Consumer method '{methodName}' failed to execute."); + + _receivedMessageRetryExecuting = LoggerMessage.Define( + LogLevel.Error, + 5, + "Received message topic method '{topicName}' failed to execute."); } public static void ConsumerMethodExecutingFailed(this ILogger logger, string methodName, Exception ex) @@ -85,6 +91,11 @@ namespace DotNetCore.CAP _executingConsumerMethod(logger, methodName, ex); } + public static void ReceivedMessageRetryExecutingFailed(this ILogger logger, string topicName, Exception ex) + { + _receivedMessageRetryExecuting(logger, topicName, ex); + } + public static void EnqueuingReceivedMessage(this ILogger logger, string nameKey, string content) { _enqueuingReceivdeMessage(logger, nameKey, content, null);