diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec
index a4417c3..a20c21b 100644
--- a/Build/MQTTnet.nuspec
+++ b/Build/MQTTnet.nuspec
@@ -16,6 +16,7 @@
* [Server] The client connection is now closed if sending of one pending application message has failed
* [Server] Fixed handling of _Dup_ flag (Thanks to haeberle)
* [Core] Optimized exception handling
+* [Core] Mono is now also supported (Thanks to JTrotta)
Copyright Christian Kratky 2016-2017
MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M
diff --git a/Frameworks/MQTTnet.NetFramework/Implementations/MqttCommunicationAdapterFactory.cs b/Frameworks/MQTTnet.NetFramework/Implementations/MqttCommunicationAdapterFactory.cs
new file mode 100644
index 0000000..799ccdd
--- /dev/null
+++ b/Frameworks/MQTTnet.NetFramework/Implementations/MqttCommunicationAdapterFactory.cs
@@ -0,0 +1,27 @@
+using System;
+using MQTTnet.Core.Adapter;
+using MQTTnet.Core.Client;
+using MQTTnet.Core.Serializer;
+
+namespace MQTTnet.Implementations
+{
+ public class MqttCommunicationAdapterFactory : IMqttCommunicationAdapterFactory
+ {
+ public IMqttCommunicationAdapter CreateMqttCommunicationAdapter(MqttClientOptions options)
+ {
+ if (options == null) throw new ArgumentNullException(nameof(options));
+
+ if (options is MqttClientTcpOptions tcpOptions)
+ {
+ return new MqttChannelCommunicationAdapter(new MqttTcpChannel(tcpOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion });
+ }
+
+ if (options is MqttClientWebSocketOptions webSocketOptions)
+ {
+ return new MqttChannelCommunicationAdapter(new MqttWebSocketChannel(webSocketOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion });
+ }
+
+ throw new NotSupportedException();
+ }
+ }
+}
\ No newline at end of file
diff --git a/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs b/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs
index e378423..a113ab3 100644
--- a/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs
+++ b/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs
@@ -12,6 +12,8 @@ namespace MQTTnet.Implementations
{
public sealed class MqttTcpChannel : IMqttCommunicationChannel, IDisposable
{
+ private readonly MqttClientTcpOptions _options;
+
// ReSharper disable once MemberCanBePrivate.Global
// ReSharper disable once AutoPropertyCanBeMadeGetOnly.Global
public static int BufferSize { get; set; } = 4096 * 20; // Can be changed for fine tuning by library user.
@@ -22,9 +24,9 @@ namespace MQTTnet.Implementations
///
/// called on client sockets are created in connect
///
- public MqttTcpChannel()
+ public MqttTcpChannel(MqttClientTcpOptions options)
{
-
+ _options = options ?? throw new ArgumentNullException(nameof(options));
}
///
@@ -42,22 +44,20 @@ namespace MQTTnet.Implementations
public Stream ReceiveStream { get; private set; }
public Stream RawReceiveStream { get; private set; }
- public async Task ConnectAsync(MqttClientOptions options)
+ public async Task ConnectAsync()
{
- if (options == null) throw new ArgumentNullException(nameof(options));
-
if (_socket == null)
{
_socket = new Socket(SocketType.Stream, ProtocolType.Tcp);
}
- await Task.Factory.FromAsync(_socket.BeginConnect, _socket.EndConnect, options.Server, options.GetPort(), null).ConfigureAwait(false);
+ await Task.Factory.FromAsync(_socket.BeginConnect, _socket.EndConnect, _options.Server, _options.GetPort(), null).ConfigureAwait(false);
- if (options.TlsOptions.UseTls)
+ if (_options.TlsOptions.UseTls)
{
_sslStream = new SslStream(new NetworkStream(_socket, true));
- await _sslStream.AuthenticateAsClientAsync(options.Server, LoadCertificates(options), SslProtocols.Tls12, options.TlsOptions.CheckCertificateRevocation).ConfigureAwait(false);
+ await _sslStream.AuthenticateAsClientAsync(_options.Server, LoadCertificates(_options), SslProtocols.Tls12, _options.TlsOptions.CheckCertificateRevocation).ConfigureAwait(false);
}
CreateStreams(_socket, _sslStream);
diff --git a/Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs b/Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs
index f898a78..d6c92ac 100644
--- a/Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs
+++ b/Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs
@@ -10,27 +10,29 @@ namespace MQTTnet.Implementations
{
public sealed class MqttWebSocketChannel : IMqttCommunicationChannel, IDisposable
{
+ private readonly MqttClientWebSocketOptions _options;
+
private ClientWebSocket _webSocket;
+ public MqttWebSocketChannel(MqttClientWebSocketOptions options)
+ {
+ _options = options ?? throw new ArgumentNullException(nameof(options));
+ }
+
public Stream RawReceiveStream { get; private set; }
public Stream SendStream => RawReceiveStream;
public Stream ReceiveStream => RawReceiveStream;
-
- public async Task ConnectAsync(MqttClientOptions options)
+
+ public async Task ConnectAsync()
{
- var uri = options.Server;
+ var uri = _options.Uri;
if (!uri.StartsWith("ws://", StringComparison.OrdinalIgnoreCase))
{
uri = "ws://" + uri;
}
-
- if (options.Port.HasValue)
- {
- uri += ":" + options.Port;
- }
-
+
_webSocket = new ClientWebSocket();
- _webSocket.Options.KeepAliveInterval = options.KeepAlivePeriod;
+ _webSocket.Options.KeepAliveInterval = _options.KeepAlivePeriod;
await _webSocket.ConnectAsync(new Uri(uri), CancellationToken.None).ConfigureAwait(false);
RawReceiveStream = new WebSocketStream(_webSocket);
}
diff --git a/Frameworks/MQTTnet.NetFramework/MQTTnet.NetFramework.csproj b/Frameworks/MQTTnet.NetFramework/MQTTnet.NetFramework.csproj
index 3f2519f..22d5032 100644
--- a/Frameworks/MQTTnet.NetFramework/MQTTnet.NetFramework.csproj
+++ b/Frameworks/MQTTnet.NetFramework/MQTTnet.NetFramework.csproj
@@ -100,6 +100,7 @@
+
diff --git a/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs b/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs
index 6b011d1..a64deea 100644
--- a/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs
+++ b/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs
@@ -1,8 +1,5 @@
using System;
-using MQTTnet.Core.Adapter;
-using MQTTnet.Core.Channel;
using MQTTnet.Core.Client;
-using MQTTnet.Core.Serializer;
using MQTTnet.Implementations;
namespace MQTTnet
@@ -13,21 +10,7 @@ namespace MQTTnet
{
if (options == null) throw new ArgumentNullException(nameof(options));
- return new MqttClient(options, new MqttChannelCommunicationAdapter(GetMqttCommunicationChannel(options), new MqttPacketSerializer()));
- }
-
- private static IMqttCommunicationChannel GetMqttCommunicationChannel(MqttClientOptions options)
- {
- switch (options.ConnectionType)
- {
- case MqttConnectionType.Tcp:
- return new MqttTcpChannel();
- case MqttConnectionType.Ws:
- return new MqttWebSocketChannel();
-
- default:
- throw new NotSupportedException();
- }
+ return new MqttClient(new MqttCommunicationAdapterFactory());
}
}
}
\ No newline at end of file
diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttCommunicationAdapterFactory.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttCommunicationAdapterFactory.cs
new file mode 100644
index 0000000..799ccdd
--- /dev/null
+++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttCommunicationAdapterFactory.cs
@@ -0,0 +1,27 @@
+using System;
+using MQTTnet.Core.Adapter;
+using MQTTnet.Core.Client;
+using MQTTnet.Core.Serializer;
+
+namespace MQTTnet.Implementations
+{
+ public class MqttCommunicationAdapterFactory : IMqttCommunicationAdapterFactory
+ {
+ public IMqttCommunicationAdapter CreateMqttCommunicationAdapter(MqttClientOptions options)
+ {
+ if (options == null) throw new ArgumentNullException(nameof(options));
+
+ if (options is MqttClientTcpOptions tcpOptions)
+ {
+ return new MqttChannelCommunicationAdapter(new MqttTcpChannel(tcpOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion });
+ }
+
+ if (options is MqttClientWebSocketOptions webSocketOptions)
+ {
+ return new MqttChannelCommunicationAdapter(new MqttWebSocketChannel(webSocketOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion });
+ }
+
+ throw new NotSupportedException();
+ }
+ }
+}
\ No newline at end of file
diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs
index ff92edb..fd28348 100644
--- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs
+++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs
@@ -12,14 +12,17 @@ namespace MQTTnet.Implementations
{
public sealed class MqttTcpChannel : IMqttCommunicationChannel, IDisposable
{
+ private readonly MqttClientTcpOptions _options;
+
private Socket _socket;
private SslStream _sslStream;
///
/// called on client sockets are created in connect
///
- public MqttTcpChannel()
+ public MqttTcpChannel(MqttClientTcpOptions options)
{
+ _options = options ?? throw new ArgumentNullException(nameof(options));
}
///
@@ -37,22 +40,20 @@ namespace MQTTnet.Implementations
public Stream ReceiveStream { get; private set; }
public Stream RawReceiveStream => ReceiveStream;
- public async Task ConnectAsync(MqttClientOptions options)
+ public async Task ConnectAsync()
{
- if (options == null) throw new ArgumentNullException(nameof(options));
-
if (_socket == null)
{
_socket = new Socket(SocketType.Stream, ProtocolType.Tcp);
}
- await _socket.ConnectAsync(options.Server, options.GetPort()).ConfigureAwait(false);
+ await _socket.ConnectAsync(_options.Server, _options.GetPort()).ConfigureAwait(false);
- if (options.TlsOptions.UseTls)
+ if (_options.TlsOptions.UseTls)
{
_sslStream = new SslStream(new NetworkStream(_socket, true));
ReceiveStream = _sslStream;
- await _sslStream.AuthenticateAsClientAsync(options.Server, LoadCertificates(options), SslProtocols.Tls12, options.TlsOptions.CheckCertificateRevocation).ConfigureAwait(false);
+ await _sslStream.AuthenticateAsClientAsync(_options.Server, LoadCertificates(_options), SslProtocols.Tls12, _options.TlsOptions.CheckCertificateRevocation).ConfigureAwait(false);
}
else
{
diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs
index b6c41d9..0433b80 100644
--- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs
+++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs
@@ -10,27 +10,28 @@ namespace MQTTnet.Implementations
{
public sealed class MqttWebSocketChannel : IMqttCommunicationChannel, IDisposable
{
+ private readonly MqttClientWebSocketOptions _options;
private ClientWebSocket _webSocket;
+ public MqttWebSocketChannel(MqttClientWebSocketOptions options)
+ {
+ _options = options ?? throw new ArgumentNullException(nameof(options));
+ }
+
public Stream SendStream => RawReceiveStream;
public Stream ReceiveStream => RawReceiveStream;
public Stream RawReceiveStream { get; private set; }
- public async Task ConnectAsync(MqttClientOptions options)
+ public async Task ConnectAsync()
{
- var uri = options.Server;
+ var uri = _options.Uri;
if (!uri.StartsWith("ws://", StringComparison.OrdinalIgnoreCase))
{
uri = "ws://" + uri;
}
- if (options.Port.HasValue)
- {
- uri += ":" + options.Port;
- }
-
_webSocket = new ClientWebSocket();
- _webSocket.Options.KeepAliveInterval = options.KeepAlivePeriod;
+ _webSocket.Options.KeepAliveInterval = _options.KeepAlivePeriod;
await _webSocket.ConnectAsync(new Uri(uri), CancellationToken.None);
RawReceiveStream = new WebSocketStream(_webSocket);
}
diff --git a/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs b/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs
index 6b011d1..a64deea 100644
--- a/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs
+++ b/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs
@@ -1,8 +1,5 @@
using System;
-using MQTTnet.Core.Adapter;
-using MQTTnet.Core.Channel;
using MQTTnet.Core.Client;
-using MQTTnet.Core.Serializer;
using MQTTnet.Implementations;
namespace MQTTnet
@@ -13,21 +10,7 @@ namespace MQTTnet
{
if (options == null) throw new ArgumentNullException(nameof(options));
- return new MqttClient(options, new MqttChannelCommunicationAdapter(GetMqttCommunicationChannel(options), new MqttPacketSerializer()));
- }
-
- private static IMqttCommunicationChannel GetMqttCommunicationChannel(MqttClientOptions options)
- {
- switch (options.ConnectionType)
- {
- case MqttConnectionType.Tcp:
- return new MqttTcpChannel();
- case MqttConnectionType.Ws:
- return new MqttWebSocketChannel();
-
- default:
- throw new NotSupportedException();
- }
+ return new MqttClient(new MqttCommunicationAdapterFactory());
}
}
}
\ No newline at end of file
diff --git a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttCommunicationAdapterFactory.cs b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttCommunicationAdapterFactory.cs
new file mode 100644
index 0000000..799ccdd
--- /dev/null
+++ b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttCommunicationAdapterFactory.cs
@@ -0,0 +1,27 @@
+using System;
+using MQTTnet.Core.Adapter;
+using MQTTnet.Core.Client;
+using MQTTnet.Core.Serializer;
+
+namespace MQTTnet.Implementations
+{
+ public class MqttCommunicationAdapterFactory : IMqttCommunicationAdapterFactory
+ {
+ public IMqttCommunicationAdapter CreateMqttCommunicationAdapter(MqttClientOptions options)
+ {
+ if (options == null) throw new ArgumentNullException(nameof(options));
+
+ if (options is MqttClientTcpOptions tcpOptions)
+ {
+ return new MqttChannelCommunicationAdapter(new MqttTcpChannel(tcpOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion });
+ }
+
+ if (options is MqttClientWebSocketOptions webSocketOptions)
+ {
+ return new MqttChannelCommunicationAdapter(new MqttWebSocketChannel(webSocketOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion });
+ }
+
+ throw new NotSupportedException();
+ }
+ }
+}
\ No newline at end of file
diff --git a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs
index 781abe7..975928e 100644
--- a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs
+++ b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs
@@ -13,10 +13,12 @@ namespace MQTTnet.Implementations
{
public sealed class MqttTcpChannel : IMqttCommunicationChannel, IDisposable
{
+ private readonly MqttClientTcpOptions _options;
private StreamSocket _socket;
- public MqttTcpChannel()
+ public MqttTcpChannel(MqttClientTcpOptions options)
{
+ _options = options ?? throw new ArgumentNullException(nameof(options));
}
public MqttTcpChannel(StreamSocket socket)
@@ -29,30 +31,28 @@ namespace MQTTnet.Implementations
public Stream ReceiveStream { get; private set; }
public Stream RawReceiveStream { get; private set; }
- public async Task ConnectAsync(MqttClientOptions options)
+ public async Task ConnectAsync()
{
- if (options == null) throw new ArgumentNullException(nameof(options));
-
if (_socket == null)
{
_socket = new StreamSocket();
}
- if (!options.TlsOptions.UseTls)
+ if (!_options.TlsOptions.UseTls)
{
- await _socket.ConnectAsync(new HostName(options.Server), options.GetPort().ToString());
+ await _socket.ConnectAsync(new HostName(_options.Server), _options.GetPort().ToString());
}
else
{
- _socket.Control.ClientCertificate = LoadCertificate(options);
+ _socket.Control.ClientCertificate = LoadCertificate(_options);
- if (!options.TlsOptions.CheckCertificateRevocation)
+ if (!_options.TlsOptions.CheckCertificateRevocation)
{
_socket.Control.IgnorableServerCertificateErrors.Add(ChainValidationResult.IncompleteChain);
_socket.Control.IgnorableServerCertificateErrors.Add(ChainValidationResult.RevocationInformationMissing);
}
- await _socket.ConnectAsync(new HostName(options.Server), options.GetPort().ToString(), SocketProtectionLevel.Tls12);
+ await _socket.ConnectAsync(new HostName(_options.Server), _options.GetPort().ToString(), SocketProtectionLevel.Tls12);
}
CreateStreams();
diff --git a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttWebSocketChannel.cs b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttWebSocketChannel.cs
index 0051322..471cdbf 100644
--- a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttWebSocketChannel.cs
+++ b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttWebSocketChannel.cs
@@ -10,27 +10,28 @@ namespace MQTTnet.Implementations
{
public sealed class MqttWebSocketChannel : IMqttCommunicationChannel, IDisposable
{
+ private readonly MqttClientWebSocketOptions _options;
private ClientWebSocket _webSocket = new ClientWebSocket();
+ public MqttWebSocketChannel(MqttClientWebSocketOptions options)
+ {
+ _options = options ?? throw new ArgumentNullException(nameof(options));
+ }
+
public Stream RawReceiveStream { get; private set; }
public Stream SendStream => RawReceiveStream;
public Stream ReceiveStream => RawReceiveStream;
- public async Task ConnectAsync(MqttClientOptions options)
+ public async Task ConnectAsync()
{
- var uri = options.Server;
+ var uri = _options.Uri;
if (!uri.StartsWith("ws://", StringComparison.OrdinalIgnoreCase))
{
uri = "ws://" + uri;
}
- if (options.Port.HasValue)
- {
- uri += ":" + options.Port;
- }
-
_webSocket = new ClientWebSocket();
- _webSocket.Options.KeepAliveInterval = options.KeepAlivePeriod;
+ _webSocket.Options.KeepAliveInterval = _options.KeepAlivePeriod;
await _webSocket.ConnectAsync(new Uri(uri), CancellationToken.None).ConfigureAwait(false);
RawReceiveStream = new WebSocketStream(_webSocket);
diff --git a/Frameworks/MQTTnet.UniversalWindows/MQTTnet.UniversalWindows.csproj b/Frameworks/MQTTnet.UniversalWindows/MQTTnet.UniversalWindows.csproj
index 0d40ac2..71081e0 100644
--- a/Frameworks/MQTTnet.UniversalWindows/MQTTnet.UniversalWindows.csproj
+++ b/Frameworks/MQTTnet.UniversalWindows/MQTTnet.UniversalWindows.csproj
@@ -110,6 +110,7 @@
bin\x64\Release\MQTTnet.XML
+
diff --git a/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs b/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs
index 6b011d1..a64deea 100644
--- a/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs
+++ b/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs
@@ -1,8 +1,5 @@
using System;
-using MQTTnet.Core.Adapter;
-using MQTTnet.Core.Channel;
using MQTTnet.Core.Client;
-using MQTTnet.Core.Serializer;
using MQTTnet.Implementations;
namespace MQTTnet
@@ -13,21 +10,7 @@ namespace MQTTnet
{
if (options == null) throw new ArgumentNullException(nameof(options));
- return new MqttClient(options, new MqttChannelCommunicationAdapter(GetMqttCommunicationChannel(options), new MqttPacketSerializer()));
- }
-
- private static IMqttCommunicationChannel GetMqttCommunicationChannel(MqttClientOptions options)
- {
- switch (options.ConnectionType)
- {
- case MqttConnectionType.Tcp:
- return new MqttTcpChannel();
- case MqttConnectionType.Ws:
- return new MqttWebSocketChannel();
-
- default:
- throw new NotSupportedException();
- }
+ return new MqttClient(new MqttCommunicationAdapterFactory());
}
}
}
\ No newline at end of file
diff --git a/MQTTnet.Core/Adapter/IMqttCommunicationAdapter.cs b/MQTTnet.Core/Adapter/IMqttCommunicationAdapter.cs
index a80fc87..78586f6 100644
--- a/MQTTnet.Core/Adapter/IMqttCommunicationAdapter.cs
+++ b/MQTTnet.Core/Adapter/IMqttCommunicationAdapter.cs
@@ -2,7 +2,6 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
-using MQTTnet.Core.Client;
using MQTTnet.Core.Packets;
using MQTTnet.Core.Serializer;
@@ -12,7 +11,7 @@ namespace MQTTnet.Core.Adapter
{
IMqttPacketSerializer PacketSerializer { get; }
- Task ConnectAsync(TimeSpan timeout, MqttClientOptions options);
+ Task ConnectAsync(TimeSpan timeout);
Task DisconnectAsync(TimeSpan timeout);
diff --git a/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs b/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs
index 600081c..3ed2ebc 100644
--- a/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs
+++ b/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs
@@ -26,11 +26,11 @@ namespace MQTTnet.Core.Adapter
public IMqttPacketSerializer PacketSerializer { get; }
- public async Task ConnectAsync(TimeSpan timeout, MqttClientOptions options)
+ public async Task ConnectAsync(TimeSpan timeout)
{
try
{
- await _channel.ConnectAsync(options).TimeoutAfter(timeout).ConfigureAwait(false);
+ await _channel.ConnectAsync().TimeoutAfter(timeout).ConfigureAwait(false);
}
catch (TaskCanceledException)
{
diff --git a/MQTTnet.Core/Channel/IMqttCommunicationChannel.cs b/MQTTnet.Core/Channel/IMqttCommunicationChannel.cs
index 24525e8..8c746d1 100644
--- a/MQTTnet.Core/Channel/IMqttCommunicationChannel.cs
+++ b/MQTTnet.Core/Channel/IMqttCommunicationChannel.cs
@@ -1,5 +1,4 @@
using System.Threading.Tasks;
-using MQTTnet.Core.Client;
using System.IO;
namespace MQTTnet.Core.Channel
@@ -10,7 +9,7 @@ namespace MQTTnet.Core.Channel
Stream ReceiveStream { get; }
Stream RawReceiveStream { get; }
- Task ConnectAsync(MqttClientOptions options);
+ Task ConnectAsync();
Task DisconnectAsync();
}
}
diff --git a/MQTTnet.Core/Client/IMqttClient.cs b/MQTTnet.Core/Client/IMqttClient.cs
index 165f490..7622867 100644
--- a/MQTTnet.Core/Client/IMqttClient.cs
+++ b/MQTTnet.Core/Client/IMqttClient.cs
@@ -13,7 +13,7 @@ namespace MQTTnet.Core.Client
event EventHandler Connected;
event EventHandler Disconnected;
- Task ConnectAsync(MqttApplicationMessage willApplicationMessage = null);
+ Task ConnectAsync(MqttClientOptions options);
Task DisconnectAsync();
Task> SubscribeAsync(IEnumerable topicFilters);
diff --git a/MQTTnet.Core/Client/IMqttCommunicationAdapterFactory.cs b/MQTTnet.Core/Client/IMqttCommunicationAdapterFactory.cs
new file mode 100644
index 0000000..092ea04
--- /dev/null
+++ b/MQTTnet.Core/Client/IMqttCommunicationAdapterFactory.cs
@@ -0,0 +1,9 @@
+using MQTTnet.Core.Adapter;
+
+namespace MQTTnet.Core.Client
+{
+ public interface IMqttCommunicationAdapterFactory
+ {
+ IMqttCommunicationAdapter CreateMqttCommunicationAdapter(MqttClientOptions options);
+ }
+}
\ No newline at end of file
diff --git a/MQTTnet.Core/Client/MqttClient.cs b/MQTTnet.Core/Client/MqttClient.cs
index aa42652..f17800b 100644
--- a/MQTTnet.Core/Client/MqttClient.cs
+++ b/MQTTnet.Core/Client/MqttClient.cs
@@ -16,19 +16,17 @@ namespace MQTTnet.Core.Client
{
private readonly HashSet _unacknowledgedPublishPackets = new HashSet();
private readonly MqttPacketDispatcher _packetDispatcher = new MqttPacketDispatcher();
- private readonly MqttClientOptions _options;
- private readonly IMqttCommunicationAdapter _adapter;
+ private readonly IMqttCommunicationAdapterFactory _communicationChannelFactory;
+ private MqttClientOptions _options;
private bool _isReceivingPackets;
private int _latestPacketIdentifier;
private CancellationTokenSource _cancellationTokenSource;
+ private IMqttCommunicationAdapter _adapter;
- public MqttClient(MqttClientOptions options, IMqttCommunicationAdapter adapter)
+ public MqttClient(IMqttCommunicationAdapterFactory communicationChannelFactory)
{
- _options = options ?? throw new ArgumentNullException(nameof(options));
- _adapter = adapter ?? throw new ArgumentNullException(nameof(adapter));
-
- _adapter.PacketSerializer.ProtocolVersion = options.ProtocolVersion;
+ _communicationChannelFactory = communicationChannelFactory;
}
public event EventHandler Connected;
@@ -37,22 +35,27 @@ namespace MQTTnet.Core.Client
public bool IsConnected => _cancellationTokenSource != null && !_cancellationTokenSource.IsCancellationRequested;
- public async Task ConnectAsync(MqttApplicationMessage willApplicationMessage = null)
+ public async Task ConnectAsync(MqttClientOptions options)
{
+ if (options == null) throw new ArgumentNullException(nameof(options));
+
ThrowIfConnected("It is not allowed to connect with a server after the connection is established.");
try
{
+ _options = options;
_cancellationTokenSource = new CancellationTokenSource();
_latestPacketIdentifier = 0;
_packetDispatcher.Reset();
+ _adapter = _communicationChannelFactory.CreateMqttCommunicationAdapter(options);
+
MqttTrace.Verbose(nameof(MqttClient), "Trying to connect with server.");
- await _adapter.ConnectAsync(_options.DefaultCommunicationTimeout, _options).ConfigureAwait(false);
+ await _adapter.ConnectAsync(_options.DefaultCommunicationTimeout).ConfigureAwait(false);
MqttTrace.Verbose(nameof(MqttClient), "Connection with server established.");
await SetupIncomingPacketProcessingAsync();
- await AuthenticateAsync(willApplicationMessage);
+ await AuthenticateAsync(options.WillMessage);
MqttTrace.Verbose(nameof(MqttClient), "MQTT connection with server established.");
diff --git a/MQTTnet.Core/Client/MqttClientOptions.cs b/MQTTnet.Core/Client/MqttClientOptions.cs
index 12e7ae9..f2e28fb 100644
--- a/MQTTnet.Core/Client/MqttClientOptions.cs
+++ b/MQTTnet.Core/Client/MqttClientOptions.cs
@@ -3,14 +3,12 @@ using MQTTnet.Core.Serializer;
namespace MQTTnet.Core.Client
{
- public sealed class MqttClientOptions
+ public abstract class MqttClientOptions
{
- public string Server { get; set; }
-
- public int? Port { get; set; }
-
public MqttClientTlsOptions TlsOptions { get; set; } = new MqttClientTlsOptions();
+ public MqttApplicationMessage WillMessage { get; set; }
+
public string UserName { get; set; }
public string Password { get; set; }
@@ -24,7 +22,5 @@ namespace MQTTnet.Core.Client
public TimeSpan DefaultCommunicationTimeout { get; set; } = TimeSpan.FromSeconds(10);
public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311;
-
- public MqttConnectionType ConnectionType { get; set; } = MqttConnectionType.Tcp;
}
}
diff --git a/MQTTnet.Core/Client/MqttClientTcpOptions.cs b/MQTTnet.Core/Client/MqttClientTcpOptions.cs
new file mode 100644
index 0000000..beaa506
--- /dev/null
+++ b/MQTTnet.Core/Client/MqttClientTcpOptions.cs
@@ -0,0 +1,9 @@
+namespace MQTTnet.Core.Client
+{
+ public class MqttClientTcpOptions : MqttClientOptions
+ {
+ public string Server { get; set; }
+
+ public int? Port { get; set; }
+ }
+}
diff --git a/MQTTnet.Core/Client/MqttClientOptionsExtensions.cs b/MQTTnet.Core/Client/MqttClientTcpOptionsExtensions.cs
similarity index 73%
rename from MQTTnet.Core/Client/MqttClientOptionsExtensions.cs
rename to MQTTnet.Core/Client/MqttClientTcpOptionsExtensions.cs
index 7b01fbd..cada6f8 100644
--- a/MQTTnet.Core/Client/MqttClientOptionsExtensions.cs
+++ b/MQTTnet.Core/Client/MqttClientTcpOptionsExtensions.cs
@@ -2,9 +2,9 @@
namespace MQTTnet.Core.Client
{
- public static class MqttClientOptionsExtensions
+ public static class MqttClientTcpOptionsExtensions
{
- public static int GetPort(this MqttClientOptions options)
+ public static int GetPort(this MqttClientTcpOptions options)
{
if (options == null) throw new ArgumentNullException(nameof(options));
diff --git a/MQTTnet.Core/Client/MqttClientWebSocketOptions.cs b/MQTTnet.Core/Client/MqttClientWebSocketOptions.cs
new file mode 100644
index 0000000..4b90524
--- /dev/null
+++ b/MQTTnet.Core/Client/MqttClientWebSocketOptions.cs
@@ -0,0 +1,7 @@
+namespace MQTTnet.Core.Client
+{
+ public class MqttClientWebSocketOptions : MqttClientOptions
+ {
+ public string Uri { get; set; }
+ }
+}
diff --git a/MQTTnet.Core/Client/MqttConnectionType.cs b/MQTTnet.Core/Client/MqttConnectionType.cs
deleted file mode 100644
index 830bf0b..0000000
--- a/MQTTnet.Core/Client/MqttConnectionType.cs
+++ /dev/null
@@ -1,8 +0,0 @@
-namespace MQTTnet.Core.Client
-{
- public enum MqttConnectionType
- {
- Tcp,
- Ws
- }
-}
diff --git a/MQTTnet.Core/Server/MqttClientPendingMessagesQueue.cs b/MQTTnet.Core/Server/MqttClientPendingMessagesQueue.cs
index 61a1b3b..df3aeb0 100644
--- a/MQTTnet.Core/Server/MqttClientPendingMessagesQueue.cs
+++ b/MQTTnet.Core/Server/MqttClientPendingMessagesQueue.cs
@@ -72,6 +72,9 @@ namespace MQTTnet.Core.Server
{
MqttTrace.Warning(nameof(MqttClientPendingMessagesQueue), exception, "Sending publish packet failed due to communication exception.");
}
+ if (exception is OperationCanceledException)
+ {
+ }
else
{
MqttTrace.Error(nameof(MqttClientPendingMessagesQueue), exception, "Sending publish packet failed.");
diff --git a/MQTTnet.Core/Server/MqttClientSession.cs b/MQTTnet.Core/Server/MqttClientSession.cs
index fe1ab38..e94bde0 100644
--- a/MQTTnet.Core/Server/MqttClientSession.cs
+++ b/MQTTnet.Core/Server/MqttClientSession.cs
@@ -8,6 +8,7 @@ using MQTTnet.Core.Exceptions;
using MQTTnet.Core.Internal;
using MQTTnet.Core.Packets;
using MQTTnet.Core.Protocol;
+using MQTTnet.Core.Serializer;
namespace MQTTnet.Core.Server
{
@@ -19,7 +20,8 @@ namespace MQTTnet.Core.Server
private readonly MqttClientSessionsManager _mqttClientSessionsManager;
private readonly MqttClientPendingMessagesQueue _pendingMessagesQueue;
private readonly MqttServerOptions _options;
-
+
+ private IMqttCommunicationAdapter _adapter;
private CancellationTokenSource _cancellationTokenSource;
private MqttApplicationMessage _willMessage;
@@ -33,9 +35,9 @@ namespace MQTTnet.Core.Server
public string ClientId { get; }
- public bool IsConnected => Adapter != null;
+ public MqttProtocolVersion? ProtocolVersion => _adapter?.PacketSerializer.ProtocolVersion;
- public IMqttCommunicationAdapter Adapter { get; private set; }
+ public bool IsConnected => _adapter != null;
public async Task RunAsync(MqttApplicationMessage willMessage, IMqttCommunicationAdapter adapter)
{
@@ -45,7 +47,7 @@ namespace MQTTnet.Core.Server
try
{
- Adapter = adapter;
+ _adapter = adapter;
_cancellationTokenSource = new CancellationTokenSource();
_pendingMessagesQueue.Start(adapter, _cancellationTokenSource.Token);
@@ -75,7 +77,7 @@ namespace MQTTnet.Core.Server
_cancellationTokenSource?.Dispose();
_cancellationTokenSource = null;
- Adapter = null;
+ _adapter = null;
MqttTrace.Information(nameof(MqttClientSession), "Client '{0}': Disconnected.", ClientId);
}
@@ -106,7 +108,7 @@ namespace MQTTnet.Core.Server
while (!cancellationToken.IsCancellationRequested)
{
var packet = await adapter.ReceivePacketAsync(TimeSpan.Zero, cancellationToken).ConfigureAwait(false);
- await ProcessReceivedPacketAsync(packet).ConfigureAwait(false);
+ await ProcessReceivedPacketAsync(adapter, packet).ConfigureAwait(false);
}
}
catch (OperationCanceledException)
@@ -124,28 +126,28 @@ namespace MQTTnet.Core.Server
}
}
- private async Task ProcessReceivedPacketAsync(MqttBasePacket packet)
+ private async Task ProcessReceivedPacketAsync(IMqttCommunicationAdapter adapter, MqttBasePacket packet)
{
if (packet is MqttSubscribePacket subscribePacket)
{
- await Adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, _subscriptionsManager.Subscribe(subscribePacket));
+ await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, _subscriptionsManager.Subscribe(subscribePacket));
EnqueueRetainedMessages(subscribePacket);
}
else if (packet is MqttUnsubscribePacket unsubscribePacket)
{
- await Adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, _subscriptionsManager.Unsubscribe(unsubscribePacket));
+ await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, _subscriptionsManager.Unsubscribe(unsubscribePacket));
}
else if (packet is MqttPublishPacket publishPacket)
{
- await HandleIncomingPublishPacketAsync(publishPacket);
+ await HandleIncomingPublishPacketAsync(adapter, publishPacket);
}
else if (packet is MqttPubRelPacket pubRelPacket)
{
- await HandleIncomingPubRelPacketAsync(pubRelPacket);
+ await HandleIncomingPubRelPacketAsync(adapter, pubRelPacket);
}
else if (packet is MqttPubRecPacket pubRecPacket)
{
- await Adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, pubRecPacket.CreateResponse());
+ await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, pubRecPacket.CreateResponse());
}
else if (packet is MqttPubAckPacket || packet is MqttPubCompPacket)
{
@@ -153,7 +155,7 @@ namespace MQTTnet.Core.Server
}
else if (packet is MqttPingReqPacket)
{
- await Adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, new MqttPingRespPacket());
+ await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, new MqttPingRespPacket());
}
else if (packet is MqttDisconnectPacket || packet is MqttConnectPacket)
{
@@ -175,7 +177,7 @@ namespace MQTTnet.Core.Server
}
}
- private async Task HandleIncomingPublishPacketAsync(MqttPublishPacket publishPacket)
+ private async Task HandleIncomingPublishPacketAsync(IMqttCommunicationAdapter adapter, MqttPublishPacket publishPacket)
{
if (publishPacket.Retain)
{
@@ -191,7 +193,7 @@ namespace MQTTnet.Core.Server
if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce)
{
_mqttClientSessionsManager.DispatchPublishPacket(this, publishPacket);
- await Adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier });
+ await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier });
return;
}
@@ -205,21 +207,21 @@ namespace MQTTnet.Core.Server
_mqttClientSessionsManager.DispatchPublishPacket(this, publishPacket);
- await Adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier });
+ await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier });
return;
}
throw new MqttCommunicationException("Received a not supported QoS level.");
}
- private Task HandleIncomingPubRelPacketAsync(MqttPubRelPacket pubRelPacket)
+ private Task HandleIncomingPubRelPacketAsync(IMqttCommunicationAdapter adapter, MqttPubRelPacket pubRelPacket)
{
lock (_unacknowledgedPublishPackets)
{
_unacknowledgedPublishPackets.Remove(pubRelPacket.PacketIdentifier);
}
- return Adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, new MqttPubCompPacket { PacketIdentifier = pubRelPacket.PacketIdentifier });
+ return adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, new MqttPubCompPacket { PacketIdentifier = pubRelPacket.PacketIdentifier });
}
}
}
diff --git a/MQTTnet.Core/Server/MqttClientSessionsManager.cs b/MQTTnet.Core/Server/MqttClientSessionsManager.cs
index 981cdd3..df95c7e 100644
--- a/MQTTnet.Core/Server/MqttClientSessionsManager.cs
+++ b/MQTTnet.Core/Server/MqttClientSessionsManager.cs
@@ -9,6 +9,7 @@ using MQTTnet.Core.Exceptions;
using MQTTnet.Core.Internal;
using MQTTnet.Core.Packets;
using MQTTnet.Core.Protocol;
+using MQTTnet.Core.Serializer;
namespace MQTTnet.Core.Server
{
@@ -24,9 +25,7 @@ namespace MQTTnet.Core.Server
}
public event EventHandler ApplicationMessageReceived;
-
public event EventHandler ClientConnected;
-
public event EventHandler ClientDisconnected;
public MqttClientRetainedMessagesManager RetainedMessagesManager { get; }
@@ -111,7 +110,7 @@ namespace MQTTnet.Core.Server
return _clientSessions.Where(s => s.Value.IsConnected).Select(s => new ConnectedMqttClient
{
ClientId = s.Value.ClientId,
- ProtocolVersion = s.Value.Adapter.PacketSerializer.ProtocolVersion
+ ProtocolVersion = s.Value.ProtocolVersion ?? MqttProtocolVersion.V311
}).ToList();
}
}
diff --git a/MQTTnet.sln b/MQTTnet.sln
index 419fa6a..8ba4c60 100644
--- a/MQTTnet.sln
+++ b/MQTTnet.sln
@@ -3,7 +3,7 @@ Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.26730.16
MinimumVisualStudioVersion = 10.0.40219.1
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.Core.Tests", "Tests\MQTTnet.Core.Tests\MQTTnet.Core.Tests.csproj", "{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}"
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.Core.Tests", "Tests\MQTTnet.Core.Tests\MQTTnet.Core.Tests.csproj", "{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.NetFramework", "Frameworks\MQTTnet.NetFramework\MQTTnet.NetFramework.csproj", "{A480EF90-0EAA-4D9A-B271-47A9C47F6F7D}"
EndProject
diff --git a/README.md b/README.md
index 9a3f851..c3993a1 100644
--- a/README.md
+++ b/README.md
@@ -18,7 +18,7 @@ MQTTnet is a .NET library for MQTT based communication. It provides a MQTT clien
* Performance optimized (processing ~27.000 messages / second)*
* Interfaces included for mocking and testing
* Access to internal trace messages
-* Unit tested (57+ tests)
+* Unit tested (58+ tests)
\* Tested on local machine with MQTTnet client and server running in the same process using the TCP channel. The app for verification is part of this repository and stored in _/Tests/MQTTnet.TestApp.NetFramework_.
diff --git a/Tests/MQTTnet.Core.Tests/MqttCommunicationAdapterFactory.cs b/Tests/MQTTnet.Core.Tests/MqttCommunicationAdapterFactory.cs
new file mode 100644
index 0000000..c83f83a
--- /dev/null
+++ b/Tests/MQTTnet.Core.Tests/MqttCommunicationAdapterFactory.cs
@@ -0,0 +1,20 @@
+using MQTTnet.Core.Adapter;
+using MQTTnet.Core.Client;
+
+namespace MQTTnet.Core.Tests
+{
+ public class MqttCommunicationAdapterFactory : IMqttCommunicationAdapterFactory
+ {
+ private readonly IMqttCommunicationAdapter _adapter;
+
+ public MqttCommunicationAdapterFactory(IMqttCommunicationAdapter adapter)
+ {
+ _adapter = adapter;
+ }
+
+ public IMqttCommunicationAdapter CreateMqttCommunicationAdapter(MqttClientOptions options)
+ {
+ return _adapter;
+ }
+ }
+}
diff --git a/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs b/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs
index eb7cb83..de6265f 100644
--- a/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs
+++ b/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs
@@ -6,7 +6,6 @@ using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Core.Adapter;
using MQTTnet.Core.Channel;
-using MQTTnet.Core.Client;
using MQTTnet.Core.Packets;
using MQTTnet.Core.Protocol;
using MQTTnet.Core.Serializer;
@@ -411,7 +410,7 @@ namespace MQTTnet.Core.Tests
_stream.Position = 0;
}
- public Task ConnectAsync(MqttClientOptions options)
+ public Task ConnectAsync()
{
return Task.FromResult(0);
}
diff --git a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs
index 19f96c6..f198576 100644
--- a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs
+++ b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs
@@ -6,7 +6,6 @@ using MQTTnet.Core.Client;
using MQTTnet.Core.Packets;
using MQTTnet.Core.Protocol;
using MQTTnet.Core.Server;
-using System;
namespace MQTTnet.Core.Tests
{
diff --git a/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs b/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs
index 91d0719..9e71f8a 100644
--- a/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs
+++ b/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs
@@ -4,7 +4,6 @@ using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Core.Adapter;
-using MQTTnet.Core.Client;
using MQTTnet.Core.Packets;
using MQTTnet.Core.Serializer;
@@ -18,7 +17,7 @@ namespace MQTTnet.Core.Tests
public IMqttPacketSerializer PacketSerializer { get; } = new MqttPacketSerializer();
- public Task ConnectAsync(TimeSpan timeout, MqttClientOptions options)
+ public Task ConnectAsync(TimeSpan timeout)
{
return Task.FromResult(0);
}
diff --git a/Tests/MQTTnet.Core.Tests/TestMqttServerAdapter.cs b/Tests/MQTTnet.Core.Tests/TestMqttServerAdapter.cs
index ad099d2..1906564 100644
--- a/Tests/MQTTnet.Core.Tests/TestMqttServerAdapter.cs
+++ b/Tests/MQTTnet.Core.Tests/TestMqttServerAdapter.cs
@@ -17,10 +17,11 @@ namespace MQTTnet.Core.Tests
adapterA.Partner = adapterB;
adapterB.Partner = adapterA;
- var client = new MqttClient(new MqttClientOptions() { ClientId = clientId }, adapterA);
+ var client = new MqttClient(new MqttCommunicationAdapterFactory(adapterA));
var connected = WaitForClientToConnect(server, clientId);
+
FireClientAcceptedEvent(adapterB);
- await client.ConnectAsync(willMessage);
+ await client.ConnectAsync(new MqttClientTcpOptions { ClientId = clientId, WillMessage = willMessage });
await connected;
return client;
@@ -30,17 +31,16 @@ namespace MQTTnet.Core.Tests
{
var tcs = new TaskCompletionSource