diff --git a/src/Cap.Consistency/Abstractions/ConsumerInvokerContext.cs b/src/Cap.Consistency/Abstractions/ConsumerInvokerContext.cs index 5792d6c..f5e0afc 100644 --- a/src/Cap.Consistency/Abstractions/ConsumerInvokerContext.cs +++ b/src/Cap.Consistency/Abstractions/ConsumerInvokerContext.cs @@ -7,7 +7,8 @@ namespace Cap.Consistency.Abstractions public class ConsumerInvokerContext { public ConsumerInvokerContext(ConsumerContext consumerContext) { - ConsumerContext = consumerContext ?? throw new ArgumentNullException(nameof(consumerContext)); + ConsumerContext = consumerContext ?? + throw new ArgumentNullException(nameof(consumerContext)); } diff --git a/src/Cap.Consistency/Consumer/ConsumerHandler.cs b/src/Cap.Consistency/Consumer/ConsumerHandler.cs index 8990dc4..73bca77 100644 --- a/src/Cap.Consistency/Consumer/ConsumerHandler.cs +++ b/src/Cap.Consistency/Consumer/ConsumerHandler.cs @@ -25,11 +25,11 @@ namespace Cap.Consistency.Consumer IConsumerExcutorSelector selector, ILoggerFactory loggerFactory) { - _serviceProvider = serviceProvider; - _consumerInvokerFactory = consumerInvokerFactory; - _loggerFactory = loggerFactory; _selector = selector; _logger = loggerFactory.CreateLogger(); + _loggerFactory = loggerFactory; + _serviceProvider = serviceProvider; + _consumerInvokerFactory = consumerInvokerFactory; } public Task RouteAsync(TopicRouteContext context) { @@ -42,6 +42,27 @@ namespace Cap.Consistency.Consumer var matchs = _selector.SelectCandidates(context); + + + var config = new Dictionary + { + { "group.id", "simple-csharp-consumer" }, + { "bootstrap.servers", brokerList } + }; + + using (var consumer = new Consumer(config, null, new StringDeserializer(Encoding.UTF8))) { + //consumer.Assign(new List { new TopicInfo(topics.First(), 0, 0) }); + + while (true) { + Message msg; + if (consumer.Consume(out msg)) { + Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}"); + } + } + } + + + if (matchs == null || matchs.Count == 0) { _logger.LogInformation("can not be fond topic route"); return Task.CompletedTask; diff --git a/src/Cap.Consistency/Consumer/Kafka/KafkaConsumerHandler.cs b/src/Cap.Consistency/Consumer/Kafka/KafkaConsumerHandler.cs deleted file mode 100644 index bbf8043..0000000 --- a/src/Cap.Consistency/Consumer/Kafka/KafkaConsumerHandler.cs +++ /dev/null @@ -1,28 +0,0 @@ -using System; -using System.Linq; -using System.Collections.Generic; -using System.Reflection; -using System.Text; - -namespace Cap.Consistency.Consumer.Kafka -{ - public class KafkaConsumerHandler : IConsumerHandler - { - public readonly QMessageFinder _finder; - - public KafkaConsumerHandler(QMessageFinder finder) { - _finder = finder; - } - - public void Start(IEnumerable consumers) { - - var methods = _finder.GetQMessageMethodInfo(consumers.Select(x => x.GetType()).ToArray()); - - - } - - public void Stop() { - throw new NotImplementedException(); - } - } -} diff --git a/src/Cap.Consistency/Consumer/Kafka/RdKafkaClient.cs b/src/Cap.Consistency/Consumer/Kafka/RdKafkaClient.cs index 7602617..68f77f9 100644 --- a/src/Cap.Consistency/Consumer/Kafka/RdKafkaClient.cs +++ b/src/Cap.Consistency/Consumer/Kafka/RdKafkaClient.cs @@ -2,7 +2,6 @@ using System.Collections.Generic; using System.Linq; using System.Text; -using Cap.Consistency.Route; using Confluent.Kafka; using Confluent.Kafka.Serialization; diff --git a/src/Cap.Consistency/Extensions/BuilderExtensions.cs b/src/Cap.Consistency/Extensions/BuilderExtensions.cs index 0b716cf..03f9abc 100644 --- a/src/Cap.Consistency/Extensions/BuilderExtensions.cs +++ b/src/Cap.Consistency/Extensions/BuilderExtensions.cs @@ -45,8 +45,10 @@ namespace Microsoft.AspNetCore.Builder var context = new TopicRouteContext(); + router.RouteAsync(context).Wait(); + return builder; } - + } } \ No newline at end of file diff --git a/src/Cap.Consistency/Extensions/ConsistencyOptions.cs b/src/Cap.Consistency/Extensions/ConsistencyOptions.cs index 344044c..c2863d1 100644 --- a/src/Cap.Consistency/Extensions/ConsistencyOptions.cs +++ b/src/Cap.Consistency/Extensions/ConsistencyOptions.cs @@ -12,6 +12,9 @@ namespace Microsoft.AspNetCore.Builder /// public BrokerOptions Broker { get; set; } = new BrokerOptions(); + + public string BrokerUrlList { get; set; } + } } \ No newline at end of file diff --git a/src/Cap.Consistency/GlobalSuppressions.cs b/src/Cap.Consistency/GlobalSuppressions.cs deleted file mode 100644 index 52ebaa2..0000000 --- a/src/Cap.Consistency/GlobalSuppressions.cs +++ /dev/null @@ -1,8 +0,0 @@ - -// This file is used by Code Analysis to maintain SuppressMessage -// attributes that are applied to this project. -// Project-level suppressions either have no target or are given -// a specific target and scoped to a namespace, type, member, etc. - -[assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("Style", "IDE0016:Use 'throw' expression", Justification = "", Scope = "member", Target = "~M:Cap.Consistency.Internal.ConsumerInvoker.#ctor(Microsoft.Extensions.Logging.ILogger,Cap.Consistency.Abstractions.ConsumerContext,Cap.Consistency.Internal.ObjectMethodExecutor)")] - diff --git a/src/Cap.Consistency/Internal/ConsumerExcutorSelector.cs b/src/Cap.Consistency/Internal/ConsumerExcutorSelector.cs index 884ae2b..666da23 100644 --- a/src/Cap.Consistency/Internal/ConsumerExcutorSelector.cs +++ b/src/Cap.Consistency/Internal/ConsumerExcutorSelector.cs @@ -7,7 +7,7 @@ using Cap.Consistency.Abstractions; using Cap.Consistency.Attributes; using Cap.Consistency.Consumer; using Cap.Consistency.Infrastructure; -using Cap.Consistency.Route; +using Cap.Consistency.Routing; using Microsoft.Extensions.DependencyInjection; namespace Cap.Consistency.Internal @@ -20,7 +20,7 @@ namespace Cap.Consistency.Internal var key = context.Message.MessageKey; return executeDescriptor.FirstOrDefault(x => x.Topic.Name == key); } - + public IReadOnlyList SelectCandidates(TopicRouteContext context) { var consumerServices = context.ServiceProvider.GetServices(); @@ -43,7 +43,6 @@ namespace Cap.Consistency.Internal return executorDescriptorList; } - private ConsumerExecutorDescriptor InitDescriptor(TopicAttribute attr) { var descriptor = new ConsumerExecutorDescriptor(); @@ -51,7 +50,5 @@ namespace Cap.Consistency.Internal return descriptor; } - - } } diff --git a/src/Cap.Consistency/Internal/ConsumerInvokerFactory.cs b/src/Cap.Consistency/Internal/ConsumerInvokerFactory.cs index 4fa022f..45df127 100644 --- a/src/Cap.Consistency/Internal/ConsumerInvokerFactory.cs +++ b/src/Cap.Consistency/Internal/ConsumerInvokerFactory.cs @@ -3,13 +3,33 @@ using System.Collections.Generic; using System.Text; using Cap.Consistency.Abstractions; using Cap.Consistency.Infrastructure; +using Microsoft.Extensions.Logging; namespace Cap.Consistency.Internal { public class ConsumerInvokerFactory : IConsumerInvokerFactory { + private readonly ILogger _logger; + private readonly IServiceProvider _serviceProvider; + private readonly ObjectMethodExecutor _executor; + + public ConsumerInvokerFactory( + ILoggerFactory loggerFactory, + IServiceProvider serviceProvider, + ObjectMethodExecutor executor) { + + _logger = loggerFactory.CreateLogger(); + _serviceProvider = serviceProvider; + _executor = executor; + } + public IConsumerInvoker CreateInvoker(ConsumerContext consumerContext) { + var context = new ConsumerInvokerContext(consumerContext); + + context.Result = new ConsumerInvoker(_logger, _serviceProvider, + consumerContext, _executor); + return context.Result; } } diff --git a/src/Cap.Consistency/KafkaConsistency.cs b/src/Cap.Consistency/KafkaConsistency.cs index b0851b6..0aee22d 100644 --- a/src/Cap.Consistency/KafkaConsistency.cs +++ b/src/Cap.Consistency/KafkaConsistency.cs @@ -8,7 +8,7 @@ using System.Threading.Tasks; namespace Cap.Consistency { - public class KafkaConsistency:IRoute + public class KafkaConsistency : IRoute { private IServiceProvider _serviceProvider; private IEnumerable _handlers; @@ -34,5 +34,6 @@ namespace Cap.Consistency public async Task Start() { } - + + } }