diff --git a/src/DotNetCore.CAP/Internal/IBootstrapper.Default.cs b/src/DotNetCore.CAP/Internal/IBootstrapper.Default.cs index 5540eaf..9ed2cf0 100644 --- a/src/DotNetCore.CAP/Internal/IBootstrapper.Default.cs +++ b/src/DotNetCore.CAP/Internal/IBootstrapper.Default.cs @@ -20,6 +20,7 @@ namespace DotNetCore.CAP.Internal private readonly IServiceProvider _serviceProvider; private readonly ILogger _logger; private readonly CancellationTokenSource _cts = new (); + private bool _disposed; private IEnumerable _processors = default!; public Bootstrapper(IServiceProvider serviceProvider, ILogger logger) @@ -93,8 +94,13 @@ namespace DotNetCore.CAP.Internal public override void Dispose() { + if (_disposed) + { + return; + } _cts.Cancel(); _cts.Dispose(); + _disposed = true; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) diff --git a/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs b/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs index 483eb91..b78e264 100644 --- a/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs +++ b/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs @@ -31,7 +31,7 @@ namespace DotNetCore.CAP.Internal private IDataStorage _storage = default!; private MethodMatcherCache _selector = default!; - private CancellationTokenSource _cts; + private CancellationTokenSource _cts = new(); private BrokerAddress _serverAddress; private Task? _compositeTask; private bool _disposed; @@ -47,7 +47,6 @@ namespace DotNetCore.CAP.Internal _logger = logger; _serviceProvider = serviceProvider; _options = serviceProvider.GetRequiredService>().Value; - _cts = new CancellationTokenSource(); } public bool IsHealthy() @@ -63,7 +62,7 @@ namespace DotNetCore.CAP.Internal _storage = _serviceProvider.GetRequiredService(); _consumerClientFactory = _serviceProvider.GetRequiredService(); - stoppingToken.Register(() => _cts.Cancel()); + stoppingToken.Register(Dispose); Execute(); } diff --git a/test/DotNetCore.CAP.Test/HelperTest.cs b/test/DotNetCore.CAP.Test/HelperTest.cs index fcb550c..2a4ddbe 100644 --- a/test/DotNetCore.CAP.Test/HelperTest.cs +++ b/test/DotNetCore.CAP.Test/HelperTest.cs @@ -11,13 +11,13 @@ namespace DotNetCore.CAP.Test public void ToTimestampTest() { //Arrange - var time = DateTime.Parse("2018-01-01 00:00:00"); + var time = DateTimeOffset.Parse("2018-01-01T00:00:00Z"); //Act - var result = Helper.ToTimestamp(time); + var result = Helper.ToTimestamp(time.DateTime); //Assert - Assert.Equal(1514764800, result); + Assert.Equal(1514736000, result); } [Fact] diff --git a/test/DotNetCore.CAP.Test/IntegrationTests/CancellationTokenSubscriberTest.cs b/test/DotNetCore.CAP.Test/IntegrationTests/CancellationTokenSubscriberTest.cs index 7cf0a6a..deb4973 100644 --- a/test/DotNetCore.CAP.Test/IntegrationTests/CancellationTokenSubscriberTest.cs +++ b/test/DotNetCore.CAP.Test/IntegrationTests/CancellationTokenSubscriberTest.cs @@ -25,6 +25,7 @@ namespace DotNetCore.CAP.Test.IntegrationTests // Explicitly stop Bootstrapper to prove the cancellation token works. var bootstrapper = Container.GetRequiredService(); + await bootstrapper.StopAsync(CancellationToken.None); var (message, token) = HandledMessages diff --git a/test/DotNetCore.CAP.Test/SubscribeInvokerTest.cs b/test/DotNetCore.CAP.Test/SubscribeInvokerTest.cs index 5db7959..4b14f9b 100644 --- a/test/DotNetCore.CAP.Test/SubscribeInvokerTest.cs +++ b/test/DotNetCore.CAP.Test/SubscribeInvokerTest.cs @@ -37,7 +37,11 @@ namespace DotNetCore.CAP.Test Parameters = new List() }; - var header = new Dictionary(); + var header = new Dictionary() + { + [Headers.MessageId] = SnowflakeId.Default().NextId().ToString(), + [Headers.MessageName] = "fake.output.integer" + }; var message = new Message(header, null); var context = new ConsumerContext(descriptor, message); diff --git a/test/DotNetCore.CAP.Test/SubscribeInvokerWithCancellation.cs b/test/DotNetCore.CAP.Test/SubscribeInvokerWithCancellation.cs index 7eb7f20..7517479 100644 --- a/test/DotNetCore.CAP.Test/SubscribeInvokerWithCancellation.cs +++ b/test/DotNetCore.CAP.Test/SubscribeInvokerWithCancellation.cs @@ -47,7 +47,11 @@ namespace DotNetCore.CAP.Test } }; - var header = new Dictionary(); + var header = new Dictionary() + { + [Headers.MessageId] = SnowflakeId.Default().NextId().ToString(), + [Headers.MessageName] = "fake.output.withcancellation" + }; var message = new Message(header, null); var context = new ConsumerContext(descriptor, message);