From ddc99e4e9fbba879388f008ab04558724348e61f Mon Sep 17 00:00:00 2001 From: Savorboard Date: Sun, 9 Jul 2017 19:56:44 +0800 Subject: [PATCH] Modify the consumption side, use an ack mechanism. --- src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs | 1 + .../KafkaConsumerClient.cs | 6 ++ .../RabbitMQConsumerClient.cs | 9 +- src/DotNetCore.CAP/IConsumerClient.cs | 2 + .../IConsumerHandler.Default.cs | 101 ++++++++++-------- .../Job/IProcessingServer.Job.cs | 3 +- 6 files changed, 78 insertions(+), 44 deletions(-) diff --git a/src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs b/src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs index 255b1ab..b7836b6 100644 --- a/src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs +++ b/src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs @@ -43,6 +43,7 @@ namespace DotNetCore.CAP.Kafka { MainConfig.Add("bootstrap.servers", Servers); } + MainConfig["enable.auto.commit"] = "false"; return MainConfig.AsEnumerable(); } } diff --git a/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs b/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs index f1542b1..52b34e3 100644 --- a/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs +++ b/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs @@ -46,6 +46,11 @@ namespace DotNetCore.CAP.Kafka } } + public void Commit() + { + _consumerClient.CommitAsync(); + } + public void Dispose() { _consumerClient.Dispose(); @@ -74,6 +79,7 @@ namespace DotNetCore.CAP.Kafka MessageReceieved?.Invoke(sender, message); } + #endregion private methods } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs b/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs index aced27f..adfe91d 100644 --- a/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs +++ b/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs @@ -16,6 +16,7 @@ namespace DotNetCore.CAP.RabbitMQ private IConnectionFactory _connectionFactory; private IConnection _connection; private IModel _channel; + private ulong _deliveryTag; public event EventHandler MessageReceieved; @@ -52,7 +53,7 @@ namespace DotNetCore.CAP.RabbitMQ { var consumer = new EventingBasicConsumer(_channel); consumer.Received += OnConsumerReceived; - _channel.BasicConsume(_queueName, true, consumer); + _channel.BasicConsume(_queueName, false, consumer); while (true) { Task.Delay(timeout); @@ -69,6 +70,11 @@ namespace DotNetCore.CAP.RabbitMQ _channel.QueueBind(_queueName, _exchageName, topic); } + public void Commit() + { + _channel.BasicAck(_deliveryTag, false); + } + public void Dispose() { _channel.Dispose(); @@ -77,6 +83,7 @@ namespace DotNetCore.CAP.RabbitMQ private void OnConsumerReceived(object sender, BasicDeliverEventArgs e) { + _deliveryTag = e.DeliveryTag; var message = new MessageContext { Group = _queueName, diff --git a/src/DotNetCore.CAP/IConsumerClient.cs b/src/DotNetCore.CAP/IConsumerClient.cs index f3ddd61..e3dc29d 100644 --- a/src/DotNetCore.CAP/IConsumerClient.cs +++ b/src/DotNetCore.CAP/IConsumerClient.cs @@ -14,6 +14,8 @@ namespace DotNetCore.CAP void Listening(TimeSpan timeout); + void Commit(); + event EventHandler MessageReceieved; } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/IConsumerHandler.Default.cs b/src/DotNetCore.CAP/IConsumerHandler.Default.cs index a85fc0f..aebf720 100644 --- a/src/DotNetCore.CAP/IConsumerHandler.Default.cs +++ b/src/DotNetCore.CAP/IConsumerHandler.Default.cs @@ -23,7 +23,7 @@ namespace DotNetCore.CAP private readonly CapOptions _options; private readonly CancellationTokenSource _cts; - public event EventHandler MessageReceieved; + private readonly TimeSpan _pollingDelay = TimeSpan.FromSeconds(1); private Task _compositeTask; private bool _disposed; @@ -46,11 +46,6 @@ namespace DotNetCore.CAP _cts = new CancellationTokenSource(); } - protected virtual void OnMessageReceieved(MessageContext message) - { - MessageReceieved?.Invoke(this, message); - } - public void Start() { var groupingMatchs = _selector.GetCandidatesMethodsOfGroupNameGrouped(_serviceProvider); @@ -61,54 +56,20 @@ namespace DotNetCore.CAP { using (var client = _consumerClientFactory.Create(matchGroup.Key)) { - client.MessageReceieved += OnMessageReceieved; + RegisterMessageProcessor(client); foreach (var item in matchGroup.Value) { client.Subscribe(item.Attribute.Name); } - client.Listening(TimeSpan.FromSeconds(1)); + client.Listening(_pollingDelay); } }, _cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Current); } _compositeTask = Task.CompletedTask; } - public virtual void OnMessageReceieved(object sender, MessageContext message) - { - _logger.EnqueuingReceivedMessage(message.KeyName, message.Content); - - using (var scope = _serviceProvider.CreateScope()) - { - var provider = scope.ServiceProvider; - var messageStore = provider.GetRequiredService(); - - var capMessage = new CapReceivedMessage(message) - { - StatusName = StatusName.Enqueued, - }; - messageStore.StoreReceivedMessageAsync(capMessage).Wait(); - try - { - var executeDescriptorGroup = _selector.GetTopicExector(message.KeyName); - if (executeDescriptorGroup.ContainsKey(message.Group)) - { - messageStore.ChangeReceivedMessageStateAsync(capMessage, StatusName.Processing).Wait(); - // If there are multiple consumers in the same group, we will take the first - var executeDescriptor = executeDescriptorGroup[message.Group][0]; - var consumerContext = new ConsumerContext(executeDescriptor, message); - _consumerInvokerFactory.CreateInvoker(consumerContext).InvokeAsync(); - messageStore.ChangeReceivedMessageStateAsync(capMessage, StatusName.Succeeded).Wait(); - } - } - catch (Exception ex) - { - _logger.ConsumerMethodExecutingFailed($"Group:{message.Group}, Topic:{message.KeyName}", ex); - } - } - } - public void Dispose() { if (_disposed) @@ -133,5 +94,61 @@ namespace DotNetCore.CAP } } } + + private void RegisterMessageProcessor(IConsumerClient client) + { + client.MessageReceieved += (sender, message) => + { + _logger.EnqueuingReceivedMessage(message.KeyName, message.Content); + + using (var scope = _serviceProvider.CreateScope()) + { + var receviedMessage = StoreMessage(scope, message); + client.Commit(); + ProcessMessage(scope, receviedMessage); + } + }; + } + + private CapReceivedMessage StoreMessage(IServiceScope serviceScope, MessageContext messageContext) + { + var provider = serviceScope.ServiceProvider; + var messageStore = provider.GetRequiredService(); + var receivedMessage = new CapReceivedMessage(messageContext) + { + StatusName = StatusName.Enqueued, + }; + messageStore.StoreReceivedMessageAsync(receivedMessage).Wait(); + return receivedMessage; + } + + private void ProcessMessage(IServiceScope serviceScope, CapReceivedMessage receivedMessage) + { + var provider = serviceScope.ServiceProvider; + var messageStore = provider.GetRequiredService(); + try + { + var executeDescriptorGroup = _selector.GetTopicExector(receivedMessage.KeyName); + + if (executeDescriptorGroup.ContainsKey(receivedMessage.Group)) + { + messageStore.ChangeReceivedMessageStateAsync(receivedMessage, StatusName.Processing).Wait(); + + // If there are multiple consumers in the same group, we will take the first + var executeDescriptor = executeDescriptorGroup[receivedMessage.Group][0]; + var consumerContext = new ConsumerContext(executeDescriptor, receivedMessage.ToMessageContext()); + + _consumerInvokerFactory.CreateInvoker(consumerContext).InvokeAsync(); + + messageStore.ChangeReceivedMessageStateAsync(receivedMessage, StatusName.Succeeded).Wait(); + } + } + catch (Exception ex) + { + _logger.ConsumerMethodExecutingFailed($"Group:{receivedMessage.Group}, Topic:{receivedMessage.KeyName}", ex); + } + } + + } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/Job/IProcessingServer.Job.cs b/src/DotNetCore.CAP/Job/IProcessingServer.Job.cs index e517f5c..0bb912f 100644 --- a/src/DotNetCore.CAP/Job/IProcessingServer.Job.cs +++ b/src/DotNetCore.CAP/Job/IProcessingServer.Job.cs @@ -42,8 +42,9 @@ namespace DotNetCore.CAP.Job public void Start() { var processorCount = Environment.ProcessorCount; + //processorCount = 1; _processors = GetProcessors(processorCount); - _logger.ServerStarting(processorCount, 1); + _logger.ServerStarting(processorCount, processorCount); _context = new ProcessingContext( _provider,