diff --git a/src/Cap.Consistency.Server/ConsistencyServer.cs b/src/Cap.Consistency.Server/ConsistencyServer.cs
index 29c858d..b3d7f13 100644
--- a/src/Cap.Consistency.Server/ConsistencyServer.cs
+++ b/src/Cap.Consistency.Server/ConsistencyServer.cs
@@ -5,7 +5,6 @@ using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
-using RdKafka;
using System.Text;
namespace Cap.Consistency.Server
@@ -40,25 +39,25 @@ namespace Cap.Consistency.Server
public void Run() {
//配置消费者组
- var config = new Config() { GroupId = "example-csharp-consumer" };
- using (var consumer = new EventConsumer(config, "127.0.0.1:9092")) {
+ //var config = new Config() { GroupId = "example-csharp-consumer" };
+ //using (var consumer = new EventConsumer(config, "127.0.0.1:9092")) {
- //注册一个事件
- consumer.OnMessage += (obj, msg) =>
- {
- string text = Encoding.UTF8.GetString(msg.Payload, 0, msg.Payload.Length);
- Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {text}");
- };
+ // //注册一个事件
+ // consumer.OnMessage += (obj, msg) =>
+ // {
+ // string text = Encoding.UTF8.GetString(msg.Payload, 0, msg.Payload.Length);
+ // Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {text}");
+ // };
- //订阅一个或者多个Topic
- consumer.Subscribe(new[] { "testtopic" });
+ // //订阅一个或者多个Topic
+ // consumer.Subscribe(new[] { "testtopic" });
- //启动
- consumer.Start();
+ // //启动
+ // consumer.Start();
- _logger.LogInformation("Started consumer...");
+ // _logger.LogInformation("Started consumer...");
- }
+ //}
}
}
}
\ No newline at end of file
diff --git a/src/Cap.Consistency/Abstractions/ConsumerContext.cs b/src/Cap.Consistency/Abstractions/ConsumerContext.cs
new file mode 100644
index 0000000..6b4aab6
--- /dev/null
+++ b/src/Cap.Consistency/Abstractions/ConsumerContext.cs
@@ -0,0 +1,18 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Cap.Consistency.Abstractions
+{
+
+ public class ConsumerContext
+ {
+ public ConsumerContext(ConsumerExecutorDescriptor descriptor) {
+ ConsumerDescriptor = descriptor ?? throw new ArgumentNullException(nameof(descriptor));
+ }
+
+ public ConsumerExecutorDescriptor ConsumerDescriptor { get; set; }
+
+ }
+}
diff --git a/src/Cap.Consistency/Abstractions/ConsumerExecutorDescriptor.cs b/src/Cap.Consistency/Abstractions/ConsumerExecutorDescriptor.cs
new file mode 100644
index 0000000..a01ec63
--- /dev/null
+++ b/src/Cap.Consistency/Abstractions/ConsumerExecutorDescriptor.cs
@@ -0,0 +1,18 @@
+using System;
+using System.Collections.Generic;
+using System.Reflection;
+using System.Text;
+
+namespace Cap.Consistency.Abstractions
+{
+ public class ConsumerExecutorDescriptor
+ {
+ public MethodInfo MethodInfo { get; set; }
+
+ public Type ImplType { get; set; }
+
+ public TopicInfo Topic { get; set; }
+
+ public int GroupId { get; set; }
+ }
+}
diff --git a/src/Cap.Consistency/Abstractions/ConsumerInvokerContext.cs b/src/Cap.Consistency/Abstractions/ConsumerInvokerContext.cs
new file mode 100644
index 0000000..e2fdedc
--- /dev/null
+++ b/src/Cap.Consistency/Abstractions/ConsumerInvokerContext.cs
@@ -0,0 +1,18 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace Cap.Consistency.Abstractions
+{
+ public class ConsumerInvokerContext
+ {
+ public ConsumerInvokerContext(ConsumerContext consumerContext) {
+ ConsumerContext = consumerContext ?? throw new ArgumentNullException(nameof(consumerContext));
+ }
+
+ public ConsumerContext ConsumerContext { get; set; }
+
+ public IConsumerInvoker Result { get; set; }
+
+ }
+}
diff --git a/src/Cap.Consistency/Abstractions/IConsumerInvoker.cs b/src/Cap.Consistency/Abstractions/IConsumerInvoker.cs
new file mode 100644
index 0000000..1c000c5
--- /dev/null
+++ b/src/Cap.Consistency/Abstractions/IConsumerInvoker.cs
@@ -0,0 +1,12 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Cap.Consistency.Abstractions
+{
+ public interface IConsumerInvoker
+ {
+ Task InvokeAsync();
+ }
+}
diff --git a/src/Cap.Consistency/Attributes/KafkaTopicAttribute.cs b/src/Cap.Consistency/Attributes/KafkaTopicAttribute.cs
new file mode 100644
index 0000000..9c939b3
--- /dev/null
+++ b/src/Cap.Consistency/Attributes/KafkaTopicAttribute.cs
@@ -0,0 +1,33 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace Cap.Consistency.Attributes
+{
+ public class KafkaTopicAttribute : TopicAttribute
+ {
+ public KafkaTopicAttribute(string topicName)
+ : this(topicName, 0) { }
+
+ public KafkaTopicAttribute(string topicName, int partition)
+ : this(topicName, partition, 0) { }
+
+ public KafkaTopicAttribute(string topicName, int partition, long offset)
+ : base(topicName) {
+ Offset = offset;
+ Partition = partition;
+ }
+
+ public int Partition { get; }
+
+ public long Offset { get; }
+
+ public bool IsPartition { get { return Partition == 0; } }
+
+ public bool IsOffset { get { return Offset == 0; } }
+
+ public override string ToString() {
+ return Name;
+ }
+ }
+}
diff --git a/src/Cap.Consistency/Attributes/QMessageAttribute.cs b/src/Cap.Consistency/Attributes/QMessageAttribute.cs
new file mode 100644
index 0000000..f9e63a8
--- /dev/null
+++ b/src/Cap.Consistency/Attributes/QMessageAttribute.cs
@@ -0,0 +1,14 @@
+using System;
+
+namespace Cap.Consistency
+{
+ [AttributeUsage(AttributeTargets.Method, Inherited = true, AllowMultiple = true)]
+ sealed class QMessageAttribute : Attribute
+ {
+ public QMessageAttribute(string messageName) {
+ MessageName = messageName;
+ }
+
+ public string MessageName { get; private set; }
+ }
+}
diff --git a/src/Cap.Consistency/Attributes/TopicAttribute.cs b/src/Cap.Consistency/Attributes/TopicAttribute.cs
new file mode 100644
index 0000000..152a4fe
--- /dev/null
+++ b/src/Cap.Consistency/Attributes/TopicAttribute.cs
@@ -0,0 +1,25 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace Cap.Consistency.Attributes
+{
+
+ [AttributeUsage(AttributeTargets.Method | AttributeTargets.Class, Inherited = true, AllowMultiple = true)]
+ public abstract class TopicAttribute : Attribute
+ {
+ readonly string _name;
+
+ public TopicAttribute(string topicName) {
+ this._name = topicName;
+ }
+
+ public string Name {
+ get { return _name; }
+ }
+
+
+ public bool IsOneWay { get; set; }
+
+ }
+}
diff --git a/src/Cap.Consistency/Builder/BrokerOptions.cs b/src/Cap.Consistency/Builder/BrokerOptions.cs
new file mode 100644
index 0000000..0f567c2
--- /dev/null
+++ b/src/Cap.Consistency/Builder/BrokerOptions.cs
@@ -0,0 +1,8 @@
+namespace Cap.Consistency
+{
+ public class BrokerOptions
+ {
+ public string HostName { get; set; }
+
+ }
+}
\ No newline at end of file
diff --git a/src/Cap.Consistency/Builder/ConsistencyBuilder.cs b/src/Cap.Consistency/Builder/ConsistencyBuilder.cs
new file mode 100644
index 0000000..53dc333
--- /dev/null
+++ b/src/Cap.Consistency/Builder/ConsistencyBuilder.cs
@@ -0,0 +1,104 @@
+using System;
+using System.Reflection;
+using System.Collections.Concurrent;
+using System.Linq;
+using Microsoft.Extensions.DependencyInjection;
+using System.Collections.Generic;
+using Cap.Consistency.Consumer;
+using Cap.Consistency.Consumer.Kafka;
+
+namespace Cap.Consistency
+{
+ ///
+ /// Helper functions for configuring consistency services.
+ ///
+ public class ConsistencyBuilder
+ {
+ ///
+ /// Creates a new instance of .
+ ///
+ /// The to use for the message.
+ /// The to attach to.
+ public ConsistencyBuilder(Type message, IServiceCollection service) {
+ MessageType = message;
+ Services = service;
+
+ AddConsumerServices();
+ }
+
+ ///
+ /// Gets the services are attached to.
+ ///
+ ///
+ /// The services are attached to.
+ ///
+ public IServiceCollection Services { get; private set; }
+
+ ///
+ /// Gets the used for messages.
+ ///
+ ///
+ /// The used for messages.
+ ///
+ public Type MessageType { get; private set; }
+
+ public virtual ConsistencyBuilder AddConsumerServices() {
+
+ var IConsumerListenerServices = new Dictionary();
+ foreach (var rejectedServices in Services) {
+ if (rejectedServices.ImplementationType != null && typeof(IConsumerService).IsAssignableFrom(rejectedServices.ImplementationType))
+ IConsumerListenerServices.Add(typeof(IConsumerService), rejectedServices.ImplementationType);
+ }
+
+ foreach (var service in IConsumerListenerServices) {
+ Services.AddSingleton(service.Key, service.Value);
+ }
+
+ var types = Assembly.GetEntryAssembly().ExportedTypes;
+ foreach (var type in types) {
+ if (typeof(IConsumerService).IsAssignableFrom(type)) {
+ Services.AddSingleton(typeof(IConsumerService), type);
+ }
+ }
+ return this;
+ }
+
+ public virtual ConsistencyBuilder AddKafkaServices() {
+
+
+ Services.AddSingleton();
+ return this;
+ }
+
+
+ ///
+ /// Adds a for the .
+ ///
+ /// The role type held in the store.
+ /// The current instance.
+ public virtual ConsistencyBuilder AddMessageStore() where T : class {
+ return AddScoped(typeof(IConsistencyMessageStore<>).MakeGenericType(MessageType), typeof(T));
+ }
+
+ ///
+ /// Adds a for the .
+ ///
+ /// The type of the message manager to add.
+ /// The current instance.
+ public virtual ConsistencyBuilder AddConsistencyMessageManager() where TMessageManager : class {
+ var messageManagerType = typeof(ConsistencyMessageManager<>).MakeGenericType(MessageType);
+ var customType = typeof(TMessageManager);
+ if (messageManagerType == customType ||
+ !messageManagerType.GetTypeInfo().IsAssignableFrom(customType.GetTypeInfo())) {
+ throw new InvalidOperationException($"Type {customType.Name} must be derive from ConsistencyMessageManager<{MessageType.Name}>");
+ }
+ Services.AddScoped(customType, services => services.GetRequiredService(messageManagerType));
+ return AddScoped(messageManagerType, customType);
+ }
+
+ private ConsistencyBuilder AddScoped(Type serviceType, Type concreteType) {
+ Services.AddScoped(serviceType, concreteType);
+ return this;
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/Cap.Consistency/Builder/ConsistencyMarkerService.cs b/src/Cap.Consistency/Builder/ConsistencyMarkerService.cs
new file mode 100644
index 0000000..3ec6cbf
--- /dev/null
+++ b/src/Cap.Consistency/Builder/ConsistencyMarkerService.cs
@@ -0,0 +1,7 @@
+namespace Cap.Consistency
+{
+ ///
+ /// Used to verify Consistency service was called on a ServiceCollection
+ ///
+ public class ConsistencyMarkerService { }
+}
\ No newline at end of file
diff --git a/src/Cap.Consistency/Cap.Consistency.csproj b/src/Cap.Consistency/Cap.Consistency.csproj
index 3e0af40..f50a78e 100644
--- a/src/Cap.Consistency/Cap.Consistency.csproj
+++ b/src/Cap.Consistency/Cap.Consistency.csproj
@@ -12,11 +12,13 @@
+
+
diff --git a/src/Cap.Consistency/Consumer/ConsumerHandler.cs b/src/Cap.Consistency/Consumer/ConsumerHandler.cs
new file mode 100644
index 0000000..4a5eb1f
--- /dev/null
+++ b/src/Cap.Consistency/Consumer/ConsumerHandler.cs
@@ -0,0 +1,68 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading.Tasks;
+using Cap.Consistency.Abstractions;
+using Cap.Consistency.Infrastructure;
+using Cap.Consistency.Route;
+using Microsoft.Extensions.Logging;
+
+namespace Cap.Consistency.Consumer
+{
+ public class ConsumerHandler : IConsumerHandler
+ {
+
+ private readonly IConsumerInvokerFactory _consumerInvokerFactory;
+ private readonly IConsumerExcutorSelector _selector;
+ private readonly ILoggerFactory _loggerFactory;
+ private readonly ILogger _logger;
+
+
+ public ConsumerHandler(
+ IConsumerInvokerFactory consumerInvokerFactory,
+ IConsumerExcutorSelector selector,
+ ILoggerFactory loggerFactory) {
+
+ _consumerInvokerFactory = consumerInvokerFactory;
+ _loggerFactory = loggerFactory;
+ _selector = selector;
+ _logger = loggerFactory.CreateLogger();
+ }
+
+ public Task Start(TopicRouteContext context) {
+ if (context == null) {
+ throw new ArgumentNullException(nameof(context));
+ }
+
+ var matchs = _selector.SelectCandidates(context);
+
+ if (matchs == null || matchs.Count==0) {
+ _logger.LogInformation("can not be fond topic route");
+ return Task.CompletedTask;
+ }
+
+ var executeDescriptor = _selector.SelectBestCandidate(context, matchs);
+
+ context.Handler = c => {
+
+ var consumerContext = new ConsumerContext(executeDescriptor);
+ var invoker = _consumerInvokerFactory.CreateInvoker(consumerContext);
+
+ _logger.LogInformation("consumer starting");
+
+ return invoker.InvokeAsync();
+ };
+
+ return Task.CompletedTask;
+ }
+
+
+ public void Start(IEnumerable consumers) {
+ throw new NotImplementedException();
+ }
+
+ public void Stop() {
+ throw new NotImplementedException();
+ }
+ }
+}
diff --git a/src/Cap.Consistency/Consumer/IConsumerHandler.cs b/src/Cap.Consistency/Consumer/IConsumerHandler.cs
new file mode 100644
index 0000000..81514b8
--- /dev/null
+++ b/src/Cap.Consistency/Consumer/IConsumerHandler.cs
@@ -0,0 +1,15 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace Cap.Consistency.Consumer
+{
+ public interface IConsumerHandler
+ {
+ void Start(IEnumerable consumers);
+
+ void Stop();
+ }
+
+
+}
diff --git a/src/Cap.Consistency/Consumer/IConsumerService.cs b/src/Cap.Consistency/Consumer/IConsumerService.cs
new file mode 100644
index 0000000..b61edf3
--- /dev/null
+++ b/src/Cap.Consistency/Consumer/IConsumerService.cs
@@ -0,0 +1,10 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace Cap.Consistency.Consumer
+{
+ public interface IConsumerService
+ {
+ }
+}
diff --git a/src/Cap.Consistency/Consumer/ITaskSchedule.cs b/src/Cap.Consistency/Consumer/ITaskSchedule.cs
new file mode 100644
index 0000000..7df3dbf
--- /dev/null
+++ b/src/Cap.Consistency/Consumer/ITaskSchedule.cs
@@ -0,0 +1,15 @@
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Text;
+using Cap.Consistency.Abstractions;
+
+namespace Cap.Consistency.Consumer
+{
+ public interface ITaskSchedule
+ {
+ void Start(IReadOnlyList methods);
+
+ void Stop();
+ }
+}
diff --git a/src/Cap.Consistency/Consumer/Kafka/IKafkaTaskSchedule.cs b/src/Cap.Consistency/Consumer/Kafka/IKafkaTaskSchedule.cs
new file mode 100644
index 0000000..9b3077d
--- /dev/null
+++ b/src/Cap.Consistency/Consumer/Kafka/IKafkaTaskSchedule.cs
@@ -0,0 +1,31 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+using Cap.Consistency.Abstractions;
+using Microsoft.Extensions.Logging;
+
+namespace Cap.Consistency.Consumer.Kafka
+{
+
+ public interface IKafkaTaskSchedule : ITaskSchedule { }
+
+
+ public class KafkaTaskSchedule : IKafkaTaskSchedule
+ {
+
+ private readonly ILogger _logger;
+
+
+ public KafkaTaskSchedule(ILoggerFactory loggerFactory) {
+ _logger = loggerFactory.CreateLogger();
+
+ }
+ public void Start(IReadOnlyList methods) {
+ throw new NotImplementedException();
+ }
+
+ public void Stop() {
+ throw new NotImplementedException();
+ }
+ }
+}
diff --git a/src/Cap.Consistency/Consumer/Kafka/KafkaConsumerHandler.cs b/src/Cap.Consistency/Consumer/Kafka/KafkaConsumerHandler.cs
new file mode 100644
index 0000000..bbf8043
--- /dev/null
+++ b/src/Cap.Consistency/Consumer/Kafka/KafkaConsumerHandler.cs
@@ -0,0 +1,28 @@
+using System;
+using System.Linq;
+using System.Collections.Generic;
+using System.Reflection;
+using System.Text;
+
+namespace Cap.Consistency.Consumer.Kafka
+{
+ public class KafkaConsumerHandler : IConsumerHandler
+ {
+ public readonly QMessageFinder _finder;
+
+ public KafkaConsumerHandler(QMessageFinder finder) {
+ _finder = finder;
+ }
+
+ public void Start(IEnumerable consumers) {
+
+ var methods = _finder.GetQMessageMethodInfo(consumers.Select(x => x.GetType()).ToArray());
+
+
+ }
+
+ public void Stop() {
+ throw new NotImplementedException();
+ }
+ }
+}
diff --git a/src/Cap.Consistency/Consumer/Kafka/RdKafkaClient.cs b/src/Cap.Consistency/Consumer/Kafka/RdKafkaClient.cs
new file mode 100644
index 0000000..bc1efb6
--- /dev/null
+++ b/src/Cap.Consistency/Consumer/Kafka/RdKafkaClient.cs
@@ -0,0 +1,45 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using Cap.Consistency.Route;
+using Confluent.Kafka;
+using Confluent.Kafka.Serialization;
+
+namespace Cap.Consistency.Consumer.Kafka
+{
+ public class RdKafkaClient
+ {
+
+ private Consumer _client;
+
+ public RdKafkaClient() {
+
+ }
+
+
+ public void Start(TopicRouteContext routeContext ) {
+
+ string brokerList = null;// args[0];
+ var topics = new List();// args.Skip(1).ToList();
+
+ var config = new Dictionary
+ {
+ { "group.id", "simple-csharp-consumer" },
+ { "bootstrap.servers", brokerList }
+ };
+
+ using (var consumer = new Consumer(config, null, new StringDeserializer(Encoding.UTF8))) {
+ //consumer.Assign(new List { new TopicInfo(topics.First(), 0, 0) });
+
+ while (true) {
+ Message msg;
+ if (consumer.Consume(out msg)) {
+ Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}");
+ }
+ }
+ }
+
+ }
+ }
+}
diff --git a/src/Cap.Consistency/Extensions/BuilderExtensions.cs b/src/Cap.Consistency/Extensions/BuilderExtensions.cs
new file mode 100644
index 0000000..96822dc
--- /dev/null
+++ b/src/Cap.Consistency/Extensions/BuilderExtensions.cs
@@ -0,0 +1,31 @@
+using System;
+using Cap.Consistency;
+using Microsoft.Extensions.DependencyInjection;
+
+// ReSharper disable once CheckNamespace
+namespace Microsoft.AspNetCore.Builder
+{
+ ///
+ /// Consistence extensions for
+ ///
+ public static class BuilderExtensions
+ {
+ ///
+ /// Enables Consistence for the current application
+ ///
+ /// The instance this method extends.
+ /// The instance this method extends.
+ public static IApplicationBuilder UseConsistency(this IApplicationBuilder app) {
+ if (app == null) {
+ throw new ArgumentNullException(nameof(app));
+ }
+
+ var marker = app.ApplicationServices.GetService();
+ if (marker == null) {
+ throw new InvalidOperationException("Add Consistency must be called on the service collection.");
+ }
+
+ return app;
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/Cap.Consistency/Extensions/ConsistencyOptions.cs b/src/Cap.Consistency/Extensions/ConsistencyOptions.cs
new file mode 100644
index 0000000..344044c
--- /dev/null
+++ b/src/Cap.Consistency/Extensions/ConsistencyOptions.cs
@@ -0,0 +1,17 @@
+using Cap.Consistency;
+
+namespace Microsoft.AspNetCore.Builder
+{
+ ///
+ /// Represents all the options you can use to configure the system.
+ ///
+ public class ConsistencyOptions
+ {
+ ///
+ /// Gets or sets the for the consistency system.
+ ///
+ public BrokerOptions Broker { get; set; } = new BrokerOptions();
+
+
+ }
+}
\ No newline at end of file
diff --git a/src/Cap.Consistency/Extensions/ServiceCollectionExtensions.cs b/src/Cap.Consistency/Extensions/ServiceCollectionExtensions.cs
new file mode 100644
index 0000000..89f22c6
--- /dev/null
+++ b/src/Cap.Consistency/Extensions/ServiceCollectionExtensions.cs
@@ -0,0 +1,46 @@
+using System;
+using Cap.Consistency;
+using Microsoft.AspNetCore.Builder;
+using Microsoft.Extensions.DependencyInjection.Extensions;
+
+// ReSharper disable once CheckNamespace
+namespace Microsoft.Extensions.DependencyInjection
+{
+ ///
+ /// Contains extension methods to for configuring consistence services.
+ ///
+ public static class ServiceCollectionExtensions
+ {
+ ///
+ /// Adds and configures the consistence services for the consitence.
+ ///
+ /// The services available in the application.
+ /// An for application services.
+ public static ConsistencyBuilder AddConsistency(this IServiceCollection services)
+ where TMessage : class {
+ return services.AddConsistency(setupAction: null);
+ }
+
+ ///
+ /// Adds and configures the consistence services for the consitence.
+ ///
+ /// The services available in the application.
+ /// An action to configure the .
+ /// An for application services.
+ public static ConsistencyBuilder AddConsistency(this IServiceCollection services, Action setupAction)
+ where TMessage : class {
+ services.TryAddSingleton();
+
+ services.TryAddScoped, ConsistencyMessageManager>();
+
+ services.AddSingleton();
+
+ if (setupAction != null) {
+ services.Configure(setupAction);
+ }
+
+ return new ConsistencyBuilder(typeof(TMessage), services);
+ }
+
+ }
+}
\ No newline at end of file
diff --git a/src/Cap.Consistency/Infrastructure/DeliverMessage.cs b/src/Cap.Consistency/Infrastructure/DeliverMessage.cs
new file mode 100644
index 0000000..28ab90e
--- /dev/null
+++ b/src/Cap.Consistency/Infrastructure/DeliverMessage.cs
@@ -0,0 +1,32 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace Cap.Consistency.Infrastructure
+{
+ public class DeliverMessage
+ {
+ ///
+ /// Kafka 对应 Topic name
+ ///
+ /// RabbitMQ 对应 RoutingKey
+ ///
+ ///
+ public string MessageKey { get; set; }
+
+
+ public byte[] Body { get; set; }
+ }
+
+
+ public class KafkaDeliverMessage : DeliverMessage
+ {
+
+ public int Partition { get; set; }
+
+ public long Offset { get; set; }
+
+ public string MessageId { get; set; }
+
+ }
+}
diff --git a/src/Cap.Consistency/Infrastructure/IConsumerExcutorSelector.cs b/src/Cap.Consistency/Infrastructure/IConsumerExcutorSelector.cs
new file mode 100644
index 0000000..7099b24
--- /dev/null
+++ b/src/Cap.Consistency/Infrastructure/IConsumerExcutorSelector.cs
@@ -0,0 +1,15 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+using Cap.Consistency.Abstractions;
+using Cap.Consistency.Route;
+
+namespace Cap.Consistency.Infrastructure
+{
+ public interface IConsumerExcutorSelector
+ {
+ IReadOnlyList SelectCandidates(TopicRouteContext context);
+
+ ConsumerExecutorDescriptor SelectBestCandidate(TopicRouteContext context, IReadOnlyList executeDescriptor);
+ }
+}
diff --git a/src/Cap.Consistency/Infrastructure/IConsumerInvokerFactory.cs b/src/Cap.Consistency/Infrastructure/IConsumerInvokerFactory.cs
new file mode 100644
index 0000000..bc1260d
--- /dev/null
+++ b/src/Cap.Consistency/Infrastructure/IConsumerInvokerFactory.cs
@@ -0,0 +1,12 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+using Cap.Consistency.Abstractions;
+
+namespace Cap.Consistency.Infrastructure
+{
+ public interface IConsumerInvokerFactory
+ {
+ IConsumerInvoker CreateInvoker(ConsumerContext actionContext);
+ }
+}
diff --git a/src/Cap.Consistency/Internal/ConsumerExcutorSelector.cs b/src/Cap.Consistency/Internal/ConsumerExcutorSelector.cs
new file mode 100644
index 0000000..884ae2b
--- /dev/null
+++ b/src/Cap.Consistency/Internal/ConsumerExcutorSelector.cs
@@ -0,0 +1,57 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Reflection;
+using System.Text;
+using Cap.Consistency.Abstractions;
+using Cap.Consistency.Attributes;
+using Cap.Consistency.Consumer;
+using Cap.Consistency.Infrastructure;
+using Cap.Consistency.Route;
+using Microsoft.Extensions.DependencyInjection;
+
+namespace Cap.Consistency.Internal
+{
+ public class ConsumerExcutorSelector : IConsumerExcutorSelector
+ {
+ public ConsumerExecutorDescriptor SelectBestCandidate(TopicRouteContext context,
+ IReadOnlyList executeDescriptor) {
+
+ var key = context.Message.MessageKey;
+ return executeDescriptor.FirstOrDefault(x => x.Topic.Name == key);
+ }
+
+ public IReadOnlyList SelectCandidates(TopicRouteContext context) {
+
+ var consumerServices = context.ServiceProvider.GetServices();
+
+ var executorDescriptorList = new List();
+ foreach (var service in consumerServices) {
+ var typeInfo = service.GetType().GetTypeInfo();
+ if (!typeof(IConsumerService).GetTypeInfo().IsAssignableFrom(typeInfo)) {
+ continue;
+ }
+
+ foreach (var method in typeInfo.DeclaredMethods) {
+
+ var topicAttr = method.GetCustomAttribute(true);
+ if (topicAttr == null) continue;
+
+ executorDescriptorList.Add(InitDescriptor(topicAttr));
+ }
+ }
+
+ return executorDescriptorList;
+ }
+
+ private ConsumerExecutorDescriptor InitDescriptor(TopicAttribute attr) {
+ var descriptor = new ConsumerExecutorDescriptor();
+
+ descriptor.Topic = new TopicInfo(attr.Name);
+
+ return descriptor;
+ }
+
+
+ }
+}
diff --git a/src/Cap.Consistency/Internal/ConsumerInvokerFactory.cs b/src/Cap.Consistency/Internal/ConsumerInvokerFactory.cs
new file mode 100644
index 0000000..4fa022f
--- /dev/null
+++ b/src/Cap.Consistency/Internal/ConsumerInvokerFactory.cs
@@ -0,0 +1,16 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+using Cap.Consistency.Abstractions;
+using Cap.Consistency.Infrastructure;
+
+namespace Cap.Consistency.Internal
+{
+ public class ConsumerInvokerFactory : IConsumerInvokerFactory
+ {
+ public IConsumerInvoker CreateInvoker(ConsumerContext consumerContext) {
+ var context = new ConsumerInvokerContext(consumerContext);
+ return context.Result;
+ }
+ }
+}
diff --git a/src/Cap.Consistency/Internal/QMessageFinder.cs b/src/Cap.Consistency/Internal/QMessageFinder.cs
new file mode 100644
index 0000000..363bbdf
--- /dev/null
+++ b/src/Cap.Consistency/Internal/QMessageFinder.cs
@@ -0,0 +1,43 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Reflection;
+using System.Threading.Tasks;
+using System.Collections.Concurrent;
+using Cap.Consistency.Extensions;
+using Cap.Consistency.Abstractions;
+
+namespace Cap.Consistency
+{
+ public class QMessageFinder
+ {
+ public ConcurrentDictionary GetQMessageMethodInfo(params Type[] serviceType) {
+
+ var qMessageTypes = new ConcurrentDictionary();
+
+ foreach (var type in serviceType) {
+
+ foreach (var method in type.GetTypeInfo().DeclaredMethods) {
+
+ var messageMethodInfo = new ConsumerExecutorDescriptor();
+
+ if (method.IsPropertyBinding()) {
+ continue;
+ }
+
+ var qMessageAttr = method.GetCustomAttribute();
+ if (qMessageAttr == null) {
+ continue;
+ }
+
+ messageMethodInfo.ImplType = method.DeclaringType;
+ messageMethodInfo.MethodInfo = method;
+
+ qMessageTypes.AddOrUpdate(qMessageAttr.MessageName, messageMethodInfo, (x, y) => y);
+ }
+ }
+
+ return qMessageTypes;
+ }
+ }
+}
diff --git a/src/Cap.Consistency/Internal/QMessageMethodInfo.cs b/src/Cap.Consistency/Internal/QMessageMethodInfo.cs
new file mode 100644
index 0000000..07ffbbd
--- /dev/null
+++ b/src/Cap.Consistency/Internal/QMessageMethodInfo.cs
@@ -0,0 +1,36 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Reflection;
+using System.Threading.Tasks;
+
+namespace Cap.Consistency
+{
+ public class TopicInfo
+ {
+ public TopicInfo(string topicName) : this(topicName, 0) {}
+
+ public TopicInfo(string topicName, int partition) : this(topicName, partition, 0) {}
+
+ public TopicInfo(string topicName, int partition, long offset) {
+ Name = topicName;
+ Offset = offset;
+ Partition = partition;
+ }
+
+ public string Name { get; }
+
+ public int Partition { get; }
+
+ public long Offset { get; }
+
+ public bool IsPartition { get { return Partition == 0; } }
+
+ public bool IsOffset { get { return Offset == 0; } }
+
+ public override string ToString() {
+ return Name;
+ }
+
+ }
+}
diff --git a/src/Cap.Consistency/KafkaConsistency.cs b/src/Cap.Consistency/KafkaConsistency.cs
new file mode 100644
index 0000000..12ca288
--- /dev/null
+++ b/src/Cap.Consistency/KafkaConsistency.cs
@@ -0,0 +1,33 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Linq;
+using Cap.Consistency.Consumer;
+using Microsoft.Extensions.DependencyInjection;
+
+namespace Cap.Consistency
+{
+ public class KafkaConsistency
+ {
+ private IServiceProvider _serviceProvider;
+ private IEnumerable _handlers;
+
+ public KafkaConsistency(IServiceProvider serviceProvider) {
+ _serviceProvider = serviceProvider;
+ }
+
+ public void Start() {
+ _handlers = _serviceProvider.GetServices();
+ var services = _serviceProvider.GetServices();
+ foreach (var handler in _handlers) {
+ handler.Start(services);
+ }
+ }
+
+ public void Stop() {
+ foreach (var handler in _handlers) {
+ handler.Stop();
+ }
+ }
+ }
+}
diff --git a/src/Cap.Consistency/Models/IConsumerModel.cs b/src/Cap.Consistency/Models/IConsumerModel.cs
new file mode 100644
index 0000000..437e782
--- /dev/null
+++ b/src/Cap.Consistency/Models/IConsumerModel.cs
@@ -0,0 +1,12 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace Cap.Consistency.Models
+{
+ public interface IConsumerModel
+ {
+ string TopicName { get; set; }
+
+ }
+}
diff --git a/src/Cap.Consistency/Route/TopicRouteContext.cs b/src/Cap.Consistency/Route/TopicRouteContext.cs
new file mode 100644
index 0000000..984e5a6
--- /dev/null
+++ b/src/Cap.Consistency/Route/TopicRouteContext.cs
@@ -0,0 +1,31 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading.Tasks;
+using Cap.Consistency.Abstractions;
+using Cap.Consistency.Consumer;
+using Cap.Consistency.Infrastructure;
+
+namespace Cap.Consistency.Route
+{
+ public delegate Task HandlerConsumer(ConsumerExecutorDescriptor context);
+
+ public class TopicRouteContext
+ {
+
+ public TopicRouteContext(DeliverMessage message) {
+ Message = message;
+ }
+
+ public DeliverMessage Message { get; }
+
+ // public event EventHandler OnMessage;
+
+ public HandlerConsumer Handler { get; set; }
+
+ public IServiceProvider ServiceProvider { get; set; }
+
+ public IList Consumers { get; set; }
+
+ }
+}
diff --git a/src/Cap.Consistency/RouteTable.cs b/src/Cap.Consistency/RouteTable.cs
index f327cf6..b45953c 100644
--- a/src/Cap.Consistency/RouteTable.cs
+++ b/src/Cap.Consistency/RouteTable.cs
@@ -1,21 +1,22 @@
using System;
using System.Collections;
using System.Collections.Generic;
+using Cap.Consistency.Abstractions;
namespace Cap.Consistency
{
- public class RouteTable : IReadOnlyList
+ public class RouteTable : IReadOnlyList
{
public RouteTable() {
}
- public RouteTable(List messageMethods) {
+ public RouteTable(List messageMethods) {
QMessageMethods = messageMethods;
}
- public QMessageMethodInfo this[int index] {
+ public ConsumerExecutorDescriptor this[int index] {
get {
throw new NotImplementedException();
}
@@ -27,9 +28,9 @@ namespace Cap.Consistency
}
}
- public List QMessageMethods { get; set; }
+ public List QMessageMethods { get; set; }
- public IEnumerator GetEnumerator() {
+ public IEnumerator GetEnumerator() {
throw new NotImplementedException();
}
diff --git a/src/Cap.Consistency/Store/ConsistencyMessageManager.cs b/src/Cap.Consistency/Store/ConsistencyMessageManager.cs
new file mode 100644
index 0000000..7bcc947
--- /dev/null
+++ b/src/Cap.Consistency/Store/ConsistencyMessageManager.cs
@@ -0,0 +1,147 @@
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.AspNetCore.Http;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Logging;
+
+namespace Cap.Consistency
+{
+ ///
+ /// Provides the APIs for managing message in a persistence store.
+ ///
+ /// The type encapsulating a message.
+ public class ConsistencyMessageManager : IDisposable where TMessage : class
+ {
+ private bool _disposed;
+ private readonly HttpContext _context;
+ private CancellationToken CancellationToken => _context?.RequestAborted ?? CancellationToken.None;
+
+ ///
+ /// Constructs a new instance of .
+ ///
+ /// The persistence store the manager will operate over.
+ /// The used to resolve services.
+ /// The logger used to log messages, warnings and errors.
+ public ConsistencyMessageManager(IConsistencyMessageStore store,
+ IServiceProvider services,
+ ILogger> logger) {
+ if (store == null) {
+ throw new ArgumentNullException(nameof(store));
+ }
+
+ Store = store;
+ Logger = logger;
+
+ if (services != null) {
+ _context = services.GetService()?.HttpContext;
+ }
+ }
+
+ ///
+ /// Gets or sets the persistence store the manager operates over.
+ ///
+ /// The persistence store the manager operates over.
+ protected internal IConsistencyMessageStore Store { get; set; }
+
+ ///
+ /// Gets the used to log messages from the manager.
+ ///
+ ///
+ /// The used to log messages from the manager.
+ ///
+ protected internal virtual ILogger Logger { get; set; }
+
+ ///
+ /// Creates the specified in the backing store.
+ ///
+ /// The message to create.
+ ///
+ /// The that represents the asynchronous operation, containing the
+ /// of the operation.
+ ///
+ public virtual Task CreateAsync(TMessage message) {
+ ThrowIfDisposed();
+ //todo: validation message fileds is correct
+
+ return Store.CreateAsync(message, CancellationToken);
+ }
+
+ ///
+ /// Updates the specified in the backing store.
+ ///
+ /// The message to update.
+ ///
+ /// The that represents the asynchronous operation, containing the
+ /// of the operation.
+ ///
+ public virtual Task UpdateAsync(TMessage message) {
+ ThrowIfDisposed();
+ //todo: validation message fileds is correct
+
+ return Store.UpdateAsync(message, CancellationToken);
+ }
+
+ ///
+ /// Deletes the specified in the backing store.
+ ///
+ /// The message to delete.
+ ///
+ /// The that represents the asynchronous operation, containing the
+ /// of the operation.
+ ///
+ public virtual Task DeleteAsync(TMessage message) {
+ ThrowIfDisposed();
+
+ if (message == null) {
+ throw new ArgumentNullException(nameof(message));
+ }
+
+ return Store.DeleteAsync(message, CancellationToken);
+ }
+
+ ///
+ /// Finds and returns a message, if any, who has the specified .
+ ///
+ /// The message ID to search for.
+ ///
+ /// The that represents the asynchronous operation, containing the user matching the specified if it exists.
+ ///
+ public virtual Task FindByIdAsync(string messageId) {
+ ThrowIfDisposed();
+ return Store.FindByIdAsync(messageId, CancellationToken);
+ }
+
+ ///
+ /// Gets the message identifier for the specified .
+ ///
+ /// The message whose identifier should be retrieved.
+ /// The that represents the asynchronous operation, containing the identifier for the specified .
+ public virtual async Task GetMessageIdAsync(TMessage message) {
+ ThrowIfDisposed();
+ return await Store.GetMessageIdAsync(message, CancellationToken);
+ }
+
+ public void Dispose() {
+ Dispose(true);
+ GC.SuppressFinalize(this);
+ }
+
+ ///
+ /// Releases the unmanaged resources used by the message manager and optionally releases the managed resources.
+ ///
+ /// true to release both managed and unmanaged resources; false to release only unmanaged resources.
+ protected virtual void Dispose(bool disposing) {
+ if (disposing && !_disposed) {
+ Store.Dispose();
+ _disposed = true;
+ }
+ }
+
+ protected void ThrowIfDisposed() {
+ if (_disposed) {
+ throw new ObjectDisposedException(GetType().Name);
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/Cap.Consistency/Store/IConsistencyMessageStore.cs b/src/Cap.Consistency/Store/IConsistencyMessageStore.cs
new file mode 100644
index 0000000..80f54a3
--- /dev/null
+++ b/src/Cap.Consistency/Store/IConsistencyMessageStore.cs
@@ -0,0 +1,55 @@
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Cap.Consistency
+{
+ ///
+ /// Provides an abstraction for a store which manages consistent message.
+ ///
+ ///
+ public interface IConsistencyMessageStore : IDisposable where TMessage : class
+ {
+ ///
+ /// Finds and returns a message, if any, who has the specified .
+ ///
+ /// The message ID to search for.
+ /// The used to propagate notifications that the operation should be canceled.
+ ///
+ /// The that represents the asynchronous operation, containing the message matching the specified if it exists.
+ ///
+ Task FindByIdAsync(string messageId, CancellationToken cancellationToken);
+
+ ///
+ /// Creates a new message in a store as an asynchronous operation.
+ ///
+ /// The message to create in the store.
+ /// The used to propagate notifications that the operation should be canceled.
+ /// A that represents the of the asynchronous query.
+ Task CreateAsync(TMessage message, CancellationToken cancellationToken);
+
+ ///
+ /// Updates a message in a store as an asynchronous operation.
+ ///
+ /// The message to update in the store.
+ /// The used to propagate notifications that the operation should be canceled.
+ /// A that represents the of the asynchronous query.
+ Task UpdateAsync(TMessage message, CancellationToken cancellationToken);
+
+ ///
+ /// Deletes a message from the store as an asynchronous operation.
+ ///
+ /// The message to delete in the store.
+ /// The used to propagate notifications that the operation should be canceled.
+ /// A that represents the of the asynchronous query.
+ Task DeleteAsync(TMessage message, CancellationToken cancellationToken);
+
+ ///
+ /// Gets the ID for a message from the store as an asynchronous operation.
+ ///
+ /// The message whose ID should be returned.
+ /// The used to propagate notifications that the operation should be canceled.
+ /// A that contains the ID of the message.
+ Task GetMessageIdAsync(TMessage message, CancellationToken cancellationToken);
+ }
+}
\ No newline at end of file
diff --git a/test/Cap.Consistency.EntityFrameworkCore.Test/Cap.Consistency.EntityFrameworkCore.Test.csproj b/test/Cap.Consistency.EntityFrameworkCore.Test/Cap.Consistency.EntityFrameworkCore.Test.csproj
index ac2c1df..70bc1b8 100644
--- a/test/Cap.Consistency.EntityFrameworkCore.Test/Cap.Consistency.EntityFrameworkCore.Test.csproj
+++ b/test/Cap.Consistency.EntityFrameworkCore.Test/Cap.Consistency.EntityFrameworkCore.Test.csproj
@@ -27,7 +27,7 @@
-
+
diff --git a/test/Cap.Consistency.Test/Cap.Consistency.Test.csproj b/test/Cap.Consistency.Test/Cap.Consistency.Test.csproj
index 7f2e537..148bb6a 100644
--- a/test/Cap.Consistency.Test/Cap.Consistency.Test.csproj
+++ b/test/Cap.Consistency.Test/Cap.Consistency.Test.csproj
@@ -23,7 +23,7 @@
-
+