@@ -37,7 +37,7 @@ namespace Sample.Kafka.MySql.Controllers | |||
[CapSubscribe("xxx.xxx.test2")] | |||
public void Test2(int value) | |||
{ | |||
Console.WriteLine(value); | |||
Console.WriteLine("Subscriber output message: " + value); | |||
} | |||
} | |||
} |
@@ -1,4 +1,5 @@ | |||
using Microsoft.AspNetCore.Builder; | |||
using System; | |||
using Microsoft.AspNetCore.Builder; | |||
using Microsoft.Extensions.DependencyInjection; | |||
namespace Sample.Kafka.MySql | |||
@@ -7,7 +7,7 @@ | |||
internalLogFile="logs/internal-nlog.txt"> | |||
<variable name="myLogLayout" | |||
value="---------------------------------------------------------------------------${newline}日期:${longdate} 级别:${uppercase:${level}} 用户:${aspnet-user-identity}(${aspnet-request-ip}) 记录器:${logger} ${newline}URL:${aspnet-request-method} ${aspnet-request-url:IncludePort=true:IncludeQueryString=true} ${newline}Action:${aspnet-mvc-action} ${newline}Message:${message} ${newline}${onexception:Exception:${exception:format=toString}}" /> | |||
value="---------------------------------------------------------------------------${newline}Date:${longdate} Level:${uppercase:${level}} User:${aspnet-user-identity}(${aspnet-request-ip}) Logger:${logger} URL:${aspnet-request-method} ${aspnet-request-url:IncludePort=true:IncludeQueryString=true} Action:${aspnet-mvc-action} ${newline}Message:${message} ${newline}${onexception:Exception:${exception:format=toString}}" /> | |||
<extensions> | |||
<add assembly="NLog.Web.AspNetCore" /> | |||
</extensions> | |||
@@ -1,8 +1,4 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Linq; | |||
using System.Threading.Tasks; | |||
using Microsoft.EntityFrameworkCore; | |||
using Microsoft.EntityFrameworkCore; | |||
namespace Sample.RabbitMQ.MySql | |||
{ | |||
@@ -10,8 +6,7 @@ namespace Sample.RabbitMQ.MySql | |||
{ | |||
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) | |||
{ | |||
optionsBuilder.UseMySql("Server=localhost;Database=Sample.RabbitMQ.MySql;UserId=root;Password=123123;Allow User Variables=True"); | |||
//optionsBuilder.UseMySql("Server=192.168.2.206;Database=Sample.RabbitMQ.MySql;UserId=root;Password=123123;"); | |||
optionsBuilder.UseMySql("Server=192.168.10.110;Database=testcap;UserId=root;Password=123123;"); | |||
} | |||
} | |||
} |
@@ -21,7 +21,7 @@ namespace Sample.RabbitMQ.MySql.Controllers | |||
public IActionResult PublishMessage() | |||
{ | |||
_capBus.Publish("sample.rabbitmq.mysql", DateTime.Now); | |||
return Ok(); | |||
} | |||
@@ -50,7 +50,7 @@ namespace Sample.RabbitMQ.MySql.Controllers | |||
[CapSubscribe("sample.rabbitmq.mysql")] | |||
public void ReceiveMessage(DateTime time) | |||
{ | |||
Console.WriteLine("[sample.rabbitmq.mysql] message received: "+ DateTime.Now.ToString() +" , sent time: " + time.ToString()); | |||
Console.WriteLine("[sample.rabbitmq.mysql] message received: " + DateTime.Now + ",sent time: " + time); | |||
} | |||
} | |||
} |
@@ -1,12 +1,6 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.IO; | |||
using System.Linq; | |||
using System.Threading.Tasks; | |||
using Microsoft.AspNetCore; | |||
using Microsoft.AspNetCore.Builder; | |||
using Microsoft.AspNetCore; | |||
using Microsoft.AspNetCore.Hosting; | |||
using Microsoft.Extensions.Configuration; | |||
using NLog.Web; | |||
namespace Sample.RabbitMQ.MySql | |||
{ | |||
@@ -20,6 +14,11 @@ namespace Sample.RabbitMQ.MySql | |||
public static IWebHost BuildWebHost(string[] args) => | |||
WebHost.CreateDefaultBuilder(args) | |||
.UseStartup<Startup>() | |||
.ConfigureLogging((hostingContext, builder) => | |||
{ | |||
hostingContext.HostingEnvironment.ConfigureNLog("nlog.config"); | |||
}) | |||
.UseNLog() | |||
.Build(); | |||
} | |||
} |
@@ -12,6 +12,7 @@ | |||
<ItemGroup> | |||
<PackageReference Include="Microsoft.AspNetCore.All" Version="2.0.6" /> | |||
<PackageReference Include="Pomelo.EntityFrameworkCore.MySql" Version="2.0.1" /> | |||
<PackageReference Include="NLog.Web.AspNetCore" Version="4.5.2" /> | |||
</ItemGroup> | |||
<ItemGroup> | |||
<DotNetCliToolReference Include="Microsoft.EntityFrameworkCore.Tools.DotNet" Version="2.0.0" /> | |||
@@ -21,5 +22,10 @@ | |||
<ProjectReference Include="..\..\src\DotNetCore.CAP.RabbitMQ\DotNetCore.CAP.RabbitMQ.csproj" /> | |||
<ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" /> | |||
</ItemGroup> | |||
<ItemGroup> | |||
<Content Update="nlog.config"> | |||
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory> | |||
</Content> | |||
</ItemGroup> | |||
</Project> |
@@ -0,0 +1,26 @@ | |||
<?xml version="1.0" encoding="utf-8"?> | |||
<nlog xmlns="http://www.nlog-project.org/schemas/NLog.xsd" | |||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | |||
autoReload="true" | |||
internalLogLevel="Warn" | |||
internalLogFile="logs/internal-nlog.txt"> | |||
<variable name="myLogLayout" | |||
value="---------------------------------------------------------------------------${newline}Date:${longdate} Level:${uppercase:${level}} User:${aspnet-user-identity}(${aspnet-request-ip}) Logger:${logger} URL:${aspnet-request-method} ${aspnet-request-url:IncludePort=true:IncludeQueryString=true} Action:${aspnet-mvc-action} ${newline}Message:${message} ${newline}${onexception:Exception:${exception:format=toString}}" /> | |||
<extensions> | |||
<add assembly="NLog.Web.AspNetCore" /> | |||
</extensions> | |||
<!-- define various log targets --> | |||
<targets> | |||
<!-- write logs to file --> | |||
<target name="allfile" xsi:type="File" fileName="logs/cap-all-${shortdate}.log" | |||
layout="${myLogLayout}" /> | |||
</targets> | |||
<rules> | |||
<!--All logs, including from Microsoft--> | |||
<logger name="*" minlevel="Debug" writeTo="allfile" /> | |||
</rules> | |||
</nlog> |
@@ -58,9 +58,9 @@ namespace DotNetCore.CAP | |||
public int FailedRetryInterval { get; set; } | |||
/// <summary> | |||
/// We’ll invoke this call-back with message type,name,content when requeue failed message. | |||
/// We’ll invoke this call-back with message type,name,content when retry failed (send or executed) messages equals <see cref="FailedRetryCount"/> times. | |||
/// </summary> | |||
public Action<MessageType, string, string> FailedCallback { get; set; } | |||
public Action<MessageType, string, string> FailedThresholdCallback { get; set; } | |||
/// <summary> | |||
/// The number of message retries, the retry will stop when the threshold is reached. | |||
@@ -178,6 +178,8 @@ namespace DotNetCore.CAP | |||
private (Guid, string) TracingBefore(string topic, string values) | |||
{ | |||
_logger.LogDebug("CAP received topic message:" + topic); | |||
Guid operationId = Guid.NewGuid(); | |||
var eventData = new BrokerConsumeEventData( | |||
@@ -65,19 +65,6 @@ namespace DotNetCore.CAP.Processor | |||
continue; | |||
} | |||
if (!hasException) | |||
{ | |||
try | |||
{ | |||
_options.FailedCallback?.Invoke(MessageType.Publish, message.Name, message.Content); | |||
} | |||
catch (Exception ex) | |||
{ | |||
hasException = true; | |||
_logger.LogWarning("Failed call-back method raised an exception:" + ex.Message); | |||
} | |||
} | |||
using (var transaction = connection.CreateTransaction()) | |||
{ | |||
var result = await _publishExecutor.PublishAsync(message.Name, message.Content); | |||
@@ -101,6 +88,21 @@ namespace DotNetCore.CAP.Processor | |||
{ | |||
_logger.LogError($"The message still sent failed after {_options.FailedRetryCount} retries. We will stop retrying the message. " + | |||
"MessageId:" + message.Id); | |||
if (message.Retries == _options.FailedRetryCount) | |||
{ | |||
if (!hasException) | |||
{ | |||
try | |||
{ | |||
_options.FailedThresholdCallback?.Invoke(MessageType.Publish, message.Name, message.Content); | |||
} | |||
catch (Exception ex) | |||
{ | |||
hasException = true; | |||
_logger.LogWarning("Failed call-back method raised an exception:" + ex.Message); | |||
} | |||
} | |||
} | |||
} | |||
} | |||
await transaction.CommitAsync(); | |||
@@ -124,19 +126,6 @@ namespace DotNetCore.CAP.Processor | |||
continue; | |||
} | |||
if (!hasException) | |||
{ | |||
try | |||
{ | |||
_options.FailedCallback?.Invoke(MessageType.Subscribe, message.Name, message.Content); | |||
} | |||
catch (Exception ex) | |||
{ | |||
hasException = true; | |||
_logger.LogWarning("Failed call-back method raised an exception:" + ex.Message); | |||
} | |||
} | |||
using (var transaction = connection.CreateTransaction()) | |||
{ | |||
var result = await _subscriberExecutor.ExecuteAsync(message); | |||
@@ -160,6 +149,22 @@ namespace DotNetCore.CAP.Processor | |||
{ | |||
_logger.LogError($"[Subscriber]The message still executed failed after {_options.FailedRetryCount} retries. " + | |||
"We will stop retrying to execute the message. message id:" + message.Id); | |||
if (message.Retries == _options.FailedRetryCount) | |||
{ | |||
if (!hasException) | |||
{ | |||
try | |||
{ | |||
_options.FailedThresholdCallback?.Invoke(MessageType.Subscribe, message.Name, message.Content); | |||
} | |||
catch (Exception ex) | |||
{ | |||
hasException = true; | |||
_logger.LogWarning("Failed call-back method raised an exception:" + ex.Message); | |||
} | |||
} | |||
} | |||
} | |||
} | |||
await transaction.CommitAsync(); | |||