diff --git a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs index 4767f4a..8b5a48f 100644 --- a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs +++ b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs @@ -16,7 +16,7 @@ using MQTTnet.Server; namespace MQTTnet.Extensions.ManagedClient { - public class ManagedMqttClient : IManagedMqttClient + public class ManagedMqttClient : Disposable, IManagedMqttClient { private readonly BlockingQueue _messageQueue = new BlockingQueue(); @@ -42,9 +42,7 @@ namespace MQTTnet.Extensions.ManagedClient private Task _maintainConnectionTask; private ManagedMqttClientStorageManager _storageManager; - - private bool _disposed; - + public ManagedMqttClient(IMqttClient mqttClient, IMqttNetChildLogger logger) { _mqttClient = mqttClient ?? throw new ArgumentNullException(nameof(mqttClient)); @@ -146,6 +144,7 @@ namespace MQTTnet.Extensions.ManagedClient ThrowIfDisposed(); if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage)); + if (Options == null) throw new InvalidOperationException("call StartAsync before publishing messages"); MqttTopicValidator.ThrowIfInvalid(applicationMessage.ApplicationMessage.Topic); @@ -238,36 +237,25 @@ namespace MQTTnet.Extensions.ManagedClient return Task.FromResult(0); } - public void Dispose() + protected override void Dispose(bool disposing) { - if (_disposed) - { - return; - } - - _disposed = true; - - StopPublishing(); - StopMaintainingConnection(); - - if (_maintainConnectionTask != null) + if (disposing) { - Task.WaitAny(_maintainConnectionTask); - _maintainConnectionTask = null; - } + StopPublishing(); + StopMaintainingConnection(); - _messageQueue.Dispose(); - _messageQueueLock.Dispose(); - _mqttClient.Dispose(); - _subscriptionsQueuedSignal.Dispose(); - } + if (_maintainConnectionTask != null) + { + _maintainConnectionTask.GetAwaiter().GetResult(); + _maintainConnectionTask = null; + } - private void ThrowIfDisposed() - { - if (_disposed) - { - throw new ObjectDisposedException(nameof(ManagedMqttClient)); + _messageQueue.Dispose(); + _messageQueueLock.Dispose(); + _mqttClient.Dispose(); + _subscriptionsQueuedSignal.Dispose(); } + base.Dispose(disposing); } private async Task MaintainConnectionAsync(CancellationToken cancellationToken) @@ -288,7 +276,7 @@ namespace MQTTnet.Extensions.ManagedClient } finally { - if (!_disposed) + if (!IsDisposed) { try { diff --git a/Source/MQTTnet/Adapter/MqttChannelAdapter.cs b/Source/MQTTnet/Adapter/MqttChannelAdapter.cs index 4a0a85d..f364168 100644 --- a/Source/MQTTnet/Adapter/MqttChannelAdapter.cs +++ b/Source/MQTTnet/Adapter/MqttChannelAdapter.cs @@ -14,7 +14,7 @@ using MQTTnet.Packets; namespace MQTTnet.Adapter { - public class MqttChannelAdapter : IMqttChannelAdapter + public class MqttChannelAdapter : Disposable, IMqttChannelAdapter { private const uint ErrorOperationAborted = 0x800703E3; private const int ReadBufferSize = 4096; // TODO: Move buffer size to config @@ -26,9 +26,7 @@ namespace MQTTnet.Adapter private readonly MqttPacketReader _packetReader; private readonly byte[] _fixedHeaderBuffer = new byte[2]; - - private bool _isDisposed; - + private long _bytesReceived; private long _bytesSent; @@ -269,19 +267,13 @@ namespace MQTTnet.Adapter } } - public void Dispose() - { - _isDisposed = true; - - _channel?.Dispose(); - } - - private void ThrowIfDisposed() + protected override void Dispose(bool disposing) { - if (_isDisposed) + if (disposing) { - throw new ObjectDisposedException(nameof(MqttChannelAdapter)); + _channel?.Dispose(); } + base.Dispose(disposing); } private static bool IsWrappedException(Exception exception) diff --git a/Source/MQTTnet/Client/MqttClient.cs b/Source/MQTTnet/Client/MqttClient.cs index c8aa859..c9ef061 100644 --- a/Source/MQTTnet/Client/MqttClient.cs +++ b/Source/MQTTnet/Client/MqttClient.cs @@ -20,7 +20,7 @@ using MQTTnet.Protocol; namespace MQTTnet.Client { - public class MqttClient : IMqttClient + public class MqttClient : Disposable, IMqttClient { private readonly MqttPacketIdentifierProvider _packetIdentifierProvider = new MqttPacketIdentifierProvider(); private readonly MqttPacketDispatcher _packetDispatcher = new MqttPacketDispatcher(); @@ -63,6 +63,8 @@ namespace MQTTnet.Client ThrowIfConnected("It is not allowed to connect with a server after the connection is established."); + ThrowIfDisposed(); + MqttClientAuthenticateResult authenticateResult = null; try @@ -79,13 +81,16 @@ namespace MQTTnet.Client var adapter = _adapterFactory.CreateClientAdapter(options, _logger); _adapter = adapter; - _logger.Verbose($"Trying to connect with server '{options.ChannelOptions}' (Timeout={options.CommunicationTimeout})."); - await _adapter.ConnectAsync(options.CommunicationTimeout, cancellationToken).ConfigureAwait(false); - _logger.Verbose("Connection with server established."); + using (var combined = CancellationTokenSource.CreateLinkedTokenSource(backgroundCancellationToken, cancellationToken)) + { + _logger.Verbose($"Trying to connect with server '{options.ChannelOptions}' (Timeout={options.CommunicationTimeout})."); + await _adapter.ConnectAsync(options.CommunicationTimeout, combined.Token).ConfigureAwait(false); + _logger.Verbose("Connection with server established."); - _packetReceiverTask = Task.Run(() => TryReceivePacketsAsync(backgroundCancellationToken), backgroundCancellationToken); + _packetReceiverTask = Task.Run(() => TryReceivePacketsAsync(backgroundCancellationToken), backgroundCancellationToken); - authenticateResult = await AuthenticateAsync(adapter, options.WillMessage, cancellationToken).ConfigureAwait(false); + authenticateResult = await AuthenticateAsync(adapter, options.WillMessage, combined.Token).ConfigureAwait(false); + } _sendTracker.Restart(); @@ -161,6 +166,7 @@ namespace MQTTnet.Client { if (options == null) throw new ArgumentNullException(nameof(options)); + ThrowIfDisposed(); ThrowIfNotConnected(); var subscribePacket = _adapter.PacketFormatterAdapter.DataConverter.CreateSubscribePacket(options); @@ -174,6 +180,7 @@ namespace MQTTnet.Client { if (options == null) throw new ArgumentNullException(nameof(options)); + ThrowIfDisposed(); ThrowIfNotConnected(); var unsubscribePacket = _adapter.PacketFormatterAdapter.DataConverter.CreateUnsubscribePacket(options); @@ -189,6 +196,7 @@ namespace MQTTnet.Client MqttTopicValidator.ThrowIfInvalid(applicationMessage.Topic); + ThrowIfDisposed(); ThrowIfNotConnected(); var publishPacket = _adapter.PacketFormatterAdapter.DataConverter.CreatePublishPacket(applicationMessage); @@ -214,7 +222,7 @@ namespace MQTTnet.Client } } - public void Dispose() + private void Cleanup() { _backgroundCancellationTokenSource?.Cancel(false); _backgroundCancellationTokenSource?.Dispose(); @@ -224,6 +232,18 @@ namespace MQTTnet.Client _adapter = null; } + + protected override void Dispose(bool disposing) + { + if (disposing) + { + Cleanup(); + + DisconnectedHandler = null; + } + base.Dispose(disposing); + } + private async Task AuthenticateAsync(IMqttChannelAdapter channelAdapter, MqttApplicationMessage willApplicationMessage, CancellationToken cancellationToken) { var connectPacket = channelAdapter.PacketFormatterAdapter.DataConverter.CreateConnectPacket( @@ -288,7 +308,7 @@ namespace MQTTnet.Client } finally { - Dispose(); + Cleanup(); _cleanDisconnectInitiated = false; _logger.Info("Disconnected."); diff --git a/Source/MQTTnet/Implementations/MqttTcpChannel.cs b/Source/MQTTnet/Implementations/MqttTcpChannel.cs index 8f012cb..9b2ba56 100644 --- a/Source/MQTTnet/Implementations/MqttTcpChannel.cs +++ b/Source/MQTTnet/Implementations/MqttTcpChannel.cs @@ -10,10 +10,11 @@ using System.Runtime.ExceptionServices; using System.Threading; using MQTTnet.Channel; using MQTTnet.Client.Options; +using MQTTnet.Internal; namespace MQTTnet.Implementations { - public class MqttTcpChannel : IMqttChannel + public class MqttTcpChannel : Disposable, IMqttChannel { private readonly IMqttClientOptions _clientOptions; private readonly MqttClientTcpOptions _options; @@ -72,11 +73,7 @@ namespace MQTTnet.Implementations // Workaround for: workaround for https://github.com/dotnet/corefx/issues/24430 using (cancellationToken.Register(() => socket.Dispose())) { -#if NET452 || NET461 - await Task.Factory.FromAsync(socket.BeginConnect, socket.EndConnect, _options.Server, _options.GetPort(), null).ConfigureAwait(false); -#else - await socket.ConnectAsync(_options.Server, _options.GetPort()).ConfigureAwait(false); -#endif + await PlatformAbstractionLayer.ConnectAsync(socket, _options.Server, _options.GetPort()).ConfigureAwait(false); } var networkStream = new NetworkStream(socket, true); @@ -98,7 +95,7 @@ namespace MQTTnet.Implementations public Task DisconnectAsync(CancellationToken cancellationToken) { - Dispose(); + Cleanup(); return Task.FromResult(0); } @@ -117,6 +114,10 @@ namespace MQTTnet.Implementations return await _stream.ReadAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false); } } + catch (ObjectDisposedException) + { + return 0; + } catch (IOException exception) { if (exception.InnerException is SocketException socketException) @@ -143,6 +144,10 @@ namespace MQTTnet.Implementations await _stream.WriteAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false); } } + catch (ObjectDisposedException) + { + return; + } catch (IOException exception) { if (exception.InnerException is SocketException socketException) @@ -154,7 +159,7 @@ namespace MQTTnet.Implementations } } - public void Dispose() + private void Cleanup() { // When the stream is disposed it will also close the socket and this will also dispose it. // So there is no need to dispose the socket again. @@ -173,6 +178,15 @@ namespace MQTTnet.Implementations _stream = null; } + protected override void Dispose(bool disposing) + { + if (disposing) + { + Cleanup(); + } + base.Dispose(disposing); + } + private bool InternalUserCertificateValidationCallback(object sender, X509Certificate x509Certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors) { if (_options.TlsOptions.CertificateValidationCallback != null) diff --git a/Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs b/Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs index d7f4e6f..501c4da 100644 --- a/Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs +++ b/Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs @@ -8,11 +8,12 @@ using System.Threading; using System.Threading.Tasks; using MQTTnet.Adapter; using MQTTnet.Diagnostics; +using MQTTnet.Internal; using MQTTnet.Server; namespace MQTTnet.Implementations { - public class MqttTcpServerAdapter : IMqttServerAdapter + public class MqttTcpServerAdapter : Disposable, IMqttServerAdapter { private readonly List _listeners = new List(); private readonly IMqttNetChildLogger _logger; @@ -72,11 +73,11 @@ namespace MQTTnet.Implementations public Task StopAsync() { - Dispose(); + Cleanup(); return Task.FromResult(0); } - public void Dispose() + private void Cleanup() { _cancellationTokenSource?.Cancel(false); _cancellationTokenSource?.Dispose(); @@ -90,6 +91,15 @@ namespace MQTTnet.Implementations _listeners.Clear(); } + protected override void Dispose(bool disposing) + { + if (disposing) + { + Cleanup(); + } + base.Dispose(disposing); + } + private void RegisterListeners(MqttServerTcpEndpointBaseOptions options, X509Certificate2 tlsCertificate, CancellationToken cancellationToken) { if (!options.BoundInterNetworkAddress.Equals(IPAddress.None)) diff --git a/Source/MQTTnet/Implementations/MqttTcpServerListener.cs b/Source/MQTTnet/Implementations/MqttTcpServerListener.cs index d57888e..f2f439e 100644 --- a/Source/MQTTnet/Implementations/MqttTcpServerListener.cs +++ b/Source/MQTTnet/Implementations/MqttTcpServerListener.cs @@ -107,12 +107,7 @@ namespace MQTTnet.Implementations { try { -#if NET452 || NET461 - var clientSocket = await Task.Factory.FromAsync(_socket.BeginAccept, _socket.EndAccept, null).ConfigureAwait(false); -#else - var clientSocket = await _socket.AcceptAsync().ConfigureAwait(false); -#endif - + var clientSocket = await PlatformAbstractionLayer.AcceptAsync(_socket).ConfigureAwait(false); if (clientSocket == null) { continue; diff --git a/Source/MQTTnet/Implementations/MqttWebSocketChannel.cs b/Source/MQTTnet/Implementations/MqttWebSocketChannel.cs index db53936..c159b91 100644 --- a/Source/MQTTnet/Implementations/MqttWebSocketChannel.cs +++ b/Source/MQTTnet/Implementations/MqttWebSocketChannel.cs @@ -6,10 +6,11 @@ using System.Threading; using System.Threading.Tasks; using MQTTnet.Channel; using MQTTnet.Client.Options; +using MQTTnet.Internal; namespace MQTTnet.Implementations { - public class MqttWebSocketChannel : IMqttChannel + public class MqttWebSocketChannel : Disposable, IMqttChannel { private readonly MqttClientWebSocketOptions _options; @@ -111,7 +112,7 @@ namespace MQTTnet.Implementations await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, cancellationToken).ConfigureAwait(false); } - Dispose(); + Cleanup(); } public async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) @@ -141,7 +142,16 @@ namespace MQTTnet.Implementations } } - public void Dispose() + protected override void Dispose(bool disposing) + { + if (disposing) + { + Cleanup(); + } + base.Dispose(disposing); + } + + private void Cleanup() { _sendLock?.Dispose(); _sendLock = null; diff --git a/Source/MQTTnet/Implementations/PlatformAbstractionLayer.cs b/Source/MQTTnet/Implementations/PlatformAbstractionLayer.cs new file mode 100644 index 0000000..a940eac --- /dev/null +++ b/Source/MQTTnet/Implementations/PlatformAbstractionLayer.cs @@ -0,0 +1,92 @@ +using System; +using System.Net; +using System.Net.Sockets; +using System.Threading.Tasks; + +namespace MQTTnet.Implementations +{ + public static class PlatformAbstractionLayer + { + public static async Task AcceptAsync(Socket socket) + { +#if NET452 || NET461 + try + { + return await Task.Factory.FromAsync(socket.BeginAccept, socket.EndAccept, null).ConfigureAwait(false); + } + catch (ObjectDisposedException) + { + return null; + } +#else + return await socket.AcceptAsync().ConfigureAwait(false); +#endif + } + + + public static Task ConnectAsync(Socket socket, IPAddress ip, int port) + { +#if NET452 || NET461 + return Task.Factory.FromAsync(socket.BeginConnect, socket.EndConnect, ip, port, null); +#else + return socket.ConnectAsync(ip, port); +#endif + } + + public static Task ConnectAsync(Socket socket, string host, int port) + { +#if NET452 || NET461 + return Task.Factory.FromAsync(socket.BeginConnect, socket.EndConnect, host, port, null); +#else + return socket.ConnectAsync(host, port); +#endif + } + +#if NET452 || NET461 + public class SocketWrapper + { + private readonly Socket _socket; + private readonly ArraySegment _buffer; + private readonly SocketFlags _socketFlags; + + public SocketWrapper(Socket socket, ArraySegment buffer, SocketFlags socketFlags) + { + _socket = socket; + _buffer = buffer; + _socketFlags = socketFlags; + } + + public static IAsyncResult BeginSend(AsyncCallback callback, object state) + { + var real = (SocketWrapper)state; + return real._socket.BeginSend(real._buffer.Array, real._buffer.Offset, real._buffer.Count, real._socketFlags, callback, state); + } + + public static IAsyncResult BeginReceive(AsyncCallback callback, object state) + { + var real = (SocketWrapper)state; + return real._socket.BeginReceive(real._buffer.Array, real._buffer.Offset, real._buffer.Count, real._socketFlags, callback, state); + } + } +#endif + + public static Task SendAsync(Socket socket, ArraySegment buffer, SocketFlags socketFlags) + { +#if NET452 || NET461 + return Task.Factory.FromAsync(SocketWrapper.BeginSend, socket.EndSend, new SocketWrapper(socket, buffer, socketFlags)); +#else + return socket.SendAsync(buffer, socketFlags); +#endif + } + + public static Task ReceiveAsync(Socket socket, ArraySegment buffer, SocketFlags socketFlags) + { +#if NET452 || NET461 + return Task.Factory.FromAsync(SocketWrapper.BeginReceive, socket.EndReceive, new SocketWrapper(socket, buffer, socketFlags)); +#else + return socket.ReceiveAsync(buffer, socketFlags); +#endif + } + + } +} diff --git a/Source/MQTTnet/Internal/BlockingQueue.cs b/Source/MQTTnet/Internal/BlockingQueue.cs index 6225105..2fa21be 100644 --- a/Source/MQTTnet/Internal/BlockingQueue.cs +++ b/Source/MQTTnet/Internal/BlockingQueue.cs @@ -4,7 +4,7 @@ using System.Threading; namespace MQTTnet.Internal { - public class BlockingQueue : IDisposable + public class BlockingQueue : Disposable { private readonly object _syncRoot = new object(); private readonly LinkedList _items = new LinkedList(); @@ -109,9 +109,13 @@ namespace MQTTnet.Internal } } - public void Dispose() + protected override void Dispose(bool disposing) { - _gate.Dispose(); + if (disposing) + { + _gate.Dispose(); + } + base.Dispose(disposing); } } } diff --git a/Source/MQTTnet/Internal/Disposable.cs b/Source/MQTTnet/Internal/Disposable.cs new file mode 100644 index 0000000..f8a72b5 --- /dev/null +++ b/Source/MQTTnet/Internal/Disposable.cs @@ -0,0 +1,56 @@ +using System; + +namespace MQTTnet.Internal +{ + public class Disposable : IDisposable + { + protected bool IsDisposed => _isDisposed; + + protected void ThrowIfDisposed() + { + if (_isDisposed) + { + throw new ObjectDisposedException(GetType().Name); + } + } + + + #region IDisposable Support + + private bool _isDisposed = false; // To detect redundant calls + + protected virtual void Dispose(bool disposing) + { + if (disposing) + { + // TODO: dispose managed state (managed objects). + } + + // TODO: free unmanaged resources (unmanaged objects) and override a finalizer below. + // TODO: set large fields to null. + } + + // TODO: override a finalizer only if Dispose(bool disposing) above has code to free unmanaged resources. + // ~Disposable() + // { + // // Do not change this code. Put cleanup code in Dispose(bool disposing) above. + // Dispose(false); + // } + + // This code added to correctly implement the disposable pattern. + public void Dispose() + { + if (_isDisposed) + { + return; + } + // Do not change this code. Put cleanup code in Dispose(bool disposing) above. + Dispose(true); + // TODO: uncomment the following line if the finalizer is overridden above. + // GC.SuppressFinalize(this); + + _isDisposed = true; + } + #endregion + } +} diff --git a/Source/MQTTnet/PacketDispatcher/MqttPacketAwaiter.cs b/Source/MQTTnet/PacketDispatcher/MqttPacketAwaiter.cs index 19df6d4..8f906f6 100644 --- a/Source/MQTTnet/PacketDispatcher/MqttPacketAwaiter.cs +++ b/Source/MQTTnet/PacketDispatcher/MqttPacketAwaiter.cs @@ -2,11 +2,12 @@ using System.Threading; using System.Threading.Tasks; using MQTTnet.Exceptions; +using MQTTnet.Internal; using MQTTnet.Packets; namespace MQTTnet.PacketDispatcher { - public sealed class MqttPacketAwaiter : IMqttPacketAwaiter where TPacket : MqttBasePacket + public sealed class MqttPacketAwaiter : Disposable, IMqttPacketAwaiter where TPacket : MqttBasePacket { private readonly TaskCompletionSource _taskCompletionSource = new TaskCompletionSource(); private readonly ushort? _packetIdentifier; @@ -52,9 +53,13 @@ namespace MQTTnet.PacketDispatcher Task.Run(() => _taskCompletionSource.TrySetCanceled()); } - public void Dispose() + protected override void Dispose(bool disposing) { - _owningPacketDispatcher.RemovePacketAwaiter(_packetIdentifier); + if (disposing) + { + _owningPacketDispatcher.RemovePacketAwaiter(_packetIdentifier); + } + base.Dispose(disposing); } } } \ No newline at end of file diff --git a/Source/MQTTnet/Server/MqttClientSessionApplicationMessagesQueue.cs b/Source/MQTTnet/Server/MqttClientSessionApplicationMessagesQueue.cs index 901ac75..0cf19c8 100644 --- a/Source/MQTTnet/Server/MqttClientSessionApplicationMessagesQueue.cs +++ b/Source/MQTTnet/Server/MqttClientSessionApplicationMessagesQueue.cs @@ -6,7 +6,7 @@ using System.Threading.Tasks; namespace MQTTnet.Server { - public class MqttClientSessionApplicationMessagesQueue : IDisposable + public class MqttClientSessionApplicationMessagesQueue : Disposable { private readonly AsyncQueue _messageQueue = new AsyncQueue(); @@ -71,9 +71,14 @@ namespace MQTTnet.Server } } - public void Dispose() + protected override void Dispose(bool disposing) { - _messageQueue.Dispose(); + if (disposing) + { + _messageQueue.Dispose(); + } + + base.Dispose(disposing); } } } diff --git a/Source/MQTTnet/Server/MqttClientSessionsManager.cs b/Source/MQTTnet/Server/MqttClientSessionsManager.cs index 7e366a6..28c163d 100644 --- a/Source/MQTTnet/Server/MqttClientSessionsManager.cs +++ b/Source/MQTTnet/Server/MqttClientSessionsManager.cs @@ -13,7 +13,7 @@ using MQTTnet.Server.Status; namespace MQTTnet.Server { - public class MqttClientSessionsManager : IDisposable + public class MqttClientSessionsManager : Disposable { private readonly AsyncQueue _messageQueue = new AsyncQueue(); @@ -145,9 +145,13 @@ namespace MQTTnet.Server _logger.Verbose("Session for client '{0}' deleted.", clientId); } - public void Dispose() + protected override void Dispose(bool disposing) { - _messageQueue?.Dispose(); + if (disposing) + { + _messageQueue?.Dispose(); + } + base.Dispose(disposing); } private async Task TryProcessQueuedApplicationMessagesAsync(CancellationToken cancellationToken) diff --git a/Tests/MQTTnet.Benchmarks/LoggerBenchmark.cs b/Tests/MQTTnet.Benchmarks/LoggerBenchmark.cs index 0039434..cfc88d4 100644 --- a/Tests/MQTTnet.Benchmarks/LoggerBenchmark.cs +++ b/Tests/MQTTnet.Benchmarks/LoggerBenchmark.cs @@ -1,9 +1,10 @@ using BenchmarkDotNet.Attributes; +using BenchmarkDotNet.Jobs; using MQTTnet.Diagnostics; namespace MQTTnet.Benchmarks { - [ClrJob] + [SimpleJob(RuntimeMoniker.Net461)] [RPlotExporter] [MemoryDiagnoser] public class LoggerBenchmark diff --git a/Tests/MQTTnet.Benchmarks/MessageProcessingBenchmark.cs b/Tests/MQTTnet.Benchmarks/MessageProcessingBenchmark.cs index c821ee0..99fe030 100644 --- a/Tests/MQTTnet.Benchmarks/MessageProcessingBenchmark.cs +++ b/Tests/MQTTnet.Benchmarks/MessageProcessingBenchmark.cs @@ -1,11 +1,12 @@ using BenchmarkDotNet.Attributes; +using BenchmarkDotNet.Jobs; using MQTTnet.Client; using MQTTnet.Client.Options; using MQTTnet.Server; namespace MQTTnet.Benchmarks { - [ClrJob] + [SimpleJob(RuntimeMoniker.Net461)] [RPlotExporter, RankColumn] [MemoryDiagnoser] public class MessageProcessingBenchmark diff --git a/Tests/MQTTnet.Benchmarks/SerializerBenchmark.cs b/Tests/MQTTnet.Benchmarks/SerializerBenchmark.cs index 51e7ecb..00433cf 100644 --- a/Tests/MQTTnet.Benchmarks/SerializerBenchmark.cs +++ b/Tests/MQTTnet.Benchmarks/SerializerBenchmark.cs @@ -8,10 +8,11 @@ using MQTTnet.Adapter; using MQTTnet.Channel; using MQTTnet.Formatter; using MQTTnet.Formatter.V3; +using BenchmarkDotNet.Jobs; namespace MQTTnet.Benchmarks { - [ClrJob] + [SimpleJob(RuntimeMoniker.Net461)] [RPlotExporter] [MemoryDiagnoser] public class SerializerBenchmark diff --git a/Tests/MQTTnet.Benchmarks/TopicFilterComparerBenchmark.cs b/Tests/MQTTnet.Benchmarks/TopicFilterComparerBenchmark.cs index bbce630..2df92ad 100644 --- a/Tests/MQTTnet.Benchmarks/TopicFilterComparerBenchmark.cs +++ b/Tests/MQTTnet.Benchmarks/TopicFilterComparerBenchmark.cs @@ -1,10 +1,11 @@ using BenchmarkDotNet.Attributes; +using BenchmarkDotNet.Jobs; using MQTTnet.Server; using System; namespace MQTTnet.Benchmarks { - [ClrJob] + [SimpleJob(RuntimeMoniker.Net461)] [RPlotExporter] [MemoryDiagnoser] public class TopicFilterComparerBenchmark diff --git a/Tests/MQTTnet.Core.Tests/MQTTnet.Tests.csproj b/Tests/MQTTnet.Core.Tests/MQTTnet.Tests.csproj index a4b6881..7bf14cb 100644 --- a/Tests/MQTTnet.Core.Tests/MQTTnet.Tests.csproj +++ b/Tests/MQTTnet.Core.Tests/MQTTnet.Tests.csproj @@ -1,7 +1,7 @@  - netcoreapp2.2 + netcoreapp2.2;net461 false diff --git a/Tests/MQTTnet.Core.Tests/MQTTv5/Client_Tests.cs b/Tests/MQTTnet.Core.Tests/MQTTv5/Client_Tests.cs index 59d18fc..f033b5b 100644 --- a/Tests/MQTTnet.Core.Tests/MQTTv5/Client_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/MQTTv5/Client_Tests.cs @@ -18,10 +18,12 @@ namespace MQTTnet.Tests.MQTTv5 [TestClass] public class Client_Tests { + public TestContext TestContext { get; set; } + [TestMethod] public async Task Connect_With_New_Mqtt_Features() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { await testEnvironment.StartServerAsync(); @@ -61,7 +63,7 @@ namespace MQTTnet.Tests.MQTTv5 [TestMethod] public async Task Connect_With_AssignedClientId() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { string serverConnectedClientId = null; string serverDisconnectedClientId = null; @@ -357,7 +359,7 @@ namespace MQTTnet.Tests.MQTTv5 [TestMethod] public async Task Publish_And_Receive_New_Properties() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { await testEnvironment.StartServerAsync(); diff --git a/Tests/MQTTnet.Core.Tests/MQTTv5/Feature_Tests.cs b/Tests/MQTTnet.Core.Tests/MQTTv5/Feature_Tests.cs index 294ee84..6507fff 100644 --- a/Tests/MQTTnet.Core.Tests/MQTTv5/Feature_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/MQTTv5/Feature_Tests.cs @@ -13,10 +13,12 @@ namespace MQTTnet.Tests.MQTTv5 [TestClass] public class Feature_Tests { + public TestContext TestContext { get; set; } + [TestMethod] public async Task Use_User_Properties() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { await testEnvironment.StartServerAsync(); diff --git a/Tests/MQTTnet.Core.Tests/MQTTv5/Server_Tests.cs b/Tests/MQTTnet.Core.Tests/MQTTv5/Server_Tests.cs index e3020ce..014bd92 100644 --- a/Tests/MQTTnet.Core.Tests/MQTTv5/Server_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/MQTTv5/Server_Tests.cs @@ -11,10 +11,12 @@ namespace MQTTnet.Tests.MQTTv5 [TestClass] public class Server_Tests { + public TestContext TestContext { get; set; } + [TestMethod] public async Task Will_Message_Send() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { var receivedMessagesCount = 0; diff --git a/Tests/MQTTnet.Core.Tests/ManagedMqttClient_Tests.cs b/Tests/MQTTnet.Core.Tests/ManagedMqttClient_Tests.cs index 1cf9ab9..edfe058 100644 --- a/Tests/MQTTnet.Core.Tests/ManagedMqttClient_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/ManagedMqttClient_Tests.cs @@ -18,6 +18,8 @@ namespace MQTTnet.Tests [TestClass] public class ManagedMqttClient_Tests { + public TestContext TestContext { get; set; } + [TestMethod] public async Task Drop_New_Messages_On_Full_Queue() { @@ -54,7 +56,7 @@ namespace MQTTnet.Tests [TestMethod] public async Task ManagedClients_Will_Message_Send() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { var receivedMessagesCount = 0; @@ -88,7 +90,7 @@ namespace MQTTnet.Tests [TestMethod] public async Task Start_Stop() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { var factory = new MqttFactory(); @@ -115,7 +117,7 @@ namespace MQTTnet.Tests [TestMethod] public async Task Storage_Queue_Drains() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { testEnvironment.IgnoreClientLogErrors = true; testEnvironment.IgnoreServerLogErrors = true; @@ -167,7 +169,7 @@ namespace MQTTnet.Tests [TestMethod] public async Task Subscriptions_And_Unsubscriptions_Are_Made_And_Reestablished_At_Reconnect() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { var unmanagedClient = testEnvironment.CreateClient(); var managedClient = await CreateManagedClientAsync(testEnvironment, unmanagedClient); @@ -232,7 +234,7 @@ namespace MQTTnet.Tests [TestMethod] public async Task Subscriptions_Subscribe_Only_New_Subscriptions() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { var managedClient = await CreateManagedClientAsync(testEnvironment); @@ -265,7 +267,7 @@ namespace MQTTnet.Tests [TestMethod] public async Task Subscriptions_Are_Published_Immediately() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { // Use a long connection check interval to verify that the subscriptions // do not depend on the connection check interval anymore @@ -289,7 +291,7 @@ namespace MQTTnet.Tests [TestMethod] public async Task Subscriptions_Are_Cleared_At_Logout() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { var managedClient = await CreateManagedClientAsync(testEnvironment); diff --git a/Tests/MQTTnet.Core.Tests/Mockups/TestClientWrapper.cs b/Tests/MQTTnet.Core.Tests/Mockups/TestClientWrapper.cs new file mode 100644 index 0000000..2500a6f --- /dev/null +++ b/Tests/MQTTnet.Core.Tests/Mockups/TestClientWrapper.cs @@ -0,0 +1,94 @@ +using System.Threading; +using System.Threading.Tasks; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using MQTTnet.Client; +using MQTTnet.Client.Connecting; +using MQTTnet.Client.Disconnecting; +using MQTTnet.Client.ExtendedAuthenticationExchange; +using MQTTnet.Client.Options; +using MQTTnet.Client.Publishing; +using MQTTnet.Client.Receiving; +using MQTTnet.Client.Subscribing; +using MQTTnet.Client.Unsubscribing; + +namespace MQTTnet.Tests.Mockups +{ + public class TestClientWrapper : IMqttClient + { + public TestClientWrapper(IMqttClient implementation, TestContext testContext) + { + Implementation = implementation; + TestContext = testContext; + } + + public IMqttClient Implementation { get; } + public TestContext TestContext { get; } + + public bool IsConnected => Implementation.IsConnected; + + public IMqttClientOptions Options => Implementation.Options; + + public IMqttClientConnectedHandler ConnectedHandler { get => Implementation.ConnectedHandler; set => Implementation.ConnectedHandler = value; } + public IMqttClientDisconnectedHandler DisconnectedHandler { get => Implementation.DisconnectedHandler; set => Implementation.DisconnectedHandler = value; } + public IMqttApplicationMessageReceivedHandler ApplicationMessageReceivedHandler { get => Implementation.ApplicationMessageReceivedHandler; set => Implementation.ApplicationMessageReceivedHandler = value; } + + public Task ConnectAsync(IMqttClientOptions options, CancellationToken cancellationToken) + { + switch (options) + { + case MqttClientOptionsBuilder builder: + { + var existingClientId = builder.Build().ClientId; + if (existingClientId != null && !existingClientId.StartsWith(TestContext.TestName)) + { + builder.WithClientId(TestContext.TestName + existingClientId); + } + } + break; + case MqttClientOptions op: + { + var existingClientId = op.ClientId; + if (existingClientId != null && !existingClientId.StartsWith(TestContext.TestName)) + { + op.ClientId = TestContext.TestName + existingClientId; + } + } + break; + default: + break; + } + + return Implementation.ConnectAsync(options, cancellationToken); + } + + public Task DisconnectAsync(MqttClientDisconnectOptions options, CancellationToken cancellationToken) + { + return Implementation.DisconnectAsync(options, cancellationToken); + } + + public void Dispose() + { + Implementation.Dispose(); + } + + public Task PublishAsync(MqttApplicationMessage applicationMessage, CancellationToken cancellationToken) + { + return Implementation.PublishAsync(applicationMessage, cancellationToken); + } + + public Task SendExtendedAuthenticationExchangeDataAsync(MqttExtendedAuthenticationExchangeData data, CancellationToken cancellationToken) + { + return Implementation.SendExtendedAuthenticationExchangeDataAsync(data, cancellationToken); + } + + public Task SubscribeAsync(MqttClientSubscribeOptions options, CancellationToken cancellationToken) + { + return Implementation.SubscribeAsync(options, cancellationToken); + } + + public Task UnsubscribeAsync(MqttClientUnsubscribeOptions options, CancellationToken cancellationToken) + { + return Implementation.UnsubscribeAsync(options, cancellationToken); + } + } +} \ No newline at end of file diff --git a/Tests/MQTTnet.Core.Tests/Mockups/TestEnvironment.cs b/Tests/MQTTnet.Core.Tests/Mockups/TestEnvironment.cs index 5cda537..7f2dcac 100644 --- a/Tests/MQTTnet.Core.Tests/Mockups/TestEnvironment.cs +++ b/Tests/MQTTnet.Core.Tests/Mockups/TestEnvironment.cs @@ -2,14 +2,16 @@ using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; +using Microsoft.VisualStudio.TestTools.UnitTesting; using MQTTnet.Client; using MQTTnet.Client.Options; using MQTTnet.Diagnostics; +using MQTTnet.Internal; using MQTTnet.Server; namespace MQTTnet.Tests.Mockups { - public class TestEnvironment : IDisposable + public class TestEnvironment : Disposable { private readonly MqttFactory _mqttFactory = new MqttFactory(); private readonly List _clients = new List(); @@ -33,7 +35,9 @@ namespace MQTTnet.Tests.Mockups public IMqttNetLogger ClientLogger => _clientLogger; - public TestEnvironment() + public TestContext TestContext { get; } + + public TestEnvironment(TestContext testContext) { _serverLogger.LogMessagePublished += (s, e) => { @@ -56,13 +60,14 @@ namespace MQTTnet.Tests.Mockups } } }; + TestContext = testContext; } public IMqttClient CreateClient() { var client = _mqttFactory.CreateMqttClient(_clientLogger); _clients.Add(client); - return client; + return new TestClientWrapper(client, TestContext); } public Task StartServerAsync() @@ -77,7 +82,7 @@ namespace MQTTnet.Tests.Mockups throw new InvalidOperationException("Server already started."); } - Server = _mqttFactory.CreateMqttServer(_serverLogger); + Server = new TestServerWrapper(_mqttFactory.CreateMqttServer(_serverLogger), TestContext, this); await Server.StartAsync(options.WithDefaultEndpointPort(ServerPort).Build()); return Server; @@ -85,7 +90,7 @@ namespace MQTTnet.Tests.Mockups public Task ConnectClientAsync() { - return ConnectClientAsync(new MqttClientOptionsBuilder()); + return ConnectClientAsync(new MqttClientOptionsBuilder() ); } public async Task ConnectClientAsync(MqttClientOptionsBuilder options) @@ -127,21 +132,25 @@ namespace MQTTnet.Tests.Mockups } } - public void Dispose() + protected override void Dispose(bool disposing) { - foreach (var mqttClient in _clients) + if (disposing) { - mqttClient?.Dispose(); - } + foreach (var mqttClient in _clients) + { + mqttClient?.Dispose(); + } - Server?.StopAsync().GetAwaiter().GetResult(); + Server?.StopAsync().GetAwaiter().GetResult(); - ThrowIfLogErrors(); + ThrowIfLogErrors(); - if (_exceptions.Any()) - { - throw new Exception($"{_exceptions.Count} exceptions tracked.\r\n" + string.Join(Environment.NewLine, _exceptions)); + if (_exceptions.Any()) + { + throw new Exception($"{_exceptions.Count} exceptions tracked.\r\n" + string.Join(Environment.NewLine, _exceptions)); + } } + base.Dispose(disposing); } public void TrackException(Exception exception) diff --git a/Tests/MQTTnet.Core.Tests/Mockups/TestServerWrapper.cs b/Tests/MQTTnet.Core.Tests/Mockups/TestServerWrapper.cs new file mode 100644 index 0000000..f990197 --- /dev/null +++ b/Tests/MQTTnet.Core.Tests/Mockups/TestServerWrapper.cs @@ -0,0 +1,108 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using MQTTnet.Client.Publishing; +using MQTTnet.Client.Receiving; +using MQTTnet.Server; +using MQTTnet.Server.Status; + +namespace MQTTnet.Tests.Mockups +{ + public class TestServerWrapper : IMqttServer + { + public TestServerWrapper(IMqttServer implementation, TestContext testContext, TestEnvironment testEnvironment) + { + Implementation = implementation; + TestContext = testContext; + TestEnvironment = testEnvironment; + } + + public IMqttServer Implementation { get; } + public TestContext TestContext { get; } + public TestEnvironment TestEnvironment { get; } + public IMqttServerStartedHandler StartedHandler { get => Implementation.StartedHandler; set => Implementation.StartedHandler = value; } + public IMqttServerStoppedHandler StoppedHandler { get => Implementation.StoppedHandler; set => Implementation.StoppedHandler = value; } + public IMqttServerClientConnectedHandler ClientConnectedHandler { get => Implementation.ClientConnectedHandler; set => Implementation.ClientConnectedHandler = value; } + public IMqttServerClientDisconnectedHandler ClientDisconnectedHandler { get => Implementation.ClientDisconnectedHandler; set => Implementation.ClientDisconnectedHandler = value; } + public IMqttServerClientSubscribedTopicHandler ClientSubscribedTopicHandler { get => Implementation.ClientSubscribedTopicHandler; set => Implementation.ClientSubscribedTopicHandler = value; } + public IMqttServerClientUnsubscribedTopicHandler ClientUnsubscribedTopicHandler { get => Implementation.ClientUnsubscribedTopicHandler; set => Implementation.ClientUnsubscribedTopicHandler = value; } + + public IMqttServerOptions Options => Implementation.Options; + + public IMqttApplicationMessageReceivedHandler ApplicationMessageReceivedHandler { get => Implementation.ApplicationMessageReceivedHandler; set => Implementation.ApplicationMessageReceivedHandler = value; } + + public Task ClearRetainedApplicationMessagesAsync() + { + return Implementation.ClearRetainedApplicationMessagesAsync(); + } + + public Task> GetClientStatusAsync() + { + return Implementation.GetClientStatusAsync(); + } + + public Task> GetRetainedApplicationMessagesAsync() + { + return Implementation.GetRetainedApplicationMessagesAsync(); + } + + public Task> GetSessionStatusAsync() + { + return Implementation.GetSessionStatusAsync(); + } + + public Task PublishAsync(MqttApplicationMessage applicationMessage, CancellationToken cancellationToken) + { + return Implementation.PublishAsync(applicationMessage, cancellationToken); + } + + public Task StartAsync(IMqttServerOptions options) + { + switch (options) + { + case MqttServerOptionsBuilder builder: + if (builder.Build().ConnectionValidator == null) + { + builder.WithConnectionValidator(ConnectionValidator); + } + break; + case MqttServerOptions op: + if (op.ConnectionValidator == null) + { + op.ConnectionValidator = new MqttServerConnectionValidatorDelegate(ConnectionValidator); + } + break; + default: + break; + } + + return Implementation.StartAsync(options); + } + + public void ConnectionValidator(MqttConnectionValidatorContext ctx) + { + if (!ctx.ClientId.StartsWith(TestContext.TestName)) + { + TestEnvironment.TrackException(new InvalidOperationException($"invalid client connected '{ctx.ClientId}'")); + ctx.ReasonCode = Protocol.MqttConnectReasonCode.ClientIdentifierNotValid; + } + } + + public Task StopAsync() + { + return Implementation.StopAsync(); + } + + public Task SubscribeAsync(string clientId, ICollection topicFilters) + { + return Implementation.SubscribeAsync(clientId, topicFilters); + } + + public Task UnsubscribeAsync(string clientId, ICollection topicFilters) + { + return Implementation.UnsubscribeAsync(clientId, topicFilters); + } + } +} \ No newline at end of file diff --git a/Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs b/Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs index 51d1753..b8f4fbc 100644 --- a/Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs @@ -20,10 +20,12 @@ namespace MQTTnet.Tests [TestClass] public class Client_Tests { + public TestContext TestContext { get; set; } + [TestMethod] public async Task Send_Reply_In_Message_Handler_For_Same_Client() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { await testEnvironment.StartServerAsync(); var client = await testEnvironment.ConnectClientAsync(); @@ -57,7 +59,7 @@ namespace MQTTnet.Tests [TestMethod] public async Task Send_Reply_In_Message_Handler() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { await testEnvironment.StartServerAsync(); var client1 = await testEnvironment.ConnectClientAsync(); @@ -89,7 +91,7 @@ namespace MQTTnet.Tests [TestMethod] public async Task Reconnect() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { var server = await testEnvironment.StartServerAsync(); var client = await testEnvironment.ConnectClientAsync(); @@ -112,7 +114,7 @@ namespace MQTTnet.Tests [TestMethod] public async Task Reconnect_While_Server_Offline() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { testEnvironment.IgnoreClientLogErrors = true; @@ -149,7 +151,7 @@ namespace MQTTnet.Tests [TestMethod] public async Task Reconnect_From_Disconnected_Event() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { testEnvironment.IgnoreClientLogErrors = true; @@ -189,7 +191,7 @@ namespace MQTTnet.Tests [TestMethod] public async Task PacketIdentifier_In_Publish_Result() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { await testEnvironment.StartServerAsync(); var client = await testEnvironment.ConnectClientAsync(); @@ -235,10 +237,40 @@ namespace MQTTnet.Tests } } + [TestMethod] + public async Task ConnectTimeout_Throws_Exception() + { + var factory = new MqttFactory(); + using (var client = factory.CreateMqttClient()) + { + bool disconnectHandlerCalled = false; + try + { + client.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(args => + { + disconnectHandlerCalled = true; + }); + + await client.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("1.2.3.4").Build()); + + Assert.Fail("Must fail!"); + } + catch (Exception exception) + { + Assert.IsNotNull(exception); + Assert.IsInstanceOfType(exception, typeof(MqttCommunicationException)); + //Assert.IsInstanceOfType(exception.InnerException, typeof(SocketException)); + } + + await Task.Delay(100); // disconnected handler is called async + Assert.IsTrue(disconnectHandlerCalled); + } + } + [TestMethod] public async Task Fire_Disconnected_Event_On_Server_Shutdown() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { var server = await testEnvironment.StartServerAsync(); var client = await testEnvironment.ConnectClientAsync(); @@ -290,7 +322,7 @@ namespace MQTTnet.Tests // is an issue). const int MessagesCount = 50; - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { await testEnvironment.StartServerAsync(); @@ -330,7 +362,7 @@ namespace MQTTnet.Tests [TestMethod] public async Task Send_Reply_For_Any_Received_Message() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { await testEnvironment.StartServerAsync(); @@ -374,7 +406,7 @@ namespace MQTTnet.Tests [TestMethod] public async Task Publish_With_Correct_Retain_Flag() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { await testEnvironment.StartServerAsync(); @@ -405,7 +437,7 @@ namespace MQTTnet.Tests [TestMethod] public async Task Subscribe_In_Callback_Events() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { await testEnvironment.StartServerAsync(); @@ -444,7 +476,7 @@ namespace MQTTnet.Tests [TestMethod] public async Task Message_Send_Retry() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { testEnvironment.IgnoreClientLogErrors = true; testEnvironment.IgnoreServerLogErrors = true; @@ -488,7 +520,7 @@ namespace MQTTnet.Tests [TestMethod] public async Task NoConnectedHandler_Connect_DoesNotThrowException() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { await testEnvironment.StartServerAsync(); @@ -501,7 +533,7 @@ namespace MQTTnet.Tests [TestMethod] public async Task NoDisconnectedHandler_Disconnect_DoesNotThrowException() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { await testEnvironment.StartServerAsync(); var client = await testEnvironment.ConnectClientAsync(); @@ -516,7 +548,7 @@ namespace MQTTnet.Tests [TestMethod] public async Task Frequent_Connects() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { await testEnvironment.StartServerAsync(); @@ -560,7 +592,7 @@ namespace MQTTnet.Tests [TestMethod] public async Task No_Payload() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { await testEnvironment.StartServerAsync(); diff --git a/Tests/MQTTnet.Core.Tests/MqttFactory_Tests.cs b/Tests/MQTTnet.Core.Tests/MqttFactory_Tests.cs index 75bda9b..eb7aba3 100644 --- a/Tests/MQTTnet.Core.Tests/MqttFactory_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttFactory_Tests.cs @@ -18,7 +18,7 @@ namespace MQTTnet.Tests //This test compares //1. correct logID string logId = "logId"; - bool invalidLogIdOccured = false; + string invalidLogId = null; //2. if the total log calls are the same for global and local int globalLogCount = 0; @@ -31,7 +31,7 @@ namespace MQTTnet.Tests { if (logId != e.TraceMessage.LogId) { - invalidLogIdOccured = true; + invalidLogId = e.TraceMessage.LogId; } Interlocked.Increment(ref globalLogCount); }); @@ -42,7 +42,7 @@ namespace MQTTnet.Tests { if (logId != e.TraceMessage.LogId) { - invalidLogIdOccured = true; + invalidLogId = e.TraceMessage.LogId; } Interlocked.Increment(ref localLogCount); }; @@ -69,7 +69,7 @@ namespace MQTTnet.Tests MqttNetGlobalLogger.LogMessagePublished -= globalLog; } - Assert.IsFalse(invalidLogIdOccured); + Assert.IsNull(invalidLogId); Assert.AreNotEqual(0, globalLogCount); Assert.AreEqual(globalLogCount, localLogCount); } diff --git a/Tests/MQTTnet.Core.Tests/MqttTcpChannel_Tests.cs b/Tests/MQTTnet.Core.Tests/MqttTcpChannel_Tests.cs index 436d2d1..a4b0ca7 100644 --- a/Tests/MQTTnet.Core.Tests/MqttTcpChannel_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttTcpChannel_Tests.cs @@ -28,14 +28,14 @@ namespace MQTTnet.Tests { while (!ct.IsCancellationRequested) { - var client = await serverSocket.AcceptAsync(); + var client = await PlatformAbstractionLayer.AcceptAsync(serverSocket); var data = new byte[] { 128 }; - await client.SendAsync(new ArraySegment(data), SocketFlags.None); + await PlatformAbstractionLayer.SendAsync(client, new ArraySegment(data), SocketFlags.None); } }, ct.Token); var clientSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); - await clientSocket.ConnectAsync(IPAddress.Loopback, 50001); + await PlatformAbstractionLayer.ConnectAsync(clientSocket, IPAddress.Loopback, 50001); await Task.Delay(100, ct.Token); diff --git a/Tests/MQTTnet.Core.Tests/RPC_Tests.cs b/Tests/MQTTnet.Core.Tests/RPC_Tests.cs index a420697..947c104 100644 --- a/Tests/MQTTnet.Core.Tests/RPC_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/RPC_Tests.cs @@ -18,6 +18,8 @@ namespace MQTTnet.Tests [TestClass] public class RPC_Tests { + public TestContext TestContext { get; set; } + [TestMethod] public Task Execute_Success_With_QoS_0() { @@ -58,7 +60,7 @@ namespace MQTTnet.Tests [ExpectedException(typeof(MqttCommunicationTimedOutException))] public async Task Execute_Timeout() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { await testEnvironment.StartServerAsync(); @@ -73,7 +75,7 @@ namespace MQTTnet.Tests [ExpectedException(typeof(MqttCommunicationTimedOutException))] public async Task Execute_With_Custom_Topic_Names() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { await testEnvironment.StartServerAsync(); @@ -86,7 +88,7 @@ namespace MQTTnet.Tests private async Task Execute_Success(MqttQualityOfServiceLevel qosLevel, MqttProtocolVersion protocolVersion) { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { await testEnvironment.StartServerAsync(); var responseSender = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder().WithProtocolVersion(protocolVersion)); diff --git a/Tests/MQTTnet.Core.Tests/RoundtripTime_Tests.cs b/Tests/MQTTnet.Core.Tests/RoundtripTime_Tests.cs index 02055a8..377d142 100644 --- a/Tests/MQTTnet.Core.Tests/RoundtripTime_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/RoundtripTime_Tests.cs @@ -11,10 +11,12 @@ namespace MQTTnet.Tests [TestClass] public class RoundtripTime_Tests { + public TestContext TestContext { get; set; } + [TestMethod] public async Task Round_Trip_Time() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { await testEnvironment.StartServerAsync(); var receiverClient = await testEnvironment.ConnectClientAsync(); diff --git a/Tests/MQTTnet.Core.Tests/Server_Status_Tests.cs b/Tests/MQTTnet.Core.Tests/Server_Status_Tests.cs index c4a5d27..93c1d55 100644 --- a/Tests/MQTTnet.Core.Tests/Server_Status_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/Server_Status_Tests.cs @@ -13,10 +13,12 @@ namespace MQTTnet.Tests [TestClass] public class Server_Status_Tests { + public TestContext TestContext { get; set; } + [TestMethod] public async Task Show_Client_And_Session_Statistics() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { var server = await testEnvironment.StartServerAsync(); @@ -31,8 +33,8 @@ namespace MQTTnet.Tests Assert.AreEqual(2, clientStatus.Count); Assert.AreEqual(2, sessionStatus.Count); - Assert.IsTrue(clientStatus.Any(s => s.ClientId == "client1")); - Assert.IsTrue(clientStatus.Any(s => s.ClientId == "client2")); + Assert.IsTrue(clientStatus.Any(s => s.ClientId == c1.Options.ClientId)); + Assert.IsTrue(clientStatus.Any(s => s.ClientId == c2.Options.ClientId)); await c1.DisconnectAsync(); await c2.DisconnectAsync(); @@ -50,7 +52,7 @@ namespace MQTTnet.Tests [TestMethod] public async Task Disconnect_Client() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { var server = await testEnvironment.StartServerAsync(); @@ -62,7 +64,7 @@ namespace MQTTnet.Tests Assert.AreEqual(1, clientStatus.Count); - Assert.IsTrue(clientStatus.Any(s => s.ClientId == "client1")); + Assert.IsTrue(clientStatus.Any(s => s.ClientId == c1.Options.ClientId)); await clientStatus.First().DisconnectAsync(); @@ -79,7 +81,7 @@ namespace MQTTnet.Tests [TestMethod] public async Task Keep_Persistent_Session() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { var server = await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithPersistentSessions()); @@ -111,7 +113,7 @@ namespace MQTTnet.Tests [TestMethod] public async Task Track_Sent_Application_Messages() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { var server = await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithPersistentSessions()); @@ -132,7 +134,7 @@ namespace MQTTnet.Tests [TestMethod] public async Task Track_Sent_Packets() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { var server = await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithPersistentSessions()); diff --git a/Tests/MQTTnet.Core.Tests/Server_Tests.cs b/Tests/MQTTnet.Core.Tests/Server_Tests.cs index c3d6df1..3ebdaa6 100644 --- a/Tests/MQTTnet.Core.Tests/Server_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/Server_Tests.cs @@ -13,6 +13,7 @@ using MQTTnet.Client.Disconnecting; using MQTTnet.Client.Options; using MQTTnet.Client.Receiving; using MQTTnet.Client.Subscribing; +using MQTTnet.Implementations; using MQTTnet.Protocol; using MQTTnet.Server; using MQTTnet.Tests.Mockups; @@ -22,10 +23,12 @@ namespace MQTTnet.Tests [TestClass] public class Server_Tests { + public TestContext TestContext { get; set; } + [TestMethod] public async Task Use_Empty_Client_ID() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { await testEnvironment.StartServerAsync(); @@ -51,7 +54,8 @@ namespace MQTTnet.Tests MqttQualityOfServiceLevel.AtMostOnce, "A/B/C", MqttQualityOfServiceLevel.AtMostOnce, - 1); + 1, + TestContext); } [TestMethod] @@ -62,7 +66,8 @@ namespace MQTTnet.Tests MqttQualityOfServiceLevel.AtLeastOnce, "A/B/C", MqttQualityOfServiceLevel.AtLeastOnce, - 1); + 1, + TestContext); } [TestMethod] @@ -73,13 +78,14 @@ namespace MQTTnet.Tests MqttQualityOfServiceLevel.ExactlyOnce, "A/B/C", MqttQualityOfServiceLevel.ExactlyOnce, - 1); + 1, + TestContext); } [TestMethod] public async Task Use_Clean_Session() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { await testEnvironment.StartServerAsync(); @@ -93,7 +99,7 @@ namespace MQTTnet.Tests [TestMethod] public async Task Will_Message_Do_Not_Send() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { var receivedMessagesCount = 0; @@ -119,7 +125,7 @@ namespace MQTTnet.Tests [TestMethod] public async Task Will_Message_Send() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { var receivedMessagesCount = 0; @@ -145,7 +151,7 @@ namespace MQTTnet.Tests [TestMethod] public async Task Intercept_Subscription() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithSubscriptionInterceptor( c => @@ -184,7 +190,7 @@ namespace MQTTnet.Tests [TestMethod] public async Task Subscribe_Unsubscribe() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { var receivedMessagesCount = 0; @@ -204,7 +210,7 @@ namespace MQTTnet.Tests var subscribeEventCalled = false; server.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate(e => { - subscribeEventCalled = e.TopicFilter.Topic == "a" && e.ClientId == "c1"; + subscribeEventCalled = e.TopicFilter.Topic == "a" && e.ClientId == c1.Options.ClientId; }); await c1.SubscribeAsync(new TopicFilter { Topic = "a", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce }); @@ -218,7 +224,7 @@ namespace MQTTnet.Tests var unsubscribeEventCalled = false; server.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(e => { - unsubscribeEventCalled = e.TopicFilter == "a" && e.ClientId == "c1"; + unsubscribeEventCalled = e.TopicFilter == "a" && e.ClientId == c1.Options.ClientId; }); await c1.UnsubscribeAsync("a"); @@ -238,7 +244,7 @@ namespace MQTTnet.Tests [TestMethod] public async Task Subscribe_Multiple_In_Single_Request() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { var receivedMessagesCount = 0; @@ -271,7 +277,7 @@ namespace MQTTnet.Tests [TestMethod] public async Task Subscribe_Multiple_In_Multiple_Request() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { var receivedMessagesCount = 0; @@ -310,7 +316,7 @@ namespace MQTTnet.Tests [TestMethod] public async Task Publish_From_Server() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { var server = await testEnvironment.StartServerAsync(); @@ -336,7 +342,7 @@ namespace MQTTnet.Tests var receivedMessagesCount = 0; var locked = new object(); - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { await testEnvironment.StartServerAsync(); @@ -378,7 +384,7 @@ namespace MQTTnet.Tests [TestMethod] public async Task Session_Takeover() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { await testEnvironment.StartServerAsync(); @@ -400,7 +406,7 @@ namespace MQTTnet.Tests [TestMethod] public async Task No_Messages_If_No_Subscription() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { await testEnvironment.StartServerAsync(); @@ -433,7 +439,7 @@ namespace MQTTnet.Tests [TestMethod] public async Task Set_Subscription_At_Server() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { var server = await testEnvironment.StartServerAsync(); server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(e => server.SubscribeAsync(e.ClientId, "topic1")); @@ -464,7 +470,7 @@ namespace MQTTnet.Tests [TestMethod] public async Task Shutdown_Disconnects_Clients_Gracefully() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { var server = await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder()); @@ -486,7 +492,7 @@ namespace MQTTnet.Tests [TestMethod] public async Task Handle_Clean_Disconnect() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { var server = await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder()); @@ -515,7 +521,7 @@ namespace MQTTnet.Tests [TestMethod] public async Task Client_Disconnect_Without_Errors() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { bool clientWasConnected; @@ -546,7 +552,7 @@ namespace MQTTnet.Tests { const int ClientCount = 50; - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { var server = await testEnvironment.StartServerAsync(); @@ -598,7 +604,7 @@ namespace MQTTnet.Tests [TestMethod] public async Task Retained_Messages_Flow() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { var retainedMessage = new MqttApplicationMessageBuilder().WithTopic("r").WithPayload("r").WithRetainFlag().Build(); @@ -635,7 +641,7 @@ namespace MQTTnet.Tests [TestMethod] public async Task Receive_No_Retained_Message_After_Subscribe() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { await testEnvironment.StartServerAsync(); @@ -658,7 +664,7 @@ namespace MQTTnet.Tests [TestMethod] public async Task Receive_Retained_Message_After_Subscribe() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { await testEnvironment.StartServerAsync(); @@ -689,7 +695,7 @@ namespace MQTTnet.Tests [TestMethod] public async Task Clear_Retained_Message_With_Empty_Payload() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { var receivedMessagesCount = 0; @@ -717,7 +723,7 @@ namespace MQTTnet.Tests [TestMethod] public async Task Clear_Retained_Message_With_Null_Payload() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { var receivedMessagesCount = 0; @@ -745,7 +751,7 @@ namespace MQTTnet.Tests [TestMethod] public async Task Intercept_Application_Message() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { await testEnvironment.StartServerAsync( new MqttServerOptionsBuilder().WithApplicationMessageInterceptor( @@ -768,7 +774,7 @@ namespace MQTTnet.Tests { var serverStorage = new TestServerStorage(); - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithStorage(serverStorage)); @@ -785,7 +791,7 @@ namespace MQTTnet.Tests [TestMethod] public async Task Publish_After_Client_Connects() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { var server = await testEnvironment.StartServerAsync(); server.UseClientConnectedHandler(async e => @@ -818,7 +824,7 @@ namespace MQTTnet.Tests context.ApplicationMessage.Payload = Encoding.ASCII.GetBytes("extended"); } - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithApplicationMessageInterceptor(Interceptor)); @@ -844,7 +850,7 @@ namespace MQTTnet.Tests [TestMethod] public async Task Send_Long_Body() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { const int PayloadSizeInMB = 30; const int CharCount = PayloadSizeInMB * 1024 * 1024; @@ -889,31 +895,18 @@ namespace MQTTnet.Tests { var serverOptions = new MqttServerOptionsBuilder().WithConnectionValidator(context => { - context.ReturnCode = MqttConnectReturnCode.ConnectionRefusedNotAuthorized; + context.ReasonCode = MqttConnectReasonCode.NotAuthorized; }); - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { testEnvironment.IgnoreClientLogErrors = true; await testEnvironment.StartServerAsync(serverOptions); - try - { - await testEnvironment.ConnectClientAsync(); - Assert.Fail("An exception should be raised."); - } - catch (Exception exception) - { - if (exception is MqttConnectingFailedException connectingFailedException) - { - Assert.AreEqual(MqttClientConnectResultCode.NotAuthorized, connectingFailedException.ResultCode); - } - else - { - Assert.Fail("Wrong exception."); - } - } + + var connectingFailedException = await Assert.ThrowsExceptionAsync(() => testEnvironment.ConnectClientAsync()); + Assert.AreEqual(MqttClientConnectResultCode.NotAuthorized, connectingFailedException.ResultCode); } } @@ -934,7 +927,7 @@ namespace MQTTnet.Tests [TestMethod] public async Task Same_Client_Id_Refuse_Connection() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { testEnvironment.IgnoreClientLogErrors = true; @@ -1023,7 +1016,7 @@ namespace MQTTnet.Tests [TestMethod] public async Task Same_Client_Id_Connect_Disconnect_Event_Order() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { var server = await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder()); @@ -1107,7 +1100,7 @@ namespace MQTTnet.Tests [TestMethod] public async Task Remove_Session() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { var server = await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder()); @@ -1126,7 +1119,7 @@ namespace MQTTnet.Tests [TestMethod] public async Task Stop_And_Restart() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { testEnvironment.IgnoreClientLogErrors = true; @@ -1148,23 +1141,23 @@ namespace MQTTnet.Tests await testEnvironment.ConnectClientAsync(); } } - + [TestMethod] public async Task Close_Idle_Connection() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithDefaultCommunicationTimeout(TimeSpan.FromSeconds(1))); var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); - await client.ConnectAsync("localhost", testEnvironment.ServerPort); + await PlatformAbstractionLayer.ConnectAsync(client, "localhost", testEnvironment.ServerPort); // Don't send anything. The server should close the connection. await Task.Delay(TimeSpan.FromSeconds(3)); try { - var receivedBytes = await client.ReceiveAsync(new ArraySegment(new byte[10]), SocketFlags.Partial); + var receivedBytes = await PlatformAbstractionLayer.ReceiveAsync(client, new ArraySegment(new byte[10]), SocketFlags.Partial); if (receivedBytes == 0) { return; @@ -1181,14 +1174,14 @@ namespace MQTTnet.Tests [TestMethod] public async Task Send_Garbage() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithDefaultCommunicationTimeout(TimeSpan.FromSeconds(1))); // Send an invalid packet and ensure that the server will close the connection and stay in a waiting state // forever. This is security related. var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); - await client.ConnectAsync("localhost", testEnvironment.ServerPort); + await PlatformAbstractionLayer.ConnectAsync(client, "localhost", testEnvironment.ServerPort); var buffer = Encoding.UTF8.GetBytes("Garbage"); client.Send(buffer, buffer.Length, SocketFlags.None); @@ -1197,7 +1190,7 @@ namespace MQTTnet.Tests try { - var receivedBytes = await client.ReceiveAsync(new ArraySegment(new byte[10]), SocketFlags.Partial); + var receivedBytes = await PlatformAbstractionLayer.ReceiveAsync(client, new ArraySegment(new byte[10]), SocketFlags.Partial); if (receivedBytes == 0) { return; @@ -1214,7 +1207,7 @@ namespace MQTTnet.Tests [TestMethod] public async Task Do_Not_Send_Retained_Messages_For_Denied_Subscription() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithSubscriptionInterceptor(c => { @@ -1258,7 +1251,7 @@ namespace MQTTnet.Tests [TestMethod] public async Task Collect_Messages_In_Disconnected_Session() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { var server = await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithPersistentSessions()); @@ -1285,7 +1278,7 @@ namespace MQTTnet.Tests Assert.AreEqual(0, clientStatus.Count); Assert.AreEqual(2, sessionStatus.Count); - Assert.AreEqual(3, sessionStatus.First(s => s.ClientId == "a").PendingApplicationMessagesCount); + Assert.AreEqual(3, sessionStatus.First(s => s.ClientId == client1.Options.ClientId).PendingApplicationMessagesCount); } } @@ -1294,9 +1287,10 @@ namespace MQTTnet.Tests MqttQualityOfServiceLevel qualityOfServiceLevel, string topicFilter, MqttQualityOfServiceLevel filterQualityOfServiceLevel, - int expectedReceivedMessagesCount) + int expectedReceivedMessagesCount, + TestContext testContext) { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(testContext)) { var receivedMessagesCount = 0; diff --git a/Tests/MQTTnet.Core.Tests/Session_Tests.cs b/Tests/MQTTnet.Core.Tests/Session_Tests.cs index d06bd4e..073f272 100644 --- a/Tests/MQTTnet.Core.Tests/Session_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/Session_Tests.cs @@ -11,10 +11,12 @@ namespace MQTTnet.Tests [TestClass] public class Session_Tests { + public TestContext TestContext { get; set; } + [TestMethod] public async Task Set_Session_Item() { - using (var testEnvironment = new TestEnvironment()) + using (var testEnvironment = new TestEnvironment(TestContext)) { var serverOptions = new MqttServerOptionsBuilder() .WithConnectionValidator(delegate (MqttConnectionValidatorContext context) diff --git a/Tests/MQTTnet.TestApp.AspNetCore2/MQTTnet.TestApp.AspNetCore2.csproj b/Tests/MQTTnet.TestApp.AspNetCore2/MQTTnet.TestApp.AspNetCore2.csproj index b8a9307..391a044 100644 --- a/Tests/MQTTnet.TestApp.AspNetCore2/MQTTnet.TestApp.AspNetCore2.csproj +++ b/Tests/MQTTnet.TestApp.AspNetCore2/MQTTnet.TestApp.AspNetCore2.csproj @@ -11,14 +11,11 @@ - - - + - diff --git a/Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs b/Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs index dc7d925..c4fd68f 100644 --- a/Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs +++ b/Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs @@ -40,11 +40,11 @@ namespace MQTTnet.TestApp.NetCore Console.WriteLine(">> RECEIVED: " + e.ApplicationMessage.Topic); }); - await managedClient.PublishAsync(builder => builder.WithTopic("Step").WithPayload("1")); - await managedClient.PublishAsync(builder => builder.WithTopic("Step").WithPayload("2").WithAtLeastOnceQoS()); - await managedClient.StartAsync(options); + await managedClient.PublishAsync(builder => builder.WithTopic("Step").WithPayload("1")); + await managedClient.PublishAsync(builder => builder.WithTopic("Step").WithPayload("2").WithAtLeastOnceQoS()); + await managedClient.SubscribeAsync(new TopicFilter { Topic = "xyz", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce }); await managedClient.SubscribeAsync(new TopicFilter { Topic = "abc", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce }); diff --git a/Tests/MQTTnet.TestApp.NetCore/ServerTest.cs b/Tests/MQTTnet.TestApp.NetCore/ServerTest.cs index bd62671..b0d6534 100644 --- a/Tests/MQTTnet.TestApp.NetCore/ServerTest.cs +++ b/Tests/MQTTnet.TestApp.NetCore/ServerTest.cs @@ -30,7 +30,7 @@ namespace MQTTnet.TestApp.NetCore { if (p.Username != "USER" || p.Password != "PASS") { - p.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword; + p.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword; } } }),