|
@@ -9,14 +9,7 @@ using Confluent.Kafka.Serialization; |
|
|
|
|
|
|
|
|
namespace Cap.Consistency.Kafka |
|
|
namespace Cap.Consistency.Kafka |
|
|
{ |
|
|
{ |
|
|
public class KafkaTopicPartition |
|
|
|
|
|
{ |
|
|
|
|
|
public string Topic { get; set; } |
|
|
|
|
|
|
|
|
|
|
|
public int Partition { get; set; } |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
public class KafkaConsumerClient : IConsumerClient, IDisposable |
|
|
|
|
|
|
|
|
public class KafkaConsumerClient : IConsumerClient |
|
|
{ |
|
|
{ |
|
|
|
|
|
|
|
|
private readonly string _groupId; |
|
|
private readonly string _groupId; |
|
@@ -47,22 +40,6 @@ namespace Cap.Consistency.Kafka |
|
|
_consumerClient.Assignment.Add(new TopicPartition(topicName, partition)); |
|
|
_consumerClient.Assignment.Add(new TopicPartition(topicName, partition)); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
public void Subscribe(IEnumerable<KafkaTopicPartition> topicList) { |
|
|
|
|
|
|
|
|
|
|
|
if (_consumerClient == null) { |
|
|
|
|
|
InitKafkaClient(); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if (topicList == null || topicList.Count() == 0) { |
|
|
|
|
|
throw new ArgumentNullException(nameof(topicList)); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
foreach (var item in topicList) { |
|
|
|
|
|
Subscribe(item.Topic, item.Partition); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public void Listening(TimeSpan timeout) { |
|
|
public void Listening(TimeSpan timeout) { |
|
|
while (true) { |
|
|
while (true) { |
|
|
_consumerClient.Poll(timeout); |
|
|
_consumerClient.Poll(timeout); |
|
@@ -95,7 +72,5 @@ namespace Cap.Consistency.Kafka |
|
|
|
|
|
|
|
|
#endregion |
|
|
#endregion |
|
|
|
|
|
|
|
|
//public void |
|
|
|
|
|
|
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
} |