From 67267c05fd9438d1563ee94dc9b2edf91cc25b85 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Wed, 4 Oct 2017 13:58:51 +0200 Subject: [PATCH] Refactor options --- Build/MQTTnet.nuspec | 1 + .../MqttCommunicationAdapterFactory.cs | 27 +++++++++++++ .../Implementations/MqttTcpChannel.cs | 16 ++++---- .../Implementations/MqttWebSocketChannel.cs | 22 ++++++----- .../MQTTnet.NetFramework.csproj | 1 + .../MQTTnet.NetFramework/MqttClientFactory.cs | 19 +--------- .../MqttCommunicationAdapterFactory.cs | 27 +++++++++++++ .../Implementations/MqttTcpChannel.cs | 15 ++++---- .../Implementations/MqttWebSocketChannel.cs | 17 +++++---- .../MQTTnet.NetStandard/MqttClientFactory.cs | 19 +--------- .../MqttCommunicationAdapterFactory.cs | 27 +++++++++++++ .../Implementations/MqttTcpChannel.cs | 18 ++++----- .../Implementations/MqttWebSocketChannel.cs | 17 +++++---- .../MQTTnet.UniversalWindows.csproj | 1 + .../MqttClientFactory.cs | 19 +--------- .../Adapter/IMqttCommunicationAdapter.cs | 3 +- .../MqttChannelCommunicationAdapter.cs | 4 +- .../Channel/IMqttCommunicationChannel.cs | 3 +- MQTTnet.Core/Client/IMqttClient.cs | 2 +- .../IMqttCommunicationAdapterFactory.cs | 9 +++++ MQTTnet.Core/Client/MqttClient.cs | 23 ++++++----- MQTTnet.Core/Client/MqttClientOptions.cs | 10 ++--- MQTTnet.Core/Client/MqttClientTcpOptions.cs | 9 +++++ ...s.cs => MqttClientTcpOptionsExtensions.cs} | 4 +- .../Client/MqttClientWebSocketOptions.cs | 7 ++++ MQTTnet.Core/Client/MqttConnectionType.cs | 8 ---- .../Server/MqttClientPendingMessagesQueue.cs | 3 ++ MQTTnet.Core/Server/MqttClientSession.cs | 38 ++++++++++--------- .../Server/MqttClientSessionsManager.cs | 5 +-- MQTTnet.sln | 2 +- README.md | 2 +- .../MqttCommunicationAdapterFactory.cs | 20 ++++++++++ .../MqttPacketSerializerTests.cs | 3 +- Tests/MQTTnet.Core.Tests/MqttServerTests.cs | 1 - .../TestMqttCommunicationAdapter.cs | 3 +- .../TestMqttServerAdapter.cs | 14 +++---- Tests/MQTTnet.TestApp.NetCore/Program.cs | 11 +++--- .../PerformanceTest.cs | 6 +-- Tests/MQTTnet.TestApp.NetFramework/Program.cs | 17 ++++++--- .../MainPage.xaml.cs | 37 +++++++++++++----- 40 files changed, 292 insertions(+), 198 deletions(-) create mode 100644 Frameworks/MQTTnet.NetFramework/Implementations/MqttCommunicationAdapterFactory.cs create mode 100644 Frameworks/MQTTnet.NetStandard/Implementations/MqttCommunicationAdapterFactory.cs create mode 100644 Frameworks/MQTTnet.UniversalWindows/Implementations/MqttCommunicationAdapterFactory.cs create mode 100644 MQTTnet.Core/Client/IMqttCommunicationAdapterFactory.cs create mode 100644 MQTTnet.Core/Client/MqttClientTcpOptions.cs rename MQTTnet.Core/Client/{MqttClientOptionsExtensions.cs => MqttClientTcpOptionsExtensions.cs} (73%) create mode 100644 MQTTnet.Core/Client/MqttClientWebSocketOptions.cs delete mode 100644 MQTTnet.Core/Client/MqttConnectionType.cs create mode 100644 Tests/MQTTnet.Core.Tests/MqttCommunicationAdapterFactory.cs diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index a4417c3..a20c21b 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -16,6 +16,7 @@ * [Server] The client connection is now closed if sending of one pending application message has failed * [Server] Fixed handling of _Dup_ flag (Thanks to haeberle) * [Core] Optimized exception handling +* [Core] Mono is now also supported (Thanks to JTrotta) Copyright Christian Kratky 2016-2017 MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M diff --git a/Frameworks/MQTTnet.NetFramework/Implementations/MqttCommunicationAdapterFactory.cs b/Frameworks/MQTTnet.NetFramework/Implementations/MqttCommunicationAdapterFactory.cs new file mode 100644 index 0000000..799ccdd --- /dev/null +++ b/Frameworks/MQTTnet.NetFramework/Implementations/MqttCommunicationAdapterFactory.cs @@ -0,0 +1,27 @@ +using System; +using MQTTnet.Core.Adapter; +using MQTTnet.Core.Client; +using MQTTnet.Core.Serializer; + +namespace MQTTnet.Implementations +{ + public class MqttCommunicationAdapterFactory : IMqttCommunicationAdapterFactory + { + public IMqttCommunicationAdapter CreateMqttCommunicationAdapter(MqttClientOptions options) + { + if (options == null) throw new ArgumentNullException(nameof(options)); + + if (options is MqttClientTcpOptions tcpOptions) + { + return new MqttChannelCommunicationAdapter(new MqttTcpChannel(tcpOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion }); + } + + if (options is MqttClientWebSocketOptions webSocketOptions) + { + return new MqttChannelCommunicationAdapter(new MqttWebSocketChannel(webSocketOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion }); + } + + throw new NotSupportedException(); + } + } +} \ No newline at end of file diff --git a/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs b/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs index e378423..a113ab3 100644 --- a/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs +++ b/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs @@ -12,6 +12,8 @@ namespace MQTTnet.Implementations { public sealed class MqttTcpChannel : IMqttCommunicationChannel, IDisposable { + private readonly MqttClientTcpOptions _options; + // ReSharper disable once MemberCanBePrivate.Global // ReSharper disable once AutoPropertyCanBeMadeGetOnly.Global public static int BufferSize { get; set; } = 4096 * 20; // Can be changed for fine tuning by library user. @@ -22,9 +24,9 @@ namespace MQTTnet.Implementations /// /// called on client sockets are created in connect /// - public MqttTcpChannel() + public MqttTcpChannel(MqttClientTcpOptions options) { - + _options = options ?? throw new ArgumentNullException(nameof(options)); } /// @@ -42,22 +44,20 @@ namespace MQTTnet.Implementations public Stream ReceiveStream { get; private set; } public Stream RawReceiveStream { get; private set; } - public async Task ConnectAsync(MqttClientOptions options) + public async Task ConnectAsync() { - if (options == null) throw new ArgumentNullException(nameof(options)); - if (_socket == null) { _socket = new Socket(SocketType.Stream, ProtocolType.Tcp); } - await Task.Factory.FromAsync(_socket.BeginConnect, _socket.EndConnect, options.Server, options.GetPort(), null).ConfigureAwait(false); + await Task.Factory.FromAsync(_socket.BeginConnect, _socket.EndConnect, _options.Server, _options.GetPort(), null).ConfigureAwait(false); - if (options.TlsOptions.UseTls) + if (_options.TlsOptions.UseTls) { _sslStream = new SslStream(new NetworkStream(_socket, true)); - await _sslStream.AuthenticateAsClientAsync(options.Server, LoadCertificates(options), SslProtocols.Tls12, options.TlsOptions.CheckCertificateRevocation).ConfigureAwait(false); + await _sslStream.AuthenticateAsClientAsync(_options.Server, LoadCertificates(_options), SslProtocols.Tls12, _options.TlsOptions.CheckCertificateRevocation).ConfigureAwait(false); } CreateStreams(_socket, _sslStream); diff --git a/Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs b/Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs index f898a78..d6c92ac 100644 --- a/Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs +++ b/Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs @@ -10,27 +10,29 @@ namespace MQTTnet.Implementations { public sealed class MqttWebSocketChannel : IMqttCommunicationChannel, IDisposable { + private readonly MqttClientWebSocketOptions _options; + private ClientWebSocket _webSocket; + public MqttWebSocketChannel(MqttClientWebSocketOptions options) + { + _options = options ?? throw new ArgumentNullException(nameof(options)); + } + public Stream RawReceiveStream { get; private set; } public Stream SendStream => RawReceiveStream; public Stream ReceiveStream => RawReceiveStream; - - public async Task ConnectAsync(MqttClientOptions options) + + public async Task ConnectAsync() { - var uri = options.Server; + var uri = _options.Uri; if (!uri.StartsWith("ws://", StringComparison.OrdinalIgnoreCase)) { uri = "ws://" + uri; } - - if (options.Port.HasValue) - { - uri += ":" + options.Port; - } - + _webSocket = new ClientWebSocket(); - _webSocket.Options.KeepAliveInterval = options.KeepAlivePeriod; + _webSocket.Options.KeepAliveInterval = _options.KeepAlivePeriod; await _webSocket.ConnectAsync(new Uri(uri), CancellationToken.None).ConfigureAwait(false); RawReceiveStream = new WebSocketStream(_webSocket); } diff --git a/Frameworks/MQTTnet.NetFramework/MQTTnet.NetFramework.csproj b/Frameworks/MQTTnet.NetFramework/MQTTnet.NetFramework.csproj index 3f2519f..22d5032 100644 --- a/Frameworks/MQTTnet.NetFramework/MQTTnet.NetFramework.csproj +++ b/Frameworks/MQTTnet.NetFramework/MQTTnet.NetFramework.csproj @@ -100,6 +100,7 @@ + diff --git a/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs b/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs index 6b011d1..a64deea 100644 --- a/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs +++ b/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs @@ -1,8 +1,5 @@ using System; -using MQTTnet.Core.Adapter; -using MQTTnet.Core.Channel; using MQTTnet.Core.Client; -using MQTTnet.Core.Serializer; using MQTTnet.Implementations; namespace MQTTnet @@ -13,21 +10,7 @@ namespace MQTTnet { if (options == null) throw new ArgumentNullException(nameof(options)); - return new MqttClient(options, new MqttChannelCommunicationAdapter(GetMqttCommunicationChannel(options), new MqttPacketSerializer())); - } - - private static IMqttCommunicationChannel GetMqttCommunicationChannel(MqttClientOptions options) - { - switch (options.ConnectionType) - { - case MqttConnectionType.Tcp: - return new MqttTcpChannel(); - case MqttConnectionType.Ws: - return new MqttWebSocketChannel(); - - default: - throw new NotSupportedException(); - } + return new MqttClient(new MqttCommunicationAdapterFactory()); } } } \ No newline at end of file diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttCommunicationAdapterFactory.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttCommunicationAdapterFactory.cs new file mode 100644 index 0000000..799ccdd --- /dev/null +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttCommunicationAdapterFactory.cs @@ -0,0 +1,27 @@ +using System; +using MQTTnet.Core.Adapter; +using MQTTnet.Core.Client; +using MQTTnet.Core.Serializer; + +namespace MQTTnet.Implementations +{ + public class MqttCommunicationAdapterFactory : IMqttCommunicationAdapterFactory + { + public IMqttCommunicationAdapter CreateMqttCommunicationAdapter(MqttClientOptions options) + { + if (options == null) throw new ArgumentNullException(nameof(options)); + + if (options is MqttClientTcpOptions tcpOptions) + { + return new MqttChannelCommunicationAdapter(new MqttTcpChannel(tcpOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion }); + } + + if (options is MqttClientWebSocketOptions webSocketOptions) + { + return new MqttChannelCommunicationAdapter(new MqttWebSocketChannel(webSocketOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion }); + } + + throw new NotSupportedException(); + } + } +} \ No newline at end of file diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs index ff92edb..fd28348 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs @@ -12,14 +12,17 @@ namespace MQTTnet.Implementations { public sealed class MqttTcpChannel : IMqttCommunicationChannel, IDisposable { + private readonly MqttClientTcpOptions _options; + private Socket _socket; private SslStream _sslStream; /// /// called on client sockets are created in connect /// - public MqttTcpChannel() + public MqttTcpChannel(MqttClientTcpOptions options) { + _options = options ?? throw new ArgumentNullException(nameof(options)); } /// @@ -37,22 +40,20 @@ namespace MQTTnet.Implementations public Stream ReceiveStream { get; private set; } public Stream RawReceiveStream => ReceiveStream; - public async Task ConnectAsync(MqttClientOptions options) + public async Task ConnectAsync() { - if (options == null) throw new ArgumentNullException(nameof(options)); - if (_socket == null) { _socket = new Socket(SocketType.Stream, ProtocolType.Tcp); } - await _socket.ConnectAsync(options.Server, options.GetPort()).ConfigureAwait(false); + await _socket.ConnectAsync(_options.Server, _options.GetPort()).ConfigureAwait(false); - if (options.TlsOptions.UseTls) + if (_options.TlsOptions.UseTls) { _sslStream = new SslStream(new NetworkStream(_socket, true)); ReceiveStream = _sslStream; - await _sslStream.AuthenticateAsClientAsync(options.Server, LoadCertificates(options), SslProtocols.Tls12, options.TlsOptions.CheckCertificateRevocation).ConfigureAwait(false); + await _sslStream.AuthenticateAsClientAsync(_options.Server, LoadCertificates(_options), SslProtocols.Tls12, _options.TlsOptions.CheckCertificateRevocation).ConfigureAwait(false); } else { diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs index b6c41d9..0433b80 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs @@ -10,27 +10,28 @@ namespace MQTTnet.Implementations { public sealed class MqttWebSocketChannel : IMqttCommunicationChannel, IDisposable { + private readonly MqttClientWebSocketOptions _options; private ClientWebSocket _webSocket; + public MqttWebSocketChannel(MqttClientWebSocketOptions options) + { + _options = options ?? throw new ArgumentNullException(nameof(options)); + } + public Stream SendStream => RawReceiveStream; public Stream ReceiveStream => RawReceiveStream; public Stream RawReceiveStream { get; private set; } - public async Task ConnectAsync(MqttClientOptions options) + public async Task ConnectAsync() { - var uri = options.Server; + var uri = _options.Uri; if (!uri.StartsWith("ws://", StringComparison.OrdinalIgnoreCase)) { uri = "ws://" + uri; } - if (options.Port.HasValue) - { - uri += ":" + options.Port; - } - _webSocket = new ClientWebSocket(); - _webSocket.Options.KeepAliveInterval = options.KeepAlivePeriod; + _webSocket.Options.KeepAliveInterval = _options.KeepAlivePeriod; await _webSocket.ConnectAsync(new Uri(uri), CancellationToken.None); RawReceiveStream = new WebSocketStream(_webSocket); } diff --git a/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs b/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs index 6b011d1..a64deea 100644 --- a/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs +++ b/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs @@ -1,8 +1,5 @@ using System; -using MQTTnet.Core.Adapter; -using MQTTnet.Core.Channel; using MQTTnet.Core.Client; -using MQTTnet.Core.Serializer; using MQTTnet.Implementations; namespace MQTTnet @@ -13,21 +10,7 @@ namespace MQTTnet { if (options == null) throw new ArgumentNullException(nameof(options)); - return new MqttClient(options, new MqttChannelCommunicationAdapter(GetMqttCommunicationChannel(options), new MqttPacketSerializer())); - } - - private static IMqttCommunicationChannel GetMqttCommunicationChannel(MqttClientOptions options) - { - switch (options.ConnectionType) - { - case MqttConnectionType.Tcp: - return new MqttTcpChannel(); - case MqttConnectionType.Ws: - return new MqttWebSocketChannel(); - - default: - throw new NotSupportedException(); - } + return new MqttClient(new MqttCommunicationAdapterFactory()); } } } \ No newline at end of file diff --git a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttCommunicationAdapterFactory.cs b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttCommunicationAdapterFactory.cs new file mode 100644 index 0000000..799ccdd --- /dev/null +++ b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttCommunicationAdapterFactory.cs @@ -0,0 +1,27 @@ +using System; +using MQTTnet.Core.Adapter; +using MQTTnet.Core.Client; +using MQTTnet.Core.Serializer; + +namespace MQTTnet.Implementations +{ + public class MqttCommunicationAdapterFactory : IMqttCommunicationAdapterFactory + { + public IMqttCommunicationAdapter CreateMqttCommunicationAdapter(MqttClientOptions options) + { + if (options == null) throw new ArgumentNullException(nameof(options)); + + if (options is MqttClientTcpOptions tcpOptions) + { + return new MqttChannelCommunicationAdapter(new MqttTcpChannel(tcpOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion }); + } + + if (options is MqttClientWebSocketOptions webSocketOptions) + { + return new MqttChannelCommunicationAdapter(new MqttWebSocketChannel(webSocketOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion }); + } + + throw new NotSupportedException(); + } + } +} \ No newline at end of file diff --git a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs index 781abe7..975928e 100644 --- a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs +++ b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs @@ -13,10 +13,12 @@ namespace MQTTnet.Implementations { public sealed class MqttTcpChannel : IMqttCommunicationChannel, IDisposable { + private readonly MqttClientTcpOptions _options; private StreamSocket _socket; - public MqttTcpChannel() + public MqttTcpChannel(MqttClientTcpOptions options) { + _options = options ?? throw new ArgumentNullException(nameof(options)); } public MqttTcpChannel(StreamSocket socket) @@ -29,30 +31,28 @@ namespace MQTTnet.Implementations public Stream ReceiveStream { get; private set; } public Stream RawReceiveStream { get; private set; } - public async Task ConnectAsync(MqttClientOptions options) + public async Task ConnectAsync() { - if (options == null) throw new ArgumentNullException(nameof(options)); - if (_socket == null) { _socket = new StreamSocket(); } - if (!options.TlsOptions.UseTls) + if (!_options.TlsOptions.UseTls) { - await _socket.ConnectAsync(new HostName(options.Server), options.GetPort().ToString()); + await _socket.ConnectAsync(new HostName(_options.Server), _options.GetPort().ToString()); } else { - _socket.Control.ClientCertificate = LoadCertificate(options); + _socket.Control.ClientCertificate = LoadCertificate(_options); - if (!options.TlsOptions.CheckCertificateRevocation) + if (!_options.TlsOptions.CheckCertificateRevocation) { _socket.Control.IgnorableServerCertificateErrors.Add(ChainValidationResult.IncompleteChain); _socket.Control.IgnorableServerCertificateErrors.Add(ChainValidationResult.RevocationInformationMissing); } - await _socket.ConnectAsync(new HostName(options.Server), options.GetPort().ToString(), SocketProtectionLevel.Tls12); + await _socket.ConnectAsync(new HostName(_options.Server), _options.GetPort().ToString(), SocketProtectionLevel.Tls12); } CreateStreams(); diff --git a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttWebSocketChannel.cs b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttWebSocketChannel.cs index 0051322..471cdbf 100644 --- a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttWebSocketChannel.cs +++ b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttWebSocketChannel.cs @@ -10,27 +10,28 @@ namespace MQTTnet.Implementations { public sealed class MqttWebSocketChannel : IMqttCommunicationChannel, IDisposable { + private readonly MqttClientWebSocketOptions _options; private ClientWebSocket _webSocket = new ClientWebSocket(); + public MqttWebSocketChannel(MqttClientWebSocketOptions options) + { + _options = options ?? throw new ArgumentNullException(nameof(options)); + } + public Stream RawReceiveStream { get; private set; } public Stream SendStream => RawReceiveStream; public Stream ReceiveStream => RawReceiveStream; - public async Task ConnectAsync(MqttClientOptions options) + public async Task ConnectAsync() { - var uri = options.Server; + var uri = _options.Uri; if (!uri.StartsWith("ws://", StringComparison.OrdinalIgnoreCase)) { uri = "ws://" + uri; } - if (options.Port.HasValue) - { - uri += ":" + options.Port; - } - _webSocket = new ClientWebSocket(); - _webSocket.Options.KeepAliveInterval = options.KeepAlivePeriod; + _webSocket.Options.KeepAliveInterval = _options.KeepAlivePeriod; await _webSocket.ConnectAsync(new Uri(uri), CancellationToken.None).ConfigureAwait(false); RawReceiveStream = new WebSocketStream(_webSocket); diff --git a/Frameworks/MQTTnet.UniversalWindows/MQTTnet.UniversalWindows.csproj b/Frameworks/MQTTnet.UniversalWindows/MQTTnet.UniversalWindows.csproj index 0d40ac2..71081e0 100644 --- a/Frameworks/MQTTnet.UniversalWindows/MQTTnet.UniversalWindows.csproj +++ b/Frameworks/MQTTnet.UniversalWindows/MQTTnet.UniversalWindows.csproj @@ -110,6 +110,7 @@ bin\x64\Release\MQTTnet.XML + diff --git a/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs b/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs index 6b011d1..a64deea 100644 --- a/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs +++ b/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs @@ -1,8 +1,5 @@ using System; -using MQTTnet.Core.Adapter; -using MQTTnet.Core.Channel; using MQTTnet.Core.Client; -using MQTTnet.Core.Serializer; using MQTTnet.Implementations; namespace MQTTnet @@ -13,21 +10,7 @@ namespace MQTTnet { if (options == null) throw new ArgumentNullException(nameof(options)); - return new MqttClient(options, new MqttChannelCommunicationAdapter(GetMqttCommunicationChannel(options), new MqttPacketSerializer())); - } - - private static IMqttCommunicationChannel GetMqttCommunicationChannel(MqttClientOptions options) - { - switch (options.ConnectionType) - { - case MqttConnectionType.Tcp: - return new MqttTcpChannel(); - case MqttConnectionType.Ws: - return new MqttWebSocketChannel(); - - default: - throw new NotSupportedException(); - } + return new MqttClient(new MqttCommunicationAdapterFactory()); } } } \ No newline at end of file diff --git a/MQTTnet.Core/Adapter/IMqttCommunicationAdapter.cs b/MQTTnet.Core/Adapter/IMqttCommunicationAdapter.cs index a80fc87..78586f6 100644 --- a/MQTTnet.Core/Adapter/IMqttCommunicationAdapter.cs +++ b/MQTTnet.Core/Adapter/IMqttCommunicationAdapter.cs @@ -2,7 +2,6 @@ using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; -using MQTTnet.Core.Client; using MQTTnet.Core.Packets; using MQTTnet.Core.Serializer; @@ -12,7 +11,7 @@ namespace MQTTnet.Core.Adapter { IMqttPacketSerializer PacketSerializer { get; } - Task ConnectAsync(TimeSpan timeout, MqttClientOptions options); + Task ConnectAsync(TimeSpan timeout); Task DisconnectAsync(TimeSpan timeout); diff --git a/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs b/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs index 600081c..3ed2ebc 100644 --- a/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs +++ b/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs @@ -26,11 +26,11 @@ namespace MQTTnet.Core.Adapter public IMqttPacketSerializer PacketSerializer { get; } - public async Task ConnectAsync(TimeSpan timeout, MqttClientOptions options) + public async Task ConnectAsync(TimeSpan timeout) { try { - await _channel.ConnectAsync(options).TimeoutAfter(timeout).ConfigureAwait(false); + await _channel.ConnectAsync().TimeoutAfter(timeout).ConfigureAwait(false); } catch (TaskCanceledException) { diff --git a/MQTTnet.Core/Channel/IMqttCommunicationChannel.cs b/MQTTnet.Core/Channel/IMqttCommunicationChannel.cs index 24525e8..8c746d1 100644 --- a/MQTTnet.Core/Channel/IMqttCommunicationChannel.cs +++ b/MQTTnet.Core/Channel/IMqttCommunicationChannel.cs @@ -1,5 +1,4 @@ using System.Threading.Tasks; -using MQTTnet.Core.Client; using System.IO; namespace MQTTnet.Core.Channel @@ -10,7 +9,7 @@ namespace MQTTnet.Core.Channel Stream ReceiveStream { get; } Stream RawReceiveStream { get; } - Task ConnectAsync(MqttClientOptions options); + Task ConnectAsync(); Task DisconnectAsync(); } } diff --git a/MQTTnet.Core/Client/IMqttClient.cs b/MQTTnet.Core/Client/IMqttClient.cs index 165f490..7622867 100644 --- a/MQTTnet.Core/Client/IMqttClient.cs +++ b/MQTTnet.Core/Client/IMqttClient.cs @@ -13,7 +13,7 @@ namespace MQTTnet.Core.Client event EventHandler Connected; event EventHandler Disconnected; - Task ConnectAsync(MqttApplicationMessage willApplicationMessage = null); + Task ConnectAsync(MqttClientOptions options); Task DisconnectAsync(); Task> SubscribeAsync(IEnumerable topicFilters); diff --git a/MQTTnet.Core/Client/IMqttCommunicationAdapterFactory.cs b/MQTTnet.Core/Client/IMqttCommunicationAdapterFactory.cs new file mode 100644 index 0000000..092ea04 --- /dev/null +++ b/MQTTnet.Core/Client/IMqttCommunicationAdapterFactory.cs @@ -0,0 +1,9 @@ +using MQTTnet.Core.Adapter; + +namespace MQTTnet.Core.Client +{ + public interface IMqttCommunicationAdapterFactory + { + IMqttCommunicationAdapter CreateMqttCommunicationAdapter(MqttClientOptions options); + } +} \ No newline at end of file diff --git a/MQTTnet.Core/Client/MqttClient.cs b/MQTTnet.Core/Client/MqttClient.cs index aa42652..f17800b 100644 --- a/MQTTnet.Core/Client/MqttClient.cs +++ b/MQTTnet.Core/Client/MqttClient.cs @@ -16,19 +16,17 @@ namespace MQTTnet.Core.Client { private readonly HashSet _unacknowledgedPublishPackets = new HashSet(); private readonly MqttPacketDispatcher _packetDispatcher = new MqttPacketDispatcher(); - private readonly MqttClientOptions _options; - private readonly IMqttCommunicationAdapter _adapter; + private readonly IMqttCommunicationAdapterFactory _communicationChannelFactory; + private MqttClientOptions _options; private bool _isReceivingPackets; private int _latestPacketIdentifier; private CancellationTokenSource _cancellationTokenSource; + private IMqttCommunicationAdapter _adapter; - public MqttClient(MqttClientOptions options, IMqttCommunicationAdapter adapter) + public MqttClient(IMqttCommunicationAdapterFactory communicationChannelFactory) { - _options = options ?? throw new ArgumentNullException(nameof(options)); - _adapter = adapter ?? throw new ArgumentNullException(nameof(adapter)); - - _adapter.PacketSerializer.ProtocolVersion = options.ProtocolVersion; + _communicationChannelFactory = communicationChannelFactory; } public event EventHandler Connected; @@ -37,22 +35,27 @@ namespace MQTTnet.Core.Client public bool IsConnected => _cancellationTokenSource != null && !_cancellationTokenSource.IsCancellationRequested; - public async Task ConnectAsync(MqttApplicationMessage willApplicationMessage = null) + public async Task ConnectAsync(MqttClientOptions options) { + if (options == null) throw new ArgumentNullException(nameof(options)); + ThrowIfConnected("It is not allowed to connect with a server after the connection is established."); try { + _options = options; _cancellationTokenSource = new CancellationTokenSource(); _latestPacketIdentifier = 0; _packetDispatcher.Reset(); + _adapter = _communicationChannelFactory.CreateMqttCommunicationAdapter(options); + MqttTrace.Verbose(nameof(MqttClient), "Trying to connect with server."); - await _adapter.ConnectAsync(_options.DefaultCommunicationTimeout, _options).ConfigureAwait(false); + await _adapter.ConnectAsync(_options.DefaultCommunicationTimeout).ConfigureAwait(false); MqttTrace.Verbose(nameof(MqttClient), "Connection with server established."); await SetupIncomingPacketProcessingAsync(); - await AuthenticateAsync(willApplicationMessage); + await AuthenticateAsync(options.WillMessage); MqttTrace.Verbose(nameof(MqttClient), "MQTT connection with server established."); diff --git a/MQTTnet.Core/Client/MqttClientOptions.cs b/MQTTnet.Core/Client/MqttClientOptions.cs index 12e7ae9..f2e28fb 100644 --- a/MQTTnet.Core/Client/MqttClientOptions.cs +++ b/MQTTnet.Core/Client/MqttClientOptions.cs @@ -3,14 +3,12 @@ using MQTTnet.Core.Serializer; namespace MQTTnet.Core.Client { - public sealed class MqttClientOptions + public abstract class MqttClientOptions { - public string Server { get; set; } - - public int? Port { get; set; } - public MqttClientTlsOptions TlsOptions { get; set; } = new MqttClientTlsOptions(); + public MqttApplicationMessage WillMessage { get; set; } + public string UserName { get; set; } public string Password { get; set; } @@ -24,7 +22,5 @@ namespace MQTTnet.Core.Client public TimeSpan DefaultCommunicationTimeout { get; set; } = TimeSpan.FromSeconds(10); public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311; - - public MqttConnectionType ConnectionType { get; set; } = MqttConnectionType.Tcp; } } diff --git a/MQTTnet.Core/Client/MqttClientTcpOptions.cs b/MQTTnet.Core/Client/MqttClientTcpOptions.cs new file mode 100644 index 0000000..beaa506 --- /dev/null +++ b/MQTTnet.Core/Client/MqttClientTcpOptions.cs @@ -0,0 +1,9 @@ +namespace MQTTnet.Core.Client +{ + public class MqttClientTcpOptions : MqttClientOptions + { + public string Server { get; set; } + + public int? Port { get; set; } + } +} diff --git a/MQTTnet.Core/Client/MqttClientOptionsExtensions.cs b/MQTTnet.Core/Client/MqttClientTcpOptionsExtensions.cs similarity index 73% rename from MQTTnet.Core/Client/MqttClientOptionsExtensions.cs rename to MQTTnet.Core/Client/MqttClientTcpOptionsExtensions.cs index 7b01fbd..cada6f8 100644 --- a/MQTTnet.Core/Client/MqttClientOptionsExtensions.cs +++ b/MQTTnet.Core/Client/MqttClientTcpOptionsExtensions.cs @@ -2,9 +2,9 @@ namespace MQTTnet.Core.Client { - public static class MqttClientOptionsExtensions + public static class MqttClientTcpOptionsExtensions { - public static int GetPort(this MqttClientOptions options) + public static int GetPort(this MqttClientTcpOptions options) { if (options == null) throw new ArgumentNullException(nameof(options)); diff --git a/MQTTnet.Core/Client/MqttClientWebSocketOptions.cs b/MQTTnet.Core/Client/MqttClientWebSocketOptions.cs new file mode 100644 index 0000000..4b90524 --- /dev/null +++ b/MQTTnet.Core/Client/MqttClientWebSocketOptions.cs @@ -0,0 +1,7 @@ +namespace MQTTnet.Core.Client +{ + public class MqttClientWebSocketOptions : MqttClientOptions + { + public string Uri { get; set; } + } +} diff --git a/MQTTnet.Core/Client/MqttConnectionType.cs b/MQTTnet.Core/Client/MqttConnectionType.cs deleted file mode 100644 index 830bf0b..0000000 --- a/MQTTnet.Core/Client/MqttConnectionType.cs +++ /dev/null @@ -1,8 +0,0 @@ -namespace MQTTnet.Core.Client -{ - public enum MqttConnectionType - { - Tcp, - Ws - } -} diff --git a/MQTTnet.Core/Server/MqttClientPendingMessagesQueue.cs b/MQTTnet.Core/Server/MqttClientPendingMessagesQueue.cs index 61a1b3b..df3aeb0 100644 --- a/MQTTnet.Core/Server/MqttClientPendingMessagesQueue.cs +++ b/MQTTnet.Core/Server/MqttClientPendingMessagesQueue.cs @@ -72,6 +72,9 @@ namespace MQTTnet.Core.Server { MqttTrace.Warning(nameof(MqttClientPendingMessagesQueue), exception, "Sending publish packet failed due to communication exception."); } + if (exception is OperationCanceledException) + { + } else { MqttTrace.Error(nameof(MqttClientPendingMessagesQueue), exception, "Sending publish packet failed."); diff --git a/MQTTnet.Core/Server/MqttClientSession.cs b/MQTTnet.Core/Server/MqttClientSession.cs index fe1ab38..e94bde0 100644 --- a/MQTTnet.Core/Server/MqttClientSession.cs +++ b/MQTTnet.Core/Server/MqttClientSession.cs @@ -8,6 +8,7 @@ using MQTTnet.Core.Exceptions; using MQTTnet.Core.Internal; using MQTTnet.Core.Packets; using MQTTnet.Core.Protocol; +using MQTTnet.Core.Serializer; namespace MQTTnet.Core.Server { @@ -19,7 +20,8 @@ namespace MQTTnet.Core.Server private readonly MqttClientSessionsManager _mqttClientSessionsManager; private readonly MqttClientPendingMessagesQueue _pendingMessagesQueue; private readonly MqttServerOptions _options; - + + private IMqttCommunicationAdapter _adapter; private CancellationTokenSource _cancellationTokenSource; private MqttApplicationMessage _willMessage; @@ -33,9 +35,9 @@ namespace MQTTnet.Core.Server public string ClientId { get; } - public bool IsConnected => Adapter != null; + public MqttProtocolVersion? ProtocolVersion => _adapter?.PacketSerializer.ProtocolVersion; - public IMqttCommunicationAdapter Adapter { get; private set; } + public bool IsConnected => _adapter != null; public async Task RunAsync(MqttApplicationMessage willMessage, IMqttCommunicationAdapter adapter) { @@ -45,7 +47,7 @@ namespace MQTTnet.Core.Server try { - Adapter = adapter; + _adapter = adapter; _cancellationTokenSource = new CancellationTokenSource(); _pendingMessagesQueue.Start(adapter, _cancellationTokenSource.Token); @@ -75,7 +77,7 @@ namespace MQTTnet.Core.Server _cancellationTokenSource?.Dispose(); _cancellationTokenSource = null; - Adapter = null; + _adapter = null; MqttTrace.Information(nameof(MqttClientSession), "Client '{0}': Disconnected.", ClientId); } @@ -106,7 +108,7 @@ namespace MQTTnet.Core.Server while (!cancellationToken.IsCancellationRequested) { var packet = await adapter.ReceivePacketAsync(TimeSpan.Zero, cancellationToken).ConfigureAwait(false); - await ProcessReceivedPacketAsync(packet).ConfigureAwait(false); + await ProcessReceivedPacketAsync(adapter, packet).ConfigureAwait(false); } } catch (OperationCanceledException) @@ -124,28 +126,28 @@ namespace MQTTnet.Core.Server } } - private async Task ProcessReceivedPacketAsync(MqttBasePacket packet) + private async Task ProcessReceivedPacketAsync(IMqttCommunicationAdapter adapter, MqttBasePacket packet) { if (packet is MqttSubscribePacket subscribePacket) { - await Adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, _subscriptionsManager.Subscribe(subscribePacket)); + await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, _subscriptionsManager.Subscribe(subscribePacket)); EnqueueRetainedMessages(subscribePacket); } else if (packet is MqttUnsubscribePacket unsubscribePacket) { - await Adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, _subscriptionsManager.Unsubscribe(unsubscribePacket)); + await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, _subscriptionsManager.Unsubscribe(unsubscribePacket)); } else if (packet is MqttPublishPacket publishPacket) { - await HandleIncomingPublishPacketAsync(publishPacket); + await HandleIncomingPublishPacketAsync(adapter, publishPacket); } else if (packet is MqttPubRelPacket pubRelPacket) { - await HandleIncomingPubRelPacketAsync(pubRelPacket); + await HandleIncomingPubRelPacketAsync(adapter, pubRelPacket); } else if (packet is MqttPubRecPacket pubRecPacket) { - await Adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, pubRecPacket.CreateResponse()); + await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, pubRecPacket.CreateResponse()); } else if (packet is MqttPubAckPacket || packet is MqttPubCompPacket) { @@ -153,7 +155,7 @@ namespace MQTTnet.Core.Server } else if (packet is MqttPingReqPacket) { - await Adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, new MqttPingRespPacket()); + await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, new MqttPingRespPacket()); } else if (packet is MqttDisconnectPacket || packet is MqttConnectPacket) { @@ -175,7 +177,7 @@ namespace MQTTnet.Core.Server } } - private async Task HandleIncomingPublishPacketAsync(MqttPublishPacket publishPacket) + private async Task HandleIncomingPublishPacketAsync(IMqttCommunicationAdapter adapter, MqttPublishPacket publishPacket) { if (publishPacket.Retain) { @@ -191,7 +193,7 @@ namespace MQTTnet.Core.Server if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce) { _mqttClientSessionsManager.DispatchPublishPacket(this, publishPacket); - await Adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier }); + await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier }); return; } @@ -205,21 +207,21 @@ namespace MQTTnet.Core.Server _mqttClientSessionsManager.DispatchPublishPacket(this, publishPacket); - await Adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }); + await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }); return; } throw new MqttCommunicationException("Received a not supported QoS level."); } - private Task HandleIncomingPubRelPacketAsync(MqttPubRelPacket pubRelPacket) + private Task HandleIncomingPubRelPacketAsync(IMqttCommunicationAdapter adapter, MqttPubRelPacket pubRelPacket) { lock (_unacknowledgedPublishPackets) { _unacknowledgedPublishPackets.Remove(pubRelPacket.PacketIdentifier); } - return Adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, new MqttPubCompPacket { PacketIdentifier = pubRelPacket.PacketIdentifier }); + return adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, new MqttPubCompPacket { PacketIdentifier = pubRelPacket.PacketIdentifier }); } } } diff --git a/MQTTnet.Core/Server/MqttClientSessionsManager.cs b/MQTTnet.Core/Server/MqttClientSessionsManager.cs index 981cdd3..df95c7e 100644 --- a/MQTTnet.Core/Server/MqttClientSessionsManager.cs +++ b/MQTTnet.Core/Server/MqttClientSessionsManager.cs @@ -9,6 +9,7 @@ using MQTTnet.Core.Exceptions; using MQTTnet.Core.Internal; using MQTTnet.Core.Packets; using MQTTnet.Core.Protocol; +using MQTTnet.Core.Serializer; namespace MQTTnet.Core.Server { @@ -24,9 +25,7 @@ namespace MQTTnet.Core.Server } public event EventHandler ApplicationMessageReceived; - public event EventHandler ClientConnected; - public event EventHandler ClientDisconnected; public MqttClientRetainedMessagesManager RetainedMessagesManager { get; } @@ -111,7 +110,7 @@ namespace MQTTnet.Core.Server return _clientSessions.Where(s => s.Value.IsConnected).Select(s => new ConnectedMqttClient { ClientId = s.Value.ClientId, - ProtocolVersion = s.Value.Adapter.PacketSerializer.ProtocolVersion + ProtocolVersion = s.Value.ProtocolVersion ?? MqttProtocolVersion.V311 }).ToList(); } } diff --git a/MQTTnet.sln b/MQTTnet.sln index 419fa6a..8ba4c60 100644 --- a/MQTTnet.sln +++ b/MQTTnet.sln @@ -3,7 +3,7 @@ Microsoft Visual Studio Solution File, Format Version 12.00 # Visual Studio 15 VisualStudioVersion = 15.0.26730.16 MinimumVisualStudioVersion = 10.0.40219.1 -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.Core.Tests", "Tests\MQTTnet.Core.Tests\MQTTnet.Core.Tests.csproj", "{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.Core.Tests", "Tests\MQTTnet.Core.Tests\MQTTnet.Core.Tests.csproj", "{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}" EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.NetFramework", "Frameworks\MQTTnet.NetFramework\MQTTnet.NetFramework.csproj", "{A480EF90-0EAA-4D9A-B271-47A9C47F6F7D}" EndProject diff --git a/README.md b/README.md index 9a3f851..c3993a1 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ MQTTnet is a .NET library for MQTT based communication. It provides a MQTT clien * Performance optimized (processing ~27.000 messages / second)* * Interfaces included for mocking and testing * Access to internal trace messages -* Unit tested (57+ tests) +* Unit tested (58+ tests) \* Tested on local machine with MQTTnet client and server running in the same process using the TCP channel. The app for verification is part of this repository and stored in _/Tests/MQTTnet.TestApp.NetFramework_. diff --git a/Tests/MQTTnet.Core.Tests/MqttCommunicationAdapterFactory.cs b/Tests/MQTTnet.Core.Tests/MqttCommunicationAdapterFactory.cs new file mode 100644 index 0000000..c83f83a --- /dev/null +++ b/Tests/MQTTnet.Core.Tests/MqttCommunicationAdapterFactory.cs @@ -0,0 +1,20 @@ +using MQTTnet.Core.Adapter; +using MQTTnet.Core.Client; + +namespace MQTTnet.Core.Tests +{ + public class MqttCommunicationAdapterFactory : IMqttCommunicationAdapterFactory + { + private readonly IMqttCommunicationAdapter _adapter; + + public MqttCommunicationAdapterFactory(IMqttCommunicationAdapter adapter) + { + _adapter = adapter; + } + + public IMqttCommunicationAdapter CreateMqttCommunicationAdapter(MqttClientOptions options) + { + return _adapter; + } + } +} diff --git a/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs b/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs index eb7cb83..de6265f 100644 --- a/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs @@ -6,7 +6,6 @@ using System.Threading.Tasks; using Microsoft.VisualStudio.TestTools.UnitTesting; using MQTTnet.Core.Adapter; using MQTTnet.Core.Channel; -using MQTTnet.Core.Client; using MQTTnet.Core.Packets; using MQTTnet.Core.Protocol; using MQTTnet.Core.Serializer; @@ -411,7 +410,7 @@ namespace MQTTnet.Core.Tests _stream.Position = 0; } - public Task ConnectAsync(MqttClientOptions options) + public Task ConnectAsync() { return Task.FromResult(0); } diff --git a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs index 19f96c6..f198576 100644 --- a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs @@ -6,7 +6,6 @@ using MQTTnet.Core.Client; using MQTTnet.Core.Packets; using MQTTnet.Core.Protocol; using MQTTnet.Core.Server; -using System; namespace MQTTnet.Core.Tests { diff --git a/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs b/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs index 91d0719..9e71f8a 100644 --- a/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs +++ b/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs @@ -4,7 +4,6 @@ using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using MQTTnet.Core.Adapter; -using MQTTnet.Core.Client; using MQTTnet.Core.Packets; using MQTTnet.Core.Serializer; @@ -18,7 +17,7 @@ namespace MQTTnet.Core.Tests public IMqttPacketSerializer PacketSerializer { get; } = new MqttPacketSerializer(); - public Task ConnectAsync(TimeSpan timeout, MqttClientOptions options) + public Task ConnectAsync(TimeSpan timeout) { return Task.FromResult(0); } diff --git a/Tests/MQTTnet.Core.Tests/TestMqttServerAdapter.cs b/Tests/MQTTnet.Core.Tests/TestMqttServerAdapter.cs index ad099d2..1906564 100644 --- a/Tests/MQTTnet.Core.Tests/TestMqttServerAdapter.cs +++ b/Tests/MQTTnet.Core.Tests/TestMqttServerAdapter.cs @@ -17,10 +17,11 @@ namespace MQTTnet.Core.Tests adapterA.Partner = adapterB; adapterB.Partner = adapterA; - var client = new MqttClient(new MqttClientOptions() { ClientId = clientId }, adapterA); + var client = new MqttClient(new MqttCommunicationAdapterFactory(adapterA)); var connected = WaitForClientToConnect(server, clientId); + FireClientAcceptedEvent(adapterB); - await client.ConnectAsync(willMessage); + await client.ConnectAsync(new MqttClientTcpOptions { ClientId = clientId, WillMessage = willMessage }); await connected; return client; @@ -30,17 +31,16 @@ namespace MQTTnet.Core.Tests { var tcs = new TaskCompletionSource(); - EventHandler handler = null; - handler = (sender, args) => + void Handler(object sender, MqttClientConnectedEventArgs args) { if (args.Client.ClientId == clientId) { - s.ClientConnected -= handler; + s.ClientConnected -= Handler; tcs.SetResult(null); } - }; + } - s.ClientConnected += handler; + s.ClientConnected += Handler; return tcs.Task; } diff --git a/Tests/MQTTnet.TestApp.NetCore/Program.cs b/Tests/MQTTnet.TestApp.NetCore/Program.cs index e4ccf95..57d2d9a 100644 --- a/Tests/MQTTnet.TestApp.NetCore/Program.cs +++ b/Tests/MQTTnet.TestApp.NetCore/Program.cs @@ -46,12 +46,11 @@ namespace MQTTnet.TestApp.NetCore try { - var options = new MqttClientOptions + var options = new MqttClientWebSocketOptions { - Server = "localhost", + Uri = "localhost", ClientId = "XYZ", - CleanSession = true, - ConnectionType = MqttConnectionType.Ws + CleanSession = true }; var client = new MqttClientFactory().CreateMqttClient(options); @@ -84,7 +83,7 @@ namespace MQTTnet.TestApp.NetCore try { - await client.ConnectAsync(); + await client.ConnectAsync(options); } catch { @@ -94,7 +93,7 @@ namespace MQTTnet.TestApp.NetCore try { - await client.ConnectAsync(); + await client.ConnectAsync(options); } catch (Exception exception) { diff --git a/Tests/MQTTnet.TestApp.NetFramework/PerformanceTest.cs b/Tests/MQTTnet.TestApp.NetFramework/PerformanceTest.cs index e78a3f9..e8890d5 100644 --- a/Tests/MQTTnet.TestApp.NetFramework/PerformanceTest.cs +++ b/Tests/MQTTnet.TestApp.NetFramework/PerformanceTest.cs @@ -32,7 +32,7 @@ namespace MQTTnet.TestApp.NetFramework { try { - var options = new MqttClientOptions + var options = new MqttClientTcpOptions { Server = "localhost", ClientId = "XYZ", @@ -64,7 +64,7 @@ namespace MQTTnet.TestApp.NetFramework try { - await client.ConnectAsync(); + await client.ConnectAsync(options); } catch { @@ -74,7 +74,7 @@ namespace MQTTnet.TestApp.NetFramework try { - await client.ConnectAsync(); + await client.ConnectAsync(options); } catch (Exception exception) { diff --git a/Tests/MQTTnet.TestApp.NetFramework/Program.cs b/Tests/MQTTnet.TestApp.NetFramework/Program.cs index b5a7b24..78cd406 100644 --- a/Tests/MQTTnet.TestApp.NetFramework/Program.cs +++ b/Tests/MQTTnet.TestApp.NetFramework/Program.cs @@ -52,13 +52,18 @@ namespace MQTTnet.TestApp.NetFramework try { - var options = new MqttClientOptions + var options = new MqttClientWebSocketOptions { - Server = "localhost", - ClientId = "XYZ", - CleanSession = true + Uri = "broker.hivemq.com:8000/mqtt" }; + ////var options = new MqttClientOptions + ////{ + //// Server = "localhost", + //// ClientId = "XYZ", + //// CleanSession = true + ////}; + var client = new MqttClientFactory().CreateMqttClient(options); client.ApplicationMessageReceived += (s, e) => { @@ -87,7 +92,7 @@ namespace MQTTnet.TestApp.NetFramework try { - await client.ConnectAsync(); + await client.ConnectAsync(options); } catch { @@ -97,7 +102,7 @@ namespace MQTTnet.TestApp.NetFramework try { - await client.ConnectAsync(); + await client.ConnectAsync(options); } catch (Exception exception) { diff --git a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs index 4c1ce4f..8f6c039 100644 --- a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs +++ b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs @@ -37,15 +37,32 @@ namespace MQTTnet.TestApp.UniversalWindows private async void Connect(object sender, RoutedEventArgs e) { - var options = new MqttClientOptions - { - Server = Server.Text, - UserName = User.Text, - Password = Password.Text, - ClientId = ClientId.Text, - TlsOptions = { UseTls = UseTls.IsChecked == true }, - ConnectionType = UseTcp.IsChecked == true ? MqttConnectionType.Tcp : MqttConnectionType.Ws - }; + MqttClientOptions options = null; + if (UseTcp.IsChecked == true) + { + options = new MqttClientTcpOptions + { + Server = Server.Text + }; + } + + if (UseWs.IsChecked == true) + { + options = new MqttClientWebSocketOptions + { + Uri = Server.Text + }; + } + + if (options == null) + { + throw new InvalidOperationException(); + } + + options.UserName = User.Text; + options.Password = Password.Text; + options.ClientId = ClientId.Text; + options.TlsOptions.UseTls = UseTls.IsChecked == true; try { @@ -56,7 +73,7 @@ namespace MQTTnet.TestApp.UniversalWindows var factory = new MqttClientFactory(); _mqttClient = factory.CreateMqttClient(options); - await _mqttClient.ConnectAsync(); + await _mqttClient.ConnectAsync(options); } catch (Exception exception) {