From 2288025c81327788b9108e84c640d03290d36be8 Mon Sep 17 00:00:00 2001 From: Savorboard Date: Tue, 15 Feb 2022 10:49:01 +0800 Subject: [PATCH] Fixes kafka consume excepiton for GroupLoadInProress errcode (#1085) * Fixes and add options of kafka consume excepiton for GroupLoadInProress errcode. #1084 * clean code * Add kafka consume exception logs. --- src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs | 9 ++++++++ .../KafkaConsumerClient.cs | 22 ++++++++++++++++--- .../Internal/IConsumerRegister.Default.cs | 3 +++ src/DotNetCore.CAP/Transport/MqLogType.cs | 1 + 4 files changed, 32 insertions(+), 3 deletions(-) diff --git a/src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs b/src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs index 5ef9c30..3453ad5 100644 --- a/src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs +++ b/src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs @@ -24,6 +24,10 @@ namespace DotNetCore.CAP public KafkaOptions() { MainConfig = new Dictionary(); + RetriableErrorCodes = new List + { + ErrorCode.GroupLoadInProress + }; } /// @@ -43,5 +47,10 @@ namespace DotNetCore.CAP /// If you need to get offset and partition and so on.., you can use this function to write additional header into /// public Func, List>>? CustomHeaders { get; set; } + + /// + /// New retriable error code (refer to https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafkacpp_8h.html#a4c6b7af48c215724c323c60ea4080dbf) + /// + public IList RetriableErrorCodes { get; set; } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs b/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs index 98901bd..b0f3db5 100644 --- a/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs +++ b/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs @@ -17,7 +17,7 @@ namespace DotNetCore.CAP.Kafka { public class KafkaConsumerClient : IConsumerClient { - private static readonly SemaphoreSlim ConnectionLock = new SemaphoreSlim(initialCount: 1, maxCount: 1); + private static readonly SemaphoreSlim ConnectionLock = new(initialCount: 1, maxCount: 1); private readonly string _groupId; private readonly KafkaOptions _kafkaOptions; @@ -33,7 +33,7 @@ namespace DotNetCore.CAP.Kafka public event EventHandler? OnLog; - public BrokerAddress BrokerAddress => new ("Kafka", _kafkaOptions.Servers); + public BrokerAddress BrokerAddress => new("Kafka", _kafkaOptions.Servers); public ICollection FetchTopics(IEnumerable topicNames) { @@ -89,7 +89,23 @@ namespace DotNetCore.CAP.Kafka while (true) { - var consumerResult = _consumerClient!.Consume(cancellationToken); + ConsumeResult consumerResult; + + try + { + consumerResult = _consumerClient!.Consume(cancellationToken); + } + catch (ConsumeException e) when (_kafkaOptions.RetriableErrorCodes.Contains(e.Error.Code)) + { + var logArgs = new LogMessageEventArgs + { + LogType = MqLogType.ConsumeRetries, + Reason = e.Error.ToString() + }; + OnLog?.Invoke(null, logArgs); + + continue; + } if (consumerResult.IsPartitionEOF || consumerResult.Message.Value == null) continue; diff --git a/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs b/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs index b78e264..08df370 100644 --- a/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs +++ b/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs @@ -304,6 +304,9 @@ namespace DotNetCore.CAP.Internal case MqLogType.ConsumeError: _logger.LogError("Kafka client consume error. --> " + logmsg.Reason); break; + case MqLogType.ConsumeRetries: + _logger.LogWarning("Kafka client consume exception, retying... --> " + logmsg.Reason); + break; case MqLogType.ServerConnError: _isHealthy = false; _logger.LogCritical("Kafka server connection error. --> " + logmsg.Reason); diff --git a/src/DotNetCore.CAP/Transport/MqLogType.cs b/src/DotNetCore.CAP/Transport/MqLogType.cs index 59ea457..5c1abe9 100644 --- a/src/DotNetCore.CAP/Transport/MqLogType.cs +++ b/src/DotNetCore.CAP/Transport/MqLogType.cs @@ -15,6 +15,7 @@ namespace DotNetCore.CAP.Transport //Kafka ConsumeError, + ConsumeRetries, ServerConnError, //AzureServiceBus