Savorboard před 7 roky
rodič
revize
81171e00e0
2 změnil soubory, kde provedl 76 přidání a 41 odebrání
  1. +3
    -5
      src/DotNetCore.CAP/ICallbackPublisher.cs
  2. +73
    -36
      src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs

+ 3
- 5
src/DotNetCore.CAP/ICallbackPublisher.cs Zobrazit soubor

@@ -1,12 +1,10 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks;
using DotNetCore.CAP.Models;

namespace DotNetCore.CAP
{
public interface ICallbackPublisher
{
Task PublishAsync(string name, object obj);
Task PublishAsync(CapPublishedMessage obj);
}
}

+ 73
- 36
src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs Zobrazit soubor

@@ -23,64 +23,101 @@ namespace DotNetCore.CAP.Internal
{
_modelBinderFactory = modelBinderFactory;
_serviceProvider = serviceProvider;
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_logger = logger;
_consumerContext = consumerContext;

_consumerContext = consumerContext ?? throw new ArgumentNullException(nameof(consumerContext));
_executor = ObjectMethodExecutor.Create(_consumerContext.ConsumerDescriptor.MethodInfo,
_consumerContext.ConsumerDescriptor.ImplTypeInfo);
}

public async Task InvokeAsync()
{
using (_logger.BeginScope("consumer invoker begin"))
{
_logger.LogDebug("Executing consumer Topic: {0}", _consumerContext.ConsumerDescriptor.MethodInfo.Name);
_logger.LogDebug("Executing consumer Topic: {0}", _consumerContext.ConsumerDescriptor.MethodInfo.Name);

var obj = ActivatorUtilities.GetServiceOrCreateInstance(_serviceProvider,
_consumerContext.ConsumerDescriptor.ImplTypeInfo.AsType());
var obj = ActivatorUtilities.GetServiceOrCreateInstance(_serviceProvider,
_consumerContext.ConsumerDescriptor.ImplTypeInfo.AsType());

var jsonConent = _consumerContext.DeliverMessage.Content;
var jsonConent = _consumerContext.DeliverMessage.Content;
var message = Helper.FromJson<Message>(jsonConent);

var message = Helper.FromJson<Message>(jsonConent);
object result = null;
if (_executor.MethodParameters.Length > 0)
{
result = await ExecuteWithParameterAsync(obj, message.Content.ToString());
}
else
{
result = await ExecuteAsync(obj);
}

object returnObj = null;
if (_executor.MethodParameters.Length > 0)
if (!string.IsNullOrEmpty(message.CallbackName))
{
await SentCallbackMessage(message.Id, message.CallbackName, result);
}
}

private async Task<object> ExecuteAsync(object @class)
{
if (_executor.IsMethodAsync)
{
return await _executor.ExecuteAsync(@class);
}
else
{
return _executor.Execute(@class);
}
}

private async Task<object> ExecuteWithParameterAsync(object @class, string parameterString)
{
var firstParameter = _executor.MethodParameters[0];
try
{
var binder = _modelBinderFactory.CreateBinder(firstParameter);
var bindResult = await binder.BindModelAsync(parameterString);
if (bindResult.IsSuccess)
{
var firstParameter = _executor.MethodParameters[0];
try
if (_executor.IsMethodAsync)
{
var binder = _modelBinderFactory.CreateBinder(firstParameter);
var result = await binder.BindModelAsync(message.Content.ToString());
if (result.IsSuccess)
{
returnObj = _executor.Execute(obj, result.Model);
}
else
{
_logger.LogWarning($"Parameters:{firstParameter.Name} bind failed! the content is:" + jsonConent);
}
return await _executor.ExecuteAsync(@class, bindResult.Model);
}
catch (FormatException ex)
else
{
_logger.ModelBinderFormattingException(_executor.MethodInfo?.Name, firstParameter.Name, jsonConent, ex);
return _executor.Execute(@class, bindResult.Model);
}
}
else
{
returnObj = _executor.Execute(obj);
throw new MethodBindException($"Parameters:{firstParameter.Name} bind failed! ParameterString is: {parameterString} ");
}
}
catch (FormatException ex)
{
_logger.ModelBinderFormattingException(_executor.MethodInfo?.Name, firstParameter.Name, parameterString, ex);
return null;
}
}

//TODO :refactor
if (returnObj != null && !string.IsNullOrEmpty(message.CallbackName))
private async Task SentCallbackMessage(string messageId, string topicName, object bodyObj)
{
var callbackMessage = new Message
{
Id = messageId,
Content = bodyObj
};

using (var scope = _serviceProvider.CreateScope())
{
var provider = scope.ServiceProvider;
var publisher = provider.GetRequiredService<ICallbackPublisher>();

var publishedMessage = new CapPublishedMessage
{
var publisher = _serviceProvider.GetRequiredService<ICallbackPublisher>();
var callbackMessage = new Message(returnObj)
{
Id = message.Id,
Timestamp = DateTime.Now
};
await publisher.PublishAsync(message.CallbackName, callbackMessage);
}
Name = topicName,
Content = Helper.ToJson(callbackMessage),
StatusName = StatusName.Scheduled
};
await publisher.PublishAsync(publishedMessage);
}
}
}

Načítá se…
Zrušit
Uložit