diff --git a/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs b/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs index 1993d80..1db1fc9 100644 --- a/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs +++ b/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs @@ -12,7 +12,9 @@ namespace DotNetCore.CAP.Kafka private readonly KafkaOptions _kafkaOptions; private Consumer _consumerClient; - public event EventHandler MessageReceieved; + public event EventHandler OnMessageReceieved; + + public event EventHandler OnError; public IDeserializer StringDeserializer { get; set; } @@ -67,6 +69,7 @@ namespace DotNetCore.CAP.Kafka _consumerClient = new Consumer(config, null, StringDeserializer); _consumerClient.OnMessage += ConsumerClient_OnMessage; + _consumerClient.OnError += ConsumerClient_OnError; } private void ConsumerClient_OnMessage(object sender, Message e) @@ -77,7 +80,12 @@ namespace DotNetCore.CAP.Kafka Name = e.Topic, Content = e.Value }; - MessageReceieved?.Invoke(sender, message); + OnMessageReceieved?.Invoke(sender, message); + } + + private void ConsumerClient_OnError(object sender, Error e) + { + OnError?.Invoke(sender, e.Reason); } #endregion private methods diff --git a/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs b/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs index 4dd2883..483b2f4 100644 --- a/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs +++ b/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs @@ -18,7 +18,8 @@ namespace DotNetCore.CAP.RabbitMQ private IModel _channel; private ulong _deliveryTag; - public event EventHandler MessageReceieved; + public event EventHandler OnMessageReceieved; + public event EventHandler OnError; public RabbitMQConsumerClient(string queueName, RabbitMQOptions options) { @@ -53,6 +54,7 @@ namespace DotNetCore.CAP.RabbitMQ { var consumer = new EventingBasicConsumer(_channel); consumer.Received += OnConsumerReceived; + consumer.Shutdown += OnConsumerShutdown; _channel.BasicConsume(_queueName, false, consumer); while (true) { @@ -90,7 +92,12 @@ namespace DotNetCore.CAP.RabbitMQ Name = e.RoutingKey, Content = Encoding.UTF8.GetString(e.Body) }; - MessageReceieved?.Invoke(sender, message); + OnMessageReceieved?.Invoke(sender, message); + } + + private void OnConsumerShutdown(object sender, ShutdownEventArgs e) + { + OnError?.Invoke(sender, e.Cause?.ToString()); } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/IConsumerClient.cs b/src/DotNetCore.CAP/IConsumerClient.cs index c59c20c..776ee9a 100644 --- a/src/DotNetCore.CAP/IConsumerClient.cs +++ b/src/DotNetCore.CAP/IConsumerClient.cs @@ -1,6 +1,5 @@ using System; using System.Threading; -using DotNetCore.CAP.Infrastructure; namespace DotNetCore.CAP { @@ -17,6 +16,8 @@ namespace DotNetCore.CAP void Commit(); - event EventHandler MessageReceieved; + event EventHandler OnMessageReceieved; + + event EventHandler OnError; } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/IConsumerHandler.Default.cs b/src/DotNetCore.CAP/IConsumerHandler.Default.cs index 6bcf22d..689001a 100644 --- a/src/DotNetCore.CAP/IConsumerHandler.Default.cs +++ b/src/DotNetCore.CAP/IConsumerHandler.Default.cs @@ -95,7 +95,7 @@ namespace DotNetCore.CAP private void RegisterMessageProcessor(IConsumerClient client) { - client.MessageReceieved += (sender, message) => + client.OnMessageReceieved += (sender, message) => { _logger.EnqueuingReceivedMessage(message.Name, message.Content); @@ -106,6 +106,11 @@ namespace DotNetCore.CAP } Pulse(); }; + + client.OnError += (sender, reason) => + { + _logger.LogError(reason); + }; } private CapReceivedMessage StoreMessage(IServiceScope serviceScope, MessageContext messageContext)