diff --git a/src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs b/src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs index 97d3f66..9247576 100644 --- a/src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs +++ b/src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs @@ -5,6 +5,7 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; +using Confluent.Kafka; // ReSharper disable once CheckNamespace namespace DotNetCore.CAP @@ -43,6 +44,11 @@ namespace DotNetCore.CAP /// public string Servers { get; set; } + /// + /// If you need to get offset and partition and so on.., you can use this function to write additional header into + /// + public Func, List>> CustomHeaders { get; set; } + internal IEnumerable> AsKafkaConfig() { if (_kafkaConfig == null) diff --git a/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs b/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs index eeecd80..afad455 100644 --- a/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs +++ b/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs @@ -62,6 +62,15 @@ namespace DotNetCore.CAP.Kafka } headers.Add(Messages.Headers.Group, _groupId); + if (_kafkaOptions.CustomHeaders != null) + { + var customHeaders = _kafkaOptions.CustomHeaders(consumerResult); + foreach (var customHeader in customHeaders) + { + headers.Add(customHeader.Key, customHeader.Value); + } + } + var message = new TransportMessage(headers, consumerResult.Value); OnMessageReceived?.Invoke(consumerResult, message);