diff --git a/src/DotNetCore.CAP.Kafka/ITransport.Kafka.cs b/src/DotNetCore.CAP.Kafka/ITransport.Kafka.cs index aec7d39..60bcea7 100644 --- a/src/DotNetCore.CAP.Kafka/ITransport.Kafka.cs +++ b/src/DotNetCore.CAP.Kafka/ITransport.Kafka.cs @@ -43,7 +43,7 @@ namespace DotNetCore.CAP.Kafka var result = await producer.ProduceAsync(message.GetName(), new Message { Headers = headers, - Key = message.GetId(), + Key = message.Headers.TryGetValue(KafkaHeaders.KafkaKey, out string kafkaMessageKey) && !string.IsNullOrEmpty(kafkaMessageKey) ? kafkaMessageKey : message.GetId(), Value = message.Body }); diff --git a/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs b/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs index dd18f21..28b7137 100644 --- a/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs +++ b/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs @@ -62,6 +62,8 @@ namespace DotNetCore.CAP.Kafka } headers.Add(Messages.Headers.Group, _groupId); + headers.Add(KafkaHeaders.KafkaKey, consumerResult.Key); + if (_kafkaOptions.CustomHeaders != null) { var customHeaders = _kafkaOptions.CustomHeaders(consumerResult); diff --git a/src/DotNetCore.CAP.Kafka/KafkaHeaders.cs b/src/DotNetCore.CAP.Kafka/KafkaHeaders.cs new file mode 100644 index 0000000..350fa2b --- /dev/null +++ b/src/DotNetCore.CAP.Kafka/KafkaHeaders.cs @@ -0,0 +1,10 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +namespace DotNetCore.CAP.Kafka +{ + public static class KafkaHeaders + { + public const string KafkaKey = "cap-kafka-key"; + } +}