瀏覽代碼

Merge branch 'master' of https://github.com/dotnetcore/CAP

master
Savorboard 3 年之前
父節點
當前提交
6b8c78f2c5
共有 32 個文件被更改,包括 278 次插入822 次删除
  1. +1
    -1
      build/version.props
  2. +1
    -0
      docs/content/user-guide/en/transport/general.md
  3. +56
    -0
      docs/content/user-guide/en/transport/nats.md
  4. +1
    -0
      docs/content/user-guide/zh/transport/general.md
  5. +57
    -0
      docs/content/user-guide/zh/transport/nats.md
  6. +2
    -0
      docs/mkdocs.yml
  7. +4
    -5
      src/DotNetCore.CAP.Dashboard/DotNetCore.CAP.Dashboard.csproj
  8. +0
    -128
      src/DotNetCore.CAP.Dashboard/ObjectMethodExecutor/AwaitableInfo.cs
  9. +0
    -56
      src/DotNetCore.CAP.Dashboard/ObjectMethodExecutor/CoercedAwaitableInfo.cs
  10. +0
    -338
      src/DotNetCore.CAP.Dashboard/ObjectMethodExecutor/ObjectMethodExecutor.cs
  11. +0
    -118
      src/DotNetCore.CAP.Dashboard/ObjectMethodExecutor/ObjectMethodExecutorAwaitable.cs
  12. +0
    -145
      src/DotNetCore.CAP.Dashboard/ObjectMethodExecutor/ObjectMethodExecutorFSharpSupport.cs
  13. +1
    -3
      src/DotNetCore.CAP.Dashboard/wwwroot/public/index.html
  14. +6
    -1
      src/DotNetCore.CAP.Kafka/IConnectionPool.Default.cs
  15. +9
    -4
      src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs
  16. +2
    -2
      src/DotNetCore.CAP.Kafka/KafkaConsumerClientFactory.cs
  17. +3
    -3
      src/DotNetCore.CAP.RabbitMQ/ITransport.RabbitMQ.cs
  18. +38
    -0
      src/DotNetCore.CAP/CAP.Builder.cs
  19. +5
    -0
      src/DotNetCore.CAP/Internal/Helper.cs
  20. +1
    -1
      src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs
  21. +47
    -0
      src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Assembly.cs
  22. +2
    -1
      src/DotNetCore.CAP/Internal/IMessageSender.cs
  23. +4
    -2
      src/DotNetCore.CAP/Internal/ISubscribeDispatcher.Default.cs
  24. +1
    -4
      src/DotNetCore.CAP/Internal/ISubscribeInvoker.Default.cs
  25. +2
    -1
      src/DotNetCore.CAP/Internal/ISubscribeInvoker.cs
  26. +7
    -2
      src/DotNetCore.CAP/Internal/LoggerExtensions.cs
  27. +3
    -3
      src/DotNetCore.CAP/Internal/ObjectMethodExecutor/AwaitableInfo.cs
  28. +2
    -1
      src/DotNetCore.CAP/Internal/ObjectMethodExecutor/CoercedAwaitableInfo.cs
  29. +2
    -1
      src/DotNetCore.CAP/Internal/ObjectMethodExecutor/ObjectMethodExecutorAwaitable.cs
  30. +1
    -0
      src/DotNetCore.CAP/Internal/ObjectMethodExecutor/ObjectMethodExecutorFSharpSupport.cs
  31. +2
    -2
      src/DotNetCore.CAP/Transport/IConsumerClient.cs
  32. +18
    -0
      test/DotNetCore.CAP.Test/HelperTest.cs

+ 1
- 1
build/version.props 查看文件

@@ -2,7 +2,7 @@
<PropertyGroup>
<VersionMajor>5</VersionMajor>
<VersionMinor>1</VersionMinor>
<VersionPatch>3</VersionPatch>
<VersionPatch>4</VersionPatch>
<VersionQuality></VersionQuality>
<VersionPrefix>$(VersionMajor).$(VersionMinor).$(VersionPatch)</VersionPrefix>
</PropertyGroup>


+ 1
- 0
docs/content/user-guide/en/transport/general.md 查看文件

@@ -10,6 +10,7 @@ CAP supports several transport methods:
* [Kafka](kafka.md)
* [Azure Service Bus](azure-service-bus.md)
* [Amazon SQS](aws-sqs.md)
* [NATS](nats.md)
* [In-Memory Queue](in-memory-queue.md)
* [Redis Streams](redis-streams.md)



+ 56
- 0
docs/content/user-guide/en/transport/nats.md 查看文件

@@ -0,0 +1,56 @@
# NATS

[NATS](https://nats.io/) is a simple, secure and performant communications system for digital systems, services and devices. NATS is part of the Cloud Native Computing Foundation (CNCF).

## Configuration

To use NATS transporter, you need to install the following package from NuGet:

```powershell

PM> Install-Package DotNetCore.CAP.NATS

```

Then you can add configuration items to the `ConfigureServices` method of `Startup.cs`.

```csharp

public void ConfigureServices(IServiceCollection services)
{
services.AddCap(capOptions =>
{
capOptions.UseNATS(natsOptions=>{
//NATS Options
});
});
}

```

#### NATS Options

NATS configuration parameters provided directly by the CAP:

NAME | DESCRIPTION | TYPE | DEFAULT
:---|:---|---|:---
Options | NATS client configuration | Options | Options
Servers | Server url/urls used to connect to the NATs server. | string | NULL
ConnectionPoolSize | number of connections pool | uint | 10

#### NATS ConfigurationOptions

If you need **more** native NATS related configuration options, you can set them in the `Options` option:

```csharp
services.AddCap(capOptions =>
{
capOptions.UseNATS(natsOptions=>
{
// NATS options.
natsOptions.Options.Url="";
});
});
```

`Options` is a NATS.Client ConfigurationOptions , you can find more details through this [link](http://nats-io.github.io/nats.net/class_n_a_t_s_1_1_client_1_1_options.html)

+ 1
- 0
docs/content/user-guide/zh/transport/general.md 查看文件

@@ -10,6 +10,7 @@ CAP 支持以下几种运输方式:
* [Kafka](kafka.md)
* [Azure Service Bus](azure-service-bus.md)
* [Amazon SQS](aws-sqs.md)
* [NATS](nats.md)
* [In-Memory Queue](in-memory-queue.md)
* [Redis Streams](redis-streams.md)



+ 57
- 0
docs/content/user-guide/zh/transport/nats.md 查看文件

@@ -0,0 +1,57 @@
# NATS

[NATS](https://nats.io/)是一个简单、安全、高性能的数字系统、服务和设备通信系统。NATS 是 CNCF 的一部分。

## 配置

要使用NATS 传输器,你需要安装下面的NuGet包:

```powershell

PM> Install-Package DotNetCore.CAP.NATS

```

你可以通过在 `Startup.cs` 文件中配置 `ConfigureServices` 来添加配置:

```csharp

public void ConfigureServices(IServiceCollection services)
{
services.AddCap(capOptions =>
{
capOptions.UseNATS(natsOptions=>{
//NATS Options
});
});
}

```

#### NATS 配置

CAP 直接提供的关于 NATS 的配置参数:


NAME | DESCRIPTION | TYPE | DEFAULT
:---|:---|---|:---
Options | NATS 客户端配置 | Options | Options
Servers | 服务器Urls地址 | string | NULL
ConnectionPoolSize | 连接池数量 | uint | 10

#### NATS ConfigurationOptions

如果你需要 **更多** 原生相关的配置项,可以通过 `Options` 配置项进行设定:

```csharp
services.AddCap(capOptions =>
{
capOptions.UseNATS(natsOptions=>
{
// NATS options.
natsOptions.Options.Url="";
});
});
```

`Options` 是 NATS.Client 客户端提供的配置, 你可以在这个[链接](http://nats-io.github.io/nats.net/class_n_a_t_s_1_1_client_1_1_options.html)找到更多详细信息。

+ 2
- 0
docs/mkdocs.yml 查看文件

@@ -98,6 +98,7 @@ nav:
- Amazon SQS: user-guide/en/transport/aws-sqs.md
- Apache Kafka®: user-guide/en/transport/kafka.md
- Azure Service Bus: user-guide/en/transport/azure-service-bus.md
- NATS: user-guide/en/transport/nats.md
- RabbitMQ: user-guide/en/transport/rabbitmq.md
- Redis Streams: user-guide/en/transport/redis-streams.md
- In-Memory Queue: user-guide/en/transport/in-memory-queue.md
@@ -133,6 +134,7 @@ nav:
- Amazon SQS: user-guide/zh/transport/aws-sqs.md
- Apache Kafka®: user-guide/zh/transport/kafka.md
- Azure Service Bus: user-guide/zh/transport/azure-service-bus.md
- NATS: user-guide/zh/transport/nats.md
- RabbitMQ: user-guide/zh/transport/rabbitmq.md
- Redis Streams: user-guide/zh/transport/redis-streams.md
- In-Memory Queue: user-guide/zh/transport/in-memory-queue.md


+ 4
- 5
src/DotNetCore.CAP.Dashboard/DotNetCore.CAP.Dashboard.csproj 查看文件

@@ -6,19 +6,18 @@
</PropertyGroup>

<ItemGroup>
<FrameworkReference Include="Microsoft.AspNetCore.App" />
<PackageReference Include="Consul" Version="1.6.1.1" />
<SupportedPlatform Include="browser" />
</ItemGroup>

<ItemGroup>
<EmbeddedResource Include="wwwroot/dist/**/*" Exclude="**/*/*.map" />
</ItemGroup>
<ItemGroup>
<FrameworkReference Include="Microsoft.AspNetCore.App" />
<PackageReference Include="Consul" Version="1.6.1.1" />
<Compile Include="..\DotNetCore.CAP\Internal\ObjectMethodExecutor\*.cs" Link="ObjectMethodExecutor\%(Filename)%(Extension)" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>
</ItemGroup>

</Project>

+ 0
- 128
src/DotNetCore.CAP.Dashboard/ObjectMethodExecutor/AwaitableInfo.cs 查看文件

@@ -1,128 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Linq;
using System.Reflection;
using System.Runtime.CompilerServices;

namespace Microsoft.Extensions.Internal
{
internal struct AwaitableInfo
{
public Type AwaiterType { get; }
public PropertyInfo AwaiterIsCompletedProperty { get; }
public MethodInfo AwaiterGetResultMethod { get; }
public MethodInfo AwaiterOnCompletedMethod { get; }
public MethodInfo AwaiterUnsafeOnCompletedMethod { get; }
public Type ResultType { get; }
public MethodInfo GetAwaiterMethod { get; }

public AwaitableInfo(
Type awaiterType,
PropertyInfo awaiterIsCompletedProperty,
MethodInfo awaiterGetResultMethod,
MethodInfo awaiterOnCompletedMethod,
MethodInfo awaiterUnsafeOnCompletedMethod,
Type resultType,
MethodInfo getAwaiterMethod)
{
AwaiterType = awaiterType;
AwaiterIsCompletedProperty = awaiterIsCompletedProperty;
AwaiterGetResultMethod = awaiterGetResultMethod;
AwaiterOnCompletedMethod = awaiterOnCompletedMethod;
AwaiterUnsafeOnCompletedMethod = awaiterUnsafeOnCompletedMethod;
ResultType = resultType;
GetAwaiterMethod = getAwaiterMethod;
}

public static bool IsTypeAwaitable(Type type, out AwaitableInfo awaitableInfo)
{
// Based on Roslyn code: http://source.roslyn.io/#Microsoft.CodeAnalysis.Workspaces/Shared/Extensions/ISymbolExtensions.cs,db4d48ba694b9347

// Awaitable must have method matching "object GetAwaiter()"
var getAwaiterMethod = type.GetRuntimeMethods().FirstOrDefault(m =>
m.Name.Equals("GetAwaiter", StringComparison.OrdinalIgnoreCase)
&& m.GetParameters().Length == 0
&& m.ReturnType != null);
if (getAwaiterMethod == null)
{
awaitableInfo = default(AwaitableInfo);
return false;
}

var awaiterType = getAwaiterMethod.ReturnType;

// Awaiter must have property matching "bool IsCompleted { get; }"
var isCompletedProperty = awaiterType.GetRuntimeProperties().FirstOrDefault(p =>
p.Name.Equals("IsCompleted", StringComparison.OrdinalIgnoreCase)
&& p.PropertyType == typeof(bool)
&& p.GetMethod != null);
if (isCompletedProperty == null)
{
awaitableInfo = default(AwaitableInfo);
return false;
}

// Awaiter must implement INotifyCompletion
var awaiterInterfaces = awaiterType.GetInterfaces();
var implementsINotifyCompletion = awaiterInterfaces.Any(t => t == typeof(INotifyCompletion));
if (!implementsINotifyCompletion)
{
awaitableInfo = default(AwaitableInfo);
return false;
}

// INotifyCompletion supplies a method matching "void OnCompleted(Action action)"
var iNotifyCompletionMap = awaiterType
.GetTypeInfo()
.GetRuntimeInterfaceMap(typeof(INotifyCompletion));
var onCompletedMethod = iNotifyCompletionMap.InterfaceMethods.Single(m =>
m.Name.Equals("OnCompleted", StringComparison.OrdinalIgnoreCase)
&& m.ReturnType == typeof(void)
&& m.GetParameters().Length == 1
&& m.GetParameters()[0].ParameterType == typeof(Action));

// Awaiter optionally implements ICriticalNotifyCompletion
var implementsICriticalNotifyCompletion =
awaiterInterfaces.Any(t => t == typeof(ICriticalNotifyCompletion));
MethodInfo unsafeOnCompletedMethod;
if (implementsICriticalNotifyCompletion)
{
// ICriticalNotifyCompletion supplies a method matching "void UnsafeOnCompleted(Action action)"
var iCriticalNotifyCompletionMap = awaiterType
.GetTypeInfo()
.GetRuntimeInterfaceMap(typeof(ICriticalNotifyCompletion));
unsafeOnCompletedMethod = iCriticalNotifyCompletionMap.InterfaceMethods.Single(m =>
m.Name.Equals("UnsafeOnCompleted", StringComparison.OrdinalIgnoreCase)
&& m.ReturnType == typeof(void)
&& m.GetParameters().Length == 1
&& m.GetParameters()[0].ParameterType == typeof(Action));
}
else
{
unsafeOnCompletedMethod = null;
}

// Awaiter must have method matching "void GetResult" or "T GetResult()"
var getResultMethod = awaiterType.GetRuntimeMethods().FirstOrDefault(m =>
m.Name.Equals("GetResult")
&& m.GetParameters().Length == 0);
if (getResultMethod == null)
{
awaitableInfo = default(AwaitableInfo);
return false;
}

awaitableInfo = new AwaitableInfo(
awaiterType,
isCompletedProperty,
getResultMethod,
onCompletedMethod,
unsafeOnCompletedMethod,
getResultMethod.ReturnType,
getAwaiterMethod);
return true;
}
}
}

+ 0
- 56
src/DotNetCore.CAP.Dashboard/ObjectMethodExecutor/CoercedAwaitableInfo.cs 查看文件

@@ -1,56 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Linq.Expressions;

namespace Microsoft.Extensions.Internal
{
internal struct CoercedAwaitableInfo
{
public AwaitableInfo AwaitableInfo { get; }
public Expression CoercerExpression { get; }
public Type CoercerResultType { get; }
public bool RequiresCoercion => CoercerExpression != null;

public CoercedAwaitableInfo(AwaitableInfo awaitableInfo)
{
AwaitableInfo = awaitableInfo;
CoercerExpression = null;
CoercerResultType = null;
}

public CoercedAwaitableInfo(Expression coercerExpression, Type coercerResultType,
AwaitableInfo coercedAwaitableInfo)
{
CoercerExpression = coercerExpression;
CoercerResultType = coercerResultType;
AwaitableInfo = coercedAwaitableInfo;
}

public static bool IsTypeAwaitable(Type type, out CoercedAwaitableInfo info)
{
if (AwaitableInfo.IsTypeAwaitable(type, out var directlyAwaitableInfo))
{
info = new CoercedAwaitableInfo(directlyAwaitableInfo);
return true;
}

// It's not directly awaitable, but maybe we can coerce it.
// Currently we support coercing FSharpAsync<T>.
if (ObjectMethodExecutorFSharpSupport.TryBuildCoercerFromFSharpAsyncToAwaitable(type,
out var coercerExpression,
out var coercerResultType))
{
if (AwaitableInfo.IsTypeAwaitable(coercerResultType, out var coercedAwaitableInfo))
{
info = new CoercedAwaitableInfo(coercerExpression, coercerResultType, coercedAwaitableInfo);
return true;
}
}

info = default(CoercedAwaitableInfo);
return false;
}
}
}

+ 0
- 338
src/DotNetCore.CAP.Dashboard/ObjectMethodExecutor/ObjectMethodExecutor.cs 查看文件

@@ -1,338 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Linq.Expressions;
using System.Reflection;

// ReSharper disable once CheckNamespace
namespace Microsoft.Extensions.Internal
{
internal class ObjectMethodExecutor
{
// ReSharper disable once InconsistentNaming
private static readonly ConstructorInfo _objectMethodExecutorAwaitableConstructor =
typeof(ObjectMethodExecutorAwaitable).GetConstructor(new[]
{
typeof(object), // customAwaitable
typeof(Func<object, object>), // getAwaiterMethod
typeof(Func<object, bool>), // isCompletedMethod
typeof(Func<object, object>), // getResultMethod
typeof(Action<object, Action>), // onCompletedMethod
typeof(Action<object, Action>) // unsafeOnCompletedMethod
});

private readonly MethodExecutor _executor;
private readonly MethodExecutorAsync _executorAsync;
private readonly object[] _parameterDefaultValues;

private ObjectMethodExecutor(MethodInfo methodInfo, TypeInfo targetTypeInfo, object[] parameterDefaultValues)
{
if (methodInfo == null)
{
throw new ArgumentNullException(nameof(methodInfo));
}

MethodInfo = methodInfo;
MethodParameters = methodInfo.GetParameters();
TargetTypeInfo = targetTypeInfo;
MethodReturnType = methodInfo.ReturnType;

var isAwaitable = CoercedAwaitableInfo.IsTypeAwaitable(MethodReturnType, out var coercedAwaitableInfo);

IsMethodAsync = isAwaitable;
AsyncResultType = isAwaitable ? coercedAwaitableInfo.AwaitableInfo.ResultType : null;

// Upstream code may prefer to use the sync-executor even for async methods, because if it knows
// that the result is a specific Task<T> where T is known, then it can directly cast to that type
// and await it without the extra heap allocations involved in the _executorAsync code path.
_executor = GetExecutor(methodInfo, targetTypeInfo);

if (IsMethodAsync)
{
_executorAsync = GetExecutorAsync(methodInfo, targetTypeInfo, coercedAwaitableInfo);
}

_parameterDefaultValues = parameterDefaultValues;
}

public MethodInfo MethodInfo { get; }

public ParameterInfo[] MethodParameters { get; }

public TypeInfo TargetTypeInfo { get; }

public Type AsyncResultType { get; }

// This field is made internal set because it is set in unit tests.
public Type MethodReturnType { get; internal set; }

public bool IsMethodAsync { get; }

public static ObjectMethodExecutor Create(MethodInfo methodInfo, TypeInfo targetTypeInfo)
{
return new ObjectMethodExecutor(methodInfo, targetTypeInfo, null);
}

public static ObjectMethodExecutor Create(MethodInfo methodInfo, TypeInfo targetTypeInfo,
object[] parameterDefaultValues)
{
if (parameterDefaultValues == null)
{
throw new ArgumentNullException(nameof(parameterDefaultValues));
}

return new ObjectMethodExecutor(methodInfo, targetTypeInfo, parameterDefaultValues);
}

/// <summary>
/// Executes the configured method on <paramref name="target" />. This can be used whether or not
/// the configured method is asynchronous.
/// </summary>
/// <remarks>
/// Even if the target method is asynchronous, it's desirable to invoke it using Execute rather than
/// ExecuteAsync if you know at compile time what the return type is, because then you can directly
/// "await" that value (via a cast), and then the generated code will be able to reference the
/// resulting awaitable as a value-typed variable. If you use ExecuteAsync instead, the generated
/// code will have to treat the resulting awaitable as a boxed object, because it doesn't know at
/// compile time what type it would be.
/// </remarks>
/// <param name="target">The object whose method is to be executed.</param>
/// <param name="parameters">Parameters to pass to the method.</param>
/// <returns>The method return value.</returns>
public object Execute(object target, params object[] parameters)
{
return _executor(target, parameters);
}

/// <summary>
/// Executes the configured method on <paramref name="target" />. This can only be used if the configured
/// method is asynchronous.
/// </summary>
/// <remarks>
/// If you don't know at compile time the type of the method's returned awaitable, you can use ExecuteAsync,
/// which supplies an awaitable-of-object. This always works, but can incur several extra heap allocations
/// as compared with using Execute and then using "await" on the result value typecasted to the known
/// awaitable type. The possible extra heap allocations are for:
/// 1. The custom awaitable (though usually there's a heap allocation for this anyway, since normally
/// it's a reference type, and you normally create a new instance per call).
/// 2. The custom awaiter (whether or not it's a value type, since if it's not, you need a new instance
/// of it, and if it is, it will have to be boxed so the calling code can reference it as an object).
/// 3. The async result value, if it's a value type (it has to be boxed as an object, since the calling
/// code doesn't know what type it's going to be).
/// </remarks>
/// <param name="target">The object whose method is to be executed.</param>
/// <param name="parameters">Parameters to pass to the method.</param>
/// <returns>An object that you can "await" to get the method return value.</returns>
public ObjectMethodExecutorAwaitable ExecuteAsync(object target, params object[] parameters)
{
return _executorAsync(target, parameters);
}

public object GetDefaultValueForParameter(int index)
{
if (_parameterDefaultValues == null)
{
throw new InvalidOperationException(
$"Cannot call {nameof(GetDefaultValueForParameter)}, because no parameter default values were supplied.");
}

if (index < 0 || index > MethodParameters.Length - 1)
{
throw new ArgumentOutOfRangeException(nameof(index));
}

return _parameterDefaultValues[index];
}

private static MethodExecutor GetExecutor(MethodInfo methodInfo, TypeInfo targetTypeInfo)
{
// Parameters to executor
var targetParameter = Expression.Parameter(typeof(object), "target");
var parametersParameter = Expression.Parameter(typeof(object[]), "parameters");

// Build parameter list
var parameters = new List<Expression>();
var paramInfos = methodInfo.GetParameters();
for (var i = 0; i < paramInfos.Length; i++)
{
var paramInfo = paramInfos[i];
var valueObj = Expression.ArrayIndex(parametersParameter, Expression.Constant(i));
var valueCast = Expression.Convert(valueObj, paramInfo.ParameterType);

// valueCast is "(Ti) parameters[i]"
parameters.Add(valueCast);
}

// Call method
var instanceCast = Expression.Convert(targetParameter, targetTypeInfo.AsType());
var methodCall = Expression.Call(instanceCast, methodInfo, parameters);

// methodCall is "((Ttarget) target) method((T0) parameters[0], (T1) parameters[1], ...)"
// Create function
if (methodCall.Type == typeof(void))
{
var lambda = Expression.Lambda<VoidMethodExecutor>(methodCall, targetParameter, parametersParameter);
var voidExecutor = lambda.Compile();
return WrapVoidMethod(voidExecutor);
}
else
{
// must coerce methodCall to match ActionExecutor signature
var castMethodCall = Expression.Convert(methodCall, typeof(object));
var lambda = Expression.Lambda<MethodExecutor>(castMethodCall, targetParameter, parametersParameter);
return lambda.Compile();
}
}

private static MethodExecutor WrapVoidMethod(VoidMethodExecutor executor)
{
return delegate(object target, object[] parameters)
{
executor(target, parameters);
return null;
};
}

private static MethodExecutorAsync GetExecutorAsync(
MethodInfo methodInfo,
TypeInfo targetTypeInfo,
CoercedAwaitableInfo coercedAwaitableInfo)
{
// Parameters to executor
var targetParameter = Expression.Parameter(typeof(object), "target");
var parametersParameter = Expression.Parameter(typeof(object[]), "parameters");

// Build parameter list
var parameters = new List<Expression>();
var paramInfos = methodInfo.GetParameters();
for (var i = 0; i < paramInfos.Length; i++)
{
var paramInfo = paramInfos[i];
var valueObj = Expression.ArrayIndex(parametersParameter, Expression.Constant(i));
var valueCast = Expression.Convert(valueObj, paramInfo.ParameterType);

// valueCast is "(Ti) parameters[i]"
parameters.Add(valueCast);
}

// Call method
var instanceCast = Expression.Convert(targetParameter, targetTypeInfo.AsType());
var methodCall = Expression.Call(instanceCast, methodInfo, parameters);

// Using the method return value, construct an ObjectMethodExecutorAwaitable based on
// the info we have about its implementation of the awaitable pattern. Note that all
// the funcs/actions we construct here are precompiled, so that only one instance of
// each is preserved throughout the lifetime of the ObjectMethodExecutor.

// var getAwaiterFunc = (object awaitable) =>
// (object)((CustomAwaitableType)awaitable).GetAwaiter();
var customAwaitableParam = Expression.Parameter(typeof(object), "awaitable");
var awaitableInfo = coercedAwaitableInfo.AwaitableInfo;
var postCoercionMethodReturnType = coercedAwaitableInfo.CoercerResultType ?? methodInfo.ReturnType;
var getAwaiterFunc = Expression.Lambda<Func<object, object>>(
Expression.Convert(
Expression.Call(
Expression.Convert(customAwaitableParam, postCoercionMethodReturnType),
awaitableInfo.GetAwaiterMethod),
typeof(object)),
customAwaitableParam).Compile();

// var isCompletedFunc = (object awaiter) =>
// ((CustomAwaiterType)awaiter).IsCompleted;
var isCompletedParam = Expression.Parameter(typeof(object), "awaiter");
var isCompletedFunc = Expression.Lambda<Func<object, bool>>(
Expression.MakeMemberAccess(
Expression.Convert(isCompletedParam, awaitableInfo.AwaiterType),
awaitableInfo.AwaiterIsCompletedProperty),
isCompletedParam).Compile();

var getResultParam = Expression.Parameter(typeof(object), "awaiter");
Func<object, object> getResultFunc;
if (awaitableInfo.ResultType == typeof(void))
{
getResultFunc = Expression.Lambda<Func<object, object>>(
Expression.Block(
Expression.Call(
Expression.Convert(getResultParam, awaitableInfo.AwaiterType),
awaitableInfo.AwaiterGetResultMethod),
Expression.Constant(null)
),
getResultParam).Compile();
}
else
{
getResultFunc = Expression.Lambda<Func<object, object>>(
Expression.Convert(
Expression.Call(
Expression.Convert(getResultParam, awaitableInfo.AwaiterType),
awaitableInfo.AwaiterGetResultMethod),
typeof(object)),
getResultParam).Compile();
}

// var onCompletedFunc = (object awaiter, Action continuation) => {
// ((CustomAwaiterType)awaiter).OnCompleted(continuation);
// };
var onCompletedParam1 = Expression.Parameter(typeof(object), "awaiter");
var onCompletedParam2 = Expression.Parameter(typeof(Action), "continuation");
var onCompletedFunc = Expression.Lambda<Action<object, Action>>(
Expression.Call(
Expression.Convert(onCompletedParam1, awaitableInfo.AwaiterType),
awaitableInfo.AwaiterOnCompletedMethod,
onCompletedParam2),
onCompletedParam1,
onCompletedParam2).Compile();

Action<object, Action> unsafeOnCompletedFunc = null;
if (awaitableInfo.AwaiterUnsafeOnCompletedMethod != null)
{
// var unsafeOnCompletedFunc = (object awaiter, Action continuation) => {
// ((CustomAwaiterType)awaiter).UnsafeOnCompleted(continuation);
// };
var unsafeOnCompletedParam1 = Expression.Parameter(typeof(object), "awaiter");
var unsafeOnCompletedParam2 = Expression.Parameter(typeof(Action), "continuation");
unsafeOnCompletedFunc = Expression.Lambda<Action<object, Action>>(
Expression.Call(
Expression.Convert(unsafeOnCompletedParam1, awaitableInfo.AwaiterType),
awaitableInfo.AwaiterUnsafeOnCompletedMethod,
unsafeOnCompletedParam2),
unsafeOnCompletedParam1,
unsafeOnCompletedParam2).Compile();
}

// If we need to pass the method call result through a coercer function to get an
// awaitable, then do so.
var coercedMethodCall = coercedAwaitableInfo.RequiresCoercion
? Expression.Invoke(coercedAwaitableInfo.CoercerExpression, methodCall)
: (Expression) methodCall;

// return new ObjectMethodExecutorAwaitable(
// (object)coercedMethodCall,
// getAwaiterFunc,
// isCompletedFunc,
// getResultFunc,
// onCompletedFunc,
// unsafeOnCompletedFunc);
var returnValueExpression = Expression.New(
_objectMethodExecutorAwaitableConstructor,
Expression.Convert(coercedMethodCall, typeof(object)),
Expression.Constant(getAwaiterFunc),
Expression.Constant(isCompletedFunc),
Expression.Constant(getResultFunc),
Expression.Constant(onCompletedFunc),
Expression.Constant(unsafeOnCompletedFunc, typeof(Action<object, Action>)));

var lambda =
Expression.Lambda<MethodExecutorAsync>(returnValueExpression, targetParameter, parametersParameter);
return lambda.Compile();
}

private delegate ObjectMethodExecutorAwaitable MethodExecutorAsync(object target, params object[] parameters);

private delegate object MethodExecutor(object target, params object[] parameters);

private delegate void VoidMethodExecutor(object target, object[] parameters);
}
}

+ 0
- 118
src/DotNetCore.CAP.Dashboard/ObjectMethodExecutor/ObjectMethodExecutorAwaitable.cs 查看文件

@@ -1,118 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Runtime.CompilerServices;

namespace Microsoft.Extensions.Internal
{
/// <summary>
/// Provides a common awaitable structure that <see cref="ObjectMethodExecutor.ExecuteAsync" /> can
/// return, regardless of whether the underlying value is a System.Task, an FSharpAsync, or an
/// application-defined custom awaitable.
/// </summary>
internal struct ObjectMethodExecutorAwaitable
{
private readonly object _customAwaitable;
private readonly Func<object, object> _getAwaiterMethod;
private readonly Func<object, bool> _isCompletedMethod;
private readonly Func<object, object> _getResultMethod;
private readonly Action<object, Action> _onCompletedMethod;
private readonly Action<object, Action> _unsafeOnCompletedMethod;

// Perf note: since we're requiring the customAwaitable to be supplied here as an object,
// this will trigger a further allocation if it was a value type (i.e., to box it). We can't
// fix this by making the customAwaitable type generic, because the calling code typically
// does not know the type of the awaitable/awaiter at compile-time anyway.
//
// However, we could fix it by not passing the customAwaitable here at all, and instead
// passing a func that maps directly from the target object (e.g., controller instance),
// target method (e.g., action method info), and params array to the custom awaiter in the
// GetAwaiter() method below. In effect, by delaying the actual method call until the
// upstream code calls GetAwaiter on this ObjectMethodExecutorAwaitable instance.
// This optimization is not currently implemented because:
// [1] It would make no difference when the awaitable was an object type, which is
// by far the most common scenario (e.g., System.Task<T>).
// [2] It would be complex - we'd need some kind of object pool to track all the parameter
// arrays until we needed to use them in GetAwaiter().
// We can reconsider this in the future if there's a need to optimize for ValueTask<T>
// or other value-typed awaitables.

public ObjectMethodExecutorAwaitable(
object customAwaitable,
Func<object, object> getAwaiterMethod,
Func<object, bool> isCompletedMethod,
Func<object, object> getResultMethod,
Action<object, Action> onCompletedMethod,
Action<object, Action> unsafeOnCompletedMethod)
{
_customAwaitable = customAwaitable;
_getAwaiterMethod = getAwaiterMethod;
_isCompletedMethod = isCompletedMethod;
_getResultMethod = getResultMethod;
_onCompletedMethod = onCompletedMethod;
_unsafeOnCompletedMethod = unsafeOnCompletedMethod;
}

public Awaiter GetAwaiter()
{
var customAwaiter = _getAwaiterMethod(_customAwaitable);
return new Awaiter(customAwaiter, _isCompletedMethod, _getResultMethod, _onCompletedMethod,
_unsafeOnCompletedMethod);
}

public struct Awaiter : ICriticalNotifyCompletion
{
private readonly object _customAwaiter;
private readonly Func<object, bool> _isCompletedMethod;
private readonly Func<object, object> _getResultMethod;
private readonly Action<object, Action> _onCompletedMethod;
private readonly Action<object, Action> _unsafeOnCompletedMethod;

public Awaiter(
object customAwaiter,
Func<object, bool> isCompletedMethod,
Func<object, object> getResultMethod,
Action<object, Action> onCompletedMethod,
Action<object, Action> unsafeOnCompletedMethod)
{
_customAwaiter = customAwaiter;
_isCompletedMethod = isCompletedMethod;
_getResultMethod = getResultMethod;
_onCompletedMethod = onCompletedMethod;
_unsafeOnCompletedMethod = unsafeOnCompletedMethod;
}

public bool IsCompleted => _isCompletedMethod(_customAwaiter);

public object GetResult()
{
return _getResultMethod(_customAwaiter);
}

public void OnCompleted(Action continuation)
{
_onCompletedMethod(_customAwaiter, continuation);
}

public void UnsafeOnCompleted(Action continuation)
{
// If the underlying awaitable implements ICriticalNotifyCompletion, use its UnsafeOnCompleted.
// If not, fall back on using its OnCompleted.
//
// Why this is safe:
// - Implementing ICriticalNotifyCompletion is a way of saying the caller can choose whether it
// needs the execution context to be preserved (which it signals by calling OnCompleted), or
// that it doesn't (which it signals by calling UnsafeOnCompleted). Obviously it's faster *not*
// to preserve and restore the context, so we prefer that where possible.
// - If a caller doesn't need the execution context to be preserved and hence calls UnsafeOnCompleted,
// there's no harm in preserving it anyway - it's just a bit of wasted cost. That's what will happen
// if a caller sees that the proxy implements ICriticalNotifyCompletion but the proxy chooses to
// pass the call on to the underlying awaitable's OnCompleted method.

var underlyingMethodToUse = _unsafeOnCompletedMethod ?? _onCompletedMethod;
underlyingMethodToUse(_customAwaiter, continuation);
}
}
}
}

+ 0
- 145
src/DotNetCore.CAP.Dashboard/ObjectMethodExecutor/ObjectMethodExecutorFSharpSupport.cs 查看文件

@@ -1,145 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Linq;
using System.Linq.Expressions;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;

namespace Microsoft.Extensions.Internal
{
/// <summary>
/// Helper for detecting whether a given type is FSharpAsync`1, and if so, supplying
/// an <see cref="Expression" /> for mapping instances of that type to a C# awaitable.
/// </summary>
/// <remarks>
/// The main design goal here is to avoid taking a compile-time dependency on
/// FSharp.Core.dll, because non-F# applications wouldn't use it. So all the references
/// to FSharp types have to be constructed dynamically at runtime.
/// </remarks>
internal static class ObjectMethodExecutorFSharpSupport
{
private static readonly object _fsharpValuesCacheLock = new object();
private static Assembly _fsharpCoreAssembly;
private static MethodInfo _fsharpAsyncStartAsTaskGenericMethod;
private static PropertyInfo _fsharpOptionOfTaskCreationOptionsNoneProperty;
private static PropertyInfo _fsharpOptionOfCancellationTokenNoneProperty;

public static bool TryBuildCoercerFromFSharpAsyncToAwaitable(
Type possibleFSharpAsyncType,
out Expression coerceToAwaitableExpression,
out Type awaitableType)
{
var methodReturnGenericType = possibleFSharpAsyncType.IsGenericType
? possibleFSharpAsyncType.GetGenericTypeDefinition()
: null;

if (!IsFSharpAsyncOpenGenericType(methodReturnGenericType))
{
coerceToAwaitableExpression = null;
awaitableType = null;
return false;
}

var awaiterResultType = possibleFSharpAsyncType.GetGenericArguments().Single();
awaitableType = typeof(Task<>).MakeGenericType(awaiterResultType);

// coerceToAwaitableExpression = (object fsharpAsync) =>
// {
// return (object)FSharpAsync.StartAsTask<TResult>(
// (Microsoft.FSharp.Control.FSharpAsync<TResult>)fsharpAsync,
// FSharpOption<TaskCreationOptions>.None,
// FSharpOption<CancellationToken>.None);
// };
var startAsTaskClosedMethod = _fsharpAsyncStartAsTaskGenericMethod
.MakeGenericMethod(awaiterResultType);
var coerceToAwaitableParam = Expression.Parameter(typeof(object));
coerceToAwaitableExpression = Expression.Lambda(
Expression.Convert(
Expression.Call(
startAsTaskClosedMethod,
Expression.Convert(coerceToAwaitableParam, possibleFSharpAsyncType),
Expression.MakeMemberAccess(null, _fsharpOptionOfTaskCreationOptionsNoneProperty),
Expression.MakeMemberAccess(null, _fsharpOptionOfCancellationTokenNoneProperty)),
typeof(object)),
coerceToAwaitableParam);

return true;
}

private static bool IsFSharpAsyncOpenGenericType(Type possibleFSharpAsyncGenericType)
{
var typeFullName = possibleFSharpAsyncGenericType?.FullName;
if (!string.Equals(typeFullName, "Microsoft.FSharp.Control.FSharpAsync`1", StringComparison.Ordinal))
{
return false;
}

lock (_fsharpValuesCacheLock)
{
if (_fsharpCoreAssembly != null)
{
return possibleFSharpAsyncGenericType.Assembly == _fsharpCoreAssembly;
}

return TryPopulateFSharpValueCaches(possibleFSharpAsyncGenericType);
}
}

private static bool TryPopulateFSharpValueCaches(Type possibleFSharpAsyncGenericType)
{
var assembly = possibleFSharpAsyncGenericType.Assembly;
var fsharpOptionType = assembly.GetType("Microsoft.FSharp.Core.FSharpOption`1");
var fsharpAsyncType = assembly.GetType("Microsoft.FSharp.Control.FSharpAsync");

if (fsharpOptionType == null || fsharpAsyncType == null)
{
return false;
}

// Get a reference to FSharpOption<TaskCreationOptions>.None
var fsharpOptionOfTaskCreationOptionsType = fsharpOptionType
.MakeGenericType(typeof(TaskCreationOptions));
_fsharpOptionOfTaskCreationOptionsNoneProperty = fsharpOptionOfTaskCreationOptionsType
.GetTypeInfo()
.GetRuntimeProperty("None");

// Get a reference to FSharpOption<CancellationToken>.None
var fsharpOptionOfCancellationTokenType = fsharpOptionType
.MakeGenericType(typeof(CancellationToken));
_fsharpOptionOfCancellationTokenNoneProperty = fsharpOptionOfCancellationTokenType
.GetTypeInfo()
.GetRuntimeProperty("None");

// Get a reference to FSharpAsync.StartAsTask<>
var fsharpAsyncMethods = fsharpAsyncType
.GetRuntimeMethods()
.Where(m => m.Name.Equals("StartAsTask", StringComparison.Ordinal));
foreach (var candidateMethodInfo in fsharpAsyncMethods)
{
var parameters = candidateMethodInfo.GetParameters();
if (parameters.Length == 3
&& TypesHaveSameIdentity(parameters[0].ParameterType, possibleFSharpAsyncGenericType)
&& parameters[1].ParameterType == fsharpOptionOfTaskCreationOptionsType
&& parameters[2].ParameterType == fsharpOptionOfCancellationTokenType)
{
// This really does look like the correct method (and hence assembly).
_fsharpAsyncStartAsTaskGenericMethod = candidateMethodInfo;
_fsharpCoreAssembly = assembly;
break;
}
}

return _fsharpCoreAssembly != null;
}

private static bool TypesHaveSameIdentity(Type type1, Type type2)
{
return type1.Assembly == type2.Assembly
&& string.Equals(type1.Namespace, type2.Namespace, StringComparison.Ordinal)
&& string.Equals(type1.Name, type2.Name, StringComparison.Ordinal);
}
}
}

+ 1
- 3
src/DotNetCore.CAP.Dashboard/wwwroot/public/index.html 查看文件

@@ -6,9 +6,7 @@
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width,initial-scale=1.0">
<link rel="icon" href="<%= BASE_URL %>favicon.ico">
<title>
<%= htmlWebpackPlugin.options.title %>
</title>
<title>CAP Dashboard</title>
</head>

<body>


+ 6
- 1
src/DotNetCore.CAP.Kafka/IConnectionPool.Default.cs 查看文件

@@ -46,11 +46,16 @@ namespace DotNetCore.CAP.Kafka
RequestTimeoutMs = 3000
};

producer = new ProducerBuilder<string, byte[]>(config).Build();
producer = BuildProducer(config);

return producer;
}

protected virtual IProducer<string, byte[]> BuildProducer(ProducerConfig config)
{
return new ProducerBuilder<string, byte[]>(config).Build();
}

public bool Return(IProducer<string, byte[]> producer)
{
if (Interlocked.Increment(ref _pCount) <= _maxSize)


+ 9
- 4
src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs 查看文件

@@ -15,7 +15,7 @@ using Microsoft.Extensions.Options;

namespace DotNetCore.CAP.Kafka
{
internal sealed class KafkaConsumerClient : IConsumerClient
public class KafkaConsumerClient : IConsumerClient
{
private static readonly SemaphoreSlim ConnectionLock = new SemaphoreSlim(initialCount: 1, maxCount: 1);

@@ -153,9 +153,7 @@ namespace DotNetCore.CAP.Kafka
config.EnableAutoCommit ??= false;
config.LogConnectionClose ??= false;

_consumerClient = new ConsumerBuilder<string, byte[]>(config)
.SetErrorHandler(ConsumerClient_OnConsumeError)
.Build();
_consumerClient = BuildConsumer(config);
}
}
finally
@@ -164,6 +162,13 @@ namespace DotNetCore.CAP.Kafka
}
}

protected virtual IConsumer<string, byte[]> BuildConsumer(ConsumerConfig config)
{
return new ConsumerBuilder<string, byte[]>(config)
.SetErrorHandler(ConsumerClient_OnConsumeError)
.Build();
}

private void ConsumerClient_OnConsumeError(IConsumer<string, byte[]> consumer, Error e)
{
var logArgs = new LogMessageEventArgs


+ 2
- 2
src/DotNetCore.CAP.Kafka/KafkaConsumerClientFactory.cs 查看文件

@@ -6,7 +6,7 @@ using Microsoft.Extensions.Options;

namespace DotNetCore.CAP.Kafka
{
internal sealed class KafkaConsumerClientFactory : IConsumerClientFactory
public class KafkaConsumerClientFactory : IConsumerClientFactory
{
private readonly IOptions<KafkaOptions> _kafkaOptions;

@@ -15,7 +15,7 @@ namespace DotNetCore.CAP.Kafka
_kafkaOptions = kafkaOptions;
}

public IConsumerClient Create(string groupId)
public virtual IConsumerClient Create(string groupId)
{
try
{


+ 3
- 3
src/DotNetCore.CAP.RabbitMQ/ITransport.RabbitMQ.cs 查看文件

@@ -40,15 +40,15 @@ namespace DotNetCore.CAP.RabbitMQ

var props = channel.CreateBasicProperties();
props.DeliveryMode = 2;
props.Headers = message.Headers.ToDictionary(x => x.Key, x => (object) x.Value);
props.Headers = message.Headers.ToDictionary(x => x.Key, x => (object)x.Value);
channel.ExchangeDeclare(_exchange, RabbitMQOptions.ExchangeType, true);

channel.BasicPublish(_exchange, message.GetName(), props, message.Body);

channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5));

_logger.LogDebug($"RabbitMQ topic message [{message.GetName()}] has been published.");
_logger.LogInformation("CAP message '{0}' published, internal id '{1}'", message.GetName(), message.GetId());

return Task.FromResult(OperateResult.Success);
}


+ 38
- 0
src/DotNetCore.CAP/CAP.Builder.cs 查看文件

@@ -1,9 +1,15 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Linq;
using System.Reflection;
using DotNetCore.CAP.Filter;
using DotNetCore.CAP.Internal;
using JetBrains.Annotations;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
// ReSharper disable UnusedMember.Global

namespace DotNetCore.CAP
{
@@ -43,10 +49,42 @@ namespace DotNetCore.CAP
/// </summary>
public IServiceCollection Services { get; }

/// <summary>
/// Registers subscribers filter.
/// </summary>
/// <typeparam name="T">Type of filter</typeparam>
public CapBuilder AddSubscribeFilter<T>() where T : class, ISubscribeFilter
{
Services.TryAddScoped<ISubscribeFilter, T>();
return this;
}

/// <summary>
/// Registers subscribers from the specified assemblies.
/// </summary>
/// <param name="assemblies">Assemblies to scan subscriber</param>
public CapBuilder AddSubscriberAssembly(params Assembly[] assemblies)
{
if (assemblies == null) throw new ArgumentNullException(nameof(assemblies));

Services.Replace(new ServiceDescriptor(typeof(IConsumerServiceSelector),
x => new AssemblyConsumerServiceSelector(x, assemblies),
ServiceLifetime.Singleton));

return this;
}

/// <summary>
/// Registers subscribers from the specified types.
/// </summary>
/// <param name="handlerAssemblyMarkerTypes"></param>
public CapBuilder AddSubscriberAssembly([NotNull] params Type[] handlerAssemblyMarkerTypes)
{
if (handlerAssemblyMarkerTypes == null) throw new ArgumentNullException(nameof(handlerAssemblyMarkerTypes));

AddSubscriberAssembly(handlerAssemblyMarkerTypes.Select(t => t.GetTypeInfo().Assembly).ToArray());

return this;
}
}
}

+ 5
- 0
src/DotNetCore.CAP/Internal/Helper.cs 查看文件

@@ -35,6 +35,11 @@ namespace DotNetCore.CAP.Internal
return false;
}

if (typeInfo.ContainsGenericParameters)
{
return false;
}

return !typeInfo.ContainsGenericParameters
&& typeInfo.Name.EndsWith("Controller", StringComparison.OrdinalIgnoreCase);
}


+ 1
- 1
src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs 查看文件

@@ -35,7 +35,7 @@ namespace DotNetCore.CAP.Internal
private BrokerAddress _serverAddress;
private Task _compositeTask;
private bool _disposed;
private static bool _isHealthy = true;
private bool _isHealthy = true;

// diagnostics listener
// ReSharper disable once InconsistentNaming


+ 47
- 0
src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Assembly.cs 查看文件

@@ -0,0 +1,47 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

namespace DotNetCore.CAP.Internal
{
/// <inheritdoc />
/// <summary>
/// A <see cref="T:DotNetCore.CAP.Abstractions.IConsumerServiceSelector" /> implementation that scanning subscribers from the assembly.
/// </summary>
public class AssemblyConsumerServiceSelector : ConsumerServiceSelector
{
private readonly Assembly[] _assemblies;

public AssemblyConsumerServiceSelector(IServiceProvider serviceProvider, Assembly[] assemblies) : base(serviceProvider)
{
_assemblies = assemblies;
}

protected override IEnumerable<ConsumerExecutorDescriptor> FindConsumersFromInterfaceTypes(IServiceProvider provider)
{
var descriptors = new List<ConsumerExecutorDescriptor>();

descriptors.AddRange(base.FindConsumersFromInterfaceTypes(provider));

var assembliesToScan = _assemblies.Distinct().ToArray();

var capSubscribeTypeInfo = typeof(ICapSubscribe).GetTypeInfo();

foreach (var type in assembliesToScan.SelectMany(a => a.DefinedTypes))
{
if (!capSubscribeTypeInfo.IsAssignableFrom(type))
{
continue;
}

descriptors.AddRange(GetTopicAttributesDescription(type));
}

return descriptors;
}
}
}

+ 2
- 1
src/DotNetCore.CAP/Internal/IMessageSender.cs 查看文件

@@ -3,11 +3,12 @@

using System.Threading.Tasks;
using DotNetCore.CAP.Persistence;
using JetBrains.Annotations;

namespace DotNetCore.CAP.Internal
{
public interface IMessageSender
{
Task<OperateResult> SendAsync(MediumMessage message);
Task<OperateResult> SendAsync([NotNull] MediumMessage message);
}
}

+ 4
- 2
src/DotNetCore.CAP/Internal/ISubscribeDispatcher.Default.cs 查看文件

@@ -89,6 +89,8 @@ namespace DotNetCore.CAP.Internal

try
{
_logger.ConsumerExecuting(descriptor.MethodInfo.Name);

var sp = Stopwatch.StartNew();

await InvokeConsumerMethodAsync(message, descriptor, cancellationToken);
@@ -97,7 +99,7 @@ namespace DotNetCore.CAP.Internal

await SetSuccessfulState(message);

_logger.ConsumerExecuted(sp.Elapsed.TotalMilliseconds);
_logger.ConsumerExecuted(descriptor.MethodInfo.Name, sp.Elapsed.TotalMilliseconds);

return (false, OperateResult.Success);
}
@@ -183,7 +185,7 @@ namespace DotNetCore.CAP.Internal
[Headers.CorrelationSequence] = (message.Origin.GetCorrelationSequence() + 1).ToString()
};

await _provider.GetService<ICapPublisher>().PublishAsync(ret.CallbackName, ret.Result, header, cancellationToken);
await _provider.GetRequiredService<ICapPublisher>().PublishAsync(ret.CallbackName, ret.Result, header, cancellationToken);
}
}
catch (OperationCanceledException)


+ 1
- 4
src/DotNetCore.CAP/Internal/ISubscribeInvoker.Default.cs 查看文件

@@ -40,10 +40,7 @@ namespace DotNetCore.CAP.Internal
var methodHandle = methodInfo.MethodHandle.Value;
var key = $"{reflectedTypeHandle}_{methodHandle}";

_logger.LogDebug("Executing subscriber method : {0}", methodInfo.Name);

var executor = _executors.GetOrAdd(key,
x => ObjectMethodExecutor.Create(methodInfo, context.ConsumerDescriptor.ImplTypeInfo));
var executor = _executors.GetOrAdd(key, x => ObjectMethodExecutor.Create(methodInfo, context.ConsumerDescriptor.ImplTypeInfo));

using var scope = _serviceProvider.CreateScope();



+ 2
- 1
src/DotNetCore.CAP/Internal/ISubscribeInvoker.cs 查看文件

@@ -3,6 +3,7 @@

using System.Threading;
using System.Threading.Tasks;
using JetBrains.Annotations;

namespace DotNetCore.CAP.Internal
{
@@ -16,6 +17,6 @@ namespace DotNetCore.CAP.Internal
/// </summary>
/// <param name="context">consumer execute context</param>
/// <param name="cancellationToken">The object of <see cref="CancellationToken"/>.</param>
Task<ConsumerExecutedResult> InvokeAsync(ConsumerContext context, CancellationToken cancellationToken = default);
Task<ConsumerExecutedResult> InvokeAsync([NotNull] ConsumerContext context, CancellationToken cancellationToken = default);
}
}

+ 7
- 2
src/DotNetCore.CAP/Internal/LoggerExtensions.cs 查看文件

@@ -45,9 +45,14 @@ namespace DotNetCore.CAP.Internal
logger.LogError(ex, $"An exception occured while publishing a message, reason:{reason}. message id:{messageId}");
}

public static void ConsumerExecuted(this ILogger logger, double milliseconds)
public static void ConsumerExecuting(this ILogger logger, string methodName)
{
logger.LogDebug($"Consumer executed. Took: {milliseconds} ms.");
logger.LogInformation($"Executing subscriber method '{methodName}'");
}

public static void ConsumerExecuted(this ILogger logger, string methodName, double milliseconds)
{
logger.LogInformation($"Executed subscriber method '{methodName}' in {milliseconds} ms");
}

public static void ServerStarting(this ILogger logger)


+ 3
- 3
src/DotNetCore.CAP/Internal/ObjectMethodExecutor/AwaitableInfo.cs 查看文件

@@ -6,9 +6,10 @@ using System.Linq;
using System.Reflection;
using System.Runtime.CompilerServices;

// ReSharper disable once CheckNamespace
namespace Microsoft.Extensions.Internal
{
internal struct AwaitableInfo
internal readonly struct AwaitableInfo
{
public Type AwaiterType { get; }
public PropertyInfo AwaiterIsCompletedProperty { get; }
@@ -43,8 +44,7 @@ namespace Microsoft.Extensions.Internal
// Awaitable must have method matching "object GetAwaiter()"
var getAwaiterMethod = type.GetRuntimeMethods().FirstOrDefault(m =>
m.Name.Equals("GetAwaiter", StringComparison.OrdinalIgnoreCase)
&& m.GetParameters().Length == 0
&& m.ReturnType != null);
&& m.GetParameters().Length == 0);
if (getAwaiterMethod == null)
{
awaitableInfo = default(AwaitableInfo);


+ 2
- 1
src/DotNetCore.CAP/Internal/ObjectMethodExecutor/CoercedAwaitableInfo.cs 查看文件

@@ -4,9 +4,10 @@
using System;
using System.Linq.Expressions;

// ReSharper disable once CheckNamespace
namespace Microsoft.Extensions.Internal
{
internal struct CoercedAwaitableInfo
internal readonly struct CoercedAwaitableInfo
{
public AwaitableInfo AwaitableInfo { get; }
public Expression CoercerExpression { get; }


+ 2
- 1
src/DotNetCore.CAP/Internal/ObjectMethodExecutor/ObjectMethodExecutorAwaitable.cs 查看文件

@@ -4,6 +4,7 @@
using System;
using System.Runtime.CompilerServices;

// ReSharper disable once CheckNamespace
namespace Microsoft.Extensions.Internal
{
/// <summary>
@@ -11,7 +12,7 @@ namespace Microsoft.Extensions.Internal
/// return, regardless of whether the underlying value is a System.Task, an FSharpAsync, or an
/// application-defined custom awaitable.
/// </summary>
internal struct ObjectMethodExecutorAwaitable
internal readonly struct ObjectMethodExecutorAwaitable
{
private readonly object _customAwaitable;
private readonly Func<object, object> _getAwaiterMethod;


+ 1
- 0
src/DotNetCore.CAP/Internal/ObjectMethodExecutor/ObjectMethodExecutorFSharpSupport.cs 查看文件

@@ -8,6 +8,7 @@ using System.Reflection;
using System.Threading;
using System.Threading.Tasks;

// ReSharper disable once CheckNamespace
namespace Microsoft.Extensions.Internal
{
/// <summary>


+ 2
- 2
src/DotNetCore.CAP/Transport/IConsumerClient.cs 查看文件

@@ -23,7 +23,7 @@ namespace DotNetCore.CAP.Transport
/// </summary>
/// <param name="topicNames">Names of the requested topics</param>
/// <returns>Topic identifiers</returns>
ICollection<string> FetchTopics(IEnumerable<string> topicNames)
ICollection<string> FetchTopics([NotNull] IEnumerable<string> topicNames)
{
return topicNames.ToList();
}
@@ -32,7 +32,7 @@ namespace DotNetCore.CAP.Transport
/// Subscribe to a set of topics to the message queue
/// </summary>
/// <param name="topics"></param>
void Subscribe(IEnumerable<string> topics);
void Subscribe([NotNull] IEnumerable<string> topics);

/// <summary>
/// Start listening


+ 18
- 0
test/DotNetCore.CAP.Test/HelperTest.cs 查看文件

@@ -33,6 +33,19 @@ namespace DotNetCore.CAP.Test
Assert.True(result);
}

[Fact]
public void IsControllerAbstractTest()
{
//Arrange
var typeInfo = typeof(AbstractController).GetTypeInfo();

//Act
var result = Helper.IsController(typeInfo);

//Assert
Assert.False(result);
}

[Theory]
[InlineData(typeof(string))]
[InlineData(typeof(decimal))]
@@ -76,4 +89,9 @@ namespace DotNetCore.CAP.Test
{
}

public abstract class AbstractController
{

}
}

Loading…
取消
儲存