ソースを参照

Release 2.2.5 (#162)

* update version to 2.2.4

* Fixed Incorrect local IP address judgment of IPv6. (#140)

* Fixed DateTime localization format conversion error to sql.(#139)

* update version to 2.2.5

* remove unused constructor.

* Fixed DateTime localization format conversion error to sql.(#139)

* Improved logging

* support RabbitMQ cluster configuration.

* Fixed dashboard message page re-requeue and re-executed  operate bug. (#158)

* refactor code

* refactor log extensions.

* refactor retry task processor.

* Fixed  configuration options of FailedThresholdCallback could not be invoke when the value less then three.  (#161)

* update samples.

* Fixed SendAsync or ExecuteAsync recursion retries bug. (#160)

* Fixed SendAsync or ExecuteAsync recursion retries bug. (#160)
master
Savorboard 6年前
committed by GitHub
コミット
fa89e27cc6
この署名に対応する既知のキーがデータベースに存在しません GPGキーID: 4AEE18F83AFDEB23
16個のファイルの変更189行の追加312行の削除
  1. +1
    -1
      build/version.props
  2. +7
    -1
      samples/Sample.RabbitMQ.MySql/Startup.cs
  3. +2
    -2
      src/DotNetCore.CAP.MySql/MySqlStorageConnection.cs
  4. +2
    -2
      src/DotNetCore.CAP.PostgreSql/PostgreSqlStorageConnection.cs
  5. +4
    -1
      src/DotNetCore.CAP.RabbitMQ/CAP.RabbiMQOptions.cs
  6. +7
    -1
      src/DotNetCore.CAP.RabbitMQ/IConnectionChannelPool.Default.cs
  7. +2
    -2
      src/DotNetCore.CAP.SqlServer/SqlServerStorageConnection.cs
  8. +5
    -4
      src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs
  9. +3
    -8
      src/DotNetCore.CAP/Dashboard/DashboardRequest.cs
  10. +15
    -5
      src/DotNetCore.CAP/Dashboard/DashboardRoutes.cs
  11. +5
    -4
      src/DotNetCore.CAP/Dashboard/LocalRequestsOnlyAuthorizationFilter.cs
  12. +58
    -36
      src/DotNetCore.CAP/IPublishMessageSender.Base.cs
  13. +50
    -27
      src/DotNetCore.CAP/ISubscribeExecutor.Default.cs
  14. +21
    -92
      src/DotNetCore.CAP/LoggerExtensions.cs
  15. +0
    -6
      src/DotNetCore.CAP/Models/CapPublishedMessage.cs
  16. +7
    -120
      src/DotNetCore.CAP/Processor/IProcessor.NeedRetry.cs

+ 1
- 1
build/version.props ファイルの表示

@@ -2,7 +2,7 @@
<PropertyGroup>
<VersionMajor>2</VersionMajor>
<VersionMinor>2</VersionMinor>
<VersionPatch>4</VersionPatch>
<VersionPatch>5</VersionPatch>
<VersionQuality></VersionQuality>
<VersionPrefix>$(VersionMajor).$(VersionMinor).$(VersionPatch)</VersionPrefix>
</PropertyGroup>


+ 7
- 1
samples/Sample.RabbitMQ.MySql/Startup.cs ファイルの表示

@@ -1,4 +1,5 @@
using Microsoft.AspNetCore.Builder;
using System;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
@@ -16,6 +17,11 @@ namespace Sample.RabbitMQ.MySql
x.UseEntityFramework<AppDbContext>();
x.UseRabbitMQ("localhost");
x.UseDashboard();
x.FailedRetryCount = 5;
x.FailedThresholdCallback = (type, name, content) =>
{
Console.WriteLine($@"A message of type {type} failed after executing {x.FailedRetryCount} several times, requiring manual troubleshooting. Message name: {name}, message body: {content}");
};
});

services.AddMvc();


+ 2
- 2
src/DotNetCore.CAP.MySql/MySqlStorageConnection.cs ファイルの表示

@@ -42,7 +42,7 @@ namespace DotNetCore.CAP.MySql

public async Task<IEnumerable<CapPublishedMessage>> GetPublishedMessagesOfNeedRetry()
{
var fourMinsAgo = DateTime.Now.AddMinutes(-4);
var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql =
$"SELECT * FROM `{_prefix}.published` WHERE `Retries`<{_capOptions.FailedRetryCount} AND `Added`<'{fourMinsAgo}' AND (`StatusName` = '{StatusName.Failed}' OR `StatusName` = '{StatusName.Scheduled}') LIMIT 200;";

@@ -80,7 +80,7 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);SELECT LAST

public async Task<IEnumerable<CapReceivedMessage>> GetReceivedMessagesOfNeedRetry()
{
var fourMinsAgo = DateTime.Now.AddMinutes(-4);
var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql =
$"SELECT * FROM `{_prefix}.received` WHERE `Retries`<{_capOptions.FailedRetryCount} AND `Added`<'{fourMinsAgo}' AND (`StatusName` = '{StatusName.Failed}' OR `StatusName` = '{StatusName.Scheduled}') LIMIT 200;";
using (var connection = new MySqlConnection(Options.ConnectionString))


+ 2
- 2
src/DotNetCore.CAP.PostgreSql/PostgreSqlStorageConnection.cs ファイルの表示

@@ -40,7 +40,7 @@ namespace DotNetCore.CAP.PostgreSql

public async Task<IEnumerable<CapPublishedMessage>> GetPublishedMessagesOfNeedRetry()
{
var fourMinsAgo = DateTime.Now.AddMinutes(-4);
var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql =
$"SELECT * FROM \"{Options.Schema}\".\"published\" WHERE \"Retries\"<{_capOptions.FailedRetryCount} AND \"Added\"<'{fourMinsAgo}' AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;";

@@ -77,7 +77,7 @@ namespace DotNetCore.CAP.PostgreSql

public async Task<IEnumerable<CapReceivedMessage>> GetReceivedMessagesOfNeedRetry()
{
var fourMinsAgo = DateTime.Now.AddMinutes(-4);
var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql =
$"SELECT * FROM \"{Options.Schema}\".\"received\" WHERE \"Retries\"<{_capOptions.FailedRetryCount} AND \"Added\"<'{fourMinsAgo}' AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;";
using (var connection = new NpgsqlConnection(Options.ConnectionString))


+ 4
- 1
src/DotNetCore.CAP.RabbitMQ/CAP.RabbiMQOptions.cs ファイルの表示

@@ -37,7 +37,10 @@ namespace DotNetCore.CAP
/// <summary> The topic exchange type. </summary>
public const string ExchangeType = "topic";

/// <summary>The host to connect to.</summary>
/// <summary>
/// The host to connect to.
/// If you want connect to the cluster, you can assign like “192.168.1.111,192.168.1.112”
/// </summary>
public string HostName { get; set; } = "localhost";

/// <summary>


+ 7
- 1
src/DotNetCore.CAP.RabbitMQ/IConnectionChannelPool.Default.cs ファイルの表示

@@ -76,7 +76,6 @@ namespace DotNetCore.CAP.RabbitMQ
{
var factory = new ConnectionFactory
{
HostName = options.HostName,
UserName = options.UserName,
Port = options.Port,
Password = options.Password,
@@ -86,6 +85,13 @@ namespace DotNetCore.CAP.RabbitMQ
SocketWriteTimeout = options.SocketWriteTimeout
};

if (options.HostName.Contains(","))
{
return () => factory.CreateConnection(
options.HostName.Split(new[] { "," }, StringSplitOptions.RemoveEmptyEntries));
}

factory.HostName = options.HostName;
return () => factory.CreateConnection();
}



+ 2
- 2
src/DotNetCore.CAP.SqlServer/SqlServerStorageConnection.cs ファイルの表示

@@ -40,7 +40,7 @@ namespace DotNetCore.CAP.SqlServer

public async Task<IEnumerable<CapPublishedMessage>> GetPublishedMessagesOfNeedRetry()
{
var fourMinsAgo = DateTime.Now.AddMinutes(-4);
var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql =
$"SELECT TOP (200) * FROM [{Options.Schema}].[Published] WITH (readpast) WHERE Retries<{_capOptions.FailedRetryCount} AND Added<'{fourMinsAgo}' AND (StatusName = '{StatusName.Failed}' OR StatusName = '{StatusName.Scheduled}')";

@@ -78,7 +78,7 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);SELECT SCOP

public async Task<IEnumerable<CapReceivedMessage>> GetReceivedMessagesOfNeedRetry()
{
var fourMinsAgo = DateTime.Now.AddMinutes(-4);
var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql =
$"SELECT TOP (200) * FROM [{Options.Schema}].[Received] WITH (readpast) WHERE Retries<{_capOptions.FailedRetryCount} AND Added<'{fourMinsAgo}' AND (StatusName = '{StatusName.Failed}' OR StatusName = '{StatusName.Scheduled}')";
using (var connection = new SqlConnection(Options.ConnectionString))


+ 5
- 4
src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs ファイルの表示

@@ -182,7 +182,7 @@ namespace DotNetCore.CAP.Abstractions
}
catch (Exception e)
{
_logger.LogError("An exception was occurred when publish message. exception message:" + e.Message, e);
_logger.LogError(e, "An exception was occurred when publish message async. exception message:" + name);
s_diagnosticListener.WritePublishMessageStoreError(operationId, message, e);
Console.WriteLine(e);
throw;
@@ -204,10 +204,11 @@ namespace DotNetCore.CAP.Abstractions

try
{
operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message);
Console.WriteLine("================22222222222222=====================");
operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message);

var id = Execute(DbConnection, DbTransaction, message);
Console.WriteLine("================777777777777777777777=====================");
ClosedCap();

if (id > 0)
@@ -220,7 +221,7 @@ namespace DotNetCore.CAP.Abstractions
}
catch (Exception e)
{
_logger.LogError("An exception was occurred when publish message. exception message:" + e.Message, e);
_logger.LogError(e, "An exception was occurred when publish message. message:" + name);
s_diagnosticListener.WritePublishMessageStoreError(operationId, message, e);
Console.WriteLine(e);
throw;


+ 3
- 8
src/DotNetCore.CAP/Dashboard/DashboardRequest.cs ファイルの表示

@@ -28,19 +28,14 @@ namespace DotNetCore.CAP.Dashboard

public CapDashboardRequest(HttpContext context)
{
if (context == null)
{
throw new ArgumentNullException(nameof(context));
}

_context = context;
_context = context ?? throw new ArgumentNullException(nameof(context));
}

public override string Method => _context.Request.Method;
public override string Path => _context.Request.Path.Value;
public override string PathBase => _context.Request.PathBase.Value;
public override string LocalIpAddress => _context.Connection.LocalIpAddress.ToString();
public override string RemoteIpAddress => _context.Connection.RemoteIpAddress.ToString();
public override string LocalIpAddress => _context.Connection.LocalIpAddress.MapToIPv4().ToString();
public override string RemoteIpAddress => _context.Connection.RemoteIpAddress.MapToIPv4().ToString();

public override string GetQuery(string key)
{


+ 15
- 5
src/DotNetCore.CAP/Dashboard/DashboardRoutes.cs ファイルの表示

@@ -3,7 +3,7 @@

using System.Reflection;
using DotNetCore.CAP.Dashboard.Pages;
using DotNetCore.CAP.Infrastructure;
using Microsoft.Extensions.DependencyInjection;

namespace DotNetCore.CAP.Dashboard
{
@@ -83,24 +83,34 @@ namespace DotNetCore.CAP.Dashboard
Routes.AddJsonResult("/published/message/(?<Id>.+)", x =>
{
var id = int.Parse(x.UriMatch.Groups["Id"].Value);
var message = x.Storage.GetConnection().GetPublishedMessageAsync(id).GetAwaiter().GetResult();
var message = x.Storage.GetConnection().GetPublishedMessageAsync(id)
.GetAwaiter().GetResult();
return message.Content;
});
Routes.AddJsonResult("/received/message/(?<Id>.+)", x =>
{
var id = int.Parse(x.UriMatch.Groups["Id"].Value);
var message = x.Storage.GetConnection().GetReceivedMessageAsync(id).GetAwaiter().GetResult();
var message = x.Storage.GetConnection().GetReceivedMessageAsync(id)
.GetAwaiter().GetResult();
return message.Content;
});

Routes.AddPublishBatchCommand(
"/published/requeue",
(client, messageId) =>
client.Storage.GetConnection().ChangePublishedState(messageId, StatusName.Scheduled));
{
var msg = client.Storage.GetConnection().GetPublishedMessageAsync(messageId)
.GetAwaiter().GetResult();
client.RequestServices.GetService<IDispatcher>().EnqueueToPublish(msg);
});
Routes.AddPublishBatchCommand(
"/received/requeue",
(client, messageId) =>
client.Storage.GetConnection().ChangeReceivedState(messageId, StatusName.Scheduled));
{
var msg = client.Storage.GetConnection().GetReceivedMessageAsync(messageId)
.GetAwaiter().GetResult();
client.RequestServices.GetService<IDispatcher>().EnqueueToExecute(msg);
});

Routes.AddRazorPage(
"/published/(?<StatusName>.+)",


+ 5
- 4
src/DotNetCore.CAP/Dashboard/LocalRequestsOnlyAuthorizationFilter.cs ファイルの表示

@@ -9,26 +9,27 @@ namespace DotNetCore.CAP.Dashboard
{
public bool Authorize(DashboardContext context)
{
var ipAddress = context.Request.RemoteIpAddress;
// if unknown, assume not local
if (string.IsNullOrEmpty(context.Request.RemoteIpAddress))
if (string.IsNullOrEmpty(ipAddress))
{
return false;
}

// check if localhost
if (context.Request.RemoteIpAddress == "127.0.0.1" || context.Request.RemoteIpAddress == "::1")
if (ipAddress == "127.0.0.1" || ipAddress == "0.0.0.1")
{
return true;
}

// compare with local address
if (context.Request.RemoteIpAddress == context.Request.LocalIpAddress)
if (ipAddress == context.Request.LocalIpAddress)
{
return true;
}

// check if private ip
if (Helper.IsInnerIP(context.Request.RemoteIpAddress))
if (Helper.IsInnerIP(ipAddress))
{
return true;
}


+ 58
- 36
src/DotNetCore.CAP/IPublishMessageSender.Base.cs ファイルの表示

@@ -43,6 +43,24 @@ namespace DotNetCore.CAP
public abstract Task<OperateResult> PublishAsync(string keyName, string content);

public async Task<OperateResult> SendAsync(CapPublishedMessage message)
{
bool retry;
OperateResult result;
do
{
var executedResult = await SendWithoutRetryAsync(message);
result = executedResult.Item2;
if (result == OperateResult.Success)
{
return result;
}
retry = executedResult.Item1;
} while (retry);

return result;
}

private async Task<(bool, OperateResult)> SendWithoutRetryAsync(CapPublishedMessage message)
{
var startTime = DateTimeOffset.UtcNow;
var stopwatch = Stopwatch.StartNew();
@@ -63,60 +81,33 @@ namespace DotNetCore.CAP

TracingAfter(operationId, message.Name, sendValues, startTime, stopwatch.Elapsed);

return OperateResult.Success;
return (false, OperateResult.Success);
}
else
{
TracingError(operationId, message, result, startTime, stopwatch.Elapsed);

await SetFailedState(message, result.Exception, out bool stillRetry);

if (stillRetry)
{
_logger.SenderRetrying(message.Id, message.Retries);

await SendAsync(message);
}
return OperateResult.Failed(result.Exception);
var needRetry = await SetFailedState(message, result.Exception);
return (needRetry, OperateResult.Failed(result.Exception));
}
}

private static bool UpdateMessageForRetryAsync(CapPublishedMessage message)
{
var retryBehavior = RetryBehavior.DefaultRetry;

var retries = ++message.Retries;
if (retries >= retryBehavior.RetryCount)
{
return false;
}

var due = message.Added.AddSeconds(retryBehavior.RetryIn(retries));
message.ExpiresAt = due;

return true;
}

private Task SetSuccessfulState(CapPublishedMessage message)
{
var succeededState = new SucceededState(_options.SucceedMessageExpiredAfter);

return _stateChanger.ChangeStateAsync(message, succeededState, _connection);
}

private Task SetFailedState(CapPublishedMessage message, Exception ex, out bool stillRetry)
private async Task<bool> SetFailedState(CapPublishedMessage message, Exception ex)
{
IState newState = new FailedState();
stillRetry = UpdateMessageForRetryAsync(message);
if (stillRetry)
{
_logger.ConsumerExecutionFailedWillRetry(ex);
return Task.CompletedTask;
}

AddErrorReasonToContent(message, ex);

return _stateChanger.ChangeStateAsync(message, newState, _connection);
var needRetry = UpdateMessageForRetry(message);

await _stateChanger.ChangeStateAsync(message, new FailedState(), _connection);

return needRetry;
}

private static void AddErrorReasonToContent(CapPublishedMessage message, Exception exception)
@@ -124,6 +115,37 @@ namespace DotNetCore.CAP
message.Content = Helper.AddExceptionProperty(message.Content, exception);
}

private bool UpdateMessageForRetry(CapPublishedMessage message)
{
var retryBehavior = RetryBehavior.DefaultRetry;

var retries = ++message.Retries;
message.ExpiresAt = message.Added.AddSeconds(retryBehavior.RetryIn(retries));

var retryCount = Math.Min(_options.FailedRetryCount, retryBehavior.RetryCount);
if (retries >= retryCount)
{
if (retries == _options.FailedRetryCount)
{
try
{
_options.FailedThresholdCallback?.Invoke(MessageType.Subscribe, message.Name, message.Content);

_logger.SenderAfterThreshold(message.Id, _options.FailedRetryCount);
}
catch (Exception ex)
{
_logger.ExecutedThresholdCallbackFailed(ex);
}
}
return false;
}

_logger.SenderRetrying(message.Id, retries);

return true;
}

private (Guid, TracingHeaders) TracingBefore(string topic, string values)
{
Guid operationId = Guid.NewGuid();


+ 50
- 27
src/DotNetCore.CAP/ISubscribeExecutor.Default.cs ファイルの表示

@@ -50,6 +50,29 @@ namespace DotNetCore.CAP
private IConsumerInvoker Invoker { get; }

public async Task<OperateResult> ExecuteAsync(CapReceivedMessage message)
{
bool retry;
OperateResult result;
do
{
var executedResult = await ExecuteWithoutRetryAsync(message);
result = executedResult.Item2;
if (result == OperateResult.Success)
{
return result;
}
retry = executedResult.Item1;
} while (retry);

return result;
}

/// <summary>
/// Execute message consumption once.
/// </summary>
/// <param name="message">the message rececived of <see cref="CapReceivedMessage"/></param>
/// <returns>Item1 is need still restry, Item2 is executed result.</returns>
private async Task<(bool, OperateResult)> ExecuteWithoutRetryAsync(CapReceivedMessage message)
{
if (message == null)
{
@@ -68,65 +91,65 @@ namespace DotNetCore.CAP

_logger.ConsumerExecuted(sp.Elapsed.TotalSeconds);

return OperateResult.Success;
return (false, OperateResult.Success);
}
catch (Exception ex)
{
_logger.LogError(ex, $"An exception occurred while executing the subscription method. Topic:{message.Name}, Id:{message.Id}");

await SetFailedState(message, ex, out bool stillRetry);
if (stillRetry)
{
await ExecuteAsync(message);
}

return OperateResult.Failed(ex);
return (await SetFailedState(message, ex), OperateResult.Failed(ex));
}
}

private Task SetSuccessfulState(CapReceivedMessage message)
{
var succeededState = new SucceededState(_options.SucceedMessageExpiredAfter);

return _stateChanger.ChangeStateAsync(message, succeededState, _connection);
}

private Task SetFailedState(CapReceivedMessage message, Exception ex, out bool stillRetry)
private async Task<bool> SetFailedState(CapReceivedMessage message, Exception ex)
{
IState newState = new FailedState();

if (ex is SubscriberNotFoundException)
{
stillRetry = false;
message.Retries = _options.FailedRetryCount; // not retry if SubscriberNotFoundException
}
else
{
stillRetry = UpdateMessageForRetry(message);
if (stillRetry)
{
_logger.ConsumerExecutionFailedWillRetry(ex);
return Task.CompletedTask;
}
}

AddErrorReasonToContent(message, ex);

return _stateChanger.ChangeStateAsync(message, newState, _connection);
var needRetry = UpdateMessageForRetry(message);

await _stateChanger.ChangeStateAsync(message, new FailedState(), _connection);

return needRetry;
}

private static bool UpdateMessageForRetry(CapReceivedMessage message)
private bool UpdateMessageForRetry(CapReceivedMessage message)
{
var retryBehavior = RetryBehavior.DefaultRetry;

var retries = ++message.Retries;
if (retries >= retryBehavior.RetryCount)
message.ExpiresAt = message.Added.AddSeconds(retryBehavior.RetryIn(retries));

var retryCount = Math.Min(_options.FailedRetryCount, retryBehavior.RetryCount);
if (retries >= retryCount)
{
if (retries == _options.FailedRetryCount)
{
try
{
_options.FailedThresholdCallback?.Invoke(MessageType.Subscribe, message.Name, message.Content);

_logger.ConsumerExecutedAfterThreshold(message.Id, _options.FailedRetryCount);
}
catch (Exception ex)
{
_logger.ExecutedThresholdCallbackFailed(ex);
}
}
return false;
}

var due = message.Added.AddSeconds(retryBehavior.RetryIn(retries));
message.ExpiresAt = due;
_logger.ConsumerExecutionRetrying(message.Id, retries);

return true;
}


+ 21
- 92
src/DotNetCore.CAP/LoggerExtensions.cs ファイルの表示

@@ -10,141 +10,70 @@ namespace DotNetCore.CAP
[SuppressMessage("ReSharper", "InconsistentNaming")]
internal static class LoggerExtensions
{
private static readonly Action<ILogger, Exception> _serverStarting;
private static readonly Action<ILogger, Exception> _processorsStartingError;
private static readonly Action<ILogger, Exception> _serverShuttingDown;
private static readonly Action<ILogger, string, Exception> _expectedOperationCanceledException;
private static readonly Action<ILogger, string, string, string, Exception> _modelBinderFormattingException;
private static readonly Action<ILogger, Exception> _consumerFailedWillRetry;
private static readonly Action<ILogger, double, Exception> _consumerExecuted;
private static readonly Action<ILogger, int, int, Exception> _senderRetrying;
private static readonly Action<ILogger, string, Exception> _exceptionOccuredWhileExecuting;
private static readonly Action<ILogger, double, Exception> _messageHasBeenSent;
private static readonly Action<ILogger, int, string, Exception> _messagePublishException;

static LoggerExtensions()
public static void ConsumerExecutedAfterThreshold(this ILogger logger, int messageId, int retries)
{
_serverStarting = LoggerMessage.Define(
LogLevel.Debug,
1,
"Starting the processing server.");

_processorsStartingError = LoggerMessage.Define(
LogLevel.Error,
5,
"Starting the processors throw an exception.");

_serverShuttingDown = LoggerMessage.Define(
LogLevel.Information,
2,
"Shutting down the processing server...");

_expectedOperationCanceledException = LoggerMessage.Define<string>(
LogLevel.Warning,
3,
"Expected an OperationCanceledException, but found '{ExceptionMessage}'.");

LoggerMessage.Define<string>(
LogLevel.Error,
5,
"Consumer method '{methodName}' failed to execute.");

LoggerMessage.Define<string>(
LogLevel.Error,
5,
"Received message topic method '{topicName}' failed to execute.");

_modelBinderFormattingException = LoggerMessage.Define<string, string, string>(
LogLevel.Error,
5,
"When call subscribe method, a parameter format conversion exception occurs. MethodName:'{MethodName}' ParameterName:'{ParameterName}' Content:'{Content}'."
);

_senderRetrying = LoggerMessage.Define<int, int>(
LogLevel.Debug,
3,
"The {Retries}th retrying send a message failed. message id: {MessageId} ");

_consumerExecuted = LoggerMessage.Define<double>(
LogLevel.Debug,
4,
"Consumer executed. Took: {Seconds} secs.");

_consumerFailedWillRetry = LoggerMessage.Define(
LogLevel.Warning,
2,
"Consumer failed to execute. Will retry.");

_exceptionOccuredWhileExecuting = LoggerMessage.Define<string>(
LogLevel.Error,
6,
"An exception occured while trying to store a message. message id: {MessageId}");
logger.LogWarning($"The Subscriber of the message({messageId}) still fails after {retries}th executions and we will stop retrying.");
}

_messageHasBeenSent = LoggerMessage.Define<double>(
LogLevel.Debug,
4,
"Message published. Took: {Seconds} secs.");
public static void SenderAfterThreshold(this ILogger logger, int messageId, int retries)
{
logger.LogWarning($"The Publisher of the message({messageId}) still fails after {retries}th sends and we will stop retrying.");
}

_messagePublishException = LoggerMessage.Define<int, string>(
LogLevel.Error,
6,
"An exception occured while publishing a message, reason:{Reason}. message id:{MessageId}");
public static void ExecutedThresholdCallbackFailed(this ILogger logger, Exception ex)
{
logger.LogWarning(ex, "FailedThresholdCallback action raised an exception:" + ex.Message);
}

public static void ConsumerExecutionFailedWillRetry(this ILogger logger, Exception ex)
public static void ConsumerExecutionRetrying(this ILogger logger, int messageId, int retries)
{
_consumerFailedWillRetry(logger, ex);
logger.LogWarning($"The {retries}th retrying consume a message failed. message id: {messageId}");
}

public static void SenderRetrying(this ILogger logger, int messageId, int retries)
{
_senderRetrying(logger, messageId, retries, null);
logger.LogWarning($"The {retries}th retrying send a message failed. message id: {messageId} ");
}

public static void MessageHasBeenSent(this ILogger logger, double seconds)
{
_messageHasBeenSent(logger, seconds, null);
logger.LogDebug($"Message published. Took: {seconds} secs.");
}

public static void MessagePublishException(this ILogger logger, int messageId, string reason, Exception ex)
{
_messagePublishException(logger, messageId, reason, ex);
logger.LogError(ex, $"An exception occured while publishing a message, reason:{reason}. message id:{messageId}");
}

public static void ConsumerExecuted(this ILogger logger, double seconds)
{
_consumerExecuted(logger, seconds, null);
logger.LogDebug($"Consumer executed. Took: {seconds} secs.");
}

public static void ServerStarting(this ILogger logger)
{
_serverStarting(logger, null);
logger.LogInformation("Starting the processing server.");
}

public static void ProcessorsStartedError(this ILogger logger, Exception ex)
{
_processorsStartingError(logger, ex);
logger.LogError(ex, "Starting the processors throw an exception.");
}

public static void ServerShuttingDown(this ILogger logger)
{
_serverShuttingDown(logger, null);
logger.LogInformation("Shutting down the processing server...");
}

public static void ExpectedOperationCanceledException(this ILogger logger, Exception ex)
{
_expectedOperationCanceledException(logger, ex.Message, ex);
}

public static void ExceptionOccuredWhileExecuting(this ILogger logger, string messageId, Exception ex)
{
_exceptionOccuredWhileExecuting(logger, messageId, ex);
logger.LogWarning(ex, $"Expected an OperationCanceledException, but found '{ex.Message}'.");
}

public static void ModelBinderFormattingException(this ILogger logger, string methodName, string parameterName,
string content, Exception ex)
{
_modelBinderFormattingException(logger, methodName, parameterName, content, ex);
logger.LogError(ex, $"When call subscribe method, a parameter format conversion exception occurs. MethodName:'{methodName}' ParameterName:'{parameterName}' Content:'{content}'.");
}
}
}

+ 0
- 6
src/DotNetCore.CAP/Models/CapPublishedMessage.cs ファイルの表示

@@ -15,12 +15,6 @@ namespace DotNetCore.CAP.Models
Added = DateTime.Now;
}

public CapPublishedMessage(MessageContext message)
{
Name = message.Name;
Content = message.Content;
}

public int Id { get; set; }

public string Name { get; set; }


+ 7
- 120
src/DotNetCore.CAP/Processor/IProcessor.NeedRetry.cs ファイルの表示

@@ -3,11 +3,7 @@

using System;
using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Processor.States;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace DotNetCore.CAP.Processor
@@ -15,26 +11,18 @@ namespace DotNetCore.CAP.Processor
public class NeedRetryMessageProcessor : IProcessor
{
private readonly TimeSpan _delay = TimeSpan.FromSeconds(1);
private readonly ILogger _logger;
private readonly CapOptions _options;
private readonly IPublishExecutor _publishExecutor;
private readonly IStateChanger _stateChanger;
private readonly IPublishMessageSender _publishMessageSender;
private readonly ISubscriberExecutor _subscriberExecutor;
private readonly TimeSpan _waitingInterval;

public NeedRetryMessageProcessor(
IOptions<CapOptions> options,
ILogger<NeedRetryMessageProcessor> logger,
IStateChanger stateChanger,
ISubscriberExecutor subscriberExecutor,
IPublishExecutor publishExecutor)
IPublishMessageSender publishMessageSender)
{
_options = options.Value;
_logger = logger;
_stateChanger = stateChanger;
_subscriberExecutor = subscriberExecutor;
_publishExecutor = publishExecutor;
_waitingInterval = TimeSpan.FromSeconds(_options.FailedRetryInterval);
_publishMessageSender = publishMessageSender;
_waitingInterval = TimeSpan.FromSeconds(options.Value.FailedRetryInterval);
}

public async Task ProcessAsync(ProcessingContext context)
@@ -56,57 +44,10 @@ namespace DotNetCore.CAP.Processor
private async Task ProcessPublishedAsync(IStorageConnection connection, ProcessingContext context)
{
var messages = await connection.GetPublishedMessagesOfNeedRetry();
var hasException = false;

foreach (var message in messages)
{
if (message.Retries > _options.FailedRetryCount)
{
continue;
}

using (var transaction = connection.CreateTransaction())
{
var result = await _publishExecutor.PublishAsync(message.Name, message.Content);
if (result.Succeeded)
{
_stateChanger.ChangeState(message, new SucceededState(), transaction);
_logger.LogInformation("The message was sent successfully during the retry. MessageId:" + message.Id);
}
else
{
message.Content = Helper.AddExceptionProperty(message.Content, result.Exception);
message.Retries++;
if (message.StatusName == StatusName.Scheduled)
{
message.ExpiresAt = GetDueTime(message.Added, message.Retries);
message.StatusName = StatusName.Failed;
}
transaction.UpdateMessage(message);

if (message.Retries >= _options.FailedRetryCount)
{
_logger.LogError($"The message still sent failed after {_options.FailedRetryCount} retries. We will stop retrying the message. " +
"MessageId:" + message.Id);
if (message.Retries == _options.FailedRetryCount)
{
if (!hasException)
{
try
{
_options.FailedThresholdCallback?.Invoke(MessageType.Publish, message.Name, message.Content);
}
catch (Exception ex)
{
hasException = true;
_logger.LogWarning("Failed call-back method raised an exception:" + ex.Message);
}
}
}
}
}
await transaction.CommitAsync();
}
await _publishMessageSender.SendAsync(message);

context.ThrowIfStopping();

@@ -117,69 +58,15 @@ namespace DotNetCore.CAP.Processor
private async Task ProcessReceivedAsync(IStorageConnection connection, ProcessingContext context)
{
var messages = await connection.GetReceivedMessagesOfNeedRetry();
var hasException = false;

foreach (var message in messages)
{
if (message.Retries > _options.FailedRetryCount)
{
continue;
}

using (var transaction = connection.CreateTransaction())
{
var result = await _subscriberExecutor.ExecuteAsync(message);
if (result.Succeeded)
{
_stateChanger.ChangeState(message, new SucceededState(), transaction);
_logger.LogInformation("The message was execute successfully during the retry. MessageId:" + message.Id);
}
else
{
message.Content = Helper.AddExceptionProperty(message.Content, result.Exception);
message.Retries++;
if (message.StatusName == StatusName.Scheduled)
{
message.ExpiresAt = GetDueTime(message.Added, message.Retries);
message.StatusName = StatusName.Failed;
}
transaction.UpdateMessage(message);

if (message.Retries >= _options.FailedRetryCount)
{
_logger.LogError($"[Subscriber]The message still executed failed after {_options.FailedRetryCount} retries. " +
"We will stop retrying to execute the message. message id:" + message.Id);

if (message.Retries == _options.FailedRetryCount)
{
if (!hasException)
{
try
{
_options.FailedThresholdCallback?.Invoke(MessageType.Subscribe, message.Name, message.Content);
}
catch (Exception ex)
{
hasException = true;
_logger.LogWarning("Failed call-back method raised an exception:" + ex.Message);
}
}
}
}
}
await transaction.CommitAsync();
}
await _subscriberExecutor.ExecuteAsync(message);

context.ThrowIfStopping();

await context.WaitAsync(_delay);
}
}

public DateTime GetDueTime(DateTime addedTime, int retries)
{
var retryBehavior = RetryBehavior.DefaultRetry;
return addedTime.AddSeconds(retryBehavior.RetryIn(retries));
}
}
}
}

読み込み中…
キャンセル
保存