From eb90cac6d902c63b3b20e8086ecbe36f3e908940 Mon Sep 17 00:00:00 2001 From: Savorboard Date: Tue, 19 Nov 2019 14:15:33 +0800 Subject: [PATCH] Code refactoring --- .../Internal/ICapPublisher.Default.cs | 6 ++--- .../Internal/IConsumerRegister.Default.cs | 22 ++++++++++--------- .../Internal/IMessageSender.Default.cs | 6 +++-- src/DotNetCore.CAP/Messages/Headers.cs | 10 ++++----- src/DotNetCore.CAP/Transport/ITransport.cs | 5 ++++- 5 files changed, 28 insertions(+), 21 deletions(-) diff --git a/src/DotNetCore.CAP/Internal/ICapPublisher.Default.cs b/src/DotNetCore.CAP/Internal/ICapPublisher.Default.cs index e822fb6..b904d2f 100644 --- a/src/DotNetCore.CAP/Internal/ICapPublisher.Default.cs +++ b/src/DotNetCore.CAP/Internal/ICapPublisher.Default.cs @@ -51,14 +51,14 @@ namespace DotNetCore.CAP.Internal var messageId = SnowflakeId.Default().NextId().ToString(); optionHeaders.Add(Headers.MessageId, messageId); + optionHeaders.Add(Headers.MessageName, name); + optionHeaders.Add(Headers.Type, typeof(T).FullName); + optionHeaders.Add(Headers.SentTime, DateTimeOffset.Now.ToString()); if (!optionHeaders.ContainsKey(Headers.CorrelationId)) { optionHeaders.Add(Headers.CorrelationId, messageId); optionHeaders.Add(Headers.CorrelationSequence, 0.ToString()); } - optionHeaders.Add(Headers.MessageName, name); - optionHeaders.Add(Headers.Type, typeof(T).FullName); - optionHeaders.Add(Headers.SentTime, DateTimeOffset.Now.ToString()); var message = new Message(optionHeaders, value); diff --git a/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs b/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs index 70e800c..b485edf 100644 --- a/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs +++ b/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs @@ -4,6 +4,7 @@ using System; using System.Diagnostics; using System.Linq; +using System.Runtime.Serialization; using System.Threading; using System.Threading.Tasks; using DotNetCore.CAP.Diagnostics; @@ -162,22 +163,23 @@ namespace DotNetCore.CAP.Internal var name = transportMessage.GetName(); var group = transportMessage.GetGroup(); - if (!_selector.TryGetTopicExecutor(name, group, out var executor)) - { - var error = $"Message can not be found subscriber. Name:{name}, Group:{group}. {Environment.NewLine} see: https://github.com/dotnetcore/CAP/issues/63"; - throw new SubscriberNotFoundException(error); - } - - var type = executor.Parameters.FirstOrDefault(x => x.IsFromCap == false)?.ParameterType; - Message message; + + var canFindSubscriber = _selector.TryGetTopicExecutor(name, group, out var executor); try { + if (!canFindSubscriber) + { + var error = $"Message can not be found subscriber. Name:{name}, Group:{group}. {Environment.NewLine} see: https://github.com/dotnetcore/CAP/issues/63"; + throw new SubscriberNotFoundException(error); + } + + var type = executor.Parameters.FirstOrDefault(x => x.IsFromCap == false)?.ParameterType; message = await _serializer.DeserializeAsync(transportMessage, type); } catch (Exception e) { - transportMessage.Headers.Add(Headers.Exception, e.Message); + transportMessage.Headers.Add(Headers.Exception, nameof(SerializationException) + "-->" + e.Message); var dataUri = $"data:{transportMessage.Headers[Headers.Type]};base64," + Convert.ToBase64String(transportMessage.Body); message = new Message(transportMessage.Headers, dataUri); } @@ -211,7 +213,7 @@ namespace DotNetCore.CAP.Internal } catch (Exception e) { - _logger.LogError(e, "An exception occurred when store received message. Message:'{0}'.", transportMessage); + _logger.LogError(e, "An exception occurred when process received message. Message:'{0}'.", transportMessage); client.Reject(); diff --git a/src/DotNetCore.CAP/Internal/IMessageSender.Default.cs b/src/DotNetCore.CAP/Internal/IMessageSender.Default.cs index 8cf6f5b..76227d9 100644 --- a/src/DotNetCore.CAP/Internal/IMessageSender.Default.cs +++ b/src/DotNetCore.CAP/Internal/IMessageSender.Default.cs @@ -102,10 +102,12 @@ namespace DotNetCore.CAP.Internal private async Task SetFailedState(MediumMessage message, Exception ex) { - //TODO: Add exception to content - var needRetry = UpdateMessageForRetry(message); + if (message.ExpiresAt != null) + { + message.ExpiresAt = DateTime.Now.AddDays(15); + } await _dataStorage.ChangePublishStateAsync(message, StatusName.Failed); return needRetry; diff --git a/src/DotNetCore.CAP/Messages/Headers.cs b/src/DotNetCore.CAP/Messages/Headers.cs index 02d1de7..b36c958 100644 --- a/src/DotNetCore.CAP/Messages/Headers.cs +++ b/src/DotNetCore.CAP/Messages/Headers.cs @@ -9,18 +9,18 @@ public const string MessageName = "cap-msg-name"; - public const string CorrelationId = "cap-corr-id"; - - public const string CorrelationSequence = "cap-corr-seq"; + public const string Group = "cap-msg-group"; /// /// Message value .NET type /// public const string Type = "cap-msg-type"; - public const string CallbackName = "cap-callback-name"; + public const string CorrelationId = "cap-corr-id"; - public const string Group = "cap-msg-group"; + public const string CorrelationSequence = "cap-corr-seq"; + + public const string CallbackName = "cap-callback-name"; public const string SentTime = "cap-senttime"; diff --git a/src/DotNetCore.CAP/Transport/ITransport.cs b/src/DotNetCore.CAP/Transport/ITransport.cs index 7c9e2e0..7cf3b1f 100644 --- a/src/DotNetCore.CAP/Transport/ITransport.cs +++ b/src/DotNetCore.CAP/Transport/ITransport.cs @@ -1,4 +1,7 @@ -using System.Threading.Tasks; +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System.Threading.Tasks; using DotNetCore.CAP.Messages; namespace DotNetCore.CAP.Transport