Просмотр исходного кода

Record the exception in the headers (#679)

* Record the exception in the headers

* Record only Exception.Message

Co-authored-by: wandone\xlw <123456>
master
xiangxiren 4 лет назад
committed by GitHub
Родитель
Сommit
1004b54ace
Не найден GPG ключ соответствующий данной подписи Идентификатор GPG ключа: 4AEE18F83AFDEB23
9 измененных файлов: 29 добавлений и 16 удалений
  1. +2
    -0
      src/DotNetCore.CAP.InMemoryStorage/IDataStorage.InMemory.cs
  2. +2
    -0
      src/DotNetCore.CAP.MongoDB/IDataStorage.MongoDB.cs
  3. +2
    -1
      src/DotNetCore.CAP.MySql/IDataStorage.MySql.cs
  4. +2
    -1
      src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs
  5. +2
    -1
      src/DotNetCore.CAP.SqlServer/IDataStorage.SqlServer.cs
  6. +2
    -1
      src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs
  7. +2
    -1
      src/DotNetCore.CAP/Internal/IMessageSender.Default.cs
  8. +1
    -8
      src/DotNetCore.CAP/Internal/ISubscribeDispatcher.Default.cs
  9. +14
    -3
      src/DotNetCore.CAP/Messages/Message.cs

+ 2
- 0
src/DotNetCore.CAP.InMemoryStorage/IDataStorage.InMemory.cs Просмотреть файл

@@ -35,6 +35,7 @@ namespace DotNetCore.CAP.InMemoryStorage
{
PublishedMessages[message.DbId].StatusName = state;
PublishedMessages[message.DbId].ExpiresAt = message.ExpiresAt;
PublishedMessages[message.DbId].Content = _serializer.Serialize(message.Origin);
return Task.CompletedTask;
}

@@ -42,6 +43,7 @@ namespace DotNetCore.CAP.InMemoryStorage
{
ReceivedMessages[message.DbId].StatusName = state;
ReceivedMessages[message.DbId].ExpiresAt = message.ExpiresAt;
ReceivedMessages[message.DbId].Content = _serializer.Serialize(message.Origin);
return Task.CompletedTask;
}



+ 2
- 0
src/DotNetCore.CAP.MongoDB/IDataStorage.MongoDB.cs Просмотреть файл

@@ -42,6 +42,7 @@ namespace DotNetCore.CAP.MongoDB
var collection = _database.GetCollection<PublishedMessage>(_options.Value.PublishedCollection);

var updateDef = Builders<PublishedMessage>.Update
.Set(x => x.Content, _serializer.Serialize(message.Origin))
.Set(x => x.Retries, message.Retries)
.Set(x => x.ExpiresAt, message.ExpiresAt)
.Set(x => x.StatusName, state.ToString("G"));
@@ -54,6 +55,7 @@ namespace DotNetCore.CAP.MongoDB
var collection = _database.GetCollection<ReceivedMessage>(_options.Value.ReceivedCollection);

var updateDef = Builders<ReceivedMessage>.Update
.Set(x => x.Content, _serializer.Serialize(message.Origin))
.Set(x => x.Retries, message.Retries)
.Set(x => x.ExpiresAt, message.ExpiresAt)
.Set(x => x.StatusName, state.ToString("G"));


+ 2
- 1
src/DotNetCore.CAP.MySql/IDataStorage.MySql.cs Просмотреть файл

@@ -158,11 +158,12 @@ namespace DotNetCore.CAP.MySql
private async Task ChangeMessageStateAsync(string tableName, MediumMessage message, StatusName state)
{
var sql =
$"UPDATE `{tableName}` SET `Retries` = @Retries,`ExpiresAt` = @ExpiresAt,`StatusName`=@StatusName WHERE `Id`=@Id;";
$"UPDATE `{tableName}` SET `Content`=@Content,`Retries`=@Retries,`ExpiresAt`=@ExpiresAt,`StatusName`=@StatusName WHERE `Id`=@Id;";

object[] sqlParams =
{
new MySqlParameter("@Id", message.DbId),
new MySqlParameter("@Content", _serializer.Serialize(message.Origin)),
new MySqlParameter("@Retries", message.Retries),
new MySqlParameter("@ExpiresAt", message.ExpiresAt),
new MySqlParameter("@StatusName", state.ToString("G"))


+ 2
- 1
src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs Просмотреть файл

@@ -160,11 +160,12 @@ namespace DotNetCore.CAP.PostgreSql
private async Task ChangeMessageStateAsync(string tableName, MediumMessage message, StatusName state)
{
var sql =
$"UPDATE {tableName} SET \"Retries\"=@Retries,\"ExpiresAt\"=@ExpiresAt,\"StatusName\"=@StatusName WHERE \"Id\"=@Id";
$"UPDATE {tableName} SET \"Content\"=@Content,\"Retries\"=@Retries,\"ExpiresAt\"=@ExpiresAt,\"StatusName\"=@StatusName WHERE \"Id\"=@Id";

object[] sqlParams =
{
new NpgsqlParameter("@Id", long.Parse(message.DbId)),
new NpgsqlParameter("@Content", _serializer.Serialize(message.Origin)),
new NpgsqlParameter("@Retries", message.Retries),
new NpgsqlParameter("@ExpiresAt", message.ExpiresAt),
new NpgsqlParameter("@StatusName", state.ToString("G"))


+ 2
- 1
src/DotNetCore.CAP.SqlServer/IDataStorage.SqlServer.cs Просмотреть файл

@@ -159,11 +159,12 @@ namespace DotNetCore.CAP.SqlServer
private async Task ChangeMessageStateAsync(string tableName, MediumMessage message, StatusName state)
{
var sql =
$"UPDATE {tableName} SET Retries=@Retries,ExpiresAt=@ExpiresAt,StatusName=@StatusName WHERE Id=@Id";
$"UPDATE {tableName} SET Content=@Content, Retries=@Retries,ExpiresAt=@ExpiresAt,StatusName=@StatusName WHERE Id=@Id";

object[] sqlParams =
{
new SqlParameter("@Id", message.DbId),
new SqlParameter("@Content", _serializer.Serialize(message.Origin)),
new SqlParameter("@Retries", message.Retries),
new SqlParameter("@ExpiresAt", message.ExpiresAt),
new SqlParameter("@StatusName", state.ToString("G"))


+ 2
- 1
src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs Просмотреть файл

@@ -177,6 +177,7 @@ namespace DotNetCore.CAP.Internal

var type = executor.Parameters.FirstOrDefault(x => x.IsFromCap == false)?.ParameterType;
message = await _serializer.DeserializeAsync(transportMessage, type);
message.RemoveException();
}
catch (Exception e)
{
@@ -276,7 +277,7 @@ namespace DotNetCore.CAP.Internal
break;
case MqLogType.MessageNotInflight:
_logger.LogError("AmazonSQS subscriber change message's visibility failed, message isn't in flight. --> " + logmsg.Reason);
break;
break;
default:
throw new ArgumentOutOfRangeException();
}


+ 2
- 1
src/DotNetCore.CAP/Internal/IMessageSender.Default.cs Просмотреть файл

@@ -96,6 +96,7 @@ namespace DotNetCore.CAP.Internal
{
var needRetry = UpdateMessageForRetry(message);

message.Origin.AddOrUpdateException(ex);
message.ExpiresAt = message.Added.AddDays(15);

await _dataStorage.ChangePublishStateAsync(message, StatusName.Failed);
@@ -118,7 +119,7 @@ namespace DotNetCore.CAP.Internal
ServiceProvider = _serviceProvider,
MessageType = MessageType.Publish,
Message = message.Origin
});
});

_logger.SenderAfterThreshold(message.DbId, _options.Value.FailedRetryCount);
}


+ 1
- 8
src/DotNetCore.CAP/Internal/ISubscribeDispatcher.Default.cs Просмотреть файл

@@ -122,11 +122,9 @@ namespace DotNetCore.CAP.Internal
message.Retries = _options.FailedRetryCount; // not retry if SubscriberNotFoundException
}

//TODO: Add exception to content
// AddErrorReasonToContent(message, ex);

var needRetry = UpdateMessageForRetry(message);

message.Origin.AddOrUpdateException(ex);
message.ExpiresAt = message.Added.AddDays(15);

await _dataStorage.ChangeReceiveStateAsync(message, StatusName.Failed);
@@ -167,11 +165,6 @@ namespace DotNetCore.CAP.Internal
return true;
}

//private static void AddErrorReasonToContent(CapReceivedMessage message, Exception exception)
//{
// message.Content = Helper.AddExceptionProperty(message.Content, exception);
//}

private async Task InvokeConsumerMethodAsync(MediumMessage message, ConsumerExecutorDescriptor descriptor, CancellationToken cancellationToken)
{
var consumerContext = new ConsumerContext(descriptor, message.Origin);


+ 14
- 3
src/DotNetCore.CAP/Messages/Message.cs Просмотреть файл

@@ -13,7 +13,7 @@ namespace DotNetCore.CAP.Messages
/// System.Text.Json requires that class explicitly has a parameterless constructor
/// and public properties have a setter.
/// </summary>
public Message() {}
public Message() { }

public Message(IDictionary<string, string> headers, [CanBeNull] object value)
{
@@ -67,6 +67,17 @@ namespace DotNetCore.CAP.Messages
{
return message.Headers.ContainsKey(Headers.Exception);
}
}

}
public static void AddOrUpdateException(this Message message, Exception ex)
{
var msg = $"{ex.GetType().Name}-->{ex.Message}";

message.Headers[Headers.Exception] = msg;
}

public static void RemoveException(this Message message)
{
message.Headers.Remove(Headers.Exception);
}
}
}

Загрузка…
Отмена
Сохранить