Преглед изворни кода

Fixes kafka consume excepiton for GroupLoadInProress errcode (#1085)

* Fixes and add options of kafka consume excepiton for GroupLoadInProress errcode.  #1084

* clean code

* Add kafka consume exception logs.
master
Savorboard пре 2 година
committed by GitHub
родитељ
комит
2288025c81
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
4 измењених фајлова са 32 додато и 3 уклоњено
  1. +9
    -0
      src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs
  2. +19
    -3
      src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs
  3. +3
    -0
      src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs
  4. +1
    -0
      src/DotNetCore.CAP/Transport/MqLogType.cs

+ 9
- 0
src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs Прегледај датотеку

@@ -24,6 +24,10 @@ namespace DotNetCore.CAP
public KafkaOptions()
{
MainConfig = new Dictionary<string, string>();
RetriableErrorCodes = new List<ErrorCode>
{
ErrorCode.GroupLoadInProress
};
}

/// <summary>
@@ -43,5 +47,10 @@ namespace DotNetCore.CAP
/// If you need to get offset and partition and so on.., you can use this function to write additional header into <see cref="CapHeader"/>
/// </summary>
public Func<ConsumeResult<string, byte[]>, List<KeyValuePair<string, string>>>? CustomHeaders { get; set; }

/// <summary>
/// New retriable error code (refer to https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafkacpp_8h.html#a4c6b7af48c215724c323c60ea4080dbf)
/// </summary>
public IList<ErrorCode> RetriableErrorCodes { get; set; }
}
}

+ 19
- 3
src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs Прегледај датотеку

@@ -17,7 +17,7 @@ namespace DotNetCore.CAP.Kafka
{
public class KafkaConsumerClient : IConsumerClient
{
private static readonly SemaphoreSlim ConnectionLock = new SemaphoreSlim(initialCount: 1, maxCount: 1);
private static readonly SemaphoreSlim ConnectionLock = new(initialCount: 1, maxCount: 1);

private readonly string _groupId;
private readonly KafkaOptions _kafkaOptions;
@@ -33,7 +33,7 @@ namespace DotNetCore.CAP.Kafka

public event EventHandler<LogMessageEventArgs>? OnLog;

public BrokerAddress BrokerAddress => new ("Kafka", _kafkaOptions.Servers);
public BrokerAddress BrokerAddress => new("Kafka", _kafkaOptions.Servers);

public ICollection<string> FetchTopics(IEnumerable<string> topicNames)
{
@@ -89,7 +89,23 @@ namespace DotNetCore.CAP.Kafka

while (true)
{
var consumerResult = _consumerClient!.Consume(cancellationToken);
ConsumeResult<string, byte[]> consumerResult;

try
{
consumerResult = _consumerClient!.Consume(cancellationToken);
}
catch (ConsumeException e) when (_kafkaOptions.RetriableErrorCodes.Contains(e.Error.Code))
{
var logArgs = new LogMessageEventArgs
{
LogType = MqLogType.ConsumeRetries,
Reason = e.Error.ToString()
};
OnLog?.Invoke(null, logArgs);

continue;
}

if (consumerResult.IsPartitionEOF || consumerResult.Message.Value == null) continue;



+ 3
- 0
src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs Прегледај датотеку

@@ -304,6 +304,9 @@ namespace DotNetCore.CAP.Internal
case MqLogType.ConsumeError:
_logger.LogError("Kafka client consume error. --> " + logmsg.Reason);
break;
case MqLogType.ConsumeRetries:
_logger.LogWarning("Kafka client consume exception, retying... --> " + logmsg.Reason);
break;
case MqLogType.ServerConnError:
_isHealthy = false;
_logger.LogCritical("Kafka server connection error. --> " + logmsg.Reason);


+ 1
- 0
src/DotNetCore.CAP/Transport/MqLogType.cs Прегледај датотеку

@@ -15,6 +15,7 @@ namespace DotNetCore.CAP.Transport

//Kafka
ConsumeError,
ConsumeRetries,
ServerConnError,

//AzureServiceBus


Loading…
Откажи
Сачувај