diff --git a/build/version.props b/build/version.props
index 6f4fc75..ae5b04f 100644
--- a/build/version.props
+++ b/build/version.props
@@ -2,7 +2,7 @@
2
2
- 1
+ 2
$(VersionMajor).$(VersionMinor).$(VersionPatch)
diff --git a/samples/Sample.Kafka.MySql/Controllers/ValuesController.cs b/samples/Sample.Kafka.MySql/Controllers/ValuesController.cs
index afcec67..6f04946 100644
--- a/samples/Sample.Kafka.MySql/Controllers/ValuesController.cs
+++ b/samples/Sample.Kafka.MySql/Controllers/ValuesController.cs
@@ -37,7 +37,7 @@ namespace Sample.Kafka.MySql.Controllers
[CapSubscribe("xxx.xxx.test2")]
public void Test2(int value)
{
- Console.WriteLine(value);
+ Console.WriteLine("Subscriber output message: " + value);
}
}
}
\ No newline at end of file
diff --git a/samples/Sample.Kafka.MySql/Program.cs b/samples/Sample.Kafka.MySql/Program.cs
index 7d62e55..976b20a 100644
--- a/samples/Sample.Kafka.MySql/Program.cs
+++ b/samples/Sample.Kafka.MySql/Program.cs
@@ -1,5 +1,6 @@
using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Hosting;
+using NLog.Web;
namespace Sample.Kafka.MySql
{
@@ -14,7 +15,11 @@ namespace Sample.Kafka.MySql
public static IWebHost BuildWebHost(string[] args) =>
WebHost.CreateDefaultBuilder(args)
.UseStartup()
+ .ConfigureLogging((hostingContext, builder) =>
+ {
+ hostingContext.HostingEnvironment.ConfigureNLog("nlog.config");
+ })
+ .UseNLog()
.Build();
-
}
}
\ No newline at end of file
diff --git a/samples/Sample.Kafka.MySql/Sample.Kafka.MySql.csproj b/samples/Sample.Kafka.MySql/Sample.Kafka.MySql.csproj
index ad812ae..88c0c1d 100644
--- a/samples/Sample.Kafka.MySql/Sample.Kafka.MySql.csproj
+++ b/samples/Sample.Kafka.MySql/Sample.Kafka.MySql.csproj
@@ -10,6 +10,7 @@
+
@@ -20,5 +21,10 @@
+
+
+ PreserveNewest
+
+
diff --git a/samples/Sample.Kafka.MySql/nlog.config b/samples/Sample.Kafka.MySql/nlog.config
new file mode 100644
index 0000000..5b91105
--- /dev/null
+++ b/samples/Sample.Kafka.MySql/nlog.config
@@ -0,0 +1,26 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/samples/Sample.RabbitMQ.MySql/AppDbContext.cs b/samples/Sample.RabbitMQ.MySql/AppDbContext.cs
index f803e26..ec83423 100644
--- a/samples/Sample.RabbitMQ.MySql/AppDbContext.cs
+++ b/samples/Sample.RabbitMQ.MySql/AppDbContext.cs
@@ -1,8 +1,4 @@
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Threading.Tasks;
-using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore;
namespace Sample.RabbitMQ.MySql
{
@@ -10,8 +6,7 @@ namespace Sample.RabbitMQ.MySql
{
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
- optionsBuilder.UseMySql("Server=localhost;Database=Sample.RabbitMQ.MySql;UserId=root;Password=123123;Allow User Variables=True");
- //optionsBuilder.UseMySql("Server=192.168.2.206;Database=Sample.RabbitMQ.MySql;UserId=root;Password=123123;");
+ optionsBuilder.UseMySql("Server=192.168.10.110;Database=testcap;UserId=root;Password=123123;");
}
}
}
diff --git a/samples/Sample.RabbitMQ.MySql/Controllers/ValuesController.cs b/samples/Sample.RabbitMQ.MySql/Controllers/ValuesController.cs
index 7b30772..1b47e5e 100644
--- a/samples/Sample.RabbitMQ.MySql/Controllers/ValuesController.cs
+++ b/samples/Sample.RabbitMQ.MySql/Controllers/ValuesController.cs
@@ -21,7 +21,7 @@ namespace Sample.RabbitMQ.MySql.Controllers
public IActionResult PublishMessage()
{
_capBus.Publish("sample.rabbitmq.mysql", DateTime.Now);
-
+
return Ok();
}
@@ -50,7 +50,7 @@ namespace Sample.RabbitMQ.MySql.Controllers
[CapSubscribe("sample.rabbitmq.mysql")]
public void ReceiveMessage(DateTime time)
{
- Console.WriteLine("[sample.rabbitmq.mysql] message received: "+ DateTime.Now.ToString() +" , sent time: " + time.ToString());
+ Console.WriteLine("[sample.rabbitmq.mysql] message received: " + DateTime.Now + ",sent time: " + time);
}
}
}
diff --git a/samples/Sample.RabbitMQ.MySql/Program.cs b/samples/Sample.RabbitMQ.MySql/Program.cs
index 94095e2..3cbbe15 100644
--- a/samples/Sample.RabbitMQ.MySql/Program.cs
+++ b/samples/Sample.RabbitMQ.MySql/Program.cs
@@ -1,12 +1,6 @@
-using System;
-using System.Collections.Generic;
-using System.IO;
-using System.Linq;
-using System.Threading.Tasks;
-using Microsoft.AspNetCore;
-using Microsoft.AspNetCore.Builder;
+using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Hosting;
-using Microsoft.Extensions.Configuration;
+using NLog.Web;
namespace Sample.RabbitMQ.MySql
{
@@ -20,6 +14,11 @@ namespace Sample.RabbitMQ.MySql
public static IWebHost BuildWebHost(string[] args) =>
WebHost.CreateDefaultBuilder(args)
.UseStartup()
+ .ConfigureLogging((hostingContext, builder) =>
+ {
+ hostingContext.HostingEnvironment.ConfigureNLog("nlog.config");
+ })
+ .UseNLog()
.Build();
}
}
diff --git a/samples/Sample.RabbitMQ.MySql/Sample.RabbitMQ.MySql.csproj b/samples/Sample.RabbitMQ.MySql/Sample.RabbitMQ.MySql.csproj
index d3a3987..5c981a3 100644
--- a/samples/Sample.RabbitMQ.MySql/Sample.RabbitMQ.MySql.csproj
+++ b/samples/Sample.RabbitMQ.MySql/Sample.RabbitMQ.MySql.csproj
@@ -12,6 +12,7 @@
+
@@ -21,5 +22,10 @@
+
+
+ PreserveNewest
+
+
diff --git a/samples/Sample.RabbitMQ.MySql/nlog.config b/samples/Sample.RabbitMQ.MySql/nlog.config
new file mode 100644
index 0000000..bba5bea
--- /dev/null
+++ b/samples/Sample.RabbitMQ.MySql/nlog.config
@@ -0,0 +1,26 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs b/src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs
index 629963b..0e4161b 100644
--- a/src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs
+++ b/src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs
@@ -57,6 +57,8 @@ namespace DotNetCore.CAP
MainConfig["socket.blocking.max.ms"] = "10";
MainConfig["enable.auto.commit"] = "false";
MainConfig["log.connection.close"] = "false";
+ MainConfig["request.timeout.ms"] = "3000";
+ MainConfig["message.timeout.ms"] = "5000";
_kafkaConfig = MainConfig.AsEnumerable();
}
diff --git a/src/DotNetCore.CAP.Kafka/ConnectionPool.cs b/src/DotNetCore.CAP.Kafka/IConnectionPool.Default.cs
similarity index 100%
rename from src/DotNetCore.CAP.Kafka/ConnectionPool.cs
rename to src/DotNetCore.CAP.Kafka/IConnectionPool.Default.cs
diff --git a/src/DotNetCore.CAP.Kafka/IPublishMessageSender.Kafka.cs b/src/DotNetCore.CAP.Kafka/IPublishMessageSender.Kafka.cs
index f3e4e5d..4527eda 100644
--- a/src/DotNetCore.CAP.Kafka/IPublishMessageSender.Kafka.cs
+++ b/src/DotNetCore.CAP.Kafka/IPublishMessageSender.Kafka.cs
@@ -36,11 +36,7 @@ namespace DotNetCore.CAP.Kafka
if (message.Error.HasError)
{
- return OperateResult.Failed(new OperateError
- {
- Code = message.Error.Code.ToString(),
- Description = message.Error.Reason
- });
+ throw new PublisherSentFailedException(message.Error.ToString());
}
_logger.LogDebug($"kafka topic message [{keyName}] has been published.");
@@ -61,6 +57,6 @@ namespace DotNetCore.CAP.Kafka
producer.Dispose();
}
}
- }
+ }
}
}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP.RabbitMQ/ConnectionChannelPool.cs b/src/DotNetCore.CAP.RabbitMQ/IConnectionChannelPool.Default.cs
similarity index 100%
rename from src/DotNetCore.CAP.RabbitMQ/ConnectionChannelPool.cs
rename to src/DotNetCore.CAP.RabbitMQ/IConnectionChannelPool.Default.cs
diff --git a/src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs b/src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs
index 26760b2..4d7a321 100644
--- a/src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs
+++ b/src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs
@@ -132,7 +132,7 @@ namespace DotNetCore.CAP.Abstractions
{
throw new InvalidOperationException(
"If you are using the EntityFramework, you need to configure the DbContextType first." +
- " otherwise you need to use overloaded method with IDbConnection and IDbTransaction.");
+ " otherwise you need to use overloaded method with IDbTransaction.");
}
}
diff --git a/src/DotNetCore.CAP/CAP.Options.cs b/src/DotNetCore.CAP/CAP.Options.cs
index 267a61f..05c0294 100644
--- a/src/DotNetCore.CAP/CAP.Options.cs
+++ b/src/DotNetCore.CAP/CAP.Options.cs
@@ -58,9 +58,9 @@ namespace DotNetCore.CAP
public int FailedRetryInterval { get; set; }
///
- /// We’ll invoke this call-back with message type,name,content when requeue failed message.
+ /// We’ll invoke this call-back with message type,name,content when retry failed (send or executed) messages equals times.
///
- public Action FailedCallback { get; set; }
+ public Action FailedThresholdCallback { get; set; }
///
/// The number of message retries, the retry will stop when the threshold is reached.
diff --git a/src/DotNetCore.CAP/ICapPublisher.cs b/src/DotNetCore.CAP/ICapPublisher.cs
index 8ba45e1..0688c02 100644
--- a/src/DotNetCore.CAP/ICapPublisher.cs
+++ b/src/DotNetCore.CAP/ICapPublisher.cs
@@ -15,7 +15,7 @@ namespace DotNetCore.CAP
/// (EntityFramework) Asynchronous publish a object message.
///
/// If you are using the EntityFramework, you need to configure the DbContextType first.
- /// otherwise you need to use overloaded method with IDbConnection and IDbTransaction.
+ /// otherwise you need to use overloaded method with IDbTransaction.
///
///
/// The type of content object.
@@ -28,7 +28,7 @@ namespace DotNetCore.CAP
/// (EntityFramework) Publish a object message.
///
/// If you are using the EntityFramework, you need to configure the DbContextType first.
- /// otherwise you need to use overloaded method with IDbConnection and IDbTransaction.
+ /// otherwise you need to use overloaded method with IDbTransaction.
///
///
/// The type of content object.
diff --git a/src/DotNetCore.CAP/ICapSubscribe.cs b/src/DotNetCore.CAP/ICapSubscribe.cs
index 9b536ad..e223b84 100644
--- a/src/DotNetCore.CAP/ICapSubscribe.cs
+++ b/src/DotNetCore.CAP/ICapSubscribe.cs
@@ -4,7 +4,7 @@
namespace DotNetCore.CAP
{
///
- /// An empty interface, which is used to mark the current class have a CAP methods.
+ /// An empty interface, which is used to mark the current class have a CAP subscriber methods.
///
public interface ICapSubscribe
{
diff --git a/src/DotNetCore.CAP/IConsumerHandler.Default.cs b/src/DotNetCore.CAP/IConsumerHandler.Default.cs
index 8d48a37..1d441ff 100644
--- a/src/DotNetCore.CAP/IConsumerHandler.Default.cs
+++ b/src/DotNetCore.CAP/IConsumerHandler.Default.cs
@@ -178,6 +178,8 @@ namespace DotNetCore.CAP
private (Guid, string) TracingBefore(string topic, string values)
{
+ _logger.LogDebug("CAP received topic message:" + topic);
+
Guid operationId = Guid.NewGuid();
var eventData = new BrokerConsumeEventData(
diff --git a/src/DotNetCore.CAP/IFetchedMessage.cs b/src/DotNetCore.CAP/IFetchedMessage.cs
deleted file mode 100644
index 34a78ca..0000000
--- a/src/DotNetCore.CAP/IFetchedMessage.cs
+++ /dev/null
@@ -1,19 +0,0 @@
-// Copyright (c) .NET Core Community. All rights reserved.
-// Licensed under the MIT License. See License.txt in the project root for license information.
-
-using System;
-using DotNetCore.CAP.Models;
-
-namespace DotNetCore.CAP
-{
- public interface IFetchedMessage : IDisposable
- {
- int MessageId { get; }
-
- MessageType MessageType { get; }
-
- void RemoveFromQueue();
-
- void Requeue();
- }
-}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP/IPublishMessageSender.Base.cs b/src/DotNetCore.CAP/IPublishMessageSender.Base.cs
index cc82be6..a597169 100644
--- a/src/DotNetCore.CAP/IPublishMessageSender.Base.cs
+++ b/src/DotNetCore.CAP/IPublishMessageSender.Base.cs
@@ -67,15 +67,13 @@ namespace DotNetCore.CAP
}
else
{
- TracingError(operationId, message.Name, sendValues, result.Exception, startTime, stopwatch.Elapsed);
-
- _logger.MessagePublishException(message.Id, result.Exception);
+ TracingError(operationId, message, result, startTime, stopwatch.Elapsed);
await SetFailedState(message, result.Exception, out bool stillRetry);
if (stillRetry)
{
- _logger.SenderRetrying(3);
+ _logger.SenderRetrying(message.Id, message.Retries);
await SendAsync(message);
}
@@ -109,20 +107,11 @@ namespace DotNetCore.CAP
private Task SetFailedState(CapPublishedMessage message, Exception ex, out bool stillRetry)
{
IState newState = new FailedState();
-
- if (ex is PublisherSentFailedException)
- {
- stillRetry = false;
- message.Retries = _options.FailedRetryCount; // not retry if PublisherSentFailedException
- }
- else
+ stillRetry = UpdateMessageForRetryAsync(message);
+ if (stillRetry)
{
- stillRetry = UpdateMessageForRetryAsync(message);
- if (stillRetry)
- {
- _logger.ConsumerExecutionFailedWillRetry(ex);
- return Task.CompletedTask;
- }
+ _logger.ConsumerExecutionFailedWillRetry(ex);
+ return Task.CompletedTask;
}
AddErrorReasonToContent(message, ex);
@@ -166,14 +155,18 @@ namespace DotNetCore.CAP
_logger.MessageHasBeenSent(du.TotalSeconds);
}
- private void TracingError(Guid operationId, string topic, string values, Exception ex, DateTimeOffset startTime, TimeSpan du)
+ private void TracingError(Guid operationId, CapPublishedMessage message, OperateResult result, DateTimeOffset startTime, TimeSpan du)
{
+ var ex = new PublisherSentFailedException(result.ToString(), result.Exception);
+
+ _logger.MessagePublishException(message.Id, result.ToString(), ex);
+
var eventData = new BrokerPublishErrorEventData(
operationId,
"",
ServersAddress,
- topic,
- values,
+ message.Name,
+ message.Content,
ex,
startTime,
du);
diff --git a/src/DotNetCore.CAP/Infrastructure/Helper.cs b/src/DotNetCore.CAP/Infrastructure/Helper.cs
index 83d128f..3f023b9 100644
--- a/src/DotNetCore.CAP/Infrastructure/Helper.cs
+++ b/src/DotNetCore.CAP/Infrastructure/Helper.cs
@@ -86,7 +86,6 @@ namespace DotNetCore.CAP.Infrastructure
return !CanConvertFromString(type);
}
-
public static string AddExceptionProperty(string json, Exception exception)
{
var jObject = ToJObject(exception);
diff --git a/src/DotNetCore.CAP/Internal/PublisherSentFailedException.cs b/src/DotNetCore.CAP/Internal/PublisherSentFailedException.cs
index b0f0a2c..9be1758 100644
--- a/src/DotNetCore.CAP/Internal/PublisherSentFailedException.cs
+++ b/src/DotNetCore.CAP/Internal/PublisherSentFailedException.cs
@@ -7,6 +7,10 @@ namespace DotNetCore.CAP.Internal
{
public class PublisherSentFailedException : Exception
{
+ public PublisherSentFailedException(string message) : base(message)
+ {
+ }
+
public PublisherSentFailedException(string message, Exception ex) : base(message, ex)
{
}
diff --git a/src/DotNetCore.CAP/LoggerExtensions.cs b/src/DotNetCore.CAP/LoggerExtensions.cs
index 031be9d..a0096e6 100644
--- a/src/DotNetCore.CAP/LoggerExtensions.cs
+++ b/src/DotNetCore.CAP/LoggerExtensions.cs
@@ -17,10 +17,10 @@ namespace DotNetCore.CAP
private static readonly Action _modelBinderFormattingException;
private static readonly Action _consumerFailedWillRetry;
private static readonly Action _consumerExecuted;
- private static readonly Action _senderRetrying;
+ private static readonly Action _senderRetrying;
private static readonly Action _exceptionOccuredWhileExecuting;
private static readonly Action _messageHasBeenSent;
- private static readonly Action _messagePublishException;
+ private static readonly Action _messagePublishException;
static LoggerExtensions()
{
@@ -60,10 +60,10 @@ namespace DotNetCore.CAP
"When call subscribe method, a parameter format conversion exception occurs. MethodName:'{MethodName}' ParameterName:'{ParameterName}' Content:'{Content}'."
);
- _senderRetrying = LoggerMessage.Define(
+ _senderRetrying = LoggerMessage.Define(
LogLevel.Debug,
3,
- "Retrying send a message: {Retries}...");
+ "The {Retries}th retrying send a message failed. message id: {MessageId} ");
_consumerExecuted = LoggerMessage.Define(
LogLevel.Debug,
@@ -78,17 +78,17 @@ namespace DotNetCore.CAP
_exceptionOccuredWhileExecuting = LoggerMessage.Define(
LogLevel.Error,
6,
- "An exception occured while trying to store a message: '{MessageId}'. ");
+ "An exception occured while trying to store a message. message id: {MessageId}");
_messageHasBeenSent = LoggerMessage.Define(
LogLevel.Debug,
4,
"Message published. Took: {Seconds} secs.");
- _messagePublishException = LoggerMessage.Define(
+ _messagePublishException = LoggerMessage.Define(
LogLevel.Error,
6,
- "An exception occured while publishing a message: '{MessageId}'. ");
+ "An exception occured while publishing a message, reason:{Reason}. message id:{MessageId}");
}
public static void ConsumerExecutionFailedWillRetry(this ILogger logger, Exception ex)
@@ -96,9 +96,9 @@ namespace DotNetCore.CAP
_consumerFailedWillRetry(logger, ex);
}
- public static void SenderRetrying(this ILogger logger, int retries)
+ public static void SenderRetrying(this ILogger logger, int messageId, int retries)
{
- _senderRetrying(logger, retries, null);
+ _senderRetrying(logger, messageId, retries, null);
}
public static void MessageHasBeenSent(this ILogger logger, double seconds)
@@ -106,9 +106,9 @@ namespace DotNetCore.CAP
_messageHasBeenSent(logger, seconds, null);
}
- public static void MessagePublishException(this ILogger logger, int messageId, Exception ex)
+ public static void MessagePublishException(this ILogger logger, int messageId, string reason, Exception ex)
{
- _messagePublishException(logger, messageId, ex);
+ _messagePublishException(logger, messageId, reason, ex);
}
public static void ConsumerExecuted(this ILogger logger, double seconds)
diff --git a/src/DotNetCore.CAP/Processor/IProcessor.NeedRetry.cs b/src/DotNetCore.CAP/Processor/IProcessor.NeedRetry.cs
index b1109a3..2f7b829 100644
--- a/src/DotNetCore.CAP/Processor/IProcessor.NeedRetry.cs
+++ b/src/DotNetCore.CAP/Processor/IProcessor.NeedRetry.cs
@@ -65,33 +65,46 @@ namespace DotNetCore.CAP.Processor
continue;
}
- if (!hasException)
- {
- try
- {
- _options.FailedCallback?.Invoke(MessageType.Publish, message.Name, message.Content);
- }
- catch (Exception ex)
- {
- hasException = true;
- _logger.LogWarning("Failed call-back method raised an exception:" + ex.Message);
- }
- }
-
using (var transaction = connection.CreateTransaction())
{
- try
+ var result = await _publishExecutor.PublishAsync(message.Name, message.Content);
+ if (result.Succeeded)
{
- await _publishExecutor.PublishAsync(message.Name, message.Content);
-
_stateChanger.ChangeState(message, new SucceededState(), transaction);
+ _logger.LogInformation("The message was sent successfully during the retry. MessageId:" + message.Id);
}
- catch (Exception e)
+ else
{
- message.Content = Helper.AddExceptionProperty(message.Content, e);
+ message.Content = Helper.AddExceptionProperty(message.Content, result.Exception);
+ message.Retries++;
+ if (message.StatusName == StatusName.Scheduled)
+ {
+ message.ExpiresAt = GetDueTime(message.Added, message.Retries);
+ message.StatusName = StatusName.Failed;
+ }
transaction.UpdateMessage(message);
- }
+ if (message.Retries >= _options.FailedRetryCount)
+ {
+ _logger.LogError($"The message still sent failed after {_options.FailedRetryCount} retries. We will stop retrying the message. " +
+ "MessageId:" + message.Id);
+ if (message.Retries == _options.FailedRetryCount)
+ {
+ if (!hasException)
+ {
+ try
+ {
+ _options.FailedThresholdCallback?.Invoke(MessageType.Publish, message.Name, message.Content);
+ }
+ catch (Exception ex)
+ {
+ hasException = true;
+ _logger.LogWarning("Failed call-back method raised an exception:" + ex.Message);
+ }
+ }
+ }
+ }
+ }
await transaction.CommitAsync();
}
@@ -113,25 +126,60 @@ namespace DotNetCore.CAP.Processor
continue;
}
- if (!hasException)
+ using (var transaction = connection.CreateTransaction())
{
- try
+ var result = await _subscriberExecutor.ExecuteAsync(message);
+ if (result.Succeeded)
{
- _options.FailedCallback?.Invoke(MessageType.Subscribe, message.Name, message.Content);
+ _stateChanger.ChangeState(message, new SucceededState(), transaction);
+ _logger.LogInformation("The message was execute successfully during the retry. MessageId:" + message.Id);
}
- catch (Exception ex)
+ else
{
- hasException = true;
- _logger.LogWarning("Failed call-back method raised an exception:" + ex.Message);
+ message.Content = Helper.AddExceptionProperty(message.Content, result.Exception);
+ message.Retries++;
+ if (message.StatusName == StatusName.Scheduled)
+ {
+ message.ExpiresAt = GetDueTime(message.Added, message.Retries);
+ message.StatusName = StatusName.Failed;
+ }
+ transaction.UpdateMessage(message);
+
+ if (message.Retries >= _options.FailedRetryCount)
+ {
+ _logger.LogError($"[Subscriber]The message still executed failed after {_options.FailedRetryCount} retries. " +
+ "We will stop retrying to execute the message. message id:" + message.Id);
+
+ if (message.Retries == _options.FailedRetryCount)
+ {
+ if (!hasException)
+ {
+ try
+ {
+ _options.FailedThresholdCallback?.Invoke(MessageType.Subscribe, message.Name, message.Content);
+ }
+ catch (Exception ex)
+ {
+ hasException = true;
+ _logger.LogWarning("Failed call-back method raised an exception:" + ex.Message);
+ }
+ }
+ }
+ }
}
+ await transaction.CommitAsync();
}
- await _subscriberExecutor.ExecuteAsync(message);
-
context.ThrowIfStopping();
await context.WaitAsync(_delay);
}
}
+
+ public DateTime GetDueTime(DateTime addedTime, int retries)
+ {
+ var retryBehavior = RetryBehavior.DefaultRetry;
+ return addedTime.AddSeconds(retryBehavior.RetryIn(retries));
+ }
}
}
\ No newline at end of file