alexinea vor 7 Jahren
Ursprung
Commit
4c5da09db6
4 geänderte Dateien mit 17 neuen und 13 gelöschten Zeilen
  1. +10
    -8
      src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs
  2. +3
    -2
      src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs
  3. +3
    -2
      src/DotNetCore.CAP.Kafka/KafkaConsumerClientFactory.cs
  4. +1
    -1
      src/DotNetCore.CAP.Kafka/PublishQueueExecutor.cs

+ 10
- 8
src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs Datei anzeigen

@@ -21,32 +21,34 @@ namespace DotNetCore.CAP
/// Topic configuration parameters are specified via the "default.topic.config" sub-dictionary config parameter.
/// </para>
/// </summary>
public IDictionary<string, object> MainConfig { get; private set; }
public readonly IDictionary<string, object> MainConfig;

/// <summary>
/// The `bootstrap.servers` item config of `MainConfig`.
/// The `bootstrap.servers` item config of <see cref="MainConfig"/>.
/// <para>
/// Initial list of brokers as a CSV list of broker host or host:port.
/// </para>
/// </summary>
public string Servers { get; set; }

internal IEnumerable<KeyValuePair<string, object>> AsRdkafkaConfig()
internal IEnumerable<KeyValuePair<string, object>> AskafkaConfig()
{
if (MainConfig.ContainsKey("bootstrap.servers"))
{
return MainConfig.AsEnumerable();
}

if (string.IsNullOrEmpty(Servers))
if (string.IsNullOrWhiteSpace(Servers))
{
throw new ArgumentNullException(nameof(Servers));
}
else
{
MainConfig.Add("bootstrap.servers", Servers);
}
MainConfig.Add("bootstrap.servers", Servers);

MainConfig["queue.buffering.max.ms"] = "10";
MainConfig["socket.blocking.max.ms"] = "10";
MainConfig["enable.auto.commit"] = "false";

return MainConfig.AsEnumerable();
}
}

+ 3
- 2
src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs Datei anzeigen

@@ -21,7 +21,7 @@ namespace DotNetCore.CAP.Kafka
public KafkaConsumerClient(string groupId, KafkaOptions options)
{
_groupId = groupId;
_kafkaOptions = options;
_kafkaOptions = options ?? throw new ArgumentNullException(nameof(options));
StringDeserializer = new StringDeserializer(Encoding.UTF8);
}

@@ -65,7 +65,7 @@ namespace DotNetCore.CAP.Kafka
{
_kafkaOptions.MainConfig.Add("group.id", _groupId);

var config = _kafkaOptions.AsRdkafkaConfig();
var config = _kafkaOptions.AskafkaConfig();
_consumerClient = new Consumer<Null, string>(config, null, StringDeserializer);

_consumerClient.OnMessage += ConsumerClient_OnMessage;
@@ -80,6 +80,7 @@ namespace DotNetCore.CAP.Kafka
Name = e.Topic,
Content = e.Value
};

OnMessageReceieved?.Invoke(sender, message);
}



+ 3
- 2
src/DotNetCore.CAP.Kafka/KafkaConsumerClientFactory.cs Datei anzeigen

@@ -1,4 +1,5 @@
using Microsoft.Extensions.Options;
using System;
using Microsoft.Extensions.Options;

namespace DotNetCore.CAP.Kafka
{
@@ -8,7 +9,7 @@ namespace DotNetCore.CAP.Kafka

public KafkaConsumerClientFactory(IOptions<KafkaOptions> kafkaOptions)
{
_kafkaOptions = kafkaOptions.Value;
_kafkaOptions = kafkaOptions?.Value ?? throw new ArgumentNullException(nameof(kafkaOptions));
}

public IConsumerClient Create(string groupId)


+ 1
- 1
src/DotNetCore.CAP.Kafka/PublishQueueExecutor.cs Datei anzeigen

@@ -27,7 +27,7 @@ namespace DotNetCore.CAP.Kafka
{
try
{
var config = _kafkaOptions.AsRdkafkaConfig();
var config = _kafkaOptions.AskafkaConfig();
var contentBytes = Encoding.UTF8.GetBytes(content);
using (var producer = new Producer(config))
{


Laden…
Abbrechen
Speichern