@@ -10,7 +10,7 @@ namespace Sample.Kafka.InMemory | |||
services.AddCap(x => | |||
{ | |||
x.UseInMemoryStorage(); | |||
x.UseKafka("192.168.2.120:9093"); | |||
x.UseKafka("localhost:9092"); | |||
x.UseDashboard(); | |||
}); | |||
@@ -1,7 +1,7 @@ | |||
using System; | |||
using DotNetCore.CAP.Messages; | |||
using DotNetCore.CAP.Messages; | |||
using Microsoft.AspNetCore.Builder; | |||
using Microsoft.Extensions.DependencyInjection; | |||
using Microsoft.Extensions.Logging; | |||
namespace Sample.RabbitMQ.MySql | |||
{ | |||
@@ -17,10 +17,11 @@ namespace Sample.RabbitMQ.MySql | |||
x.UseRabbitMQ("localhost"); | |||
x.UseDashboard(); | |||
x.FailedRetryCount = 5; | |||
x.FailedThresholdCallback = (type, msg) => | |||
x.FailedThresholdCallback = failed => | |||
{ | |||
Console.WriteLine( | |||
$@"A message of type {type} failed after executing {x.FailedRetryCount} several times, requiring manual troubleshooting. Message name: {msg.GetName()}"); | |||
var logger = failed.ServiceProvider.GetService<ILogger<Startup>>(); | |||
logger.LogError($@"A message of type {failed.MessageType} failed after executing {x.FailedRetryCount} several times, | |||
requiring manual troubleshooting. Message name: {failed.Message.GetName()}"); | |||
}; | |||
}); | |||
@@ -2,6 +2,7 @@ | |||
using DotNetCore.CAP.Messages; | |||
using Microsoft.AspNetCore.Builder; | |||
using Microsoft.Extensions.DependencyInjection; | |||
using Microsoft.Extensions.Logging; | |||
namespace Sample.RabbitMQ.SqlServer | |||
{ | |||
@@ -17,10 +18,11 @@ namespace Sample.RabbitMQ.SqlServer | |||
x.UseRabbitMQ("192.168.2.120"); | |||
x.UseDashboard(); | |||
x.FailedRetryCount = 5; | |||
x.FailedThresholdCallback = (type, msg) => | |||
x.FailedThresholdCallback = failed => | |||
{ | |||
Console.WriteLine( | |||
$@"A message of type {type} failed after executing {x.FailedRetryCount} several times, requiring manual troubleshooting. Message name: {msg.GetName()}"); | |||
var logger = failed.ServiceProvider.GetService<ILogger<Startup>>(); | |||
logger.LogError($@"A message of type {failed.MessageType} failed after executing {x.FailedRetryCount} several times, | |||
requiring manual troubleshooting. Message name: {failed.Message.GetName()}"); | |||
}; | |||
}); | |||
@@ -32,9 +32,10 @@ namespace DotNetCore.CAP.RabbitMQ | |||
public Task<OperateResult> SendAsync(TransportMessage message) | |||
{ | |||
var channel = _connectionChannelPool.Rent(); | |||
IModel channel = null; | |||
try | |||
{ | |||
channel = _connectionChannelPool.Rent(); | |||
var props = new BasicProperties | |||
{ | |||
DeliveryMode = 2, | |||
@@ -62,12 +63,15 @@ namespace DotNetCore.CAP.RabbitMQ | |||
} | |||
finally | |||
{ | |||
var returned = _connectionChannelPool.Return(channel); | |||
if (!returned) | |||
if (channel != null) | |||
{ | |||
channel.Dispose(); | |||
var returned = _connectionChannelPool.Return(channel); | |||
if (!returned) | |||
{ | |||
channel.Dispose(); | |||
} | |||
} | |||
} | |||
} | |||
} | |||
} | |||
} |
@@ -53,7 +53,7 @@ namespace DotNetCore.CAP | |||
/// <summary> | |||
/// We’ll invoke this call-back with message type,name,content when retry failed (send or executed) messages equals <see cref="FailedRetryCount"/> times. | |||
/// </summary> | |||
public Action<MessageType, Message> FailedThresholdCallback { get; set; } | |||
public Action<FailedInfo> FailedThresholdCallback { get; set; } | |||
/// <summary> | |||
/// The number of message retries, the retry will stop when the threshold is reached. | |||
@@ -12,6 +12,7 @@ using DotNetCore.CAP.Messages; | |||
using DotNetCore.CAP.Persistence; | |||
using DotNetCore.CAP.Serialization; | |||
using DotNetCore.CAP.Transport; | |||
using Microsoft.Extensions.DependencyInjection; | |||
using Microsoft.Extensions.Logging; | |||
using Microsoft.Extensions.Options; | |||
@@ -19,14 +20,16 @@ namespace DotNetCore.CAP.Internal | |||
{ | |||
internal class ConsumerRegister : IConsumerRegister | |||
{ | |||
private readonly ILogger _logger; | |||
private readonly IServiceProvider _serviceProvider; | |||
private readonly IConsumerClientFactory _consumerClientFactory; | |||
private readonly IDispatcher _dispatcher; | |||
private readonly ISerializer _serializer; | |||
private readonly IDataStorage _storage; | |||
private readonly ILogger _logger; | |||
private readonly MethodMatcherCache _selector; | |||
private readonly TimeSpan _pollingDelay = TimeSpan.FromSeconds(1); | |||
private readonly CapOptions _options; | |||
private readonly MethodMatcherCache _selector; | |||
private CancellationTokenSource _cts; | |||
private BrokerAddress _serverAddress; | |||
@@ -39,21 +42,17 @@ namespace DotNetCore.CAP.Internal | |||
private static readonly DiagnosticListener s_diagnosticListener = | |||
new DiagnosticListener(CapDiagnosticListenerNames.DiagnosticListenerName); | |||
public ConsumerRegister(ILogger<ConsumerRegister> logger, | |||
IOptions<CapOptions> options, | |||
MethodMatcherCache selector, | |||
IConsumerClientFactory consumerClientFactory, | |||
IDispatcher dispatcher, | |||
ISerializer serializer, | |||
IDataStorage storage) | |||
public ConsumerRegister(ILogger<ConsumerRegister> logger, IServiceProvider serviceProvider) | |||
{ | |||
_options = options.Value; | |||
_selector = selector; | |||
_logger = logger; | |||
_consumerClientFactory = consumerClientFactory; | |||
_dispatcher = dispatcher; | |||
_serializer = serializer; | |||
_storage = storage; | |||
_serviceProvider = serviceProvider; | |||
_options = serviceProvider.GetService<IOptions<CapOptions>>().Value; | |||
_selector = serviceProvider.GetService<MethodMatcherCache>(); | |||
_consumerClientFactory = serviceProvider.GetService<IConsumerClientFactory>(); | |||
_dispatcher = serviceProvider.GetService<IDispatcher>(); | |||
_serializer = serviceProvider.GetService<ISerializer>(); | |||
_storage = serviceProvider.GetService<IDataStorage>(); | |||
_cts = new CancellationTokenSource(); | |||
} | |||
@@ -152,7 +151,7 @@ namespace DotNetCore.CAP.Internal | |||
client.OnMessageReceived += async (sender, transportMessage) => | |||
{ | |||
_logger.MessageReceived(transportMessage.GetId(), transportMessage.GetName()); | |||
long? tracingTimestamp = null; | |||
try | |||
{ | |||
@@ -194,6 +193,22 @@ namespace DotNetCore.CAP.Internal | |||
client.Commit(sender); | |||
try | |||
{ | |||
_options.FailedThresholdCallback?.Invoke(new FailedInfo | |||
{ | |||
ServiceProvider = _serviceProvider, | |||
MessageType = MessageType.Subscribe, | |||
Message = message | |||
}); | |||
_logger.ConsumerExecutedAfterThreshold(message.GetId(), _options.FailedRetryCount); | |||
} | |||
catch (Exception e) | |||
{ | |||
_logger.ExecutedThresholdCallbackFailed(e); | |||
} | |||
TracingAfter(tracingTimestamp, transportMessage, _serverAddress); | |||
} | |||
else | |||
@@ -9,6 +9,7 @@ using DotNetCore.CAP.Messages; | |||
using DotNetCore.CAP.Persistence; | |||
using DotNetCore.CAP.Serialization; | |||
using DotNetCore.CAP.Transport; | |||
using Microsoft.Extensions.DependencyInjection; | |||
using Microsoft.Extensions.Logging; | |||
using Microsoft.Extensions.Options; | |||
@@ -16,10 +17,12 @@ namespace DotNetCore.CAP.Internal | |||
{ | |||
internal class MessageSender : IMessageSender | |||
{ | |||
private readonly ILogger _logger; | |||
private readonly IServiceProvider _serviceProvider; | |||
private readonly IDataStorage _dataStorage; | |||
private readonly ISerializer _serializer; | |||
private readonly ITransport _transport; | |||
private readonly ILogger _logger; | |||
private readonly IOptions<CapOptions> _options; | |||
// ReSharper disable once InconsistentNaming | |||
@@ -28,16 +31,15 @@ namespace DotNetCore.CAP.Internal | |||
public MessageSender( | |||
ILogger<MessageSender> logger, | |||
IOptions<CapOptions> options, | |||
IDataStorage dataStorage, | |||
ISerializer serializer, | |||
ITransport transport) | |||
IServiceProvider serviceProvider) | |||
{ | |||
_options = options; | |||
_dataStorage = dataStorage; | |||
_serializer = serializer; | |||
_transport = transport; | |||
_logger = logger; | |||
_serviceProvider = serviceProvider; | |||
_options = serviceProvider.GetService<IOptions<CapOptions>>(); | |||
_dataStorage = serviceProvider.GetService<IDataStorage>(); | |||
_serializer = serviceProvider.GetService<ISerializer>(); | |||
_transport = serviceProvider.GetService<ITransport>(); | |||
} | |||
public async Task<OperateResult> SendAsync(MediumMessage message) | |||
@@ -111,7 +113,12 @@ namespace DotNetCore.CAP.Internal | |||
{ | |||
try | |||
{ | |||
_options.Value.FailedThresholdCallback?.Invoke(MessageType.Publish, message.Origin); | |||
_options.Value.FailedThresholdCallback?.Invoke(new FailedInfo | |||
{ | |||
ServiceProvider = _serviceProvider, | |||
MessageType = MessageType.Publish, | |||
Message = message.Origin | |||
}); | |||
_logger.SenderAfterThreshold(message.DbId, _options.Value.FailedRetryCount); | |||
} | |||
@@ -145,7 +145,12 @@ namespace DotNetCore.CAP.Internal | |||
{ | |||
try | |||
{ | |||
_options.FailedThresholdCallback?.Invoke(MessageType.Subscribe, message.Origin); | |||
_options.FailedThresholdCallback?.Invoke(new FailedInfo | |||
{ | |||
ServiceProvider = _provider, | |||
MessageType = MessageType.Subscribe, | |||
Message = message.Origin | |||
}); | |||
_logger.ConsumerExecutedAfterThreshold(message.DbId, _options.FailedRetryCount); | |||
} | |||
@@ -0,0 +1,13 @@ | |||
using System; | |||
namespace DotNetCore.CAP.Messages | |||
{ | |||
public class FailedInfo | |||
{ | |||
public IServiceProvider ServiceProvider { get; set; } | |||
public MessageType MessageType { get; set; } | |||
public Message Message { get; set; } | |||
} | |||
} |