Explorar el Código

Merge pull request #717 from PaulFake/patch-2

Storage queue drain test
release/3.x.x
Christian hace 5 años
committed by GitHub
padre
commit
1dab729d84
No se encontró ninguna clave conocida en la base de datos para esta firma ID de clave GPG: 4AEE18F83AFDEB23
Se han modificado 1 ficheros con 88 adiciones y 1 borrados
  1. +88
    -1
      Tests/MQTTnet.Core.Tests/ManagedMqttClient_Tests.cs

+ 88
- 1
Tests/MQTTnet.Core.Tests/ManagedMqttClient_Tests.cs Ver fichero

@@ -108,5 +108,92 @@ namespace MQTTnet.Tests
Assert.AreEqual(0, (await server.GetClientStatusAsync()).Count);
}
}
[TestMethod]
public async Task Storage_Queue_Drains()
{
using (var testEnvironment = new TestEnvironment())
{
testEnvironment.IgnoreClientLogErrors = true;
testEnvironment.IgnoreServerLogErrors = true;

var factory = new MqttFactory();

var server = await testEnvironment.StartServerAsync();

var managedClient = new ManagedMqttClient(testEnvironment.CreateClient(), new MqttNetLogger().CreateChildLogger());
var clientOptions = new MqttClientOptionsBuilder()
.WithTcpServer("localhost", testEnvironment.ServerPort);
var storage = new ManagedMqttClientTestStorage();

TaskCompletionSource<bool> connected = new TaskCompletionSource<bool>();
managedClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(e =>
{
managedClient.ConnectedHandler = null;
connected.SetResult(true);
});

await managedClient.StartAsync(new ManagedMqttClientOptionsBuilder()
.WithClientOptions(clientOptions)
.WithStorage(storage)
.WithAutoReconnectDelay(System.TimeSpan.FromSeconds(5))
.Build());

await connected.Task;

await testEnvironment.Server.StopAsync();

await managedClient.PublishAsync(new MqttApplicationMessage { Topic = "1" });

//Message should have been added to the storage queue in PublishAsync,
//and we are awaiting PublishAsync, so the message should already be
//in storage at this point (i.e. no waiting).
Assert.AreEqual(1, storage.GetMessageCount());

connected = new TaskCompletionSource<bool>();
managedClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(e =>
{
managedClient.ConnectedHandler = null;
connected.SetResult(true);
});

await testEnvironment.Server.StartAsync(new MqttServerOptionsBuilder()
.WithDefaultEndpointPort(testEnvironment.ServerPort).Build());

await connected.Task;

//Wait 500ms here so the client has time to publish the queued message
await Task.Delay(500);

Assert.AreEqual(0, storage.GetMessageCount());

await managedClient.StopAsync();
}
}
}
public class ManagedMqttClientTestStorage : IManagedMqttClientStorage
{
private IList<ManagedMqttApplicationMessage> _messages = null;

public Task<IList<ManagedMqttApplicationMessage>> LoadQueuedMessagesAsync()
{
if (_messages == null)
{
_messages = new List<ManagedMqttApplicationMessage>();
}
return Task.FromResult(_messages);
}

public Task SaveQueuedMessagesAsync(IList<ManagedMqttApplicationMessage> messages)
{
_messages = messages;
return Task.FromResult(0);
}

public int GetMessageCount()
{
return _messages.Count;
}
}
}
}

Cargando…
Cancelar
Guardar