diff --git a/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs b/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs index 49da33a..3616515 100644 --- a/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs +++ b/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs @@ -159,7 +159,7 @@ namespace DotNetCore.CAP.AzureServiceBus private Task OnConsumerReceived(Message message, CancellationToken token) { - var header = message.UserProperties.ToDictionary(x => x.Key, y => y.Value.ToString()); + var header = message.UserProperties.ToDictionary(x => x.Key, y => y.Value?.ToString()); header.Add(Headers.Group, _subscriptionName); var context = new TransportMessage(header, message.Body); diff --git a/src/DotNetCore.CAP.Kafka/ITransport.Kafka.cs b/src/DotNetCore.CAP.Kafka/ITransport.Kafka.cs index aec7d39..60bcea7 100644 --- a/src/DotNetCore.CAP.Kafka/ITransport.Kafka.cs +++ b/src/DotNetCore.CAP.Kafka/ITransport.Kafka.cs @@ -43,7 +43,7 @@ namespace DotNetCore.CAP.Kafka var result = await producer.ProduceAsync(message.GetName(), new Message { Headers = headers, - Key = message.GetId(), + Key = message.Headers.TryGetValue(KafkaHeaders.KafkaKey, out string kafkaMessageKey) && !string.IsNullOrEmpty(kafkaMessageKey) ? kafkaMessageKey : message.GetId(), Value = message.Body }); diff --git a/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs b/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs index dd18f21..28b7137 100644 --- a/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs +++ b/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs @@ -62,6 +62,8 @@ namespace DotNetCore.CAP.Kafka } headers.Add(Messages.Headers.Group, _groupId); + headers.Add(KafkaHeaders.KafkaKey, consumerResult.Key); + if (_kafkaOptions.CustomHeaders != null) { var customHeaders = _kafkaOptions.CustomHeaders(consumerResult); diff --git a/src/DotNetCore.CAP.Kafka/KafkaHeaders.cs b/src/DotNetCore.CAP.Kafka/KafkaHeaders.cs new file mode 100644 index 0000000..350fa2b --- /dev/null +++ b/src/DotNetCore.CAP.Kafka/KafkaHeaders.cs @@ -0,0 +1,10 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +namespace DotNetCore.CAP.Kafka +{ + public static class KafkaHeaders + { + public const string KafkaKey = "cap-kafka-key"; + } +} diff --git a/src/DotNetCore.CAP.SqlServer/IMonitoringApi.SqlServer.cs b/src/DotNetCore.CAP.SqlServer/IMonitoringApi.SqlServer.cs index 86432d1..a4391ec 100644 --- a/src/DotNetCore.CAP.SqlServer/IMonitoringApi.SqlServer.cs +++ b/src/DotNetCore.CAP.SqlServer/IMonitoringApi.SqlServer.cs @@ -175,7 +175,7 @@ select count(Id) from {_recName} with (nolock) where StatusName = N'Failed';"; var sqlQuery2008 = $@" with aggr as ( select replace(convert(varchar, Added, 111), '/','-') + '-' + CONVERT(varchar, DATEPART(hh, Added)) as [Key], - count(id) [Count] + count(Id) [Count] from {tableName} where StatusName = @statusName group by replace(convert(varchar, Added, 111), '/','-') + '-' + CONVERT(varchar, DATEPART(hh, Added)) @@ -186,7 +186,7 @@ select [Key], [Count] from aggr with (nolock) where [Key] in @keys;"; var sqlQuery = $@" with aggr as ( select FORMAT(Added,'yyyy-MM-dd-HH') as [Key], - count(id) [Count] + count(Id) [Count] from {tableName} where StatusName = @statusName group by FORMAT(Added,'yyyy-MM-dd-HH')