diff --git a/src/DotNetCore.CAP.InMemoryStorage/IDataStorage.InMemory.cs b/src/DotNetCore.CAP.InMemoryStorage/IDataStorage.InMemory.cs index 5730fec..e7569d8 100644 --- a/src/DotNetCore.CAP.InMemoryStorage/IDataStorage.InMemory.cs +++ b/src/DotNetCore.CAP.InMemoryStorage/IDataStorage.InMemory.cs @@ -35,6 +35,7 @@ namespace DotNetCore.CAP.InMemoryStorage { PublishedMessages[message.DbId].StatusName = state; PublishedMessages[message.DbId].ExpiresAt = message.ExpiresAt; + PublishedMessages[message.DbId].Content = _serializer.Serialize(message.Origin); return Task.CompletedTask; } @@ -42,6 +43,7 @@ namespace DotNetCore.CAP.InMemoryStorage { ReceivedMessages[message.DbId].StatusName = state; ReceivedMessages[message.DbId].ExpiresAt = message.ExpiresAt; + ReceivedMessages[message.DbId].Content = _serializer.Serialize(message.Origin); return Task.CompletedTask; } diff --git a/src/DotNetCore.CAP.MongoDB/IDataStorage.MongoDB.cs b/src/DotNetCore.CAP.MongoDB/IDataStorage.MongoDB.cs index 86f1e39..f6132ed 100644 --- a/src/DotNetCore.CAP.MongoDB/IDataStorage.MongoDB.cs +++ b/src/DotNetCore.CAP.MongoDB/IDataStorage.MongoDB.cs @@ -42,6 +42,7 @@ namespace DotNetCore.CAP.MongoDB var collection = _database.GetCollection(_options.Value.PublishedCollection); var updateDef = Builders.Update + .Set(x => x.Content, _serializer.Serialize(message.Origin)) .Set(x => x.Retries, message.Retries) .Set(x => x.ExpiresAt, message.ExpiresAt) .Set(x => x.StatusName, state.ToString("G")); @@ -54,6 +55,7 @@ namespace DotNetCore.CAP.MongoDB var collection = _database.GetCollection(_options.Value.ReceivedCollection); var updateDef = Builders.Update + .Set(x => x.Content, _serializer.Serialize(message.Origin)) .Set(x => x.Retries, message.Retries) .Set(x => x.ExpiresAt, message.ExpiresAt) .Set(x => x.StatusName, state.ToString("G")); diff --git a/src/DotNetCore.CAP.MySql/IDataStorage.MySql.cs b/src/DotNetCore.CAP.MySql/IDataStorage.MySql.cs index 3f3892c..72a0074 100644 --- a/src/DotNetCore.CAP.MySql/IDataStorage.MySql.cs +++ b/src/DotNetCore.CAP.MySql/IDataStorage.MySql.cs @@ -158,11 +158,12 @@ namespace DotNetCore.CAP.MySql private async Task ChangeMessageStateAsync(string tableName, MediumMessage message, StatusName state) { var sql = - $"UPDATE `{tableName}` SET `Retries` = @Retries,`ExpiresAt` = @ExpiresAt,`StatusName`=@StatusName WHERE `Id`=@Id;"; + $"UPDATE `{tableName}` SET `Content`=@Content,`Retries`=@Retries,`ExpiresAt`=@ExpiresAt,`StatusName`=@StatusName WHERE `Id`=@Id;"; object[] sqlParams = { new MySqlParameter("@Id", message.DbId), + new MySqlParameter("@Content", _serializer.Serialize(message.Origin)), new MySqlParameter("@Retries", message.Retries), new MySqlParameter("@ExpiresAt", message.ExpiresAt), new MySqlParameter("@StatusName", state.ToString("G")) diff --git a/src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs b/src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs index 7d7f602..162f0c1 100644 --- a/src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs +++ b/src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs @@ -160,11 +160,12 @@ namespace DotNetCore.CAP.PostgreSql private async Task ChangeMessageStateAsync(string tableName, MediumMessage message, StatusName state) { var sql = - $"UPDATE {tableName} SET \"Retries\"=@Retries,\"ExpiresAt\"=@ExpiresAt,\"StatusName\"=@StatusName WHERE \"Id\"=@Id"; + $"UPDATE {tableName} SET \"Content\"=@Content,\"Retries\"=@Retries,\"ExpiresAt\"=@ExpiresAt,\"StatusName\"=@StatusName WHERE \"Id\"=@Id"; object[] sqlParams = { new NpgsqlParameter("@Id", long.Parse(message.DbId)), + new NpgsqlParameter("@Content", _serializer.Serialize(message.Origin)), new NpgsqlParameter("@Retries", message.Retries), new NpgsqlParameter("@ExpiresAt", message.ExpiresAt), new NpgsqlParameter("@StatusName", state.ToString("G")) diff --git a/src/DotNetCore.CAP.SqlServer/IDataStorage.SqlServer.cs b/src/DotNetCore.CAP.SqlServer/IDataStorage.SqlServer.cs index 0db52d1..d927d0d 100644 --- a/src/DotNetCore.CAP.SqlServer/IDataStorage.SqlServer.cs +++ b/src/DotNetCore.CAP.SqlServer/IDataStorage.SqlServer.cs @@ -159,11 +159,12 @@ namespace DotNetCore.CAP.SqlServer private async Task ChangeMessageStateAsync(string tableName, MediumMessage message, StatusName state) { var sql = - $"UPDATE {tableName} SET Retries=@Retries,ExpiresAt=@ExpiresAt,StatusName=@StatusName WHERE Id=@Id"; + $"UPDATE {tableName} SET Content=@Content, Retries=@Retries,ExpiresAt=@ExpiresAt,StatusName=@StatusName WHERE Id=@Id"; object[] sqlParams = { new SqlParameter("@Id", message.DbId), + new SqlParameter("@Content", _serializer.Serialize(message.Origin)), new SqlParameter("@Retries", message.Retries), new SqlParameter("@ExpiresAt", message.ExpiresAt), new SqlParameter("@StatusName", state.ToString("G")) diff --git a/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs b/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs index 9b1098b..c309be7 100644 --- a/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs +++ b/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs @@ -177,6 +177,7 @@ namespace DotNetCore.CAP.Internal var type = executor.Parameters.FirstOrDefault(x => x.IsFromCap == false)?.ParameterType; message = await _serializer.DeserializeAsync(transportMessage, type); + message.RemoveException(); } catch (Exception e) { @@ -276,7 +277,7 @@ namespace DotNetCore.CAP.Internal break; case MqLogType.MessageNotInflight: _logger.LogError("AmazonSQS subscriber change message's visibility failed, message isn't in flight. --> " + logmsg.Reason); - break; + break; default: throw new ArgumentOutOfRangeException(); } diff --git a/src/DotNetCore.CAP/Internal/IMessageSender.Default.cs b/src/DotNetCore.CAP/Internal/IMessageSender.Default.cs index 39c619c..76792d6 100644 --- a/src/DotNetCore.CAP/Internal/IMessageSender.Default.cs +++ b/src/DotNetCore.CAP/Internal/IMessageSender.Default.cs @@ -96,6 +96,7 @@ namespace DotNetCore.CAP.Internal { var needRetry = UpdateMessageForRetry(message); + message.Origin.AddOrUpdateException(ex); message.ExpiresAt = message.Added.AddDays(15); await _dataStorage.ChangePublishStateAsync(message, StatusName.Failed); @@ -118,7 +119,7 @@ namespace DotNetCore.CAP.Internal ServiceProvider = _serviceProvider, MessageType = MessageType.Publish, Message = message.Origin - }); + }); _logger.SenderAfterThreshold(message.DbId, _options.Value.FailedRetryCount); } diff --git a/src/DotNetCore.CAP/Internal/ISubscribeDispatcher.Default.cs b/src/DotNetCore.CAP/Internal/ISubscribeDispatcher.Default.cs index 1d1b9b3..8c57915 100644 --- a/src/DotNetCore.CAP/Internal/ISubscribeDispatcher.Default.cs +++ b/src/DotNetCore.CAP/Internal/ISubscribeDispatcher.Default.cs @@ -122,11 +122,9 @@ namespace DotNetCore.CAP.Internal message.Retries = _options.FailedRetryCount; // not retry if SubscriberNotFoundException } - //TODO: Add exception to content - // AddErrorReasonToContent(message, ex); - var needRetry = UpdateMessageForRetry(message); + message.Origin.AddOrUpdateException(ex); message.ExpiresAt = message.Added.AddDays(15); await _dataStorage.ChangeReceiveStateAsync(message, StatusName.Failed); @@ -167,11 +165,6 @@ namespace DotNetCore.CAP.Internal return true; } - //private static void AddErrorReasonToContent(CapReceivedMessage message, Exception exception) - //{ - // message.Content = Helper.AddExceptionProperty(message.Content, exception); - //} - private async Task InvokeConsumerMethodAsync(MediumMessage message, ConsumerExecutorDescriptor descriptor, CancellationToken cancellationToken) { var consumerContext = new ConsumerContext(descriptor, message.Origin); diff --git a/src/DotNetCore.CAP/Messages/Message.cs b/src/DotNetCore.CAP/Messages/Message.cs index eb52659..675fdad 100644 --- a/src/DotNetCore.CAP/Messages/Message.cs +++ b/src/DotNetCore.CAP/Messages/Message.cs @@ -13,7 +13,7 @@ namespace DotNetCore.CAP.Messages /// System.Text.Json requires that class explicitly has a parameterless constructor /// and public properties have a setter. /// - public Message() {} + public Message() { } public Message(IDictionary headers, [CanBeNull] object value) { @@ -67,6 +67,17 @@ namespace DotNetCore.CAP.Messages { return message.Headers.ContainsKey(Headers.Exception); } - } -} \ No newline at end of file + public static void AddOrUpdateException(this Message message, Exception ex) + { + var msg = $"{ex.GetType().Name}-->{ex.Message}"; + + message.Headers[Headers.Exception] = msg; + } + + public static void RemoveException(this Message message) + { + message.Headers.Remove(Headers.Exception); + } + } +}