diff --git a/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs b/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs index ad79353..a3c5503 100644 --- a/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs +++ b/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs @@ -1,5 +1,6 @@ using System; using System.Text; +using System.Threading; using Confluent.Kafka; using Confluent.Kafka.Serialization; using DotNetCore.CAP.Infrastructure; @@ -38,10 +39,11 @@ namespace DotNetCore.CAP.Kafka _consumerClient.Subscribe(topicName); } - public void Listening(TimeSpan timeout) + public void Listening(TimeSpan timeout, CancellationToken cancellationToken) { while (true) { + cancellationToken.ThrowIfCancellationRequested(); _consumerClient.Poll(timeout); } } diff --git a/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs b/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs index 35bbcc0..9ca8a86 100644 --- a/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs +++ b/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs @@ -1,5 +1,6 @@ using System; using System.Text; +using System.Threading; using System.Threading.Tasks; using DotNetCore.CAP.Infrastructure; using RabbitMQ.Client; @@ -49,14 +50,14 @@ namespace DotNetCore.CAP.RabbitMQ _channel.QueueDeclare(_queueName, exclusive: false); } - public void Listening(TimeSpan timeout) + public void Listening(TimeSpan timeout, CancellationToken cancellationToken) { var consumer = new EventingBasicConsumer(_channel); consumer.Received += OnConsumerReceived; _channel.BasicConsume(_queueName, false, consumer); while (true) { - Task.Delay(timeout).Wait(); + Task.Delay(timeout, cancellationToken).Wait(); } } diff --git a/src/DotNetCore.CAP/IBootstrapper.Default.cs b/src/DotNetCore.CAP/IBootstrapper.Default.cs index 58bfe58..09b9291 100644 --- a/src/DotNetCore.CAP/IBootstrapper.Default.cs +++ b/src/DotNetCore.CAP/IBootstrapper.Default.cs @@ -68,8 +68,6 @@ namespace DotNetCore.CAP if (_cts.IsCancellationRequested) return; - if (_cts.IsCancellationRequested) return; - await BootstrapCoreAsync(); if (_cts.IsCancellationRequested) return; diff --git a/src/DotNetCore.CAP/IConsumerClient.cs b/src/DotNetCore.CAP/IConsumerClient.cs index e3dc29d..c59c20c 100644 --- a/src/DotNetCore.CAP/IConsumerClient.cs +++ b/src/DotNetCore.CAP/IConsumerClient.cs @@ -1,4 +1,5 @@ using System; +using System.Threading; using DotNetCore.CAP.Infrastructure; namespace DotNetCore.CAP @@ -12,7 +13,7 @@ namespace DotNetCore.CAP void Subscribe(string topic, int partition); - void Listening(TimeSpan timeout); + void Listening(TimeSpan timeout, CancellationToken cancellationToken); void Commit(); diff --git a/src/DotNetCore.CAP/IConsumerHandler.Default.cs b/src/DotNetCore.CAP/IConsumerHandler.Default.cs index b56eeea..706fb64 100644 --- a/src/DotNetCore.CAP/IConsumerHandler.Default.cs +++ b/src/DotNetCore.CAP/IConsumerHandler.Default.cs @@ -2,7 +2,6 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; -using DotNetCore.CAP.Abstractions; using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Internal; using DotNetCore.CAP.Models; @@ -18,12 +17,11 @@ namespace DotNetCore.CAP private readonly IServiceProvider _serviceProvider; private readonly IConsumerInvokerFactory _consumerInvokerFactory; private readonly IConsumerClientFactory _consumerClientFactory; - private readonly ILoggerFactory _loggerFactory; private readonly ILogger _logger; + private readonly CancellationTokenSource _cts; private readonly MethodMatcherCache _selector; private readonly CapOptions _options; - private readonly CancellationTokenSource _cts; private readonly TimeSpan _pollingDelay = TimeSpan.FromSeconds(1); @@ -34,13 +32,12 @@ namespace DotNetCore.CAP IServiceProvider serviceProvider, IConsumerInvokerFactory consumerInvokerFactory, IConsumerClientFactory consumerClientFactory, - ILoggerFactory loggerFactory, + ILogger logger, MethodMatcherCache selector, IOptions options) { _selector = selector; - _logger = loggerFactory.CreateLogger(); - _loggerFactory = loggerFactory; + _logger = logger; _serviceProvider = serviceProvider; _consumerInvokerFactory = consumerInvokerFactory; _consumerClientFactory = consumerClientFactory; @@ -65,7 +62,7 @@ namespace DotNetCore.CAP client.Subscribe(item.Attribute.Name); } - client.Listening(_pollingDelay); + client.Listening(_pollingDelay, _cts.Token); } }, _cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default); } @@ -85,7 +82,7 @@ namespace DotNetCore.CAP try { - _compositeTask.Wait((int)TimeSpan.FromSeconds(60).TotalMilliseconds); + _compositeTask.Wait(TimeSpan.FromSeconds(60)); } catch (AggregateException ex) {