diff --git a/src/Cap.Consistency.Kafka/Cap.Consistency.Kafka.csproj b/src/Cap.Consistency.Kafka/Cap.Consistency.Kafka.csproj new file mode 100644 index 0000000..59c209b --- /dev/null +++ b/src/Cap.Consistency.Kafka/Cap.Consistency.Kafka.csproj @@ -0,0 +1,15 @@ + + + + netstandard1.6 + + + + + + + + + + + \ No newline at end of file diff --git a/src/Cap.Consistency.Kafka/KafkaConsumerClient.cs b/src/Cap.Consistency.Kafka/KafkaConsumerClient.cs new file mode 100644 index 0000000..2e5e47a --- /dev/null +++ b/src/Cap.Consistency.Kafka/KafkaConsumerClient.cs @@ -0,0 +1,101 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using Cap.Consistency.Consumer; +using Cap.Consistency.Infrastructure; +using Confluent.Kafka; +using Confluent.Kafka.Serialization; + +namespace Cap.Consistency.Kafka +{ + public class KafkaTopicPartition + { + public string Topic { get; set; } + + public int Partition { get; set; } + } + + public class KafkaConsumerClient : IConsumerClient, IDisposable + { + + private readonly string _groupId; + private readonly string _bootstrapServers; + + private Consumer _consumerClient; + + public event EventHandler MessageReceieved; + + public IDeserializer StringDeserializer { get; set; } + + public KafkaConsumerClient(string groupId, string bootstrapServers) { + _groupId = groupId; + _bootstrapServers = bootstrapServers; + + StringDeserializer = new StringDeserializer(Encoding.UTF8); + } + + public void Subscribe(string topic) { + Subscribe(topic, 0); + } + + public void Subscribe(string topicName, int partition) { + + if (_consumerClient == null) { + InitKafkaClient(); + } + _consumerClient.Assignment.Add(new TopicPartition(topicName, partition)); + } + + public void Subscribe(IEnumerable topicList) { + + if (_consumerClient == null) { + InitKafkaClient(); + } + + if (topicList == null || topicList.Count() == 0) { + throw new ArgumentNullException(nameof(topicList)); + } + + foreach (var item in topicList) { + Subscribe(item.Topic, item.Partition); + } + } + + + public void Listening(TimeSpan timeout) { + while (true) { + _consumerClient.Poll(timeout); + } + } + + public void Dispose() { + _consumerClient.Dispose(); + } + + #region private methods + + private void InitKafkaClient() { + var config = new Dictionary{ + { "group.id", _groupId }, + { "bootstrap.servers", _bootstrapServers } + }; + + _consumerClient = new Consumer(config, null, StringDeserializer); + _consumerClient.OnMessage += ConsumerClient_OnMessage; + } + + private void ConsumerClient_OnMessage(object sender, Message e) { + var message = new DeliverMessage { + MessageKey = e.Topic, + Value = e.Value + }; + MessageReceieved?.Invoke(sender, message); + } + + #endregion + + //public void + + } +} diff --git a/src/Cap.Consistency.Kafka/KafkaConsumerClientFactory.cs b/src/Cap.Consistency.Kafka/KafkaConsumerClientFactory.cs new file mode 100644 index 0000000..995c178 --- /dev/null +++ b/src/Cap.Consistency.Kafka/KafkaConsumerClientFactory.cs @@ -0,0 +1,14 @@ +using System; +using System.Collections.Generic; +using System.Text; +using Cap.Consistency.Consumer; + +namespace Cap.Consistency.Kafka +{ + public class KafkaConsumerClientFactory : IConsumerClientFactory + { + public IConsumerClient Create(string groupId, string clientHostAddress) { + return new KafkaConsumerClient(groupId, clientHostAddress); + } + } +} diff --git a/src/Cap.Consistency.Kafka/KafkaTopicAttribute.cs b/src/Cap.Consistency.Kafka/KafkaTopicAttribute.cs new file mode 100644 index 0000000..2f002ed --- /dev/null +++ b/src/Cap.Consistency.Kafka/KafkaTopicAttribute.cs @@ -0,0 +1,34 @@ +using System; +using System.Collections.Generic; +using System.Text; +using Cap.Consistency.Abstractions; + +namespace Cap.Consistency.Kafka +{ + public class KafkaTopicAttribute : TopicAttribute + { + public KafkaTopicAttribute(string topicName) + : this(topicName, 0) { } + + public KafkaTopicAttribute(string topicName, int partition) + : this(topicName, partition, 0) { } + + public KafkaTopicAttribute(string topicName, int partition, long offset) + : base(topicName) { + Offset = offset; + Partition = partition; + } + + public int Partition { get; } + + public long Offset { get; } + + public bool IsPartition { get { return Partition == 0; } } + + public bool IsOffset { get { return Offset == 0; } } + + public override string ToString() { + return Name; + } + } +}