|
@@ -108,5 +108,92 @@ namespace MQTTnet.Tests |
|
|
Assert.AreEqual(0, (await server.GetClientStatusAsync()).Count); |
|
|
Assert.AreEqual(0, (await server.GetClientStatusAsync()).Count); |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
[TestMethod] |
|
|
|
|
|
public async Task Storage_Queue_Drains() |
|
|
|
|
|
{ |
|
|
|
|
|
using (var testEnvironment = new TestEnvironment()) |
|
|
|
|
|
{ |
|
|
|
|
|
testEnvironment.IgnoreClientLogErrors = true; |
|
|
|
|
|
testEnvironment.IgnoreServerLogErrors = true; |
|
|
|
|
|
|
|
|
|
|
|
var factory = new MqttFactory(); |
|
|
|
|
|
|
|
|
|
|
|
var server = await testEnvironment.StartServerAsync(); |
|
|
|
|
|
|
|
|
|
|
|
var managedClient = new ManagedMqttClient(testEnvironment.CreateClient(), new MqttNetLogger().CreateChildLogger()); |
|
|
|
|
|
var clientOptions = new MqttClientOptionsBuilder() |
|
|
|
|
|
.WithTcpServer("localhost", testEnvironment.ServerPort); |
|
|
|
|
|
var storage = new ManagedMqttClientTestStorage(); |
|
|
|
|
|
|
|
|
|
|
|
TaskCompletionSource<bool> connected = new TaskCompletionSource<bool>(); |
|
|
|
|
|
managedClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(e => |
|
|
|
|
|
{ |
|
|
|
|
|
managedClient.ConnectedHandler = null; |
|
|
|
|
|
connected.SetResult(true); |
|
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
|
|
await managedClient.StartAsync(new ManagedMqttClientOptionsBuilder() |
|
|
|
|
|
.WithClientOptions(clientOptions) |
|
|
|
|
|
.WithStorage(storage) |
|
|
|
|
|
.WithAutoReconnectDelay(System.TimeSpan.FromSeconds(5)) |
|
|
|
|
|
.Build()); |
|
|
|
|
|
|
|
|
|
|
|
await connected.Task; |
|
|
|
|
|
|
|
|
|
|
|
await testEnvironment.Server.StopAsync(); |
|
|
|
|
|
|
|
|
|
|
|
await managedClient.PublishAsync(new MqttApplicationMessage { Topic = "1" }); |
|
|
|
|
|
|
|
|
|
|
|
//Message should have been added to the storage queue in PublishAsync, |
|
|
|
|
|
//and we are awaiting PublishAsync, so the message should already be |
|
|
|
|
|
//in storage at this point (i.e. no waiting). |
|
|
|
|
|
Assert.AreEqual(1, storage.GetMessageCount()); |
|
|
|
|
|
|
|
|
|
|
|
connected = new TaskCompletionSource<bool>(); |
|
|
|
|
|
managedClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(e => |
|
|
|
|
|
{ |
|
|
|
|
|
managedClient.ConnectedHandler = null; |
|
|
|
|
|
connected.SetResult(true); |
|
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
|
|
await testEnvironment.Server.StartAsync(new MqttServerOptionsBuilder() |
|
|
|
|
|
.WithDefaultEndpointPort(testEnvironment.ServerPort).Build()); |
|
|
|
|
|
|
|
|
|
|
|
await connected.Task; |
|
|
|
|
|
|
|
|
|
|
|
//Wait 500ms here so the client has time to publish the queued message |
|
|
|
|
|
await Task.Delay(500); |
|
|
|
|
|
|
|
|
|
|
|
Assert.AreEqual(0, storage.GetMessageCount()); |
|
|
|
|
|
|
|
|
|
|
|
await managedClient.StopAsync(); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
public class ManagedMqttClientTestStorage : IManagedMqttClientStorage |
|
|
|
|
|
{ |
|
|
|
|
|
private IList<ManagedMqttApplicationMessage> _messages = null; |
|
|
|
|
|
|
|
|
|
|
|
public Task<IList<ManagedMqttApplicationMessage>> LoadQueuedMessagesAsync() |
|
|
|
|
|
{ |
|
|
|
|
|
if (_messages == null) |
|
|
|
|
|
{ |
|
|
|
|
|
_messages = new List<ManagedMqttApplicationMessage>(); |
|
|
|
|
|
} |
|
|
|
|
|
return Task.FromResult(_messages); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
public Task SaveQueuedMessagesAsync(IList<ManagedMqttApplicationMessage> messages) |
|
|
|
|
|
{ |
|
|
|
|
|
_messages = messages; |
|
|
|
|
|
return Task.FromResult(0); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
public int GetMessageCount() |
|
|
|
|
|
{ |
|
|
|
|
|
return _messages.Count; |
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
} |