diff --git a/Tests/MQTTnet.Core.Tests/ManagedMqttClient_Tests.cs b/Tests/MQTTnet.Core.Tests/ManagedMqttClient_Tests.cs index ca665a1..f0812f3 100644 --- a/Tests/MQTTnet.Core.Tests/ManagedMqttClient_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/ManagedMqttClient_Tests.cs @@ -108,5 +108,92 @@ namespace MQTTnet.Tests Assert.AreEqual(0, (await server.GetClientStatusAsync()).Count); } } + + [TestMethod] + public async Task Storage_Queue_Drains() + { + using (var testEnvironment = new TestEnvironment()) + { + testEnvironment.IgnoreClientLogErrors = true; + testEnvironment.IgnoreServerLogErrors = true; + + var factory = new MqttFactory(); + + var server = await testEnvironment.StartServerAsync(); + + var managedClient = new ManagedMqttClient(testEnvironment.CreateClient(), new MqttNetLogger().CreateChildLogger()); + var clientOptions = new MqttClientOptionsBuilder() + .WithTcpServer("localhost", testEnvironment.ServerPort); + var storage = new ManagedMqttClientTestStorage(); + + TaskCompletionSource connected = new TaskCompletionSource(); + managedClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(e => + { + managedClient.ConnectedHandler = null; + connected.SetResult(true); + }); + + await managedClient.StartAsync(new ManagedMqttClientOptionsBuilder() + .WithClientOptions(clientOptions) + .WithStorage(storage) + .WithAutoReconnectDelay(System.TimeSpan.FromSeconds(5)) + .Build()); + + await connected.Task; + + await testEnvironment.Server.StopAsync(); + + await managedClient.PublishAsync(new MqttApplicationMessage { Topic = "1" }); + + //Message should have been added to the storage queue in PublishAsync, + //and we are awaiting PublishAsync, so the message should already be + //in storage at this point (i.e. no waiting). + Assert.AreEqual(1, storage.GetMessageCount()); + + connected = new TaskCompletionSource(); + managedClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(e => + { + managedClient.ConnectedHandler = null; + connected.SetResult(true); + }); + + await testEnvironment.Server.StartAsync(new MqttServerOptionsBuilder() + .WithDefaultEndpointPort(testEnvironment.ServerPort).Build()); + + await connected.Task; + + //Wait 500ms here so the client has time to publish the queued message + await Task.Delay(500); + + Assert.AreEqual(0, storage.GetMessageCount()); + + await managedClient.StopAsync(); + } + } + } + + public class ManagedMqttClientTestStorage : IManagedMqttClientStorage + { + private IList _messages = null; + + public Task> LoadQueuedMessagesAsync() + { + if (_messages == null) + { + _messages = new List(); + } + return Task.FromResult(_messages); + } + + public Task SaveQueuedMessagesAsync(IList messages) + { + _messages = messages; + return Task.FromResult(0); + } + + public int GetMessageCount() + { + return _messages.Count; + } } -} \ No newline at end of file +}