From 56400d91b0d33ecfe84fb11657d4450573f82ae6 Mon Sep 17 00:00:00 2001 From: David Smith Date: Mon, 19 Apr 2021 10:24:58 -0400 Subject: [PATCH] Add GenericConsumerServiceSelector (#835) * Custom selector does not route messages * Works with generic ConsumerServiceSelector. Works with generic ConsumerServiceSelector. Moves INamedGroup. Co-authored-by: Dave Smith --- CAP.sln | 7 ++ .../MyConsumerSelector/CustomSubscriber.cs | 47 ++++++++ .../GenericConsumerServiceSelector.cs | 113 ++++++++++++++++++ .../MyConsumerSelector.csproj | 19 +++ samples/MyConsumerSelector/Program.cs | 47 ++++++++ samples/MyConsumerSelector/Selectors.cs | 24 ++++ 6 files changed, 257 insertions(+) create mode 100644 samples/MyConsumerSelector/CustomSubscriber.cs create mode 100644 samples/MyConsumerSelector/GenericConsumerServiceSelector.cs create mode 100644 samples/MyConsumerSelector/MyConsumerSelector.csproj create mode 100644 samples/MyConsumerSelector/Program.cs create mode 100644 samples/MyConsumerSelector/Selectors.cs diff --git a/CAP.sln b/CAP.sln index a1b6644..1b77c31 100644 --- a/CAP.sln +++ b/CAP.sln @@ -71,6 +71,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.NATS", "src\ EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Sample.RabbitMQ.Postgres.DashboardAuth", "samples\Sample.RabbitMQ.Postgres.DashboardAuth\Sample.RabbitMQ.Postgres.DashboardAuth.csproj", "{54F6C206-2A23-4971-AE5A-FC47EB772452}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MyConsumerSelector", "samples\MyConsumerSelector\MyConsumerSelector.csproj", "{1AE86784-0B64-4A73-8D81-D913C6E33D7D}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -165,6 +167,10 @@ Global {54F6C206-2A23-4971-AE5A-FC47EB772452}.Debug|Any CPU.Build.0 = Debug|Any CPU {54F6C206-2A23-4971-AE5A-FC47EB772452}.Release|Any CPU.ActiveCfg = Release|Any CPU {54F6C206-2A23-4971-AE5A-FC47EB772452}.Release|Any CPU.Build.0 = Release|Any CPU + {1AE86784-0B64-4A73-8D81-D913C6E33D7D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {1AE86784-0B64-4A73-8D81-D913C6E33D7D}.Debug|Any CPU.Build.0 = Debug|Any CPU + {1AE86784-0B64-4A73-8D81-D913C6E33D7D}.Release|Any CPU.ActiveCfg = Release|Any CPU + {1AE86784-0B64-4A73-8D81-D913C6E33D7D}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -192,6 +198,7 @@ Global {B187DD15-092D-4B72-9807-50856607D237} = {3A6B6931-A123-477A-9469-8B468B5385AF} {8B2FD3EA-E72B-4A82-B182-B87EC0C15D07} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4} {54F6C206-2A23-4971-AE5A-FC47EB772452} = {3A6B6931-A123-477A-9469-8B468B5385AF} + {1AE86784-0B64-4A73-8D81-D913C6E33D7D} = {3A6B6931-A123-477A-9469-8B468B5385AF} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {2E70565D-94CF-40B4-BFE1-AC18D5F736AB} diff --git a/samples/MyConsumerSelector/CustomSubscriber.cs b/samples/MyConsumerSelector/CustomSubscriber.cs new file mode 100644 index 0000000..b29510e --- /dev/null +++ b/samples/MyConsumerSelector/CustomSubscriber.cs @@ -0,0 +1,47 @@ +using System; +using DotNetCore.CAP; + +namespace MyConsumerSelector +{ + public class CustomSubscriber : IMessageSubscriber, ICapSubscribe + { + [MessageSubscription("string")] + public void String(string message) + { + Console.WriteLine($"String: {message}"); + } + + [MessageSubscription("message.string")] + public void String(Message message) + { + Console.WriteLine($"String: {System.Text.Json.JsonSerializer.Serialize(message)}"); + } + + [MessageSubscription("message.datetime")] + public void Date(Message message, [FromCap] CapHeader header) + { + Console.WriteLine($"Date: {System.Text.Json.JsonSerializer.Serialize(message)}"); + Console.WriteLine(System.Text.Json.JsonSerializer.Serialize(header)); + } + + [MessageSubscription("message.bytes")] + public void Bytes(Message message, [FromCap] CapHeader header) + { + Console.WriteLine($"Bytes: {System.Text.Json.JsonSerializer.Serialize(message)}"); + Console.WriteLine(System.Text.Json.JsonSerializer.Serialize(header)); + } + + [CapSubscribe("cap")] + public void Cap(string message, [FromCap] CapHeader header) + { + Console.WriteLine($"Cap {message}"); + Console.WriteLine(System.Text.Json.JsonSerializer.Serialize(header)); + } + } + + public class Message + { + public string Name { get; set; } + public T Body { get; set; } + } +} \ No newline at end of file diff --git a/samples/MyConsumerSelector/GenericConsumerServiceSelector.cs b/samples/MyConsumerSelector/GenericConsumerServiceSelector.cs new file mode 100644 index 0000000..4bf1f16 --- /dev/null +++ b/samples/MyConsumerSelector/GenericConsumerServiceSelector.cs @@ -0,0 +1,113 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Reflection; +using DotNetCore.CAP; +using DotNetCore.CAP.Internal; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; + +namespace MyConsumerSelector +{ + public class GenericConsumerServiceSelector : ConsumerServiceSelector + where TSubscriptionAttribute : Attribute, INamedGroup + { + private readonly CapOptions _capOptions; + + public GenericConsumerServiceSelector(IServiceProvider serviceProvider) + : base(serviceProvider) + { + _capOptions = serviceProvider.GetRequiredService>().Value; + } + + protected override IEnumerable FindConsumersFromInterfaceTypes(IServiceProvider provider) + { + var executorDescriptorList = new List(); + + using (var scoped = provider.CreateScope()) + { + var scopedProvider = scoped.ServiceProvider; + var subscribers = scopedProvider.GetServices(); + var subscriberTypeInfo = typeof(TSubscriber).GetTypeInfo(); + foreach (var service in subscribers) + { + var serviceTypeInfo = service.GetType().GetTypeInfo(); + if (!subscriberTypeInfo.IsAssignableFrom(serviceTypeInfo)) + { + continue; + } + + var descriptors = _GetDescriptors(serviceTypeInfo); + executorDescriptorList.AddRange(descriptors); + } + + return executorDescriptorList; + } + } + + private IEnumerable _GetDescriptors(TypeInfo typeInfo) + { + foreach (var method in typeInfo.DeclaredMethods) + { + var topicAttr = method.GetCustomAttributes(true); + var topicAttributes = topicAttr as IList ?? topicAttr.ToList(); + + if (!topicAttributes.Any()) + { + continue; + } + + foreach (var attr in topicAttributes) + { + _SetAttributeGroup(attr); + + yield return new ConsumerExecutorDescriptor + { + Attribute = new CapSubscribeAttribute(attr.Name) + { + Group = attr.Group + }, + MethodInfo = method, + ImplTypeInfo = typeInfo, + TopicNamePrefix = _capOptions.TopicNamePrefix, + Parameters = _GetParameterDescriptors(method) + }; + } + } + } + + private void _SetAttributeGroup(TSubscriptionAttribute attr) + { + if (attr.Group == null) + { + attr.Group = _capOptions.DefaultGroupName + "." + _capOptions.Version; + } + else + { + attr.Group = attr.Group + "." + _capOptions.Version; + } + + if (!string.IsNullOrEmpty(_capOptions.GroupNamePrefix)) + { + attr.Group = $"{_capOptions.GroupNamePrefix}.{attr.Group}"; + } + } + + private IList _GetParameterDescriptors(MethodInfo method) + { + var descriptors = method.GetParameters().Select(p => new ParameterDescriptor() + {Name = p.Name, ParameterType = p.ParameterType, IsFromCap = p.GetCustomAttributes().Any()}); + return new List(descriptors.ToArray()); + } + } + + /// + /// Implementers have a name and a group. + /// + public interface INamedGroup + { + string Name { get; } + + string Group { get; set; } + } +} \ No newline at end of file diff --git a/samples/MyConsumerSelector/MyConsumerSelector.csproj b/samples/MyConsumerSelector/MyConsumerSelector.csproj new file mode 100644 index 0000000..32e0e64 --- /dev/null +++ b/samples/MyConsumerSelector/MyConsumerSelector.csproj @@ -0,0 +1,19 @@ + + + + Exe + net5.0 + + + + + + + + + + + + + + diff --git a/samples/MyConsumerSelector/Program.cs b/samples/MyConsumerSelector/Program.cs new file mode 100644 index 0000000..7cb9a8f --- /dev/null +++ b/samples/MyConsumerSelector/Program.cs @@ -0,0 +1,47 @@ +using System; +using System.Collections.Generic; +using DotNetCore.CAP; +using DotNetCore.CAP.Internal; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; + +namespace MyConsumerSelector +{ + public class Program + { + private static bool _useCustomSelector = true; + public static void Main(string[] args) + { + var container = new ServiceCollection(); + + container.AddLogging(x => x.AddConsole()); + + if (_useCustomSelector) + container.AddSingleton>(); + + container.AddTransient(); + container.AddTransient(); + + container.AddCap(x => + { + x.UseInMemoryStorage(); + x.UseRabbitMQ(z => + { + z.ExchangeName = "MyConsumerSelector.Generic"; + z.HostName = "localhost"; + z.UserName = "guest"; + z.Password = "guest"; + z.CustomHeaders = e => new List> + { + new(DotNetCore.CAP.Messages.Headers.MessageId, SnowflakeId.Default().NextId().ToString()), + new(DotNetCore.CAP.Messages.Headers.MessageName, e.RoutingKey) + }; + }); + }); + + var sp = container.BuildServiceProvider(); + sp.GetRequiredService().BootstrapAsync(default); + Console.ReadLine(); + } + } +} \ No newline at end of file diff --git a/samples/MyConsumerSelector/Selectors.cs b/samples/MyConsumerSelector/Selectors.cs new file mode 100644 index 0000000..e24761d --- /dev/null +++ b/samples/MyConsumerSelector/Selectors.cs @@ -0,0 +1,24 @@ +using System; + +namespace MyConsumerSelector +{ + /// + /// Flags the implementer as a class that subscribes to messages. + /// + public interface IMessageSubscriber { } + + /// + /// Names the message being subscribed to. + /// + public class MessageSubscriptionAttribute : Attribute, INamedGroup + { + public MessageSubscriptionAttribute(string name) + { + Name = name; + } + + public string Name { get; } + + public string Group { get; set; } + } +} \ No newline at end of file