diff --git a/src/DotNetCore.CAP/IConsumerClient.cs b/src/DotNetCore.CAP/IConsumerClient.cs index f6adb66..4dd3143 100644 --- a/src/DotNetCore.CAP/IConsumerClient.cs +++ b/src/DotNetCore.CAP/IConsumerClient.cs @@ -33,6 +33,6 @@ namespace DotNetCore.CAP event EventHandler OnMessageReceived; - event EventHandler OnError; + event EventHandler OnLog; } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/IConsumerHandler.Default.cs b/src/DotNetCore.CAP/IConsumerHandler.Default.cs index e727702..645a121 100644 --- a/src/DotNetCore.CAP/IConsumerHandler.Default.cs +++ b/src/DotNetCore.CAP/IConsumerHandler.Default.cs @@ -23,6 +23,7 @@ namespace DotNetCore.CAP private readonly IServiceProvider _serviceProvider; private Task _compositeTask; + private bool _disposed; public ConsumerHandler( @@ -44,17 +45,18 @@ namespace DotNetCore.CAP foreach (var matchGroup in groupingMatches) Task.Factory.StartNew(() => - { - using (var client = _consumerClientFactory.Create(matchGroup.Key)) - { - RegisterMessageProcessor(client); + { + using (var client = _consumerClientFactory.Create(matchGroup.Key)) + { + RegisterMessageProcessor(client); - client.Subscribe(matchGroup.Value.Select(x => x.Attribute.Name)); + client.Subscribe(matchGroup.Value.Select(x => x.Attribute.Name)); - client.Listening(_pollingDelay, _cts.Token); - } - }, _cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default); - _compositeTask = Task.CompletedTask; + client.Listening(_pollingDelay, _cts.Token); + } + }, _cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default); + + _compositeTask = Task.CompletedTask; } public void Dispose() @@ -62,13 +64,10 @@ namespace DotNetCore.CAP if (_disposed) return; _disposed = true; - - _logger.ServerShuttingDown(); _cts.Cancel(); - try { - _compositeTask.Wait(TimeSpan.FromSeconds(10)); + _compositeTask.Wait(TimeSpan.FromSeconds(2)); } catch (AggregateException ex) { @@ -105,7 +104,34 @@ namespace DotNetCore.CAP Pulse(); }; - client.OnError += (sender, reason) => { _logger.MessageQueueError(reason); }; + client.OnLog += WriteLog; + } + + private void WriteLog(object sender, LogMessageEventArgs logmsg) + { + switch (logmsg.LogType) + { + case MqLogType.ConsumerCancelled: + _logger.LogWarning("RabbitMQ consumer cancelled. reason: " + logmsg.Reason); + break; + case MqLogType.ConsumerRegistered: + _logger.LogInformation("RabbitMQ consumer registered. " + logmsg.Reason); + break; + case MqLogType.ConsumerUnregistered: + _logger.LogWarning("RabbitMQ consumer unregistered. reason: " + logmsg.Reason); + break; + case MqLogType.ConsumerShutdown: + _logger.LogWarning("RabbitMQ consumer shutdown. reason:" + logmsg.Reason); + break; + case MqLogType.ConsumeError: + _logger.LogError("Kakfa client consume error. reason:" + logmsg.Reason); + break; + case MqLogType.ServerConnError: + _logger.LogCritical("Kafka server connection error. reason:" + logmsg.Reason); + break; + default: + throw new ArgumentOutOfRangeException(); + } } private static void StoreMessage(IServiceScope serviceScope, MessageContext messageContext)