Browse Source

Add low level client to test environment.

release/3.x.x
Christian 3 years ago
parent
commit
2aa4d1b77a
1 changed files with 70 additions and 53 deletions
  1. +70
    -53
      Tests/MQTTnet.Core.Tests/Mockups/TestEnvironment.cs

+ 70
- 53
Tests/MQTTnet.Core.Tests/Mockups/TestEnvironment.cs View File

@@ -18,6 +18,7 @@ namespace MQTTnet.Tests.Mockups
public sealed class TestEnvironment : IDisposable public sealed class TestEnvironment : IDisposable
{ {
readonly MqttFactory _mqttFactory = new MqttFactory(); readonly MqttFactory _mqttFactory = new MqttFactory();
readonly List<ILowLevelMqttClient> _lowLevelClients = new List<ILowLevelMqttClient>();
readonly List<IMqttClient> _clients = new List<IMqttClient>(); readonly List<IMqttClient> _clients = new List<IMqttClient>();
readonly List<string> _serverErrors = new List<string>(); readonly List<string> _serverErrors = new List<string>();
readonly List<string> _clientErrors = new List<string>(); readonly List<string> _clientErrors = new List<string>();
@@ -79,11 +80,6 @@ namespace MQTTnet.Tests.Mockups
}; };
} }


public async Task<IMqttRpcClient> ConnectRpcClientAsync(IMqttRpcClientOptions options)
{
return new MqttRpcClient(await ConnectClientAsync(), options);
}

public IMqttClient CreateClient() public IMqttClient CreateClient()
{ {
lock (_clients) lock (_clients)
@@ -95,35 +91,58 @@ namespace MQTTnet.Tests.Mockups
} }
} }


public Task<IMqttServer> StartServerAsync()
public Task<IMqttClient> ConnectClientAsync()
{ {
return StartServerAsync(new MqttServerOptionsBuilder());
return ConnectClientAsync(new MqttClientOptionsBuilder());
} }


public async Task<IMqttServer> StartServerAsync(MqttServerOptionsBuilder options)
public async Task<IMqttClient> ConnectClientAsync(Action<MqttClientOptionsBuilder> optionsBuilder)
{ {
if (options == null) throw new ArgumentNullException(nameof(options));
if (optionsBuilder == null) throw new ArgumentNullException(nameof(optionsBuilder));


if (Server != null)
{
throw new InvalidOperationException("Server already started.");
}
var options = new MqttClientOptionsBuilder();
options = options.WithTcpServer("localhost", ServerPort);
optionsBuilder.Invoke(options);


Server = new TestServerWrapper(_mqttFactory.CreateMqttServer(ServerLogger), TestContext, this);
var client = CreateClient();
await client.ConnectAsync(options.Build()).ConfigureAwait(false);


options.WithDefaultEndpointPort(ServerPort);
options.WithMaxPendingMessagesPerClient(int.MaxValue);
return client;
}
public async Task<IMqttClient> ConnectClientAsync(MqttClientOptionsBuilder options)
{
if (options == null) throw new ArgumentNullException(nameof(options));


await Server.StartAsync(options.Build()).ConfigureAwait(false);
options = options.WithTcpServer("localhost", ServerPort);


return Server;
var client = CreateClient();
await client.ConnectAsync(options.Build()).ConfigureAwait(false);

return client;
} }


public Task<IMqttClient> ConnectClientAsync()
public async Task<IMqttClient> ConnectClientAsync(IMqttClientOptions options)
{ {
return ConnectClientAsync(new MqttClientOptionsBuilder());
if (options == null) throw new ArgumentNullException(nameof(options));

var client = CreateClient();
await client.ConnectAsync(options).ConfigureAwait(false);

return client;
} }
public ILowLevelMqttClient CreateLowLevelClient()
{
lock (_clients)
{
var client = _mqttFactory.CreateLowLevelMqttClient(ClientLogger);
_lowLevelClients.Add(client);


return client;
}
}
public Task<ILowLevelMqttClient> ConnectLowLevelClientAsync() public Task<ILowLevelMqttClient> ConnectLowLevelClientAsync()
{ {
return ConnectLowLevelClientAsync(o => { }); return ConnectLowLevelClientAsync(o => { });
@@ -137,46 +156,39 @@ namespace MQTTnet.Tests.Mockups
options = options.WithTcpServer("127.0.0.1", ServerPort); options = options.WithTcpServer("127.0.0.1", ServerPort);
optionsBuilder.Invoke(options); optionsBuilder.Invoke(options);


var client = new MqttFactory().CreateLowLevelMqttClient();
var client = CreateLowLevelClient();
await client.ConnectAsync(options.Build(), CancellationToken.None).ConfigureAwait(false); await client.ConnectAsync(options.Build(), CancellationToken.None).ConfigureAwait(false);


return client; return client;
} }
public async Task<IMqttClient> ConnectClientAsync(Action<MqttClientOptionsBuilder> optionsBuilder)
public async Task<IMqttRpcClient> ConnectRpcClientAsync(IMqttRpcClientOptions options)
{ {
if (optionsBuilder == null) throw new ArgumentNullException(nameof(optionsBuilder));

var options = new MqttClientOptionsBuilder();
options = options.WithTcpServer("localhost", ServerPort);
optionsBuilder.Invoke(options);

var client = CreateClient();
await client.ConnectAsync(options.Build()).ConfigureAwait(false);
return new MqttRpcClient(await ConnectClientAsync(), options);
}


return client;
public Task<IMqttServer> StartServerAsync()
{
return StartServerAsync(new MqttServerOptionsBuilder());
} }


public async Task<IMqttClient> ConnectClientAsync(MqttClientOptionsBuilder options)
public async Task<IMqttServer> StartServerAsync(MqttServerOptionsBuilder options)
{ {
if (options == null) throw new ArgumentNullException(nameof(options)); if (options == null) throw new ArgumentNullException(nameof(options));


options = options.WithTcpServer("localhost", ServerPort);
var client = CreateClient();
await client.ConnectAsync(options.Build()).ConfigureAwait(false);
if (Server != null)
{
throw new InvalidOperationException("Server already started.");
}


return client;
}
Server = new TestServerWrapper(_mqttFactory.CreateMqttServer(ServerLogger), TestContext, this);


public async Task<IMqttClient> ConnectClientAsync(IMqttClientOptions options)
{
if (options == null) throw new ArgumentNullException(nameof(options));
options.WithDefaultEndpointPort(ServerPort);
options.WithMaxPendingMessagesPerClient(int.MaxValue);


var client = CreateClient();
await client.ConnectAsync(options).ConfigureAwait(false);
await Server.StartAsync(options.Build()).ConfigureAwait(false);


return client;
return Server;
} }


public void ThrowIfLogErrors() public void ThrowIfLogErrors()
@@ -198,6 +210,14 @@ namespace MQTTnet.Tests.Mockups
} }
} }


public void TrackException(Exception exception)
{
lock (_exceptions)
{
_exceptions.Add(exception);
}
}
public void Dispose() public void Dispose()
{ {
foreach (var mqttClient in _clients) foreach (var mqttClient in _clients)
@@ -216,6 +236,11 @@ namespace MQTTnet.Tests.Mockups
} }
} }


foreach (var lowLevelMqttClient in _lowLevelClients)
{
lowLevelMqttClient.Dispose();
}

try try
{ {
Server?.StopAsync().GetAwaiter().GetResult(); Server?.StopAsync().GetAwaiter().GetResult();
@@ -236,13 +261,5 @@ namespace MQTTnet.Tests.Mockups
throw new Exception($"{_exceptions.Count} exceptions tracked.\r\n" + string.Join(Environment.NewLine, _exceptions)); throw new Exception($"{_exceptions.Count} exceptions tracked.\r\n" + string.Join(Environment.NewLine, _exceptions));
} }
} }

public void TrackException(Exception exception)
{
lock (_exceptions)
{
_exceptions.Add(exception);
}
}
} }
} }

Loading…
Cancel
Save