diff --git a/src/Cap.Consistency/Consumer/ConsumerHandler.cs b/src/Cap.Consistency/Consumer/ConsumerHandler.cs index 73bca77..9bd6aaa 100644 --- a/src/Cap.Consistency/Consumer/ConsumerHandler.cs +++ b/src/Cap.Consistency/Consumer/ConsumerHandler.cs @@ -1,35 +1,53 @@ using System; using System.Collections.Generic; using System.Text; +using System.Linq; using System.Threading.Tasks; using Cap.Consistency.Abstractions; using Cap.Consistency.Infrastructure; using Cap.Consistency.Routing; +using Confluent.Kafka; +using Confluent.Kafka.Serialization; +using Microsoft.AspNetCore.Builder; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Cap.Consistency.Consumer.Kafka; +using Cap.Consistency.Internal; namespace Cap.Consistency.Consumer { - public class ConsumerHandler : IConsumerHandler + public class ConsumerHandler : IConsumerHandler where T : ConsistencyMessage, new() { - private readonly IServiceProvider _serviceProvider; private readonly IConsumerInvokerFactory _consumerInvokerFactory; - private readonly IConsumerExcutorSelector _selector; private readonly ILoggerFactory _loggerFactory; private readonly ILogger _logger; + private readonly MethodMatcherCache _selector; + private readonly ConsistencyOptions _options; + private readonly ConsistencyMessageManager _messageManager; + public event EventHandler MessageReceieved; public ConsumerHandler( IServiceProvider serviceProvider, IConsumerInvokerFactory consumerInvokerFactory, - IConsumerExcutorSelector selector, - ILoggerFactory loggerFactory) { + ILoggerFactory loggerFactory, + ConsistencyMessageManager messageManager, + MethodMatcherCache selector, + IOptions options) { _selector = selector; - _logger = loggerFactory.CreateLogger(); + _logger = loggerFactory.CreateLogger>(); _loggerFactory = loggerFactory; _serviceProvider = serviceProvider; _consumerInvokerFactory = consumerInvokerFactory; + _options = options.Value; + _messageManager = messageManager; + } + + + protected virtual void OnMessageReceieved(T message) { + MessageReceieved?.Invoke(this, message); } public Task RouteAsync(TopicRouteContext context) { @@ -40,47 +58,52 @@ namespace Cap.Consistency.Consumer context.ServiceProvider = _serviceProvider; - var matchs = _selector.SelectCandidates(context); - + var matchs = _selector.GetCandidatesMethods(context); + var groupingMatchs = matchs.GroupBy(x => x.Value.GroupId); - var config = new Dictionary - { - { "group.id", "simple-csharp-consumer" }, - { "bootstrap.servers", brokerList } - }; - - using (var consumer = new Consumer(config, null, new StringDeserializer(Encoding.UTF8))) { - //consumer.Assign(new List { new TopicInfo(topics.First(), 0, 0) }); + foreach (var matchGroup in groupingMatchs) { + using (var client = new KafkaConsumerClient(matchGroup.Key, _options.BrokerUrlList)) { + client.MessageReceieved += OnMessageReceieved; - while (true) { - Message msg; - if (consumer.Consume(out msg)) { - Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}"); + foreach (var item in matchGroup) { + client.Subscribe(item.Key, item.Value.Topic.Partition); } + + client.Listening(TimeSpan.Zero); } } + return Task.CompletedTask; + } + private void OnMessageReceieved(object sender, DeliverMessage message) { + T consistencyMessage = new T() { + Id = message.MessageKey, + Payload = Encoding.UTF8.GetString(message.Body) + }; + _logger.LogInformation("message receieved message topic name: " + consistencyMessage.Id); - if (matchs == null || matchs.Count == 0) { - _logger.LogInformation("can not be fond topic route"); - return Task.CompletedTask; - } + _messageManager.CreateAsync(consistencyMessage); - var executeDescriptor = _selector.SelectBestCandidate(context, matchs); + try { + var executeDescriptor = _selector.GetTopicExector(message.MessageKey); - context.Handler = c => { + var consumerContext = new ConsumerContext(executeDescriptor, message); - var consumerContext = new ConsumerContext(executeDescriptor); var invoker = _consumerInvokerFactory.CreateInvoker(consumerContext); - _logger.LogInformation("consumer starting"); + invoker.InvokeAsync(); - return invoker.InvokeAsync(); - }; - - return Task.CompletedTask; + _messageManager.UpdateAsync(consistencyMessage); + + } + catch (Exception ex) { + + _logger.LogError("exception raised when excute method : " + ex.Message); + + throw ex; + } } } } diff --git a/src/Cap.Consistency/Consumer/IConsumerClient.cs b/src/Cap.Consistency/Consumer/IConsumerClient.cs new file mode 100644 index 0000000..8e27fd0 --- /dev/null +++ b/src/Cap.Consistency/Consumer/IConsumerClient.cs @@ -0,0 +1,11 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Cap.Consistency.Consumer +{ + public interface IConsumerClient + { + + } +} diff --git a/src/Cap.Consistency/Consumer/IConsumerHandler.cs b/src/Cap.Consistency/Consumer/IConsumerHandler.cs index 8a727ff..e216136 100644 --- a/src/Cap.Consistency/Consumer/IConsumerHandler.cs +++ b/src/Cap.Consistency/Consumer/IConsumerHandler.cs @@ -1,11 +1,12 @@ using System; using System.Collections.Generic; using System.Text; +using Cap.Consistency.Infrastructure; using Cap.Consistency.Routing; namespace Cap.Consistency.Consumer { - public interface IConsumerHandler : ITopicRoute + public interface IConsumerHandler : ITopicRoute where T : ConsistencyMessage, new() { }