diff --git a/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs b/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs index ceb7b13..bbbdeee 100644 --- a/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs +++ b/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs @@ -4,7 +4,6 @@ using System; using System.Collections.Generic; using System.Linq; -using System.Text; using System.Threading; using System.Threading.Tasks; using DotNetCore.CAP.Messages; @@ -27,8 +26,6 @@ namespace DotNetCore.CAP.AzureServiceBus private SubscriptionClient _consumerClient; - private string _lockToken; - public AzureServiceBusConsumerClient( ILogger logger, string subscriptionName, @@ -97,12 +94,12 @@ namespace DotNetCore.CAP.AzureServiceBus // ReSharper disable once FunctionNeverReturns } - public void Commit() + public void Commit(object sender) { - _consumerClient.CompleteAsync(_lockToken); + _consumerClient.CompleteAsync((string)sender); } - public void Reject() + public void Reject(object sender) { // ignore } @@ -162,13 +159,11 @@ namespace DotNetCore.CAP.AzureServiceBus private Task OnConsumerReceived(Message message, CancellationToken token) { - _lockToken = message.SystemProperties.LockToken; - var header = message.UserProperties.ToDictionary(x => x.Key, y => y.Value.ToString()); var context = new TransportMessage(header, message.Body); - OnMessageReceived?.Invoke(null, context); + OnMessageReceived?.Invoke(message.SystemProperties.LockToken, context); return Task.CompletedTask; } diff --git a/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs b/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs index afad455..294792a 100644 --- a/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs +++ b/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs @@ -78,12 +78,12 @@ namespace DotNetCore.CAP.Kafka // ReSharper disable once FunctionNeverReturns } - public void Commit() + public void Commit(object sender) { - _consumerClient.Commit(); + _consumerClient.Commit((ConsumeResult)sender); } - public void Reject() + public void Reject(object sender) { _consumerClient.Assign(_consumerClient.Assignment); } diff --git a/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs b/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs index 2e61906..3795f05 100644 --- a/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs +++ b/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs @@ -25,7 +25,6 @@ namespace DotNetCore.CAP.RabbitMQ private IModel _channel; private IConnection _connection; - private ulong _deliveryTag; public RabbitMQConsumerClient(string queueName, IConnectionChannelPool connectionChannelPool, @@ -80,14 +79,14 @@ namespace DotNetCore.CAP.RabbitMQ // ReSharper disable once FunctionNeverReturns } - public void Commit() + public void Commit(object sender) { - _channel.BasicAck(_deliveryTag, false); + _channel.BasicAck((ulong)sender, false); } - public void Reject() + public void Reject(object sender) { - _channel.BasicReject(_deliveryTag, true); + _channel.BasicReject((ulong)sender, true); } public void Dispose() @@ -162,8 +161,6 @@ namespace DotNetCore.CAP.RabbitMQ private void OnConsumerReceived(object sender, BasicDeliverEventArgs e) { - _deliveryTag = e.DeliveryTag; - var headers = new Dictionary(); foreach (var header in e.BasicProperties.Headers) { @@ -173,7 +170,7 @@ namespace DotNetCore.CAP.RabbitMQ var message = new TransportMessage(headers, e.Body); - OnMessageReceived?.Invoke(sender, message); + OnMessageReceived?.Invoke(e.DeliveryTag, message); } private void OnConsumerShutdown(object sender, ShutdownEventArgs e) diff --git a/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs b/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs index a146f48..9ab5f0e 100644 --- a/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs +++ b/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs @@ -193,7 +193,7 @@ namespace DotNetCore.CAP.Internal _storage.StoreReceivedExceptionMessage(name, group, content); - client.Commit(); + client.Commit(sender); TracingAfter(tracingTimestamp, transportMessage, _serverAddress); } @@ -202,7 +202,7 @@ namespace DotNetCore.CAP.Internal var mediumMessage = _storage.StoreReceivedMessage(name, group, message); mediumMessage.Origin = message; - client.Commit(); + client.Commit(sender); TracingAfter(tracingTimestamp, transportMessage, _serverAddress); @@ -213,7 +213,7 @@ namespace DotNetCore.CAP.Internal { _logger.LogError(e, "An exception occurred when process received message. Message:'{0}'.", transportMessage); - client.Reject(); + client.Reject(sender); TracingError(tracingTimestamp, transportMessage, client.ServersAddress, e); } diff --git a/src/DotNetCore.CAP/Transport/IConsumerClient.cs b/src/DotNetCore.CAP/Transport/IConsumerClient.cs index e74b247..7139086 100644 --- a/src/DotNetCore.CAP/Transport/IConsumerClient.cs +++ b/src/DotNetCore.CAP/Transport/IConsumerClient.cs @@ -5,6 +5,7 @@ using System; using System.Collections.Generic; using System.Threading; using DotNetCore.CAP.Messages; +using JetBrains.Annotations; namespace DotNetCore.CAP.Transport { @@ -30,12 +31,12 @@ namespace DotNetCore.CAP.Transport /// /// Manual submit message offset when the message consumption is complete /// - void Commit(); + void Commit([NotNull] object sender); /// /// Reject message and resumption /// - void Reject(); + void Reject([CanBeNull] object sender); event EventHandler OnMessageReceived; diff --git a/test/DotNetCore.CAP.Test/FakeInMemoryQueue/InMemoryConsumerClient.cs b/test/DotNetCore.CAP.Test/FakeInMemoryQueue/InMemoryConsumerClient.cs index 6879916..c981cbe 100644 --- a/test/DotNetCore.CAP.Test/FakeInMemoryQueue/InMemoryConsumerClient.cs +++ b/test/DotNetCore.CAP.Test/FakeInMemoryQueue/InMemoryConsumerClient.cs @@ -49,12 +49,12 @@ namespace DotNetCore.CAP.Test.FakeInMemoryQueue } } - public void Commit() + public void Commit(object sender) { // ignore } - public void Reject() + public void Reject(object sender) { // ignore }