From fa89e27cc62fb0943e99bdc9bc4b7f6573500937 Mon Sep 17 00:00:00 2001 From: Savorboard Date: Thu, 19 Jul 2018 15:49:43 +0800 Subject: [PATCH] Release 2.2.5 (#162) * update version to 2.2.4 * Fixed Incorrect local IP address judgment of IPv6. (#140) * Fixed DateTime localization format conversion error to sql.(#139) * update version to 2.2.5 * remove unused constructor. * Fixed DateTime localization format conversion error to sql.(#139) * Improved logging * support RabbitMQ cluster configuration. * Fixed dashboard message page re-requeue and re-executed operate bug. (#158) * refactor code * refactor log extensions. * refactor retry task processor. * Fixed configuration options of FailedThresholdCallback could not be invoke when the value less then three. (#161) * update samples. * Fixed SendAsync or ExecuteAsync recursion retries bug. (#160) * Fixed SendAsync or ExecuteAsync recursion retries bug. (#160) --- build/version.props | 2 +- samples/Sample.RabbitMQ.MySql/Startup.cs | 8 +- .../MySqlStorageConnection.cs | 4 +- .../PostgreSqlStorageConnection.cs | 4 +- .../CAP.RabbiMQOptions.cs | 5 +- .../IConnectionChannelPool.Default.cs | 8 +- .../SqlServerStorageConnection.cs | 4 +- .../Abstractions/CapPublisherBase.cs | 9 +- .../Dashboard/DashboardRequest.cs | 11 +- .../Dashboard/DashboardRoutes.cs | 20 ++- .../LocalRequestsOnlyAuthorizationFilter.cs | 9 +- .../IPublishMessageSender.Base.cs | 94 ++++++++----- .../ISubscribeExecutor.Default.cs | 77 +++++++---- src/DotNetCore.CAP/LoggerExtensions.cs | 113 +++------------- .../Models/CapPublishedMessage.cs | 6 - .../Processor/IProcessor.NeedRetry.cs | 127 +----------------- 16 files changed, 189 insertions(+), 312 deletions(-) diff --git a/build/version.props b/build/version.props index 537d4af..449a52f 100644 --- a/build/version.props +++ b/build/version.props @@ -2,7 +2,7 @@ 2 2 - 4 + 5 $(VersionMajor).$(VersionMinor).$(VersionPatch) diff --git a/samples/Sample.RabbitMQ.MySql/Startup.cs b/samples/Sample.RabbitMQ.MySql/Startup.cs index 554a624..007e78e 100644 --- a/samples/Sample.RabbitMQ.MySql/Startup.cs +++ b/samples/Sample.RabbitMQ.MySql/Startup.cs @@ -1,4 +1,5 @@ -using Microsoft.AspNetCore.Builder; +using System; +using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Hosting; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; @@ -16,6 +17,11 @@ namespace Sample.RabbitMQ.MySql x.UseEntityFramework(); x.UseRabbitMQ("localhost"); x.UseDashboard(); + x.FailedRetryCount = 5; + x.FailedThresholdCallback = (type, name, content) => + { + Console.WriteLine($@"A message of type {type} failed after executing {x.FailedRetryCount} several times, requiring manual troubleshooting. Message name: {name}, message body: {content}"); + }; }); services.AddMvc(); diff --git a/src/DotNetCore.CAP.MySql/MySqlStorageConnection.cs b/src/DotNetCore.CAP.MySql/MySqlStorageConnection.cs index 691717d..77f9fbb 100644 --- a/src/DotNetCore.CAP.MySql/MySqlStorageConnection.cs +++ b/src/DotNetCore.CAP.MySql/MySqlStorageConnection.cs @@ -42,7 +42,7 @@ namespace DotNetCore.CAP.MySql public async Task> GetPublishedMessagesOfNeedRetry() { - var fourMinsAgo = DateTime.Now.AddMinutes(-4); + var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O"); var sql = $"SELECT * FROM `{_prefix}.published` WHERE `Retries`<{_capOptions.FailedRetryCount} AND `Added`<'{fourMinsAgo}' AND (`StatusName` = '{StatusName.Failed}' OR `StatusName` = '{StatusName.Scheduled}') LIMIT 200;"; @@ -80,7 +80,7 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);SELECT LAST public async Task> GetReceivedMessagesOfNeedRetry() { - var fourMinsAgo = DateTime.Now.AddMinutes(-4); + var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O"); var sql = $"SELECT * FROM `{_prefix}.received` WHERE `Retries`<{_capOptions.FailedRetryCount} AND `Added`<'{fourMinsAgo}' AND (`StatusName` = '{StatusName.Failed}' OR `StatusName` = '{StatusName.Scheduled}') LIMIT 200;"; using (var connection = new MySqlConnection(Options.ConnectionString)) diff --git a/src/DotNetCore.CAP.PostgreSql/PostgreSqlStorageConnection.cs b/src/DotNetCore.CAP.PostgreSql/PostgreSqlStorageConnection.cs index 763407c..3d8fba7 100644 --- a/src/DotNetCore.CAP.PostgreSql/PostgreSqlStorageConnection.cs +++ b/src/DotNetCore.CAP.PostgreSql/PostgreSqlStorageConnection.cs @@ -40,7 +40,7 @@ namespace DotNetCore.CAP.PostgreSql public async Task> GetPublishedMessagesOfNeedRetry() { - var fourMinsAgo = DateTime.Now.AddMinutes(-4); + var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O"); var sql = $"SELECT * FROM \"{Options.Schema}\".\"published\" WHERE \"Retries\"<{_capOptions.FailedRetryCount} AND \"Added\"<'{fourMinsAgo}' AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;"; @@ -77,7 +77,7 @@ namespace DotNetCore.CAP.PostgreSql public async Task> GetReceivedMessagesOfNeedRetry() { - var fourMinsAgo = DateTime.Now.AddMinutes(-4); + var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O"); var sql = $"SELECT * FROM \"{Options.Schema}\".\"received\" WHERE \"Retries\"<{_capOptions.FailedRetryCount} AND \"Added\"<'{fourMinsAgo}' AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;"; using (var connection = new NpgsqlConnection(Options.ConnectionString)) diff --git a/src/DotNetCore.CAP.RabbitMQ/CAP.RabbiMQOptions.cs b/src/DotNetCore.CAP.RabbitMQ/CAP.RabbiMQOptions.cs index c0abcc8..afb2fd2 100644 --- a/src/DotNetCore.CAP.RabbitMQ/CAP.RabbiMQOptions.cs +++ b/src/DotNetCore.CAP.RabbitMQ/CAP.RabbiMQOptions.cs @@ -37,7 +37,10 @@ namespace DotNetCore.CAP /// The topic exchange type. public const string ExchangeType = "topic"; - /// The host to connect to. + /// + /// The host to connect to. + /// If you want connect to the cluster, you can assign like “192.168.1.111,192.168.1.112” + /// public string HostName { get; set; } = "localhost"; /// diff --git a/src/DotNetCore.CAP.RabbitMQ/IConnectionChannelPool.Default.cs b/src/DotNetCore.CAP.RabbitMQ/IConnectionChannelPool.Default.cs index e15fdfa..b50c9dc 100644 --- a/src/DotNetCore.CAP.RabbitMQ/IConnectionChannelPool.Default.cs +++ b/src/DotNetCore.CAP.RabbitMQ/IConnectionChannelPool.Default.cs @@ -76,7 +76,6 @@ namespace DotNetCore.CAP.RabbitMQ { var factory = new ConnectionFactory { - HostName = options.HostName, UserName = options.UserName, Port = options.Port, Password = options.Password, @@ -86,6 +85,13 @@ namespace DotNetCore.CAP.RabbitMQ SocketWriteTimeout = options.SocketWriteTimeout }; + if (options.HostName.Contains(",")) + { + return () => factory.CreateConnection( + options.HostName.Split(new[] { "," }, StringSplitOptions.RemoveEmptyEntries)); + } + + factory.HostName = options.HostName; return () => factory.CreateConnection(); } diff --git a/src/DotNetCore.CAP.SqlServer/SqlServerStorageConnection.cs b/src/DotNetCore.CAP.SqlServer/SqlServerStorageConnection.cs index 9b931c1..b204fde 100644 --- a/src/DotNetCore.CAP.SqlServer/SqlServerStorageConnection.cs +++ b/src/DotNetCore.CAP.SqlServer/SqlServerStorageConnection.cs @@ -40,7 +40,7 @@ namespace DotNetCore.CAP.SqlServer public async Task> GetPublishedMessagesOfNeedRetry() { - var fourMinsAgo = DateTime.Now.AddMinutes(-4); + var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O"); var sql = $"SELECT TOP (200) * FROM [{Options.Schema}].[Published] WITH (readpast) WHERE Retries<{_capOptions.FailedRetryCount} AND Added<'{fourMinsAgo}' AND (StatusName = '{StatusName.Failed}' OR StatusName = '{StatusName.Scheduled}')"; @@ -78,7 +78,7 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);SELECT SCOP public async Task> GetReceivedMessagesOfNeedRetry() { - var fourMinsAgo = DateTime.Now.AddMinutes(-4); + var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O"); var sql = $"SELECT TOP (200) * FROM [{Options.Schema}].[Received] WITH (readpast) WHERE Retries<{_capOptions.FailedRetryCount} AND Added<'{fourMinsAgo}' AND (StatusName = '{StatusName.Failed}' OR StatusName = '{StatusName.Scheduled}')"; using (var connection = new SqlConnection(Options.ConnectionString)) diff --git a/src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs b/src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs index 4d7a321..77a6c44 100644 --- a/src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs +++ b/src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs @@ -182,7 +182,7 @@ namespace DotNetCore.CAP.Abstractions } catch (Exception e) { - _logger.LogError("An exception was occurred when publish message. exception message:" + e.Message, e); + _logger.LogError(e, "An exception was occurred when publish message async. exception message:" + name); s_diagnosticListener.WritePublishMessageStoreError(operationId, message, e); Console.WriteLine(e); throw; @@ -204,10 +204,11 @@ namespace DotNetCore.CAP.Abstractions try { - operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message); + Console.WriteLine("================22222222222222====================="); + operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message); var id = Execute(DbConnection, DbTransaction, message); - + Console.WriteLine("================777777777777777777777====================="); ClosedCap(); if (id > 0) @@ -220,7 +221,7 @@ namespace DotNetCore.CAP.Abstractions } catch (Exception e) { - _logger.LogError("An exception was occurred when publish message. exception message:" + e.Message, e); + _logger.LogError(e, "An exception was occurred when publish message. message:" + name); s_diagnosticListener.WritePublishMessageStoreError(operationId, message, e); Console.WriteLine(e); throw; diff --git a/src/DotNetCore.CAP/Dashboard/DashboardRequest.cs b/src/DotNetCore.CAP/Dashboard/DashboardRequest.cs index 5fdb054..23f46b8 100644 --- a/src/DotNetCore.CAP/Dashboard/DashboardRequest.cs +++ b/src/DotNetCore.CAP/Dashboard/DashboardRequest.cs @@ -28,19 +28,14 @@ namespace DotNetCore.CAP.Dashboard public CapDashboardRequest(HttpContext context) { - if (context == null) - { - throw new ArgumentNullException(nameof(context)); - } - - _context = context; + _context = context ?? throw new ArgumentNullException(nameof(context)); } public override string Method => _context.Request.Method; public override string Path => _context.Request.Path.Value; public override string PathBase => _context.Request.PathBase.Value; - public override string LocalIpAddress => _context.Connection.LocalIpAddress.ToString(); - public override string RemoteIpAddress => _context.Connection.RemoteIpAddress.ToString(); + public override string LocalIpAddress => _context.Connection.LocalIpAddress.MapToIPv4().ToString(); + public override string RemoteIpAddress => _context.Connection.RemoteIpAddress.MapToIPv4().ToString(); public override string GetQuery(string key) { diff --git a/src/DotNetCore.CAP/Dashboard/DashboardRoutes.cs b/src/DotNetCore.CAP/Dashboard/DashboardRoutes.cs index 312da57..9a51faa 100644 --- a/src/DotNetCore.CAP/Dashboard/DashboardRoutes.cs +++ b/src/DotNetCore.CAP/Dashboard/DashboardRoutes.cs @@ -3,7 +3,7 @@ using System.Reflection; using DotNetCore.CAP.Dashboard.Pages; -using DotNetCore.CAP.Infrastructure; +using Microsoft.Extensions.DependencyInjection; namespace DotNetCore.CAP.Dashboard { @@ -83,24 +83,34 @@ namespace DotNetCore.CAP.Dashboard Routes.AddJsonResult("/published/message/(?.+)", x => { var id = int.Parse(x.UriMatch.Groups["Id"].Value); - var message = x.Storage.GetConnection().GetPublishedMessageAsync(id).GetAwaiter().GetResult(); + var message = x.Storage.GetConnection().GetPublishedMessageAsync(id) + .GetAwaiter().GetResult(); return message.Content; }); Routes.AddJsonResult("/received/message/(?.+)", x => { var id = int.Parse(x.UriMatch.Groups["Id"].Value); - var message = x.Storage.GetConnection().GetReceivedMessageAsync(id).GetAwaiter().GetResult(); + var message = x.Storage.GetConnection().GetReceivedMessageAsync(id) + .GetAwaiter().GetResult(); return message.Content; }); Routes.AddPublishBatchCommand( "/published/requeue", (client, messageId) => - client.Storage.GetConnection().ChangePublishedState(messageId, StatusName.Scheduled)); + { + var msg = client.Storage.GetConnection().GetPublishedMessageAsync(messageId) + .GetAwaiter().GetResult(); + client.RequestServices.GetService().EnqueueToPublish(msg); + }); Routes.AddPublishBatchCommand( "/received/requeue", (client, messageId) => - client.Storage.GetConnection().ChangeReceivedState(messageId, StatusName.Scheduled)); + { + var msg = client.Storage.GetConnection().GetReceivedMessageAsync(messageId) + .GetAwaiter().GetResult(); + client.RequestServices.GetService().EnqueueToExecute(msg); + }); Routes.AddRazorPage( "/published/(?.+)", diff --git a/src/DotNetCore.CAP/Dashboard/LocalRequestsOnlyAuthorizationFilter.cs b/src/DotNetCore.CAP/Dashboard/LocalRequestsOnlyAuthorizationFilter.cs index 42746d7..dafd845 100644 --- a/src/DotNetCore.CAP/Dashboard/LocalRequestsOnlyAuthorizationFilter.cs +++ b/src/DotNetCore.CAP/Dashboard/LocalRequestsOnlyAuthorizationFilter.cs @@ -9,26 +9,27 @@ namespace DotNetCore.CAP.Dashboard { public bool Authorize(DashboardContext context) { + var ipAddress = context.Request.RemoteIpAddress; // if unknown, assume not local - if (string.IsNullOrEmpty(context.Request.RemoteIpAddress)) + if (string.IsNullOrEmpty(ipAddress)) { return false; } // check if localhost - if (context.Request.RemoteIpAddress == "127.0.0.1" || context.Request.RemoteIpAddress == "::1") + if (ipAddress == "127.0.0.1" || ipAddress == "0.0.0.1") { return true; } // compare with local address - if (context.Request.RemoteIpAddress == context.Request.LocalIpAddress) + if (ipAddress == context.Request.LocalIpAddress) { return true; } // check if private ip - if (Helper.IsInnerIP(context.Request.RemoteIpAddress)) + if (Helper.IsInnerIP(ipAddress)) { return true; } diff --git a/src/DotNetCore.CAP/IPublishMessageSender.Base.cs b/src/DotNetCore.CAP/IPublishMessageSender.Base.cs index a597169..cbbc780 100644 --- a/src/DotNetCore.CAP/IPublishMessageSender.Base.cs +++ b/src/DotNetCore.CAP/IPublishMessageSender.Base.cs @@ -43,6 +43,24 @@ namespace DotNetCore.CAP public abstract Task PublishAsync(string keyName, string content); public async Task SendAsync(CapPublishedMessage message) + { + bool retry; + OperateResult result; + do + { + var executedResult = await SendWithoutRetryAsync(message); + result = executedResult.Item2; + if (result == OperateResult.Success) + { + return result; + } + retry = executedResult.Item1; + } while (retry); + + return result; + } + + private async Task<(bool, OperateResult)> SendWithoutRetryAsync(CapPublishedMessage message) { var startTime = DateTimeOffset.UtcNow; var stopwatch = Stopwatch.StartNew(); @@ -63,60 +81,33 @@ namespace DotNetCore.CAP TracingAfter(operationId, message.Name, sendValues, startTime, stopwatch.Elapsed); - return OperateResult.Success; + return (false, OperateResult.Success); } else { TracingError(operationId, message, result, startTime, stopwatch.Elapsed); - await SetFailedState(message, result.Exception, out bool stillRetry); - - if (stillRetry) - { - _logger.SenderRetrying(message.Id, message.Retries); - - await SendAsync(message); - } - return OperateResult.Failed(result.Exception); + var needRetry = await SetFailedState(message, result.Exception); + return (needRetry, OperateResult.Failed(result.Exception)); } } - private static bool UpdateMessageForRetryAsync(CapPublishedMessage message) - { - var retryBehavior = RetryBehavior.DefaultRetry; - - var retries = ++message.Retries; - if (retries >= retryBehavior.RetryCount) - { - return false; - } - - var due = message.Added.AddSeconds(retryBehavior.RetryIn(retries)); - message.ExpiresAt = due; - - return true; - } private Task SetSuccessfulState(CapPublishedMessage message) { var succeededState = new SucceededState(_options.SucceedMessageExpiredAfter); - return _stateChanger.ChangeStateAsync(message, succeededState, _connection); } - private Task SetFailedState(CapPublishedMessage message, Exception ex, out bool stillRetry) + private async Task SetFailedState(CapPublishedMessage message, Exception ex) { - IState newState = new FailedState(); - stillRetry = UpdateMessageForRetryAsync(message); - if (stillRetry) - { - _logger.ConsumerExecutionFailedWillRetry(ex); - return Task.CompletedTask; - } - AddErrorReasonToContent(message, ex); - return _stateChanger.ChangeStateAsync(message, newState, _connection); + var needRetry = UpdateMessageForRetry(message); + + await _stateChanger.ChangeStateAsync(message, new FailedState(), _connection); + + return needRetry; } private static void AddErrorReasonToContent(CapPublishedMessage message, Exception exception) @@ -124,6 +115,37 @@ namespace DotNetCore.CAP message.Content = Helper.AddExceptionProperty(message.Content, exception); } + private bool UpdateMessageForRetry(CapPublishedMessage message) + { + var retryBehavior = RetryBehavior.DefaultRetry; + + var retries = ++message.Retries; + message.ExpiresAt = message.Added.AddSeconds(retryBehavior.RetryIn(retries)); + + var retryCount = Math.Min(_options.FailedRetryCount, retryBehavior.RetryCount); + if (retries >= retryCount) + { + if (retries == _options.FailedRetryCount) + { + try + { + _options.FailedThresholdCallback?.Invoke(MessageType.Subscribe, message.Name, message.Content); + + _logger.SenderAfterThreshold(message.Id, _options.FailedRetryCount); + } + catch (Exception ex) + { + _logger.ExecutedThresholdCallbackFailed(ex); + } + } + return false; + } + + _logger.SenderRetrying(message.Id, retries); + + return true; + } + private (Guid, TracingHeaders) TracingBefore(string topic, string values) { Guid operationId = Guid.NewGuid(); diff --git a/src/DotNetCore.CAP/ISubscribeExecutor.Default.cs b/src/DotNetCore.CAP/ISubscribeExecutor.Default.cs index fe03a15..6bd1a68 100644 --- a/src/DotNetCore.CAP/ISubscribeExecutor.Default.cs +++ b/src/DotNetCore.CAP/ISubscribeExecutor.Default.cs @@ -50,6 +50,29 @@ namespace DotNetCore.CAP private IConsumerInvoker Invoker { get; } public async Task ExecuteAsync(CapReceivedMessage message) + { + bool retry; + OperateResult result; + do + { + var executedResult = await ExecuteWithoutRetryAsync(message); + result = executedResult.Item2; + if (result == OperateResult.Success) + { + return result; + } + retry = executedResult.Item1; + } while (retry); + + return result; + } + + /// + /// Execute message consumption once. + /// + /// the message rececived of + /// Item1 is need still restry, Item2 is executed result. + private async Task<(bool, OperateResult)> ExecuteWithoutRetryAsync(CapReceivedMessage message) { if (message == null) { @@ -68,65 +91,65 @@ namespace DotNetCore.CAP _logger.ConsumerExecuted(sp.Elapsed.TotalSeconds); - return OperateResult.Success; + return (false, OperateResult.Success); } catch (Exception ex) { _logger.LogError(ex, $"An exception occurred while executing the subscription method. Topic:{message.Name}, Id:{message.Id}"); - await SetFailedState(message, ex, out bool stillRetry); - if (stillRetry) - { - await ExecuteAsync(message); - } - - return OperateResult.Failed(ex); + return (await SetFailedState(message, ex), OperateResult.Failed(ex)); } } private Task SetSuccessfulState(CapReceivedMessage message) { var succeededState = new SucceededState(_options.SucceedMessageExpiredAfter); - return _stateChanger.ChangeStateAsync(message, succeededState, _connection); } - private Task SetFailedState(CapReceivedMessage message, Exception ex, out bool stillRetry) + private async Task SetFailedState(CapReceivedMessage message, Exception ex) { - IState newState = new FailedState(); - if (ex is SubscriberNotFoundException) { - stillRetry = false; message.Retries = _options.FailedRetryCount; // not retry if SubscriberNotFoundException } - else - { - stillRetry = UpdateMessageForRetry(message); - if (stillRetry) - { - _logger.ConsumerExecutionFailedWillRetry(ex); - return Task.CompletedTask; - } - } AddErrorReasonToContent(message, ex); - return _stateChanger.ChangeStateAsync(message, newState, _connection); + var needRetry = UpdateMessageForRetry(message); + + await _stateChanger.ChangeStateAsync(message, new FailedState(), _connection); + + return needRetry; } - private static bool UpdateMessageForRetry(CapReceivedMessage message) + private bool UpdateMessageForRetry(CapReceivedMessage message) { var retryBehavior = RetryBehavior.DefaultRetry; var retries = ++message.Retries; - if (retries >= retryBehavior.RetryCount) + message.ExpiresAt = message.Added.AddSeconds(retryBehavior.RetryIn(retries)); + + var retryCount = Math.Min(_options.FailedRetryCount, retryBehavior.RetryCount); + if (retries >= retryCount) { + if (retries == _options.FailedRetryCount) + { + try + { + _options.FailedThresholdCallback?.Invoke(MessageType.Subscribe, message.Name, message.Content); + + _logger.ConsumerExecutedAfterThreshold(message.Id, _options.FailedRetryCount); + } + catch (Exception ex) + { + _logger.ExecutedThresholdCallbackFailed(ex); + } + } return false; } - var due = message.Added.AddSeconds(retryBehavior.RetryIn(retries)); - message.ExpiresAt = due; + _logger.ConsumerExecutionRetrying(message.Id, retries); return true; } diff --git a/src/DotNetCore.CAP/LoggerExtensions.cs b/src/DotNetCore.CAP/LoggerExtensions.cs index a0096e6..808096c 100644 --- a/src/DotNetCore.CAP/LoggerExtensions.cs +++ b/src/DotNetCore.CAP/LoggerExtensions.cs @@ -10,141 +10,70 @@ namespace DotNetCore.CAP [SuppressMessage("ReSharper", "InconsistentNaming")] internal static class LoggerExtensions { - private static readonly Action _serverStarting; - private static readonly Action _processorsStartingError; - private static readonly Action _serverShuttingDown; - private static readonly Action _expectedOperationCanceledException; - private static readonly Action _modelBinderFormattingException; - private static readonly Action _consumerFailedWillRetry; - private static readonly Action _consumerExecuted; - private static readonly Action _senderRetrying; - private static readonly Action _exceptionOccuredWhileExecuting; - private static readonly Action _messageHasBeenSent; - private static readonly Action _messagePublishException; - - static LoggerExtensions() + public static void ConsumerExecutedAfterThreshold(this ILogger logger, int messageId, int retries) { - _serverStarting = LoggerMessage.Define( - LogLevel.Debug, - 1, - "Starting the processing server."); - - _processorsStartingError = LoggerMessage.Define( - LogLevel.Error, - 5, - "Starting the processors throw an exception."); - - _serverShuttingDown = LoggerMessage.Define( - LogLevel.Information, - 2, - "Shutting down the processing server..."); - - _expectedOperationCanceledException = LoggerMessage.Define( - LogLevel.Warning, - 3, - "Expected an OperationCanceledException, but found '{ExceptionMessage}'."); - - LoggerMessage.Define( - LogLevel.Error, - 5, - "Consumer method '{methodName}' failed to execute."); - - LoggerMessage.Define( - LogLevel.Error, - 5, - "Received message topic method '{topicName}' failed to execute."); - - _modelBinderFormattingException = LoggerMessage.Define( - LogLevel.Error, - 5, - "When call subscribe method, a parameter format conversion exception occurs. MethodName:'{MethodName}' ParameterName:'{ParameterName}' Content:'{Content}'." - ); - - _senderRetrying = LoggerMessage.Define( - LogLevel.Debug, - 3, - "The {Retries}th retrying send a message failed. message id: {MessageId} "); - - _consumerExecuted = LoggerMessage.Define( - LogLevel.Debug, - 4, - "Consumer executed. Took: {Seconds} secs."); - - _consumerFailedWillRetry = LoggerMessage.Define( - LogLevel.Warning, - 2, - "Consumer failed to execute. Will retry."); - - _exceptionOccuredWhileExecuting = LoggerMessage.Define( - LogLevel.Error, - 6, - "An exception occured while trying to store a message. message id: {MessageId}"); + logger.LogWarning($"The Subscriber of the message({messageId}) still fails after {retries}th executions and we will stop retrying."); + } - _messageHasBeenSent = LoggerMessage.Define( - LogLevel.Debug, - 4, - "Message published. Took: {Seconds} secs."); + public static void SenderAfterThreshold(this ILogger logger, int messageId, int retries) + { + logger.LogWarning($"The Publisher of the message({messageId}) still fails after {retries}th sends and we will stop retrying."); + } - _messagePublishException = LoggerMessage.Define( - LogLevel.Error, - 6, - "An exception occured while publishing a message, reason:{Reason}. message id:{MessageId}"); + public static void ExecutedThresholdCallbackFailed(this ILogger logger, Exception ex) + { + logger.LogWarning(ex, "FailedThresholdCallback action raised an exception:" + ex.Message); } - public static void ConsumerExecutionFailedWillRetry(this ILogger logger, Exception ex) + public static void ConsumerExecutionRetrying(this ILogger logger, int messageId, int retries) { - _consumerFailedWillRetry(logger, ex); + logger.LogWarning($"The {retries}th retrying consume a message failed. message id: {messageId}"); } public static void SenderRetrying(this ILogger logger, int messageId, int retries) { - _senderRetrying(logger, messageId, retries, null); + logger.LogWarning($"The {retries}th retrying send a message failed. message id: {messageId} "); } public static void MessageHasBeenSent(this ILogger logger, double seconds) { - _messageHasBeenSent(logger, seconds, null); + logger.LogDebug($"Message published. Took: {seconds} secs."); } public static void MessagePublishException(this ILogger logger, int messageId, string reason, Exception ex) { - _messagePublishException(logger, messageId, reason, ex); + logger.LogError(ex, $"An exception occured while publishing a message, reason:{reason}. message id:{messageId}"); } public static void ConsumerExecuted(this ILogger logger, double seconds) { - _consumerExecuted(logger, seconds, null); + logger.LogDebug($"Consumer executed. Took: {seconds} secs."); } public static void ServerStarting(this ILogger logger) { - _serverStarting(logger, null); + logger.LogInformation("Starting the processing server."); } public static void ProcessorsStartedError(this ILogger logger, Exception ex) { - _processorsStartingError(logger, ex); + logger.LogError(ex, "Starting the processors throw an exception."); } public static void ServerShuttingDown(this ILogger logger) { - _serverShuttingDown(logger, null); + logger.LogInformation("Shutting down the processing server..."); } public static void ExpectedOperationCanceledException(this ILogger logger, Exception ex) { - _expectedOperationCanceledException(logger, ex.Message, ex); - } - - public static void ExceptionOccuredWhileExecuting(this ILogger logger, string messageId, Exception ex) - { - _exceptionOccuredWhileExecuting(logger, messageId, ex); + logger.LogWarning(ex, $"Expected an OperationCanceledException, but found '{ex.Message}'."); } public static void ModelBinderFormattingException(this ILogger logger, string methodName, string parameterName, string content, Exception ex) { - _modelBinderFormattingException(logger, methodName, parameterName, content, ex); + logger.LogError(ex, $"When call subscribe method, a parameter format conversion exception occurs. MethodName:'{methodName}' ParameterName:'{parameterName}' Content:'{content}'."); } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/Models/CapPublishedMessage.cs b/src/DotNetCore.CAP/Models/CapPublishedMessage.cs index 41c554d..d95e72f 100644 --- a/src/DotNetCore.CAP/Models/CapPublishedMessage.cs +++ b/src/DotNetCore.CAP/Models/CapPublishedMessage.cs @@ -15,12 +15,6 @@ namespace DotNetCore.CAP.Models Added = DateTime.Now; } - public CapPublishedMessage(MessageContext message) - { - Name = message.Name; - Content = message.Content; - } - public int Id { get; set; } public string Name { get; set; } diff --git a/src/DotNetCore.CAP/Processor/IProcessor.NeedRetry.cs b/src/DotNetCore.CAP/Processor/IProcessor.NeedRetry.cs index 2f7b829..36a6deb 100644 --- a/src/DotNetCore.CAP/Processor/IProcessor.NeedRetry.cs +++ b/src/DotNetCore.CAP/Processor/IProcessor.NeedRetry.cs @@ -3,11 +3,7 @@ using System; using System.Threading.Tasks; -using DotNetCore.CAP.Infrastructure; -using DotNetCore.CAP.Models; -using DotNetCore.CAP.Processor.States; using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; namespace DotNetCore.CAP.Processor @@ -15,26 +11,18 @@ namespace DotNetCore.CAP.Processor public class NeedRetryMessageProcessor : IProcessor { private readonly TimeSpan _delay = TimeSpan.FromSeconds(1); - private readonly ILogger _logger; - private readonly CapOptions _options; - private readonly IPublishExecutor _publishExecutor; - private readonly IStateChanger _stateChanger; + private readonly IPublishMessageSender _publishMessageSender; private readonly ISubscriberExecutor _subscriberExecutor; private readonly TimeSpan _waitingInterval; public NeedRetryMessageProcessor( IOptions options, - ILogger logger, - IStateChanger stateChanger, ISubscriberExecutor subscriberExecutor, - IPublishExecutor publishExecutor) + IPublishMessageSender publishMessageSender) { - _options = options.Value; - _logger = logger; - _stateChanger = stateChanger; _subscriberExecutor = subscriberExecutor; - _publishExecutor = publishExecutor; - _waitingInterval = TimeSpan.FromSeconds(_options.FailedRetryInterval); + _publishMessageSender = publishMessageSender; + _waitingInterval = TimeSpan.FromSeconds(options.Value.FailedRetryInterval); } public async Task ProcessAsync(ProcessingContext context) @@ -56,57 +44,10 @@ namespace DotNetCore.CAP.Processor private async Task ProcessPublishedAsync(IStorageConnection connection, ProcessingContext context) { var messages = await connection.GetPublishedMessagesOfNeedRetry(); - var hasException = false; foreach (var message in messages) { - if (message.Retries > _options.FailedRetryCount) - { - continue; - } - - using (var transaction = connection.CreateTransaction()) - { - var result = await _publishExecutor.PublishAsync(message.Name, message.Content); - if (result.Succeeded) - { - _stateChanger.ChangeState(message, new SucceededState(), transaction); - _logger.LogInformation("The message was sent successfully during the retry. MessageId:" + message.Id); - } - else - { - message.Content = Helper.AddExceptionProperty(message.Content, result.Exception); - message.Retries++; - if (message.StatusName == StatusName.Scheduled) - { - message.ExpiresAt = GetDueTime(message.Added, message.Retries); - message.StatusName = StatusName.Failed; - } - transaction.UpdateMessage(message); - - if (message.Retries >= _options.FailedRetryCount) - { - _logger.LogError($"The message still sent failed after {_options.FailedRetryCount} retries. We will stop retrying the message. " + - "MessageId:" + message.Id); - if (message.Retries == _options.FailedRetryCount) - { - if (!hasException) - { - try - { - _options.FailedThresholdCallback?.Invoke(MessageType.Publish, message.Name, message.Content); - } - catch (Exception ex) - { - hasException = true; - _logger.LogWarning("Failed call-back method raised an exception:" + ex.Message); - } - } - } - } - } - await transaction.CommitAsync(); - } + await _publishMessageSender.SendAsync(message); context.ThrowIfStopping(); @@ -117,69 +58,15 @@ namespace DotNetCore.CAP.Processor private async Task ProcessReceivedAsync(IStorageConnection connection, ProcessingContext context) { var messages = await connection.GetReceivedMessagesOfNeedRetry(); - var hasException = false; foreach (var message in messages) { - if (message.Retries > _options.FailedRetryCount) - { - continue; - } - - using (var transaction = connection.CreateTransaction()) - { - var result = await _subscriberExecutor.ExecuteAsync(message); - if (result.Succeeded) - { - _stateChanger.ChangeState(message, new SucceededState(), transaction); - _logger.LogInformation("The message was execute successfully during the retry. MessageId:" + message.Id); - } - else - { - message.Content = Helper.AddExceptionProperty(message.Content, result.Exception); - message.Retries++; - if (message.StatusName == StatusName.Scheduled) - { - message.ExpiresAt = GetDueTime(message.Added, message.Retries); - message.StatusName = StatusName.Failed; - } - transaction.UpdateMessage(message); - - if (message.Retries >= _options.FailedRetryCount) - { - _logger.LogError($"[Subscriber]The message still executed failed after {_options.FailedRetryCount} retries. " + - "We will stop retrying to execute the message. message id:" + message.Id); - - if (message.Retries == _options.FailedRetryCount) - { - if (!hasException) - { - try - { - _options.FailedThresholdCallback?.Invoke(MessageType.Subscribe, message.Name, message.Content); - } - catch (Exception ex) - { - hasException = true; - _logger.LogWarning("Failed call-back method raised an exception:" + ex.Message); - } - } - } - } - } - await transaction.CommitAsync(); - } + await _subscriberExecutor.ExecuteAsync(message); context.ThrowIfStopping(); await context.WaitAsync(_delay); } - } - - public DateTime GetDueTime(DateTime addedTime, int retries) - { - var retryBehavior = RetryBehavior.DefaultRetry; - return addedTime.AddSeconds(retryBehavior.RetryIn(retries)); - } + } } } \ No newline at end of file