From a3807b6ab6c7c58d1de6a12bef5dcea739db8e66 Mon Sep 17 00:00:00 2001 From: Savorboard Date: Sat, 30 Sep 2017 22:51:56 +0800 Subject: [PATCH] when storage a received message raising an eception, we will reject the message to queue. --- src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs | 15 +++++++++++++-- .../RabbitMQConsumerClient.cs | 7 ++++++- src/DotNetCore.CAP/IConsumerClient.cs | 2 ++ src/DotNetCore.CAP/IConsumerHandler.Default.cs | 13 ++++++++++--- 4 files changed, 31 insertions(+), 6 deletions(-) diff --git a/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs b/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs index 55fcd50..ef71cd7 100644 --- a/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs +++ b/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs @@ -45,6 +45,7 @@ namespace DotNetCore.CAP.Kafka cancellationToken.ThrowIfCancellationRequested(); _consumerClient.Poll(timeout); } + // ReSharper disable once FunctionNeverReturns } public void Commit() @@ -52,6 +53,11 @@ namespace DotNetCore.CAP.Kafka _consumerClient.CommitAsync(); } + public void Reject() + { + // Ignore, Kafka will not commit offset when not commit. + } + public void Dispose() { _consumerClient.Dispose(); @@ -65,11 +71,16 @@ namespace DotNetCore.CAP.Kafka var config = _kafkaOptions.AsKafkaConfig(); _consumerClient = new Consumer(config, null, StringDeserializer); - + _consumerClient.OnConsumeError += ConsumerClient_OnConsumeError; _consumerClient.OnMessage += ConsumerClient_OnMessage; _consumerClient.OnError += ConsumerClient_OnError; } + private void ConsumerClient_OnConsumeError(object sender, Message e) + { + OnError?.Invoke(sender, $"Consumer client raised an error. Topic:{e.Topic}, Reason:{e.Error}"); + } + private void ConsumerClient_OnMessage(object sender, Message e) { var message = new MessageContext @@ -84,7 +95,7 @@ namespace DotNetCore.CAP.Kafka private void ConsumerClient_OnError(object sender, Error e) { - OnError?.Invoke(sender, e.Reason); + OnError?.Invoke(sender, e.ToString()); } #endregion private methods diff --git a/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs b/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs index 336d187..49f3e8c 100644 --- a/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs +++ b/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs @@ -57,6 +57,11 @@ namespace DotNetCore.CAP.RabbitMQ _channel.BasicAck(_deliveryTag, false); } + public void Reject() + { + _channel.BasicReject(_deliveryTag, true); + } + public void Dispose() { _channel.Dispose(); @@ -73,7 +78,7 @@ namespace DotNetCore.CAP.RabbitMQ RabbitMQOptions.ExchangeType, true); - var arguments = new Dictionary {{"x-message-ttl", _rabbitMQOptions.QueueMessageExpires}}; + var arguments = new Dictionary { { "x-message-ttl", _rabbitMQOptions.QueueMessageExpires } }; _channel.QueueDeclare(_queueName, true, false, diff --git a/src/DotNetCore.CAP/IConsumerClient.cs b/src/DotNetCore.CAP/IConsumerClient.cs index 2d85c2e..461caa3 100644 --- a/src/DotNetCore.CAP/IConsumerClient.cs +++ b/src/DotNetCore.CAP/IConsumerClient.cs @@ -15,6 +15,8 @@ namespace DotNetCore.CAP void Commit(); + void Reject(); + event EventHandler OnMessageReceived; event EventHandler OnError; diff --git a/src/DotNetCore.CAP/IConsumerHandler.Default.cs b/src/DotNetCore.CAP/IConsumerHandler.Default.cs index 5d6fb66..579ae6b 100644 --- a/src/DotNetCore.CAP/IConsumerHandler.Default.cs +++ b/src/DotNetCore.CAP/IConsumerHandler.Default.cs @@ -91,9 +91,16 @@ namespace DotNetCore.CAP using (var scope = _serviceProvider.CreateScope()) { - StoreMessage(scope, message); - - client.Commit(); + try + { + StoreMessage(scope, message); + client.Commit(); + } + catch (Exception e) + { + _logger.LogError(e, "Raised an exception when storage received message。 Message:{0}", message); + client.Reject(); + } } Pulse(); };