소스 검색

Add GenericConsumerServiceSelector (#835)

* Custom selector does not route messages

* Works with generic ConsumerServiceSelector.


Works with generic ConsumerServiceSelector.


Moves INamedGroup.

Co-authored-by: Dave Smith <dave.smith@ventivtech.com>
master
David Smith 3 년 전
committed by GitHub
부모
커밋
56400d91b0
No known key found for this signature in database GPG 키 ID: 4AEE18F83AFDEB23
6개의 변경된 파일257개의 추가작업 그리고 0개의 파일을 삭제
  1. +7
    -0
      CAP.sln
  2. +47
    -0
      samples/MyConsumerSelector/CustomSubscriber.cs
  3. +113
    -0
      samples/MyConsumerSelector/GenericConsumerServiceSelector.cs
  4. +19
    -0
      samples/MyConsumerSelector/MyConsumerSelector.csproj
  5. +47
    -0
      samples/MyConsumerSelector/Program.cs
  6. +24
    -0
      samples/MyConsumerSelector/Selectors.cs

+ 7
- 0
CAP.sln 파일 보기

@@ -71,6 +71,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.NATS", "src\
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Sample.RabbitMQ.Postgres.DashboardAuth", "samples\Sample.RabbitMQ.Postgres.DashboardAuth\Sample.RabbitMQ.Postgres.DashboardAuth.csproj", "{54F6C206-2A23-4971-AE5A-FC47EB772452}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MyConsumerSelector", "samples\MyConsumerSelector\MyConsumerSelector.csproj", "{1AE86784-0B64-4A73-8D81-D913C6E33D7D}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -165,6 +167,10 @@ Global
{54F6C206-2A23-4971-AE5A-FC47EB772452}.Debug|Any CPU.Build.0 = Debug|Any CPU
{54F6C206-2A23-4971-AE5A-FC47EB772452}.Release|Any CPU.ActiveCfg = Release|Any CPU
{54F6C206-2A23-4971-AE5A-FC47EB772452}.Release|Any CPU.Build.0 = Release|Any CPU
{1AE86784-0B64-4A73-8D81-D913C6E33D7D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{1AE86784-0B64-4A73-8D81-D913C6E33D7D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{1AE86784-0B64-4A73-8D81-D913C6E33D7D}.Release|Any CPU.ActiveCfg = Release|Any CPU
{1AE86784-0B64-4A73-8D81-D913C6E33D7D}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -192,6 +198,7 @@ Global
{B187DD15-092D-4B72-9807-50856607D237} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{8B2FD3EA-E72B-4A82-B182-B87EC0C15D07} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{54F6C206-2A23-4971-AE5A-FC47EB772452} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{1AE86784-0B64-4A73-8D81-D913C6E33D7D} = {3A6B6931-A123-477A-9469-8B468B5385AF}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {2E70565D-94CF-40B4-BFE1-AC18D5F736AB}


+ 47
- 0
samples/MyConsumerSelector/CustomSubscriber.cs 파일 보기

@@ -0,0 +1,47 @@
using System;
using DotNetCore.CAP;

namespace MyConsumerSelector
{
public class CustomSubscriber : IMessageSubscriber, ICapSubscribe
{
[MessageSubscription("string")]
public void String(string message)
{
Console.WriteLine($"String: {message}");
}
[MessageSubscription("message.string")]
public void String(Message<string> message)
{
Console.WriteLine($"String: {System.Text.Json.JsonSerializer.Serialize(message)}");
}
[MessageSubscription("message.datetime")]
public void Date(Message<DateTime> message, [FromCap] CapHeader header)
{
Console.WriteLine($"Date: {System.Text.Json.JsonSerializer.Serialize(message)}");
Console.WriteLine(System.Text.Json.JsonSerializer.Serialize(header));
}
[MessageSubscription("message.bytes")]
public void Bytes(Message<byte[]> message, [FromCap] CapHeader header)
{
Console.WriteLine($"Bytes: {System.Text.Json.JsonSerializer.Serialize(message)}");
Console.WriteLine(System.Text.Json.JsonSerializer.Serialize(header));
}

[CapSubscribe("cap")]
public void Cap(string message, [FromCap] CapHeader header)
{
Console.WriteLine($"Cap {message}");
Console.WriteLine(System.Text.Json.JsonSerializer.Serialize(header));
}
}

public class Message<T>
{
public string Name { get; set; }
public T Body { get; set; }
}
}

+ 113
- 0
samples/MyConsumerSelector/GenericConsumerServiceSelector.cs 파일 보기

@@ -0,0 +1,113 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using DotNetCore.CAP;
using DotNetCore.CAP.Internal;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;

namespace MyConsumerSelector
{
public class GenericConsumerServiceSelector<TSubscriber, TSubscriptionAttribute> : ConsumerServiceSelector
where TSubscriptionAttribute : Attribute, INamedGroup
{
private readonly CapOptions _capOptions;
public GenericConsumerServiceSelector(IServiceProvider serviceProvider)
: base(serviceProvider)
{
_capOptions = serviceProvider.GetRequiredService<IOptions<CapOptions>>().Value;
}

protected override IEnumerable<ConsumerExecutorDescriptor> FindConsumersFromInterfaceTypes(IServiceProvider provider)
{
var executorDescriptorList = new List<ConsumerExecutorDescriptor>();

using (var scoped = provider.CreateScope())
{
var scopedProvider = scoped.ServiceProvider;
var subscribers = scopedProvider.GetServices<TSubscriber>();
var subscriberTypeInfo = typeof(TSubscriber).GetTypeInfo();
foreach (var service in subscribers)
{
var serviceTypeInfo = service.GetType().GetTypeInfo();
if (!subscriberTypeInfo.IsAssignableFrom(serviceTypeInfo))
{
continue;
}

var descriptors = _GetDescriptors(serviceTypeInfo);
executorDescriptorList.AddRange(descriptors);
}

return executorDescriptorList;
}
}

private IEnumerable<ConsumerExecutorDescriptor> _GetDescriptors(TypeInfo typeInfo)
{
foreach (var method in typeInfo.DeclaredMethods)
{
var topicAttr = method.GetCustomAttributes<TSubscriptionAttribute>(true);
var topicAttributes = topicAttr as IList<TSubscriptionAttribute> ?? topicAttr.ToList();

if (!topicAttributes.Any())
{
continue;
}

foreach (var attr in topicAttributes)
{
_SetAttributeGroup(attr);

yield return new ConsumerExecutorDescriptor
{
Attribute = new CapSubscribeAttribute(attr.Name)
{
Group = attr.Group
},
MethodInfo = method,
ImplTypeInfo = typeInfo,
TopicNamePrefix = _capOptions.TopicNamePrefix,
Parameters = _GetParameterDescriptors(method)
};
}
}
}

private void _SetAttributeGroup(TSubscriptionAttribute attr)
{
if (attr.Group == null)
{
attr.Group = _capOptions.DefaultGroupName + "." + _capOptions.Version;
}
else
{
attr.Group = attr.Group + "." + _capOptions.Version;
}

if (!string.IsNullOrEmpty(_capOptions.GroupNamePrefix))
{
attr.Group = $"{_capOptions.GroupNamePrefix}.{attr.Group}";
}
}

private IList<ParameterDescriptor> _GetParameterDescriptors(MethodInfo method)
{
var descriptors = method.GetParameters().Select(p => new ParameterDescriptor()
{Name = p.Name, ParameterType = p.ParameterType, IsFromCap = p.GetCustomAttributes<FromCapAttribute>().Any()});
return new List<ParameterDescriptor>(descriptors.ToArray());
}
}
/// <summary>
/// Implementers have a name and a group.
/// </summary>
public interface INamedGroup
{
string Name { get; }

string Group { get; set; }
}
}

+ 19
- 0
samples/MyConsumerSelector/MyConsumerSelector.csproj 파일 보기

@@ -0,0 +1,19 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net5.0</TargetFramework>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\DotNetCore.CAP.InMemoryStorage\DotNetCore.CAP.InMemoryStorage.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.RabbitMQ\DotNetCore.CAP.RabbitMQ.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="5.0.1" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="5.0.0" />
</ItemGroup>

</Project>

+ 47
- 0
samples/MyConsumerSelector/Program.cs 파일 보기

@@ -0,0 +1,47 @@
using System;
using System.Collections.Generic;
using DotNetCore.CAP;
using DotNetCore.CAP.Internal;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

namespace MyConsumerSelector
{
public class Program
{
private static bool _useCustomSelector = true;
public static void Main(string[] args)
{
var container = new ServiceCollection();

container.AddLogging(x => x.AddConsole());
if (_useCustomSelector)
container.AddSingleton<IConsumerServiceSelector, GenericConsumerServiceSelector<IMessageSubscriber, MessageSubscriptionAttribute>>();
container.AddTransient<IMessageSubscriber, CustomSubscriber>();
container.AddTransient<ICapSubscribe, CustomSubscriber>();
container.AddCap(x =>
{
x.UseInMemoryStorage();
x.UseRabbitMQ(z =>
{
z.ExchangeName = "MyConsumerSelector.Generic";
z.HostName = "localhost";
z.UserName = "guest";
z.Password = "guest";
z.CustomHeaders = e => new List<KeyValuePair<string, string>>
{
new(DotNetCore.CAP.Messages.Headers.MessageId, SnowflakeId.Default().NextId().ToString()),
new(DotNetCore.CAP.Messages.Headers.MessageName, e.RoutingKey)
};
});
});

var sp = container.BuildServiceProvider();
sp.GetRequiredService<IBootstrapper>().BootstrapAsync(default);
Console.ReadLine();
}
}
}

+ 24
- 0
samples/MyConsumerSelector/Selectors.cs 파일 보기

@@ -0,0 +1,24 @@
using System;

namespace MyConsumerSelector
{
/// <summary>
/// Flags the implementer as a class that subscribes to messages.
/// </summary>
public interface IMessageSubscriber { }

/// <summary>
/// Names the message being subscribed to.
/// </summary>
public class MessageSubscriptionAttribute : Attribute, INamedGroup
{
public MessageSubscriptionAttribute(string name)
{
Name = name;
}

public string Name { get; }

public string Group { get; set; }
}
}

불러오는 중...
취소
저장