diff --git a/src/DotNetCore.CAP.EntityFrameworkCore/ConsistencyMessageStore.cs b/src/DotNetCore.CAP.EntityFrameworkCore/ConsistencyMessageStore.cs index a24beb9..641c4c7 100644 --- a/src/DotNetCore.CAP.EntityFrameworkCore/ConsistencyMessageStore.cs +++ b/src/DotNetCore.CAP.EntityFrameworkCore/ConsistencyMessageStore.cs @@ -35,7 +35,7 @@ namespace DotNetCore.CAP.EntityFrameworkCore public async Task StoreSentMessageAsync(CapSentMessage message) { if (message == null) throw new ArgumentNullException(nameof(message)); - + Context.Add(message); await Context.SaveChangesAsync(); return OperateResult.Success; diff --git a/src/DotNetCore.CAP.Kafka/CAP.BuilderExtensions.cs b/src/DotNetCore.CAP.Kafka/CAP.BuilderExtensions.cs index 6ce3418..4c3375e 100644 --- a/src/DotNetCore.CAP.Kafka/CAP.BuilderExtensions.cs +++ b/src/DotNetCore.CAP.Kafka/CAP.BuilderExtensions.cs @@ -18,7 +18,6 @@ namespace Microsoft.Extensions.DependencyInjection /// An for creating and configuring the CAP system. public static CapBuilder AddKafka(this CapBuilder builder, Action setupAction) { - if (setupAction == null) throw new ArgumentNullException(nameof(setupAction)); builder.Services.Configure(setupAction); diff --git a/src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs b/src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs index ad6d83c..30cd208 100644 --- a/src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs +++ b/src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs @@ -1,33 +1,49 @@ using System; using System.Collections.Generic; -using System.Text; +using System.Linq; namespace DotNetCore.CAP.Kafka { + /// + /// Provides programmatic configuration for the CAP kafka project. + /// public class KafkaOptions { - /// - /// Gets the Kafka broker id. - /// - public int BrokerId { get; } + public KafkaOptions() + { + MainConfig = new Dictionary(); + } /// - /// Gets the Kafka broker hostname. + /// librdkafka configuration parameters (refer to https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md). + /// + /// Topic configuration parameters are specified via the "default.topic.config" sub-dictionary config parameter. + /// /// - public string Host { get; } + public IDictionary MainConfig { get; private set; } /// - /// Gets the Kafka broker port. + /// The `bootstrap.servers` item config of `MainConfig`. + /// + /// Initial list of brokers as a CSV list of broker host or host:port. + /// /// - public int Port { get; } + public string Servers { get; set; } - /// - /// Returns a JSON representation of the BrokerMetadata object. - /// - /// - /// A JSON representation of the BrokerMetadata object. - /// - public override string ToString() - => $"{{ \"BrokerId\": {BrokerId}, \"Host\": \"{Host}\", \"Port\": {Port} }}"; + internal IEnumerable> AsRdkafkaConfig() + { + if (!MainConfig.ContainsKey("bootstrap.servers")) + { + if (string.IsNullOrEmpty(Servers)) + { + throw new ArgumentNullException(nameof(Servers)); + } + else + { + MainConfig.Add("bootstrap.servers", Servers); + } + } + return MainConfig.AsEnumerable(); + } } -} +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.Kafka/IProcessor.KafkaJobProcessor.cs b/src/DotNetCore.CAP.Kafka/IProcessor.KafkaJobProcessor.cs index c0538dd..8a9e039 100644 --- a/src/DotNetCore.CAP.Kafka/IProcessor.KafkaJobProcessor.cs +++ b/src/DotNetCore.CAP.Kafka/IProcessor.KafkaJobProcessor.cs @@ -123,7 +123,7 @@ namespace DotNetCore.CAP.Kafka { try { - var config = new Dictionary { { "bootstrap.servers", _kafkaOptions.Host } }; + var config = _kafkaOptions.AsRdkafkaConfig(); using (var producer = new Producer(config, null, new StringSerializer(Encoding.UTF8))) { var message = producer.ProduceAsync(topic, null, content).Result; diff --git a/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs b/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs index ce9f383..ae574a4 100644 --- a/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs +++ b/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs @@ -1,5 +1,4 @@ using System; -using System.Collections.Generic; using System.Text; using Confluent.Kafka; using Confluent.Kafka.Serialization; @@ -56,12 +55,11 @@ namespace DotNetCore.CAP.Kafka private void InitKafkaClient() { - var config = new Dictionary{ - { "group.id", _groupId }, - { "bootstrap.servers", _kafkaOptions.Host } - }; + _kafkaOptions.MainConfig.Add("group.id", _groupId); + var config = _kafkaOptions.AsRdkafkaConfig(); _consumerClient = new Consumer(config, null, StringDeserializer); + _consumerClient.OnMessage += ConsumerClient_OnMessage; } diff --git a/src/DotNetCore.CAP.RabbitMQ/CAP.RabbiMQOptions.cs b/src/DotNetCore.CAP.RabbitMQ/CAP.RabbiMQOptions.cs index 6d2a2d4..6163ae9 100644 --- a/src/DotNetCore.CAP.RabbitMQ/CAP.RabbiMQOptions.cs +++ b/src/DotNetCore.CAP.RabbitMQ/CAP.RabbiMQOptions.cs @@ -1,8 +1,4 @@ -using System; -using System.Collections.Generic; -using System.Text; - -namespace DotNetCore.CAP.RabbitMQ +namespace DotNetCore.CAP.RabbitMQ { public class RabbitMQOptions { @@ -68,4 +64,4 @@ namespace DotNetCore.CAP.RabbitMQ /// public int Port { get; set; } = -1; } -} +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClientFactory.cs b/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClientFactory.cs index 48a75e6..2d64f0c 100644 --- a/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClientFactory.cs +++ b/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClientFactory.cs @@ -1,5 +1,4 @@ -using DotNetCore.CAP.Infrastructure; -using Microsoft.Extensions.Options; +using Microsoft.Extensions.Options; namespace DotNetCore.CAP.RabbitMQ { diff --git a/src/DotNetCore.CAP/CAP.Options.cs b/src/DotNetCore.CAP/CAP.Options.cs index 51d293f..2ca40b0 100644 --- a/src/DotNetCore.CAP/CAP.Options.cs +++ b/src/DotNetCore.CAP/CAP.Options.cs @@ -7,7 +7,7 @@ namespace DotNetCore.CAP.Infrastructure /// public class CapOptions { - /// + /// /// Corn expression for configuring retry cron job. Default is 1 min. /// public string CronExp { get; set; } = Cron.Minutely();