diff --git a/src/Cap.Consistency/Consumer/Kafka/KafkaConsumerClient.cs b/src/Cap.Consistency/Consumer/Kafka/KafkaConsumerClient.cs new file mode 100644 index 0000000..65866e8 --- /dev/null +++ b/src/Cap.Consistency/Consumer/Kafka/KafkaConsumerClient.cs @@ -0,0 +1,95 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using Cap.Consistency.Infrastructure; +using Confluent.Kafka; +using Confluent.Kafka.Serialization; + +namespace Cap.Consistency.Consumer.Kafka +{ + public class KafkaTopicPartition + { + public string Topic { get; set; } + + public int Partition { get; set; } + } + + public class KafkaConsumerClient : IDisposable + { + + private readonly string _groupId; + private readonly string _bootstrapServers; + + private Consumer _consumerClient; + + public event EventHandler MessageReceieved; + + public IDeserializer StringDeserializer { get; set; } + + public KafkaConsumerClient(string groupId, string bootstrapServers) { + _groupId = groupId; + _bootstrapServers = bootstrapServers; + + StringDeserializer = new StringDeserializer(Encoding.UTF8); + } + + public void Subscribe(string topicName, int partition) { + + if (_consumerClient == null) { + InitKafkaClient(); + } + _consumerClient.Assignment.Add(new TopicPartition(topicName, partition)); + } + + public void Subscribe(IEnumerable topicList) { + + if (_consumerClient == null) { + InitKafkaClient(); + } + + if (topicList == null || topicList.Count() == 0) { + throw new ArgumentNullException(nameof(topicList)); + } + + foreach (var item in topicList) { + Subscribe(item.Topic, item.Partition); + } + } + + + public void Listening(TimeSpan timeout) { + while (true) { + _consumerClient.Poll(timeout); + } + } + + public void Dispose() { + _consumerClient.Dispose(); + } + + #region private methods + + private void InitKafkaClient() { + var config = new Dictionary{ + { "group.id", "simple-csharp-consumer" }, + { "bootstrap.servers", _bootstrapServers } + }; + + _consumerClient = new Consumer(config, null, StringDeserializer); + _consumerClient.OnMessage += ConsumerClient_OnMessage; + } + + private void ConsumerClient_OnMessage(object sender, Message e) { + var message = new DeliverMessage { + MessageKey = e.Topic, + Value = e.Value + }; + MessageReceieved?.Invoke(sender, message); + } + #endregion + + //public void + + } +} diff --git a/src/Cap.Consistency/Consumer/Kafka/KafkaConsumerHandler.cs b/src/Cap.Consistency/Consumer/Kafka/KafkaConsumerHandler.cs new file mode 100644 index 0000000..983fc22 --- /dev/null +++ b/src/Cap.Consistency/Consumer/Kafka/KafkaConsumerHandler.cs @@ -0,0 +1,75 @@ +//using System; +//using System.Collections.Generic; +//using System.Text; +//using System.Linq; +//using System.Threading.Tasks; +//using Cap.Consistency.Abstractions; +//using Cap.Consistency.Infrastructure; +//using Cap.Consistency.Routing; +//using Confluent.Kafka; +//using Confluent.Kafka.Serialization; +//using Microsoft.AspNetCore.Builder; +//using Microsoft.Extensions.Logging; +//using Microsoft.Extensions.Options; + +//namespace Cap.Consistency.Consumer.Kafka +//{ +// public class KafkaConsumerHandler : ConsumerHandler where T : ConsistencyMessage, new() +// { + +// protected override void OnMessageReceieved(T message) { + +// } + +// public Task RouteAsync(TopicRouteContext context) { + +// if (context == null) { +// throw new ArgumentNullException(nameof(context)); +// } + +// context.ServiceProvider = _serviceProvider; + +// var matchs = _selector.SelectCandidates(context); + +// var config = new Dictionary +// { +// { "group.id", "simple-csharp-consumer" }, +// { "bootstrap.servers", _options.BrokerUrlList } +// }; + +// using (var consumer = new Consumer(config, null, new StringDeserializer(Encoding.UTF8))) { + +// var topicList = matchs.Select(item => new TopicPartitionOffset(item.Topic.Name, item.Topic.Partition, new Offset(item.Topic.Offset))); +// consumer.Assign(topicList); + +// while (true) { +// if (consumer.Consume(out Message msg)) { + +// T consistencyMessage = new T(); +// var message = new DeliverMessage() { +// MessageKey = msg.Topic, +// Body = Encoding.UTF8.GetBytes(msg.Value) +// }; +// var routeContext = new TopicRouteContext(message); + +// var executeDescriptor = _selector.SelectBestCandidate(routeContext, matchs); + +// if (executeDescriptor == null) { +// _logger.LogInformation("can not be fond topic execute"); +// return Task.CompletedTask; +// } + +// var consumerContext = new ConsumerContext(executeDescriptor, message); +// var invoker = _consumerInvokerFactory.CreateInvoker(consumerContext); + +// _logger.LogInformation("consumer starting"); + +// return invoker.InvokeAsync(); + +// } +// } +// } + +// } +// } +//}