Browse Source

fixed kafka client bugs.

master
yangxiaodong 7 years ago
parent
commit
e0de7a16ca
2 changed files with 16 additions and 15 deletions
  1. +15
    -14
      src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs
  2. +1
    -1
      src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs

+ 15
- 14
src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs View File

@@ -10,6 +10,9 @@ namespace DotNetCore.CAP
/// </summary> /// </summary>
public class KafkaOptions public class KafkaOptions
{ {
private IEnumerable<KeyValuePair<string, object>> _kafkaConfig;


public KafkaOptions() public KafkaOptions()
{ {
MainConfig = new Dictionary<string, object>(); MainConfig = new Dictionary<string, object>();
@@ -33,23 +36,21 @@ namespace DotNetCore.CAP


internal IEnumerable<KeyValuePair<string, object>> AskafkaConfig() internal IEnumerable<KeyValuePair<string, object>> AskafkaConfig()
{ {
if (MainConfig.ContainsKey("bootstrap.servers"))
{
return MainConfig.AsEnumerable();
}

if (string.IsNullOrWhiteSpace(Servers))
if (_kafkaConfig == null)
{ {
throw new ArgumentNullException(nameof(Servers));
}
if (string.IsNullOrWhiteSpace(Servers))
{
throw new ArgumentNullException(nameof(Servers));
}


MainConfig.Add("bootstrap.servers", Servers);
MainConfig["bootstrap.servers"] = Servers;
MainConfig["queue.buffering.max.ms"] = "10";
MainConfig["socket.blocking.max.ms"] = "10";
MainConfig["enable.auto.commit"] = "false";


MainConfig["queue.buffering.max.ms"] = "10";
MainConfig["socket.blocking.max.ms"] = "10";
MainConfig["enable.auto.commit"] = "false";

return MainConfig.AsEnumerable();
_kafkaConfig = MainConfig.AsEnumerable();
}
return _kafkaConfig;
} }
} }
} }

+ 1
- 1
src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs View File

@@ -63,7 +63,7 @@ namespace DotNetCore.CAP.Kafka


private void InitKafkaClient() private void InitKafkaClient()
{ {
_kafkaOptions.MainConfig.Add("group.id", _groupId);
_kafkaOptions.MainConfig["group.id"] = _groupId;


var config = _kafkaOptions.AskafkaConfig(); var config = _kafkaOptions.AskafkaConfig();
_consumerClient = new Consumer<Null, string>(config, null, StringDeserializer); _consumerClient = new Consumer<Null, string>(config, null, StringDeserializer);


Loading…
Cancel
Save