diff --git a/src/DotNetCore.CAP.Dashboard/DashboardRoutes.cs b/src/DotNetCore.CAP.Dashboard/DashboardRoutes.cs index 86b04aa..2f6b6da 100644 --- a/src/DotNetCore.CAP.Dashboard/DashboardRoutes.cs +++ b/src/DotNetCore.CAP.Dashboard/DashboardRoutes.cs @@ -111,7 +111,7 @@ namespace DotNetCore.CAP.Dashboard { var msg = client.Storage.GetMonitoringApi().GetReceivedMessageAsync(messageId) .GetAwaiter().GetResult(); - client.RequestServices.GetService().ExecuteAsync(msg); + client.RequestServices.GetService().DispatchAsync(msg); }); Routes.AddRazorPage( diff --git a/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs b/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs index bd59c21..d06378d 100644 --- a/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs +++ b/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs @@ -39,7 +39,7 @@ namespace Microsoft.Extensions.DependencyInjection services.TryAddSingleton(); services.TryAddSingleton(); - services.TryAddSingleton(); + services.TryAddSingleton(); services.TryAddSingleton(); services.TryAddSingleton(); @@ -60,7 +60,7 @@ namespace Microsoft.Extensions.DependencyInjection services.TryAddSingleton(); // Warning: IPublishMessageSender need to inject at extension project. - services.TryAddSingleton(); + services.TryAddSingleton(); //Options and extension service var options = new CapOptions(); diff --git a/src/DotNetCore.CAP/Internal/ConsumerInvokerFactory.cs b/src/DotNetCore.CAP/Internal/ConsumerInvokerFactory.cs index 689e608..3979394 100644 --- a/src/DotNetCore.CAP/Internal/ConsumerInvokerFactory.cs +++ b/src/DotNetCore.CAP/Internal/ConsumerInvokerFactory.cs @@ -6,7 +6,7 @@ using Microsoft.Extensions.Logging; namespace DotNetCore.CAP.Internal { - internal class ConsumerInvokerFactory : IConsumerInvokerFactory + internal class ConsumerInvokerFactory : ISubscribeInvokerFactory { private readonly ILoggerFactory _loggerFactory; private readonly IServiceProvider _serviceProvider; @@ -19,9 +19,9 @@ namespace DotNetCore.CAP.Internal _serviceProvider = serviceProvider; } - public IConsumerInvoker CreateInvoker() + public ISubscribeInvoker CreateInvoker() { - return new DefaultConsumerInvoker(_loggerFactory, _serviceProvider); + return new DefaultSubscribeInvoker(_loggerFactory, _serviceProvider); } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/Internal/ISubscriberExecutor.Default.cs b/src/DotNetCore.CAP/Internal/ISubscribeDispatcher.Default.cs similarity index 93% rename from src/DotNetCore.CAP/Internal/ISubscriberExecutor.Default.cs rename to src/DotNetCore.CAP/Internal/ISubscribeDispatcher.Default.cs index 4e7d9e3..76ebe25 100644 --- a/src/DotNetCore.CAP/Internal/ISubscriberExecutor.Default.cs +++ b/src/DotNetCore.CAP/Internal/ISubscribeDispatcher.Default.cs @@ -17,7 +17,7 @@ using Microsoft.Extensions.Options; namespace DotNetCore.CAP.Internal { - internal class DefaultSubscriberExecutor : ISubscriberExecutor + internal class DefaultSubscribeDispatcher : ISubscribeDispatcher { private readonly IDataStorage _dataStorage; private readonly ILogger _logger; @@ -29,8 +29,8 @@ namespace DotNetCore.CAP.Internal private static readonly DiagnosticListener s_diagnosticListener = new DiagnosticListener(CapDiagnosticListenerNames.DiagnosticListenerName); - public DefaultSubscriberExecutor( - ILogger logger, + public DefaultSubscribeDispatcher( + ILogger logger, IOptions options, IServiceProvider provider) { @@ -39,12 +39,12 @@ namespace DotNetCore.CAP.Internal _options = options.Value; _dataStorage = _provider.GetService(); - Invoker = _provider.GetService().CreateInvoker(); + Invoker = _provider.GetService().CreateInvoker(); } - private IConsumerInvoker Invoker { get; } + private ISubscribeInvoker Invoker { get; } - public Task ExecuteAsync(MediumMessage message, CancellationToken cancellationToken) + public Task DispatchAsync(MediumMessage message, CancellationToken cancellationToken) { var selector = _provider.GetService(); if (!selector.TryGetTopicExecutor(message.Origin.GetName(), message.Origin.GetGroup(), out var executor)) @@ -58,10 +58,10 @@ namespace DotNetCore.CAP.Internal return Task.FromResult(OperateResult.Failed(new SubscriberNotFoundException(error))); } - return ExecuteAsync(message, executor, cancellationToken); + return DispatchAsync(message, executor, cancellationToken); } - public async Task ExecuteAsync(MediumMessage message, ConsumerExecutorDescriptor descriptor, CancellationToken cancellationToken) + public async Task DispatchAsync(MediumMessage message, ConsumerExecutorDescriptor descriptor, CancellationToken cancellationToken) { bool retry; OperateResult result; diff --git a/src/DotNetCore.CAP/Internal/ISubscriberExecutor.cs b/src/DotNetCore.CAP/Internal/ISubscribeDispatcher.cs similarity index 53% rename from src/DotNetCore.CAP/Internal/ISubscriberExecutor.cs rename to src/DotNetCore.CAP/Internal/ISubscribeDispatcher.cs index b1ba89b..8e1b698 100644 --- a/src/DotNetCore.CAP/Internal/ISubscriberExecutor.cs +++ b/src/DotNetCore.CAP/Internal/ISubscribeDispatcher.cs @@ -10,10 +10,10 @@ namespace DotNetCore.CAP.Internal /// /// Consumer executor /// - public interface ISubscriberExecutor + public interface ISubscribeDispatcher { - Task ExecuteAsync(MediumMessage message, CancellationToken cancellationToken = default); + Task DispatchAsync(MediumMessage message, CancellationToken cancellationToken = default); - Task ExecuteAsync(MediumMessage message, ConsumerExecutorDescriptor descriptor, CancellationToken cancellationToken = default); + Task DispatchAsync(MediumMessage message, ConsumerExecutorDescriptor descriptor, CancellationToken cancellationToken = default); } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs b/src/DotNetCore.CAP/Internal/ISubscribeInvoker.Default.cs similarity index 90% rename from src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs rename to src/DotNetCore.CAP/Internal/ISubscribeInvoker.Default.cs index b87dd06..6a0b155 100644 --- a/src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs +++ b/src/DotNetCore.CAP/Internal/ISubscribeInvoker.Default.cs @@ -1,84 +1,84 @@ -// Copyright (c) .NET Core Community. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -using System; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; -using DotNetCore.CAP.Messages; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Internal; -using Microsoft.Extensions.Logging; - -namespace DotNetCore.CAP.Internal -{ - internal class DefaultConsumerInvoker : IConsumerInvoker - { - private readonly ILogger _logger; - private readonly IServiceProvider _serviceProvider; - - public DefaultConsumerInvoker(ILoggerFactory loggerFactory, IServiceProvider serviceProvider) - { - _serviceProvider = serviceProvider; - _logger = loggerFactory.CreateLogger(); - } - - public async Task InvokeAsync(ConsumerContext context, CancellationToken cancellationToken = default) - { - cancellationToken.ThrowIfCancellationRequested(); - - _logger.LogDebug("Executing subscriber method : {0}", context.ConsumerDescriptor.MethodInfo.Name); - - var executor = ObjectMethodExecutor.Create( - context.ConsumerDescriptor.MethodInfo, - context.ConsumerDescriptor.ImplTypeInfo); - - using (var scope = _serviceProvider.CreateScope()) - { - var provider = scope.ServiceProvider; - var srvType = context.ConsumerDescriptor.ServiceTypeInfo?.AsType(); - var implType = context.ConsumerDescriptor.ImplTypeInfo.AsType(); - - object obj = null; - - if (srvType != null) - { - obj = provider.GetServices(srvType).FirstOrDefault(o => o.GetType() == implType); - } - - if (obj == null) - { - obj = ActivatorUtilities.GetServiceOrCreateInstance(provider, implType); - } - - var message = context.DeliverMessage; - var parameterDescriptors = context.ConsumerDescriptor.Parameters; - var executeParameters = new object[parameterDescriptors.Count]; - for (var i = 0; i < parameterDescriptors.Count; i++) - { - if (parameterDescriptors[i].IsFromCap) - { - executeParameters[i] = new CapHeader(message.Headers); - } - else - { - executeParameters[i] = message.Value; - } - } - - var resultObj = await ExecuteWithParameterAsync(executor, obj, executeParameters); - return new ConsumerExecutedResult(resultObj, message.GetId(), message.GetCallbackName()); - } - } - - private async Task ExecuteWithParameterAsync(ObjectMethodExecutor executor, object @class, object[] parameter) - { - if (executor.IsMethodAsync) - { - return await executor.ExecuteAsync(@class, parameter); - } - - return executor.Execute(@class, parameter); - } - } +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using DotNetCore.CAP.Messages; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Internal; +using Microsoft.Extensions.Logging; + +namespace DotNetCore.CAP.Internal +{ + internal class DefaultSubscribeInvoker : ISubscribeInvoker + { + private readonly ILogger _logger; + private readonly IServiceProvider _serviceProvider; + + public DefaultSubscribeInvoker(ILoggerFactory loggerFactory, IServiceProvider serviceProvider) + { + _serviceProvider = serviceProvider; + _logger = loggerFactory.CreateLogger(); + } + + public async Task InvokeAsync(ConsumerContext context, CancellationToken cancellationToken = default) + { + cancellationToken.ThrowIfCancellationRequested(); + + _logger.LogDebug("Executing subscriber method : {0}", context.ConsumerDescriptor.MethodInfo.Name); + + var executor = ObjectMethodExecutor.Create( + context.ConsumerDescriptor.MethodInfo, + context.ConsumerDescriptor.ImplTypeInfo); + + using (var scope = _serviceProvider.CreateScope()) + { + var provider = scope.ServiceProvider; + var srvType = context.ConsumerDescriptor.ServiceTypeInfo?.AsType(); + var implType = context.ConsumerDescriptor.ImplTypeInfo.AsType(); + + object obj = null; + + if (srvType != null) + { + obj = provider.GetServices(srvType).FirstOrDefault(o => o.GetType() == implType); + } + + if (obj == null) + { + obj = ActivatorUtilities.GetServiceOrCreateInstance(provider, implType); + } + + var message = context.DeliverMessage; + var parameterDescriptors = context.ConsumerDescriptor.Parameters; + var executeParameters = new object[parameterDescriptors.Count]; + for (var i = 0; i < parameterDescriptors.Count; i++) + { + if (parameterDescriptors[i].IsFromCap) + { + executeParameters[i] = new CapHeader(message.Headers); + } + else + { + executeParameters[i] = message.Value; + } + } + + var resultObj = await ExecuteWithParameterAsync(executor, obj, executeParameters); + return new ConsumerExecutedResult(resultObj, message.GetId(), message.GetCallbackName()); + } + } + + private async Task ExecuteWithParameterAsync(ObjectMethodExecutor executor, object @class, object[] parameter) + { + if (executor.IsMethodAsync) + { + return await executor.ExecuteAsync(@class, parameter); + } + + return executor.Execute(@class, parameter); + } + } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/Internal/IConsumerInvoker.cs b/src/DotNetCore.CAP/Internal/ISubscribeInvoker.cs similarity index 86% rename from src/DotNetCore.CAP/Internal/IConsumerInvoker.cs rename to src/DotNetCore.CAP/Internal/ISubscribeInvoker.cs index 2dbf34c..d6acf45 100644 --- a/src/DotNetCore.CAP/Internal/IConsumerInvoker.cs +++ b/src/DotNetCore.CAP/Internal/ISubscribeInvoker.cs @@ -9,10 +9,10 @@ namespace DotNetCore.CAP.Internal /// /// Perform user definition method of consumers. /// - internal interface IConsumerInvoker + internal interface ISubscribeInvoker { /// - /// Invoke consumer method whit consumer context. + /// Invoke subscribe method with the consumer context. /// /// consumer execute context /// The object of . diff --git a/src/DotNetCore.CAP/Internal/IConsumerInvokerFactory.cs b/src/DotNetCore.CAP/Internal/ISubscribeInvokerFactory.cs similarity index 69% rename from src/DotNetCore.CAP/Internal/IConsumerInvokerFactory.cs rename to src/DotNetCore.CAP/Internal/ISubscribeInvokerFactory.cs index 9fd7e74..c8a582d 100644 --- a/src/DotNetCore.CAP/Internal/IConsumerInvokerFactory.cs +++ b/src/DotNetCore.CAP/Internal/ISubscribeInvokerFactory.cs @@ -3,8 +3,8 @@ namespace DotNetCore.CAP.Internal { - internal interface IConsumerInvokerFactory + internal interface ISubscribeInvokerFactory { - IConsumerInvoker CreateInvoker(); + ISubscribeInvoker CreateInvoker(); } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs b/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs index 4150b30..f75db71 100644 --- a/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs +++ b/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs @@ -16,7 +16,7 @@ namespace DotNetCore.CAP.Processor { private readonly CancellationTokenSource _cts = new CancellationTokenSource(); private readonly IMessageSender _sender; - private readonly ISubscriberExecutor _executor; + private readonly ISubscribeDispatcher _executor; private readonly ILogger _logger; private readonly BlockingCollection _publishedMessageQueue = @@ -27,7 +27,7 @@ namespace DotNetCore.CAP.Processor public Dispatcher(ILogger logger, IMessageSender sender, - ISubscriberExecutor executor) + ISubscribeDispatcher executor) { _logger = logger; _sender = sender; @@ -90,7 +90,7 @@ namespace DotNetCore.CAP.Processor { foreach (var message in _receivedMessageQueue.GetConsumingEnumerable(_cts.Token)) { - _executor.ExecuteAsync(message.Item1, message.Item2, _cts.Token); + _executor.DispatchAsync(message.Item1, message.Item2, _cts.Token); } } catch (OperationCanceledException) diff --git a/src/DotNetCore.CAP/Processor/IProcessor.NeedRetry.cs b/src/DotNetCore.CAP/Processor/IProcessor.NeedRetry.cs index b900e80..005b010 100644 --- a/src/DotNetCore.CAP/Processor/IProcessor.NeedRetry.cs +++ b/src/DotNetCore.CAP/Processor/IProcessor.NeedRetry.cs @@ -18,17 +18,17 @@ namespace DotNetCore.CAP.Processor private readonly TimeSpan _delay = TimeSpan.FromSeconds(1); private readonly ILogger _logger; private readonly IMessageSender _messageSender; - private readonly ISubscriberExecutor _subscriberExecutor; + private readonly ISubscribeDispatcher _subscribeDispatcher; private readonly TimeSpan _waitingInterval; public MessageNeedToRetryProcessor( IOptions options, ILogger logger, - ISubscriberExecutor subscriberExecutor, + ISubscribeDispatcher subscribeDispatcher, IMessageSender messageSender) { _logger = logger; - _subscriberExecutor = subscriberExecutor; + _subscribeDispatcher = subscribeDispatcher; _messageSender = messageSender; _waitingInterval = TimeSpan.FromSeconds(options.Value.FailedRetryInterval); } @@ -69,7 +69,7 @@ namespace DotNetCore.CAP.Processor foreach (var message in messages) { - await _subscriberExecutor.ExecuteAsync(message); + await _subscribeDispatcher.DispatchAsync(message); await context.WaitAsync(_delay); } diff --git a/test/DotNetCore.CAP.Test/ConsumerInvokerTest.cs b/test/DotNetCore.CAP.Test/ConsumerInvokerTest.cs index aa14448..2c69d20 100644 --- a/test/DotNetCore.CAP.Test/ConsumerInvokerTest.cs +++ b/test/DotNetCore.CAP.Test/ConsumerInvokerTest.cs @@ -17,12 +17,11 @@ namespace DotNetCore.CAP.Test { var serviceCollection = new ServiceCollection(); serviceCollection.AddLogging(); - serviceCollection.AddSingleton(); - serviceCollection.AddTransient(); + serviceCollection.AddSingleton(); _serviceProvider = serviceCollection.BuildServiceProvider(); } - private IConsumerInvoker ConsumerInvoker => _serviceProvider.GetService(); + private ISubscribeInvoker SubscribeInvoker => _serviceProvider.GetService(); [Fact] public async Task InvokeTest() @@ -40,7 +39,7 @@ namespace DotNetCore.CAP.Test var message = new Message(header, null); var context = new ConsumerContext(descriptor, message); - var ret = await ConsumerInvoker.InvokeAsync(context); + var ret = await SubscribeInvoker.InvokeAsync(context); Assert.Equal(int.MaxValue, ret.Result); } }