diff --git a/src/DotNetCore.CAP.MySql/CapPublisher.cs b/src/DotNetCore.CAP.MySql/CapPublisher.cs index 644e743..b6d1d13 100644 --- a/src/DotNetCore.CAP.MySql/CapPublisher.cs +++ b/src/DotNetCore.CAP.MySql/CapPublisher.cs @@ -40,7 +40,7 @@ namespace DotNetCore.CAP.MySql await PublishAsyncInternal(message); } - protected override Task ExecuteAsync(CapPublishedMessage message, ICapTransaction transaction, + protected override Task ExecuteAsync(CapPublishedMessage message, ICapTransaction transaction, CancellationToken cancel = default(CancellationToken)) { var dbTrans = transaction.DbTransaction as IDbTransaction; diff --git a/src/DotNetCore.CAP.MySql/MySqlStorageConnection.cs b/src/DotNetCore.CAP.MySql/MySqlStorageConnection.cs index 77f9fbb..b212178 100644 --- a/src/DotNetCore.CAP.MySql/MySqlStorageConnection.cs +++ b/src/DotNetCore.CAP.MySql/MySqlStorageConnection.cs @@ -52,7 +52,7 @@ namespace DotNetCore.CAP.MySql } } - public async Task StoreReceivedMessageAsync(CapReceivedMessage message) + public Task StoreReceivedMessageAsync(CapReceivedMessage message) { if (message == null) { @@ -61,11 +61,11 @@ namespace DotNetCore.CAP.MySql var sql = $@" INSERT INTO `{_prefix}.received`(`Name`,`Group`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`) -VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);SELECT LAST_INSERT_ID();"; +VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; using (var connection = new MySqlConnection(Options.ConnectionString)) { - return await connection.ExecuteScalarAsync(sql, message); + return connection.ExecuteScalarAsync(sql, message); } } diff --git a/src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs b/src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs index b029f69..18700c9 100644 --- a/src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs +++ b/src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs @@ -40,6 +40,7 @@ namespace DotNetCore.CAP.Abstractions { var message = new CapPublishedMessage { + Id = SnowflakeId.Default().NextId(), Name = name, Content = Serialize(contentObj, callbackName), StatusName = StatusName.Scheduled @@ -53,6 +54,7 @@ namespace DotNetCore.CAP.Abstractions { var message = new CapPublishedMessage { + Id = SnowflakeId.Default().NextId(), Name = name, Content = Serialize(contentObj, callbackName), StatusName = StatusName.Scheduled @@ -75,13 +77,11 @@ namespace DotNetCore.CAP.Abstractions { operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message); - message.Id = await ExecuteAsync(message, CapTransaction); + await ExecuteAsync(message, CapTransaction); - if (message.Id > 0) - { - _capTransaction.AddToSent(message); - s_diagnosticListener.WritePublishMessageStoreAfter(operationId, message); - } + _capTransaction.AddToSent(message); + + s_diagnosticListener.WritePublishMessageStoreAfter(operationId, message); if (NotUseTransaction || CapTransaction.AutoCommit) { @@ -105,7 +105,7 @@ namespace DotNetCore.CAP.Abstractions protected abstract object GetDbTransaction(); - protected abstract Task ExecuteAsync(CapPublishedMessage message, + protected abstract Task ExecuteAsync(CapPublishedMessage message, ICapTransaction transaction, CancellationToken cancel = default(CancellationToken)); diff --git a/src/DotNetCore.CAP/IConsumerHandler.Default.cs b/src/DotNetCore.CAP/IConsumerHandler.Default.cs index e2179ff..995668c 100644 --- a/src/DotNetCore.CAP/IConsumerHandler.Default.cs +++ b/src/DotNetCore.CAP/IConsumerHandler.Default.cs @@ -112,6 +112,7 @@ namespace DotNetCore.CAP var receivedMessage = new CapReceivedMessage(messageContext) { + Id = SnowflakeId.Default().NextId(), StatusName = StatusName.Scheduled, Content = messageBody }; @@ -170,10 +171,7 @@ namespace DotNetCore.CAP private void StoreMessage(CapReceivedMessage receivedMessage) { - var id = _connection.StoreReceivedMessageAsync(receivedMessage) - .GetAwaiter().GetResult(); - - receivedMessage.Id = id; + _connection.StoreReceivedMessageAsync(receivedMessage).GetAwaiter().GetResult(); } private (Guid, string) TracingBefore(string topic, string values) diff --git a/src/DotNetCore.CAP/IStorageConnection.cs b/src/DotNetCore.CAP/IStorageConnection.cs index 18d5ff8..b694737 100644 --- a/src/DotNetCore.CAP/IStorageConnection.cs +++ b/src/DotNetCore.CAP/IStorageConnection.cs @@ -32,7 +32,7 @@ namespace DotNetCore.CAP /// Stores the message. /// /// The message to store. - Task StoreReceivedMessageAsync(CapReceivedMessage message); + Task StoreReceivedMessageAsync(CapReceivedMessage message); /// /// Returns the message with the given id. diff --git a/src/DotNetCore.CAP/Internal/ICallbackMessageSender.Default.cs b/src/DotNetCore.CAP/Internal/ICallbackMessageSender.Default.cs index 1dc7fc8..2114eed 100644 --- a/src/DotNetCore.CAP/Internal/ICallbackMessageSender.Default.cs +++ b/src/DotNetCore.CAP/Internal/ICallbackMessageSender.Default.cs @@ -54,6 +54,7 @@ namespace DotNetCore.CAP.Internal var publishedMessage = new CapPublishedMessage { + Id = SnowflakeId.Default().NextId(), Name = topicName, Content = content, StatusName = StatusName.Scheduled diff --git a/src/DotNetCore.CAP/LoggerExtensions.cs b/src/DotNetCore.CAP/LoggerExtensions.cs index ab80367..c3497c2 100644 --- a/src/DotNetCore.CAP/LoggerExtensions.cs +++ b/src/DotNetCore.CAP/LoggerExtensions.cs @@ -10,12 +10,12 @@ namespace DotNetCore.CAP [SuppressMessage("ReSharper", "InconsistentNaming")] internal static class LoggerExtensions { - public static void ConsumerExecutedAfterThreshold(this ILogger logger, int messageId, int retries) + public static void ConsumerExecutedAfterThreshold(this ILogger logger, long messageId, int retries) { logger.LogWarning($"The Subscriber of the message({messageId}) still fails after {retries}th executions and we will stop retrying."); } - public static void SenderAfterThreshold(this ILogger logger, int messageId, int retries) + public static void SenderAfterThreshold(this ILogger logger, long messageId, int retries) { logger.LogWarning($"The Publisher of the message({messageId}) still fails after {retries}th sends and we will stop retrying."); } @@ -25,12 +25,12 @@ namespace DotNetCore.CAP logger.LogWarning(ex, "FailedThresholdCallback action raised an exception:" + ex.Message); } - public static void ConsumerExecutionRetrying(this ILogger logger, int messageId, int retries) + public static void ConsumerExecutionRetrying(this ILogger logger, long messageId, int retries) { logger.LogWarning($"The {retries}th retrying consume a message failed. message id: {messageId}"); } - public static void SenderRetrying(this ILogger logger, int messageId, int retries) + public static void SenderRetrying(this ILogger logger, long messageId, int retries) { logger.LogWarning($"The {retries}th retrying send a message failed. message id: {messageId} "); } @@ -40,7 +40,7 @@ namespace DotNetCore.CAP logger.LogDebug($"Message published. name: {name}, content:{content}."); } - public static void MessagePublishException(this ILogger logger, int messageId, string reason, Exception ex) + public static void MessagePublishException(this ILogger logger, long messageId, string reason, Exception ex) { logger.LogError(ex, $"An exception occured while publishing a message, reason:{reason}. message id:{messageId}"); } diff --git a/src/DotNetCore.CAP/Models/CapPublishedMessage.cs b/src/DotNetCore.CAP/Models/CapPublishedMessage.cs index d95e72f..955063d 100644 --- a/src/DotNetCore.CAP/Models/CapPublishedMessage.cs +++ b/src/DotNetCore.CAP/Models/CapPublishedMessage.cs @@ -15,7 +15,7 @@ namespace DotNetCore.CAP.Models Added = DateTime.Now; } - public int Id { get; set; } + public long Id { get; set; } public string Name { get; set; } diff --git a/src/DotNetCore.CAP/Models/CapReceivedMessage.cs b/src/DotNetCore.CAP/Models/CapReceivedMessage.cs index 790aca2..b14728e 100644 --- a/src/DotNetCore.CAP/Models/CapReceivedMessage.cs +++ b/src/DotNetCore.CAP/Models/CapReceivedMessage.cs @@ -2,6 +2,7 @@ // Licensed under the MIT License. See License.txt in the project root for license information. using System; +using DotNetCore.CAP.Infrastructure; namespace DotNetCore.CAP.Models { @@ -22,7 +23,7 @@ namespace DotNetCore.CAP.Models Content = message.Content; } - public int Id { get; set; } + public long Id { get; set; } public string Group { get; set; } diff --git a/test/DotNetCore.CAP.MySql.Test/MySqlStorageConnectionTest.cs b/test/DotNetCore.CAP.MySql.Test/MySqlStorageConnectionTest.cs index aefa831..f9fae9d 100644 --- a/test/DotNetCore.CAP.MySql.Test/MySqlStorageConnectionTest.cs +++ b/test/DotNetCore.CAP.MySql.Test/MySqlStorageConnectionTest.cs @@ -25,6 +25,7 @@ namespace DotNetCore.CAP.MySql.Test var sql = "INSERT INTO `cap.published`(`Name`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`) VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);SELECT @@IDENTITY;"; var publishMessage = new CapPublishedMessage { + Id = SnowflakeId.Default().NextId(), Name = "MySqlStorageConnectionTest", Content = "", StatusName = StatusName.Scheduled @@ -45,6 +46,7 @@ namespace DotNetCore.CAP.MySql.Test { var receivedMessage = new CapReceivedMessage { + Id = SnowflakeId.Default().NextId(), Name = "MySqlStorageConnectionTest", Content = "", Group = "mygroup", @@ -71,6 +73,7 @@ namespace DotNetCore.CAP.MySql.Test VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);SELECT @@IDENTITY;"; var receivedMessage = new CapReceivedMessage { + Id = SnowflakeId.Default().NextId(), Name = "MySqlStorageConnectionTest", Content = "", Group = "mygroup", diff --git a/test/DotNetCore.CAP.Test/Processor/StateChangerTest.cs b/test/DotNetCore.CAP.Test/Processor/StateChangerTest.cs index 5a72ffa..26cb8ee 100644 --- a/test/DotNetCore.CAP.Test/Processor/StateChangerTest.cs +++ b/test/DotNetCore.CAP.Test/Processor/StateChangerTest.cs @@ -16,6 +16,7 @@ namespace DotNetCore.CAP.Test var fixture = Create(); var message = new CapPublishedMessage { + Id = SnowflakeId.Default().NextId(), StatusName = StatusName.Scheduled }; var state = Mock.Of(s => s.Name == "s" && s.ExpiresAfter == null); @@ -39,6 +40,7 @@ namespace DotNetCore.CAP.Test var fixture = Create(); var message = new CapPublishedMessage { + Id = SnowflakeId.Default().NextId(), StatusName = StatusName.Scheduled }; var state = Mock.Of(s => s.Name == "s" && s.ExpiresAfter == TimeSpan.FromHours(1));