From 004ed56991f5925fef4cd416126db793280cdf64 Mon Sep 17 00:00:00 2001 From: Savorboard Date: Fri, 15 Nov 2019 15:03:13 +0800 Subject: [PATCH] Refactoring kafka transport implementation for version 3.0 --- .../CAP.KafkaCapOptionsExtension.cs | 6 ++-- .../DotNetCore.CAP.Kafka.csproj | 2 +- .../IConnectionPool.Default.cs | 16 ++++----- src/DotNetCore.CAP.Kafka/IConnectionPool.cs | 4 +-- ...ageSender.Kafka.cs => ITransport.Kafka.cs} | 35 +++++++++++-------- .../KafkaConsumerClient.cs | 21 ++++++----- src/DotNetCore.CAP/Messages/Headers.cs | 2 +- .../Messages/TransportMessage.cs | 5 +++ 8 files changed, 50 insertions(+), 41 deletions(-) rename src/DotNetCore.CAP.Kafka/{IPublishMessageSender.Kafka.cs => ITransport.Kafka.cs} (59%) diff --git a/src/DotNetCore.CAP.Kafka/CAP.KafkaCapOptionsExtension.cs b/src/DotNetCore.CAP.Kafka/CAP.KafkaCapOptionsExtension.cs index 6699eb0..f0950b7 100644 --- a/src/DotNetCore.CAP.Kafka/CAP.KafkaCapOptionsExtension.cs +++ b/src/DotNetCore.CAP.Kafka/CAP.KafkaCapOptionsExtension.cs @@ -3,6 +3,7 @@ using System; using DotNetCore.CAP.Kafka; +using DotNetCore.CAP.Transport; using Microsoft.Extensions.DependencyInjection; // ReSharper disable once CheckNamespace @@ -23,10 +24,9 @@ namespace DotNetCore.CAP services.Configure(_configure); + services.AddSingleton(); services.AddSingleton(); - services.AddSingleton(); - services.AddSingleton(); - services.AddSingleton(); + services.AddSingleton(); } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.Kafka/DotNetCore.CAP.Kafka.csproj b/src/DotNetCore.CAP.Kafka/DotNetCore.CAP.Kafka.csproj index c052d29..a6d7aea 100644 --- a/src/DotNetCore.CAP.Kafka/DotNetCore.CAP.Kafka.csproj +++ b/src/DotNetCore.CAP.Kafka/DotNetCore.CAP.Kafka.csproj @@ -13,7 +13,7 @@ - + diff --git a/src/DotNetCore.CAP.Kafka/IConnectionPool.Default.cs b/src/DotNetCore.CAP.Kafka/IConnectionPool.Default.cs index 5417bdd..0271d1d 100644 --- a/src/DotNetCore.CAP.Kafka/IConnectionPool.Default.cs +++ b/src/DotNetCore.CAP.Kafka/IConnectionPool.Default.cs @@ -3,33 +3,33 @@ using System; using System.Collections.Concurrent; +using System.Text.Json; using System.Threading; using Confluent.Kafka; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; -using Newtonsoft.Json; namespace DotNetCore.CAP.Kafka { public class ConnectionPool : IConnectionPool, IDisposable { private readonly KafkaOptions _options; - private readonly ConcurrentQueue> _producerPool; + private readonly ConcurrentQueue> _producerPool; private int _pCount; private int _maxSize; public ConnectionPool(ILogger logger, IOptions options) { _options = options.Value; - _producerPool = new ConcurrentQueue>(); + _producerPool = new ConcurrentQueue>(); _maxSize = _options.ConnectionPoolSize; - - logger.LogDebug("Kafka configuration of CAP :\r\n {0}", JsonConvert.SerializeObject(_options.AsKafkaConfig(), Formatting.Indented)); + + logger.LogDebug("Kafka configuration of CAP :\r\n {0}", JsonSerializer.Serialize(_options.AsKafkaConfig())); } public string ServersAddress => _options.Servers; - public IProducer RentProducer() + public IProducer RentProducer() { if (_producerPool.TryDequeue(out var producer)) { @@ -38,12 +38,12 @@ namespace DotNetCore.CAP.Kafka return producer; } - producer = new ProducerBuilder(_options.AsKafkaConfig()).Build(); + producer = new ProducerBuilder(_options.AsKafkaConfig()).Build(); return producer; } - public bool Return(IProducer producer) + public bool Return(IProducer producer) { if (Interlocked.Increment(ref _pCount) <= _maxSize) { diff --git a/src/DotNetCore.CAP.Kafka/IConnectionPool.cs b/src/DotNetCore.CAP.Kafka/IConnectionPool.cs index cb62a7d..e07786d 100644 --- a/src/DotNetCore.CAP.Kafka/IConnectionPool.cs +++ b/src/DotNetCore.CAP.Kafka/IConnectionPool.cs @@ -9,8 +9,8 @@ namespace DotNetCore.CAP.Kafka { string ServersAddress { get; } - IProducer RentProducer(); + IProducer RentProducer(); - bool Return(IProducer producer); + bool Return(IProducer producer); } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.Kafka/IPublishMessageSender.Kafka.cs b/src/DotNetCore.CAP.Kafka/ITransport.Kafka.cs similarity index 59% rename from src/DotNetCore.CAP.Kafka/IPublishMessageSender.Kafka.cs rename to src/DotNetCore.CAP.Kafka/ITransport.Kafka.cs index 2cd0b3e..29151a9 100644 --- a/src/DotNetCore.CAP.Kafka/IPublishMessageSender.Kafka.cs +++ b/src/DotNetCore.CAP.Kafka/ITransport.Kafka.cs @@ -2,48 +2,53 @@ // Licensed under the MIT License. See License.txt in the project root for license information. using System; +using System.Linq; +using System.Text; using System.Threading.Tasks; using Confluent.Kafka; using DotNetCore.CAP.Internal; -using DotNetCore.CAP.Processor.States; +using DotNetCore.CAP.Messages; +using DotNetCore.CAP.Transport; using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; namespace DotNetCore.CAP.Kafka { - internal class KafkaPublishMessageSender : BasePublishMessageSender + internal class KafkaTransport : ITransport { private readonly IConnectionPool _connectionPool; private readonly ILogger _logger; - public KafkaPublishMessageSender( - ILogger logger, - IOptions options, - IStorageConnection connection, - IConnectionPool connectionPool, - IStateChanger stateChanger) - : base(logger, options, connection, stateChanger) + public KafkaTransport(ILogger logger, IConnectionPool connectionPool) { _logger = logger; _connectionPool = connectionPool; } - protected override string ServersAddress => _connectionPool.ServersAddress; + public string Address => _connectionPool.ServersAddress; - public override async Task PublishAsync(string keyName, string content) + public async Task SendAsync(TransportMessage message) { var producer = _connectionPool.RentProducer(); try { - var result = await producer.ProduceAsync(keyName, new Message() + var headers = new Confluent.Kafka.Headers(); + + foreach (var header in message.Headers.Select(x => new Header(x.Key, Encoding.UTF8.GetBytes(x.Value)))) + { + headers.Add(header); + } + + var result = await producer.ProduceAsync(message.GetName(), new Message { - Value = content + Headers = headers, + Key = message.GetId(), + Value = message.Body }); if (result.Status == PersistenceStatus.Persisted || result.Status == PersistenceStatus.PossiblyPersisted) { - _logger.LogDebug($"kafka topic message [{keyName}] has been published."); + _logger.LogDebug($"kafka topic message [{message.GetName()}] has been published."); return OperateResult.Success; } diff --git a/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs b/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs index 2a38cbf..a78eb9f 100644 --- a/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs +++ b/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs @@ -3,8 +3,11 @@ using System; using System.Collections.Generic; +using System.Linq; +using System.Text; using System.Threading; using Confluent.Kafka; +using DotNetCore.CAP.Messages; using Microsoft.Extensions.Options; namespace DotNetCore.CAP.Kafka @@ -15,7 +18,7 @@ namespace DotNetCore.CAP.Kafka private readonly string _groupId; private readonly KafkaOptions _kafkaOptions; - private IConsumer _consumerClient; + private IConsumer _consumerClient; public KafkaConsumerClient(string groupId, IOptions options) { @@ -23,7 +26,7 @@ namespace DotNetCore.CAP.Kafka _kafkaOptions = options.Value ?? throw new ArgumentNullException(nameof(options)); } - public event EventHandler OnMessageReceived; + public event EventHandler OnMessageReceived; public event EventHandler OnLog; @@ -51,12 +54,8 @@ namespace DotNetCore.CAP.Kafka if (consumerResult.IsPartitionEOF || consumerResult.Value == null) continue; - var message = new MessageContext - { - Group = _groupId, - Name = consumerResult.Topic, - Content = consumerResult.Value - }; + var header = consumerResult.Headers.ToDictionary(x => x.Key, y => Encoding.UTF8.GetString(y.GetValueBytes())); + var message = new TransportMessage(header, consumerResult.Value); OnMessageReceived?.Invoke(consumerResult, message); } @@ -97,7 +96,7 @@ namespace DotNetCore.CAP.Kafka _kafkaOptions.MainConfig["auto.offset.reset"] = "earliest"; var config = _kafkaOptions.AsKafkaConfig(); - _consumerClient = new ConsumerBuilder(config) + _consumerClient = new ConsumerBuilder(config) .SetErrorHandler(ConsumerClient_OnConsumeError) .Build(); } @@ -105,10 +104,10 @@ namespace DotNetCore.CAP.Kafka finally { _connectionLock.Release(); - } + } } - private void ConsumerClient_OnConsumeError(IConsumer consumer, Error e) + private void ConsumerClient_OnConsumeError(IConsumer consumer, Error e) { var logArgs = new LogMessageEventArgs { diff --git a/src/DotNetCore.CAP/Messages/Headers.cs b/src/DotNetCore.CAP/Messages/Headers.cs index 0d9e3da..02d1de7 100644 --- a/src/DotNetCore.CAP/Messages/Headers.cs +++ b/src/DotNetCore.CAP/Messages/Headers.cs @@ -3,7 +3,7 @@ public static class Headers { /// - /// Id of the message. Either set the ID explicitly when sending a message, or Rebus will assign one to the message. + /// Id of the message. Either set the ID explicitly when sending a message, or assign one to the message. /// public const string MessageId = "cap-msg-id"; diff --git a/src/DotNetCore.CAP/Messages/TransportMessage.cs b/src/DotNetCore.CAP/Messages/TransportMessage.cs index 7183bb4..365d823 100644 --- a/src/DotNetCore.CAP/Messages/TransportMessage.cs +++ b/src/DotNetCore.CAP/Messages/TransportMessage.cs @@ -24,6 +24,11 @@ namespace DotNetCore.CAP.Messages /// public byte[] Body { get; } + public string GetId() + { + return Headers.TryGetValue(Messages.Headers.MessageId, out var value) ? value : null; + } + public string GetName() { return Headers.TryGetValue(Messages.Headers.MessageName, out var value) ? value : null;