diff --git a/src/DotNetCore.CAP.NATS/NATSConsumerClient.cs b/src/DotNetCore.CAP.NATS/NATSConsumerClient.cs index 6c37b82..b805a58 100644 --- a/src/DotNetCore.CAP.NATS/NATSConsumerClient.cs +++ b/src/DotNetCore.CAP.NATS/NATSConsumerClient.cs @@ -70,13 +70,15 @@ namespace DotNetCore.CAP.NATS private void Subscription_MessageHandler(object sender, MsgHandlerEventArgs e) { var headers = new Dictionary(); + foreach (string h in e.Message.Header.Keys) { headers.Add(h, e.Message.Header[h]); } - var tranmessage = new TransportMessage(headers, e.Message.Data); - tranmessage.Headers.Add(Headers.Group, _groupId); - OnMessageReceived?.Invoke(e.Message.Reply, tranmessage); + + headers.Add(Headers.Group, _groupId); + + OnMessageReceived?.Invoke(e.Message.Reply, new TransportMessage(headers, e.Message.Data)); } public void Commit(object sender) diff --git a/src/DotNetCore.CAP/Internal/Helper.cs b/src/DotNetCore.CAP/Internal/Helper.cs index 2650554..2b3d64a 100644 --- a/src/DotNetCore.CAP/Internal/Helper.cs +++ b/src/DotNetCore.CAP/Internal/Helper.cs @@ -66,7 +66,6 @@ namespace DotNetCore.CAP.Internal public static bool IsInnerIP(string ipAddress) { - bool isInnerIp; var ipNum = GetIpNum(ipAddress); //Private IP: @@ -80,9 +79,9 @@ namespace DotNetCore.CAP.Internal var bEnd = GetIpNum("172.31.255.255"); var cBegin = GetIpNum("192.168.0.0"); var cEnd = GetIpNum("192.168.255.255"); - isInnerIp = IsInner(ipNum, aBegin, aEnd) || IsInner(ipNum, bBegin, bEnd) || IsInner(ipNum, cBegin, cEnd); - return isInnerIp; + return IsInner(ipNum, aBegin, aEnd) || IsInner(ipNum, bBegin, bEnd) || IsInner(ipNum, cBegin, cEnd); } + private static long GetIpNum(string ipAddress) { var ip = ipAddress.Split('.'); diff --git a/src/DotNetCore.CAP/Internal/ICapPublisher.Default.cs b/src/DotNetCore.CAP/Internal/ICapPublisher.Default.cs index 2841f04..1b9c7b4 100644 --- a/src/DotNetCore.CAP/Internal/ICapPublisher.Default.cs +++ b/src/DotNetCore.CAP/Internal/ICapPublisher.Default.cs @@ -30,7 +30,7 @@ namespace DotNetCore.CAP.Internal ServiceProvider = service; _dispatcher = service.GetRequiredService(); _storage = service.GetRequiredService(); - _capOptions = service.GetService>().Value; + _capOptions = service.GetRequiredService>().Value; Transaction = new AsyncLocal(); } diff --git a/src/DotNetCore.CAP/Processor/IProcessor.Collector.cs b/src/DotNetCore.CAP/Processor/IProcessor.Collector.cs index d1673f1..1ad0a70 100644 --- a/src/DotNetCore.CAP/Processor/IProcessor.Collector.cs +++ b/src/DotNetCore.CAP/Processor/IProcessor.Collector.cs @@ -27,7 +27,7 @@ namespace DotNetCore.CAP.Processor _serviceProvider = serviceProvider; _waitingInterval = TimeSpan.FromSeconds(options.Value.CollectorCleaningInterval); - var initializer = _serviceProvider.GetService(); + var initializer = _serviceProvider.GetRequiredService(); _tableNames = new[] { initializer.GetPublishedTableName(), initializer.GetReceivedTableName() }; } @@ -42,7 +42,7 @@ namespace DotNetCore.CAP.Processor var time = DateTime.Now; do { - deletedCount = await _serviceProvider.GetService() + deletedCount = await _serviceProvider.GetRequiredService() .DeleteExpiresAsync(table, time, ItemBatch, context.CancellationToken); if (deletedCount != 0)