Browse Source

fix multiple subscribing only received one message bug.

master
Savorboard 7 years ago
parent
commit
f8759398a8
3 changed files with 11 additions and 14 deletions
  1. +7
    -7
      src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs
  2. +2
    -3
      src/DotNetCore.CAP/IConsumerClient.cs
  3. +2
    -4
      src/DotNetCore.CAP/IConsumerHandler.Default.cs

+ 7
- 7
src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs View File

@@ -1,4 +1,5 @@
using System; using System;
using System.Collections.Generic;
using System.Text; using System.Text;
using System.Threading; using System.Threading;
using Confluent.Kafka; using Confluent.Kafka;
@@ -25,19 +26,18 @@ namespace DotNetCore.CAP.Kafka
StringDeserializer = new StringDeserializer(Encoding.UTF8); StringDeserializer = new StringDeserializer(Encoding.UTF8);
} }


public void Subscribe(string topic)
public void Subscribe(IEnumerable<string> topics)
{ {
Subscribe(topic, 0);
}
if (topics == null)
throw new ArgumentNullException(nameof(topics));


public void Subscribe(string topicName, int partition)
{
if (_consumerClient == null) if (_consumerClient == null)
{ {
InitKafkaClient(); InitKafkaClient();
} }
_consumerClient.Assignment.Add(new TopicPartition(topicName, partition));
_consumerClient.Subscribe(topicName);

//_consumerClient.Assign(topics.Select(x=> new TopicPartition(x, 0)));
_consumerClient.Subscribe(topics);
} }


public void Listening(TimeSpan timeout, CancellationToken cancellationToken) public void Listening(TimeSpan timeout, CancellationToken cancellationToken)


+ 2
- 3
src/DotNetCore.CAP/IConsumerClient.cs View File

@@ -1,4 +1,5 @@
using System; using System;
using System.Collections.Generic;
using System.Threading; using System.Threading;


namespace DotNetCore.CAP namespace DotNetCore.CAP
@@ -8,9 +9,7 @@ namespace DotNetCore.CAP
/// </summary> /// </summary>
public interface IConsumerClient : IDisposable public interface IConsumerClient : IDisposable
{ {
void Subscribe(string topic);

void Subscribe(string topic, int partition);
void Subscribe(IEnumerable<string> topics);


void Listening(TimeSpan timeout, CancellationToken cancellationToken); void Listening(TimeSpan timeout, CancellationToken cancellationToken);




+ 2
- 4
src/DotNetCore.CAP/IConsumerHandler.Default.cs View File

@@ -1,4 +1,5 @@
using System; using System;
using System.Linq;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Infrastructure;
@@ -56,10 +57,7 @@ namespace DotNetCore.CAP
{ {
RegisterMessageProcessor(client); RegisterMessageProcessor(client);


foreach (var item in matchGroup.Value)
{
client.Subscribe(item.Attribute.Name);
}
client.Subscribe(matchGroup.Value.Select(x => x.Attribute.Name));


client.Listening(_pollingDelay, _cts.Token); client.Listening(_pollingDelay, _cts.Token);
} }


Loading…
Cancel
Save