From 3a8bcf0324423c63712db70c62ecdb38564072fa Mon Sep 17 00:00:00 2001 From: Savorboard Date: Sat, 28 Apr 2018 16:03:36 +0800 Subject: [PATCH] Release 2.2.2 (#121) * fixed message enqueue exception in v2.2 * add log to samples * Improved log output. #114 * add default timeout configuration for kafka client. * fixed retry processor bugs. * Fixed kafka producer exception log without logging when publish message. * update version num to 2.2.2 * rename configuration options FailedCallback to FailedThresholdCallback * rename files name. * remove unused files. * modify the error comments. * update samples. * add logs. --- build/version.props | 2 +- .../Controllers/ValuesController.cs | 2 +- samples/Sample.Kafka.MySql/Program.cs | 7 +- .../Sample.Kafka.MySql.csproj | 6 ++ samples/Sample.Kafka.MySql/nlog.config | 26 +++++ samples/Sample.RabbitMQ.MySql/AppDbContext.cs | 9 +- .../Controllers/ValuesController.cs | 4 +- samples/Sample.RabbitMQ.MySql/Program.cs | 15 ++- .../Sample.RabbitMQ.MySql.csproj | 6 ++ samples/Sample.RabbitMQ.MySql/nlog.config | 26 +++++ src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs | 2 + ...tionPool.cs => IConnectionPool.Default.cs} | 0 .../IPublishMessageSender.Kafka.cs | 8 +- ...l.cs => IConnectionChannelPool.Default.cs} | 0 .../Abstractions/CapPublisherBase.cs | 2 +- src/DotNetCore.CAP/CAP.Options.cs | 4 +- src/DotNetCore.CAP/ICapPublisher.cs | 4 +- src/DotNetCore.CAP/ICapSubscribe.cs | 2 +- .../IConsumerHandler.Default.cs | 2 + src/DotNetCore.CAP/IFetchedMessage.cs | 19 ---- .../IPublishMessageSender.Base.cs | 33 +++--- src/DotNetCore.CAP/Infrastructure/Helper.cs | 1 - .../Internal/PublisherSentFailedException.cs | 4 + src/DotNetCore.CAP/LoggerExtensions.cs | 22 ++-- .../Processor/IProcessor.NeedRetry.cs | 102 +++++++++++++----- 25 files changed, 198 insertions(+), 110 deletions(-) create mode 100644 samples/Sample.Kafka.MySql/nlog.config create mode 100644 samples/Sample.RabbitMQ.MySql/nlog.config rename src/DotNetCore.CAP.Kafka/{ConnectionPool.cs => IConnectionPool.Default.cs} (100%) rename src/DotNetCore.CAP.RabbitMQ/{ConnectionChannelPool.cs => IConnectionChannelPool.Default.cs} (100%) delete mode 100644 src/DotNetCore.CAP/IFetchedMessage.cs diff --git a/build/version.props b/build/version.props index 6f4fc75..ae5b04f 100644 --- a/build/version.props +++ b/build/version.props @@ -2,7 +2,7 @@ 2 2 - 1 + 2 $(VersionMajor).$(VersionMinor).$(VersionPatch) diff --git a/samples/Sample.Kafka.MySql/Controllers/ValuesController.cs b/samples/Sample.Kafka.MySql/Controllers/ValuesController.cs index afcec67..6f04946 100644 --- a/samples/Sample.Kafka.MySql/Controllers/ValuesController.cs +++ b/samples/Sample.Kafka.MySql/Controllers/ValuesController.cs @@ -37,7 +37,7 @@ namespace Sample.Kafka.MySql.Controllers [CapSubscribe("xxx.xxx.test2")] public void Test2(int value) { - Console.WriteLine(value); + Console.WriteLine("Subscriber output message: " + value); } } } \ No newline at end of file diff --git a/samples/Sample.Kafka.MySql/Program.cs b/samples/Sample.Kafka.MySql/Program.cs index 7d62e55..976b20a 100644 --- a/samples/Sample.Kafka.MySql/Program.cs +++ b/samples/Sample.Kafka.MySql/Program.cs @@ -1,5 +1,6 @@ using Microsoft.AspNetCore; using Microsoft.AspNetCore.Hosting; +using NLog.Web; namespace Sample.Kafka.MySql { @@ -14,7 +15,11 @@ namespace Sample.Kafka.MySql public static IWebHost BuildWebHost(string[] args) => WebHost.CreateDefaultBuilder(args) .UseStartup() + .ConfigureLogging((hostingContext, builder) => + { + hostingContext.HostingEnvironment.ConfigureNLog("nlog.config"); + }) + .UseNLog() .Build(); - } } \ No newline at end of file diff --git a/samples/Sample.Kafka.MySql/Sample.Kafka.MySql.csproj b/samples/Sample.Kafka.MySql/Sample.Kafka.MySql.csproj index ad812ae..88c0c1d 100644 --- a/samples/Sample.Kafka.MySql/Sample.Kafka.MySql.csproj +++ b/samples/Sample.Kafka.MySql/Sample.Kafka.MySql.csproj @@ -10,6 +10,7 @@ + @@ -20,5 +21,10 @@ + + + PreserveNewest + + diff --git a/samples/Sample.Kafka.MySql/nlog.config b/samples/Sample.Kafka.MySql/nlog.config new file mode 100644 index 0000000..5b91105 --- /dev/null +++ b/samples/Sample.Kafka.MySql/nlog.config @@ -0,0 +1,26 @@ + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/samples/Sample.RabbitMQ.MySql/AppDbContext.cs b/samples/Sample.RabbitMQ.MySql/AppDbContext.cs index f803e26..ec83423 100644 --- a/samples/Sample.RabbitMQ.MySql/AppDbContext.cs +++ b/samples/Sample.RabbitMQ.MySql/AppDbContext.cs @@ -1,8 +1,4 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading.Tasks; -using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore; namespace Sample.RabbitMQ.MySql { @@ -10,8 +6,7 @@ namespace Sample.RabbitMQ.MySql { protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) { - optionsBuilder.UseMySql("Server=localhost;Database=Sample.RabbitMQ.MySql;UserId=root;Password=123123;Allow User Variables=True"); - //optionsBuilder.UseMySql("Server=192.168.2.206;Database=Sample.RabbitMQ.MySql;UserId=root;Password=123123;"); + optionsBuilder.UseMySql("Server=192.168.10.110;Database=testcap;UserId=root;Password=123123;"); } } } diff --git a/samples/Sample.RabbitMQ.MySql/Controllers/ValuesController.cs b/samples/Sample.RabbitMQ.MySql/Controllers/ValuesController.cs index 7b30772..1b47e5e 100644 --- a/samples/Sample.RabbitMQ.MySql/Controllers/ValuesController.cs +++ b/samples/Sample.RabbitMQ.MySql/Controllers/ValuesController.cs @@ -21,7 +21,7 @@ namespace Sample.RabbitMQ.MySql.Controllers public IActionResult PublishMessage() { _capBus.Publish("sample.rabbitmq.mysql", DateTime.Now); - + return Ok(); } @@ -50,7 +50,7 @@ namespace Sample.RabbitMQ.MySql.Controllers [CapSubscribe("sample.rabbitmq.mysql")] public void ReceiveMessage(DateTime time) { - Console.WriteLine("[sample.rabbitmq.mysql] message received: "+ DateTime.Now.ToString() +" , sent time: " + time.ToString()); + Console.WriteLine("[sample.rabbitmq.mysql] message received: " + DateTime.Now + ",sent time: " + time); } } } diff --git a/samples/Sample.RabbitMQ.MySql/Program.cs b/samples/Sample.RabbitMQ.MySql/Program.cs index 94095e2..3cbbe15 100644 --- a/samples/Sample.RabbitMQ.MySql/Program.cs +++ b/samples/Sample.RabbitMQ.MySql/Program.cs @@ -1,12 +1,6 @@ -using System; -using System.Collections.Generic; -using System.IO; -using System.Linq; -using System.Threading.Tasks; -using Microsoft.AspNetCore; -using Microsoft.AspNetCore.Builder; +using Microsoft.AspNetCore; using Microsoft.AspNetCore.Hosting; -using Microsoft.Extensions.Configuration; +using NLog.Web; namespace Sample.RabbitMQ.MySql { @@ -20,6 +14,11 @@ namespace Sample.RabbitMQ.MySql public static IWebHost BuildWebHost(string[] args) => WebHost.CreateDefaultBuilder(args) .UseStartup() + .ConfigureLogging((hostingContext, builder) => + { + hostingContext.HostingEnvironment.ConfigureNLog("nlog.config"); + }) + .UseNLog() .Build(); } } diff --git a/samples/Sample.RabbitMQ.MySql/Sample.RabbitMQ.MySql.csproj b/samples/Sample.RabbitMQ.MySql/Sample.RabbitMQ.MySql.csproj index d3a3987..5c981a3 100644 --- a/samples/Sample.RabbitMQ.MySql/Sample.RabbitMQ.MySql.csproj +++ b/samples/Sample.RabbitMQ.MySql/Sample.RabbitMQ.MySql.csproj @@ -12,6 +12,7 @@ + @@ -21,5 +22,10 @@ + + + PreserveNewest + + diff --git a/samples/Sample.RabbitMQ.MySql/nlog.config b/samples/Sample.RabbitMQ.MySql/nlog.config new file mode 100644 index 0000000..bba5bea --- /dev/null +++ b/samples/Sample.RabbitMQ.MySql/nlog.config @@ -0,0 +1,26 @@ + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs b/src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs index 629963b..0e4161b 100644 --- a/src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs +++ b/src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs @@ -57,6 +57,8 @@ namespace DotNetCore.CAP MainConfig["socket.blocking.max.ms"] = "10"; MainConfig["enable.auto.commit"] = "false"; MainConfig["log.connection.close"] = "false"; + MainConfig["request.timeout.ms"] = "3000"; + MainConfig["message.timeout.ms"] = "5000"; _kafkaConfig = MainConfig.AsEnumerable(); } diff --git a/src/DotNetCore.CAP.Kafka/ConnectionPool.cs b/src/DotNetCore.CAP.Kafka/IConnectionPool.Default.cs similarity index 100% rename from src/DotNetCore.CAP.Kafka/ConnectionPool.cs rename to src/DotNetCore.CAP.Kafka/IConnectionPool.Default.cs diff --git a/src/DotNetCore.CAP.Kafka/IPublishMessageSender.Kafka.cs b/src/DotNetCore.CAP.Kafka/IPublishMessageSender.Kafka.cs index f3e4e5d..4527eda 100644 --- a/src/DotNetCore.CAP.Kafka/IPublishMessageSender.Kafka.cs +++ b/src/DotNetCore.CAP.Kafka/IPublishMessageSender.Kafka.cs @@ -36,11 +36,7 @@ namespace DotNetCore.CAP.Kafka if (message.Error.HasError) { - return OperateResult.Failed(new OperateError - { - Code = message.Error.Code.ToString(), - Description = message.Error.Reason - }); + throw new PublisherSentFailedException(message.Error.ToString()); } _logger.LogDebug($"kafka topic message [{keyName}] has been published."); @@ -61,6 +57,6 @@ namespace DotNetCore.CAP.Kafka producer.Dispose(); } } - } + } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.RabbitMQ/ConnectionChannelPool.cs b/src/DotNetCore.CAP.RabbitMQ/IConnectionChannelPool.Default.cs similarity index 100% rename from src/DotNetCore.CAP.RabbitMQ/ConnectionChannelPool.cs rename to src/DotNetCore.CAP.RabbitMQ/IConnectionChannelPool.Default.cs diff --git a/src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs b/src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs index 26760b2..4d7a321 100644 --- a/src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs +++ b/src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs @@ -132,7 +132,7 @@ namespace DotNetCore.CAP.Abstractions { throw new InvalidOperationException( "If you are using the EntityFramework, you need to configure the DbContextType first." + - " otherwise you need to use overloaded method with IDbConnection and IDbTransaction."); + " otherwise you need to use overloaded method with IDbTransaction."); } } diff --git a/src/DotNetCore.CAP/CAP.Options.cs b/src/DotNetCore.CAP/CAP.Options.cs index 267a61f..05c0294 100644 --- a/src/DotNetCore.CAP/CAP.Options.cs +++ b/src/DotNetCore.CAP/CAP.Options.cs @@ -58,9 +58,9 @@ namespace DotNetCore.CAP public int FailedRetryInterval { get; set; } /// - /// We’ll invoke this call-back with message type,name,content when requeue failed message. + /// We’ll invoke this call-back with message type,name,content when retry failed (send or executed) messages equals times. /// - public Action FailedCallback { get; set; } + public Action FailedThresholdCallback { get; set; } /// /// The number of message retries, the retry will stop when the threshold is reached. diff --git a/src/DotNetCore.CAP/ICapPublisher.cs b/src/DotNetCore.CAP/ICapPublisher.cs index 8ba45e1..0688c02 100644 --- a/src/DotNetCore.CAP/ICapPublisher.cs +++ b/src/DotNetCore.CAP/ICapPublisher.cs @@ -15,7 +15,7 @@ namespace DotNetCore.CAP /// (EntityFramework) Asynchronous publish a object message. /// /// If you are using the EntityFramework, you need to configure the DbContextType first. - /// otherwise you need to use overloaded method with IDbConnection and IDbTransaction. + /// otherwise you need to use overloaded method with IDbTransaction. /// /// /// The type of content object. @@ -28,7 +28,7 @@ namespace DotNetCore.CAP /// (EntityFramework) Publish a object message. /// /// If you are using the EntityFramework, you need to configure the DbContextType first. - /// otherwise you need to use overloaded method with IDbConnection and IDbTransaction. + /// otherwise you need to use overloaded method with IDbTransaction. /// /// /// The type of content object. diff --git a/src/DotNetCore.CAP/ICapSubscribe.cs b/src/DotNetCore.CAP/ICapSubscribe.cs index 9b536ad..e223b84 100644 --- a/src/DotNetCore.CAP/ICapSubscribe.cs +++ b/src/DotNetCore.CAP/ICapSubscribe.cs @@ -4,7 +4,7 @@ namespace DotNetCore.CAP { /// - /// An empty interface, which is used to mark the current class have a CAP methods. + /// An empty interface, which is used to mark the current class have a CAP subscriber methods. /// public interface ICapSubscribe { diff --git a/src/DotNetCore.CAP/IConsumerHandler.Default.cs b/src/DotNetCore.CAP/IConsumerHandler.Default.cs index 8d48a37..1d441ff 100644 --- a/src/DotNetCore.CAP/IConsumerHandler.Default.cs +++ b/src/DotNetCore.CAP/IConsumerHandler.Default.cs @@ -178,6 +178,8 @@ namespace DotNetCore.CAP private (Guid, string) TracingBefore(string topic, string values) { + _logger.LogDebug("CAP received topic message:" + topic); + Guid operationId = Guid.NewGuid(); var eventData = new BrokerConsumeEventData( diff --git a/src/DotNetCore.CAP/IFetchedMessage.cs b/src/DotNetCore.CAP/IFetchedMessage.cs deleted file mode 100644 index 34a78ca..0000000 --- a/src/DotNetCore.CAP/IFetchedMessage.cs +++ /dev/null @@ -1,19 +0,0 @@ -// Copyright (c) .NET Core Community. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -using System; -using DotNetCore.CAP.Models; - -namespace DotNetCore.CAP -{ - public interface IFetchedMessage : IDisposable - { - int MessageId { get; } - - MessageType MessageType { get; } - - void RemoveFromQueue(); - - void Requeue(); - } -} \ No newline at end of file diff --git a/src/DotNetCore.CAP/IPublishMessageSender.Base.cs b/src/DotNetCore.CAP/IPublishMessageSender.Base.cs index cc82be6..a597169 100644 --- a/src/DotNetCore.CAP/IPublishMessageSender.Base.cs +++ b/src/DotNetCore.CAP/IPublishMessageSender.Base.cs @@ -67,15 +67,13 @@ namespace DotNetCore.CAP } else { - TracingError(operationId, message.Name, sendValues, result.Exception, startTime, stopwatch.Elapsed); - - _logger.MessagePublishException(message.Id, result.Exception); + TracingError(operationId, message, result, startTime, stopwatch.Elapsed); await SetFailedState(message, result.Exception, out bool stillRetry); if (stillRetry) { - _logger.SenderRetrying(3); + _logger.SenderRetrying(message.Id, message.Retries); await SendAsync(message); } @@ -109,20 +107,11 @@ namespace DotNetCore.CAP private Task SetFailedState(CapPublishedMessage message, Exception ex, out bool stillRetry) { IState newState = new FailedState(); - - if (ex is PublisherSentFailedException) - { - stillRetry = false; - message.Retries = _options.FailedRetryCount; // not retry if PublisherSentFailedException - } - else + stillRetry = UpdateMessageForRetryAsync(message); + if (stillRetry) { - stillRetry = UpdateMessageForRetryAsync(message); - if (stillRetry) - { - _logger.ConsumerExecutionFailedWillRetry(ex); - return Task.CompletedTask; - } + _logger.ConsumerExecutionFailedWillRetry(ex); + return Task.CompletedTask; } AddErrorReasonToContent(message, ex); @@ -166,14 +155,18 @@ namespace DotNetCore.CAP _logger.MessageHasBeenSent(du.TotalSeconds); } - private void TracingError(Guid operationId, string topic, string values, Exception ex, DateTimeOffset startTime, TimeSpan du) + private void TracingError(Guid operationId, CapPublishedMessage message, OperateResult result, DateTimeOffset startTime, TimeSpan du) { + var ex = new PublisherSentFailedException(result.ToString(), result.Exception); + + _logger.MessagePublishException(message.Id, result.ToString(), ex); + var eventData = new BrokerPublishErrorEventData( operationId, "", ServersAddress, - topic, - values, + message.Name, + message.Content, ex, startTime, du); diff --git a/src/DotNetCore.CAP/Infrastructure/Helper.cs b/src/DotNetCore.CAP/Infrastructure/Helper.cs index 83d128f..3f023b9 100644 --- a/src/DotNetCore.CAP/Infrastructure/Helper.cs +++ b/src/DotNetCore.CAP/Infrastructure/Helper.cs @@ -86,7 +86,6 @@ namespace DotNetCore.CAP.Infrastructure return !CanConvertFromString(type); } - public static string AddExceptionProperty(string json, Exception exception) { var jObject = ToJObject(exception); diff --git a/src/DotNetCore.CAP/Internal/PublisherSentFailedException.cs b/src/DotNetCore.CAP/Internal/PublisherSentFailedException.cs index b0f0a2c..9be1758 100644 --- a/src/DotNetCore.CAP/Internal/PublisherSentFailedException.cs +++ b/src/DotNetCore.CAP/Internal/PublisherSentFailedException.cs @@ -7,6 +7,10 @@ namespace DotNetCore.CAP.Internal { public class PublisherSentFailedException : Exception { + public PublisherSentFailedException(string message) : base(message) + { + } + public PublisherSentFailedException(string message, Exception ex) : base(message, ex) { } diff --git a/src/DotNetCore.CAP/LoggerExtensions.cs b/src/DotNetCore.CAP/LoggerExtensions.cs index 031be9d..a0096e6 100644 --- a/src/DotNetCore.CAP/LoggerExtensions.cs +++ b/src/DotNetCore.CAP/LoggerExtensions.cs @@ -17,10 +17,10 @@ namespace DotNetCore.CAP private static readonly Action _modelBinderFormattingException; private static readonly Action _consumerFailedWillRetry; private static readonly Action _consumerExecuted; - private static readonly Action _senderRetrying; + private static readonly Action _senderRetrying; private static readonly Action _exceptionOccuredWhileExecuting; private static readonly Action _messageHasBeenSent; - private static readonly Action _messagePublishException; + private static readonly Action _messagePublishException; static LoggerExtensions() { @@ -60,10 +60,10 @@ namespace DotNetCore.CAP "When call subscribe method, a parameter format conversion exception occurs. MethodName:'{MethodName}' ParameterName:'{ParameterName}' Content:'{Content}'." ); - _senderRetrying = LoggerMessage.Define( + _senderRetrying = LoggerMessage.Define( LogLevel.Debug, 3, - "Retrying send a message: {Retries}..."); + "The {Retries}th retrying send a message failed. message id: {MessageId} "); _consumerExecuted = LoggerMessage.Define( LogLevel.Debug, @@ -78,17 +78,17 @@ namespace DotNetCore.CAP _exceptionOccuredWhileExecuting = LoggerMessage.Define( LogLevel.Error, 6, - "An exception occured while trying to store a message: '{MessageId}'. "); + "An exception occured while trying to store a message. message id: {MessageId}"); _messageHasBeenSent = LoggerMessage.Define( LogLevel.Debug, 4, "Message published. Took: {Seconds} secs."); - _messagePublishException = LoggerMessage.Define( + _messagePublishException = LoggerMessage.Define( LogLevel.Error, 6, - "An exception occured while publishing a message: '{MessageId}'. "); + "An exception occured while publishing a message, reason:{Reason}. message id:{MessageId}"); } public static void ConsumerExecutionFailedWillRetry(this ILogger logger, Exception ex) @@ -96,9 +96,9 @@ namespace DotNetCore.CAP _consumerFailedWillRetry(logger, ex); } - public static void SenderRetrying(this ILogger logger, int retries) + public static void SenderRetrying(this ILogger logger, int messageId, int retries) { - _senderRetrying(logger, retries, null); + _senderRetrying(logger, messageId, retries, null); } public static void MessageHasBeenSent(this ILogger logger, double seconds) @@ -106,9 +106,9 @@ namespace DotNetCore.CAP _messageHasBeenSent(logger, seconds, null); } - public static void MessagePublishException(this ILogger logger, int messageId, Exception ex) + public static void MessagePublishException(this ILogger logger, int messageId, string reason, Exception ex) { - _messagePublishException(logger, messageId, ex); + _messagePublishException(logger, messageId, reason, ex); } public static void ConsumerExecuted(this ILogger logger, double seconds) diff --git a/src/DotNetCore.CAP/Processor/IProcessor.NeedRetry.cs b/src/DotNetCore.CAP/Processor/IProcessor.NeedRetry.cs index b1109a3..2f7b829 100644 --- a/src/DotNetCore.CAP/Processor/IProcessor.NeedRetry.cs +++ b/src/DotNetCore.CAP/Processor/IProcessor.NeedRetry.cs @@ -65,33 +65,46 @@ namespace DotNetCore.CAP.Processor continue; } - if (!hasException) - { - try - { - _options.FailedCallback?.Invoke(MessageType.Publish, message.Name, message.Content); - } - catch (Exception ex) - { - hasException = true; - _logger.LogWarning("Failed call-back method raised an exception:" + ex.Message); - } - } - using (var transaction = connection.CreateTransaction()) { - try + var result = await _publishExecutor.PublishAsync(message.Name, message.Content); + if (result.Succeeded) { - await _publishExecutor.PublishAsync(message.Name, message.Content); - _stateChanger.ChangeState(message, new SucceededState(), transaction); + _logger.LogInformation("The message was sent successfully during the retry. MessageId:" + message.Id); } - catch (Exception e) + else { - message.Content = Helper.AddExceptionProperty(message.Content, e); + message.Content = Helper.AddExceptionProperty(message.Content, result.Exception); + message.Retries++; + if (message.StatusName == StatusName.Scheduled) + { + message.ExpiresAt = GetDueTime(message.Added, message.Retries); + message.StatusName = StatusName.Failed; + } transaction.UpdateMessage(message); - } + if (message.Retries >= _options.FailedRetryCount) + { + _logger.LogError($"The message still sent failed after {_options.FailedRetryCount} retries. We will stop retrying the message. " + + "MessageId:" + message.Id); + if (message.Retries == _options.FailedRetryCount) + { + if (!hasException) + { + try + { + _options.FailedThresholdCallback?.Invoke(MessageType.Publish, message.Name, message.Content); + } + catch (Exception ex) + { + hasException = true; + _logger.LogWarning("Failed call-back method raised an exception:" + ex.Message); + } + } + } + } + } await transaction.CommitAsync(); } @@ -113,25 +126,60 @@ namespace DotNetCore.CAP.Processor continue; } - if (!hasException) + using (var transaction = connection.CreateTransaction()) { - try + var result = await _subscriberExecutor.ExecuteAsync(message); + if (result.Succeeded) { - _options.FailedCallback?.Invoke(MessageType.Subscribe, message.Name, message.Content); + _stateChanger.ChangeState(message, new SucceededState(), transaction); + _logger.LogInformation("The message was execute successfully during the retry. MessageId:" + message.Id); } - catch (Exception ex) + else { - hasException = true; - _logger.LogWarning("Failed call-back method raised an exception:" + ex.Message); + message.Content = Helper.AddExceptionProperty(message.Content, result.Exception); + message.Retries++; + if (message.StatusName == StatusName.Scheduled) + { + message.ExpiresAt = GetDueTime(message.Added, message.Retries); + message.StatusName = StatusName.Failed; + } + transaction.UpdateMessage(message); + + if (message.Retries >= _options.FailedRetryCount) + { + _logger.LogError($"[Subscriber]The message still executed failed after {_options.FailedRetryCount} retries. " + + "We will stop retrying to execute the message. message id:" + message.Id); + + if (message.Retries == _options.FailedRetryCount) + { + if (!hasException) + { + try + { + _options.FailedThresholdCallback?.Invoke(MessageType.Subscribe, message.Name, message.Content); + } + catch (Exception ex) + { + hasException = true; + _logger.LogWarning("Failed call-back method raised an exception:" + ex.Message); + } + } + } + } } + await transaction.CommitAsync(); } - await _subscriberExecutor.ExecuteAsync(message); - context.ThrowIfStopping(); await context.WaitAsync(_delay); } } + + public DateTime GetDueTime(DateTime addedTime, int retries) + { + var retryBehavior = RetryBehavior.DefaultRetry; + return addedTime.AddSeconds(retryBehavior.RetryIn(retries)); + } } } \ No newline at end of file