From 96ca2382717bd7deb27921332f89fdfc9991136b Mon Sep 17 00:00:00 2001 From: Savorboard Date: Wed, 18 Aug 2021 23:16:26 +0800 Subject: [PATCH 1/8] Update version to 5.1.4 --- build/version.props | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build/version.props b/build/version.props index 1d665af..2181d53 100644 --- a/build/version.props +++ b/build/version.props @@ -2,7 +2,7 @@ 5 1 - 3 + 4 $(VersionMajor).$(VersionMinor).$(VersionPatch) From b94cffb2709403e35c82387d93436ad486df7a5a Mon Sep 17 00:00:00 2001 From: Savorboard Date: Mon, 23 Aug 2021 11:06:04 +0800 Subject: [PATCH 2/8] Add docs for nats transporter. --- .../user-guide/en/transport/general.md | 1 + docs/content/user-guide/en/transport/nats.md | 56 ++++++++++++++++++ .../user-guide/zh/transport/general.md | 1 + docs/content/user-guide/zh/transport/nats.md | 57 +++++++++++++++++++ docs/mkdocs.yml | 2 + 5 files changed, 117 insertions(+) create mode 100644 docs/content/user-guide/en/transport/nats.md create mode 100644 docs/content/user-guide/zh/transport/nats.md diff --git a/docs/content/user-guide/en/transport/general.md b/docs/content/user-guide/en/transport/general.md index f0c0074..a34071d 100644 --- a/docs/content/user-guide/en/transport/general.md +++ b/docs/content/user-guide/en/transport/general.md @@ -10,6 +10,7 @@ CAP supports several transport methods: * [Kafka](kafka.md) * [Azure Service Bus](azure-service-bus.md) * [Amazon SQS](aws-sqs.md) +* [NATS](nats.md) * [In-Memory Queue](in-memory-queue.md) * [Redis Streams](redis-streams.md) diff --git a/docs/content/user-guide/en/transport/nats.md b/docs/content/user-guide/en/transport/nats.md new file mode 100644 index 0000000..4efcd4f --- /dev/null +++ b/docs/content/user-guide/en/transport/nats.md @@ -0,0 +1,56 @@ +# NATS + +[NATS](https://nats.io/) is a simple, secure and performant communications system for digital systems, services and devices. NATS is part of the Cloud Native Computing Foundation (CNCF). + +## Configuration + +To use NATS transporter, you need to install the following package from NuGet: + +```powershell + +PM> Install-Package DotNetCore.CAP.NATS + +``` + +Then you can add configuration items to the `ConfigureServices` method of `Startup.cs`. + +```csharp + +public void ConfigureServices(IServiceCollection services) +{ + services.AddCap(capOptions => + { + capOptions.UseNATS(natsOptions=>{ + //NATS Options + }); + }); +} + +``` + +#### NATS Options + +NATS configuration parameters provided directly by the CAP: + +NAME | DESCRIPTION | TYPE | DEFAULT +:---|:---|---|:--- +Options | NATS client configuration | Options | Options +Servers | Server url/urls used to connect to the NATs server. | string | NULL +ConnectionPoolSize | number of connections pool | uint | 10 + +#### NATS ConfigurationOptions + +If you need **more** native NATS related configuration options, you can set them in the `Options` option: + +```csharp +services.AddCap(capOptions => +{ + capOptions.UseNATS(natsOptions=> + { + // NATS options. + natsOptions.Options.Url=""; + }); +}); +``` + +`Options` is a NATS.Client ConfigurationOptions , you can find more details through this [link](http://nats-io.github.io/nats.net/class_n_a_t_s_1_1_client_1_1_options.html) diff --git a/docs/content/user-guide/zh/transport/general.md b/docs/content/user-guide/zh/transport/general.md index b402aea..13736ef 100644 --- a/docs/content/user-guide/zh/transport/general.md +++ b/docs/content/user-guide/zh/transport/general.md @@ -10,6 +10,7 @@ CAP 支持以下几种运输方式: * [Kafka](kafka.md) * [Azure Service Bus](azure-service-bus.md) * [Amazon SQS](aws-sqs.md) +* [NATS](nats.md) * [In-Memory Queue](in-memory-queue.md) * [Redis Streams](redis-streams.md) diff --git a/docs/content/user-guide/zh/transport/nats.md b/docs/content/user-guide/zh/transport/nats.md new file mode 100644 index 0000000..3ed0e41 --- /dev/null +++ b/docs/content/user-guide/zh/transport/nats.md @@ -0,0 +1,57 @@ +# NATS + +[NATS](https://nats.io/)是一个简单、安全、高性能的数字系统、服务和设备通信系统。NATS 是 CNCF 的一部分。 + +## 配置 + +要使用NATS 传输器,你需要安装下面的NuGet包: + +```powershell + +PM> Install-Package DotNetCore.CAP.NATS + +``` + +你可以通过在 `Startup.cs` 文件中配置 `ConfigureServices` 来添加配置: + +```csharp + +public void ConfigureServices(IServiceCollection services) +{ + services.AddCap(capOptions => + { + capOptions.UseNATS(natsOptions=>{ + //NATS Options + }); + }); +} + +``` + +#### NATS 配置 + +CAP 直接提供的关于 NATS 的配置参数: + + +NAME | DESCRIPTION | TYPE | DEFAULT +:---|:---|---|:--- +Options | NATS 客户端配置 | Options | Options +Servers | 服务器Urls地址 | string | NULL +ConnectionPoolSize | 连接池数量 | uint | 10 + +#### NATS ConfigurationOptions + +如果你需要 **更多** 原生相关的配置项,可以通过 `Options` 配置项进行设定: + +```csharp +services.AddCap(capOptions => +{ + capOptions.UseNATS(natsOptions=> + { + // NATS options. + natsOptions.Options.Url=""; + }); +}); +``` + +`Options` 是 NATS.Client 客户端提供的配置, 你可以在这个[链接](http://nats-io.github.io/nats.net/class_n_a_t_s_1_1_client_1_1_options.html)找到更多详细信息。 diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml index 8c00d9e..1e3cc3c 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -98,6 +98,7 @@ nav: - Amazon SQS: user-guide/en/transport/aws-sqs.md - Apache Kafka®: user-guide/en/transport/kafka.md - Azure Service Bus: user-guide/en/transport/azure-service-bus.md + - NATS: user-guide/en/transport/nats.md - RabbitMQ: user-guide/en/transport/rabbitmq.md - Redis Streams: user-guide/en/transport/redis-streams.md - In-Memory Queue: user-guide/en/transport/in-memory-queue.md @@ -133,6 +134,7 @@ nav: - Amazon SQS: user-guide/zh/transport/aws-sqs.md - Apache Kafka®: user-guide/zh/transport/kafka.md - Azure Service Bus: user-guide/zh/transport/azure-service-bus.md + - NATS: user-guide/zh/transport/nats.md - RabbitMQ: user-guide/zh/transport/rabbitmq.md - Redis Streams: user-guide/zh/transport/redis-streams.md - In-Memory Queue: user-guide/zh/transport/in-memory-queue.md From df9a8cdd38bb38ce46636f69021aa368ecd68c45 Mon Sep 17 00:00:00 2001 From: danielblackwellkb <88295600+danielblackwellkb@users.noreply.github.com> Date: Mon, 23 Aug 2021 15:21:01 +1200 Subject: [PATCH 3/8] Allows the overriding of the Kafka ProviderBuilder and ConsumerBuilder (#978) * Allows the overriding of the ProviderBuilder and ConsumerBuilder creation functions. * Fix BuildProducer method layout --- src/DotNetCore.CAP.Kafka/IConnectionPool.Default.cs | 7 ++++++- src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs | 13 +++++++++---- .../KafkaConsumerClientFactory.cs | 4 ++-- 3 files changed, 17 insertions(+), 7 deletions(-) diff --git a/src/DotNetCore.CAP.Kafka/IConnectionPool.Default.cs b/src/DotNetCore.CAP.Kafka/IConnectionPool.Default.cs index f4ba628..d9f0ab5 100644 --- a/src/DotNetCore.CAP.Kafka/IConnectionPool.Default.cs +++ b/src/DotNetCore.CAP.Kafka/IConnectionPool.Default.cs @@ -46,11 +46,16 @@ namespace DotNetCore.CAP.Kafka RequestTimeoutMs = 3000 }; - producer = new ProducerBuilder(config).Build(); + producer = BuildProducer(config); return producer; } + protected virtual IProducer BuildProducer(ProducerConfig config) + { + return new ProducerBuilder(config).Build(); + } + public bool Return(IProducer producer) { if (Interlocked.Increment(ref _pCount) <= _maxSize) diff --git a/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs b/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs index 7e4d598..d92f8f1 100644 --- a/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs +++ b/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs @@ -15,7 +15,7 @@ using Microsoft.Extensions.Options; namespace DotNetCore.CAP.Kafka { - internal sealed class KafkaConsumerClient : IConsumerClient + public class KafkaConsumerClient : IConsumerClient { private static readonly SemaphoreSlim ConnectionLock = new SemaphoreSlim(initialCount: 1, maxCount: 1); @@ -153,9 +153,7 @@ namespace DotNetCore.CAP.Kafka config.EnableAutoCommit ??= false; config.LogConnectionClose ??= false; - _consumerClient = new ConsumerBuilder(config) - .SetErrorHandler(ConsumerClient_OnConsumeError) - .Build(); + BuildConsumer(config); } } finally @@ -164,6 +162,13 @@ namespace DotNetCore.CAP.Kafka } } + protected virtual void BuildConsumer(ConsumerConfig config) + { + _consumerClient = new ConsumerBuilder(config) + .SetErrorHandler(ConsumerClient_OnConsumeError) + .Build(); + } + private void ConsumerClient_OnConsumeError(IConsumer consumer, Error e) { var logArgs = new LogMessageEventArgs diff --git a/src/DotNetCore.CAP.Kafka/KafkaConsumerClientFactory.cs b/src/DotNetCore.CAP.Kafka/KafkaConsumerClientFactory.cs index 59374bd..e6df898 100644 --- a/src/DotNetCore.CAP.Kafka/KafkaConsumerClientFactory.cs +++ b/src/DotNetCore.CAP.Kafka/KafkaConsumerClientFactory.cs @@ -6,7 +6,7 @@ using Microsoft.Extensions.Options; namespace DotNetCore.CAP.Kafka { - internal sealed class KafkaConsumerClientFactory : IConsumerClientFactory + public class KafkaConsumerClientFactory : IConsumerClientFactory { private readonly IOptions _kafkaOptions; @@ -15,7 +15,7 @@ namespace DotNetCore.CAP.Kafka _kafkaOptions = kafkaOptions; } - public IConsumerClient Create(string groupId) + public virtual IConsumerClient Create(string groupId) { try { From 1ef8cf51666f2ce41b2b543dceb4a7e32df6934b Mon Sep 17 00:00:00 2001 From: lbhnrg2021 Date: Fri, 27 Aug 2021 11:30:07 +0800 Subject: [PATCH 4/8] The private static variable _isHealthy has no meaning in a singleton. If you use multi-host mode, there will be status detection problems (#982) --- src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs b/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs index d3dbed3..6a9ed9b 100644 --- a/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs +++ b/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs @@ -35,7 +35,7 @@ namespace DotNetCore.CAP.Internal private BrokerAddress _serverAddress; private Task _compositeTask; private bool _disposed; - private static bool _isHealthy = true; + private bool _isHealthy = true; // diagnostics listener // ReSharper disable once InconsistentNaming From 42796c95b5818f48bc26bac5597788d6f040d7af Mon Sep 17 00:00:00 2001 From: Dan Date: Tue, 31 Aug 2021 21:50:57 +1200 Subject: [PATCH 5/8] Change Kafka BuildConsumer to return Consumer rather than set private _consumerClient (#990) Co-authored-by: Unknown --- src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs b/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs index d92f8f1..ed235c8 100644 --- a/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs +++ b/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs @@ -153,7 +153,7 @@ namespace DotNetCore.CAP.Kafka config.EnableAutoCommit ??= false; config.LogConnectionClose ??= false; - BuildConsumer(config); + _consumerClient = BuildConsumer(config); } } finally @@ -162,9 +162,9 @@ namespace DotNetCore.CAP.Kafka } } - protected virtual void BuildConsumer(ConsumerConfig config) + protected virtual IConsumer BuildConsumer(ConsumerConfig config) { - _consumerClient = new ConsumerBuilder(config) + return new ConsumerBuilder(config) .SetErrorHandler(ConsumerClient_OnConsumeError) .Build(); } From f71aae14792d8f0d1a94d0a6c4062b1ab00d339b Mon Sep 17 00:00:00 2001 From: Savorboard Date: Thu, 2 Sep 2021 16:04:46 +0800 Subject: [PATCH 6/8] Code cleanup. --- .../DotNetCore.CAP.Dashboard.csproj | 9 +- .../ObjectMethodExecutor/AwaitableInfo.cs | 128 ------- .../CoercedAwaitableInfo.cs | 56 --- .../ObjectMethodExecutor.cs | 338 ------------------ .../ObjectMethodExecutorAwaitable.cs | 118 ------ .../ObjectMethodExecutorFSharpSupport.cs | 145 -------- src/DotNetCore.CAP/Internal/IMessageSender.cs | 3 +- .../Internal/ISubscribeInvoker.cs | 3 +- .../ObjectMethodExecutor/AwaitableInfo.cs | 6 +- .../CoercedAwaitableInfo.cs | 3 +- .../ObjectMethodExecutorAwaitable.cs | 3 +- .../ObjectMethodExecutorFSharpSupport.cs | 1 + .../Transport/IConsumerClient.cs | 4 +- 13 files changed, 18 insertions(+), 799 deletions(-) delete mode 100644 src/DotNetCore.CAP.Dashboard/ObjectMethodExecutor/AwaitableInfo.cs delete mode 100644 src/DotNetCore.CAP.Dashboard/ObjectMethodExecutor/CoercedAwaitableInfo.cs delete mode 100644 src/DotNetCore.CAP.Dashboard/ObjectMethodExecutor/ObjectMethodExecutor.cs delete mode 100644 src/DotNetCore.CAP.Dashboard/ObjectMethodExecutor/ObjectMethodExecutorAwaitable.cs delete mode 100644 src/DotNetCore.CAP.Dashboard/ObjectMethodExecutor/ObjectMethodExecutorFSharpSupport.cs diff --git a/src/DotNetCore.CAP.Dashboard/DotNetCore.CAP.Dashboard.csproj b/src/DotNetCore.CAP.Dashboard/DotNetCore.CAP.Dashboard.csproj index 09fcb2e..6055118 100644 --- a/src/DotNetCore.CAP.Dashboard/DotNetCore.CAP.Dashboard.csproj +++ b/src/DotNetCore.CAP.Dashboard/DotNetCore.CAP.Dashboard.csproj @@ -6,19 +6,18 @@ + + - - - - + - + diff --git a/src/DotNetCore.CAP.Dashboard/ObjectMethodExecutor/AwaitableInfo.cs b/src/DotNetCore.CAP.Dashboard/ObjectMethodExecutor/AwaitableInfo.cs deleted file mode 100644 index 7046303..0000000 --- a/src/DotNetCore.CAP.Dashboard/ObjectMethodExecutor/AwaitableInfo.cs +++ /dev/null @@ -1,128 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. - -using System; -using System.Linq; -using System.Reflection; -using System.Runtime.CompilerServices; - -namespace Microsoft.Extensions.Internal -{ - internal struct AwaitableInfo - { - public Type AwaiterType { get; } - public PropertyInfo AwaiterIsCompletedProperty { get; } - public MethodInfo AwaiterGetResultMethod { get; } - public MethodInfo AwaiterOnCompletedMethod { get; } - public MethodInfo AwaiterUnsafeOnCompletedMethod { get; } - public Type ResultType { get; } - public MethodInfo GetAwaiterMethod { get; } - - public AwaitableInfo( - Type awaiterType, - PropertyInfo awaiterIsCompletedProperty, - MethodInfo awaiterGetResultMethod, - MethodInfo awaiterOnCompletedMethod, - MethodInfo awaiterUnsafeOnCompletedMethod, - Type resultType, - MethodInfo getAwaiterMethod) - { - AwaiterType = awaiterType; - AwaiterIsCompletedProperty = awaiterIsCompletedProperty; - AwaiterGetResultMethod = awaiterGetResultMethod; - AwaiterOnCompletedMethod = awaiterOnCompletedMethod; - AwaiterUnsafeOnCompletedMethod = awaiterUnsafeOnCompletedMethod; - ResultType = resultType; - GetAwaiterMethod = getAwaiterMethod; - } - - public static bool IsTypeAwaitable(Type type, out AwaitableInfo awaitableInfo) - { - // Based on Roslyn code: http://source.roslyn.io/#Microsoft.CodeAnalysis.Workspaces/Shared/Extensions/ISymbolExtensions.cs,db4d48ba694b9347 - - // Awaitable must have method matching "object GetAwaiter()" - var getAwaiterMethod = type.GetRuntimeMethods().FirstOrDefault(m => - m.Name.Equals("GetAwaiter", StringComparison.OrdinalIgnoreCase) - && m.GetParameters().Length == 0 - && m.ReturnType != null); - if (getAwaiterMethod == null) - { - awaitableInfo = default(AwaitableInfo); - return false; - } - - var awaiterType = getAwaiterMethod.ReturnType; - - // Awaiter must have property matching "bool IsCompleted { get; }" - var isCompletedProperty = awaiterType.GetRuntimeProperties().FirstOrDefault(p => - p.Name.Equals("IsCompleted", StringComparison.OrdinalIgnoreCase) - && p.PropertyType == typeof(bool) - && p.GetMethod != null); - if (isCompletedProperty == null) - { - awaitableInfo = default(AwaitableInfo); - return false; - } - - // Awaiter must implement INotifyCompletion - var awaiterInterfaces = awaiterType.GetInterfaces(); - var implementsINotifyCompletion = awaiterInterfaces.Any(t => t == typeof(INotifyCompletion)); - if (!implementsINotifyCompletion) - { - awaitableInfo = default(AwaitableInfo); - return false; - } - - // INotifyCompletion supplies a method matching "void OnCompleted(Action action)" - var iNotifyCompletionMap = awaiterType - .GetTypeInfo() - .GetRuntimeInterfaceMap(typeof(INotifyCompletion)); - var onCompletedMethod = iNotifyCompletionMap.InterfaceMethods.Single(m => - m.Name.Equals("OnCompleted", StringComparison.OrdinalIgnoreCase) - && m.ReturnType == typeof(void) - && m.GetParameters().Length == 1 - && m.GetParameters()[0].ParameterType == typeof(Action)); - - // Awaiter optionally implements ICriticalNotifyCompletion - var implementsICriticalNotifyCompletion = - awaiterInterfaces.Any(t => t == typeof(ICriticalNotifyCompletion)); - MethodInfo unsafeOnCompletedMethod; - if (implementsICriticalNotifyCompletion) - { - // ICriticalNotifyCompletion supplies a method matching "void UnsafeOnCompleted(Action action)" - var iCriticalNotifyCompletionMap = awaiterType - .GetTypeInfo() - .GetRuntimeInterfaceMap(typeof(ICriticalNotifyCompletion)); - unsafeOnCompletedMethod = iCriticalNotifyCompletionMap.InterfaceMethods.Single(m => - m.Name.Equals("UnsafeOnCompleted", StringComparison.OrdinalIgnoreCase) - && m.ReturnType == typeof(void) - && m.GetParameters().Length == 1 - && m.GetParameters()[0].ParameterType == typeof(Action)); - } - else - { - unsafeOnCompletedMethod = null; - } - - // Awaiter must have method matching "void GetResult" or "T GetResult()" - var getResultMethod = awaiterType.GetRuntimeMethods().FirstOrDefault(m => - m.Name.Equals("GetResult") - && m.GetParameters().Length == 0); - if (getResultMethod == null) - { - awaitableInfo = default(AwaitableInfo); - return false; - } - - awaitableInfo = new AwaitableInfo( - awaiterType, - isCompletedProperty, - getResultMethod, - onCompletedMethod, - unsafeOnCompletedMethod, - getResultMethod.ReturnType, - getAwaiterMethod); - return true; - } - } -} \ No newline at end of file diff --git a/src/DotNetCore.CAP.Dashboard/ObjectMethodExecutor/CoercedAwaitableInfo.cs b/src/DotNetCore.CAP.Dashboard/ObjectMethodExecutor/CoercedAwaitableInfo.cs deleted file mode 100644 index 3f923e0..0000000 --- a/src/DotNetCore.CAP.Dashboard/ObjectMethodExecutor/CoercedAwaitableInfo.cs +++ /dev/null @@ -1,56 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. - -using System; -using System.Linq.Expressions; - -namespace Microsoft.Extensions.Internal -{ - internal struct CoercedAwaitableInfo - { - public AwaitableInfo AwaitableInfo { get; } - public Expression CoercerExpression { get; } - public Type CoercerResultType { get; } - public bool RequiresCoercion => CoercerExpression != null; - - public CoercedAwaitableInfo(AwaitableInfo awaitableInfo) - { - AwaitableInfo = awaitableInfo; - CoercerExpression = null; - CoercerResultType = null; - } - - public CoercedAwaitableInfo(Expression coercerExpression, Type coercerResultType, - AwaitableInfo coercedAwaitableInfo) - { - CoercerExpression = coercerExpression; - CoercerResultType = coercerResultType; - AwaitableInfo = coercedAwaitableInfo; - } - - public static bool IsTypeAwaitable(Type type, out CoercedAwaitableInfo info) - { - if (AwaitableInfo.IsTypeAwaitable(type, out var directlyAwaitableInfo)) - { - info = new CoercedAwaitableInfo(directlyAwaitableInfo); - return true; - } - - // It's not directly awaitable, but maybe we can coerce it. - // Currently we support coercing FSharpAsync. - if (ObjectMethodExecutorFSharpSupport.TryBuildCoercerFromFSharpAsyncToAwaitable(type, - out var coercerExpression, - out var coercerResultType)) - { - if (AwaitableInfo.IsTypeAwaitable(coercerResultType, out var coercedAwaitableInfo)) - { - info = new CoercedAwaitableInfo(coercerExpression, coercerResultType, coercedAwaitableInfo); - return true; - } - } - - info = default(CoercedAwaitableInfo); - return false; - } - } -} \ No newline at end of file diff --git a/src/DotNetCore.CAP.Dashboard/ObjectMethodExecutor/ObjectMethodExecutor.cs b/src/DotNetCore.CAP.Dashboard/ObjectMethodExecutor/ObjectMethodExecutor.cs deleted file mode 100644 index 26708a2..0000000 --- a/src/DotNetCore.CAP.Dashboard/ObjectMethodExecutor/ObjectMethodExecutor.cs +++ /dev/null @@ -1,338 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. - -using System; -using System.Collections.Generic; -using System.Linq.Expressions; -using System.Reflection; - -// ReSharper disable once CheckNamespace -namespace Microsoft.Extensions.Internal -{ - internal class ObjectMethodExecutor - { - // ReSharper disable once InconsistentNaming - private static readonly ConstructorInfo _objectMethodExecutorAwaitableConstructor = - typeof(ObjectMethodExecutorAwaitable).GetConstructor(new[] - { - typeof(object), // customAwaitable - typeof(Func), // getAwaiterMethod - typeof(Func), // isCompletedMethod - typeof(Func), // getResultMethod - typeof(Action), // onCompletedMethod - typeof(Action) // unsafeOnCompletedMethod - }); - - private readonly MethodExecutor _executor; - private readonly MethodExecutorAsync _executorAsync; - private readonly object[] _parameterDefaultValues; - - private ObjectMethodExecutor(MethodInfo methodInfo, TypeInfo targetTypeInfo, object[] parameterDefaultValues) - { - if (methodInfo == null) - { - throw new ArgumentNullException(nameof(methodInfo)); - } - - MethodInfo = methodInfo; - MethodParameters = methodInfo.GetParameters(); - TargetTypeInfo = targetTypeInfo; - MethodReturnType = methodInfo.ReturnType; - - var isAwaitable = CoercedAwaitableInfo.IsTypeAwaitable(MethodReturnType, out var coercedAwaitableInfo); - - IsMethodAsync = isAwaitable; - AsyncResultType = isAwaitable ? coercedAwaitableInfo.AwaitableInfo.ResultType : null; - - // Upstream code may prefer to use the sync-executor even for async methods, because if it knows - // that the result is a specific Task where T is known, then it can directly cast to that type - // and await it without the extra heap allocations involved in the _executorAsync code path. - _executor = GetExecutor(methodInfo, targetTypeInfo); - - if (IsMethodAsync) - { - _executorAsync = GetExecutorAsync(methodInfo, targetTypeInfo, coercedAwaitableInfo); - } - - _parameterDefaultValues = parameterDefaultValues; - } - - public MethodInfo MethodInfo { get; } - - public ParameterInfo[] MethodParameters { get; } - - public TypeInfo TargetTypeInfo { get; } - - public Type AsyncResultType { get; } - - // This field is made internal set because it is set in unit tests. - public Type MethodReturnType { get; internal set; } - - public bool IsMethodAsync { get; } - - public static ObjectMethodExecutor Create(MethodInfo methodInfo, TypeInfo targetTypeInfo) - { - return new ObjectMethodExecutor(methodInfo, targetTypeInfo, null); - } - - public static ObjectMethodExecutor Create(MethodInfo methodInfo, TypeInfo targetTypeInfo, - object[] parameterDefaultValues) - { - if (parameterDefaultValues == null) - { - throw new ArgumentNullException(nameof(parameterDefaultValues)); - } - - return new ObjectMethodExecutor(methodInfo, targetTypeInfo, parameterDefaultValues); - } - - /// - /// Executes the configured method on . This can be used whether or not - /// the configured method is asynchronous. - /// - /// - /// Even if the target method is asynchronous, it's desirable to invoke it using Execute rather than - /// ExecuteAsync if you know at compile time what the return type is, because then you can directly - /// "await" that value (via a cast), and then the generated code will be able to reference the - /// resulting awaitable as a value-typed variable. If you use ExecuteAsync instead, the generated - /// code will have to treat the resulting awaitable as a boxed object, because it doesn't know at - /// compile time what type it would be. - /// - /// The object whose method is to be executed. - /// Parameters to pass to the method. - /// The method return value. - public object Execute(object target, params object[] parameters) - { - return _executor(target, parameters); - } - - /// - /// Executes the configured method on . This can only be used if the configured - /// method is asynchronous. - /// - /// - /// If you don't know at compile time the type of the method's returned awaitable, you can use ExecuteAsync, - /// which supplies an awaitable-of-object. This always works, but can incur several extra heap allocations - /// as compared with using Execute and then using "await" on the result value typecasted to the known - /// awaitable type. The possible extra heap allocations are for: - /// 1. The custom awaitable (though usually there's a heap allocation for this anyway, since normally - /// it's a reference type, and you normally create a new instance per call). - /// 2. The custom awaiter (whether or not it's a value type, since if it's not, you need a new instance - /// of it, and if it is, it will have to be boxed so the calling code can reference it as an object). - /// 3. The async result value, if it's a value type (it has to be boxed as an object, since the calling - /// code doesn't know what type it's going to be). - /// - /// The object whose method is to be executed. - /// Parameters to pass to the method. - /// An object that you can "await" to get the method return value. - public ObjectMethodExecutorAwaitable ExecuteAsync(object target, params object[] parameters) - { - return _executorAsync(target, parameters); - } - - public object GetDefaultValueForParameter(int index) - { - if (_parameterDefaultValues == null) - { - throw new InvalidOperationException( - $"Cannot call {nameof(GetDefaultValueForParameter)}, because no parameter default values were supplied."); - } - - if (index < 0 || index > MethodParameters.Length - 1) - { - throw new ArgumentOutOfRangeException(nameof(index)); - } - - return _parameterDefaultValues[index]; - } - - private static MethodExecutor GetExecutor(MethodInfo methodInfo, TypeInfo targetTypeInfo) - { - // Parameters to executor - var targetParameter = Expression.Parameter(typeof(object), "target"); - var parametersParameter = Expression.Parameter(typeof(object[]), "parameters"); - - // Build parameter list - var parameters = new List(); - var paramInfos = methodInfo.GetParameters(); - for (var i = 0; i < paramInfos.Length; i++) - { - var paramInfo = paramInfos[i]; - var valueObj = Expression.ArrayIndex(parametersParameter, Expression.Constant(i)); - var valueCast = Expression.Convert(valueObj, paramInfo.ParameterType); - - // valueCast is "(Ti) parameters[i]" - parameters.Add(valueCast); - } - - // Call method - var instanceCast = Expression.Convert(targetParameter, targetTypeInfo.AsType()); - var methodCall = Expression.Call(instanceCast, methodInfo, parameters); - - // methodCall is "((Ttarget) target) method((T0) parameters[0], (T1) parameters[1], ...)" - // Create function - if (methodCall.Type == typeof(void)) - { - var lambda = Expression.Lambda(methodCall, targetParameter, parametersParameter); - var voidExecutor = lambda.Compile(); - return WrapVoidMethod(voidExecutor); - } - else - { - // must coerce methodCall to match ActionExecutor signature - var castMethodCall = Expression.Convert(methodCall, typeof(object)); - var lambda = Expression.Lambda(castMethodCall, targetParameter, parametersParameter); - return lambda.Compile(); - } - } - - private static MethodExecutor WrapVoidMethod(VoidMethodExecutor executor) - { - return delegate(object target, object[] parameters) - { - executor(target, parameters); - return null; - }; - } - - private static MethodExecutorAsync GetExecutorAsync( - MethodInfo methodInfo, - TypeInfo targetTypeInfo, - CoercedAwaitableInfo coercedAwaitableInfo) - { - // Parameters to executor - var targetParameter = Expression.Parameter(typeof(object), "target"); - var parametersParameter = Expression.Parameter(typeof(object[]), "parameters"); - - // Build parameter list - var parameters = new List(); - var paramInfos = methodInfo.GetParameters(); - for (var i = 0; i < paramInfos.Length; i++) - { - var paramInfo = paramInfos[i]; - var valueObj = Expression.ArrayIndex(parametersParameter, Expression.Constant(i)); - var valueCast = Expression.Convert(valueObj, paramInfo.ParameterType); - - // valueCast is "(Ti) parameters[i]" - parameters.Add(valueCast); - } - - // Call method - var instanceCast = Expression.Convert(targetParameter, targetTypeInfo.AsType()); - var methodCall = Expression.Call(instanceCast, methodInfo, parameters); - - // Using the method return value, construct an ObjectMethodExecutorAwaitable based on - // the info we have about its implementation of the awaitable pattern. Note that all - // the funcs/actions we construct here are precompiled, so that only one instance of - // each is preserved throughout the lifetime of the ObjectMethodExecutor. - - // var getAwaiterFunc = (object awaitable) => - // (object)((CustomAwaitableType)awaitable).GetAwaiter(); - var customAwaitableParam = Expression.Parameter(typeof(object), "awaitable"); - var awaitableInfo = coercedAwaitableInfo.AwaitableInfo; - var postCoercionMethodReturnType = coercedAwaitableInfo.CoercerResultType ?? methodInfo.ReturnType; - var getAwaiterFunc = Expression.Lambda>( - Expression.Convert( - Expression.Call( - Expression.Convert(customAwaitableParam, postCoercionMethodReturnType), - awaitableInfo.GetAwaiterMethod), - typeof(object)), - customAwaitableParam).Compile(); - - // var isCompletedFunc = (object awaiter) => - // ((CustomAwaiterType)awaiter).IsCompleted; - var isCompletedParam = Expression.Parameter(typeof(object), "awaiter"); - var isCompletedFunc = Expression.Lambda>( - Expression.MakeMemberAccess( - Expression.Convert(isCompletedParam, awaitableInfo.AwaiterType), - awaitableInfo.AwaiterIsCompletedProperty), - isCompletedParam).Compile(); - - var getResultParam = Expression.Parameter(typeof(object), "awaiter"); - Func getResultFunc; - if (awaitableInfo.ResultType == typeof(void)) - { - getResultFunc = Expression.Lambda>( - Expression.Block( - Expression.Call( - Expression.Convert(getResultParam, awaitableInfo.AwaiterType), - awaitableInfo.AwaiterGetResultMethod), - Expression.Constant(null) - ), - getResultParam).Compile(); - } - else - { - getResultFunc = Expression.Lambda>( - Expression.Convert( - Expression.Call( - Expression.Convert(getResultParam, awaitableInfo.AwaiterType), - awaitableInfo.AwaiterGetResultMethod), - typeof(object)), - getResultParam).Compile(); - } - - // var onCompletedFunc = (object awaiter, Action continuation) => { - // ((CustomAwaiterType)awaiter).OnCompleted(continuation); - // }; - var onCompletedParam1 = Expression.Parameter(typeof(object), "awaiter"); - var onCompletedParam2 = Expression.Parameter(typeof(Action), "continuation"); - var onCompletedFunc = Expression.Lambda>( - Expression.Call( - Expression.Convert(onCompletedParam1, awaitableInfo.AwaiterType), - awaitableInfo.AwaiterOnCompletedMethod, - onCompletedParam2), - onCompletedParam1, - onCompletedParam2).Compile(); - - Action unsafeOnCompletedFunc = null; - if (awaitableInfo.AwaiterUnsafeOnCompletedMethod != null) - { - // var unsafeOnCompletedFunc = (object awaiter, Action continuation) => { - // ((CustomAwaiterType)awaiter).UnsafeOnCompleted(continuation); - // }; - var unsafeOnCompletedParam1 = Expression.Parameter(typeof(object), "awaiter"); - var unsafeOnCompletedParam2 = Expression.Parameter(typeof(Action), "continuation"); - unsafeOnCompletedFunc = Expression.Lambda>( - Expression.Call( - Expression.Convert(unsafeOnCompletedParam1, awaitableInfo.AwaiterType), - awaitableInfo.AwaiterUnsafeOnCompletedMethod, - unsafeOnCompletedParam2), - unsafeOnCompletedParam1, - unsafeOnCompletedParam2).Compile(); - } - - // If we need to pass the method call result through a coercer function to get an - // awaitable, then do so. - var coercedMethodCall = coercedAwaitableInfo.RequiresCoercion - ? Expression.Invoke(coercedAwaitableInfo.CoercerExpression, methodCall) - : (Expression) methodCall; - - // return new ObjectMethodExecutorAwaitable( - // (object)coercedMethodCall, - // getAwaiterFunc, - // isCompletedFunc, - // getResultFunc, - // onCompletedFunc, - // unsafeOnCompletedFunc); - var returnValueExpression = Expression.New( - _objectMethodExecutorAwaitableConstructor, - Expression.Convert(coercedMethodCall, typeof(object)), - Expression.Constant(getAwaiterFunc), - Expression.Constant(isCompletedFunc), - Expression.Constant(getResultFunc), - Expression.Constant(onCompletedFunc), - Expression.Constant(unsafeOnCompletedFunc, typeof(Action))); - - var lambda = - Expression.Lambda(returnValueExpression, targetParameter, parametersParameter); - return lambda.Compile(); - } - - private delegate ObjectMethodExecutorAwaitable MethodExecutorAsync(object target, params object[] parameters); - - private delegate object MethodExecutor(object target, params object[] parameters); - - private delegate void VoidMethodExecutor(object target, object[] parameters); - } -} \ No newline at end of file diff --git a/src/DotNetCore.CAP.Dashboard/ObjectMethodExecutor/ObjectMethodExecutorAwaitable.cs b/src/DotNetCore.CAP.Dashboard/ObjectMethodExecutor/ObjectMethodExecutorAwaitable.cs deleted file mode 100644 index ed2fee8..0000000 --- a/src/DotNetCore.CAP.Dashboard/ObjectMethodExecutor/ObjectMethodExecutorAwaitable.cs +++ /dev/null @@ -1,118 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. - -using System; -using System.Runtime.CompilerServices; - -namespace Microsoft.Extensions.Internal -{ - /// - /// Provides a common awaitable structure that can - /// return, regardless of whether the underlying value is a System.Task, an FSharpAsync, or an - /// application-defined custom awaitable. - /// - internal struct ObjectMethodExecutorAwaitable - { - private readonly object _customAwaitable; - private readonly Func _getAwaiterMethod; - private readonly Func _isCompletedMethod; - private readonly Func _getResultMethod; - private readonly Action _onCompletedMethod; - private readonly Action _unsafeOnCompletedMethod; - - // Perf note: since we're requiring the customAwaitable to be supplied here as an object, - // this will trigger a further allocation if it was a value type (i.e., to box it). We can't - // fix this by making the customAwaitable type generic, because the calling code typically - // does not know the type of the awaitable/awaiter at compile-time anyway. - // - // However, we could fix it by not passing the customAwaitable here at all, and instead - // passing a func that maps directly from the target object (e.g., controller instance), - // target method (e.g., action method info), and params array to the custom awaiter in the - // GetAwaiter() method below. In effect, by delaying the actual method call until the - // upstream code calls GetAwaiter on this ObjectMethodExecutorAwaitable instance. - // This optimization is not currently implemented because: - // [1] It would make no difference when the awaitable was an object type, which is - // by far the most common scenario (e.g., System.Task). - // [2] It would be complex - we'd need some kind of object pool to track all the parameter - // arrays until we needed to use them in GetAwaiter(). - // We can reconsider this in the future if there's a need to optimize for ValueTask - // or other value-typed awaitables. - - public ObjectMethodExecutorAwaitable( - object customAwaitable, - Func getAwaiterMethod, - Func isCompletedMethod, - Func getResultMethod, - Action onCompletedMethod, - Action unsafeOnCompletedMethod) - { - _customAwaitable = customAwaitable; - _getAwaiterMethod = getAwaiterMethod; - _isCompletedMethod = isCompletedMethod; - _getResultMethod = getResultMethod; - _onCompletedMethod = onCompletedMethod; - _unsafeOnCompletedMethod = unsafeOnCompletedMethod; - } - - public Awaiter GetAwaiter() - { - var customAwaiter = _getAwaiterMethod(_customAwaitable); - return new Awaiter(customAwaiter, _isCompletedMethod, _getResultMethod, _onCompletedMethod, - _unsafeOnCompletedMethod); - } - - public struct Awaiter : ICriticalNotifyCompletion - { - private readonly object _customAwaiter; - private readonly Func _isCompletedMethod; - private readonly Func _getResultMethod; - private readonly Action _onCompletedMethod; - private readonly Action _unsafeOnCompletedMethod; - - public Awaiter( - object customAwaiter, - Func isCompletedMethod, - Func getResultMethod, - Action onCompletedMethod, - Action unsafeOnCompletedMethod) - { - _customAwaiter = customAwaiter; - _isCompletedMethod = isCompletedMethod; - _getResultMethod = getResultMethod; - _onCompletedMethod = onCompletedMethod; - _unsafeOnCompletedMethod = unsafeOnCompletedMethod; - } - - public bool IsCompleted => _isCompletedMethod(_customAwaiter); - - public object GetResult() - { - return _getResultMethod(_customAwaiter); - } - - public void OnCompleted(Action continuation) - { - _onCompletedMethod(_customAwaiter, continuation); - } - - public void UnsafeOnCompleted(Action continuation) - { - // If the underlying awaitable implements ICriticalNotifyCompletion, use its UnsafeOnCompleted. - // If not, fall back on using its OnCompleted. - // - // Why this is safe: - // - Implementing ICriticalNotifyCompletion is a way of saying the caller can choose whether it - // needs the execution context to be preserved (which it signals by calling OnCompleted), or - // that it doesn't (which it signals by calling UnsafeOnCompleted). Obviously it's faster *not* - // to preserve and restore the context, so we prefer that where possible. - // - If a caller doesn't need the execution context to be preserved and hence calls UnsafeOnCompleted, - // there's no harm in preserving it anyway - it's just a bit of wasted cost. That's what will happen - // if a caller sees that the proxy implements ICriticalNotifyCompletion but the proxy chooses to - // pass the call on to the underlying awaitable's OnCompleted method. - - var underlyingMethodToUse = _unsafeOnCompletedMethod ?? _onCompletedMethod; - underlyingMethodToUse(_customAwaiter, continuation); - } - } - } -} \ No newline at end of file diff --git a/src/DotNetCore.CAP.Dashboard/ObjectMethodExecutor/ObjectMethodExecutorFSharpSupport.cs b/src/DotNetCore.CAP.Dashboard/ObjectMethodExecutor/ObjectMethodExecutorFSharpSupport.cs deleted file mode 100644 index 9ec3c41..0000000 --- a/src/DotNetCore.CAP.Dashboard/ObjectMethodExecutor/ObjectMethodExecutorFSharpSupport.cs +++ /dev/null @@ -1,145 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. - -using System; -using System.Linq; -using System.Linq.Expressions; -using System.Reflection; -using System.Threading; -using System.Threading.Tasks; - -namespace Microsoft.Extensions.Internal -{ - /// - /// Helper for detecting whether a given type is FSharpAsync`1, and if so, supplying - /// an for mapping instances of that type to a C# awaitable. - /// - /// - /// The main design goal here is to avoid taking a compile-time dependency on - /// FSharp.Core.dll, because non-F# applications wouldn't use it. So all the references - /// to FSharp types have to be constructed dynamically at runtime. - /// - internal static class ObjectMethodExecutorFSharpSupport - { - private static readonly object _fsharpValuesCacheLock = new object(); - private static Assembly _fsharpCoreAssembly; - private static MethodInfo _fsharpAsyncStartAsTaskGenericMethod; - private static PropertyInfo _fsharpOptionOfTaskCreationOptionsNoneProperty; - private static PropertyInfo _fsharpOptionOfCancellationTokenNoneProperty; - - public static bool TryBuildCoercerFromFSharpAsyncToAwaitable( - Type possibleFSharpAsyncType, - out Expression coerceToAwaitableExpression, - out Type awaitableType) - { - var methodReturnGenericType = possibleFSharpAsyncType.IsGenericType - ? possibleFSharpAsyncType.GetGenericTypeDefinition() - : null; - - if (!IsFSharpAsyncOpenGenericType(methodReturnGenericType)) - { - coerceToAwaitableExpression = null; - awaitableType = null; - return false; - } - - var awaiterResultType = possibleFSharpAsyncType.GetGenericArguments().Single(); - awaitableType = typeof(Task<>).MakeGenericType(awaiterResultType); - - // coerceToAwaitableExpression = (object fsharpAsync) => - // { - // return (object)FSharpAsync.StartAsTask( - // (Microsoft.FSharp.Control.FSharpAsync)fsharpAsync, - // FSharpOption.None, - // FSharpOption.None); - // }; - var startAsTaskClosedMethod = _fsharpAsyncStartAsTaskGenericMethod - .MakeGenericMethod(awaiterResultType); - var coerceToAwaitableParam = Expression.Parameter(typeof(object)); - coerceToAwaitableExpression = Expression.Lambda( - Expression.Convert( - Expression.Call( - startAsTaskClosedMethod, - Expression.Convert(coerceToAwaitableParam, possibleFSharpAsyncType), - Expression.MakeMemberAccess(null, _fsharpOptionOfTaskCreationOptionsNoneProperty), - Expression.MakeMemberAccess(null, _fsharpOptionOfCancellationTokenNoneProperty)), - typeof(object)), - coerceToAwaitableParam); - - return true; - } - - private static bool IsFSharpAsyncOpenGenericType(Type possibleFSharpAsyncGenericType) - { - var typeFullName = possibleFSharpAsyncGenericType?.FullName; - if (!string.Equals(typeFullName, "Microsoft.FSharp.Control.FSharpAsync`1", StringComparison.Ordinal)) - { - return false; - } - - lock (_fsharpValuesCacheLock) - { - if (_fsharpCoreAssembly != null) - { - return possibleFSharpAsyncGenericType.Assembly == _fsharpCoreAssembly; - } - - return TryPopulateFSharpValueCaches(possibleFSharpAsyncGenericType); - } - } - - private static bool TryPopulateFSharpValueCaches(Type possibleFSharpAsyncGenericType) - { - var assembly = possibleFSharpAsyncGenericType.Assembly; - var fsharpOptionType = assembly.GetType("Microsoft.FSharp.Core.FSharpOption`1"); - var fsharpAsyncType = assembly.GetType("Microsoft.FSharp.Control.FSharpAsync"); - - if (fsharpOptionType == null || fsharpAsyncType == null) - { - return false; - } - - // Get a reference to FSharpOption.None - var fsharpOptionOfTaskCreationOptionsType = fsharpOptionType - .MakeGenericType(typeof(TaskCreationOptions)); - _fsharpOptionOfTaskCreationOptionsNoneProperty = fsharpOptionOfTaskCreationOptionsType - .GetTypeInfo() - .GetRuntimeProperty("None"); - - // Get a reference to FSharpOption.None - var fsharpOptionOfCancellationTokenType = fsharpOptionType - .MakeGenericType(typeof(CancellationToken)); - _fsharpOptionOfCancellationTokenNoneProperty = fsharpOptionOfCancellationTokenType - .GetTypeInfo() - .GetRuntimeProperty("None"); - - // Get a reference to FSharpAsync.StartAsTask<> - var fsharpAsyncMethods = fsharpAsyncType - .GetRuntimeMethods() - .Where(m => m.Name.Equals("StartAsTask", StringComparison.Ordinal)); - foreach (var candidateMethodInfo in fsharpAsyncMethods) - { - var parameters = candidateMethodInfo.GetParameters(); - if (parameters.Length == 3 - && TypesHaveSameIdentity(parameters[0].ParameterType, possibleFSharpAsyncGenericType) - && parameters[1].ParameterType == fsharpOptionOfTaskCreationOptionsType - && parameters[2].ParameterType == fsharpOptionOfCancellationTokenType) - { - // This really does look like the correct method (and hence assembly). - _fsharpAsyncStartAsTaskGenericMethod = candidateMethodInfo; - _fsharpCoreAssembly = assembly; - break; - } - } - - return _fsharpCoreAssembly != null; - } - - private static bool TypesHaveSameIdentity(Type type1, Type type2) - { - return type1.Assembly == type2.Assembly - && string.Equals(type1.Namespace, type2.Namespace, StringComparison.Ordinal) - && string.Equals(type1.Name, type2.Name, StringComparison.Ordinal); - } - } -} \ No newline at end of file diff --git a/src/DotNetCore.CAP/Internal/IMessageSender.cs b/src/DotNetCore.CAP/Internal/IMessageSender.cs index de0143a..94fedc0 100644 --- a/src/DotNetCore.CAP/Internal/IMessageSender.cs +++ b/src/DotNetCore.CAP/Internal/IMessageSender.cs @@ -3,11 +3,12 @@ using System.Threading.Tasks; using DotNetCore.CAP.Persistence; +using JetBrains.Annotations; namespace DotNetCore.CAP.Internal { public interface IMessageSender { - Task SendAsync(MediumMessage message); + Task SendAsync([NotNull] MediumMessage message); } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/Internal/ISubscribeInvoker.cs b/src/DotNetCore.CAP/Internal/ISubscribeInvoker.cs index dc55fe0..4f0aaab 100644 --- a/src/DotNetCore.CAP/Internal/ISubscribeInvoker.cs +++ b/src/DotNetCore.CAP/Internal/ISubscribeInvoker.cs @@ -3,6 +3,7 @@ using System.Threading; using System.Threading.Tasks; +using JetBrains.Annotations; namespace DotNetCore.CAP.Internal { @@ -16,6 +17,6 @@ namespace DotNetCore.CAP.Internal /// /// consumer execute context /// The object of . - Task InvokeAsync(ConsumerContext context, CancellationToken cancellationToken = default); + Task InvokeAsync([NotNull] ConsumerContext context, CancellationToken cancellationToken = default); } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/Internal/ObjectMethodExecutor/AwaitableInfo.cs b/src/DotNetCore.CAP/Internal/ObjectMethodExecutor/AwaitableInfo.cs index 7046303..cdf9fc7 100644 --- a/src/DotNetCore.CAP/Internal/ObjectMethodExecutor/AwaitableInfo.cs +++ b/src/DotNetCore.CAP/Internal/ObjectMethodExecutor/AwaitableInfo.cs @@ -6,9 +6,10 @@ using System.Linq; using System.Reflection; using System.Runtime.CompilerServices; +// ReSharper disable once CheckNamespace namespace Microsoft.Extensions.Internal { - internal struct AwaitableInfo + internal readonly struct AwaitableInfo { public Type AwaiterType { get; } public PropertyInfo AwaiterIsCompletedProperty { get; } @@ -43,8 +44,7 @@ namespace Microsoft.Extensions.Internal // Awaitable must have method matching "object GetAwaiter()" var getAwaiterMethod = type.GetRuntimeMethods().FirstOrDefault(m => m.Name.Equals("GetAwaiter", StringComparison.OrdinalIgnoreCase) - && m.GetParameters().Length == 0 - && m.ReturnType != null); + && m.GetParameters().Length == 0); if (getAwaiterMethod == null) { awaitableInfo = default(AwaitableInfo); diff --git a/src/DotNetCore.CAP/Internal/ObjectMethodExecutor/CoercedAwaitableInfo.cs b/src/DotNetCore.CAP/Internal/ObjectMethodExecutor/CoercedAwaitableInfo.cs index 3f923e0..39c3475 100644 --- a/src/DotNetCore.CAP/Internal/ObjectMethodExecutor/CoercedAwaitableInfo.cs +++ b/src/DotNetCore.CAP/Internal/ObjectMethodExecutor/CoercedAwaitableInfo.cs @@ -4,9 +4,10 @@ using System; using System.Linq.Expressions; +// ReSharper disable once CheckNamespace namespace Microsoft.Extensions.Internal { - internal struct CoercedAwaitableInfo + internal readonly struct CoercedAwaitableInfo { public AwaitableInfo AwaitableInfo { get; } public Expression CoercerExpression { get; } diff --git a/src/DotNetCore.CAP/Internal/ObjectMethodExecutor/ObjectMethodExecutorAwaitable.cs b/src/DotNetCore.CAP/Internal/ObjectMethodExecutor/ObjectMethodExecutorAwaitable.cs index ed2fee8..29012ea 100644 --- a/src/DotNetCore.CAP/Internal/ObjectMethodExecutor/ObjectMethodExecutorAwaitable.cs +++ b/src/DotNetCore.CAP/Internal/ObjectMethodExecutor/ObjectMethodExecutorAwaitable.cs @@ -4,6 +4,7 @@ using System; using System.Runtime.CompilerServices; +// ReSharper disable once CheckNamespace namespace Microsoft.Extensions.Internal { /// @@ -11,7 +12,7 @@ namespace Microsoft.Extensions.Internal /// return, regardless of whether the underlying value is a System.Task, an FSharpAsync, or an /// application-defined custom awaitable. /// - internal struct ObjectMethodExecutorAwaitable + internal readonly struct ObjectMethodExecutorAwaitable { private readonly object _customAwaitable; private readonly Func _getAwaiterMethod; diff --git a/src/DotNetCore.CAP/Internal/ObjectMethodExecutor/ObjectMethodExecutorFSharpSupport.cs b/src/DotNetCore.CAP/Internal/ObjectMethodExecutor/ObjectMethodExecutorFSharpSupport.cs index 9ec3c41..4164e35 100644 --- a/src/DotNetCore.CAP/Internal/ObjectMethodExecutor/ObjectMethodExecutorFSharpSupport.cs +++ b/src/DotNetCore.CAP/Internal/ObjectMethodExecutor/ObjectMethodExecutorFSharpSupport.cs @@ -8,6 +8,7 @@ using System.Reflection; using System.Threading; using System.Threading.Tasks; +// ReSharper disable once CheckNamespace namespace Microsoft.Extensions.Internal { /// diff --git a/src/DotNetCore.CAP/Transport/IConsumerClient.cs b/src/DotNetCore.CAP/Transport/IConsumerClient.cs index 926b9f7..cf1d5ed 100644 --- a/src/DotNetCore.CAP/Transport/IConsumerClient.cs +++ b/src/DotNetCore.CAP/Transport/IConsumerClient.cs @@ -23,7 +23,7 @@ namespace DotNetCore.CAP.Transport /// /// Names of the requested topics /// Topic identifiers - ICollection FetchTopics(IEnumerable topicNames) + ICollection FetchTopics([NotNull] IEnumerable topicNames) { return topicNames.ToList(); } @@ -32,7 +32,7 @@ namespace DotNetCore.CAP.Transport /// Subscribe to a set of topics to the message queue /// /// - void Subscribe(IEnumerable topics); + void Subscribe([NotNull] IEnumerable topics); /// /// Start listening From c5a07e7417c56740822a5687a586b488bce8916d Mon Sep 17 00:00:00 2001 From: Savorboard Date: Fri, 3 Sep 2021 15:38:35 +0800 Subject: [PATCH 7/8] Add support scanning subscribers from the assembly. (#993) --- .../wwwroot/public/index.html | 4 +- .../ITransport.RabbitMQ.cs | 6 +-- src/DotNetCore.CAP/CAP.Builder.cs | 38 +++++++++++++++ src/DotNetCore.CAP/Internal/Helper.cs | 5 ++ .../IConsumerServiceSelector.Assembly.cs | 47 +++++++++++++++++++ .../Internal/ISubscribeDispatcher.Default.cs | 6 ++- .../Internal/ISubscribeInvoker.Default.cs | 5 +- .../Internal/LoggerExtensions.cs | 9 +++- 8 files changed, 106 insertions(+), 14 deletions(-) create mode 100644 src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Assembly.cs diff --git a/src/DotNetCore.CAP.Dashboard/wwwroot/public/index.html b/src/DotNetCore.CAP.Dashboard/wwwroot/public/index.html index a73150e..7f4cfbb 100644 --- a/src/DotNetCore.CAP.Dashboard/wwwroot/public/index.html +++ b/src/DotNetCore.CAP.Dashboard/wwwroot/public/index.html @@ -6,9 +6,7 @@ - - <%= htmlWebpackPlugin.options.title %> - + CAP Dashboard diff --git a/src/DotNetCore.CAP.RabbitMQ/ITransport.RabbitMQ.cs b/src/DotNetCore.CAP.RabbitMQ/ITransport.RabbitMQ.cs index 61306ec..0a19789 100644 --- a/src/DotNetCore.CAP.RabbitMQ/ITransport.RabbitMQ.cs +++ b/src/DotNetCore.CAP.RabbitMQ/ITransport.RabbitMQ.cs @@ -40,15 +40,15 @@ namespace DotNetCore.CAP.RabbitMQ var props = channel.CreateBasicProperties(); props.DeliveryMode = 2; - props.Headers = message.Headers.ToDictionary(x => x.Key, x => (object) x.Value); - + props.Headers = message.Headers.ToDictionary(x => x.Key, x => (object)x.Value); + channel.ExchangeDeclare(_exchange, RabbitMQOptions.ExchangeType, true); channel.BasicPublish(_exchange, message.GetName(), props, message.Body); channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5)); - _logger.LogDebug($"RabbitMQ topic message [{message.GetName()}] has been published."); + _logger.LogInformation("CAP message '{0}' published, internal id '{1}'", message.GetName(), message.GetId()); return Task.FromResult(OperateResult.Success); } diff --git a/src/DotNetCore.CAP/CAP.Builder.cs b/src/DotNetCore.CAP/CAP.Builder.cs index 76f012f..8cbf830 100644 --- a/src/DotNetCore.CAP/CAP.Builder.cs +++ b/src/DotNetCore.CAP/CAP.Builder.cs @@ -1,9 +1,15 @@ // Copyright (c) .NET Core Community. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. +using System; +using System.Linq; +using System.Reflection; using DotNetCore.CAP.Filter; +using DotNetCore.CAP.Internal; +using JetBrains.Annotations; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; +// ReSharper disable UnusedMember.Global namespace DotNetCore.CAP { @@ -43,10 +49,42 @@ namespace DotNetCore.CAP /// public IServiceCollection Services { get; } + /// + /// Registers subscribers filter. + /// + /// Type of filter public CapBuilder AddSubscribeFilter() where T : class, ISubscribeFilter { Services.TryAddScoped(); return this; } + + /// + /// Registers subscribers from the specified assemblies. + /// + /// Assemblies to scan subscriber + public CapBuilder AddSubscriberAssembly(params Assembly[] assemblies) + { + if (assemblies == null) throw new ArgumentNullException(nameof(assemblies)); + + Services.Replace(new ServiceDescriptor(typeof(IConsumerServiceSelector), + x => new AssemblyConsumerServiceSelector(x, assemblies), + ServiceLifetime.Singleton)); + + return this; + } + + /// + /// Registers subscribers from the specified types. + /// + /// + public CapBuilder AddSubscriberAssembly([NotNull] params Type[] handlerAssemblyMarkerTypes) + { + if (handlerAssemblyMarkerTypes == null) throw new ArgumentNullException(nameof(handlerAssemblyMarkerTypes)); + + AddSubscriberAssembly(handlerAssemblyMarkerTypes.Select(t => t.GetTypeInfo().Assembly).ToArray()); + + return this; + } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/Internal/Helper.cs b/src/DotNetCore.CAP/Internal/Helper.cs index 65cadef..814ce2a 100644 --- a/src/DotNetCore.CAP/Internal/Helper.cs +++ b/src/DotNetCore.CAP/Internal/Helper.cs @@ -35,6 +35,11 @@ namespace DotNetCore.CAP.Internal return false; } + if (!typeInfo.ContainsGenericParameters) + { + return false; + } + return !typeInfo.ContainsGenericParameters && typeInfo.Name.EndsWith("Controller", StringComparison.OrdinalIgnoreCase); } diff --git a/src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Assembly.cs b/src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Assembly.cs new file mode 100644 index 0000000..adf1bdc --- /dev/null +++ b/src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Assembly.cs @@ -0,0 +1,47 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Reflection; + +namespace DotNetCore.CAP.Internal +{ + /// + /// + /// A implementation that scanning subscribers from the assembly. + /// + public class AssemblyConsumerServiceSelector : ConsumerServiceSelector + { + private readonly Assembly[] _assemblies; + + public AssemblyConsumerServiceSelector(IServiceProvider serviceProvider, Assembly[] assemblies) : base(serviceProvider) + { + _assemblies = assemblies; + } + + protected override IEnumerable FindConsumersFromInterfaceTypes(IServiceProvider provider) + { + var descriptors = new List(); + + descriptors.AddRange(base.FindConsumersFromInterfaceTypes(provider)); + + var assembliesToScan = _assemblies.Distinct().ToArray(); + + var capSubscribeTypeInfo = typeof(ICapSubscribe).GetTypeInfo(); + + foreach (var type in assembliesToScan.SelectMany(a => a.DefinedTypes)) + { + if (!capSubscribeTypeInfo.IsAssignableFrom(type)) + { + continue; + } + + descriptors.AddRange(GetTopicAttributesDescription(type)); + } + + return descriptors; + } + } +} diff --git a/src/DotNetCore.CAP/Internal/ISubscribeDispatcher.Default.cs b/src/DotNetCore.CAP/Internal/ISubscribeDispatcher.Default.cs index 8c57915..84c521a 100644 --- a/src/DotNetCore.CAP/Internal/ISubscribeDispatcher.Default.cs +++ b/src/DotNetCore.CAP/Internal/ISubscribeDispatcher.Default.cs @@ -89,6 +89,8 @@ namespace DotNetCore.CAP.Internal try { + _logger.ConsumerExecuting(descriptor.MethodInfo.Name); + var sp = Stopwatch.StartNew(); await InvokeConsumerMethodAsync(message, descriptor, cancellationToken); @@ -97,7 +99,7 @@ namespace DotNetCore.CAP.Internal await SetSuccessfulState(message); - _logger.ConsumerExecuted(sp.Elapsed.TotalMilliseconds); + _logger.ConsumerExecuted(descriptor.MethodInfo.Name, sp.Elapsed.TotalMilliseconds); return (false, OperateResult.Success); } @@ -183,7 +185,7 @@ namespace DotNetCore.CAP.Internal [Headers.CorrelationSequence] = (message.Origin.GetCorrelationSequence() + 1).ToString() }; - await _provider.GetService().PublishAsync(ret.CallbackName, ret.Result, header, cancellationToken); + await _provider.GetRequiredService().PublishAsync(ret.CallbackName, ret.Result, header, cancellationToken); } } catch (OperationCanceledException) diff --git a/src/DotNetCore.CAP/Internal/ISubscribeInvoker.Default.cs b/src/DotNetCore.CAP/Internal/ISubscribeInvoker.Default.cs index 0b0c645..2445194 100644 --- a/src/DotNetCore.CAP/Internal/ISubscribeInvoker.Default.cs +++ b/src/DotNetCore.CAP/Internal/ISubscribeInvoker.Default.cs @@ -40,10 +40,7 @@ namespace DotNetCore.CAP.Internal var methodHandle = methodInfo.MethodHandle.Value; var key = $"{reflectedTypeHandle}_{methodHandle}"; - _logger.LogDebug("Executing subscriber method : {0}", methodInfo.Name); - - var executor = _executors.GetOrAdd(key, - x => ObjectMethodExecutor.Create(methodInfo, context.ConsumerDescriptor.ImplTypeInfo)); + var executor = _executors.GetOrAdd(key, x => ObjectMethodExecutor.Create(methodInfo, context.ConsumerDescriptor.ImplTypeInfo)); using var scope = _serviceProvider.CreateScope(); diff --git a/src/DotNetCore.CAP/Internal/LoggerExtensions.cs b/src/DotNetCore.CAP/Internal/LoggerExtensions.cs index e71e8ee..235bee1 100644 --- a/src/DotNetCore.CAP/Internal/LoggerExtensions.cs +++ b/src/DotNetCore.CAP/Internal/LoggerExtensions.cs @@ -45,9 +45,14 @@ namespace DotNetCore.CAP.Internal logger.LogError(ex, $"An exception occured while publishing a message, reason:{reason}. message id:{messageId}"); } - public static void ConsumerExecuted(this ILogger logger, double milliseconds) + public static void ConsumerExecuting(this ILogger logger, string methodName) { - logger.LogDebug($"Consumer executed. Took: {milliseconds} ms."); + logger.LogInformation($"Executing subscriber method '{methodName}'"); + } + + public static void ConsumerExecuted(this ILogger logger, string methodName, double milliseconds) + { + logger.LogInformation($"Executed subscriber method '{methodName}' in {milliseconds} ms"); } public static void ServerStarting(this ILogger logger) From 1a9ca48d26cf9c20245f9b9cb0ab6505f3816bee Mon Sep 17 00:00:00 2001 From: Savorboard Date: Fri, 3 Sep 2021 16:03:30 +0800 Subject: [PATCH 8/8] Fixes unit test. --- src/DotNetCore.CAP/Internal/Helper.cs | 2 +- test/DotNetCore.CAP.Test/HelperTest.cs | 18 ++++++++++++++++++ 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/src/DotNetCore.CAP/Internal/Helper.cs b/src/DotNetCore.CAP/Internal/Helper.cs index 814ce2a..2650554 100644 --- a/src/DotNetCore.CAP/Internal/Helper.cs +++ b/src/DotNetCore.CAP/Internal/Helper.cs @@ -35,7 +35,7 @@ namespace DotNetCore.CAP.Internal return false; } - if (!typeInfo.ContainsGenericParameters) + if (typeInfo.ContainsGenericParameters) { return false; } diff --git a/test/DotNetCore.CAP.Test/HelperTest.cs b/test/DotNetCore.CAP.Test/HelperTest.cs index 2b2b3b1..fcb550c 100644 --- a/test/DotNetCore.CAP.Test/HelperTest.cs +++ b/test/DotNetCore.CAP.Test/HelperTest.cs @@ -33,6 +33,19 @@ namespace DotNetCore.CAP.Test Assert.True(result); } + [Fact] + public void IsControllerAbstractTest() + { + //Arrange + var typeInfo = typeof(AbstractController).GetTypeInfo(); + + //Act + var result = Helper.IsController(typeInfo); + + //Assert + Assert.False(result); + } + [Theory] [InlineData(typeof(string))] [InlineData(typeof(decimal))] @@ -76,4 +89,9 @@ namespace DotNetCore.CAP.Test { } + + public abstract class AbstractController + { + + } }