From 5d69ab018a9c392159b8cbd2c711986db08d5241 Mon Sep 17 00:00:00 2001 From: yangxiaodong Date: Fri, 19 May 2017 19:10:27 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=20Consumer=E7=9B=B8=E5=85=B3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ConsistencyServer.cs | 29 ++-- .../Abstractions/ConsumerContext.cs | 18 +++ .../ConsumerExecutorDescriptor.cs | 18 +++ .../Abstractions/ConsumerInvokerContext.cs | 18 +++ .../Abstractions/IConsumerInvoker.cs | 12 ++ .../Attributes/KafkaTopicAttribute.cs | 33 ++++ .../Attributes/QMessageAttribute.cs | 14 ++ .../Attributes/TopicAttribute.cs | 25 +++ src/Cap.Consistency/Builder/BrokerOptions.cs | 8 + .../Builder/ConsistencyBuilder.cs | 104 +++++++++++++ .../Builder/ConsistencyMarkerService.cs | 7 + src/Cap.Consistency/Cap.Consistency.csproj | 2 + .../Consumer/ConsumerHandler.cs | 68 ++++++++ .../Consumer/IConsumerHandler.cs | 15 ++ .../Consumer/IConsumerService.cs | 10 ++ src/Cap.Consistency/Consumer/ITaskSchedule.cs | 15 ++ .../Consumer/Kafka/IKafkaTaskSchedule.cs | 31 ++++ .../Consumer/Kafka/KafkaConsumerHandler.cs | 28 ++++ .../Consumer/Kafka/RdKafkaClient.cs | 45 ++++++ .../Extensions/BuilderExtensions.cs | 31 ++++ .../Extensions/ConsistencyOptions.cs | 17 ++ .../Extensions/ServiceCollectionExtensions.cs | 46 ++++++ .../Infrastructure/DeliverMessage.cs | 32 ++++ .../IConsumerExcutorSelector.cs | 15 ++ .../Infrastructure/IConsumerInvokerFactory.cs | 12 ++ .../Internal/ConsumerExcutorSelector.cs | 57 +++++++ .../Internal/ConsumerInvokerFactory.cs | 16 ++ .../Internal/QMessageFinder.cs | 43 +++++ .../Internal/QMessageMethodInfo.cs | 36 +++++ src/Cap.Consistency/KafkaConsistency.cs | 33 ++++ src/Cap.Consistency/Models/IConsumerModel.cs | 12 ++ .../Route/TopicRouteContext.cs | 31 ++++ src/Cap.Consistency/RouteTable.cs | 11 +- .../Store/ConsistencyMessageManager.cs | 147 ++++++++++++++++++ .../Store/IConsistencyMessageStore.cs | 55 +++++++ ...onsistency.EntityFrameworkCore.Test.csproj | 2 +- .../Cap.Consistency.Test.csproj | 2 +- 37 files changed, 1076 insertions(+), 22 deletions(-) create mode 100644 src/Cap.Consistency/Abstractions/ConsumerContext.cs create mode 100644 src/Cap.Consistency/Abstractions/ConsumerExecutorDescriptor.cs create mode 100644 src/Cap.Consistency/Abstractions/ConsumerInvokerContext.cs create mode 100644 src/Cap.Consistency/Abstractions/IConsumerInvoker.cs create mode 100644 src/Cap.Consistency/Attributes/KafkaTopicAttribute.cs create mode 100644 src/Cap.Consistency/Attributes/QMessageAttribute.cs create mode 100644 src/Cap.Consistency/Attributes/TopicAttribute.cs create mode 100644 src/Cap.Consistency/Builder/BrokerOptions.cs create mode 100644 src/Cap.Consistency/Builder/ConsistencyBuilder.cs create mode 100644 src/Cap.Consistency/Builder/ConsistencyMarkerService.cs create mode 100644 src/Cap.Consistency/Consumer/ConsumerHandler.cs create mode 100644 src/Cap.Consistency/Consumer/IConsumerHandler.cs create mode 100644 src/Cap.Consistency/Consumer/IConsumerService.cs create mode 100644 src/Cap.Consistency/Consumer/ITaskSchedule.cs create mode 100644 src/Cap.Consistency/Consumer/Kafka/IKafkaTaskSchedule.cs create mode 100644 src/Cap.Consistency/Consumer/Kafka/KafkaConsumerHandler.cs create mode 100644 src/Cap.Consistency/Consumer/Kafka/RdKafkaClient.cs create mode 100644 src/Cap.Consistency/Extensions/BuilderExtensions.cs create mode 100644 src/Cap.Consistency/Extensions/ConsistencyOptions.cs create mode 100644 src/Cap.Consistency/Extensions/ServiceCollectionExtensions.cs create mode 100644 src/Cap.Consistency/Infrastructure/DeliverMessage.cs create mode 100644 src/Cap.Consistency/Infrastructure/IConsumerExcutorSelector.cs create mode 100644 src/Cap.Consistency/Infrastructure/IConsumerInvokerFactory.cs create mode 100644 src/Cap.Consistency/Internal/ConsumerExcutorSelector.cs create mode 100644 src/Cap.Consistency/Internal/ConsumerInvokerFactory.cs create mode 100644 src/Cap.Consistency/Internal/QMessageFinder.cs create mode 100644 src/Cap.Consistency/Internal/QMessageMethodInfo.cs create mode 100644 src/Cap.Consistency/KafkaConsistency.cs create mode 100644 src/Cap.Consistency/Models/IConsumerModel.cs create mode 100644 src/Cap.Consistency/Route/TopicRouteContext.cs create mode 100644 src/Cap.Consistency/Store/ConsistencyMessageManager.cs create mode 100644 src/Cap.Consistency/Store/IConsistencyMessageStore.cs diff --git a/src/Cap.Consistency.Server/ConsistencyServer.cs b/src/Cap.Consistency.Server/ConsistencyServer.cs index 29c858d..b3d7f13 100644 --- a/src/Cap.Consistency.Server/ConsistencyServer.cs +++ b/src/Cap.Consistency.Server/ConsistencyServer.cs @@ -5,7 +5,6 @@ using Microsoft.AspNetCore.Hosting; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; -using RdKafka; using System.Text; namespace Cap.Consistency.Server @@ -40,25 +39,25 @@ namespace Cap.Consistency.Server public void Run() { //配置消费者组 - var config = new Config() { GroupId = "example-csharp-consumer" }; - using (var consumer = new EventConsumer(config, "127.0.0.1:9092")) { + //var config = new Config() { GroupId = "example-csharp-consumer" }; + //using (var consumer = new EventConsumer(config, "127.0.0.1:9092")) { - //注册一个事件 - consumer.OnMessage += (obj, msg) => - { - string text = Encoding.UTF8.GetString(msg.Payload, 0, msg.Payload.Length); - Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {text}"); - }; + // //注册一个事件 + // consumer.OnMessage += (obj, msg) => + // { + // string text = Encoding.UTF8.GetString(msg.Payload, 0, msg.Payload.Length); + // Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {text}"); + // }; - //订阅一个或者多个Topic - consumer.Subscribe(new[] { "testtopic" }); + // //订阅一个或者多个Topic + // consumer.Subscribe(new[] { "testtopic" }); - //启动 - consumer.Start(); + // //启动 + // consumer.Start(); - _logger.LogInformation("Started consumer..."); + // _logger.LogInformation("Started consumer..."); - } + //} } } } \ No newline at end of file diff --git a/src/Cap.Consistency/Abstractions/ConsumerContext.cs b/src/Cap.Consistency/Abstractions/ConsumerContext.cs new file mode 100644 index 0000000..6b4aab6 --- /dev/null +++ b/src/Cap.Consistency/Abstractions/ConsumerContext.cs @@ -0,0 +1,18 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Cap.Consistency.Abstractions +{ + + public class ConsumerContext + { + public ConsumerContext(ConsumerExecutorDescriptor descriptor) { + ConsumerDescriptor = descriptor ?? throw new ArgumentNullException(nameof(descriptor)); + } + + public ConsumerExecutorDescriptor ConsumerDescriptor { get; set; } + + } +} diff --git a/src/Cap.Consistency/Abstractions/ConsumerExecutorDescriptor.cs b/src/Cap.Consistency/Abstractions/ConsumerExecutorDescriptor.cs new file mode 100644 index 0000000..a01ec63 --- /dev/null +++ b/src/Cap.Consistency/Abstractions/ConsumerExecutorDescriptor.cs @@ -0,0 +1,18 @@ +using System; +using System.Collections.Generic; +using System.Reflection; +using System.Text; + +namespace Cap.Consistency.Abstractions +{ + public class ConsumerExecutorDescriptor + { + public MethodInfo MethodInfo { get; set; } + + public Type ImplType { get; set; } + + public TopicInfo Topic { get; set; } + + public int GroupId { get; set; } + } +} diff --git a/src/Cap.Consistency/Abstractions/ConsumerInvokerContext.cs b/src/Cap.Consistency/Abstractions/ConsumerInvokerContext.cs new file mode 100644 index 0000000..e2fdedc --- /dev/null +++ b/src/Cap.Consistency/Abstractions/ConsumerInvokerContext.cs @@ -0,0 +1,18 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Cap.Consistency.Abstractions +{ + public class ConsumerInvokerContext + { + public ConsumerInvokerContext(ConsumerContext consumerContext) { + ConsumerContext = consumerContext ?? throw new ArgumentNullException(nameof(consumerContext)); + } + + public ConsumerContext ConsumerContext { get; set; } + + public IConsumerInvoker Result { get; set; } + + } +} diff --git a/src/Cap.Consistency/Abstractions/IConsumerInvoker.cs b/src/Cap.Consistency/Abstractions/IConsumerInvoker.cs new file mode 100644 index 0000000..1c000c5 --- /dev/null +++ b/src/Cap.Consistency/Abstractions/IConsumerInvoker.cs @@ -0,0 +1,12 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Cap.Consistency.Abstractions +{ + public interface IConsumerInvoker + { + Task InvokeAsync(); + } +} diff --git a/src/Cap.Consistency/Attributes/KafkaTopicAttribute.cs b/src/Cap.Consistency/Attributes/KafkaTopicAttribute.cs new file mode 100644 index 0000000..9c939b3 --- /dev/null +++ b/src/Cap.Consistency/Attributes/KafkaTopicAttribute.cs @@ -0,0 +1,33 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Cap.Consistency.Attributes +{ + public class KafkaTopicAttribute : TopicAttribute + { + public KafkaTopicAttribute(string topicName) + : this(topicName, 0) { } + + public KafkaTopicAttribute(string topicName, int partition) + : this(topicName, partition, 0) { } + + public KafkaTopicAttribute(string topicName, int partition, long offset) + : base(topicName) { + Offset = offset; + Partition = partition; + } + + public int Partition { get; } + + public long Offset { get; } + + public bool IsPartition { get { return Partition == 0; } } + + public bool IsOffset { get { return Offset == 0; } } + + public override string ToString() { + return Name; + } + } +} diff --git a/src/Cap.Consistency/Attributes/QMessageAttribute.cs b/src/Cap.Consistency/Attributes/QMessageAttribute.cs new file mode 100644 index 0000000..f9e63a8 --- /dev/null +++ b/src/Cap.Consistency/Attributes/QMessageAttribute.cs @@ -0,0 +1,14 @@ +using System; + +namespace Cap.Consistency +{ + [AttributeUsage(AttributeTargets.Method, Inherited = true, AllowMultiple = true)] + sealed class QMessageAttribute : Attribute + { + public QMessageAttribute(string messageName) { + MessageName = messageName; + } + + public string MessageName { get; private set; } + } +} diff --git a/src/Cap.Consistency/Attributes/TopicAttribute.cs b/src/Cap.Consistency/Attributes/TopicAttribute.cs new file mode 100644 index 0000000..152a4fe --- /dev/null +++ b/src/Cap.Consistency/Attributes/TopicAttribute.cs @@ -0,0 +1,25 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Cap.Consistency.Attributes +{ + + [AttributeUsage(AttributeTargets.Method | AttributeTargets.Class, Inherited = true, AllowMultiple = true)] + public abstract class TopicAttribute : Attribute + { + readonly string _name; + + public TopicAttribute(string topicName) { + this._name = topicName; + } + + public string Name { + get { return _name; } + } + + + public bool IsOneWay { get; set; } + + } +} diff --git a/src/Cap.Consistency/Builder/BrokerOptions.cs b/src/Cap.Consistency/Builder/BrokerOptions.cs new file mode 100644 index 0000000..0f567c2 --- /dev/null +++ b/src/Cap.Consistency/Builder/BrokerOptions.cs @@ -0,0 +1,8 @@ +namespace Cap.Consistency +{ + public class BrokerOptions + { + public string HostName { get; set; } + + } +} \ No newline at end of file diff --git a/src/Cap.Consistency/Builder/ConsistencyBuilder.cs b/src/Cap.Consistency/Builder/ConsistencyBuilder.cs new file mode 100644 index 0000000..53dc333 --- /dev/null +++ b/src/Cap.Consistency/Builder/ConsistencyBuilder.cs @@ -0,0 +1,104 @@ +using System; +using System.Reflection; +using System.Collections.Concurrent; +using System.Linq; +using Microsoft.Extensions.DependencyInjection; +using System.Collections.Generic; +using Cap.Consistency.Consumer; +using Cap.Consistency.Consumer.Kafka; + +namespace Cap.Consistency +{ + /// + /// Helper functions for configuring consistency services. + /// + public class ConsistencyBuilder + { + /// + /// Creates a new instance of . + /// + /// The to use for the message. + /// The to attach to. + public ConsistencyBuilder(Type message, IServiceCollection service) { + MessageType = message; + Services = service; + + AddConsumerServices(); + } + + /// + /// Gets the services are attached to. + /// + /// + /// The services are attached to. + /// + public IServiceCollection Services { get; private set; } + + /// + /// Gets the used for messages. + /// + /// + /// The used for messages. + /// + public Type MessageType { get; private set; } + + public virtual ConsistencyBuilder AddConsumerServices() { + + var IConsumerListenerServices = new Dictionary(); + foreach (var rejectedServices in Services) { + if (rejectedServices.ImplementationType != null && typeof(IConsumerService).IsAssignableFrom(rejectedServices.ImplementationType)) + IConsumerListenerServices.Add(typeof(IConsumerService), rejectedServices.ImplementationType); + } + + foreach (var service in IConsumerListenerServices) { + Services.AddSingleton(service.Key, service.Value); + } + + var types = Assembly.GetEntryAssembly().ExportedTypes; + foreach (var type in types) { + if (typeof(IConsumerService).IsAssignableFrom(type)) { + Services.AddSingleton(typeof(IConsumerService), type); + } + } + return this; + } + + public virtual ConsistencyBuilder AddKafkaServices() { + + + Services.AddSingleton(); + return this; + } + + + /// + /// Adds a for the . + /// + /// The role type held in the store. + /// The current instance. + public virtual ConsistencyBuilder AddMessageStore() where T : class { + return AddScoped(typeof(IConsistencyMessageStore<>).MakeGenericType(MessageType), typeof(T)); + } + + /// + /// Adds a for the . + /// + /// The type of the message manager to add. + /// The current instance. + public virtual ConsistencyBuilder AddConsistencyMessageManager() where TMessageManager : class { + var messageManagerType = typeof(ConsistencyMessageManager<>).MakeGenericType(MessageType); + var customType = typeof(TMessageManager); + if (messageManagerType == customType || + !messageManagerType.GetTypeInfo().IsAssignableFrom(customType.GetTypeInfo())) { + throw new InvalidOperationException($"Type {customType.Name} must be derive from ConsistencyMessageManager<{MessageType.Name}>"); + } + Services.AddScoped(customType, services => services.GetRequiredService(messageManagerType)); + return AddScoped(messageManagerType, customType); + } + + private ConsistencyBuilder AddScoped(Type serviceType, Type concreteType) { + Services.AddScoped(serviceType, concreteType); + return this; + } + } +} \ No newline at end of file diff --git a/src/Cap.Consistency/Builder/ConsistencyMarkerService.cs b/src/Cap.Consistency/Builder/ConsistencyMarkerService.cs new file mode 100644 index 0000000..3ec6cbf --- /dev/null +++ b/src/Cap.Consistency/Builder/ConsistencyMarkerService.cs @@ -0,0 +1,7 @@ +namespace Cap.Consistency +{ + /// + /// Used to verify Consistency service was called on a ServiceCollection + /// + public class ConsistencyMarkerService { } +} \ No newline at end of file diff --git a/src/Cap.Consistency/Cap.Consistency.csproj b/src/Cap.Consistency/Cap.Consistency.csproj index 3e0af40..f50a78e 100644 --- a/src/Cap.Consistency/Cap.Consistency.csproj +++ b/src/Cap.Consistency/Cap.Consistency.csproj @@ -12,11 +12,13 @@ + + diff --git a/src/Cap.Consistency/Consumer/ConsumerHandler.cs b/src/Cap.Consistency/Consumer/ConsumerHandler.cs new file mode 100644 index 0000000..4a5eb1f --- /dev/null +++ b/src/Cap.Consistency/Consumer/ConsumerHandler.cs @@ -0,0 +1,68 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; +using Cap.Consistency.Abstractions; +using Cap.Consistency.Infrastructure; +using Cap.Consistency.Route; +using Microsoft.Extensions.Logging; + +namespace Cap.Consistency.Consumer +{ + public class ConsumerHandler : IConsumerHandler + { + + private readonly IConsumerInvokerFactory _consumerInvokerFactory; + private readonly IConsumerExcutorSelector _selector; + private readonly ILoggerFactory _loggerFactory; + private readonly ILogger _logger; + + + public ConsumerHandler( + IConsumerInvokerFactory consumerInvokerFactory, + IConsumerExcutorSelector selector, + ILoggerFactory loggerFactory) { + + _consumerInvokerFactory = consumerInvokerFactory; + _loggerFactory = loggerFactory; + _selector = selector; + _logger = loggerFactory.CreateLogger(); + } + + public Task Start(TopicRouteContext context) { + if (context == null) { + throw new ArgumentNullException(nameof(context)); + } + + var matchs = _selector.SelectCandidates(context); + + if (matchs == null || matchs.Count==0) { + _logger.LogInformation("can not be fond topic route"); + return Task.CompletedTask; + } + + var executeDescriptor = _selector.SelectBestCandidate(context, matchs); + + context.Handler = c => { + + var consumerContext = new ConsumerContext(executeDescriptor); + var invoker = _consumerInvokerFactory.CreateInvoker(consumerContext); + + _logger.LogInformation("consumer starting"); + + return invoker.InvokeAsync(); + }; + + return Task.CompletedTask; + } + + + public void Start(IEnumerable consumers) { + throw new NotImplementedException(); + } + + public void Stop() { + throw new NotImplementedException(); + } + } +} diff --git a/src/Cap.Consistency/Consumer/IConsumerHandler.cs b/src/Cap.Consistency/Consumer/IConsumerHandler.cs new file mode 100644 index 0000000..81514b8 --- /dev/null +++ b/src/Cap.Consistency/Consumer/IConsumerHandler.cs @@ -0,0 +1,15 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Cap.Consistency.Consumer +{ + public interface IConsumerHandler + { + void Start(IEnumerable consumers); + + void Stop(); + } + + +} diff --git a/src/Cap.Consistency/Consumer/IConsumerService.cs b/src/Cap.Consistency/Consumer/IConsumerService.cs new file mode 100644 index 0000000..b61edf3 --- /dev/null +++ b/src/Cap.Consistency/Consumer/IConsumerService.cs @@ -0,0 +1,10 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Cap.Consistency.Consumer +{ + public interface IConsumerService + { + } +} diff --git a/src/Cap.Consistency/Consumer/ITaskSchedule.cs b/src/Cap.Consistency/Consumer/ITaskSchedule.cs new file mode 100644 index 0000000..7df3dbf --- /dev/null +++ b/src/Cap.Consistency/Consumer/ITaskSchedule.cs @@ -0,0 +1,15 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Text; +using Cap.Consistency.Abstractions; + +namespace Cap.Consistency.Consumer +{ + public interface ITaskSchedule + { + void Start(IReadOnlyList methods); + + void Stop(); + } +} diff --git a/src/Cap.Consistency/Consumer/Kafka/IKafkaTaskSchedule.cs b/src/Cap.Consistency/Consumer/Kafka/IKafkaTaskSchedule.cs new file mode 100644 index 0000000..9b3077d --- /dev/null +++ b/src/Cap.Consistency/Consumer/Kafka/IKafkaTaskSchedule.cs @@ -0,0 +1,31 @@ +using System; +using System.Collections.Generic; +using System.Text; +using Cap.Consistency.Abstractions; +using Microsoft.Extensions.Logging; + +namespace Cap.Consistency.Consumer.Kafka +{ + + public interface IKafkaTaskSchedule : ITaskSchedule { } + + + public class KafkaTaskSchedule : IKafkaTaskSchedule + { + + private readonly ILogger _logger; + + + public KafkaTaskSchedule(ILoggerFactory loggerFactory) { + _logger = loggerFactory.CreateLogger(); + + } + public void Start(IReadOnlyList methods) { + throw new NotImplementedException(); + } + + public void Stop() { + throw new NotImplementedException(); + } + } +} diff --git a/src/Cap.Consistency/Consumer/Kafka/KafkaConsumerHandler.cs b/src/Cap.Consistency/Consumer/Kafka/KafkaConsumerHandler.cs new file mode 100644 index 0000000..bbf8043 --- /dev/null +++ b/src/Cap.Consistency/Consumer/Kafka/KafkaConsumerHandler.cs @@ -0,0 +1,28 @@ +using System; +using System.Linq; +using System.Collections.Generic; +using System.Reflection; +using System.Text; + +namespace Cap.Consistency.Consumer.Kafka +{ + public class KafkaConsumerHandler : IConsumerHandler + { + public readonly QMessageFinder _finder; + + public KafkaConsumerHandler(QMessageFinder finder) { + _finder = finder; + } + + public void Start(IEnumerable consumers) { + + var methods = _finder.GetQMessageMethodInfo(consumers.Select(x => x.GetType()).ToArray()); + + + } + + public void Stop() { + throw new NotImplementedException(); + } + } +} diff --git a/src/Cap.Consistency/Consumer/Kafka/RdKafkaClient.cs b/src/Cap.Consistency/Consumer/Kafka/RdKafkaClient.cs new file mode 100644 index 0000000..bc1efb6 --- /dev/null +++ b/src/Cap.Consistency/Consumer/Kafka/RdKafkaClient.cs @@ -0,0 +1,45 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using Cap.Consistency.Route; +using Confluent.Kafka; +using Confluent.Kafka.Serialization; + +namespace Cap.Consistency.Consumer.Kafka +{ + public class RdKafkaClient + { + + private Consumer _client; + + public RdKafkaClient() { + + } + + + public void Start(TopicRouteContext routeContext ) { + + string brokerList = null;// args[0]; + var topics = new List();// args.Skip(1).ToList(); + + var config = new Dictionary + { + { "group.id", "simple-csharp-consumer" }, + { "bootstrap.servers", brokerList } + }; + + using (var consumer = new Consumer(config, null, new StringDeserializer(Encoding.UTF8))) { + //consumer.Assign(new List { new TopicInfo(topics.First(), 0, 0) }); + + while (true) { + Message msg; + if (consumer.Consume(out msg)) { + Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}"); + } + } + } + + } + } +} diff --git a/src/Cap.Consistency/Extensions/BuilderExtensions.cs b/src/Cap.Consistency/Extensions/BuilderExtensions.cs new file mode 100644 index 0000000..96822dc --- /dev/null +++ b/src/Cap.Consistency/Extensions/BuilderExtensions.cs @@ -0,0 +1,31 @@ +using System; +using Cap.Consistency; +using Microsoft.Extensions.DependencyInjection; + +// ReSharper disable once CheckNamespace +namespace Microsoft.AspNetCore.Builder +{ + /// + /// Consistence extensions for + /// + public static class BuilderExtensions + { + /// + /// Enables Consistence for the current application + /// + /// The instance this method extends. + /// The instance this method extends. + public static IApplicationBuilder UseConsistency(this IApplicationBuilder app) { + if (app == null) { + throw new ArgumentNullException(nameof(app)); + } + + var marker = app.ApplicationServices.GetService(); + if (marker == null) { + throw new InvalidOperationException("Add Consistency must be called on the service collection."); + } + + return app; + } + } +} \ No newline at end of file diff --git a/src/Cap.Consistency/Extensions/ConsistencyOptions.cs b/src/Cap.Consistency/Extensions/ConsistencyOptions.cs new file mode 100644 index 0000000..344044c --- /dev/null +++ b/src/Cap.Consistency/Extensions/ConsistencyOptions.cs @@ -0,0 +1,17 @@ +using Cap.Consistency; + +namespace Microsoft.AspNetCore.Builder +{ + /// + /// Represents all the options you can use to configure the system. + /// + public class ConsistencyOptions + { + /// + /// Gets or sets the for the consistency system. + /// + public BrokerOptions Broker { get; set; } = new BrokerOptions(); + + + } +} \ No newline at end of file diff --git a/src/Cap.Consistency/Extensions/ServiceCollectionExtensions.cs b/src/Cap.Consistency/Extensions/ServiceCollectionExtensions.cs new file mode 100644 index 0000000..89f22c6 --- /dev/null +++ b/src/Cap.Consistency/Extensions/ServiceCollectionExtensions.cs @@ -0,0 +1,46 @@ +using System; +using Cap.Consistency; +using Microsoft.AspNetCore.Builder; +using Microsoft.Extensions.DependencyInjection.Extensions; + +// ReSharper disable once CheckNamespace +namespace Microsoft.Extensions.DependencyInjection +{ + /// + /// Contains extension methods to for configuring consistence services. + /// + public static class ServiceCollectionExtensions + { + /// + /// Adds and configures the consistence services for the consitence. + /// + /// The services available in the application. + /// An for application services. + public static ConsistencyBuilder AddConsistency(this IServiceCollection services) + where TMessage : class { + return services.AddConsistency(setupAction: null); + } + + /// + /// Adds and configures the consistence services for the consitence. + /// + /// The services available in the application. + /// An action to configure the . + /// An for application services. + public static ConsistencyBuilder AddConsistency(this IServiceCollection services, Action setupAction) + where TMessage : class { + services.TryAddSingleton(); + + services.TryAddScoped, ConsistencyMessageManager>(); + + services.AddSingleton(); + + if (setupAction != null) { + services.Configure(setupAction); + } + + return new ConsistencyBuilder(typeof(TMessage), services); + } + + } +} \ No newline at end of file diff --git a/src/Cap.Consistency/Infrastructure/DeliverMessage.cs b/src/Cap.Consistency/Infrastructure/DeliverMessage.cs new file mode 100644 index 0000000..28ab90e --- /dev/null +++ b/src/Cap.Consistency/Infrastructure/DeliverMessage.cs @@ -0,0 +1,32 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Cap.Consistency.Infrastructure +{ + public class DeliverMessage + { + /// + /// Kafka 对应 Topic name + /// + /// RabbitMQ 对应 RoutingKey + /// + /// + public string MessageKey { get; set; } + + + public byte[] Body { get; set; } + } + + + public class KafkaDeliverMessage : DeliverMessage + { + + public int Partition { get; set; } + + public long Offset { get; set; } + + public string MessageId { get; set; } + + } +} diff --git a/src/Cap.Consistency/Infrastructure/IConsumerExcutorSelector.cs b/src/Cap.Consistency/Infrastructure/IConsumerExcutorSelector.cs new file mode 100644 index 0000000..7099b24 --- /dev/null +++ b/src/Cap.Consistency/Infrastructure/IConsumerExcutorSelector.cs @@ -0,0 +1,15 @@ +using System; +using System.Collections.Generic; +using System.Text; +using Cap.Consistency.Abstractions; +using Cap.Consistency.Route; + +namespace Cap.Consistency.Infrastructure +{ + public interface IConsumerExcutorSelector + { + IReadOnlyList SelectCandidates(TopicRouteContext context); + + ConsumerExecutorDescriptor SelectBestCandidate(TopicRouteContext context, IReadOnlyList executeDescriptor); + } +} diff --git a/src/Cap.Consistency/Infrastructure/IConsumerInvokerFactory.cs b/src/Cap.Consistency/Infrastructure/IConsumerInvokerFactory.cs new file mode 100644 index 0000000..bc1260d --- /dev/null +++ b/src/Cap.Consistency/Infrastructure/IConsumerInvokerFactory.cs @@ -0,0 +1,12 @@ +using System; +using System.Collections.Generic; +using System.Text; +using Cap.Consistency.Abstractions; + +namespace Cap.Consistency.Infrastructure +{ + public interface IConsumerInvokerFactory + { + IConsumerInvoker CreateInvoker(ConsumerContext actionContext); + } +} diff --git a/src/Cap.Consistency/Internal/ConsumerExcutorSelector.cs b/src/Cap.Consistency/Internal/ConsumerExcutorSelector.cs new file mode 100644 index 0000000..884ae2b --- /dev/null +++ b/src/Cap.Consistency/Internal/ConsumerExcutorSelector.cs @@ -0,0 +1,57 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Reflection; +using System.Text; +using Cap.Consistency.Abstractions; +using Cap.Consistency.Attributes; +using Cap.Consistency.Consumer; +using Cap.Consistency.Infrastructure; +using Cap.Consistency.Route; +using Microsoft.Extensions.DependencyInjection; + +namespace Cap.Consistency.Internal +{ + public class ConsumerExcutorSelector : IConsumerExcutorSelector + { + public ConsumerExecutorDescriptor SelectBestCandidate(TopicRouteContext context, + IReadOnlyList executeDescriptor) { + + var key = context.Message.MessageKey; + return executeDescriptor.FirstOrDefault(x => x.Topic.Name == key); + } + + public IReadOnlyList SelectCandidates(TopicRouteContext context) { + + var consumerServices = context.ServiceProvider.GetServices(); + + var executorDescriptorList = new List(); + foreach (var service in consumerServices) { + var typeInfo = service.GetType().GetTypeInfo(); + if (!typeof(IConsumerService).GetTypeInfo().IsAssignableFrom(typeInfo)) { + continue; + } + + foreach (var method in typeInfo.DeclaredMethods) { + + var topicAttr = method.GetCustomAttribute(true); + if (topicAttr == null) continue; + + executorDescriptorList.Add(InitDescriptor(topicAttr)); + } + } + + return executorDescriptorList; + } + + private ConsumerExecutorDescriptor InitDescriptor(TopicAttribute attr) { + var descriptor = new ConsumerExecutorDescriptor(); + + descriptor.Topic = new TopicInfo(attr.Name); + + return descriptor; + } + + + } +} diff --git a/src/Cap.Consistency/Internal/ConsumerInvokerFactory.cs b/src/Cap.Consistency/Internal/ConsumerInvokerFactory.cs new file mode 100644 index 0000000..4fa022f --- /dev/null +++ b/src/Cap.Consistency/Internal/ConsumerInvokerFactory.cs @@ -0,0 +1,16 @@ +using System; +using System.Collections.Generic; +using System.Text; +using Cap.Consistency.Abstractions; +using Cap.Consistency.Infrastructure; + +namespace Cap.Consistency.Internal +{ + public class ConsumerInvokerFactory : IConsumerInvokerFactory + { + public IConsumerInvoker CreateInvoker(ConsumerContext consumerContext) { + var context = new ConsumerInvokerContext(consumerContext); + return context.Result; + } + } +} diff --git a/src/Cap.Consistency/Internal/QMessageFinder.cs b/src/Cap.Consistency/Internal/QMessageFinder.cs new file mode 100644 index 0000000..363bbdf --- /dev/null +++ b/src/Cap.Consistency/Internal/QMessageFinder.cs @@ -0,0 +1,43 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Reflection; +using System.Threading.Tasks; +using System.Collections.Concurrent; +using Cap.Consistency.Extensions; +using Cap.Consistency.Abstractions; + +namespace Cap.Consistency +{ + public class QMessageFinder + { + public ConcurrentDictionary GetQMessageMethodInfo(params Type[] serviceType) { + + var qMessageTypes = new ConcurrentDictionary(); + + foreach (var type in serviceType) { + + foreach (var method in type.GetTypeInfo().DeclaredMethods) { + + var messageMethodInfo = new ConsumerExecutorDescriptor(); + + if (method.IsPropertyBinding()) { + continue; + } + + var qMessageAttr = method.GetCustomAttribute(); + if (qMessageAttr == null) { + continue; + } + + messageMethodInfo.ImplType = method.DeclaringType; + messageMethodInfo.MethodInfo = method; + + qMessageTypes.AddOrUpdate(qMessageAttr.MessageName, messageMethodInfo, (x, y) => y); + } + } + + return qMessageTypes; + } + } +} diff --git a/src/Cap.Consistency/Internal/QMessageMethodInfo.cs b/src/Cap.Consistency/Internal/QMessageMethodInfo.cs new file mode 100644 index 0000000..07ffbbd --- /dev/null +++ b/src/Cap.Consistency/Internal/QMessageMethodInfo.cs @@ -0,0 +1,36 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Reflection; +using System.Threading.Tasks; + +namespace Cap.Consistency +{ + public class TopicInfo + { + public TopicInfo(string topicName) : this(topicName, 0) {} + + public TopicInfo(string topicName, int partition) : this(topicName, partition, 0) {} + + public TopicInfo(string topicName, int partition, long offset) { + Name = topicName; + Offset = offset; + Partition = partition; + } + + public string Name { get; } + + public int Partition { get; } + + public long Offset { get; } + + public bool IsPartition { get { return Partition == 0; } } + + public bool IsOffset { get { return Offset == 0; } } + + public override string ToString() { + return Name; + } + + } +} diff --git a/src/Cap.Consistency/KafkaConsistency.cs b/src/Cap.Consistency/KafkaConsistency.cs new file mode 100644 index 0000000..12ca288 --- /dev/null +++ b/src/Cap.Consistency/KafkaConsistency.cs @@ -0,0 +1,33 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Linq; +using Cap.Consistency.Consumer; +using Microsoft.Extensions.DependencyInjection; + +namespace Cap.Consistency +{ + public class KafkaConsistency + { + private IServiceProvider _serviceProvider; + private IEnumerable _handlers; + + public KafkaConsistency(IServiceProvider serviceProvider) { + _serviceProvider = serviceProvider; + } + + public void Start() { + _handlers = _serviceProvider.GetServices(); + var services = _serviceProvider.GetServices(); + foreach (var handler in _handlers) { + handler.Start(services); + } + } + + public void Stop() { + foreach (var handler in _handlers) { + handler.Stop(); + } + } + } +} diff --git a/src/Cap.Consistency/Models/IConsumerModel.cs b/src/Cap.Consistency/Models/IConsumerModel.cs new file mode 100644 index 0000000..437e782 --- /dev/null +++ b/src/Cap.Consistency/Models/IConsumerModel.cs @@ -0,0 +1,12 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Cap.Consistency.Models +{ + public interface IConsumerModel + { + string TopicName { get; set; } + + } +} diff --git a/src/Cap.Consistency/Route/TopicRouteContext.cs b/src/Cap.Consistency/Route/TopicRouteContext.cs new file mode 100644 index 0000000..984e5a6 --- /dev/null +++ b/src/Cap.Consistency/Route/TopicRouteContext.cs @@ -0,0 +1,31 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; +using Cap.Consistency.Abstractions; +using Cap.Consistency.Consumer; +using Cap.Consistency.Infrastructure; + +namespace Cap.Consistency.Route +{ + public delegate Task HandlerConsumer(ConsumerExecutorDescriptor context); + + public class TopicRouteContext + { + + public TopicRouteContext(DeliverMessage message) { + Message = message; + } + + public DeliverMessage Message { get; } + + // public event EventHandler OnMessage; + + public HandlerConsumer Handler { get; set; } + + public IServiceProvider ServiceProvider { get; set; } + + public IList Consumers { get; set; } + + } +} diff --git a/src/Cap.Consistency/RouteTable.cs b/src/Cap.Consistency/RouteTable.cs index f327cf6..b45953c 100644 --- a/src/Cap.Consistency/RouteTable.cs +++ b/src/Cap.Consistency/RouteTable.cs @@ -1,21 +1,22 @@ using System; using System.Collections; using System.Collections.Generic; +using Cap.Consistency.Abstractions; namespace Cap.Consistency { - public class RouteTable : IReadOnlyList + public class RouteTable : IReadOnlyList { public RouteTable() { } - public RouteTable(List messageMethods) { + public RouteTable(List messageMethods) { QMessageMethods = messageMethods; } - public QMessageMethodInfo this[int index] { + public ConsumerExecutorDescriptor this[int index] { get { throw new NotImplementedException(); } @@ -27,9 +28,9 @@ namespace Cap.Consistency } } - public List QMessageMethods { get; set; } + public List QMessageMethods { get; set; } - public IEnumerator GetEnumerator() { + public IEnumerator GetEnumerator() { throw new NotImplementedException(); } diff --git a/src/Cap.Consistency/Store/ConsistencyMessageManager.cs b/src/Cap.Consistency/Store/ConsistencyMessageManager.cs new file mode 100644 index 0000000..7bcc947 --- /dev/null +++ b/src/Cap.Consistency/Store/ConsistencyMessageManager.cs @@ -0,0 +1,147 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.AspNetCore.Http; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; + +namespace Cap.Consistency +{ + /// + /// Provides the APIs for managing message in a persistence store. + /// + /// The type encapsulating a message. + public class ConsistencyMessageManager : IDisposable where TMessage : class + { + private bool _disposed; + private readonly HttpContext _context; + private CancellationToken CancellationToken => _context?.RequestAborted ?? CancellationToken.None; + + /// + /// Constructs a new instance of . + /// + /// The persistence store the manager will operate over. + /// The used to resolve services. + /// The logger used to log messages, warnings and errors. + public ConsistencyMessageManager(IConsistencyMessageStore store, + IServiceProvider services, + ILogger> logger) { + if (store == null) { + throw new ArgumentNullException(nameof(store)); + } + + Store = store; + Logger = logger; + + if (services != null) { + _context = services.GetService()?.HttpContext; + } + } + + /// + /// Gets or sets the persistence store the manager operates over. + /// + /// The persistence store the manager operates over. + protected internal IConsistencyMessageStore Store { get; set; } + + /// + /// Gets the used to log messages from the manager. + /// + /// + /// The used to log messages from the manager. + /// + protected internal virtual ILogger Logger { get; set; } + + /// + /// Creates the specified in the backing store. + /// + /// The message to create. + /// + /// The that represents the asynchronous operation, containing the + /// of the operation. + /// + public virtual Task CreateAsync(TMessage message) { + ThrowIfDisposed(); + //todo: validation message fileds is correct + + return Store.CreateAsync(message, CancellationToken); + } + + /// + /// Updates the specified in the backing store. + /// + /// The message to update. + /// + /// The that represents the asynchronous operation, containing the + /// of the operation. + /// + public virtual Task UpdateAsync(TMessage message) { + ThrowIfDisposed(); + //todo: validation message fileds is correct + + return Store.UpdateAsync(message, CancellationToken); + } + + /// + /// Deletes the specified in the backing store. + /// + /// The message to delete. + /// + /// The that represents the asynchronous operation, containing the + /// of the operation. + /// + public virtual Task DeleteAsync(TMessage message) { + ThrowIfDisposed(); + + if (message == null) { + throw new ArgumentNullException(nameof(message)); + } + + return Store.DeleteAsync(message, CancellationToken); + } + + /// + /// Finds and returns a message, if any, who has the specified . + /// + /// The message ID to search for. + /// + /// The that represents the asynchronous operation, containing the user matching the specified if it exists. + /// + public virtual Task FindByIdAsync(string messageId) { + ThrowIfDisposed(); + return Store.FindByIdAsync(messageId, CancellationToken); + } + + /// + /// Gets the message identifier for the specified . + /// + /// The message whose identifier should be retrieved. + /// The that represents the asynchronous operation, containing the identifier for the specified . + public virtual async Task GetMessageIdAsync(TMessage message) { + ThrowIfDisposed(); + return await Store.GetMessageIdAsync(message, CancellationToken); + } + + public void Dispose() { + Dispose(true); + GC.SuppressFinalize(this); + } + + /// + /// Releases the unmanaged resources used by the message manager and optionally releases the managed resources. + /// + /// true to release both managed and unmanaged resources; false to release only unmanaged resources. + protected virtual void Dispose(bool disposing) { + if (disposing && !_disposed) { + Store.Dispose(); + _disposed = true; + } + } + + protected void ThrowIfDisposed() { + if (_disposed) { + throw new ObjectDisposedException(GetType().Name); + } + } + } +} \ No newline at end of file diff --git a/src/Cap.Consistency/Store/IConsistencyMessageStore.cs b/src/Cap.Consistency/Store/IConsistencyMessageStore.cs new file mode 100644 index 0000000..80f54a3 --- /dev/null +++ b/src/Cap.Consistency/Store/IConsistencyMessageStore.cs @@ -0,0 +1,55 @@ +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace Cap.Consistency +{ + /// + /// Provides an abstraction for a store which manages consistent message. + /// + /// + public interface IConsistencyMessageStore : IDisposable where TMessage : class + { + /// + /// Finds and returns a message, if any, who has the specified . + /// + /// The message ID to search for. + /// The used to propagate notifications that the operation should be canceled. + /// + /// The that represents the asynchronous operation, containing the message matching the specified if it exists. + /// + Task FindByIdAsync(string messageId, CancellationToken cancellationToken); + + /// + /// Creates a new message in a store as an asynchronous operation. + /// + /// The message to create in the store. + /// The used to propagate notifications that the operation should be canceled. + /// A that represents the of the asynchronous query. + Task CreateAsync(TMessage message, CancellationToken cancellationToken); + + /// + /// Updates a message in a store as an asynchronous operation. + /// + /// The message to update in the store. + /// The used to propagate notifications that the operation should be canceled. + /// A that represents the of the asynchronous query. + Task UpdateAsync(TMessage message, CancellationToken cancellationToken); + + /// + /// Deletes a message from the store as an asynchronous operation. + /// + /// The message to delete in the store. + /// The used to propagate notifications that the operation should be canceled. + /// A that represents the of the asynchronous query. + Task DeleteAsync(TMessage message, CancellationToken cancellationToken); + + /// + /// Gets the ID for a message from the store as an asynchronous operation. + /// + /// The message whose ID should be returned. + /// The used to propagate notifications that the operation should be canceled. + /// A that contains the ID of the message. + Task GetMessageIdAsync(TMessage message, CancellationToken cancellationToken); + } +} \ No newline at end of file diff --git a/test/Cap.Consistency.EntityFrameworkCore.Test/Cap.Consistency.EntityFrameworkCore.Test.csproj b/test/Cap.Consistency.EntityFrameworkCore.Test/Cap.Consistency.EntityFrameworkCore.Test.csproj index ac2c1df..70bc1b8 100644 --- a/test/Cap.Consistency.EntityFrameworkCore.Test/Cap.Consistency.EntityFrameworkCore.Test.csproj +++ b/test/Cap.Consistency.EntityFrameworkCore.Test/Cap.Consistency.EntityFrameworkCore.Test.csproj @@ -27,7 +27,7 @@ - + diff --git a/test/Cap.Consistency.Test/Cap.Consistency.Test.csproj b/test/Cap.Consistency.Test/Cap.Consistency.Test.csproj index 7f2e537..148bb6a 100644 --- a/test/Cap.Consistency.Test/Cap.Consistency.Test.csproj +++ b/test/Cap.Consistency.Test/Cap.Consistency.Test.csproj @@ -23,7 +23,7 @@ - +