Browse Source

Release 2.2.2 (#121)

* fixed message enqueue exception in v2.2

* add log to samples

* Improved log output.  #114

* add default timeout configuration for kafka client.

* fixed retry processor bugs.

* Fixed kafka producer exception log without logging when publish message.

* update version num to 2.2.2

* rename configuration options FailedCallback to FailedThresholdCallback

* rename files name.

* remove unused files.

* modify the error comments.

* update samples.

* add logs.
master
Savorboard 6 years ago
committed by GitHub
parent
commit
3a8bcf0324
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 198 additions and 110 deletions
  1. +1
    -1
      build/version.props
  2. +1
    -1
      samples/Sample.Kafka.MySql/Controllers/ValuesController.cs
  3. +6
    -1
      samples/Sample.Kafka.MySql/Program.cs
  4. +6
    -0
      samples/Sample.Kafka.MySql/Sample.Kafka.MySql.csproj
  5. +26
    -0
      samples/Sample.Kafka.MySql/nlog.config
  6. +2
    -7
      samples/Sample.RabbitMQ.MySql/AppDbContext.cs
  7. +2
    -2
      samples/Sample.RabbitMQ.MySql/Controllers/ValuesController.cs
  8. +7
    -8
      samples/Sample.RabbitMQ.MySql/Program.cs
  9. +6
    -0
      samples/Sample.RabbitMQ.MySql/Sample.RabbitMQ.MySql.csproj
  10. +26
    -0
      samples/Sample.RabbitMQ.MySql/nlog.config
  11. +2
    -0
      src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs
  12. +0
    -0
      src/DotNetCore.CAP.Kafka/IConnectionPool.Default.cs
  13. +2
    -6
      src/DotNetCore.CAP.Kafka/IPublishMessageSender.Kafka.cs
  14. +0
    -0
      src/DotNetCore.CAP.RabbitMQ/IConnectionChannelPool.Default.cs
  15. +1
    -1
      src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs
  16. +2
    -2
      src/DotNetCore.CAP/CAP.Options.cs
  17. +2
    -2
      src/DotNetCore.CAP/ICapPublisher.cs
  18. +1
    -1
      src/DotNetCore.CAP/ICapSubscribe.cs
  19. +2
    -0
      src/DotNetCore.CAP/IConsumerHandler.Default.cs
  20. +0
    -19
      src/DotNetCore.CAP/IFetchedMessage.cs
  21. +13
    -20
      src/DotNetCore.CAP/IPublishMessageSender.Base.cs
  22. +0
    -1
      src/DotNetCore.CAP/Infrastructure/Helper.cs
  23. +4
    -0
      src/DotNetCore.CAP/Internal/PublisherSentFailedException.cs
  24. +11
    -11
      src/DotNetCore.CAP/LoggerExtensions.cs
  25. +75
    -27
      src/DotNetCore.CAP/Processor/IProcessor.NeedRetry.cs

+ 1
- 1
build/version.props View File

@@ -2,7 +2,7 @@
<PropertyGroup> <PropertyGroup>
<VersionMajor>2</VersionMajor> <VersionMajor>2</VersionMajor>
<VersionMinor>2</VersionMinor> <VersionMinor>2</VersionMinor>
<VersionPatch>1</VersionPatch>
<VersionPatch>2</VersionPatch>
<VersionQuality></VersionQuality> <VersionQuality></VersionQuality>
<VersionPrefix>$(VersionMajor).$(VersionMinor).$(VersionPatch)</VersionPrefix> <VersionPrefix>$(VersionMajor).$(VersionMinor).$(VersionPatch)</VersionPrefix>
</PropertyGroup> </PropertyGroup>


+ 1
- 1
samples/Sample.Kafka.MySql/Controllers/ValuesController.cs View File

@@ -37,7 +37,7 @@ namespace Sample.Kafka.MySql.Controllers
[CapSubscribe("xxx.xxx.test2")] [CapSubscribe("xxx.xxx.test2")]
public void Test2(int value) public void Test2(int value)
{ {
Console.WriteLine(value);
Console.WriteLine("Subscriber output message: " + value);
} }
} }
} }

+ 6
- 1
samples/Sample.Kafka.MySql/Program.cs View File

@@ -1,5 +1,6 @@
using Microsoft.AspNetCore; using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Hosting; using Microsoft.AspNetCore.Hosting;
using NLog.Web;


namespace Sample.Kafka.MySql namespace Sample.Kafka.MySql
{ {
@@ -14,7 +15,11 @@ namespace Sample.Kafka.MySql
public static IWebHost BuildWebHost(string[] args) => public static IWebHost BuildWebHost(string[] args) =>
WebHost.CreateDefaultBuilder(args) WebHost.CreateDefaultBuilder(args)
.UseStartup<Startup>() .UseStartup<Startup>()
.ConfigureLogging((hostingContext, builder) =>
{
hostingContext.HostingEnvironment.ConfigureNLog("nlog.config");
})
.UseNLog()
.Build(); .Build();

} }
} }

+ 6
- 0
samples/Sample.Kafka.MySql/Sample.Kafka.MySql.csproj View File

@@ -10,6 +10,7 @@
<ItemGroup> <ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.All" Version="2.0.6" /> <PackageReference Include="Microsoft.AspNetCore.All" Version="2.0.6" />
<PackageReference Include="MySqlConnector" Version="0.38.0" /> <PackageReference Include="MySqlConnector" Version="0.38.0" />
<PackageReference Include="NLog.Web.AspNetCore" Version="4.5.2" />
<PackageReference Include="zipkin4net" Version="1.2.0" /> <PackageReference Include="zipkin4net" Version="1.2.0" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
@@ -20,5 +21,10 @@
<ProjectReference Include="..\..\src\DotNetCore.CAP.MySql\DotNetCore.CAP.MySql.csproj" /> <ProjectReference Include="..\..\src\DotNetCore.CAP.MySql\DotNetCore.CAP.MySql.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" /> <ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup> </ItemGroup>
<ItemGroup>
<Content Update="nlog.config">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</Content>
</ItemGroup>


</Project> </Project>

+ 26
- 0
samples/Sample.Kafka.MySql/nlog.config View File

@@ -0,0 +1,26 @@
<?xml version="1.0" encoding="utf-8"?>

<nlog xmlns="http://www.nlog-project.org/schemas/NLog.xsd"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
autoReload="true"
internalLogLevel="Warn"
internalLogFile="logs/internal-nlog.txt">

<variable name="myLogLayout"
value="---------------------------------------------------------------------------${newline}Date:${longdate} Level:${uppercase:${level}} User:${aspnet-user-identity}(${aspnet-request-ip}) Logger:${logger} URL:${aspnet-request-method} ${aspnet-request-url:IncludePort=true:IncludeQueryString=true} Action:${aspnet-mvc-action} ${newline}Message:${message} ${newline}${onexception:Exception:${exception:format=toString}}" />
<extensions>
<add assembly="NLog.Web.AspNetCore" />
</extensions>

<!-- define various log targets -->
<targets>
<!-- write logs to file -->
<target name="allfile" xsi:type="File" fileName="logs/cap-all-${shortdate}.log"
layout="${myLogLayout}" />
</targets>

<rules>
<!--All logs, including from Microsoft-->
<logger name="*" minlevel="Debug" writeTo="allfile" />
</rules>
</nlog>

+ 2
- 7
samples/Sample.RabbitMQ.MySql/AppDbContext.cs View File

@@ -1,8 +1,4 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore;


namespace Sample.RabbitMQ.MySql namespace Sample.RabbitMQ.MySql
{ {
@@ -10,8 +6,7 @@ namespace Sample.RabbitMQ.MySql
{ {
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{ {
optionsBuilder.UseMySql("Server=localhost;Database=Sample.RabbitMQ.MySql;UserId=root;Password=123123;Allow User Variables=True");
//optionsBuilder.UseMySql("Server=192.168.2.206;Database=Sample.RabbitMQ.MySql;UserId=root;Password=123123;");
optionsBuilder.UseMySql("Server=192.168.10.110;Database=testcap;UserId=root;Password=123123;");
} }
} }
} }

+ 2
- 2
samples/Sample.RabbitMQ.MySql/Controllers/ValuesController.cs View File

@@ -21,7 +21,7 @@ namespace Sample.RabbitMQ.MySql.Controllers
public IActionResult PublishMessage() public IActionResult PublishMessage()
{ {
_capBus.Publish("sample.rabbitmq.mysql", DateTime.Now); _capBus.Publish("sample.rabbitmq.mysql", DateTime.Now);
return Ok(); return Ok();
} }


@@ -50,7 +50,7 @@ namespace Sample.RabbitMQ.MySql.Controllers
[CapSubscribe("sample.rabbitmq.mysql")] [CapSubscribe("sample.rabbitmq.mysql")]
public void ReceiveMessage(DateTime time) public void ReceiveMessage(DateTime time)
{ {
Console.WriteLine("[sample.rabbitmq.mysql] message received: "+ DateTime.Now.ToString() +" , sent time: " + time.ToString());
Console.WriteLine("[sample.rabbitmq.mysql] message received: " + DateTime.Now + ",sent time: " + time);
} }
} }
} }

+ 7
- 8
samples/Sample.RabbitMQ.MySql/Program.cs View File

@@ -1,12 +1,6 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Hosting; using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using NLog.Web;


namespace Sample.RabbitMQ.MySql namespace Sample.RabbitMQ.MySql
{ {
@@ -20,6 +14,11 @@ namespace Sample.RabbitMQ.MySql
public static IWebHost BuildWebHost(string[] args) => public static IWebHost BuildWebHost(string[] args) =>
WebHost.CreateDefaultBuilder(args) WebHost.CreateDefaultBuilder(args)
.UseStartup<Startup>() .UseStartup<Startup>()
.ConfigureLogging((hostingContext, builder) =>
{
hostingContext.HostingEnvironment.ConfigureNLog("nlog.config");
})
.UseNLog()
.Build(); .Build();
} }
} }

+ 6
- 0
samples/Sample.RabbitMQ.MySql/Sample.RabbitMQ.MySql.csproj View File

@@ -12,6 +12,7 @@
<ItemGroup> <ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.All" Version="2.0.6" /> <PackageReference Include="Microsoft.AspNetCore.All" Version="2.0.6" />
<PackageReference Include="Pomelo.EntityFrameworkCore.MySql" Version="2.0.1" /> <PackageReference Include="Pomelo.EntityFrameworkCore.MySql" Version="2.0.1" />
<PackageReference Include="NLog.Web.AspNetCore" Version="4.5.2" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<DotNetCliToolReference Include="Microsoft.EntityFrameworkCore.Tools.DotNet" Version="2.0.0" /> <DotNetCliToolReference Include="Microsoft.EntityFrameworkCore.Tools.DotNet" Version="2.0.0" />
@@ -21,5 +22,10 @@
<ProjectReference Include="..\..\src\DotNetCore.CAP.RabbitMQ\DotNetCore.CAP.RabbitMQ.csproj" /> <ProjectReference Include="..\..\src\DotNetCore.CAP.RabbitMQ\DotNetCore.CAP.RabbitMQ.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" /> <ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup> </ItemGroup>
<ItemGroup>
<Content Update="nlog.config">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</Content>
</ItemGroup>


</Project> </Project>

+ 26
- 0
samples/Sample.RabbitMQ.MySql/nlog.config View File

@@ -0,0 +1,26 @@
<?xml version="1.0" encoding="utf-8"?>

<nlog xmlns="http://www.nlog-project.org/schemas/NLog.xsd"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
autoReload="true"
internalLogLevel="Warn"
internalLogFile="logs/internal-nlog.txt">

<variable name="myLogLayout"
value="---------------------------------------------------------------------------${newline}Date:${longdate} Level:${uppercase:${level}} User:${aspnet-user-identity}(${aspnet-request-ip}) Logger:${logger} URL:${aspnet-request-method} ${aspnet-request-url:IncludePort=true:IncludeQueryString=true} Action:${aspnet-mvc-action} ${newline}Message:${message} ${newline}${onexception:Exception:${exception:format=toString}}" />
<extensions>
<add assembly="NLog.Web.AspNetCore" />
</extensions>

<!-- define various log targets -->
<targets>
<!-- write logs to file -->
<target name="allfile" xsi:type="File" fileName="logs/cap-all-${shortdate}.log"
layout="${myLogLayout}" />
</targets>

<rules>
<!--All logs, including from Microsoft-->
<logger name="*" minlevel="Debug" writeTo="allfile" />
</rules>
</nlog>

+ 2
- 0
src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs View File

@@ -57,6 +57,8 @@ namespace DotNetCore.CAP
MainConfig["socket.blocking.max.ms"] = "10"; MainConfig["socket.blocking.max.ms"] = "10";
MainConfig["enable.auto.commit"] = "false"; MainConfig["enable.auto.commit"] = "false";
MainConfig["log.connection.close"] = "false"; MainConfig["log.connection.close"] = "false";
MainConfig["request.timeout.ms"] = "3000";
MainConfig["message.timeout.ms"] = "5000";


_kafkaConfig = MainConfig.AsEnumerable(); _kafkaConfig = MainConfig.AsEnumerable();
} }


src/DotNetCore.CAP.Kafka/ConnectionPool.cs → src/DotNetCore.CAP.Kafka/IConnectionPool.Default.cs View File


+ 2
- 6
src/DotNetCore.CAP.Kafka/IPublishMessageSender.Kafka.cs View File

@@ -36,11 +36,7 @@ namespace DotNetCore.CAP.Kafka


if (message.Error.HasError) if (message.Error.HasError)
{ {
return OperateResult.Failed(new OperateError
{
Code = message.Error.Code.ToString(),
Description = message.Error.Reason
});
throw new PublisherSentFailedException(message.Error.ToString());
} }


_logger.LogDebug($"kafka topic message [{keyName}] has been published."); _logger.LogDebug($"kafka topic message [{keyName}] has been published.");
@@ -61,6 +57,6 @@ namespace DotNetCore.CAP.Kafka
producer.Dispose(); producer.Dispose();
} }
} }
}
}
} }
} }

src/DotNetCore.CAP.RabbitMQ/ConnectionChannelPool.cs → src/DotNetCore.CAP.RabbitMQ/IConnectionChannelPool.Default.cs View File


+ 1
- 1
src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs View File

@@ -132,7 +132,7 @@ namespace DotNetCore.CAP.Abstractions
{ {
throw new InvalidOperationException( throw new InvalidOperationException(
"If you are using the EntityFramework, you need to configure the DbContextType first." + "If you are using the EntityFramework, you need to configure the DbContextType first." +
" otherwise you need to use overloaded method with IDbConnection and IDbTransaction.");
" otherwise you need to use overloaded method with IDbTransaction.");
} }
} }




+ 2
- 2
src/DotNetCore.CAP/CAP.Options.cs View File

@@ -58,9 +58,9 @@ namespace DotNetCore.CAP
public int FailedRetryInterval { get; set; } public int FailedRetryInterval { get; set; }


/// <summary> /// <summary>
/// We’ll invoke this call-back with message type,name,content when requeue failed message.
/// We’ll invoke this call-back with message type,name,content when retry failed (send or executed) messages equals <see cref="FailedRetryCount"/> times.
/// </summary> /// </summary>
public Action<MessageType, string, string> FailedCallback { get; set; }
public Action<MessageType, string, string> FailedThresholdCallback { get; set; }


/// <summary> /// <summary>
/// The number of message retries, the retry will stop when the threshold is reached. /// The number of message retries, the retry will stop when the threshold is reached.


+ 2
- 2
src/DotNetCore.CAP/ICapPublisher.cs View File

@@ -15,7 +15,7 @@ namespace DotNetCore.CAP
/// (EntityFramework) Asynchronous publish a object message. /// (EntityFramework) Asynchronous publish a object message.
/// <para> /// <para>
/// If you are using the EntityFramework, you need to configure the DbContextType first. /// If you are using the EntityFramework, you need to configure the DbContextType first.
/// otherwise you need to use overloaded method with IDbConnection and IDbTransaction.
/// otherwise you need to use overloaded method with IDbTransaction.
/// </para> /// </para>
/// </summary> /// </summary>
/// <typeparam name="T">The type of content object.</typeparam> /// <typeparam name="T">The type of content object.</typeparam>
@@ -28,7 +28,7 @@ namespace DotNetCore.CAP
/// (EntityFramework) Publish a object message. /// (EntityFramework) Publish a object message.
/// <para> /// <para>
/// If you are using the EntityFramework, you need to configure the DbContextType first. /// If you are using the EntityFramework, you need to configure the DbContextType first.
/// otherwise you need to use overloaded method with IDbConnection and IDbTransaction.
/// otherwise you need to use overloaded method with IDbTransaction.
/// </para> /// </para>
/// </summary> /// </summary>
/// <typeparam name="T">The type of content object.</typeparam> /// <typeparam name="T">The type of content object.</typeparam>


+ 1
- 1
src/DotNetCore.CAP/ICapSubscribe.cs View File

@@ -4,7 +4,7 @@
namespace DotNetCore.CAP namespace DotNetCore.CAP
{ {
/// <summary> /// <summary>
/// An empty interface, which is used to mark the current class have a CAP methods.
/// An empty interface, which is used to mark the current class have a CAP subscriber methods.
/// </summary> /// </summary>
public interface ICapSubscribe public interface ICapSubscribe
{ {


+ 2
- 0
src/DotNetCore.CAP/IConsumerHandler.Default.cs View File

@@ -178,6 +178,8 @@ namespace DotNetCore.CAP


private (Guid, string) TracingBefore(string topic, string values) private (Guid, string) TracingBefore(string topic, string values)
{ {
_logger.LogDebug("CAP received topic message:" + topic);

Guid operationId = Guid.NewGuid(); Guid operationId = Guid.NewGuid();


var eventData = new BrokerConsumeEventData( var eventData = new BrokerConsumeEventData(


+ 0
- 19
src/DotNetCore.CAP/IFetchedMessage.cs View File

@@ -1,19 +0,0 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using DotNetCore.CAP.Models;

namespace DotNetCore.CAP
{
public interface IFetchedMessage : IDisposable
{
int MessageId { get; }

MessageType MessageType { get; }

void RemoveFromQueue();

void Requeue();
}
}

+ 13
- 20
src/DotNetCore.CAP/IPublishMessageSender.Base.cs View File

@@ -67,15 +67,13 @@ namespace DotNetCore.CAP
} }
else else
{ {
TracingError(operationId, message.Name, sendValues, result.Exception, startTime, stopwatch.Elapsed);

_logger.MessagePublishException(message.Id, result.Exception);
TracingError(operationId, message, result, startTime, stopwatch.Elapsed);


await SetFailedState(message, result.Exception, out bool stillRetry); await SetFailedState(message, result.Exception, out bool stillRetry);


if (stillRetry) if (stillRetry)
{ {
_logger.SenderRetrying(3);
_logger.SenderRetrying(message.Id, message.Retries);


await SendAsync(message); await SendAsync(message);
} }
@@ -109,20 +107,11 @@ namespace DotNetCore.CAP
private Task SetFailedState(CapPublishedMessage message, Exception ex, out bool stillRetry) private Task SetFailedState(CapPublishedMessage message, Exception ex, out bool stillRetry)
{ {
IState newState = new FailedState(); IState newState = new FailedState();

if (ex is PublisherSentFailedException)
{
stillRetry = false;
message.Retries = _options.FailedRetryCount; // not retry if PublisherSentFailedException
}
else
stillRetry = UpdateMessageForRetryAsync(message);
if (stillRetry)
{ {
stillRetry = UpdateMessageForRetryAsync(message);
if (stillRetry)
{
_logger.ConsumerExecutionFailedWillRetry(ex);
return Task.CompletedTask;
}
_logger.ConsumerExecutionFailedWillRetry(ex);
return Task.CompletedTask;
} }


AddErrorReasonToContent(message, ex); AddErrorReasonToContent(message, ex);
@@ -166,14 +155,18 @@ namespace DotNetCore.CAP
_logger.MessageHasBeenSent(du.TotalSeconds); _logger.MessageHasBeenSent(du.TotalSeconds);
} }


private void TracingError(Guid operationId, string topic, string values, Exception ex, DateTimeOffset startTime, TimeSpan du)
private void TracingError(Guid operationId, CapPublishedMessage message, OperateResult result, DateTimeOffset startTime, TimeSpan du)
{ {
var ex = new PublisherSentFailedException(result.ToString(), result.Exception);

_logger.MessagePublishException(message.Id, result.ToString(), ex);

var eventData = new BrokerPublishErrorEventData( var eventData = new BrokerPublishErrorEventData(
operationId, operationId,
"", "",
ServersAddress, ServersAddress,
topic,
values,
message.Name,
message.Content,
ex, ex,
startTime, startTime,
du); du);


+ 0
- 1
src/DotNetCore.CAP/Infrastructure/Helper.cs View File

@@ -86,7 +86,6 @@ namespace DotNetCore.CAP.Infrastructure
return !CanConvertFromString(type); return !CanConvertFromString(type);
} }



public static string AddExceptionProperty(string json, Exception exception) public static string AddExceptionProperty(string json, Exception exception)
{ {
var jObject = ToJObject(exception); var jObject = ToJObject(exception);


+ 4
- 0
src/DotNetCore.CAP/Internal/PublisherSentFailedException.cs View File

@@ -7,6 +7,10 @@ namespace DotNetCore.CAP.Internal
{ {
public class PublisherSentFailedException : Exception public class PublisherSentFailedException : Exception
{ {
public PublisherSentFailedException(string message) : base(message)
{
}

public PublisherSentFailedException(string message, Exception ex) : base(message, ex) public PublisherSentFailedException(string message, Exception ex) : base(message, ex)
{ {
} }


+ 11
- 11
src/DotNetCore.CAP/LoggerExtensions.cs View File

@@ -17,10 +17,10 @@ namespace DotNetCore.CAP
private static readonly Action<ILogger, string, string, string, Exception> _modelBinderFormattingException; private static readonly Action<ILogger, string, string, string, Exception> _modelBinderFormattingException;
private static readonly Action<ILogger, Exception> _consumerFailedWillRetry; private static readonly Action<ILogger, Exception> _consumerFailedWillRetry;
private static readonly Action<ILogger, double, Exception> _consumerExecuted; private static readonly Action<ILogger, double, Exception> _consumerExecuted;
private static readonly Action<ILogger, int, Exception> _senderRetrying;
private static readonly Action<ILogger, int, int, Exception> _senderRetrying;
private static readonly Action<ILogger, string, Exception> _exceptionOccuredWhileExecuting; private static readonly Action<ILogger, string, Exception> _exceptionOccuredWhileExecuting;
private static readonly Action<ILogger, double, Exception> _messageHasBeenSent; private static readonly Action<ILogger, double, Exception> _messageHasBeenSent;
private static readonly Action<ILogger, int, Exception> _messagePublishException;
private static readonly Action<ILogger, int, string, Exception> _messagePublishException;


static LoggerExtensions() static LoggerExtensions()
{ {
@@ -60,10 +60,10 @@ namespace DotNetCore.CAP
"When call subscribe method, a parameter format conversion exception occurs. MethodName:'{MethodName}' ParameterName:'{ParameterName}' Content:'{Content}'." "When call subscribe method, a parameter format conversion exception occurs. MethodName:'{MethodName}' ParameterName:'{ParameterName}' Content:'{Content}'."
); );


_senderRetrying = LoggerMessage.Define<int>(
_senderRetrying = LoggerMessage.Define<int, int>(
LogLevel.Debug, LogLevel.Debug,
3, 3,
"Retrying send a message: {Retries}...");
"The {Retries}th retrying send a message failed. message id: {MessageId} ");


_consumerExecuted = LoggerMessage.Define<double>( _consumerExecuted = LoggerMessage.Define<double>(
LogLevel.Debug, LogLevel.Debug,
@@ -78,17 +78,17 @@ namespace DotNetCore.CAP
_exceptionOccuredWhileExecuting = LoggerMessage.Define<string>( _exceptionOccuredWhileExecuting = LoggerMessage.Define<string>(
LogLevel.Error, LogLevel.Error,
6, 6,
"An exception occured while trying to store a message: '{MessageId}'. ");
"An exception occured while trying to store a message. message id: {MessageId}");


_messageHasBeenSent = LoggerMessage.Define<double>( _messageHasBeenSent = LoggerMessage.Define<double>(
LogLevel.Debug, LogLevel.Debug,
4, 4,
"Message published. Took: {Seconds} secs."); "Message published. Took: {Seconds} secs.");


_messagePublishException = LoggerMessage.Define<int>(
_messagePublishException = LoggerMessage.Define<int, string>(
LogLevel.Error, LogLevel.Error,
6, 6,
"An exception occured while publishing a message: '{MessageId}'. ");
"An exception occured while publishing a message, reason:{Reason}. message id:{MessageId}");
} }


public static void ConsumerExecutionFailedWillRetry(this ILogger logger, Exception ex) public static void ConsumerExecutionFailedWillRetry(this ILogger logger, Exception ex)
@@ -96,9 +96,9 @@ namespace DotNetCore.CAP
_consumerFailedWillRetry(logger, ex); _consumerFailedWillRetry(logger, ex);
} }


public static void SenderRetrying(this ILogger logger, int retries)
public static void SenderRetrying(this ILogger logger, int messageId, int retries)
{ {
_senderRetrying(logger, retries, null);
_senderRetrying(logger, messageId, retries, null);
} }


public static void MessageHasBeenSent(this ILogger logger, double seconds) public static void MessageHasBeenSent(this ILogger logger, double seconds)
@@ -106,9 +106,9 @@ namespace DotNetCore.CAP
_messageHasBeenSent(logger, seconds, null); _messageHasBeenSent(logger, seconds, null);
} }


public static void MessagePublishException(this ILogger logger, int messageId, Exception ex)
public static void MessagePublishException(this ILogger logger, int messageId, string reason, Exception ex)
{ {
_messagePublishException(logger, messageId, ex);
_messagePublishException(logger, messageId, reason, ex);
} }


public static void ConsumerExecuted(this ILogger logger, double seconds) public static void ConsumerExecuted(this ILogger logger, double seconds)


+ 75
- 27
src/DotNetCore.CAP/Processor/IProcessor.NeedRetry.cs View File

@@ -65,33 +65,46 @@ namespace DotNetCore.CAP.Processor
continue; continue;
} }


if (!hasException)
{
try
{
_options.FailedCallback?.Invoke(MessageType.Publish, message.Name, message.Content);
}
catch (Exception ex)
{
hasException = true;
_logger.LogWarning("Failed call-back method raised an exception:" + ex.Message);
}
}

using (var transaction = connection.CreateTransaction()) using (var transaction = connection.CreateTransaction())
{ {
try
var result = await _publishExecutor.PublishAsync(message.Name, message.Content);
if (result.Succeeded)
{ {
await _publishExecutor.PublishAsync(message.Name, message.Content);

_stateChanger.ChangeState(message, new SucceededState(), transaction); _stateChanger.ChangeState(message, new SucceededState(), transaction);
_logger.LogInformation("The message was sent successfully during the retry. MessageId:" + message.Id);
} }
catch (Exception e)
else
{ {
message.Content = Helper.AddExceptionProperty(message.Content, e);
message.Content = Helper.AddExceptionProperty(message.Content, result.Exception);
message.Retries++;
if (message.StatusName == StatusName.Scheduled)
{
message.ExpiresAt = GetDueTime(message.Added, message.Retries);
message.StatusName = StatusName.Failed;
}
transaction.UpdateMessage(message); transaction.UpdateMessage(message);
}


if (message.Retries >= _options.FailedRetryCount)
{
_logger.LogError($"The message still sent failed after {_options.FailedRetryCount} retries. We will stop retrying the message. " +
"MessageId:" + message.Id);
if (message.Retries == _options.FailedRetryCount)
{
if (!hasException)
{
try
{
_options.FailedThresholdCallback?.Invoke(MessageType.Publish, message.Name, message.Content);
}
catch (Exception ex)
{
hasException = true;
_logger.LogWarning("Failed call-back method raised an exception:" + ex.Message);
}
}
}
}
}
await transaction.CommitAsync(); await transaction.CommitAsync();
} }


@@ -113,25 +126,60 @@ namespace DotNetCore.CAP.Processor
continue; continue;
} }


if (!hasException)
using (var transaction = connection.CreateTransaction())
{ {
try
var result = await _subscriberExecutor.ExecuteAsync(message);
if (result.Succeeded)
{ {
_options.FailedCallback?.Invoke(MessageType.Subscribe, message.Name, message.Content);
_stateChanger.ChangeState(message, new SucceededState(), transaction);
_logger.LogInformation("The message was execute successfully during the retry. MessageId:" + message.Id);
} }
catch (Exception ex)
else
{ {
hasException = true;
_logger.LogWarning("Failed call-back method raised an exception:" + ex.Message);
message.Content = Helper.AddExceptionProperty(message.Content, result.Exception);
message.Retries++;
if (message.StatusName == StatusName.Scheduled)
{
message.ExpiresAt = GetDueTime(message.Added, message.Retries);
message.StatusName = StatusName.Failed;
}
transaction.UpdateMessage(message);

if (message.Retries >= _options.FailedRetryCount)
{
_logger.LogError($"[Subscriber]The message still executed failed after {_options.FailedRetryCount} retries. " +
"We will stop retrying to execute the message. message id:" + message.Id);

if (message.Retries == _options.FailedRetryCount)
{
if (!hasException)
{
try
{
_options.FailedThresholdCallback?.Invoke(MessageType.Subscribe, message.Name, message.Content);
}
catch (Exception ex)
{
hasException = true;
_logger.LogWarning("Failed call-back method raised an exception:" + ex.Message);
}
}
}
}
} }
await transaction.CommitAsync();
} }


await _subscriberExecutor.ExecuteAsync(message);

context.ThrowIfStopping(); context.ThrowIfStopping();


await context.WaitAsync(_delay); await context.WaitAsync(_delay);
} }
} }

public DateTime GetDueTime(DateTime addedTime, int retries)
{
var retryBehavior = RetryBehavior.DefaultRetry;
return addedTime.AddSeconds(retryBehavior.RetryIn(retries));
}
} }
} }

Loading…
Cancel
Save