diff --git a/src/DotNetCore.CAP/Internal/ISubscriberExecutor.Default.cs b/src/DotNetCore.CAP/Internal/ISubscriberExecutor.Default.cs new file mode 100644 index 0000000..dbf1b9d --- /dev/null +++ b/src/DotNetCore.CAP/Internal/ISubscriberExecutor.Default.cs @@ -0,0 +1,54 @@ +using System; +using System.Threading.Tasks; +using DotNetCore.CAP.Abstractions; +using DotNetCore.CAP.Models; +using Microsoft.Extensions.Logging; + +namespace DotNetCore.CAP.Internal +{ + public class DefaultSubscriberExecutor : ISubscriberExecutor + { + private readonly IConsumerInvokerFactory _consumerInvokerFactory; + private readonly ILogger _logger; + private readonly MethodMatcherCache _selector; + + public DefaultSubscriberExecutor(MethodMatcherCache selector, + IConsumerInvokerFactory consumerInvokerFactory, + ILogger logger) + { + _selector = selector; + _consumerInvokerFactory = consumerInvokerFactory; + _logger = logger; + } + + + public async Task ExecuteAsync(CapReceivedMessage receivedMessage) + { + try + { + var executeDescriptorGroup = _selector.GetTopicExector(receivedMessage.Name); + + if (!executeDescriptorGroup.ContainsKey(receivedMessage.Group)) + { + var error = $"Topic:{receivedMessage.Name}, can not be found subscriber method."; + throw new SubscriberNotFoundException(error); + } + + // If there are multiple consumers in the same group, we will take the first + var executeDescriptor = executeDescriptorGroup[receivedMessage.Group][0]; + var consumerContext = new ConsumerContext(executeDescriptor, receivedMessage.ToMessageContext()); + + await _consumerInvokerFactory.CreateInvoker(consumerContext).InvokeAsync(); + + return OperateResult.Success; + } + catch (Exception ex) + { + _logger.ConsumerMethodExecutingFailed($"Group:{receivedMessage.Group}, Topic:{receivedMessage.Name}", + ex); + + return OperateResult.Failed(ex); + } + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP/Internal/ISubscriberExecutor.cs b/src/DotNetCore.CAP/Internal/ISubscriberExecutor.cs new file mode 100644 index 0000000..cdfe5dc --- /dev/null +++ b/src/DotNetCore.CAP/Internal/ISubscriberExecutor.cs @@ -0,0 +1,10 @@ +using System.Threading.Tasks; +using DotNetCore.CAP.Models; + +namespace DotNetCore.CAP.Internal +{ + public interface ISubscriberExecutor + { + Task ExecuteAsync(CapReceivedMessage receivedMessage); + } +}