diff --git a/build/version.props b/build/version.props
index 537d4af..449a52f 100644
--- a/build/version.props
+++ b/build/version.props
@@ -2,7 +2,7 @@
2
2
- 4
+ 5
$(VersionMajor).$(VersionMinor).$(VersionPatch)
diff --git a/samples/Sample.RabbitMQ.MySql/Startup.cs b/samples/Sample.RabbitMQ.MySql/Startup.cs
index 554a624..007e78e 100644
--- a/samples/Sample.RabbitMQ.MySql/Startup.cs
+++ b/samples/Sample.RabbitMQ.MySql/Startup.cs
@@ -1,4 +1,5 @@
-using Microsoft.AspNetCore.Builder;
+using System;
+using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
@@ -16,6 +17,11 @@ namespace Sample.RabbitMQ.MySql
x.UseEntityFramework();
x.UseRabbitMQ("localhost");
x.UseDashboard();
+ x.FailedRetryCount = 5;
+ x.FailedThresholdCallback = (type, name, content) =>
+ {
+ Console.WriteLine($@"A message of type {type} failed after executing {x.FailedRetryCount} several times, requiring manual troubleshooting. Message name: {name}, message body: {content}");
+ };
});
services.AddMvc();
diff --git a/src/DotNetCore.CAP.MySql/MySqlStorageConnection.cs b/src/DotNetCore.CAP.MySql/MySqlStorageConnection.cs
index 691717d..77f9fbb 100644
--- a/src/DotNetCore.CAP.MySql/MySqlStorageConnection.cs
+++ b/src/DotNetCore.CAP.MySql/MySqlStorageConnection.cs
@@ -42,7 +42,7 @@ namespace DotNetCore.CAP.MySql
public async Task> GetPublishedMessagesOfNeedRetry()
{
- var fourMinsAgo = DateTime.Now.AddMinutes(-4);
+ var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql =
$"SELECT * FROM `{_prefix}.published` WHERE `Retries`<{_capOptions.FailedRetryCount} AND `Added`<'{fourMinsAgo}' AND (`StatusName` = '{StatusName.Failed}' OR `StatusName` = '{StatusName.Scheduled}') LIMIT 200;";
@@ -80,7 +80,7 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);SELECT LAST
public async Task> GetReceivedMessagesOfNeedRetry()
{
- var fourMinsAgo = DateTime.Now.AddMinutes(-4);
+ var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql =
$"SELECT * FROM `{_prefix}.received` WHERE `Retries`<{_capOptions.FailedRetryCount} AND `Added`<'{fourMinsAgo}' AND (`StatusName` = '{StatusName.Failed}' OR `StatusName` = '{StatusName.Scheduled}') LIMIT 200;";
using (var connection = new MySqlConnection(Options.ConnectionString))
diff --git a/src/DotNetCore.CAP.PostgreSql/PostgreSqlStorageConnection.cs b/src/DotNetCore.CAP.PostgreSql/PostgreSqlStorageConnection.cs
index 763407c..3d8fba7 100644
--- a/src/DotNetCore.CAP.PostgreSql/PostgreSqlStorageConnection.cs
+++ b/src/DotNetCore.CAP.PostgreSql/PostgreSqlStorageConnection.cs
@@ -40,7 +40,7 @@ namespace DotNetCore.CAP.PostgreSql
public async Task> GetPublishedMessagesOfNeedRetry()
{
- var fourMinsAgo = DateTime.Now.AddMinutes(-4);
+ var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql =
$"SELECT * FROM \"{Options.Schema}\".\"published\" WHERE \"Retries\"<{_capOptions.FailedRetryCount} AND \"Added\"<'{fourMinsAgo}' AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;";
@@ -77,7 +77,7 @@ namespace DotNetCore.CAP.PostgreSql
public async Task> GetReceivedMessagesOfNeedRetry()
{
- var fourMinsAgo = DateTime.Now.AddMinutes(-4);
+ var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql =
$"SELECT * FROM \"{Options.Schema}\".\"received\" WHERE \"Retries\"<{_capOptions.FailedRetryCount} AND \"Added\"<'{fourMinsAgo}' AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;";
using (var connection = new NpgsqlConnection(Options.ConnectionString))
diff --git a/src/DotNetCore.CAP.RabbitMQ/CAP.RabbiMQOptions.cs b/src/DotNetCore.CAP.RabbitMQ/CAP.RabbiMQOptions.cs
index c0abcc8..afb2fd2 100644
--- a/src/DotNetCore.CAP.RabbitMQ/CAP.RabbiMQOptions.cs
+++ b/src/DotNetCore.CAP.RabbitMQ/CAP.RabbiMQOptions.cs
@@ -37,7 +37,10 @@ namespace DotNetCore.CAP
/// The topic exchange type.
public const string ExchangeType = "topic";
- /// The host to connect to.
+ ///
+ /// The host to connect to.
+ /// If you want connect to the cluster, you can assign like “192.168.1.111,192.168.1.112”
+ ///
public string HostName { get; set; } = "localhost";
///
diff --git a/src/DotNetCore.CAP.RabbitMQ/IConnectionChannelPool.Default.cs b/src/DotNetCore.CAP.RabbitMQ/IConnectionChannelPool.Default.cs
index e15fdfa..b50c9dc 100644
--- a/src/DotNetCore.CAP.RabbitMQ/IConnectionChannelPool.Default.cs
+++ b/src/DotNetCore.CAP.RabbitMQ/IConnectionChannelPool.Default.cs
@@ -76,7 +76,6 @@ namespace DotNetCore.CAP.RabbitMQ
{
var factory = new ConnectionFactory
{
- HostName = options.HostName,
UserName = options.UserName,
Port = options.Port,
Password = options.Password,
@@ -86,6 +85,13 @@ namespace DotNetCore.CAP.RabbitMQ
SocketWriteTimeout = options.SocketWriteTimeout
};
+ if (options.HostName.Contains(","))
+ {
+ return () => factory.CreateConnection(
+ options.HostName.Split(new[] { "," }, StringSplitOptions.RemoveEmptyEntries));
+ }
+
+ factory.HostName = options.HostName;
return () => factory.CreateConnection();
}
diff --git a/src/DotNetCore.CAP.SqlServer/SqlServerStorageConnection.cs b/src/DotNetCore.CAP.SqlServer/SqlServerStorageConnection.cs
index 9b931c1..b204fde 100644
--- a/src/DotNetCore.CAP.SqlServer/SqlServerStorageConnection.cs
+++ b/src/DotNetCore.CAP.SqlServer/SqlServerStorageConnection.cs
@@ -40,7 +40,7 @@ namespace DotNetCore.CAP.SqlServer
public async Task> GetPublishedMessagesOfNeedRetry()
{
- var fourMinsAgo = DateTime.Now.AddMinutes(-4);
+ var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql =
$"SELECT TOP (200) * FROM [{Options.Schema}].[Published] WITH (readpast) WHERE Retries<{_capOptions.FailedRetryCount} AND Added<'{fourMinsAgo}' AND (StatusName = '{StatusName.Failed}' OR StatusName = '{StatusName.Scheduled}')";
@@ -78,7 +78,7 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);SELECT SCOP
public async Task> GetReceivedMessagesOfNeedRetry()
{
- var fourMinsAgo = DateTime.Now.AddMinutes(-4);
+ var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql =
$"SELECT TOP (200) * FROM [{Options.Schema}].[Received] WITH (readpast) WHERE Retries<{_capOptions.FailedRetryCount} AND Added<'{fourMinsAgo}' AND (StatusName = '{StatusName.Failed}' OR StatusName = '{StatusName.Scheduled}')";
using (var connection = new SqlConnection(Options.ConnectionString))
diff --git a/src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs b/src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs
index 4d7a321..77a6c44 100644
--- a/src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs
+++ b/src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs
@@ -182,7 +182,7 @@ namespace DotNetCore.CAP.Abstractions
}
catch (Exception e)
{
- _logger.LogError("An exception was occurred when publish message. exception message:" + e.Message, e);
+ _logger.LogError(e, "An exception was occurred when publish message async. exception message:" + name);
s_diagnosticListener.WritePublishMessageStoreError(operationId, message, e);
Console.WriteLine(e);
throw;
@@ -204,10 +204,11 @@ namespace DotNetCore.CAP.Abstractions
try
{
- operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message);
+ Console.WriteLine("================22222222222222=====================");
+ operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message);
var id = Execute(DbConnection, DbTransaction, message);
-
+ Console.WriteLine("================777777777777777777777=====================");
ClosedCap();
if (id > 0)
@@ -220,7 +221,7 @@ namespace DotNetCore.CAP.Abstractions
}
catch (Exception e)
{
- _logger.LogError("An exception was occurred when publish message. exception message:" + e.Message, e);
+ _logger.LogError(e, "An exception was occurred when publish message. message:" + name);
s_diagnosticListener.WritePublishMessageStoreError(operationId, message, e);
Console.WriteLine(e);
throw;
diff --git a/src/DotNetCore.CAP/Dashboard/DashboardRequest.cs b/src/DotNetCore.CAP/Dashboard/DashboardRequest.cs
index 5fdb054..23f46b8 100644
--- a/src/DotNetCore.CAP/Dashboard/DashboardRequest.cs
+++ b/src/DotNetCore.CAP/Dashboard/DashboardRequest.cs
@@ -28,19 +28,14 @@ namespace DotNetCore.CAP.Dashboard
public CapDashboardRequest(HttpContext context)
{
- if (context == null)
- {
- throw new ArgumentNullException(nameof(context));
- }
-
- _context = context;
+ _context = context ?? throw new ArgumentNullException(nameof(context));
}
public override string Method => _context.Request.Method;
public override string Path => _context.Request.Path.Value;
public override string PathBase => _context.Request.PathBase.Value;
- public override string LocalIpAddress => _context.Connection.LocalIpAddress.ToString();
- public override string RemoteIpAddress => _context.Connection.RemoteIpAddress.ToString();
+ public override string LocalIpAddress => _context.Connection.LocalIpAddress.MapToIPv4().ToString();
+ public override string RemoteIpAddress => _context.Connection.RemoteIpAddress.MapToIPv4().ToString();
public override string GetQuery(string key)
{
diff --git a/src/DotNetCore.CAP/Dashboard/DashboardRoutes.cs b/src/DotNetCore.CAP/Dashboard/DashboardRoutes.cs
index 312da57..9a51faa 100644
--- a/src/DotNetCore.CAP/Dashboard/DashboardRoutes.cs
+++ b/src/DotNetCore.CAP/Dashboard/DashboardRoutes.cs
@@ -3,7 +3,7 @@
using System.Reflection;
using DotNetCore.CAP.Dashboard.Pages;
-using DotNetCore.CAP.Infrastructure;
+using Microsoft.Extensions.DependencyInjection;
namespace DotNetCore.CAP.Dashboard
{
@@ -83,24 +83,34 @@ namespace DotNetCore.CAP.Dashboard
Routes.AddJsonResult("/published/message/(?.+)", x =>
{
var id = int.Parse(x.UriMatch.Groups["Id"].Value);
- var message = x.Storage.GetConnection().GetPublishedMessageAsync(id).GetAwaiter().GetResult();
+ var message = x.Storage.GetConnection().GetPublishedMessageAsync(id)
+ .GetAwaiter().GetResult();
return message.Content;
});
Routes.AddJsonResult("/received/message/(?.+)", x =>
{
var id = int.Parse(x.UriMatch.Groups["Id"].Value);
- var message = x.Storage.GetConnection().GetReceivedMessageAsync(id).GetAwaiter().GetResult();
+ var message = x.Storage.GetConnection().GetReceivedMessageAsync(id)
+ .GetAwaiter().GetResult();
return message.Content;
});
Routes.AddPublishBatchCommand(
"/published/requeue",
(client, messageId) =>
- client.Storage.GetConnection().ChangePublishedState(messageId, StatusName.Scheduled));
+ {
+ var msg = client.Storage.GetConnection().GetPublishedMessageAsync(messageId)
+ .GetAwaiter().GetResult();
+ client.RequestServices.GetService().EnqueueToPublish(msg);
+ });
Routes.AddPublishBatchCommand(
"/received/requeue",
(client, messageId) =>
- client.Storage.GetConnection().ChangeReceivedState(messageId, StatusName.Scheduled));
+ {
+ var msg = client.Storage.GetConnection().GetReceivedMessageAsync(messageId)
+ .GetAwaiter().GetResult();
+ client.RequestServices.GetService().EnqueueToExecute(msg);
+ });
Routes.AddRazorPage(
"/published/(?.+)",
diff --git a/src/DotNetCore.CAP/Dashboard/LocalRequestsOnlyAuthorizationFilter.cs b/src/DotNetCore.CAP/Dashboard/LocalRequestsOnlyAuthorizationFilter.cs
index 42746d7..dafd845 100644
--- a/src/DotNetCore.CAP/Dashboard/LocalRequestsOnlyAuthorizationFilter.cs
+++ b/src/DotNetCore.CAP/Dashboard/LocalRequestsOnlyAuthorizationFilter.cs
@@ -9,26 +9,27 @@ namespace DotNetCore.CAP.Dashboard
{
public bool Authorize(DashboardContext context)
{
+ var ipAddress = context.Request.RemoteIpAddress;
// if unknown, assume not local
- if (string.IsNullOrEmpty(context.Request.RemoteIpAddress))
+ if (string.IsNullOrEmpty(ipAddress))
{
return false;
}
// check if localhost
- if (context.Request.RemoteIpAddress == "127.0.0.1" || context.Request.RemoteIpAddress == "::1")
+ if (ipAddress == "127.0.0.1" || ipAddress == "0.0.0.1")
{
return true;
}
// compare with local address
- if (context.Request.RemoteIpAddress == context.Request.LocalIpAddress)
+ if (ipAddress == context.Request.LocalIpAddress)
{
return true;
}
// check if private ip
- if (Helper.IsInnerIP(context.Request.RemoteIpAddress))
+ if (Helper.IsInnerIP(ipAddress))
{
return true;
}
diff --git a/src/DotNetCore.CAP/IPublishMessageSender.Base.cs b/src/DotNetCore.CAP/IPublishMessageSender.Base.cs
index a597169..cbbc780 100644
--- a/src/DotNetCore.CAP/IPublishMessageSender.Base.cs
+++ b/src/DotNetCore.CAP/IPublishMessageSender.Base.cs
@@ -43,6 +43,24 @@ namespace DotNetCore.CAP
public abstract Task PublishAsync(string keyName, string content);
public async Task SendAsync(CapPublishedMessage message)
+ {
+ bool retry;
+ OperateResult result;
+ do
+ {
+ var executedResult = await SendWithoutRetryAsync(message);
+ result = executedResult.Item2;
+ if (result == OperateResult.Success)
+ {
+ return result;
+ }
+ retry = executedResult.Item1;
+ } while (retry);
+
+ return result;
+ }
+
+ private async Task<(bool, OperateResult)> SendWithoutRetryAsync(CapPublishedMessage message)
{
var startTime = DateTimeOffset.UtcNow;
var stopwatch = Stopwatch.StartNew();
@@ -63,60 +81,33 @@ namespace DotNetCore.CAP
TracingAfter(operationId, message.Name, sendValues, startTime, stopwatch.Elapsed);
- return OperateResult.Success;
+ return (false, OperateResult.Success);
}
else
{
TracingError(operationId, message, result, startTime, stopwatch.Elapsed);
- await SetFailedState(message, result.Exception, out bool stillRetry);
-
- if (stillRetry)
- {
- _logger.SenderRetrying(message.Id, message.Retries);
-
- await SendAsync(message);
- }
- return OperateResult.Failed(result.Exception);
+ var needRetry = await SetFailedState(message, result.Exception);
+ return (needRetry, OperateResult.Failed(result.Exception));
}
}
- private static bool UpdateMessageForRetryAsync(CapPublishedMessage message)
- {
- var retryBehavior = RetryBehavior.DefaultRetry;
-
- var retries = ++message.Retries;
- if (retries >= retryBehavior.RetryCount)
- {
- return false;
- }
-
- var due = message.Added.AddSeconds(retryBehavior.RetryIn(retries));
- message.ExpiresAt = due;
-
- return true;
- }
private Task SetSuccessfulState(CapPublishedMessage message)
{
var succeededState = new SucceededState(_options.SucceedMessageExpiredAfter);
-
return _stateChanger.ChangeStateAsync(message, succeededState, _connection);
}
- private Task SetFailedState(CapPublishedMessage message, Exception ex, out bool stillRetry)
+ private async Task SetFailedState(CapPublishedMessage message, Exception ex)
{
- IState newState = new FailedState();
- stillRetry = UpdateMessageForRetryAsync(message);
- if (stillRetry)
- {
- _logger.ConsumerExecutionFailedWillRetry(ex);
- return Task.CompletedTask;
- }
-
AddErrorReasonToContent(message, ex);
- return _stateChanger.ChangeStateAsync(message, newState, _connection);
+ var needRetry = UpdateMessageForRetry(message);
+
+ await _stateChanger.ChangeStateAsync(message, new FailedState(), _connection);
+
+ return needRetry;
}
private static void AddErrorReasonToContent(CapPublishedMessage message, Exception exception)
@@ -124,6 +115,37 @@ namespace DotNetCore.CAP
message.Content = Helper.AddExceptionProperty(message.Content, exception);
}
+ private bool UpdateMessageForRetry(CapPublishedMessage message)
+ {
+ var retryBehavior = RetryBehavior.DefaultRetry;
+
+ var retries = ++message.Retries;
+ message.ExpiresAt = message.Added.AddSeconds(retryBehavior.RetryIn(retries));
+
+ var retryCount = Math.Min(_options.FailedRetryCount, retryBehavior.RetryCount);
+ if (retries >= retryCount)
+ {
+ if (retries == _options.FailedRetryCount)
+ {
+ try
+ {
+ _options.FailedThresholdCallback?.Invoke(MessageType.Subscribe, message.Name, message.Content);
+
+ _logger.SenderAfterThreshold(message.Id, _options.FailedRetryCount);
+ }
+ catch (Exception ex)
+ {
+ _logger.ExecutedThresholdCallbackFailed(ex);
+ }
+ }
+ return false;
+ }
+
+ _logger.SenderRetrying(message.Id, retries);
+
+ return true;
+ }
+
private (Guid, TracingHeaders) TracingBefore(string topic, string values)
{
Guid operationId = Guid.NewGuid();
diff --git a/src/DotNetCore.CAP/ISubscribeExecutor.Default.cs b/src/DotNetCore.CAP/ISubscribeExecutor.Default.cs
index fe03a15..6bd1a68 100644
--- a/src/DotNetCore.CAP/ISubscribeExecutor.Default.cs
+++ b/src/DotNetCore.CAP/ISubscribeExecutor.Default.cs
@@ -50,6 +50,29 @@ namespace DotNetCore.CAP
private IConsumerInvoker Invoker { get; }
public async Task ExecuteAsync(CapReceivedMessage message)
+ {
+ bool retry;
+ OperateResult result;
+ do
+ {
+ var executedResult = await ExecuteWithoutRetryAsync(message);
+ result = executedResult.Item2;
+ if (result == OperateResult.Success)
+ {
+ return result;
+ }
+ retry = executedResult.Item1;
+ } while (retry);
+
+ return result;
+ }
+
+ ///
+ /// Execute message consumption once.
+ ///
+ /// the message rececived of
+ /// Item1 is need still restry, Item2 is executed result.
+ private async Task<(bool, OperateResult)> ExecuteWithoutRetryAsync(CapReceivedMessage message)
{
if (message == null)
{
@@ -68,65 +91,65 @@ namespace DotNetCore.CAP
_logger.ConsumerExecuted(sp.Elapsed.TotalSeconds);
- return OperateResult.Success;
+ return (false, OperateResult.Success);
}
catch (Exception ex)
{
_logger.LogError(ex, $"An exception occurred while executing the subscription method. Topic:{message.Name}, Id:{message.Id}");
- await SetFailedState(message, ex, out bool stillRetry);
- if (stillRetry)
- {
- await ExecuteAsync(message);
- }
-
- return OperateResult.Failed(ex);
+ return (await SetFailedState(message, ex), OperateResult.Failed(ex));
}
}
private Task SetSuccessfulState(CapReceivedMessage message)
{
var succeededState = new SucceededState(_options.SucceedMessageExpiredAfter);
-
return _stateChanger.ChangeStateAsync(message, succeededState, _connection);
}
- private Task SetFailedState(CapReceivedMessage message, Exception ex, out bool stillRetry)
+ private async Task SetFailedState(CapReceivedMessage message, Exception ex)
{
- IState newState = new FailedState();
-
if (ex is SubscriberNotFoundException)
{
- stillRetry = false;
message.Retries = _options.FailedRetryCount; // not retry if SubscriberNotFoundException
}
- else
- {
- stillRetry = UpdateMessageForRetry(message);
- if (stillRetry)
- {
- _logger.ConsumerExecutionFailedWillRetry(ex);
- return Task.CompletedTask;
- }
- }
AddErrorReasonToContent(message, ex);
- return _stateChanger.ChangeStateAsync(message, newState, _connection);
+ var needRetry = UpdateMessageForRetry(message);
+
+ await _stateChanger.ChangeStateAsync(message, new FailedState(), _connection);
+
+ return needRetry;
}
- private static bool UpdateMessageForRetry(CapReceivedMessage message)
+ private bool UpdateMessageForRetry(CapReceivedMessage message)
{
var retryBehavior = RetryBehavior.DefaultRetry;
var retries = ++message.Retries;
- if (retries >= retryBehavior.RetryCount)
+ message.ExpiresAt = message.Added.AddSeconds(retryBehavior.RetryIn(retries));
+
+ var retryCount = Math.Min(_options.FailedRetryCount, retryBehavior.RetryCount);
+ if (retries >= retryCount)
{
+ if (retries == _options.FailedRetryCount)
+ {
+ try
+ {
+ _options.FailedThresholdCallback?.Invoke(MessageType.Subscribe, message.Name, message.Content);
+
+ _logger.ConsumerExecutedAfterThreshold(message.Id, _options.FailedRetryCount);
+ }
+ catch (Exception ex)
+ {
+ _logger.ExecutedThresholdCallbackFailed(ex);
+ }
+ }
return false;
}
- var due = message.Added.AddSeconds(retryBehavior.RetryIn(retries));
- message.ExpiresAt = due;
+ _logger.ConsumerExecutionRetrying(message.Id, retries);
return true;
}
diff --git a/src/DotNetCore.CAP/LoggerExtensions.cs b/src/DotNetCore.CAP/LoggerExtensions.cs
index a0096e6..808096c 100644
--- a/src/DotNetCore.CAP/LoggerExtensions.cs
+++ b/src/DotNetCore.CAP/LoggerExtensions.cs
@@ -10,141 +10,70 @@ namespace DotNetCore.CAP
[SuppressMessage("ReSharper", "InconsistentNaming")]
internal static class LoggerExtensions
{
- private static readonly Action _serverStarting;
- private static readonly Action _processorsStartingError;
- private static readonly Action _serverShuttingDown;
- private static readonly Action _expectedOperationCanceledException;
- private static readonly Action _modelBinderFormattingException;
- private static readonly Action _consumerFailedWillRetry;
- private static readonly Action _consumerExecuted;
- private static readonly Action _senderRetrying;
- private static readonly Action _exceptionOccuredWhileExecuting;
- private static readonly Action _messageHasBeenSent;
- private static readonly Action _messagePublishException;
-
- static LoggerExtensions()
+ public static void ConsumerExecutedAfterThreshold(this ILogger logger, int messageId, int retries)
{
- _serverStarting = LoggerMessage.Define(
- LogLevel.Debug,
- 1,
- "Starting the processing server.");
-
- _processorsStartingError = LoggerMessage.Define(
- LogLevel.Error,
- 5,
- "Starting the processors throw an exception.");
-
- _serverShuttingDown = LoggerMessage.Define(
- LogLevel.Information,
- 2,
- "Shutting down the processing server...");
-
- _expectedOperationCanceledException = LoggerMessage.Define(
- LogLevel.Warning,
- 3,
- "Expected an OperationCanceledException, but found '{ExceptionMessage}'.");
-
- LoggerMessage.Define(
- LogLevel.Error,
- 5,
- "Consumer method '{methodName}' failed to execute.");
-
- LoggerMessage.Define(
- LogLevel.Error,
- 5,
- "Received message topic method '{topicName}' failed to execute.");
-
- _modelBinderFormattingException = LoggerMessage.Define(
- LogLevel.Error,
- 5,
- "When call subscribe method, a parameter format conversion exception occurs. MethodName:'{MethodName}' ParameterName:'{ParameterName}' Content:'{Content}'."
- );
-
- _senderRetrying = LoggerMessage.Define(
- LogLevel.Debug,
- 3,
- "The {Retries}th retrying send a message failed. message id: {MessageId} ");
-
- _consumerExecuted = LoggerMessage.Define(
- LogLevel.Debug,
- 4,
- "Consumer executed. Took: {Seconds} secs.");
-
- _consumerFailedWillRetry = LoggerMessage.Define(
- LogLevel.Warning,
- 2,
- "Consumer failed to execute. Will retry.");
-
- _exceptionOccuredWhileExecuting = LoggerMessage.Define(
- LogLevel.Error,
- 6,
- "An exception occured while trying to store a message. message id: {MessageId}");
+ logger.LogWarning($"The Subscriber of the message({messageId}) still fails after {retries}th executions and we will stop retrying.");
+ }
- _messageHasBeenSent = LoggerMessage.Define(
- LogLevel.Debug,
- 4,
- "Message published. Took: {Seconds} secs.");
+ public static void SenderAfterThreshold(this ILogger logger, int messageId, int retries)
+ {
+ logger.LogWarning($"The Publisher of the message({messageId}) still fails after {retries}th sends and we will stop retrying.");
+ }
- _messagePublishException = LoggerMessage.Define(
- LogLevel.Error,
- 6,
- "An exception occured while publishing a message, reason:{Reason}. message id:{MessageId}");
+ public static void ExecutedThresholdCallbackFailed(this ILogger logger, Exception ex)
+ {
+ logger.LogWarning(ex, "FailedThresholdCallback action raised an exception:" + ex.Message);
}
- public static void ConsumerExecutionFailedWillRetry(this ILogger logger, Exception ex)
+ public static void ConsumerExecutionRetrying(this ILogger logger, int messageId, int retries)
{
- _consumerFailedWillRetry(logger, ex);
+ logger.LogWarning($"The {retries}th retrying consume a message failed. message id: {messageId}");
}
public static void SenderRetrying(this ILogger logger, int messageId, int retries)
{
- _senderRetrying(logger, messageId, retries, null);
+ logger.LogWarning($"The {retries}th retrying send a message failed. message id: {messageId} ");
}
public static void MessageHasBeenSent(this ILogger logger, double seconds)
{
- _messageHasBeenSent(logger, seconds, null);
+ logger.LogDebug($"Message published. Took: {seconds} secs.");
}
public static void MessagePublishException(this ILogger logger, int messageId, string reason, Exception ex)
{
- _messagePublishException(logger, messageId, reason, ex);
+ logger.LogError(ex, $"An exception occured while publishing a message, reason:{reason}. message id:{messageId}");
}
public static void ConsumerExecuted(this ILogger logger, double seconds)
{
- _consumerExecuted(logger, seconds, null);
+ logger.LogDebug($"Consumer executed. Took: {seconds} secs.");
}
public static void ServerStarting(this ILogger logger)
{
- _serverStarting(logger, null);
+ logger.LogInformation("Starting the processing server.");
}
public static void ProcessorsStartedError(this ILogger logger, Exception ex)
{
- _processorsStartingError(logger, ex);
+ logger.LogError(ex, "Starting the processors throw an exception.");
}
public static void ServerShuttingDown(this ILogger logger)
{
- _serverShuttingDown(logger, null);
+ logger.LogInformation("Shutting down the processing server...");
}
public static void ExpectedOperationCanceledException(this ILogger logger, Exception ex)
{
- _expectedOperationCanceledException(logger, ex.Message, ex);
- }
-
- public static void ExceptionOccuredWhileExecuting(this ILogger logger, string messageId, Exception ex)
- {
- _exceptionOccuredWhileExecuting(logger, messageId, ex);
+ logger.LogWarning(ex, $"Expected an OperationCanceledException, but found '{ex.Message}'.");
}
public static void ModelBinderFormattingException(this ILogger logger, string methodName, string parameterName,
string content, Exception ex)
{
- _modelBinderFormattingException(logger, methodName, parameterName, content, ex);
+ logger.LogError(ex, $"When call subscribe method, a parameter format conversion exception occurs. MethodName:'{methodName}' ParameterName:'{parameterName}' Content:'{content}'.");
}
}
}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP/Models/CapPublishedMessage.cs b/src/DotNetCore.CAP/Models/CapPublishedMessage.cs
index 41c554d..d95e72f 100644
--- a/src/DotNetCore.CAP/Models/CapPublishedMessage.cs
+++ b/src/DotNetCore.CAP/Models/CapPublishedMessage.cs
@@ -15,12 +15,6 @@ namespace DotNetCore.CAP.Models
Added = DateTime.Now;
}
- public CapPublishedMessage(MessageContext message)
- {
- Name = message.Name;
- Content = message.Content;
- }
-
public int Id { get; set; }
public string Name { get; set; }
diff --git a/src/DotNetCore.CAP/Processor/IProcessor.NeedRetry.cs b/src/DotNetCore.CAP/Processor/IProcessor.NeedRetry.cs
index 2f7b829..36a6deb 100644
--- a/src/DotNetCore.CAP/Processor/IProcessor.NeedRetry.cs
+++ b/src/DotNetCore.CAP/Processor/IProcessor.NeedRetry.cs
@@ -3,11 +3,7 @@
using System;
using System.Threading.Tasks;
-using DotNetCore.CAP.Infrastructure;
-using DotNetCore.CAP.Models;
-using DotNetCore.CAP.Processor.States;
using Microsoft.Extensions.DependencyInjection;
-using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.Processor
@@ -15,26 +11,18 @@ namespace DotNetCore.CAP.Processor
public class NeedRetryMessageProcessor : IProcessor
{
private readonly TimeSpan _delay = TimeSpan.FromSeconds(1);
- private readonly ILogger _logger;
- private readonly CapOptions _options;
- private readonly IPublishExecutor _publishExecutor;
- private readonly IStateChanger _stateChanger;
+ private readonly IPublishMessageSender _publishMessageSender;
private readonly ISubscriberExecutor _subscriberExecutor;
private readonly TimeSpan _waitingInterval;
public NeedRetryMessageProcessor(
IOptions options,
- ILogger logger,
- IStateChanger stateChanger,
ISubscriberExecutor subscriberExecutor,
- IPublishExecutor publishExecutor)
+ IPublishMessageSender publishMessageSender)
{
- _options = options.Value;
- _logger = logger;
- _stateChanger = stateChanger;
_subscriberExecutor = subscriberExecutor;
- _publishExecutor = publishExecutor;
- _waitingInterval = TimeSpan.FromSeconds(_options.FailedRetryInterval);
+ _publishMessageSender = publishMessageSender;
+ _waitingInterval = TimeSpan.FromSeconds(options.Value.FailedRetryInterval);
}
public async Task ProcessAsync(ProcessingContext context)
@@ -56,57 +44,10 @@ namespace DotNetCore.CAP.Processor
private async Task ProcessPublishedAsync(IStorageConnection connection, ProcessingContext context)
{
var messages = await connection.GetPublishedMessagesOfNeedRetry();
- var hasException = false;
foreach (var message in messages)
{
- if (message.Retries > _options.FailedRetryCount)
- {
- continue;
- }
-
- using (var transaction = connection.CreateTransaction())
- {
- var result = await _publishExecutor.PublishAsync(message.Name, message.Content);
- if (result.Succeeded)
- {
- _stateChanger.ChangeState(message, new SucceededState(), transaction);
- _logger.LogInformation("The message was sent successfully during the retry. MessageId:" + message.Id);
- }
- else
- {
- message.Content = Helper.AddExceptionProperty(message.Content, result.Exception);
- message.Retries++;
- if (message.StatusName == StatusName.Scheduled)
- {
- message.ExpiresAt = GetDueTime(message.Added, message.Retries);
- message.StatusName = StatusName.Failed;
- }
- transaction.UpdateMessage(message);
-
- if (message.Retries >= _options.FailedRetryCount)
- {
- _logger.LogError($"The message still sent failed after {_options.FailedRetryCount} retries. We will stop retrying the message. " +
- "MessageId:" + message.Id);
- if (message.Retries == _options.FailedRetryCount)
- {
- if (!hasException)
- {
- try
- {
- _options.FailedThresholdCallback?.Invoke(MessageType.Publish, message.Name, message.Content);
- }
- catch (Exception ex)
- {
- hasException = true;
- _logger.LogWarning("Failed call-back method raised an exception:" + ex.Message);
- }
- }
- }
- }
- }
- await transaction.CommitAsync();
- }
+ await _publishMessageSender.SendAsync(message);
context.ThrowIfStopping();
@@ -117,69 +58,15 @@ namespace DotNetCore.CAP.Processor
private async Task ProcessReceivedAsync(IStorageConnection connection, ProcessingContext context)
{
var messages = await connection.GetReceivedMessagesOfNeedRetry();
- var hasException = false;
foreach (var message in messages)
{
- if (message.Retries > _options.FailedRetryCount)
- {
- continue;
- }
-
- using (var transaction = connection.CreateTransaction())
- {
- var result = await _subscriberExecutor.ExecuteAsync(message);
- if (result.Succeeded)
- {
- _stateChanger.ChangeState(message, new SucceededState(), transaction);
- _logger.LogInformation("The message was execute successfully during the retry. MessageId:" + message.Id);
- }
- else
- {
- message.Content = Helper.AddExceptionProperty(message.Content, result.Exception);
- message.Retries++;
- if (message.StatusName == StatusName.Scheduled)
- {
- message.ExpiresAt = GetDueTime(message.Added, message.Retries);
- message.StatusName = StatusName.Failed;
- }
- transaction.UpdateMessage(message);
-
- if (message.Retries >= _options.FailedRetryCount)
- {
- _logger.LogError($"[Subscriber]The message still executed failed after {_options.FailedRetryCount} retries. " +
- "We will stop retrying to execute the message. message id:" + message.Id);
-
- if (message.Retries == _options.FailedRetryCount)
- {
- if (!hasException)
- {
- try
- {
- _options.FailedThresholdCallback?.Invoke(MessageType.Subscribe, message.Name, message.Content);
- }
- catch (Exception ex)
- {
- hasException = true;
- _logger.LogWarning("Failed call-back method raised an exception:" + ex.Message);
- }
- }
- }
- }
- }
- await transaction.CommitAsync();
- }
+ await _subscriberExecutor.ExecuteAsync(message);
context.ThrowIfStopping();
await context.WaitAsync(_delay);
}
- }
-
- public DateTime GetDueTime(DateTime addedTime, int retries)
- {
- var retryBehavior = RetryBehavior.DefaultRetry;
- return addedTime.AddSeconds(retryBehavior.RetryIn(retries));
- }
+ }
}
}
\ No newline at end of file