@@ -4,7 +4,6 @@ | |||||
using System; | using System; | ||||
using System.Collections.Generic; | using System.Collections.Generic; | ||||
using System.Linq; | using System.Linq; | ||||
using System.Text; | |||||
using System.Threading; | using System.Threading; | ||||
using System.Threading.Tasks; | using System.Threading.Tasks; | ||||
using DotNetCore.CAP.Messages; | using DotNetCore.CAP.Messages; | ||||
@@ -27,8 +26,6 @@ namespace DotNetCore.CAP.AzureServiceBus | |||||
private SubscriptionClient _consumerClient; | private SubscriptionClient _consumerClient; | ||||
private string _lockToken; | |||||
public AzureServiceBusConsumerClient( | public AzureServiceBusConsumerClient( | ||||
ILogger logger, | ILogger logger, | ||||
string subscriptionName, | string subscriptionName, | ||||
@@ -97,12 +94,12 @@ namespace DotNetCore.CAP.AzureServiceBus | |||||
// ReSharper disable once FunctionNeverReturns | // ReSharper disable once FunctionNeverReturns | ||||
} | } | ||||
public void Commit() | |||||
public void Commit(object sender) | |||||
{ | { | ||||
_consumerClient.CompleteAsync(_lockToken); | |||||
_consumerClient.CompleteAsync((string)sender); | |||||
} | } | ||||
public void Reject() | |||||
public void Reject(object sender) | |||||
{ | { | ||||
// ignore | // ignore | ||||
} | } | ||||
@@ -162,13 +159,11 @@ namespace DotNetCore.CAP.AzureServiceBus | |||||
private Task OnConsumerReceived(Message message, CancellationToken token) | private Task OnConsumerReceived(Message message, CancellationToken token) | ||||
{ | { | ||||
_lockToken = message.SystemProperties.LockToken; | |||||
var header = message.UserProperties.ToDictionary(x => x.Key, y => y.Value.ToString()); | var header = message.UserProperties.ToDictionary(x => x.Key, y => y.Value.ToString()); | ||||
var context = new TransportMessage(header, message.Body); | var context = new TransportMessage(header, message.Body); | ||||
OnMessageReceived?.Invoke(null, context); | |||||
OnMessageReceived?.Invoke(message.SystemProperties.LockToken, context); | |||||
return Task.CompletedTask; | return Task.CompletedTask; | ||||
} | } | ||||
@@ -78,12 +78,12 @@ namespace DotNetCore.CAP.Kafka | |||||
// ReSharper disable once FunctionNeverReturns | // ReSharper disable once FunctionNeverReturns | ||||
} | } | ||||
public void Commit() | |||||
public void Commit(object sender) | |||||
{ | { | ||||
_consumerClient.Commit(); | |||||
_consumerClient.Commit((ConsumeResult<string, byte[]>)sender); | |||||
} | } | ||||
public void Reject() | |||||
public void Reject(object sender) | |||||
{ | { | ||||
_consumerClient.Assign(_consumerClient.Assignment); | _consumerClient.Assign(_consumerClient.Assignment); | ||||
} | } | ||||
@@ -25,7 +25,6 @@ namespace DotNetCore.CAP.RabbitMQ | |||||
private IModel _channel; | private IModel _channel; | ||||
private IConnection _connection; | private IConnection _connection; | ||||
private ulong _deliveryTag; | |||||
public RabbitMQConsumerClient(string queueName, | public RabbitMQConsumerClient(string queueName, | ||||
IConnectionChannelPool connectionChannelPool, | IConnectionChannelPool connectionChannelPool, | ||||
@@ -80,14 +79,14 @@ namespace DotNetCore.CAP.RabbitMQ | |||||
// ReSharper disable once FunctionNeverReturns | // ReSharper disable once FunctionNeverReturns | ||||
} | } | ||||
public void Commit() | |||||
public void Commit(object sender) | |||||
{ | { | ||||
_channel.BasicAck(_deliveryTag, false); | |||||
_channel.BasicAck((ulong)sender, false); | |||||
} | } | ||||
public void Reject() | |||||
public void Reject(object sender) | |||||
{ | { | ||||
_channel.BasicReject(_deliveryTag, true); | |||||
_channel.BasicReject((ulong)sender, true); | |||||
} | } | ||||
public void Dispose() | public void Dispose() | ||||
@@ -162,8 +161,6 @@ namespace DotNetCore.CAP.RabbitMQ | |||||
private void OnConsumerReceived(object sender, BasicDeliverEventArgs e) | private void OnConsumerReceived(object sender, BasicDeliverEventArgs e) | ||||
{ | { | ||||
_deliveryTag = e.DeliveryTag; | |||||
var headers = new Dictionary<string, string>(); | var headers = new Dictionary<string, string>(); | ||||
foreach (var header in e.BasicProperties.Headers) | foreach (var header in e.BasicProperties.Headers) | ||||
{ | { | ||||
@@ -173,7 +170,7 @@ namespace DotNetCore.CAP.RabbitMQ | |||||
var message = new TransportMessage(headers, e.Body); | var message = new TransportMessage(headers, e.Body); | ||||
OnMessageReceived?.Invoke(sender, message); | |||||
OnMessageReceived?.Invoke(e.DeliveryTag, message); | |||||
} | } | ||||
private void OnConsumerShutdown(object sender, ShutdownEventArgs e) | private void OnConsumerShutdown(object sender, ShutdownEventArgs e) | ||||
@@ -193,7 +193,7 @@ namespace DotNetCore.CAP.Internal | |||||
_storage.StoreReceivedExceptionMessage(name, group, content); | _storage.StoreReceivedExceptionMessage(name, group, content); | ||||
client.Commit(); | |||||
client.Commit(sender); | |||||
TracingAfter(tracingTimestamp, transportMessage, _serverAddress); | TracingAfter(tracingTimestamp, transportMessage, _serverAddress); | ||||
} | } | ||||
@@ -202,7 +202,7 @@ namespace DotNetCore.CAP.Internal | |||||
var mediumMessage = _storage.StoreReceivedMessage(name, group, message); | var mediumMessage = _storage.StoreReceivedMessage(name, group, message); | ||||
mediumMessage.Origin = message; | mediumMessage.Origin = message; | ||||
client.Commit(); | |||||
client.Commit(sender); | |||||
TracingAfter(tracingTimestamp, transportMessage, _serverAddress); | TracingAfter(tracingTimestamp, transportMessage, _serverAddress); | ||||
@@ -213,7 +213,7 @@ namespace DotNetCore.CAP.Internal | |||||
{ | { | ||||
_logger.LogError(e, "An exception occurred when process received message. Message:'{0}'.", transportMessage); | _logger.LogError(e, "An exception occurred when process received message. Message:'{0}'.", transportMessage); | ||||
client.Reject(); | |||||
client.Reject(sender); | |||||
TracingError(tracingTimestamp, transportMessage, client.ServersAddress, e); | TracingError(tracingTimestamp, transportMessage, client.ServersAddress, e); | ||||
} | } | ||||
@@ -5,6 +5,7 @@ using System; | |||||
using System.Collections.Generic; | using System.Collections.Generic; | ||||
using System.Threading; | using System.Threading; | ||||
using DotNetCore.CAP.Messages; | using DotNetCore.CAP.Messages; | ||||
using JetBrains.Annotations; | |||||
namespace DotNetCore.CAP.Transport | namespace DotNetCore.CAP.Transport | ||||
{ | { | ||||
@@ -30,12 +31,12 @@ namespace DotNetCore.CAP.Transport | |||||
/// <summary> | /// <summary> | ||||
/// Manual submit message offset when the message consumption is complete | /// Manual submit message offset when the message consumption is complete | ||||
/// </summary> | /// </summary> | ||||
void Commit(); | |||||
void Commit([NotNull] object sender); | |||||
/// <summary> | /// <summary> | ||||
/// Reject message and resumption | /// Reject message and resumption | ||||
/// </summary> | /// </summary> | ||||
void Reject(); | |||||
void Reject([CanBeNull] object sender); | |||||
event EventHandler<TransportMessage> OnMessageReceived; | event EventHandler<TransportMessage> OnMessageReceived; | ||||
@@ -49,12 +49,12 @@ namespace DotNetCore.CAP.Test.FakeInMemoryQueue | |||||
} | } | ||||
} | } | ||||
public void Commit() | |||||
public void Commit(object sender) | |||||
{ | { | ||||
// ignore | // ignore | ||||
} | } | ||||
public void Reject() | |||||
public void Reject(object sender) | |||||
{ | { | ||||
// ignore | // ignore | ||||
} | } | ||||