From d5314e1ae318b87a4146f0a9f8153752edaee6af Mon Sep 17 00:00:00 2001 From: Savorboard Date: Thu, 31 Oct 2019 16:42:56 +0800 Subject: [PATCH] bug fixed --- .../Controllers/ValuesController.cs | 9 +-- .../Sample.RabbitMQ.MySql.csproj | 2 +- src/DotNetCore.CAP.MySql/MySqlDataStorage.cs | 75 ++++++++++++------- .../Abstractions/CapPublisher.cs | 4 +- .../IConsumerRegister.Default.cs | 42 +++++++++-- src/DotNetCore.CAP/Messages/Headers.cs | 2 + src/DotNetCore.CAP/Messages/Message.cs | 5 ++ .../Persistence/IDataStorage.cs | 4 +- 8 files changed, 98 insertions(+), 45 deletions(-) diff --git a/samples/Sample.RabbitMQ.MySql/Controllers/ValuesController.cs b/samples/Sample.RabbitMQ.MySql/Controllers/ValuesController.cs index b8c920f..8e9ae75 100644 --- a/samples/Sample.RabbitMQ.MySql/Controllers/ValuesController.cs +++ b/samples/Sample.RabbitMQ.MySql/Controllers/ValuesController.cs @@ -1,5 +1,4 @@ using System; -using System.Collections.Generic; using System.Data; using System.Threading.Tasks; using Dapper; @@ -36,7 +35,7 @@ namespace Sample.RabbitMQ.MySql.Controllers { using (var connection = new MySqlConnection(AppDbContext.ConnectionString)) { - using (var transaction = connection.BeginTransaction(_capBus, autoCommit: false)) + using (var transaction = connection.BeginTransaction(_capBus, true)) { //your business code connection.Execute("insert into test(name) values('test')", transaction: (IDbTransaction)transaction.DbTransaction); @@ -45,8 +44,6 @@ namespace Sample.RabbitMQ.MySql.Controllers //{ _capBus.Publish("sample.rabbitmq.mysql", DateTime.Now); //} - - transaction.Commit(); } } @@ -60,7 +57,7 @@ namespace Sample.RabbitMQ.MySql.Controllers { dbContext.Persons.Add(new Person() { Name = "ef.transaction" }); - for (int i = 0; i < 5; i++) + for (int i = 0; i < 1; i++) { _capBus.Publish("sample.rabbitmq.mysql", DateTime.Now); } @@ -74,7 +71,7 @@ namespace Sample.RabbitMQ.MySql.Controllers [NonAction] [CapSubscribe("sample.rabbitmq.mysql")] - public void Subscriber(Person2 p) + public void Subscriber(DateTime p) { Console.WriteLine($@"{DateTime.Now} Subscriber invoked, Info: {p}"); } diff --git a/samples/Sample.RabbitMQ.MySql/Sample.RabbitMQ.MySql.csproj b/samples/Sample.RabbitMQ.MySql/Sample.RabbitMQ.MySql.csproj index 8756e56..ce72296 100644 --- a/samples/Sample.RabbitMQ.MySql/Sample.RabbitMQ.MySql.csproj +++ b/samples/Sample.RabbitMQ.MySql/Sample.RabbitMQ.MySql.csproj @@ -5,7 +5,7 @@ - + diff --git a/src/DotNetCore.CAP.MySql/MySqlDataStorage.cs b/src/DotNetCore.CAP.MySql/MySqlDataStorage.cs index 96aff83..f629203 100644 --- a/src/DotNetCore.CAP.MySql/MySqlDataStorage.cs +++ b/src/DotNetCore.CAP.MySql/MySqlDataStorage.cs @@ -34,8 +34,8 @@ namespace DotNetCore.CAP.MySql await connection.ExecuteAsync(sql, new { Id = message.DbId, - Retries = message.Retries, - ExpiresAt = message.ExpiresAt, + message.Retries, + message.ExpiresAt, StatusName = state.ToString("G") }); } @@ -50,8 +50,8 @@ namespace DotNetCore.CAP.MySql await connection.ExecuteAsync(sql, new { Id = message.DbId, - Retries = message.Retries, - ExpiresAt = message.ExpiresAt, + message.Retries, + message.ExpiresAt, StatusName = state.ToString("G") }); } @@ -61,7 +61,7 @@ namespace DotNetCore.CAP.MySql { var sql = $"INSERT INTO `{_options.Value.TableNamePrefix}.published`(`Id`,`Version`,`Name`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`) VALUES(@Id,'{_options.Value.Version}',@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; - var message = new MediumMessage() + var message = new MediumMessage { DbId = content.GetId(), Origin = content, @@ -75,9 +75,9 @@ namespace DotNetCore.CAP.MySql Id = message.DbId, Name = name, Content = StringSerializer.Serialize(message.Origin), - Retries = message.Retries, - Added = message.Added, - ExpiresAt = message.ExpiresAt, + message.Retries, + message.Added, + message.ExpiresAt, StatusName = StatusName.Scheduled }; @@ -103,37 +103,56 @@ namespace DotNetCore.CAP.MySql return message; } - public Task StoreMessageAsync(string name, string group, Message content, CancellationToken cancellationToken = default) + public async Task StoreReceivedExceptionMessageAsync(string name, string group, string content) { var sql = $@"INSERT INTO `{_options.Value.TableNamePrefix}.received`(`Id`,`Version`,`Name`,`Group`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`) VALUES(@Id,'{_options.Value.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; - var message = new MediumMessage() + using (var connection = new MySqlConnection(_options.Value.ConnectionString)) + { + await connection.ExecuteAsync(sql, new + { + Id = SnowflakeId.Default().NextId().ToString(), + Group = group, + Name = name, + Content = content, + Retries = _capOptions.Value.FailedRetryCount, + Added = DateTime.Now, + ExpiresAt = DateTime.Now.AddDays(15), + StatusName = nameof(StatusName.Failed) + }); + } + } + + public Task StoreReceivedMessageAsync(string name, string group, Message message) + { + var sql = $@"INSERT INTO `{_options.Value.TableNamePrefix}.received`(`Id`,`Version`,`Name`,`Group`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`) VALUES(@Id,'{_options.Value.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; + + var mdMessage = new MediumMessage { DbId = SnowflakeId.Default().NextId().ToString(), - Origin = content, + Origin = message, Added = DateTime.Now, ExpiresAt = null, Retries = 0 }; - - var po = new - { - Id = message.DbId, - Group = group, - Name = name, - Content = StringSerializer.Serialize(message.Origin), - Retries = message.Retries, - Added = message.Added, - ExpiresAt = message.ExpiresAt, - StatusName = nameof(StatusName.Scheduled) - }; - + var content = StringSerializer.Serialize(mdMessage.Origin); using (var connection = new MySqlConnection(_options.Value.ConnectionString)) { - connection.Execute(sql, po); + + connection.Execute(sql, new + { + Id = mdMessage.DbId, + Group = group, + Name = name, + Content = content, + mdMessage.Retries, + mdMessage.Added, + mdMessage.ExpiresAt, + StatusName = nameof(StatusName.Scheduled) + }); } - return Task.FromResult(message); + return Task.FromResult(mdMessage); } public async Task DeleteExpiresAsync(string table, DateTime timeout, int batchCount = 1000, CancellationToken token = default) @@ -157,7 +176,7 @@ namespace DotNetCore.CAP.MySql var reader = await connection.ExecuteReaderAsync(sql); while (reader.Read()) { - result.Add(new MediumMessage() + result.Add(new MediumMessage { DbId = reader.GetInt64(0).ToString(), Origin = StringSerializer.DeSerialize(reader.GetString(3)), @@ -181,7 +200,7 @@ namespace DotNetCore.CAP.MySql var reader = await connection.ExecuteReaderAsync(sql); while (reader.Read()) { - result.Add(new MediumMessage() + result.Add(new MediumMessage { DbId = reader.GetInt64(0).ToString(), Origin = StringSerializer.DeSerialize(reader.GetString(3)), diff --git a/src/DotNetCore.CAP/Abstractions/CapPublisher.cs b/src/DotNetCore.CAP/Abstractions/CapPublisher.cs index 13df6a9..d9dd8de 100644 --- a/src/DotNetCore.CAP/Abstractions/CapPublisher.cs +++ b/src/DotNetCore.CAP/Abstractions/CapPublisher.cs @@ -57,7 +57,7 @@ namespace DotNetCore.CAP.Abstractions optionHeaders.Add(Headers.CorrelationSequence, 0.ToString()); } optionHeaders.Add(Headers.MessageName, name); - optionHeaders.Add(Headers.Type, typeof(T).AssemblyQualifiedName); + optionHeaders.Add(Headers.Type, typeof(T).FullName); optionHeaders.Add(Headers.SentTime, DateTimeOffset.Now.ToString()); var message = new Message(optionHeaders, value); @@ -79,7 +79,7 @@ namespace DotNetCore.CAP.Abstractions { var transaction = (CapTransactionBase)Transaction.Value; - var mediumMessage = await _storage.StoreMessageAsync(name, message, transaction, cancellationToken); + var mediumMessage = await _storage.StoreMessageAsync(name, message, transaction.DbTransaction, cancellationToken); s_diagnosticListener.WritePublishMessageStoreAfter(operationId, message); diff --git a/src/DotNetCore.CAP/IConsumerRegister.Default.cs b/src/DotNetCore.CAP/IConsumerRegister.Default.cs index ecfb31c..dd4f2ec 100644 --- a/src/DotNetCore.CAP/IConsumerRegister.Default.cs +++ b/src/DotNetCore.CAP/IConsumerRegister.Default.cs @@ -4,6 +4,8 @@ using System; using System.Diagnostics; using System.Linq; +using System.Text; +using System.Text.Json; using System.Threading; using System.Threading.Tasks; using DotNetCore.CAP.Diagnostics; @@ -170,18 +172,44 @@ namespace DotNetCore.CAP var type = executor.Parameters.FirstOrDefault(x => x.IsFromCap == false)?.ParameterType; - var message = await _serializer.DeserializeAsync(transportMessage, type); + Message message; + try + { + message = await _serializer.DeserializeAsync(transportMessage, type); + } + catch (Exception e) + { + transportMessage.Headers.Add(Headers.Exception, e.Message); + var dataUri = $"data:{transportMessage.Headers[Headers.Type]};base64," + Convert.ToBase64String(transportMessage.Body); + message = new Message(transportMessage.Headers, dataUri); + } - var mediumMessage = await _storage.StoreMessageAsync(name, group, message); + if (message.HasException()) + { + var content = StringSerializer.Serialize(message); + await _storage.StoreReceivedExceptionMessageAsync(name, group, content); - client.Commit(); + client.Commit(); - if (operationId != null) - { - TracingAfter(operationId.Value, message, startTime, stopwatch.Elapsed); + if (operationId != null) + { + TracingAfter(operationId.Value, message, startTime, stopwatch.Elapsed); + } } + else + { + var mediumMessage = await _storage.StoreReceivedMessageAsync(name, group, message); + mediumMessage.Origin = message; - _dispatcher.EnqueueToExecute(mediumMessage, executor); + client.Commit(); + + if (operationId != null) + { + TracingAfter(operationId.Value, message, startTime, stopwatch.Elapsed); + } + + _dispatcher.EnqueueToExecute(mediumMessage, executor); + } } catch (Exception e) { diff --git a/src/DotNetCore.CAP/Messages/Headers.cs b/src/DotNetCore.CAP/Messages/Headers.cs index f953bd4..0d9e3da 100644 --- a/src/DotNetCore.CAP/Messages/Headers.cs +++ b/src/DotNetCore.CAP/Messages/Headers.cs @@ -23,5 +23,7 @@ public const string Group = "cap-msg-group"; public const string SentTime = "cap-senttime"; + + public const string Exception = "cap-exception"; } } diff --git a/src/DotNetCore.CAP/Messages/Message.cs b/src/DotNetCore.CAP/Messages/Message.cs index 9fdd778..213193f 100644 --- a/src/DotNetCore.CAP/Messages/Message.cs +++ b/src/DotNetCore.CAP/Messages/Message.cs @@ -51,6 +51,11 @@ namespace DotNetCore.CAP.Messages return 0; } + + public static bool HasException(this Message message) + { + return message.Headers.ContainsKey(Headers.Exception); + } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/Persistence/IDataStorage.cs b/src/DotNetCore.CAP/Persistence/IDataStorage.cs index 41255c2..9bdec2a 100644 --- a/src/DotNetCore.CAP/Persistence/IDataStorage.cs +++ b/src/DotNetCore.CAP/Persistence/IDataStorage.cs @@ -15,7 +15,9 @@ namespace DotNetCore.CAP.Persistence Task StoreMessageAsync(string name, Message content, object dbTransaction = null, CancellationToken cancellationToken = default); - Task StoreMessageAsync(string name, string group, Message content, CancellationToken cancellationToken = default); + Task StoreReceivedExceptionMessageAsync(string name, string group, string content); + + Task StoreReceivedMessageAsync(string name, string group, Message content); Task DeleteExpiresAsync(string table, DateTime timeout, int batchCount = 1000, CancellationToken token = default);