瀏覽代碼

Add support scanning subscribers from the assembly. (#993)

master
Savorboard 3 年之前
父節點
當前提交
c5a07e7417
共有 8 個檔案被更改,包括 106 行新增14 行删除
  1. +1
    -3
      src/DotNetCore.CAP.Dashboard/wwwroot/public/index.html
  2. +3
    -3
      src/DotNetCore.CAP.RabbitMQ/ITransport.RabbitMQ.cs
  3. +38
    -0
      src/DotNetCore.CAP/CAP.Builder.cs
  4. +5
    -0
      src/DotNetCore.CAP/Internal/Helper.cs
  5. +47
    -0
      src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Assembly.cs
  6. +4
    -2
      src/DotNetCore.CAP/Internal/ISubscribeDispatcher.Default.cs
  7. +1
    -4
      src/DotNetCore.CAP/Internal/ISubscribeInvoker.Default.cs
  8. +7
    -2
      src/DotNetCore.CAP/Internal/LoggerExtensions.cs

+ 1
- 3
src/DotNetCore.CAP.Dashboard/wwwroot/public/index.html 查看文件

@@ -6,9 +6,7 @@
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width,initial-scale=1.0">
<link rel="icon" href="<%= BASE_URL %>favicon.ico">
<title>
<%= htmlWebpackPlugin.options.title %>
</title>
<title>CAP Dashboard</title>
</head>

<body>


+ 3
- 3
src/DotNetCore.CAP.RabbitMQ/ITransport.RabbitMQ.cs 查看文件

@@ -40,15 +40,15 @@ namespace DotNetCore.CAP.RabbitMQ

var props = channel.CreateBasicProperties();
props.DeliveryMode = 2;
props.Headers = message.Headers.ToDictionary(x => x.Key, x => (object) x.Value);
props.Headers = message.Headers.ToDictionary(x => x.Key, x => (object)x.Value);
channel.ExchangeDeclare(_exchange, RabbitMQOptions.ExchangeType, true);

channel.BasicPublish(_exchange, message.GetName(), props, message.Body);

channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5));

_logger.LogDebug($"RabbitMQ topic message [{message.GetName()}] has been published.");
_logger.LogInformation("CAP message '{0}' published, internal id '{1}'", message.GetName(), message.GetId());

return Task.FromResult(OperateResult.Success);
}


+ 38
- 0
src/DotNetCore.CAP/CAP.Builder.cs 查看文件

@@ -1,9 +1,15 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Linq;
using System.Reflection;
using DotNetCore.CAP.Filter;
using DotNetCore.CAP.Internal;
using JetBrains.Annotations;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
// ReSharper disable UnusedMember.Global

namespace DotNetCore.CAP
{
@@ -43,10 +49,42 @@ namespace DotNetCore.CAP
/// </summary>
public IServiceCollection Services { get; }

/// <summary>
/// Registers subscribers filter.
/// </summary>
/// <typeparam name="T">Type of filter</typeparam>
public CapBuilder AddSubscribeFilter<T>() where T : class, ISubscribeFilter
{
Services.TryAddScoped<ISubscribeFilter, T>();
return this;
}

/// <summary>
/// Registers subscribers from the specified assemblies.
/// </summary>
/// <param name="assemblies">Assemblies to scan subscriber</param>
public CapBuilder AddSubscriberAssembly(params Assembly[] assemblies)
{
if (assemblies == null) throw new ArgumentNullException(nameof(assemblies));

Services.Replace(new ServiceDescriptor(typeof(IConsumerServiceSelector),
x => new AssemblyConsumerServiceSelector(x, assemblies),
ServiceLifetime.Singleton));

return this;
}

/// <summary>
/// Registers subscribers from the specified types.
/// </summary>
/// <param name="handlerAssemblyMarkerTypes"></param>
public CapBuilder AddSubscriberAssembly([NotNull] params Type[] handlerAssemblyMarkerTypes)
{
if (handlerAssemblyMarkerTypes == null) throw new ArgumentNullException(nameof(handlerAssemblyMarkerTypes));

AddSubscriberAssembly(handlerAssemblyMarkerTypes.Select(t => t.GetTypeInfo().Assembly).ToArray());

return this;
}
}
}

+ 5
- 0
src/DotNetCore.CAP/Internal/Helper.cs 查看文件

@@ -35,6 +35,11 @@ namespace DotNetCore.CAP.Internal
return false;
}

if (!typeInfo.ContainsGenericParameters)
{
return false;
}

return !typeInfo.ContainsGenericParameters
&& typeInfo.Name.EndsWith("Controller", StringComparison.OrdinalIgnoreCase);
}


+ 47
- 0
src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Assembly.cs 查看文件

@@ -0,0 +1,47 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

namespace DotNetCore.CAP.Internal
{
/// <inheritdoc />
/// <summary>
/// A <see cref="T:DotNetCore.CAP.Abstractions.IConsumerServiceSelector" /> implementation that scanning subscribers from the assembly.
/// </summary>
public class AssemblyConsumerServiceSelector : ConsumerServiceSelector
{
private readonly Assembly[] _assemblies;

public AssemblyConsumerServiceSelector(IServiceProvider serviceProvider, Assembly[] assemblies) : base(serviceProvider)
{
_assemblies = assemblies;
}

protected override IEnumerable<ConsumerExecutorDescriptor> FindConsumersFromInterfaceTypes(IServiceProvider provider)
{
var descriptors = new List<ConsumerExecutorDescriptor>();

descriptors.AddRange(base.FindConsumersFromInterfaceTypes(provider));

var assembliesToScan = _assemblies.Distinct().ToArray();

var capSubscribeTypeInfo = typeof(ICapSubscribe).GetTypeInfo();

foreach (var type in assembliesToScan.SelectMany(a => a.DefinedTypes))
{
if (!capSubscribeTypeInfo.IsAssignableFrom(type))
{
continue;
}

descriptors.AddRange(GetTopicAttributesDescription(type));
}

return descriptors;
}
}
}

+ 4
- 2
src/DotNetCore.CAP/Internal/ISubscribeDispatcher.Default.cs 查看文件

@@ -89,6 +89,8 @@ namespace DotNetCore.CAP.Internal

try
{
_logger.ConsumerExecuting(descriptor.MethodInfo.Name);

var sp = Stopwatch.StartNew();

await InvokeConsumerMethodAsync(message, descriptor, cancellationToken);
@@ -97,7 +99,7 @@ namespace DotNetCore.CAP.Internal

await SetSuccessfulState(message);

_logger.ConsumerExecuted(sp.Elapsed.TotalMilliseconds);
_logger.ConsumerExecuted(descriptor.MethodInfo.Name, sp.Elapsed.TotalMilliseconds);

return (false, OperateResult.Success);
}
@@ -183,7 +185,7 @@ namespace DotNetCore.CAP.Internal
[Headers.CorrelationSequence] = (message.Origin.GetCorrelationSequence() + 1).ToString()
};

await _provider.GetService<ICapPublisher>().PublishAsync(ret.CallbackName, ret.Result, header, cancellationToken);
await _provider.GetRequiredService<ICapPublisher>().PublishAsync(ret.CallbackName, ret.Result, header, cancellationToken);
}
}
catch (OperationCanceledException)


+ 1
- 4
src/DotNetCore.CAP/Internal/ISubscribeInvoker.Default.cs 查看文件

@@ -40,10 +40,7 @@ namespace DotNetCore.CAP.Internal
var methodHandle = methodInfo.MethodHandle.Value;
var key = $"{reflectedTypeHandle}_{methodHandle}";

_logger.LogDebug("Executing subscriber method : {0}", methodInfo.Name);

var executor = _executors.GetOrAdd(key,
x => ObjectMethodExecutor.Create(methodInfo, context.ConsumerDescriptor.ImplTypeInfo));
var executor = _executors.GetOrAdd(key, x => ObjectMethodExecutor.Create(methodInfo, context.ConsumerDescriptor.ImplTypeInfo));

using var scope = _serviceProvider.CreateScope();



+ 7
- 2
src/DotNetCore.CAP/Internal/LoggerExtensions.cs 查看文件

@@ -45,9 +45,14 @@ namespace DotNetCore.CAP.Internal
logger.LogError(ex, $"An exception occured while publishing a message, reason:{reason}. message id:{messageId}");
}

public static void ConsumerExecuted(this ILogger logger, double milliseconds)
public static void ConsumerExecuting(this ILogger logger, string methodName)
{
logger.LogDebug($"Consumer executed. Took: {milliseconds} ms.");
logger.LogInformation($"Executing subscriber method '{methodName}'");
}

public static void ConsumerExecuted(this ILogger logger, string methodName, double milliseconds)
{
logger.LogInformation($"Executed subscriber method '{methodName}' in {milliseconds} ms");
}

public static void ServerStarting(this ILogger logger)


Loading…
取消
儲存