From 81171e00e02f1df42728444e5f75789ded728d46 Mon Sep 17 00:00:00 2001 From: Savorboard Date: Tue, 15 Aug 2017 22:21:37 +0800 Subject: [PATCH] refactor --- src/DotNetCore.CAP/ICallbackPublisher.cs | 8 +- .../Internal/IConsumerInvoker.Default.cs | 109 ++++++++++++------ 2 files changed, 76 insertions(+), 41 deletions(-) diff --git a/src/DotNetCore.CAP/ICallbackPublisher.cs b/src/DotNetCore.CAP/ICallbackPublisher.cs index d0a0456..1743b52 100644 --- a/src/DotNetCore.CAP/ICallbackPublisher.cs +++ b/src/DotNetCore.CAP/ICallbackPublisher.cs @@ -1,12 +1,10 @@ -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading.Tasks; +using System.Threading.Tasks; +using DotNetCore.CAP.Models; namespace DotNetCore.CAP { public interface ICallbackPublisher { - Task PublishAsync(string name, object obj); + Task PublishAsync(CapPublishedMessage obj); } } diff --git a/src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs b/src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs index d164a0f..9acca77 100644 --- a/src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs +++ b/src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs @@ -23,64 +23,101 @@ namespace DotNetCore.CAP.Internal { _modelBinderFactory = modelBinderFactory; _serviceProvider = serviceProvider; - _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + _logger = logger; + _consumerContext = consumerContext; - _consumerContext = consumerContext ?? throw new ArgumentNullException(nameof(consumerContext)); _executor = ObjectMethodExecutor.Create(_consumerContext.ConsumerDescriptor.MethodInfo, _consumerContext.ConsumerDescriptor.ImplTypeInfo); } public async Task InvokeAsync() { - using (_logger.BeginScope("consumer invoker begin")) - { - _logger.LogDebug("Executing consumer Topic: {0}", _consumerContext.ConsumerDescriptor.MethodInfo.Name); + _logger.LogDebug("Executing consumer Topic: {0}", _consumerContext.ConsumerDescriptor.MethodInfo.Name); - var obj = ActivatorUtilities.GetServiceOrCreateInstance(_serviceProvider, - _consumerContext.ConsumerDescriptor.ImplTypeInfo.AsType()); + var obj = ActivatorUtilities.GetServiceOrCreateInstance(_serviceProvider, + _consumerContext.ConsumerDescriptor.ImplTypeInfo.AsType()); - var jsonConent = _consumerContext.DeliverMessage.Content; + var jsonConent = _consumerContext.DeliverMessage.Content; + var message = Helper.FromJson(jsonConent); - var message = Helper.FromJson(jsonConent); + object result = null; + if (_executor.MethodParameters.Length > 0) + { + result = await ExecuteWithParameterAsync(obj, message.Content.ToString()); + } + else + { + result = await ExecuteAsync(obj); + } - object returnObj = null; - if (_executor.MethodParameters.Length > 0) + if (!string.IsNullOrEmpty(message.CallbackName)) + { + await SentCallbackMessage(message.Id, message.CallbackName, result); + } + } + + private async Task ExecuteAsync(object @class) + { + if (_executor.IsMethodAsync) + { + return await _executor.ExecuteAsync(@class); + } + else + { + return _executor.Execute(@class); + } + } + + private async Task ExecuteWithParameterAsync(object @class, string parameterString) + { + var firstParameter = _executor.MethodParameters[0]; + try + { + var binder = _modelBinderFactory.CreateBinder(firstParameter); + var bindResult = await binder.BindModelAsync(parameterString); + if (bindResult.IsSuccess) { - var firstParameter = _executor.MethodParameters[0]; - try + if (_executor.IsMethodAsync) { - var binder = _modelBinderFactory.CreateBinder(firstParameter); - var result = await binder.BindModelAsync(message.Content.ToString()); - if (result.IsSuccess) - { - returnObj = _executor.Execute(obj, result.Model); - } - else - { - _logger.LogWarning($"Parameters:{firstParameter.Name} bind failed! the content is:" + jsonConent); - } + return await _executor.ExecuteAsync(@class, bindResult.Model); } - catch (FormatException ex) + else { - _logger.ModelBinderFormattingException(_executor.MethodInfo?.Name, firstParameter.Name, jsonConent, ex); + return _executor.Execute(@class, bindResult.Model); } } else { - returnObj = _executor.Execute(obj); + throw new MethodBindException($"Parameters:{firstParameter.Name} bind failed! ParameterString is: {parameterString} "); } + } + catch (FormatException ex) + { + _logger.ModelBinderFormattingException(_executor.MethodInfo?.Name, firstParameter.Name, parameterString, ex); + return null; + } + } - //TODO :refactor - if (returnObj != null && !string.IsNullOrEmpty(message.CallbackName)) + private async Task SentCallbackMessage(string messageId, string topicName, object bodyObj) + { + var callbackMessage = new Message + { + Id = messageId, + Content = bodyObj + }; + + using (var scope = _serviceProvider.CreateScope()) + { + var provider = scope.ServiceProvider; + var publisher = provider.GetRequiredService(); + + var publishedMessage = new CapPublishedMessage { - var publisher = _serviceProvider.GetRequiredService(); - var callbackMessage = new Message(returnObj) - { - Id = message.Id, - Timestamp = DateTime.Now - }; - await publisher.PublishAsync(message.CallbackName, callbackMessage); - } + Name = topicName, + Content = Helper.ToJson(callbackMessage), + StatusName = StatusName.Scheduled + }; + await publisher.PublishAsync(publishedMessage); } } }