Savorboard 5 роки тому
джерело
коміт
5d7c779654
11 змінених файлів з 114 додано та 115 видалено
  1. +1
    -1
      src/DotNetCore.CAP.Dashboard/DashboardRoutes.cs
  2. +2
    -2
      src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs
  3. +3
    -3
      src/DotNetCore.CAP/Internal/ConsumerInvokerFactory.cs
  4. +8
    -8
      src/DotNetCore.CAP/Internal/ISubscribeDispatcher.Default.cs
  5. +3
    -3
      src/DotNetCore.CAP/Internal/ISubscribeDispatcher.cs
  6. +83
    -83
      src/DotNetCore.CAP/Internal/ISubscribeInvoker.Default.cs
  7. +2
    -2
      src/DotNetCore.CAP/Internal/ISubscribeInvoker.cs
  8. +2
    -2
      src/DotNetCore.CAP/Internal/ISubscribeInvokerFactory.cs
  9. +3
    -3
      src/DotNetCore.CAP/Processor/IDispatcher.Default.cs
  10. +4
    -4
      src/DotNetCore.CAP/Processor/IProcessor.NeedRetry.cs
  11. +3
    -4
      test/DotNetCore.CAP.Test/ConsumerInvokerTest.cs

+ 1
- 1
src/DotNetCore.CAP.Dashboard/DashboardRoutes.cs Переглянути файл

@@ -111,7 +111,7 @@ namespace DotNetCore.CAP.Dashboard
{
var msg = client.Storage.GetMonitoringApi().GetReceivedMessageAsync(messageId)
.GetAwaiter().GetResult();
client.RequestServices.GetService<ISubscriberExecutor>().ExecuteAsync(msg);
client.RequestServices.GetService<ISubscribeDispatcher>().DispatchAsync(msg);
});

Routes.AddRazorPage(


+ 2
- 2
src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs Переглянути файл

@@ -39,7 +39,7 @@ namespace Microsoft.Extensions.DependencyInjection
services.TryAddSingleton<ICapPublisher, CapPublisher>();

services.TryAddSingleton<IConsumerServiceSelector, DefaultConsumerServiceSelector>();
services.TryAddSingleton<IConsumerInvokerFactory, ConsumerInvokerFactory>();
services.TryAddSingleton<ISubscribeInvokerFactory, ConsumerInvokerFactory>();
services.TryAddSingleton<MethodMatcherCache>();

services.TryAddSingleton<IConsumerRegister, ConsumerRegister>();
@@ -60,7 +60,7 @@ namespace Microsoft.Extensions.DependencyInjection
services.TryAddSingleton<ISerializer, JsonUtf8Serializer>();

// Warning: IPublishMessageSender need to inject at extension project.
services.TryAddSingleton<ISubscriberExecutor, DefaultSubscriberExecutor>();
services.TryAddSingleton<ISubscribeDispatcher, DefaultSubscribeDispatcher>();

//Options and extension service
var options = new CapOptions();


+ 3
- 3
src/DotNetCore.CAP/Internal/ConsumerInvokerFactory.cs Переглянути файл

@@ -6,7 +6,7 @@ using Microsoft.Extensions.Logging;

namespace DotNetCore.CAP.Internal
{
internal class ConsumerInvokerFactory : IConsumerInvokerFactory
internal class ConsumerInvokerFactory : ISubscribeInvokerFactory
{
private readonly ILoggerFactory _loggerFactory;
private readonly IServiceProvider _serviceProvider;
@@ -19,9 +19,9 @@ namespace DotNetCore.CAP.Internal
_serviceProvider = serviceProvider;
}

public IConsumerInvoker CreateInvoker()
public ISubscribeInvoker CreateInvoker()
{
return new DefaultConsumerInvoker(_loggerFactory, _serviceProvider);
return new DefaultSubscribeInvoker(_loggerFactory, _serviceProvider);
}
}
}

src/DotNetCore.CAP/Internal/ISubscriberExecutor.Default.cs → src/DotNetCore.CAP/Internal/ISubscribeDispatcher.Default.cs Переглянути файл

@@ -17,7 +17,7 @@ using Microsoft.Extensions.Options;

namespace DotNetCore.CAP.Internal
{
internal class DefaultSubscriberExecutor : ISubscriberExecutor
internal class DefaultSubscribeDispatcher : ISubscribeDispatcher
{
private readonly IDataStorage _dataStorage;
private readonly ILogger _logger;
@@ -29,8 +29,8 @@ namespace DotNetCore.CAP.Internal
private static readonly DiagnosticListener s_diagnosticListener =
new DiagnosticListener(CapDiagnosticListenerNames.DiagnosticListenerName);

public DefaultSubscriberExecutor(
ILogger<DefaultSubscriberExecutor> logger,
public DefaultSubscribeDispatcher(
ILogger<DefaultSubscribeDispatcher> logger,
IOptions<CapOptions> options,
IServiceProvider provider)
{
@@ -39,12 +39,12 @@ namespace DotNetCore.CAP.Internal
_options = options.Value;

_dataStorage = _provider.GetService<IDataStorage>();
Invoker = _provider.GetService<IConsumerInvokerFactory>().CreateInvoker();
Invoker = _provider.GetService<ISubscribeInvokerFactory>().CreateInvoker();
}

private IConsumerInvoker Invoker { get; }
private ISubscribeInvoker Invoker { get; }

public Task<OperateResult> ExecuteAsync(MediumMessage message, CancellationToken cancellationToken)
public Task<OperateResult> DispatchAsync(MediumMessage message, CancellationToken cancellationToken)
{
var selector = _provider.GetService<MethodMatcherCache>();
if (!selector.TryGetTopicExecutor(message.Origin.GetName(), message.Origin.GetGroup(), out var executor))
@@ -58,10 +58,10 @@ namespace DotNetCore.CAP.Internal
return Task.FromResult(OperateResult.Failed(new SubscriberNotFoundException(error)));
}

return ExecuteAsync(message, executor, cancellationToken);
return DispatchAsync(message, executor, cancellationToken);
}

public async Task<OperateResult> ExecuteAsync(MediumMessage message, ConsumerExecutorDescriptor descriptor, CancellationToken cancellationToken)
public async Task<OperateResult> DispatchAsync(MediumMessage message, ConsumerExecutorDescriptor descriptor, CancellationToken cancellationToken)
{
bool retry;
OperateResult result;

src/DotNetCore.CAP/Internal/ISubscriberExecutor.cs → src/DotNetCore.CAP/Internal/ISubscribeDispatcher.cs Переглянути файл

@@ -10,10 +10,10 @@ namespace DotNetCore.CAP.Internal
/// <summary>
/// Consumer executor
/// </summary>
public interface ISubscriberExecutor
public interface ISubscribeDispatcher
{
Task<OperateResult> ExecuteAsync(MediumMessage message, CancellationToken cancellationToken = default);
Task<OperateResult> DispatchAsync(MediumMessage message, CancellationToken cancellationToken = default);

Task<OperateResult> ExecuteAsync(MediumMessage message, ConsumerExecutorDescriptor descriptor, CancellationToken cancellationToken = default);
Task<OperateResult> DispatchAsync(MediumMessage message, ConsumerExecutorDescriptor descriptor, CancellationToken cancellationToken = default);
}
}

src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs → src/DotNetCore.CAP/Internal/ISubscribeInvoker.Default.cs Переглянути файл

@@ -1,84 +1,84 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Messages;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Internal;
using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP.Internal
{
internal class DefaultConsumerInvoker : IConsumerInvoker
{
private readonly ILogger _logger;
private readonly IServiceProvider _serviceProvider;
public DefaultConsumerInvoker(ILoggerFactory loggerFactory, IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider;
_logger = loggerFactory.CreateLogger<DefaultConsumerInvoker>();
}
public async Task<ConsumerExecutedResult> InvokeAsync(ConsumerContext context, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
_logger.LogDebug("Executing subscriber method : {0}", context.ConsumerDescriptor.MethodInfo.Name);
var executor = ObjectMethodExecutor.Create(
context.ConsumerDescriptor.MethodInfo,
context.ConsumerDescriptor.ImplTypeInfo);
using (var scope = _serviceProvider.CreateScope())
{
var provider = scope.ServiceProvider;
var srvType = context.ConsumerDescriptor.ServiceTypeInfo?.AsType();
var implType = context.ConsumerDescriptor.ImplTypeInfo.AsType();
object obj = null;
if (srvType != null)
{
obj = provider.GetServices(srvType).FirstOrDefault(o => o.GetType() == implType);
}
if (obj == null)
{
obj = ActivatorUtilities.GetServiceOrCreateInstance(provider, implType);
}
var message = context.DeliverMessage;
var parameterDescriptors = context.ConsumerDescriptor.Parameters;
var executeParameters = new object[parameterDescriptors.Count];
for (var i = 0; i < parameterDescriptors.Count; i++)
{
if (parameterDescriptors[i].IsFromCap)
{
executeParameters[i] = new CapHeader(message.Headers);
}
else
{
executeParameters[i] = message.Value;
}
}
var resultObj = await ExecuteWithParameterAsync(executor, obj, executeParameters);
return new ConsumerExecutedResult(resultObj, message.GetId(), message.GetCallbackName());
}
}
private async Task<object> ExecuteWithParameterAsync(ObjectMethodExecutor executor, object @class, object[] parameter)
{
if (executor.IsMethodAsync)
{
return await executor.ExecuteAsync(@class, parameter);
}
return executor.Execute(@class, parameter);
}
}
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Messages;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Internal;
using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP.Internal
{
internal class DefaultSubscribeInvoker : ISubscribeInvoker
{
private readonly ILogger _logger;
private readonly IServiceProvider _serviceProvider;
public DefaultSubscribeInvoker(ILoggerFactory loggerFactory, IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider;
_logger = loggerFactory.CreateLogger<DefaultSubscribeInvoker>();
}
public async Task<ConsumerExecutedResult> InvokeAsync(ConsumerContext context, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
_logger.LogDebug("Executing subscriber method : {0}", context.ConsumerDescriptor.MethodInfo.Name);
var executor = ObjectMethodExecutor.Create(
context.ConsumerDescriptor.MethodInfo,
context.ConsumerDescriptor.ImplTypeInfo);
using (var scope = _serviceProvider.CreateScope())
{
var provider = scope.ServiceProvider;
var srvType = context.ConsumerDescriptor.ServiceTypeInfo?.AsType();
var implType = context.ConsumerDescriptor.ImplTypeInfo.AsType();
object obj = null;
if (srvType != null)
{
obj = provider.GetServices(srvType).FirstOrDefault(o => o.GetType() == implType);
}
if (obj == null)
{
obj = ActivatorUtilities.GetServiceOrCreateInstance(provider, implType);
}
var message = context.DeliverMessage;
var parameterDescriptors = context.ConsumerDescriptor.Parameters;
var executeParameters = new object[parameterDescriptors.Count];
for (var i = 0; i < parameterDescriptors.Count; i++)
{
if (parameterDescriptors[i].IsFromCap)
{
executeParameters[i] = new CapHeader(message.Headers);
}
else
{
executeParameters[i] = message.Value;
}
}
var resultObj = await ExecuteWithParameterAsync(executor, obj, executeParameters);
return new ConsumerExecutedResult(resultObj, message.GetId(), message.GetCallbackName());
}
}
private async Task<object> ExecuteWithParameterAsync(ObjectMethodExecutor executor, object @class, object[] parameter)
{
if (executor.IsMethodAsync)
{
return await executor.ExecuteAsync(@class, parameter);
}
return executor.Execute(@class, parameter);
}
}
}

src/DotNetCore.CAP/Internal/IConsumerInvoker.cs → src/DotNetCore.CAP/Internal/ISubscribeInvoker.cs Переглянути файл

@@ -9,10 +9,10 @@ namespace DotNetCore.CAP.Internal
/// <summary>
/// Perform user definition method of consumers.
/// </summary>
internal interface IConsumerInvoker
internal interface ISubscribeInvoker
{
/// <summary>
/// Invoke consumer method whit consumer context.
/// Invoke subscribe method with the consumer context.
/// </summary>
/// <param name="context">consumer execute context</param>
/// <param name="cancellationToken">The object of <see cref="CancellationToken"/>.</param>

src/DotNetCore.CAP/Internal/IConsumerInvokerFactory.cs → src/DotNetCore.CAP/Internal/ISubscribeInvokerFactory.cs Переглянути файл

@@ -3,8 +3,8 @@

namespace DotNetCore.CAP.Internal
{
internal interface IConsumerInvokerFactory
internal interface ISubscribeInvokerFactory
{
IConsumerInvoker CreateInvoker();
ISubscribeInvoker CreateInvoker();
}
}

+ 3
- 3
src/DotNetCore.CAP/Processor/IDispatcher.Default.cs Переглянути файл

@@ -16,7 +16,7 @@ namespace DotNetCore.CAP.Processor
{
private readonly CancellationTokenSource _cts = new CancellationTokenSource();
private readonly IMessageSender _sender;
private readonly ISubscriberExecutor _executor;
private readonly ISubscribeDispatcher _executor;
private readonly ILogger<Dispatcher> _logger;

private readonly BlockingCollection<MediumMessage> _publishedMessageQueue =
@@ -27,7 +27,7 @@ namespace DotNetCore.CAP.Processor

public Dispatcher(ILogger<Dispatcher> logger,
IMessageSender sender,
ISubscriberExecutor executor)
ISubscribeDispatcher executor)
{
_logger = logger;
_sender = sender;
@@ -90,7 +90,7 @@ namespace DotNetCore.CAP.Processor
{
foreach (var message in _receivedMessageQueue.GetConsumingEnumerable(_cts.Token))
{
_executor.ExecuteAsync(message.Item1, message.Item2, _cts.Token);
_executor.DispatchAsync(message.Item1, message.Item2, _cts.Token);
}
}
catch (OperationCanceledException)


+ 4
- 4
src/DotNetCore.CAP/Processor/IProcessor.NeedRetry.cs Переглянути файл

@@ -18,17 +18,17 @@ namespace DotNetCore.CAP.Processor
private readonly TimeSpan _delay = TimeSpan.FromSeconds(1);
private readonly ILogger<MessageNeedToRetryProcessor> _logger;
private readonly IMessageSender _messageSender;
private readonly ISubscriberExecutor _subscriberExecutor;
private readonly ISubscribeDispatcher _subscribeDispatcher;
private readonly TimeSpan _waitingInterval;

public MessageNeedToRetryProcessor(
IOptions<CapOptions> options,
ILogger<MessageNeedToRetryProcessor> logger,
ISubscriberExecutor subscriberExecutor,
ISubscribeDispatcher subscribeDispatcher,
IMessageSender messageSender)
{
_logger = logger;
_subscriberExecutor = subscriberExecutor;
_subscribeDispatcher = subscribeDispatcher;
_messageSender = messageSender;
_waitingInterval = TimeSpan.FromSeconds(options.Value.FailedRetryInterval);
}
@@ -69,7 +69,7 @@ namespace DotNetCore.CAP.Processor

foreach (var message in messages)
{
await _subscriberExecutor.ExecuteAsync(message);
await _subscribeDispatcher.DispatchAsync(message);

await context.WaitAsync(_delay);
}


+ 3
- 4
test/DotNetCore.CAP.Test/ConsumerInvokerTest.cs Переглянути файл

@@ -17,12 +17,11 @@ namespace DotNetCore.CAP.Test
{
var serviceCollection = new ServiceCollection();
serviceCollection.AddLogging();
serviceCollection.AddSingleton<IConsumerInvoker, DefaultConsumerInvoker>();
serviceCollection.AddTransient<FakeSubscriber>();
serviceCollection.AddSingleton<ISubscribeInvoker, DefaultSubscribeInvoker>();
_serviceProvider = serviceCollection.BuildServiceProvider();
}

private IConsumerInvoker ConsumerInvoker => _serviceProvider.GetService<IConsumerInvoker>();
private ISubscribeInvoker SubscribeInvoker => _serviceProvider.GetService<ISubscribeInvoker>();

[Fact]
public async Task InvokeTest()
@@ -40,7 +39,7 @@ namespace DotNetCore.CAP.Test
var message = new Message(header, null);
var context = new ConsumerContext(descriptor, message);

var ret = await ConsumerInvoker.InvokeAsync(context);
var ret = await SubscribeInvoker.InvokeAsync(context);
Assert.Equal(int.MaxValue, ret.Result);
}
}


Завантаження…
Відмінити
Зберегти