Browse Source

Merge pull request #833 from JanEggers/bugfixes

Bugfixes
release/3.x.x
Christian 5 years ago
committed by GitHub
parent
commit
2e7d074174
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
36 changed files with 674 additions and 225 deletions
  1. +18
    -30
      Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs
  2. +6
    -14
      Source/MQTTnet/Adapter/MqttChannelAdapter.cs
  3. +28
    -8
      Source/MQTTnet/Client/MqttClient.cs
  4. +22
    -8
      Source/MQTTnet/Implementations/MqttTcpChannel.cs
  5. +13
    -3
      Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs
  6. +1
    -6
      Source/MQTTnet/Implementations/MqttTcpServerListener.cs
  7. +13
    -3
      Source/MQTTnet/Implementations/MqttWebSocketChannel.cs
  8. +92
    -0
      Source/MQTTnet/Implementations/PlatformAbstractionLayer.cs
  9. +7
    -3
      Source/MQTTnet/Internal/BlockingQueue.cs
  10. +56
    -0
      Source/MQTTnet/Internal/Disposable.cs
  11. +8
    -3
      Source/MQTTnet/PacketDispatcher/MqttPacketAwaiter.cs
  12. +8
    -3
      Source/MQTTnet/Server/MqttClientSessionApplicationMessagesQueue.cs
  13. +7
    -3
      Source/MQTTnet/Server/MqttClientSessionsManager.cs
  14. +2
    -1
      Tests/MQTTnet.Benchmarks/LoggerBenchmark.cs
  15. +2
    -1
      Tests/MQTTnet.Benchmarks/MessageProcessingBenchmark.cs
  16. +2
    -1
      Tests/MQTTnet.Benchmarks/SerializerBenchmark.cs
  17. +2
    -1
      Tests/MQTTnet.Benchmarks/TopicFilterComparerBenchmark.cs
  18. +1
    -1
      Tests/MQTTnet.Core.Tests/MQTTnet.Tests.csproj
  19. +5
    -3
      Tests/MQTTnet.Core.Tests/MQTTv5/Client_Tests.cs
  20. +3
    -1
      Tests/MQTTnet.Core.Tests/MQTTv5/Feature_Tests.cs
  21. +3
    -1
      Tests/MQTTnet.Core.Tests/MQTTv5/Server_Tests.cs
  22. +9
    -7
      Tests/MQTTnet.Core.Tests/ManagedMqttClient_Tests.cs
  23. +94
    -0
      Tests/MQTTnet.Core.Tests/Mockups/TestClientWrapper.cs
  24. +23
    -14
      Tests/MQTTnet.Core.Tests/Mockups/TestEnvironment.cs
  25. +108
    -0
      Tests/MQTTnet.Core.Tests/Mockups/TestServerWrapper.cs
  26. +48
    -16
      Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs
  27. +4
    -4
      Tests/MQTTnet.Core.Tests/MqttFactory_Tests.cs
  28. +3
    -3
      Tests/MQTTnet.Core.Tests/MqttTcpChannel_Tests.cs
  29. +5
    -3
      Tests/MQTTnet.Core.Tests/RPC_Tests.cs
  30. +3
    -1
      Tests/MQTTnet.Core.Tests/RoundtripTime_Tests.cs
  31. +10
    -8
      Tests/MQTTnet.Core.Tests/Server_Status_Tests.cs
  32. +60
    -66
      Tests/MQTTnet.Core.Tests/Server_Tests.cs
  33. +3
    -1
      Tests/MQTTnet.Core.Tests/Session_Tests.cs
  34. +1
    -4
      Tests/MQTTnet.TestApp.AspNetCore2/MQTTnet.TestApp.AspNetCore2.csproj
  35. +3
    -3
      Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs
  36. +1
    -1
      Tests/MQTTnet.TestApp.NetCore/ServerTest.cs

+ 18
- 30
Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs View File

@@ -16,7 +16,7 @@ using MQTTnet.Server;

namespace MQTTnet.Extensions.ManagedClient
{
public class ManagedMqttClient : IManagedMqttClient
public class ManagedMqttClient : Disposable, IManagedMqttClient
{
private readonly BlockingQueue<ManagedMqttApplicationMessage> _messageQueue = new BlockingQueue<ManagedMqttApplicationMessage>();

@@ -42,9 +42,7 @@ namespace MQTTnet.Extensions.ManagedClient
private Task _maintainConnectionTask;

private ManagedMqttClientStorageManager _storageManager;

private bool _disposed;

public ManagedMqttClient(IMqttClient mqttClient, IMqttNetChildLogger logger)
{
_mqttClient = mqttClient ?? throw new ArgumentNullException(nameof(mqttClient));
@@ -146,6 +144,7 @@ namespace MQTTnet.Extensions.ManagedClient
ThrowIfDisposed();

if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage));
if (Options == null) throw new InvalidOperationException("call StartAsync before publishing messages");

MqttTopicValidator.ThrowIfInvalid(applicationMessage.ApplicationMessage.Topic);

@@ -238,36 +237,25 @@ namespace MQTTnet.Extensions.ManagedClient
return Task.FromResult(0);
}

public void Dispose()
protected override void Dispose(bool disposing)
{
if (_disposed)
{
return;
}

_disposed = true;

StopPublishing();
StopMaintainingConnection();

if (_maintainConnectionTask != null)
if (disposing)
{
Task.WaitAny(_maintainConnectionTask);
_maintainConnectionTask = null;
}
StopPublishing();
StopMaintainingConnection();

_messageQueue.Dispose();
_messageQueueLock.Dispose();
_mqttClient.Dispose();
_subscriptionsQueuedSignal.Dispose();
}
if (_maintainConnectionTask != null)
{
_maintainConnectionTask.GetAwaiter().GetResult();
_maintainConnectionTask = null;
}

private void ThrowIfDisposed()
{
if (_disposed)
{
throw new ObjectDisposedException(nameof(ManagedMqttClient));
_messageQueue.Dispose();
_messageQueueLock.Dispose();
_mqttClient.Dispose();
_subscriptionsQueuedSignal.Dispose();
}
base.Dispose(disposing);
}

private async Task MaintainConnectionAsync(CancellationToken cancellationToken)
@@ -288,7 +276,7 @@ namespace MQTTnet.Extensions.ManagedClient
}
finally
{
if (!_disposed)
if (!IsDisposed)
{
try
{


+ 6
- 14
Source/MQTTnet/Adapter/MqttChannelAdapter.cs View File

@@ -14,7 +14,7 @@ using MQTTnet.Packets;

namespace MQTTnet.Adapter
{
public class MqttChannelAdapter : IMqttChannelAdapter
public class MqttChannelAdapter : Disposable, IMqttChannelAdapter
{
private const uint ErrorOperationAborted = 0x800703E3;
private const int ReadBufferSize = 4096; // TODO: Move buffer size to config
@@ -26,9 +26,7 @@ namespace MQTTnet.Adapter
private readonly MqttPacketReader _packetReader;

private readonly byte[] _fixedHeaderBuffer = new byte[2];

private bool _isDisposed;

private long _bytesReceived;
private long _bytesSent;

@@ -269,19 +267,13 @@ namespace MQTTnet.Adapter
}
}

public void Dispose()
{
_isDisposed = true;

_channel?.Dispose();
}

private void ThrowIfDisposed()
protected override void Dispose(bool disposing)
{
if (_isDisposed)
if (disposing)
{
throw new ObjectDisposedException(nameof(MqttChannelAdapter));
_channel?.Dispose();
}
base.Dispose(disposing);
}

private static bool IsWrappedException(Exception exception)


+ 28
- 8
Source/MQTTnet/Client/MqttClient.cs View File

@@ -20,7 +20,7 @@ using MQTTnet.Protocol;

namespace MQTTnet.Client
{
public class MqttClient : IMqttClient
public class MqttClient : Disposable, IMqttClient
{
private readonly MqttPacketIdentifierProvider _packetIdentifierProvider = new MqttPacketIdentifierProvider();
private readonly MqttPacketDispatcher _packetDispatcher = new MqttPacketDispatcher();
@@ -63,6 +63,8 @@ namespace MQTTnet.Client

ThrowIfConnected("It is not allowed to connect with a server after the connection is established.");

ThrowIfDisposed();

MqttClientAuthenticateResult authenticateResult = null;

try
@@ -79,13 +81,16 @@ namespace MQTTnet.Client
var adapter = _adapterFactory.CreateClientAdapter(options, _logger);
_adapter = adapter;

_logger.Verbose($"Trying to connect with server '{options.ChannelOptions}' (Timeout={options.CommunicationTimeout}).");
await _adapter.ConnectAsync(options.CommunicationTimeout, cancellationToken).ConfigureAwait(false);
_logger.Verbose("Connection with server established.");
using (var combined = CancellationTokenSource.CreateLinkedTokenSource(backgroundCancellationToken, cancellationToken))
{
_logger.Verbose($"Trying to connect with server '{options.ChannelOptions}' (Timeout={options.CommunicationTimeout}).");
await _adapter.ConnectAsync(options.CommunicationTimeout, combined.Token).ConfigureAwait(false);
_logger.Verbose("Connection with server established.");

_packetReceiverTask = Task.Run(() => TryReceivePacketsAsync(backgroundCancellationToken), backgroundCancellationToken);
_packetReceiverTask = Task.Run(() => TryReceivePacketsAsync(backgroundCancellationToken), backgroundCancellationToken);

authenticateResult = await AuthenticateAsync(adapter, options.WillMessage, cancellationToken).ConfigureAwait(false);
authenticateResult = await AuthenticateAsync(adapter, options.WillMessage, combined.Token).ConfigureAwait(false);
}

_sendTracker.Restart();

@@ -161,6 +166,7 @@ namespace MQTTnet.Client
{
if (options == null) throw new ArgumentNullException(nameof(options));

ThrowIfDisposed();
ThrowIfNotConnected();

var subscribePacket = _adapter.PacketFormatterAdapter.DataConverter.CreateSubscribePacket(options);
@@ -174,6 +180,7 @@ namespace MQTTnet.Client
{
if (options == null) throw new ArgumentNullException(nameof(options));

ThrowIfDisposed();
ThrowIfNotConnected();

var unsubscribePacket = _adapter.PacketFormatterAdapter.DataConverter.CreateUnsubscribePacket(options);
@@ -189,6 +196,7 @@ namespace MQTTnet.Client

MqttTopicValidator.ThrowIfInvalid(applicationMessage.Topic);

ThrowIfDisposed();
ThrowIfNotConnected();

var publishPacket = _adapter.PacketFormatterAdapter.DataConverter.CreatePublishPacket(applicationMessage);
@@ -214,7 +222,7 @@ namespace MQTTnet.Client
}
}

public void Dispose()
private void Cleanup()
{
_backgroundCancellationTokenSource?.Cancel(false);
_backgroundCancellationTokenSource?.Dispose();
@@ -224,6 +232,18 @@ namespace MQTTnet.Client
_adapter = null;
}


protected override void Dispose(bool disposing)
{
if (disposing)
{
Cleanup();

DisconnectedHandler = null;
}
base.Dispose(disposing);
}

private async Task<MqttClientAuthenticateResult> AuthenticateAsync(IMqttChannelAdapter channelAdapter, MqttApplicationMessage willApplicationMessage, CancellationToken cancellationToken)
{
var connectPacket = channelAdapter.PacketFormatterAdapter.DataConverter.CreateConnectPacket(
@@ -288,7 +308,7 @@ namespace MQTTnet.Client
}
finally
{
Dispose();
Cleanup();
_cleanDisconnectInitiated = false;

_logger.Info("Disconnected.");


+ 22
- 8
Source/MQTTnet/Implementations/MqttTcpChannel.cs View File

@@ -10,10 +10,11 @@ using System.Runtime.ExceptionServices;
using System.Threading;
using MQTTnet.Channel;
using MQTTnet.Client.Options;
using MQTTnet.Internal;

namespace MQTTnet.Implementations
{
public class MqttTcpChannel : IMqttChannel
public class MqttTcpChannel : Disposable, IMqttChannel
{
private readonly IMqttClientOptions _clientOptions;
private readonly MqttClientTcpOptions _options;
@@ -72,11 +73,7 @@ namespace MQTTnet.Implementations
// Workaround for: workaround for https://github.com/dotnet/corefx/issues/24430
using (cancellationToken.Register(() => socket.Dispose()))
{
#if NET452 || NET461
await Task.Factory.FromAsync(socket.BeginConnect, socket.EndConnect, _options.Server, _options.GetPort(), null).ConfigureAwait(false);
#else
await socket.ConnectAsync(_options.Server, _options.GetPort()).ConfigureAwait(false);
#endif
await PlatformAbstractionLayer.ConnectAsync(socket, _options.Server, _options.GetPort()).ConfigureAwait(false);
}

var networkStream = new NetworkStream(socket, true);
@@ -98,7 +95,7 @@ namespace MQTTnet.Implementations

public Task DisconnectAsync(CancellationToken cancellationToken)
{
Dispose();
Cleanup();
return Task.FromResult(0);
}

@@ -117,6 +114,10 @@ namespace MQTTnet.Implementations
return await _stream.ReadAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false);
}
}
catch (ObjectDisposedException)
{
return 0;
}
catch (IOException exception)
{
if (exception.InnerException is SocketException socketException)
@@ -143,6 +144,10 @@ namespace MQTTnet.Implementations
await _stream.WriteAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false);
}
}
catch (ObjectDisposedException)
{
return;
}
catch (IOException exception)
{
if (exception.InnerException is SocketException socketException)
@@ -154,7 +159,7 @@ namespace MQTTnet.Implementations
}
}

public void Dispose()
private void Cleanup()
{
// When the stream is disposed it will also close the socket and this will also dispose it.
// So there is no need to dispose the socket again.
@@ -173,6 +178,15 @@ namespace MQTTnet.Implementations
_stream = null;
}

protected override void Dispose(bool disposing)
{
if (disposing)
{
Cleanup();
}
base.Dispose(disposing);
}

private bool InternalUserCertificateValidationCallback(object sender, X509Certificate x509Certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)
{
if (_options.TlsOptions.CertificateValidationCallback != null)


+ 13
- 3
Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs View File

@@ -8,11 +8,12 @@ using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Adapter;
using MQTTnet.Diagnostics;
using MQTTnet.Internal;
using MQTTnet.Server;

namespace MQTTnet.Implementations
{
public class MqttTcpServerAdapter : IMqttServerAdapter
public class MqttTcpServerAdapter : Disposable, IMqttServerAdapter
{
private readonly List<MqttTcpServerListener> _listeners = new List<MqttTcpServerListener>();
private readonly IMqttNetChildLogger _logger;
@@ -72,11 +73,11 @@ namespace MQTTnet.Implementations

public Task StopAsync()
{
Dispose();
Cleanup();
return Task.FromResult(0);
}

public void Dispose()
private void Cleanup()
{
_cancellationTokenSource?.Cancel(false);
_cancellationTokenSource?.Dispose();
@@ -90,6 +91,15 @@ namespace MQTTnet.Implementations
_listeners.Clear();
}

protected override void Dispose(bool disposing)
{
if (disposing)
{
Cleanup();
}
base.Dispose(disposing);
}

private void RegisterListeners(MqttServerTcpEndpointBaseOptions options, X509Certificate2 tlsCertificate, CancellationToken cancellationToken)
{
if (!options.BoundInterNetworkAddress.Equals(IPAddress.None))


+ 1
- 6
Source/MQTTnet/Implementations/MqttTcpServerListener.cs View File

@@ -107,12 +107,7 @@ namespace MQTTnet.Implementations
{
try
{
#if NET452 || NET461
var clientSocket = await Task.Factory.FromAsync(_socket.BeginAccept, _socket.EndAccept, null).ConfigureAwait(false);
#else
var clientSocket = await _socket.AcceptAsync().ConfigureAwait(false);
#endif

var clientSocket = await PlatformAbstractionLayer.AcceptAsync(_socket).ConfigureAwait(false);
if (clientSocket == null)
{
continue;


+ 13
- 3
Source/MQTTnet/Implementations/MqttWebSocketChannel.cs View File

@@ -6,10 +6,11 @@ using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Channel;
using MQTTnet.Client.Options;
using MQTTnet.Internal;

namespace MQTTnet.Implementations
{
public class MqttWebSocketChannel : IMqttChannel
public class MqttWebSocketChannel : Disposable, IMqttChannel
{
private readonly MqttClientWebSocketOptions _options;

@@ -111,7 +112,7 @@ namespace MQTTnet.Implementations
await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, cancellationToken).ConfigureAwait(false);
}

Dispose();
Cleanup();
}

public async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
@@ -141,7 +142,16 @@ namespace MQTTnet.Implementations
}
}

public void Dispose()
protected override void Dispose(bool disposing)
{
if (disposing)
{
Cleanup();
}
base.Dispose(disposing);
}

private void Cleanup()
{
_sendLock?.Dispose();
_sendLock = null;


+ 92
- 0
Source/MQTTnet/Implementations/PlatformAbstractionLayer.cs View File

@@ -0,0 +1,92 @@
using System;
using System.Net;
using System.Net.Sockets;
using System.Threading.Tasks;

namespace MQTTnet.Implementations
{
public static class PlatformAbstractionLayer
{
public static async Task<Socket> AcceptAsync(Socket socket)
{
#if NET452 || NET461
try
{
return await Task.Factory.FromAsync(socket.BeginAccept, socket.EndAccept, null).ConfigureAwait(false);
}
catch (ObjectDisposedException)
{
return null;
}
#else
return await socket.AcceptAsync().ConfigureAwait(false);
#endif
}


public static Task ConnectAsync(Socket socket, IPAddress ip, int port)
{
#if NET452 || NET461
return Task.Factory.FromAsync(socket.BeginConnect, socket.EndConnect, ip, port, null);
#else
return socket.ConnectAsync(ip, port);
#endif
}

public static Task ConnectAsync(Socket socket, string host, int port)
{
#if NET452 || NET461
return Task.Factory.FromAsync(socket.BeginConnect, socket.EndConnect, host, port, null);
#else
return socket.ConnectAsync(host, port);
#endif
}

#if NET452 || NET461
public class SocketWrapper
{
private readonly Socket _socket;
private readonly ArraySegment<byte> _buffer;
private readonly SocketFlags _socketFlags;

public SocketWrapper(Socket socket, ArraySegment<byte> buffer, SocketFlags socketFlags)
{
_socket = socket;
_buffer = buffer;
_socketFlags = socketFlags;
}

public static IAsyncResult BeginSend(AsyncCallback callback, object state)
{
var real = (SocketWrapper)state;
return real._socket.BeginSend(real._buffer.Array, real._buffer.Offset, real._buffer.Count, real._socketFlags, callback, state);
}

public static IAsyncResult BeginReceive(AsyncCallback callback, object state)
{
var real = (SocketWrapper)state;
return real._socket.BeginReceive(real._buffer.Array, real._buffer.Offset, real._buffer.Count, real._socketFlags, callback, state);
}
}
#endif

public static Task SendAsync(Socket socket, ArraySegment<byte> buffer, SocketFlags socketFlags)
{
#if NET452 || NET461
return Task.Factory.FromAsync(SocketWrapper.BeginSend, socket.EndSend, new SocketWrapper(socket, buffer, socketFlags));
#else
return socket.SendAsync(buffer, socketFlags);
#endif
}

public static Task<int> ReceiveAsync(Socket socket, ArraySegment<byte> buffer, SocketFlags socketFlags)
{
#if NET452 || NET461
return Task.Factory.FromAsync(SocketWrapper.BeginReceive, socket.EndReceive, new SocketWrapper(socket, buffer, socketFlags));
#else
return socket.ReceiveAsync(buffer, socketFlags);
#endif
}

}
}

+ 7
- 3
Source/MQTTnet/Internal/BlockingQueue.cs View File

@@ -4,7 +4,7 @@ using System.Threading;

namespace MQTTnet.Internal
{
public class BlockingQueue<TItem> : IDisposable
public class BlockingQueue<TItem> : Disposable
{
private readonly object _syncRoot = new object();
private readonly LinkedList<TItem> _items = new LinkedList<TItem>();
@@ -109,9 +109,13 @@ namespace MQTTnet.Internal
}
}

public void Dispose()
protected override void Dispose(bool disposing)
{
_gate.Dispose();
if (disposing)
{
_gate.Dispose();
}
base.Dispose(disposing);
}
}
}

+ 56
- 0
Source/MQTTnet/Internal/Disposable.cs View File

@@ -0,0 +1,56 @@
using System;

namespace MQTTnet.Internal
{
public class Disposable : IDisposable
{
protected bool IsDisposed => _isDisposed;

protected void ThrowIfDisposed()
{
if (_isDisposed)
{
throw new ObjectDisposedException(GetType().Name);
}
}


#region IDisposable Support
private bool _isDisposed = false; // To detect redundant calls

protected virtual void Dispose(bool disposing)
{
if (disposing)
{
// TODO: dispose managed state (managed objects).
}

// TODO: free unmanaged resources (unmanaged objects) and override a finalizer below.
// TODO: set large fields to null.
}

// TODO: override a finalizer only if Dispose(bool disposing) above has code to free unmanaged resources.
// ~Disposable()
// {
// // Do not change this code. Put cleanup code in Dispose(bool disposing) above.
// Dispose(false);
// }

// This code added to correctly implement the disposable pattern.
public void Dispose()
{
if (_isDisposed)
{
return;
}
// Do not change this code. Put cleanup code in Dispose(bool disposing) above.
Dispose(true);
// TODO: uncomment the following line if the finalizer is overridden above.
// GC.SuppressFinalize(this);

_isDisposed = true;
}
#endregion
}
}

+ 8
- 3
Source/MQTTnet/PacketDispatcher/MqttPacketAwaiter.cs View File

@@ -2,11 +2,12 @@
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Exceptions;
using MQTTnet.Internal;
using MQTTnet.Packets;

namespace MQTTnet.PacketDispatcher
{
public sealed class MqttPacketAwaiter<TPacket> : IMqttPacketAwaiter where TPacket : MqttBasePacket
public sealed class MqttPacketAwaiter<TPacket> : Disposable, IMqttPacketAwaiter where TPacket : MqttBasePacket
{
private readonly TaskCompletionSource<MqttBasePacket> _taskCompletionSource = new TaskCompletionSource<MqttBasePacket>();
private readonly ushort? _packetIdentifier;
@@ -52,9 +53,13 @@ namespace MQTTnet.PacketDispatcher
Task.Run(() => _taskCompletionSource.TrySetCanceled());
}

public void Dispose()
protected override void Dispose(bool disposing)
{
_owningPacketDispatcher.RemovePacketAwaiter<TPacket>(_packetIdentifier);
if (disposing)
{
_owningPacketDispatcher.RemovePacketAwaiter<TPacket>(_packetIdentifier);
}
base.Dispose(disposing);
}
}
}

+ 8
- 3
Source/MQTTnet/Server/MqttClientSessionApplicationMessagesQueue.cs View File

@@ -6,7 +6,7 @@ using System.Threading.Tasks;

namespace MQTTnet.Server
{
public class MqttClientSessionApplicationMessagesQueue : IDisposable
public class MqttClientSessionApplicationMessagesQueue : Disposable
{
private readonly AsyncQueue<MqttQueuedApplicationMessage> _messageQueue = new AsyncQueue<MqttQueuedApplicationMessage>();
@@ -71,9 +71,14 @@ namespace MQTTnet.Server
}
}

public void Dispose()
protected override void Dispose(bool disposing)
{
_messageQueue.Dispose();
if (disposing)
{
_messageQueue.Dispose();
}

base.Dispose(disposing);
}
}
}

+ 7
- 3
Source/MQTTnet/Server/MqttClientSessionsManager.cs View File

@@ -13,7 +13,7 @@ using MQTTnet.Server.Status;

namespace MQTTnet.Server
{
public class MqttClientSessionsManager : IDisposable
public class MqttClientSessionsManager : Disposable
{
private readonly AsyncQueue<MqttEnqueuedApplicationMessage> _messageQueue = new AsyncQueue<MqttEnqueuedApplicationMessage>();

@@ -145,9 +145,13 @@ namespace MQTTnet.Server
_logger.Verbose("Session for client '{0}' deleted.", clientId);
}

public void Dispose()
protected override void Dispose(bool disposing)
{
_messageQueue?.Dispose();
if (disposing)
{
_messageQueue?.Dispose();
}
base.Dispose(disposing);
}

private async Task TryProcessQueuedApplicationMessagesAsync(CancellationToken cancellationToken)


+ 2
- 1
Tests/MQTTnet.Benchmarks/LoggerBenchmark.cs View File

@@ -1,9 +1,10 @@
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Jobs;
using MQTTnet.Diagnostics;

namespace MQTTnet.Benchmarks
{
[ClrJob]
[SimpleJob(RuntimeMoniker.Net461)]
[RPlotExporter]
[MemoryDiagnoser]
public class LoggerBenchmark


+ 2
- 1
Tests/MQTTnet.Benchmarks/MessageProcessingBenchmark.cs View File

@@ -1,11 +1,12 @@
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Jobs;
using MQTTnet.Client;
using MQTTnet.Client.Options;
using MQTTnet.Server;

namespace MQTTnet.Benchmarks
{
[ClrJob]
[SimpleJob(RuntimeMoniker.Net461)]
[RPlotExporter, RankColumn]
[MemoryDiagnoser]
public class MessageProcessingBenchmark


+ 2
- 1
Tests/MQTTnet.Benchmarks/SerializerBenchmark.cs View File

@@ -8,10 +8,11 @@ using MQTTnet.Adapter;
using MQTTnet.Channel;
using MQTTnet.Formatter;
using MQTTnet.Formatter.V3;
using BenchmarkDotNet.Jobs;

namespace MQTTnet.Benchmarks
{
[ClrJob]
[SimpleJob(RuntimeMoniker.Net461)]
[RPlotExporter]
[MemoryDiagnoser]
public class SerializerBenchmark


+ 2
- 1
Tests/MQTTnet.Benchmarks/TopicFilterComparerBenchmark.cs View File

@@ -1,10 +1,11 @@
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Jobs;
using MQTTnet.Server;
using System;

namespace MQTTnet.Benchmarks
{
[ClrJob]
[SimpleJob(RuntimeMoniker.Net461)]
[RPlotExporter]
[MemoryDiagnoser]
public class TopicFilterComparerBenchmark


+ 1
- 1
Tests/MQTTnet.Core.Tests/MQTTnet.Tests.csproj View File

@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netcoreapp2.2</TargetFramework>
<TargetFrameworks>netcoreapp2.2;net461</TargetFrameworks>
<IsPackable>false</IsPackable>
</PropertyGroup>



+ 5
- 3
Tests/MQTTnet.Core.Tests/MQTTv5/Client_Tests.cs View File

@@ -18,10 +18,12 @@ namespace MQTTnet.Tests.MQTTv5
[TestClass]
public class Client_Tests
{
public TestContext TestContext { get; set; }

[TestMethod]
public async Task Connect_With_New_Mqtt_Features()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
await testEnvironment.StartServerAsync();

@@ -61,7 +63,7 @@ namespace MQTTnet.Tests.MQTTv5
[TestMethod]
public async Task Connect_With_AssignedClientId()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
string serverConnectedClientId = null;
string serverDisconnectedClientId = null;
@@ -357,7 +359,7 @@ namespace MQTTnet.Tests.MQTTv5
[TestMethod]
public async Task Publish_And_Receive_New_Properties()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
await testEnvironment.StartServerAsync();



+ 3
- 1
Tests/MQTTnet.Core.Tests/MQTTv5/Feature_Tests.cs View File

@@ -13,10 +13,12 @@ namespace MQTTnet.Tests.MQTTv5
[TestClass]
public class Feature_Tests
{
public TestContext TestContext { get; set; }

[TestMethod]
public async Task Use_User_Properties()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
await testEnvironment.StartServerAsync();



+ 3
- 1
Tests/MQTTnet.Core.Tests/MQTTv5/Server_Tests.cs View File

@@ -11,10 +11,12 @@ namespace MQTTnet.Tests.MQTTv5
[TestClass]
public class Server_Tests
{
public TestContext TestContext { get; set; }

[TestMethod]
public async Task Will_Message_Send()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
var receivedMessagesCount = 0;



+ 9
- 7
Tests/MQTTnet.Core.Tests/ManagedMqttClient_Tests.cs View File

@@ -18,6 +18,8 @@ namespace MQTTnet.Tests
[TestClass]
public class ManagedMqttClient_Tests
{
public TestContext TestContext { get; set; }

[TestMethod]
public async Task Drop_New_Messages_On_Full_Queue()
{
@@ -54,7 +56,7 @@ namespace MQTTnet.Tests
[TestMethod]
public async Task ManagedClients_Will_Message_Send()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
var receivedMessagesCount = 0;

@@ -88,7 +90,7 @@ namespace MQTTnet.Tests
[TestMethod]
public async Task Start_Stop()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
var factory = new MqttFactory();

@@ -115,7 +117,7 @@ namespace MQTTnet.Tests
[TestMethod]
public async Task Storage_Queue_Drains()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
testEnvironment.IgnoreClientLogErrors = true;
testEnvironment.IgnoreServerLogErrors = true;
@@ -167,7 +169,7 @@ namespace MQTTnet.Tests
[TestMethod]
public async Task Subscriptions_And_Unsubscriptions_Are_Made_And_Reestablished_At_Reconnect()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
var unmanagedClient = testEnvironment.CreateClient();
var managedClient = await CreateManagedClientAsync(testEnvironment, unmanagedClient);
@@ -232,7 +234,7 @@ namespace MQTTnet.Tests
[TestMethod]
public async Task Subscriptions_Subscribe_Only_New_Subscriptions()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
var managedClient = await CreateManagedClientAsync(testEnvironment);

@@ -265,7 +267,7 @@ namespace MQTTnet.Tests
[TestMethod]
public async Task Subscriptions_Are_Published_Immediately()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
// Use a long connection check interval to verify that the subscriptions
// do not depend on the connection check interval anymore
@@ -289,7 +291,7 @@ namespace MQTTnet.Tests
[TestMethod]
public async Task Subscriptions_Are_Cleared_At_Logout()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
var managedClient = await CreateManagedClientAsync(testEnvironment);



+ 94
- 0
Tests/MQTTnet.Core.Tests/Mockups/TestClientWrapper.cs View File

@@ -0,0 +1,94 @@
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Client;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.ExtendedAuthenticationExchange;
using MQTTnet.Client.Options;
using MQTTnet.Client.Publishing;
using MQTTnet.Client.Receiving;
using MQTTnet.Client.Subscribing;
using MQTTnet.Client.Unsubscribing;

namespace MQTTnet.Tests.Mockups
{
public class TestClientWrapper : IMqttClient
{
public TestClientWrapper(IMqttClient implementation, TestContext testContext)
{
Implementation = implementation;
TestContext = testContext;
}

public IMqttClient Implementation { get; }
public TestContext TestContext { get; }

public bool IsConnected => Implementation.IsConnected;

public IMqttClientOptions Options => Implementation.Options;

public IMqttClientConnectedHandler ConnectedHandler { get => Implementation.ConnectedHandler; set => Implementation.ConnectedHandler = value; }
public IMqttClientDisconnectedHandler DisconnectedHandler { get => Implementation.DisconnectedHandler; set => Implementation.DisconnectedHandler = value; }
public IMqttApplicationMessageReceivedHandler ApplicationMessageReceivedHandler { get => Implementation.ApplicationMessageReceivedHandler; set => Implementation.ApplicationMessageReceivedHandler = value; }

public Task<MqttClientAuthenticateResult> ConnectAsync(IMqttClientOptions options, CancellationToken cancellationToken)
{
switch (options)
{
case MqttClientOptionsBuilder builder:
{
var existingClientId = builder.Build().ClientId;
if (existingClientId != null && !existingClientId.StartsWith(TestContext.TestName))
{
builder.WithClientId(TestContext.TestName + existingClientId);
}
}
break;
case MqttClientOptions op:
{
var existingClientId = op.ClientId;
if (existingClientId != null && !existingClientId.StartsWith(TestContext.TestName))
{
op.ClientId = TestContext.TestName + existingClientId;
}
}
break;
default:
break;
}

return Implementation.ConnectAsync(options, cancellationToken);
}

public Task DisconnectAsync(MqttClientDisconnectOptions options, CancellationToken cancellationToken)
{
return Implementation.DisconnectAsync(options, cancellationToken);
}

public void Dispose()
{
Implementation.Dispose();
}

public Task<MqttClientPublishResult> PublishAsync(MqttApplicationMessage applicationMessage, CancellationToken cancellationToken)
{
return Implementation.PublishAsync(applicationMessage, cancellationToken);
}

public Task SendExtendedAuthenticationExchangeDataAsync(MqttExtendedAuthenticationExchangeData data, CancellationToken cancellationToken)
{
return Implementation.SendExtendedAuthenticationExchangeDataAsync(data, cancellationToken);
}

public Task<Client.Subscribing.MqttClientSubscribeResult> SubscribeAsync(MqttClientSubscribeOptions options, CancellationToken cancellationToken)
{
return Implementation.SubscribeAsync(options, cancellationToken);
}

public Task<MqttClientUnsubscribeResult> UnsubscribeAsync(MqttClientUnsubscribeOptions options, CancellationToken cancellationToken)
{
return Implementation.UnsubscribeAsync(options, cancellationToken);
}
}
}

+ 23
- 14
Tests/MQTTnet.Core.Tests/Mockups/TestEnvironment.cs View File

@@ -2,14 +2,16 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Client;
using MQTTnet.Client.Options;
using MQTTnet.Diagnostics;
using MQTTnet.Internal;
using MQTTnet.Server;

namespace MQTTnet.Tests.Mockups
{
public class TestEnvironment : IDisposable
public class TestEnvironment : Disposable
{
private readonly MqttFactory _mqttFactory = new MqttFactory();
private readonly List<IMqttClient> _clients = new List<IMqttClient>();
@@ -33,7 +35,9 @@ namespace MQTTnet.Tests.Mockups

public IMqttNetLogger ClientLogger => _clientLogger;

public TestEnvironment()
public TestContext TestContext { get; }

public TestEnvironment(TestContext testContext)
{
_serverLogger.LogMessagePublished += (s, e) =>
{
@@ -56,13 +60,14 @@ namespace MQTTnet.Tests.Mockups
}
}
};
TestContext = testContext;
}

public IMqttClient CreateClient()
{
var client = _mqttFactory.CreateMqttClient(_clientLogger);
_clients.Add(client);
return client;
return new TestClientWrapper(client, TestContext);
}

public Task<IMqttServer> StartServerAsync()
@@ -77,7 +82,7 @@ namespace MQTTnet.Tests.Mockups
throw new InvalidOperationException("Server already started.");
}

Server = _mqttFactory.CreateMqttServer(_serverLogger);
Server = new TestServerWrapper(_mqttFactory.CreateMqttServer(_serverLogger), TestContext, this);
await Server.StartAsync(options.WithDefaultEndpointPort(ServerPort).Build());

return Server;
@@ -85,7 +90,7 @@ namespace MQTTnet.Tests.Mockups

public Task<IMqttClient> ConnectClientAsync()
{
return ConnectClientAsync(new MqttClientOptionsBuilder());
return ConnectClientAsync(new MqttClientOptionsBuilder() );
}

public async Task<IMqttClient> ConnectClientAsync(MqttClientOptionsBuilder options)
@@ -127,21 +132,25 @@ namespace MQTTnet.Tests.Mockups
}
}

public void Dispose()
protected override void Dispose(bool disposing)
{
foreach (var mqttClient in _clients)
if (disposing)
{
mqttClient?.Dispose();
}
foreach (var mqttClient in _clients)
{
mqttClient?.Dispose();
}

Server?.StopAsync().GetAwaiter().GetResult();
Server?.StopAsync().GetAwaiter().GetResult();

ThrowIfLogErrors();
ThrowIfLogErrors();

if (_exceptions.Any())
{
throw new Exception($"{_exceptions.Count} exceptions tracked.\r\n" + string.Join(Environment.NewLine, _exceptions));
if (_exceptions.Any())
{
throw new Exception($"{_exceptions.Count} exceptions tracked.\r\n" + string.Join(Environment.NewLine, _exceptions));
}
}
base.Dispose(disposing);
}

public void TrackException(Exception exception)


+ 108
- 0
Tests/MQTTnet.Core.Tests/Mockups/TestServerWrapper.cs View File

@@ -0,0 +1,108 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Client.Publishing;
using MQTTnet.Client.Receiving;
using MQTTnet.Server;
using MQTTnet.Server.Status;

namespace MQTTnet.Tests.Mockups
{
public class TestServerWrapper : IMqttServer
{
public TestServerWrapper(IMqttServer implementation, TestContext testContext, TestEnvironment testEnvironment)
{
Implementation = implementation;
TestContext = testContext;
TestEnvironment = testEnvironment;
}

public IMqttServer Implementation { get; }
public TestContext TestContext { get; }
public TestEnvironment TestEnvironment { get; }
public IMqttServerStartedHandler StartedHandler { get => Implementation.StartedHandler; set => Implementation.StartedHandler = value; }
public IMqttServerStoppedHandler StoppedHandler { get => Implementation.StoppedHandler; set => Implementation.StoppedHandler = value; }
public IMqttServerClientConnectedHandler ClientConnectedHandler { get => Implementation.ClientConnectedHandler; set => Implementation.ClientConnectedHandler = value; }
public IMqttServerClientDisconnectedHandler ClientDisconnectedHandler { get => Implementation.ClientDisconnectedHandler; set => Implementation.ClientDisconnectedHandler = value; }
public IMqttServerClientSubscribedTopicHandler ClientSubscribedTopicHandler { get => Implementation.ClientSubscribedTopicHandler; set => Implementation.ClientSubscribedTopicHandler = value; }
public IMqttServerClientUnsubscribedTopicHandler ClientUnsubscribedTopicHandler { get => Implementation.ClientUnsubscribedTopicHandler; set => Implementation.ClientUnsubscribedTopicHandler = value; }

public IMqttServerOptions Options => Implementation.Options;

public IMqttApplicationMessageReceivedHandler ApplicationMessageReceivedHandler { get => Implementation.ApplicationMessageReceivedHandler; set => Implementation.ApplicationMessageReceivedHandler = value; }

public Task ClearRetainedApplicationMessagesAsync()
{
return Implementation.ClearRetainedApplicationMessagesAsync();
}

public Task<IList<IMqttClientStatus>> GetClientStatusAsync()
{
return Implementation.GetClientStatusAsync();
}

public Task<IList<MqttApplicationMessage>> GetRetainedApplicationMessagesAsync()
{
return Implementation.GetRetainedApplicationMessagesAsync();
}

public Task<IList<IMqttSessionStatus>> GetSessionStatusAsync()
{
return Implementation.GetSessionStatusAsync();
}

public Task<MqttClientPublishResult> PublishAsync(MqttApplicationMessage applicationMessage, CancellationToken cancellationToken)
{
return Implementation.PublishAsync(applicationMessage, cancellationToken);
}

public Task StartAsync(IMqttServerOptions options)
{
switch (options)
{
case MqttServerOptionsBuilder builder:
if (builder.Build().ConnectionValidator == null)
{
builder.WithConnectionValidator(ConnectionValidator);
}
break;
case MqttServerOptions op:
if (op.ConnectionValidator == null)
{
op.ConnectionValidator = new MqttServerConnectionValidatorDelegate(ConnectionValidator);
}
break;
default:
break;
}

return Implementation.StartAsync(options);
}

public void ConnectionValidator(MqttConnectionValidatorContext ctx)
{
if (!ctx.ClientId.StartsWith(TestContext.TestName))
{
TestEnvironment.TrackException(new InvalidOperationException($"invalid client connected '{ctx.ClientId}'"));
ctx.ReasonCode = Protocol.MqttConnectReasonCode.ClientIdentifierNotValid;
}
}

public Task StopAsync()
{
return Implementation.StopAsync();
}

public Task SubscribeAsync(string clientId, ICollection<TopicFilter> topicFilters)
{
return Implementation.SubscribeAsync(clientId, topicFilters);
}

public Task UnsubscribeAsync(string clientId, ICollection<string> topicFilters)
{
return Implementation.UnsubscribeAsync(clientId, topicFilters);
}
}
}

+ 48
- 16
Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs View File

@@ -20,10 +20,12 @@ namespace MQTTnet.Tests
[TestClass]
public class Client_Tests
{
public TestContext TestContext { get; set; }

[TestMethod]
public async Task Send_Reply_In_Message_Handler_For_Same_Client()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
await testEnvironment.StartServerAsync();
var client = await testEnvironment.ConnectClientAsync();
@@ -57,7 +59,7 @@ namespace MQTTnet.Tests
[TestMethod]
public async Task Send_Reply_In_Message_Handler()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
await testEnvironment.StartServerAsync();
var client1 = await testEnvironment.ConnectClientAsync();
@@ -89,7 +91,7 @@ namespace MQTTnet.Tests
[TestMethod]
public async Task Reconnect()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
var server = await testEnvironment.StartServerAsync();
var client = await testEnvironment.ConnectClientAsync();
@@ -112,7 +114,7 @@ namespace MQTTnet.Tests
[TestMethod]
public async Task Reconnect_While_Server_Offline()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
testEnvironment.IgnoreClientLogErrors = true;

@@ -149,7 +151,7 @@ namespace MQTTnet.Tests
[TestMethod]
public async Task Reconnect_From_Disconnected_Event()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
testEnvironment.IgnoreClientLogErrors = true;

@@ -189,7 +191,7 @@ namespace MQTTnet.Tests
[TestMethod]
public async Task PacketIdentifier_In_Publish_Result()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
await testEnvironment.StartServerAsync();
var client = await testEnvironment.ConnectClientAsync();
@@ -235,10 +237,40 @@ namespace MQTTnet.Tests
}
}

[TestMethod]
public async Task ConnectTimeout_Throws_Exception()
{
var factory = new MqttFactory();
using (var client = factory.CreateMqttClient())
{
bool disconnectHandlerCalled = false;
try
{
client.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(args =>
{
disconnectHandlerCalled = true;
});

await client.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("1.2.3.4").Build());

Assert.Fail("Must fail!");
}
catch (Exception exception)
{
Assert.IsNotNull(exception);
Assert.IsInstanceOfType(exception, typeof(MqttCommunicationException));
//Assert.IsInstanceOfType(exception.InnerException, typeof(SocketException));
}

await Task.Delay(100); // disconnected handler is called async
Assert.IsTrue(disconnectHandlerCalled);
}
}

[TestMethod]
public async Task Fire_Disconnected_Event_On_Server_Shutdown()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
var server = await testEnvironment.StartServerAsync();
var client = await testEnvironment.ConnectClientAsync();
@@ -290,7 +322,7 @@ namespace MQTTnet.Tests
// is an issue).
const int MessagesCount = 50;

using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
await testEnvironment.StartServerAsync();

@@ -330,7 +362,7 @@ namespace MQTTnet.Tests
[TestMethod]
public async Task Send_Reply_For_Any_Received_Message()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
await testEnvironment.StartServerAsync();

@@ -374,7 +406,7 @@ namespace MQTTnet.Tests
[TestMethod]
public async Task Publish_With_Correct_Retain_Flag()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
await testEnvironment.StartServerAsync();

@@ -405,7 +437,7 @@ namespace MQTTnet.Tests
[TestMethod]
public async Task Subscribe_In_Callback_Events()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
await testEnvironment.StartServerAsync();

@@ -444,7 +476,7 @@ namespace MQTTnet.Tests
[TestMethod]
public async Task Message_Send_Retry()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
testEnvironment.IgnoreClientLogErrors = true;
testEnvironment.IgnoreServerLogErrors = true;
@@ -488,7 +520,7 @@ namespace MQTTnet.Tests
[TestMethod]
public async Task NoConnectedHandler_Connect_DoesNotThrowException()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
await testEnvironment.StartServerAsync();

@@ -501,7 +533,7 @@ namespace MQTTnet.Tests
[TestMethod]
public async Task NoDisconnectedHandler_Disconnect_DoesNotThrowException()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
await testEnvironment.StartServerAsync();
var client = await testEnvironment.ConnectClientAsync();
@@ -516,7 +548,7 @@ namespace MQTTnet.Tests
[TestMethod]
public async Task Frequent_Connects()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
await testEnvironment.StartServerAsync();

@@ -560,7 +592,7 @@ namespace MQTTnet.Tests
[TestMethod]
public async Task No_Payload()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
await testEnvironment.StartServerAsync();



+ 4
- 4
Tests/MQTTnet.Core.Tests/MqttFactory_Tests.cs View File

@@ -18,7 +18,7 @@ namespace MQTTnet.Tests
//This test compares
//1. correct logID
string logId = "logId";
bool invalidLogIdOccured = false;
string invalidLogId = null;

//2. if the total log calls are the same for global and local
int globalLogCount = 0;
@@ -31,7 +31,7 @@ namespace MQTTnet.Tests
{
if (logId != e.TraceMessage.LogId)
{
invalidLogIdOccured = true;
invalidLogId = e.TraceMessage.LogId;
}
Interlocked.Increment(ref globalLogCount);
});
@@ -42,7 +42,7 @@ namespace MQTTnet.Tests
{
if (logId != e.TraceMessage.LogId)
{
invalidLogIdOccured = true;
invalidLogId = e.TraceMessage.LogId;
}
Interlocked.Increment(ref localLogCount);
};
@@ -69,7 +69,7 @@ namespace MQTTnet.Tests
MqttNetGlobalLogger.LogMessagePublished -= globalLog;
}

Assert.IsFalse(invalidLogIdOccured);
Assert.IsNull(invalidLogId);
Assert.AreNotEqual(0, globalLogCount);
Assert.AreEqual(globalLogCount, localLogCount);
}


+ 3
- 3
Tests/MQTTnet.Core.Tests/MqttTcpChannel_Tests.cs View File

@@ -28,14 +28,14 @@ namespace MQTTnet.Tests
{
while (!ct.IsCancellationRequested)
{
var client = await serverSocket.AcceptAsync();
var client = await PlatformAbstractionLayer.AcceptAsync(serverSocket);
var data = new byte[] { 128 };
await client.SendAsync(new ArraySegment<byte>(data), SocketFlags.None);
await PlatformAbstractionLayer.SendAsync(client, new ArraySegment<byte>(data), SocketFlags.None);
}
}, ct.Token);

var clientSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await clientSocket.ConnectAsync(IPAddress.Loopback, 50001);
await PlatformAbstractionLayer.ConnectAsync(clientSocket, IPAddress.Loopback, 50001);

await Task.Delay(100, ct.Token);



+ 5
- 3
Tests/MQTTnet.Core.Tests/RPC_Tests.cs View File

@@ -18,6 +18,8 @@ namespace MQTTnet.Tests
[TestClass]
public class RPC_Tests
{
public TestContext TestContext { get; set; }

[TestMethod]
public Task Execute_Success_With_QoS_0()
{
@@ -58,7 +60,7 @@ namespace MQTTnet.Tests
[ExpectedException(typeof(MqttCommunicationTimedOutException))]
public async Task Execute_Timeout()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
await testEnvironment.StartServerAsync();
@@ -73,7 +75,7 @@ namespace MQTTnet.Tests
[ExpectedException(typeof(MqttCommunicationTimedOutException))]
public async Task Execute_With_Custom_Topic_Names()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
await testEnvironment.StartServerAsync();

@@ -86,7 +88,7 @@ namespace MQTTnet.Tests

private async Task Execute_Success(MqttQualityOfServiceLevel qosLevel, MqttProtocolVersion protocolVersion)
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
await testEnvironment.StartServerAsync();
var responseSender = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder().WithProtocolVersion(protocolVersion));


+ 3
- 1
Tests/MQTTnet.Core.Tests/RoundtripTime_Tests.cs View File

@@ -11,10 +11,12 @@ namespace MQTTnet.Tests
[TestClass]
public class RoundtripTime_Tests
{
public TestContext TestContext { get; set; }

[TestMethod]
public async Task Round_Trip_Time()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
await testEnvironment.StartServerAsync();
var receiverClient = await testEnvironment.ConnectClientAsync();


+ 10
- 8
Tests/MQTTnet.Core.Tests/Server_Status_Tests.cs View File

@@ -13,10 +13,12 @@ namespace MQTTnet.Tests
[TestClass]
public class Server_Status_Tests
{
public TestContext TestContext { get; set; }

[TestMethod]
public async Task Show_Client_And_Session_Statistics()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
var server = await testEnvironment.StartServerAsync();

@@ -31,8 +33,8 @@ namespace MQTTnet.Tests
Assert.AreEqual(2, clientStatus.Count);
Assert.AreEqual(2, sessionStatus.Count);

Assert.IsTrue(clientStatus.Any(s => s.ClientId == "client1"));
Assert.IsTrue(clientStatus.Any(s => s.ClientId == "client2"));
Assert.IsTrue(clientStatus.Any(s => s.ClientId == c1.Options.ClientId));
Assert.IsTrue(clientStatus.Any(s => s.ClientId == c2.Options.ClientId));

await c1.DisconnectAsync();
await c2.DisconnectAsync();
@@ -50,7 +52,7 @@ namespace MQTTnet.Tests
[TestMethod]
public async Task Disconnect_Client()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
var server = await testEnvironment.StartServerAsync();

@@ -62,7 +64,7 @@ namespace MQTTnet.Tests

Assert.AreEqual(1, clientStatus.Count);
Assert.IsTrue(clientStatus.Any(s => s.ClientId == "client1"));
Assert.IsTrue(clientStatus.Any(s => s.ClientId == c1.Options.ClientId));

await clientStatus.First().DisconnectAsync();

@@ -79,7 +81,7 @@ namespace MQTTnet.Tests
[TestMethod]
public async Task Keep_Persistent_Session()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
var server = await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithPersistentSessions());

@@ -111,7 +113,7 @@ namespace MQTTnet.Tests
[TestMethod]
public async Task Track_Sent_Application_Messages()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
var server = await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithPersistentSessions());

@@ -132,7 +134,7 @@ namespace MQTTnet.Tests
[TestMethod]
public async Task Track_Sent_Packets()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
var server = await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithPersistentSessions());



+ 60
- 66
Tests/MQTTnet.Core.Tests/Server_Tests.cs View File

@@ -13,6 +13,7 @@ using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options;
using MQTTnet.Client.Receiving;
using MQTTnet.Client.Subscribing;
using MQTTnet.Implementations;
using MQTTnet.Protocol;
using MQTTnet.Server;
using MQTTnet.Tests.Mockups;
@@ -22,10 +23,12 @@ namespace MQTTnet.Tests
[TestClass]
public class Server_Tests
{
public TestContext TestContext { get; set; }

[TestMethod]
public async Task Use_Empty_Client_ID()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
await testEnvironment.StartServerAsync();

@@ -51,7 +54,8 @@ namespace MQTTnet.Tests
MqttQualityOfServiceLevel.AtMostOnce,
"A/B/C",
MqttQualityOfServiceLevel.AtMostOnce,
1);
1,
TestContext);
}

[TestMethod]
@@ -62,7 +66,8 @@ namespace MQTTnet.Tests
MqttQualityOfServiceLevel.AtLeastOnce,
"A/B/C",
MqttQualityOfServiceLevel.AtLeastOnce,
1);
1,
TestContext);
}

[TestMethod]
@@ -73,13 +78,14 @@ namespace MQTTnet.Tests
MqttQualityOfServiceLevel.ExactlyOnce,
"A/B/C",
MqttQualityOfServiceLevel.ExactlyOnce,
1);
1,
TestContext);
}

[TestMethod]
public async Task Use_Clean_Session()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
await testEnvironment.StartServerAsync();

@@ -93,7 +99,7 @@ namespace MQTTnet.Tests
[TestMethod]
public async Task Will_Message_Do_Not_Send()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
var receivedMessagesCount = 0;

@@ -119,7 +125,7 @@ namespace MQTTnet.Tests
[TestMethod]
public async Task Will_Message_Send()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
var receivedMessagesCount = 0;

@@ -145,7 +151,7 @@ namespace MQTTnet.Tests
[TestMethod]
public async Task Intercept_Subscription()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithSubscriptionInterceptor(
c =>
@@ -184,7 +190,7 @@ namespace MQTTnet.Tests
[TestMethod]
public async Task Subscribe_Unsubscribe()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
var receivedMessagesCount = 0;

@@ -204,7 +210,7 @@ namespace MQTTnet.Tests
var subscribeEventCalled = false;
server.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate(e =>
{
subscribeEventCalled = e.TopicFilter.Topic == "a" && e.ClientId == "c1";
subscribeEventCalled = e.TopicFilter.Topic == "a" && e.ClientId == c1.Options.ClientId;
});

await c1.SubscribeAsync(new TopicFilter { Topic = "a", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce });
@@ -218,7 +224,7 @@ namespace MQTTnet.Tests
var unsubscribeEventCalled = false;
server.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(e =>
{
unsubscribeEventCalled = e.TopicFilter == "a" && e.ClientId == "c1";
unsubscribeEventCalled = e.TopicFilter == "a" && e.ClientId == c1.Options.ClientId;
});

await c1.UnsubscribeAsync("a");
@@ -238,7 +244,7 @@ namespace MQTTnet.Tests
[TestMethod]
public async Task Subscribe_Multiple_In_Single_Request()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
var receivedMessagesCount = 0;

@@ -271,7 +277,7 @@ namespace MQTTnet.Tests
[TestMethod]
public async Task Subscribe_Multiple_In_Multiple_Request()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
var receivedMessagesCount = 0;

@@ -310,7 +316,7 @@ namespace MQTTnet.Tests
[TestMethod]
public async Task Publish_From_Server()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
var server = await testEnvironment.StartServerAsync();

@@ -336,7 +342,7 @@ namespace MQTTnet.Tests
var receivedMessagesCount = 0;
var locked = new object();

using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
await testEnvironment.StartServerAsync();

@@ -378,7 +384,7 @@ namespace MQTTnet.Tests
[TestMethod]
public async Task Session_Takeover()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
await testEnvironment.StartServerAsync();

@@ -400,7 +406,7 @@ namespace MQTTnet.Tests
[TestMethod]
public async Task No_Messages_If_No_Subscription()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
await testEnvironment.StartServerAsync();

@@ -433,7 +439,7 @@ namespace MQTTnet.Tests
[TestMethod]
public async Task Set_Subscription_At_Server()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
var server = await testEnvironment.StartServerAsync();
server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(e => server.SubscribeAsync(e.ClientId, "topic1"));
@@ -464,7 +470,7 @@ namespace MQTTnet.Tests
[TestMethod]
public async Task Shutdown_Disconnects_Clients_Gracefully()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
var server = await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder());

@@ -486,7 +492,7 @@ namespace MQTTnet.Tests
[TestMethod]
public async Task Handle_Clean_Disconnect()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
var server = await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder());

@@ -515,7 +521,7 @@ namespace MQTTnet.Tests
[TestMethod]
public async Task Client_Disconnect_Without_Errors()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
bool clientWasConnected;

@@ -546,7 +552,7 @@ namespace MQTTnet.Tests
{
const int ClientCount = 50;

using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
var server = await testEnvironment.StartServerAsync();

@@ -598,7 +604,7 @@ namespace MQTTnet.Tests
[TestMethod]
public async Task Retained_Messages_Flow()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
var retainedMessage = new MqttApplicationMessageBuilder().WithTopic("r").WithPayload("r").WithRetainFlag().Build();

@@ -635,7 +641,7 @@ namespace MQTTnet.Tests
[TestMethod]
public async Task Receive_No_Retained_Message_After_Subscribe()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
await testEnvironment.StartServerAsync();

@@ -658,7 +664,7 @@ namespace MQTTnet.Tests
[TestMethod]
public async Task Receive_Retained_Message_After_Subscribe()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
await testEnvironment.StartServerAsync();

@@ -689,7 +695,7 @@ namespace MQTTnet.Tests
[TestMethod]
public async Task Clear_Retained_Message_With_Empty_Payload()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
var receivedMessagesCount = 0;

@@ -717,7 +723,7 @@ namespace MQTTnet.Tests
[TestMethod]
public async Task Clear_Retained_Message_With_Null_Payload()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
var receivedMessagesCount = 0;

@@ -745,7 +751,7 @@ namespace MQTTnet.Tests
[TestMethod]
public async Task Intercept_Application_Message()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
await testEnvironment.StartServerAsync(
new MqttServerOptionsBuilder().WithApplicationMessageInterceptor(
@@ -768,7 +774,7 @@ namespace MQTTnet.Tests
{
var serverStorage = new TestServerStorage();

using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithStorage(serverStorage));

@@ -785,7 +791,7 @@ namespace MQTTnet.Tests
[TestMethod]
public async Task Publish_After_Client_Connects()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
var server = await testEnvironment.StartServerAsync();
server.UseClientConnectedHandler(async e =>
@@ -818,7 +824,7 @@ namespace MQTTnet.Tests
context.ApplicationMessage.Payload = Encoding.ASCII.GetBytes("extended");
}

using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithApplicationMessageInterceptor(Interceptor));

@@ -844,7 +850,7 @@ namespace MQTTnet.Tests
[TestMethod]
public async Task Send_Long_Body()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
const int PayloadSizeInMB = 30;
const int CharCount = PayloadSizeInMB * 1024 * 1024;
@@ -889,31 +895,18 @@ namespace MQTTnet.Tests
{
var serverOptions = new MqttServerOptionsBuilder().WithConnectionValidator(context =>
{
context.ReturnCode = MqttConnectReturnCode.ConnectionRefusedNotAuthorized;
context.ReasonCode = MqttConnectReasonCode.NotAuthorized;
});

using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
testEnvironment.IgnoreClientLogErrors = true;

await testEnvironment.StartServerAsync(serverOptions);

try
{
await testEnvironment.ConnectClientAsync();
Assert.Fail("An exception should be raised.");
}
catch (Exception exception)
{
if (exception is MqttConnectingFailedException connectingFailedException)
{
Assert.AreEqual(MqttClientConnectResultCode.NotAuthorized, connectingFailedException.ResultCode);
}
else
{
Assert.Fail("Wrong exception.");
}
}

var connectingFailedException = await Assert.ThrowsExceptionAsync<MqttConnectingFailedException>(() => testEnvironment.ConnectClientAsync());
Assert.AreEqual(MqttClientConnectResultCode.NotAuthorized, connectingFailedException.ResultCode);
}
}

@@ -934,7 +927,7 @@ namespace MQTTnet.Tests
[TestMethod]
public async Task Same_Client_Id_Refuse_Connection()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
testEnvironment.IgnoreClientLogErrors = true;

@@ -1023,7 +1016,7 @@ namespace MQTTnet.Tests
[TestMethod]
public async Task Same_Client_Id_Connect_Disconnect_Event_Order()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
var server = await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder());

@@ -1107,7 +1100,7 @@ namespace MQTTnet.Tests
[TestMethod]
public async Task Remove_Session()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
var server = await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder());

@@ -1126,7 +1119,7 @@ namespace MQTTnet.Tests
[TestMethod]
public async Task Stop_And_Restart()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
testEnvironment.IgnoreClientLogErrors = true;

@@ -1148,23 +1141,23 @@ namespace MQTTnet.Tests
await testEnvironment.ConnectClientAsync();
}
}
[TestMethod]
public async Task Close_Idle_Connection()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithDefaultCommunicationTimeout(TimeSpan.FromSeconds(1)));

var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await client.ConnectAsync("localhost", testEnvironment.ServerPort);
await PlatformAbstractionLayer.ConnectAsync(client, "localhost", testEnvironment.ServerPort);

// Don't send anything. The server should close the connection.
await Task.Delay(TimeSpan.FromSeconds(3));

try
{
var receivedBytes = await client.ReceiveAsync(new ArraySegment<byte>(new byte[10]), SocketFlags.Partial);
var receivedBytes = await PlatformAbstractionLayer.ReceiveAsync(client, new ArraySegment<byte>(new byte[10]), SocketFlags.Partial);
if (receivedBytes == 0)
{
return;
@@ -1181,14 +1174,14 @@ namespace MQTTnet.Tests
[TestMethod]
public async Task Send_Garbage()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithDefaultCommunicationTimeout(TimeSpan.FromSeconds(1)));

// Send an invalid packet and ensure that the server will close the connection and stay in a waiting state
// forever. This is security related.
var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await client.ConnectAsync("localhost", testEnvironment.ServerPort);
await PlatformAbstractionLayer.ConnectAsync(client, "localhost", testEnvironment.ServerPort);
var buffer = Encoding.UTF8.GetBytes("Garbage");
client.Send(buffer, buffer.Length, SocketFlags.None);
@@ -1197,7 +1190,7 @@ namespace MQTTnet.Tests

try
{
var receivedBytes = await client.ReceiveAsync(new ArraySegment<byte>(new byte[10]), SocketFlags.Partial);
var receivedBytes = await PlatformAbstractionLayer.ReceiveAsync(client, new ArraySegment<byte>(new byte[10]), SocketFlags.Partial);
if (receivedBytes == 0)
{
return;
@@ -1214,7 +1207,7 @@ namespace MQTTnet.Tests
[TestMethod]
public async Task Do_Not_Send_Retained_Messages_For_Denied_Subscription()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithSubscriptionInterceptor(c =>
{
@@ -1258,7 +1251,7 @@ namespace MQTTnet.Tests
[TestMethod]
public async Task Collect_Messages_In_Disconnected_Session()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
var server = await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithPersistentSessions());

@@ -1285,7 +1278,7 @@ namespace MQTTnet.Tests
Assert.AreEqual(0, clientStatus.Count);
Assert.AreEqual(2, sessionStatus.Count);

Assert.AreEqual(3, sessionStatus.First(s => s.ClientId == "a").PendingApplicationMessagesCount);
Assert.AreEqual(3, sessionStatus.First(s => s.ClientId == client1.Options.ClientId).PendingApplicationMessagesCount);
}
}

@@ -1294,9 +1287,10 @@ namespace MQTTnet.Tests
MqttQualityOfServiceLevel qualityOfServiceLevel,
string topicFilter,
MqttQualityOfServiceLevel filterQualityOfServiceLevel,
int expectedReceivedMessagesCount)
int expectedReceivedMessagesCount,
TestContext testContext)
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(testContext))
{
var receivedMessagesCount = 0;



+ 3
- 1
Tests/MQTTnet.Core.Tests/Session_Tests.cs View File

@@ -11,10 +11,12 @@ namespace MQTTnet.Tests
[TestClass]
public class Session_Tests
{
public TestContext TestContext { get; set; }

[TestMethod]
public async Task Set_Session_Item()
{
using (var testEnvironment = new TestEnvironment())
using (var testEnvironment = new TestEnvironment(TestContext))
{
var serverOptions = new MqttServerOptionsBuilder()
.WithConnectionValidator(delegate (MqttConnectionValidatorContext context)


+ 1
- 4
Tests/MQTTnet.TestApp.AspNetCore2/MQTTnet.TestApp.AspNetCore2.csproj View File

@@ -11,14 +11,11 @@

<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore" Version="2.1.6" />
<PackageReference Include="Microsoft.AspNetCore.App" Version="2.1.6" />
<PackageReference Include="Microsoft.AspNetCore.Connections.Abstractions" Version="2.1.3" />
<PackageReference Include="Microsoft.AspNetCore.Http.Abstractions" Version="2.1.1" />
<PackageReference Include="Microsoft.AspNetCore.StaticFiles" Version="2.1.1" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\Source\MQTTnet.AspnetCore\MQTTnet.AspNetCore.csproj" />
<ProjectReference Include="..\..\Source\MQTTnet\MQTTnet.csproj" />
</ItemGroup>

<ItemGroup>


+ 3
- 3
Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs View File

@@ -40,11 +40,11 @@ namespace MQTTnet.TestApp.NetCore
Console.WriteLine(">> RECEIVED: " + e.ApplicationMessage.Topic);
});

await managedClient.PublishAsync(builder => builder.WithTopic("Step").WithPayload("1"));
await managedClient.PublishAsync(builder => builder.WithTopic("Step").WithPayload("2").WithAtLeastOnceQoS());

await managedClient.StartAsync(options);

await managedClient.PublishAsync(builder => builder.WithTopic("Step").WithPayload("1"));
await managedClient.PublishAsync(builder => builder.WithTopic("Step").WithPayload("2").WithAtLeastOnceQoS());
await managedClient.SubscribeAsync(new TopicFilter { Topic = "xyz", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce });
await managedClient.SubscribeAsync(new TopicFilter { Topic = "abc", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce });



+ 1
- 1
Tests/MQTTnet.TestApp.NetCore/ServerTest.cs View File

@@ -30,7 +30,7 @@ namespace MQTTnet.TestApp.NetCore
{
if (p.Username != "USER" || p.Password != "PASS")
{
p.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
p.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
}
}
}),


Loading…
Cancel
Save