@@ -20,6 +20,7 @@ namespace DotNetCore.CAP.Internal | |||||
private readonly IServiceProvider _serviceProvider; | private readonly IServiceProvider _serviceProvider; | ||||
private readonly ILogger<Bootstrapper> _logger; | private readonly ILogger<Bootstrapper> _logger; | ||||
private readonly CancellationTokenSource _cts = new (); | private readonly CancellationTokenSource _cts = new (); | ||||
private bool _disposed; | |||||
private IEnumerable<IProcessingServer> _processors = default!; | private IEnumerable<IProcessingServer> _processors = default!; | ||||
public Bootstrapper(IServiceProvider serviceProvider, ILogger<Bootstrapper> logger) | public Bootstrapper(IServiceProvider serviceProvider, ILogger<Bootstrapper> logger) | ||||
@@ -93,8 +94,13 @@ namespace DotNetCore.CAP.Internal | |||||
public override void Dispose() | public override void Dispose() | ||||
{ | { | ||||
if (_disposed) | |||||
{ | |||||
return; | |||||
} | |||||
_cts.Cancel(); | _cts.Cancel(); | ||||
_cts.Dispose(); | _cts.Dispose(); | ||||
_disposed = true; | |||||
} | } | ||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken) | protected override async Task ExecuteAsync(CancellationToken stoppingToken) | ||||
@@ -31,7 +31,7 @@ namespace DotNetCore.CAP.Internal | |||||
private IDataStorage _storage = default!; | private IDataStorage _storage = default!; | ||||
private MethodMatcherCache _selector = default!; | private MethodMatcherCache _selector = default!; | ||||
private CancellationTokenSource _cts; | |||||
private CancellationTokenSource _cts = new(); | |||||
private BrokerAddress _serverAddress; | private BrokerAddress _serverAddress; | ||||
private Task? _compositeTask; | private Task? _compositeTask; | ||||
private bool _disposed; | private bool _disposed; | ||||
@@ -47,7 +47,6 @@ namespace DotNetCore.CAP.Internal | |||||
_logger = logger; | _logger = logger; | ||||
_serviceProvider = serviceProvider; | _serviceProvider = serviceProvider; | ||||
_options = serviceProvider.GetRequiredService<IOptions<CapOptions>>().Value; | _options = serviceProvider.GetRequiredService<IOptions<CapOptions>>().Value; | ||||
_cts = new CancellationTokenSource(); | |||||
} | } | ||||
public bool IsHealthy() | public bool IsHealthy() | ||||
@@ -63,7 +62,7 @@ namespace DotNetCore.CAP.Internal | |||||
_storage = _serviceProvider.GetRequiredService<IDataStorage>(); | _storage = _serviceProvider.GetRequiredService<IDataStorage>(); | ||||
_consumerClientFactory = _serviceProvider.GetRequiredService<IConsumerClientFactory>(); | _consumerClientFactory = _serviceProvider.GetRequiredService<IConsumerClientFactory>(); | ||||
stoppingToken.Register(() => _cts.Cancel()); | |||||
stoppingToken.Register(Dispose); | |||||
Execute(); | Execute(); | ||||
} | } | ||||
@@ -11,13 +11,13 @@ namespace DotNetCore.CAP.Test | |||||
public void ToTimestampTest() | public void ToTimestampTest() | ||||
{ | { | ||||
//Arrange | //Arrange | ||||
var time = DateTime.Parse("2018-01-01 00:00:00"); | |||||
var time = DateTimeOffset.Parse("2018-01-01T00:00:00Z"); | |||||
//Act | //Act | ||||
var result = Helper.ToTimestamp(time); | |||||
var result = Helper.ToTimestamp(time.DateTime); | |||||
//Assert | //Assert | ||||
Assert.Equal(1514764800, result); | |||||
Assert.Equal(1514736000, result); | |||||
} | } | ||||
[Fact] | [Fact] | ||||
@@ -25,6 +25,7 @@ namespace DotNetCore.CAP.Test.IntegrationTests | |||||
// Explicitly stop Bootstrapper to prove the cancellation token works. | // Explicitly stop Bootstrapper to prove the cancellation token works. | ||||
var bootstrapper = Container.GetRequiredService<Bootstrapper>(); | var bootstrapper = Container.GetRequiredService<Bootstrapper>(); | ||||
await bootstrapper.StopAsync(CancellationToken.None); | await bootstrapper.StopAsync(CancellationToken.None); | ||||
var (message, token) = HandledMessages | var (message, token) = HandledMessages | ||||
@@ -37,7 +37,11 @@ namespace DotNetCore.CAP.Test | |||||
Parameters = new List<ParameterDescriptor>() | Parameters = new List<ParameterDescriptor>() | ||||
}; | }; | ||||
var header = new Dictionary<string, string>(); | |||||
var header = new Dictionary<string, string>() | |||||
{ | |||||
[Headers.MessageId] = SnowflakeId.Default().NextId().ToString(), | |||||
[Headers.MessageName] = "fake.output.integer" | |||||
}; | |||||
var message = new Message(header, null); | var message = new Message(header, null); | ||||
var context = new ConsumerContext(descriptor, message); | var context = new ConsumerContext(descriptor, message); | ||||
@@ -47,7 +47,11 @@ namespace DotNetCore.CAP.Test | |||||
} | } | ||||
}; | }; | ||||
var header = new Dictionary<string, string>(); | |||||
var header = new Dictionary<string, string>() | |||||
{ | |||||
[Headers.MessageId] = SnowflakeId.Default().NextId().ToString(), | |||||
[Headers.MessageName] = "fake.output.withcancellation" | |||||
}; | |||||
var message = new Message(header, null); | var message = new Message(header, null); | ||||
var context = new ConsumerContext(descriptor, message); | var context = new ConsumerContext(descriptor, message); | ||||