Browse Source

Merge branch 'master' into develop

release/3.x.x
Christian Kratky 7 years ago
parent
commit
487a97c31e
35 changed files with 372 additions and 195 deletions
  1. +7
    -7
      Build/MQTTnet.nuspec
  2. +2
    -2
      Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs
  3. +10
    -4
      Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs
  4. +1
    -1
      Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs
  5. +2
    -2
      Frameworks/MQTTnet.NetFramework/Properties/AssemblyInfo.cs
  6. +2
    -2
      Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs
  7. +10
    -4
      Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs
  8. +2
    -2
      Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj
  9. +1
    -1
      Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs
  10. +1
    -1
      Frameworks/MQTTnet.UniversalWindows/Implementations/MqttServerAdapter.cs
  11. +9
    -3
      Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs
  12. +1
    -1
      Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs
  13. +2
    -2
      Frameworks/MQTTnet.UniversalWindows/Properties/AssemblyInfo.cs
  14. +3
    -0
      MQTTnet.Core/Adapter/IMqttCommunicationAdapter.cs
  15. +37
    -33
      MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs
  16. +93
    -51
      MQTTnet.Core/Client/MqttClient.cs
  17. +5
    -2
      MQTTnet.Core/Client/MqttClientOptions.cs
  18. +6
    -1
      MQTTnet.Core/Exceptions/MqttCommunicationException.cs
  19. +2
    -2
      MQTTnet.Core/MQTTnet.Core.csproj
  20. +5
    -1
      MQTTnet.Core/Packets/MqttConnectPacket.cs
  21. +2
    -0
      MQTTnet.Core/Serializer/IMqttPacketSerializer.cs
  22. +1
    -1
      MQTTnet.Core/Serializer/MqttPacketReader.cs
  23. +51
    -24
      MQTTnet.Core/Serializer/MqttPacketSerializer.cs
  24. +3
    -3
      MQTTnet.Core/Serializer/MqttPacketWriter.cs
  25. +8
    -0
      MQTTnet.Core/Serializer/MqttProtocolVersion.cs
  26. +11
    -0
      MQTTnet.Core/Server/ConnectedMqttClient.cs
  27. +1
    -1
      MQTTnet.Core/Server/IMqttServer.cs
  28. +18
    -14
      MQTTnet.Core/Server/MqttClientSession.cs
  29. +9
    -2
      MQTTnet.Core/Server/MqttClientSessionsManager.cs
  30. +1
    -1
      MQTTnet.Core/Server/MqttServer.cs
  31. +6
    -1
      MQTTnet.sln
  32. +26
    -21
      README.md
  33. +1
    -1
      Tests/MQTTnet.Core.Tests/MQTTnet.Core.Tests.csproj
  34. +30
    -4
      Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs
  35. +3
    -0
      Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs

+ 7
- 7
Build/MQTTnet.nuspec View File

@@ -2,7 +2,7 @@
<package > <package >
<metadata> <metadata>
<id>MQTTnet</id> <id>MQTTnet</id>
<version>2.1.5</version>
<version>2.2.0</version>
<authors>Christian Kratky</authors> <authors>Christian Kratky</authors>
<owners>Christian Kratky</owners> <owners>Christian Kratky</owners>
<licenseUrl>https://github.com/chkr1011/MQTTnet/blob/master/LICENSE</licenseUrl> <licenseUrl>https://github.com/chkr1011/MQTTnet/blob/master/LICENSE</licenseUrl>
@@ -10,14 +10,14 @@
<iconUrl>https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png</iconUrl> <iconUrl>https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png</iconUrl>
<requireLicenseAcceptance>false</requireLicenseAcceptance> <requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>MQTTnet is a .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker).</description> <description>MQTTnet is a .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker).</description>
<releaseNotes>* [MqttServer] Added support for publishing application messages
* [Core] Fixed QoS level 2 handling
* [Core] Performance optimizations
* [MqttClient/MqttServer] Errors while handline application messages are now catched and traced
* [MqttClient/MqttServer] Added interfaces
<releaseNotes>* [Server] Added support for MQTT protocol version 3.1.0
* [Server] Providing the used protocol version of connected clients
* [Client] Added support for protocol version 3.1.0
* [Core] Several minor performance improvements
* [Core] Fixed an issue with connection management (Thanks to wuzhenda; Zuendelmeister)
</releaseNotes> </releaseNotes>
<copyright>Copyright Christian Kratky 2016-2017</copyright> <copyright>Copyright Christian Kratky 2016-2017</copyright>
<tags>MQTT MQTTClient MQTTServer MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Queue Hardware</tags>
<tags>MQTT MQTTClient MQTTServer MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Queue Hardware Arduino</tags>
<dependencies> <dependencies>


<group targetFramework="netstandard1.3"> <group targetFramework="netstandard1.3">


+ 2
- 2
Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs View File

@@ -86,7 +86,7 @@ namespace MQTTnet.Implementations
try try
{ {
var clientSocket = await Task.Factory.FromAsync(_defaultEndpointSocket.BeginAccept, _defaultEndpointSocket.EndAccept, null); var clientSocket = await Task.Factory.FromAsync(_defaultEndpointSocket.BeginAccept, _defaultEndpointSocket.EndAccept, null);
var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new DefaultMqttV311PacketSerializer());
var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new MqttPacketSerializer());
ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter));
} }
catch (Exception exception) when (!(exception is ObjectDisposedException)) catch (Exception exception) when (!(exception is ObjectDisposedException))
@@ -107,7 +107,7 @@ namespace MQTTnet.Implementations
var sslStream = new SslStream(new NetworkStream(clientSocket)); var sslStream = new SslStream(new NetworkStream(clientSocket));
await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false); await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false);
var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new DefaultMqttV311PacketSerializer());
var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new MqttPacketSerializer());
ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter));
} }
catch (Exception exception) catch (Exception exception)


+ 10
- 4
Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs View File

@@ -12,12 +12,11 @@ namespace MQTTnet.Implementations
{ {
public sealed class MqttTcpChannel : IMqttCommunicationChannel, IDisposable public sealed class MqttTcpChannel : IMqttCommunicationChannel, IDisposable
{ {
private readonly Socket _socket;
private Socket _socket;
private SslStream _sslStream; private SslStream _sslStream;


public MqttTcpChannel() public MqttTcpChannel()
{ {
_socket = new Socket(SocketType.Stream, ProtocolType.Tcp);
} }


public MqttTcpChannel(Socket socket, SslStream sslStream) public MqttTcpChannel(Socket socket, SslStream sslStream)
@@ -31,6 +30,11 @@ namespace MQTTnet.Implementations
if (options == null) throw new ArgumentNullException(nameof(options)); if (options == null) throw new ArgumentNullException(nameof(options));
try try
{ {
if (_socket == null)
{
_socket = new Socket(SocketType.Stream, ProtocolType.Tcp);
}

await Task.Factory.FromAsync(_socket.BeginConnect, _socket.EndConnect, options.Server, options.GetPort(), null); await Task.Factory.FromAsync(_socket.BeginConnect, _socket.EndConnect, options.Server, options.GetPort(), null);


if (options.TlsOptions.UseTls) if (options.TlsOptions.UseTls)
@@ -49,8 +53,7 @@ namespace MQTTnet.Implementations
{ {
try try
{ {
_sslStream.Dispose();
_socket.Dispose();
Dispose();
return Task.FromResult(0); return Task.FromResult(0);
} }
catch (SocketException exception) catch (SocketException exception)
@@ -108,6 +111,9 @@ namespace MQTTnet.Implementations
{ {
_socket?.Dispose(); _socket?.Dispose();
_sslStream?.Dispose(); _sslStream?.Dispose();

_socket = null;
_sslStream = null;
} }


private static X509CertificateCollection LoadCertificates(MqttClientOptions options) private static X509CertificateCollection LoadCertificates(MqttClientOptions options)


+ 1
- 1
Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs View File

@@ -12,7 +12,7 @@ namespace MQTTnet
{ {
if (options == null) throw new ArgumentNullException(nameof(options)); if (options == null) throw new ArgumentNullException(nameof(options));


return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new DefaultMqttV311PacketSerializer()));
return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new MqttPacketSerializer()));
} }
} }
} }

+ 2
- 2
Frameworks/MQTTnet.NetFramework/Properties/AssemblyInfo.cs View File

@@ -11,5 +11,5 @@ using System.Runtime.InteropServices;
[assembly: AssemblyCulture("")] [assembly: AssemblyCulture("")]
[assembly: ComVisible(false)] [assembly: ComVisible(false)]
[assembly: Guid("a480ef90-0eaa-4d9a-b271-47a9c47f6f7d")] [assembly: Guid("a480ef90-0eaa-4d9a-b271-47a9c47f6f7d")]
[assembly: AssemblyVersion("2.1.4.0")]
[assembly: AssemblyFileVersion("2.1.4.0")]
[assembly: AssemblyVersion("2.1.5.1")]
[assembly: AssemblyFileVersion("2.1.5.1")]

+ 2
- 2
Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs View File

@@ -84,7 +84,7 @@ namespace MQTTnet.Implementations
try try
{ {
var clientSocket = await _defaultEndpointSocket.AcceptAsync(); var clientSocket = await _defaultEndpointSocket.AcceptAsync();
var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new DefaultMqttV311PacketSerializer());
var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new MqttPacketSerializer());
ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter));
} }
catch (Exception exception) catch (Exception exception)
@@ -105,7 +105,7 @@ namespace MQTTnet.Implementations
var sslStream = new SslStream(new NetworkStream(clientSocket)); var sslStream = new SslStream(new NetworkStream(clientSocket));
await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false); await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false);
var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new DefaultMqttV311PacketSerializer());
var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new MqttPacketSerializer());
ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter));
} }
catch (Exception exception) catch (Exception exception)


+ 10
- 4
Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs View File

@@ -12,12 +12,11 @@ namespace MQTTnet.Implementations
{ {
public sealed class MqttTcpChannel : IMqttCommunicationChannel, IDisposable public sealed class MqttTcpChannel : IMqttCommunicationChannel, IDisposable
{ {
private readonly Socket _socket;
private Socket _socket;
private SslStream _sslStream; private SslStream _sslStream;


public MqttTcpChannel() public MqttTcpChannel()
{ {
_socket = new Socket(SocketType.Stream, ProtocolType.Tcp);
} }


public MqttTcpChannel(Socket socket, SslStream sslStream) public MqttTcpChannel(Socket socket, SslStream sslStream)
@@ -31,6 +30,11 @@ namespace MQTTnet.Implementations
if (options == null) throw new ArgumentNullException(nameof(options)); if (options == null) throw new ArgumentNullException(nameof(options));
try try
{ {
if (_socket == null)
{
_socket = new Socket(SocketType.Stream, ProtocolType.Tcp);
}

await _socket.ConnectAsync(options.Server, options.GetPort()); await _socket.ConnectAsync(options.Server, options.GetPort());
if (options.TlsOptions.UseTls) if (options.TlsOptions.UseTls)
@@ -49,8 +53,7 @@ namespace MQTTnet.Implementations
{ {
try try
{ {
_sslStream.Dispose();
_socket.Dispose();
Dispose();
return Task.FromResult(0); return Task.FromResult(0);
} }
catch (SocketException exception) catch (SocketException exception)
@@ -101,6 +104,9 @@ namespace MQTTnet.Implementations
{ {
_socket?.Dispose(); _socket?.Dispose();
_sslStream?.Dispose(); _sslStream?.Dispose();

_socket = null;
_sslStream = null;
} }


private static X509CertificateCollection LoadCertificates(MqttClientOptions options) private static X509CertificateCollection LoadCertificates(MqttClientOptions options)


+ 2
- 2
Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj View File

@@ -4,8 +4,8 @@
<TargetFramework>netstandard1.3</TargetFramework> <TargetFramework>netstandard1.3</TargetFramework>
<AssemblyName>MQTTnet</AssemblyName> <AssemblyName>MQTTnet</AssemblyName>
<RootNamespace>MQTTnet</RootNamespace> <RootNamespace>MQTTnet</RootNamespace>
<AssemblyVersion>2.1.4.0</AssemblyVersion>
<FileVersion>2.1.4.0</FileVersion>
<AssemblyVersion>2.1.5.1</AssemblyVersion>
<FileVersion>2.1.5.1</FileVersion>
<Version>0.0.0.0</Version> <Version>0.0.0.0</Version>
<Company /> <Company />
<Product /> <Product />


+ 1
- 1
Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs View File

@@ -12,7 +12,7 @@ namespace MQTTnet
{ {
if (options == null) throw new ArgumentNullException(nameof(options)); if (options == null) throw new ArgumentNullException(nameof(options));


return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new DefaultMqttV311PacketSerializer()));
return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new MqttPacketSerializer()));
} }
} }
} }

+ 1
- 1
Frameworks/MQTTnet.UniversalWindows/Implementations/MqttServerAdapter.cs View File

@@ -52,7 +52,7 @@ namespace MQTTnet.Implementations
{ {
try try
{ {
var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(args.Socket), new DefaultMqttV311PacketSerializer());
var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(args.Socket), new MqttPacketSerializer());
ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(args.Socket.Information.RemoteAddress.ToString(), clientAdapter)); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(args.Socket.Information.RemoteAddress.ToString(), clientAdapter));
} }
catch (Exception exception) catch (Exception exception)


+ 9
- 3
Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs View File

@@ -15,11 +15,10 @@ namespace MQTTnet.Implementations
{ {
public sealed class MqttTcpChannel : IMqttCommunicationChannel, IDisposable public sealed class MqttTcpChannel : IMqttCommunicationChannel, IDisposable
{ {
private readonly StreamSocket _socket;
private StreamSocket _socket;


public MqttTcpChannel() public MqttTcpChannel()
{ {
_socket = new StreamSocket();
} }


public MqttTcpChannel(StreamSocket socket) public MqttTcpChannel(StreamSocket socket)
@@ -32,6 +31,11 @@ namespace MQTTnet.Implementations
if (options == null) throw new ArgumentNullException(nameof(options)); if (options == null) throw new ArgumentNullException(nameof(options));
try try
{ {
if (_socket == null)
{
_socket = new StreamSocket();
}

if (!options.TlsOptions.UseTls) if (!options.TlsOptions.UseTls)
{ {
await _socket.ConnectAsync(new HostName(options.Server), options.GetPort().ToString()); await _socket.ConnectAsync(new HostName(options.Server), options.GetPort().ToString());
@@ -59,7 +63,7 @@ namespace MQTTnet.Implementations
{ {
try try
{ {
_socket.Dispose();
Dispose();
return Task.FromResult(0); return Task.FromResult(0);
} }
catch (SocketException exception) catch (SocketException exception)
@@ -100,6 +104,8 @@ namespace MQTTnet.Implementations
public void Dispose() public void Dispose()
{ {
_socket?.Dispose(); _socket?.Dispose();

_socket = null;
} }


private static Certificate LoadCertificate(MqttClientOptions options) private static Certificate LoadCertificate(MqttClientOptions options)


+ 1
- 1
Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs View File

@@ -12,7 +12,7 @@ namespace MQTTnet
{ {
if (options == null) throw new ArgumentNullException(nameof(options)); if (options == null) throw new ArgumentNullException(nameof(options));


return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new DefaultMqttV311PacketSerializer()));
return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new MqttPacketSerializer()));
} }
} }
} }

+ 2
- 2
Frameworks/MQTTnet.UniversalWindows/Properties/AssemblyInfo.cs View File

@@ -10,5 +10,5 @@ using System.Runtime.InteropServices;
[assembly: AssemblyTrademark("")] [assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")] [assembly: AssemblyCulture("")]
[assembly: ComVisible(false)] [assembly: ComVisible(false)]
[assembly: AssemblyVersion("2.1.4.0")]
[assembly: AssemblyFileVersion("2.1.4.0")]
[assembly: AssemblyVersion("2.1.5.1")]
[assembly: AssemblyFileVersion("2.1.5.1")]

+ 3
- 0
MQTTnet.Core/Adapter/IMqttCommunicationAdapter.cs View File

@@ -2,6 +2,7 @@
using System.Threading.Tasks; using System.Threading.Tasks;
using MQTTnet.Core.Client; using MQTTnet.Core.Client;
using MQTTnet.Core.Packets; using MQTTnet.Core.Packets;
using MQTTnet.Core.Serializer;


namespace MQTTnet.Core.Adapter namespace MQTTnet.Core.Adapter
{ {
@@ -14,5 +15,7 @@ namespace MQTTnet.Core.Adapter
Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout); Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout);


Task<MqttBasePacket> ReceivePacketAsync(TimeSpan timeout); Task<MqttBasePacket> ReceivePacketAsync(TimeSpan timeout);

IMqttPacketSerializer PacketSerializer { get; }
} }
} }

+ 37
- 33
MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs View File

@@ -11,22 +11,19 @@ namespace MQTTnet.Core.Adapter
{ {
public class MqttChannelCommunicationAdapter : IMqttCommunicationAdapter public class MqttChannelCommunicationAdapter : IMqttCommunicationAdapter
{ {
private readonly IMqttPacketSerializer _serializer;
private readonly IMqttCommunicationChannel _channel; private readonly IMqttCommunicationChannel _channel;


public MqttChannelCommunicationAdapter(IMqttCommunicationChannel channel, IMqttPacketSerializer serializer) public MqttChannelCommunicationAdapter(IMqttCommunicationChannel channel, IMqttPacketSerializer serializer)
{ {
_channel = channel ?? throw new ArgumentNullException(nameof(channel)); _channel = channel ?? throw new ArgumentNullException(nameof(channel));
_serializer = serializer ?? throw new ArgumentNullException(nameof(serializer));
PacketSerializer = serializer ?? throw new ArgumentNullException(nameof(serializer));
} }


public IMqttPacketSerializer PacketSerializer { get; }

public async Task ConnectAsync(MqttClientOptions options, TimeSpan timeout) public async Task ConnectAsync(MqttClientOptions options, TimeSpan timeout)
{ {
var task = _channel.ConnectAsync(options);
if (await Task.WhenAny(Task.Delay(timeout), task) != task)
{
throw new MqttCommunicationTimedOutException();
}
await ExecuteWithTimeoutAsync(_channel.ConnectAsync(options), timeout);
} }


public async Task DisconnectAsync() public async Task DisconnectAsync()
@@ -38,21 +35,7 @@ namespace MQTTnet.Core.Adapter
{ {
MqttTrace.Information(nameof(MqttChannelCommunicationAdapter), $"TX >>> {packet} [Timeout={timeout}]"); MqttTrace.Information(nameof(MqttChannelCommunicationAdapter), $"TX >>> {packet} [Timeout={timeout}]");


bool hasTimeout;
try
{
var task = _serializer.SerializeAsync(packet, _channel);
hasTimeout = await Task.WhenAny(Task.Delay(timeout), task) != task;
}
catch (Exception exception)
{
throw new MqttCommunicationException(exception);
}

if (hasTimeout)
{
throw new MqttCommunicationTimedOutException();
}
await ExecuteWithTimeoutAsync(PacketSerializer.SerializeAsync(packet, _channel), timeout);
} }


public async Task<MqttBasePacket> ReceivePacketAsync(TimeSpan timeout) public async Task<MqttBasePacket> ReceivePacketAsync(TimeSpan timeout)
@@ -60,20 +43,11 @@ namespace MQTTnet.Core.Adapter
MqttBasePacket packet; MqttBasePacket packet;
if (timeout > TimeSpan.Zero) if (timeout > TimeSpan.Zero)
{ {
var workerTask = _serializer.DeserializeAsync(_channel);
var timeoutTask = Task.Delay(timeout);
var hasTimeout = Task.WhenAny(timeoutTask, workerTask) == timeoutTask;

if (hasTimeout)
{
throw new MqttCommunicationTimedOutException();
}

packet = workerTask.Result;
packet = await ExecuteWithTimeoutAsync(PacketSerializer.DeserializeAsync(_channel), timeout);
} }
else else
{ {
packet = await _serializer.DeserializeAsync(_channel);
packet = await PacketSerializer.DeserializeAsync(_channel);
} }


if (packet == null) if (packet == null)
@@ -84,5 +58,35 @@ namespace MQTTnet.Core.Adapter
MqttTrace.Information(nameof(MqttChannelCommunicationAdapter), $"RX <<< {packet}"); MqttTrace.Information(nameof(MqttChannelCommunicationAdapter), $"RX <<< {packet}");
return packet; return packet;
} }

private static async Task<TResult> ExecuteWithTimeoutAsync<TResult>(Task<TResult> task, TimeSpan timeout)
{
var timeoutTask = Task.Delay(timeout);
if (await Task.WhenAny(timeoutTask, task) == timeoutTask)
{
throw new MqttCommunicationTimedOutException();
}

if (task.IsFaulted)
{
throw new MqttCommunicationException(task.Exception);
}

return task.Result;
}

private static async Task ExecuteWithTimeoutAsync(Task task, TimeSpan timeout)
{
var timeoutTask = Task.Delay(timeout);
if (await Task.WhenAny(timeoutTask, task) == timeoutTask)
{
throw new MqttCommunicationTimedOutException();
}

if (task.IsFaulted)
{
throw new MqttCommunicationException(task.Exception);
}
}
} }
} }

+ 93
- 51
MQTTnet.Core/Client/MqttClient.cs View File

@@ -20,6 +20,7 @@ namespace MQTTnet.Core.Client
private readonly MqttClientOptions _options; private readonly MqttClientOptions _options;
private readonly IMqttCommunicationAdapter _adapter; private readonly IMqttCommunicationAdapter _adapter;


private bool _disconnectedEventSuspended;
private int _latestPacketIdentifier; private int _latestPacketIdentifier;
private CancellationTokenSource _cancellationTokenSource; private CancellationTokenSource _cancellationTokenSource;


@@ -27,6 +28,8 @@ namespace MQTTnet.Core.Client
{ {
_options = options ?? throw new ArgumentNullException(nameof(options)); _options = options ?? throw new ArgumentNullException(nameof(options));
_adapter = adapter ?? throw new ArgumentNullException(nameof(adapter)); _adapter = adapter ?? throw new ArgumentNullException(nameof(adapter));

_adapter.PacketSerializer.ProtocolVersion = options.ProtocolVersion;
} }


public event EventHandler Connected; public event EventHandler Connected;
@@ -46,49 +49,64 @@ namespace MQTTnet.Core.Client
throw new MqttProtocolViolationException("It is not allowed to connect with a server after the connection is established."); throw new MqttProtocolViolationException("It is not allowed to connect with a server after the connection is established.");
} }


var connectPacket = new MqttConnectPacket
try
{ {
ClientId = _options.ClientId,
Username = _options.UserName,
Password = _options.Password,
CleanSession = _options.CleanSession,
KeepAlivePeriod = (ushort)_options.KeepAlivePeriod.TotalSeconds,
WillMessage = willApplicationMessage
};
_disconnectedEventSuspended = false;


await _adapter.ConnectAsync(_options, _options.DefaultCommunicationTimeout);
MqttTrace.Verbose(nameof(MqttClient), "Connection with server established.");
await _adapter.ConnectAsync(_options, _options.DefaultCommunicationTimeout);


_cancellationTokenSource = new CancellationTokenSource();
_latestPacketIdentifier = 0;
_packetDispatcher.Reset();
IsConnected = true;
MqttTrace.Verbose(nameof(MqttClient), "Connection with server established.");


#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
Task.Run(() => ReceivePackets(_cancellationTokenSource.Token), _cancellationTokenSource.Token);
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
var connectPacket = new MqttConnectPacket
{
ClientId = _options.ClientId,
Username = _options.UserName,
Password = _options.Password,
CleanSession = _options.CleanSession,
KeepAlivePeriod = (ushort)_options.KeepAlivePeriod.TotalSeconds,
WillMessage = willApplicationMessage
};

_cancellationTokenSource = new CancellationTokenSource();
_latestPacketIdentifier = 0;
_packetDispatcher.Reset();

StartReceivePackets();

var response = await SendAndReceiveAsync<MqttConnAckPacket>(connectPacket);
if (response.ConnectReturnCode != MqttConnectReturnCode.ConnectionAccepted)
{
await DisconnectInternalAsync();
throw new MqttConnectingFailedException(response.ConnectReturnCode);
}


var response = await SendAndReceiveAsync<MqttConnAckPacket>(connectPacket);
if (response.ConnectReturnCode != MqttConnectReturnCode.ConnectionAccepted)
{
await DisconnectAsync();
throw new MqttConnectingFailedException(response.ConnectReturnCode);
}
if (_options.KeepAlivePeriod != TimeSpan.Zero)
{
StartSendKeepAliveMessages();
}
MqttTrace.Verbose(nameof(MqttClient), "MQTT connection with server established.");


if (_options.KeepAlivePeriod != TimeSpan.Zero)
IsConnected = true;
Connected?.Invoke(this, EventArgs.Empty);
}
catch (Exception)
{ {
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
Task.Run(() => SendKeepAliveMessagesAsync(_cancellationTokenSource.Token), _cancellationTokenSource.Token);
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
await DisconnectInternalAsync();
throw;
} }

Connected?.Invoke(this, EventArgs.Empty);
} }


public async Task DisconnectAsync() public async Task DisconnectAsync()
{ {
await SendAsync(new MqttDisconnectPacket());
await DisconnectInternalAsync();
try
{
await SendAsync(new MqttDisconnectPacket());
}
finally
{
await DisconnectInternalAsync();
}
} }


public Task<IList<MqttSubscribeResult>> SubscribeAsync(params TopicFilter[] topicFilters) public Task<IList<MqttSubscribeResult>> SubscribeAsync(params TopicFilter[] topicFilters)
@@ -179,8 +197,9 @@ namespace MQTTnet.Core.Client
{ {
await _adapter.DisconnectAsync(); await _adapter.DisconnectAsync();
} }
catch
catch (Exception exception)
{ {
MqttTrace.Warning(nameof(MqttClient), exception, "Error while disconnecting.");
} }
finally finally
{ {
@@ -189,7 +208,12 @@ namespace MQTTnet.Core.Client
_cancellationTokenSource = null; _cancellationTokenSource = null;


IsConnected = false; IsConnected = false;
Disconnected?.Invoke(this, EventArgs.Empty);

if (!_disconnectedEventSuspended)
{
_disconnectedEventSuspended = true;
Disconnected?.Invoke(this, EventArgs.Empty);
}
} }
} }


@@ -237,7 +261,7 @@ namespace MQTTnet.Core.Client
} }
catch (Exception exception) catch (Exception exception)
{ {
MqttTrace.Error(nameof(MqttClient), exception, "Unhandled exception while handling application message.");
MqttTrace.Error(nameof(MqttClient), exception, "Unhandled exception while handling application message.");
} }
} }


@@ -267,7 +291,7 @@ namespace MQTTnet.Core.Client
return SendAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }); return SendAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier });
} }


throw new InvalidOperationException();
throw new MqttCommunicationException("Received a not supported QoS level.");
} }


private async Task ProcessReceivedPubRelPacket(MqttPubRelPacket pubRelPacket) private async Task ProcessReceivedPubRelPacket(MqttPubRelPacket pubRelPacket)
@@ -276,7 +300,7 @@ namespace MQTTnet.Core.Client
{ {
_unacknowledgedPublishPackets.Remove(pubRelPacket.PacketIdentifier); _unacknowledgedPublishPackets.Remove(pubRelPacket.PacketIdentifier);
} }
await SendAsync(pubRelPacket.CreateResponse<MqttPubCompPacket>()); await SendAsync(pubRelPacket.CreateResponse<MqttPubCompPacket>());
} }


@@ -298,15 +322,12 @@ namespace MQTTnet.Core.Client
var pi1 = requestPacket as IMqttPacketWithIdentifier; var pi1 = requestPacket as IMqttPacketWithIdentifier;
var pi2 = p as IMqttPacketWithIdentifier; var pi2 = p as IMqttPacketWithIdentifier;


if (pi1 != null && pi2 != null)
if (pi1 == null || pi2 == null)
{ {
if (pi1.PacketIdentifier != pi2.PacketIdentifier)
{
return false;
}
return true;
} }


return true;
return pi1.PacketIdentifier == pi2.PacketIdentifier;
} }


await _adapter.SendPacketAsync(requestPacket, _options.DefaultCommunicationTimeout); await _adapter.SendPacketAsync(requestPacket, _options.DefaultCommunicationTimeout);
@@ -333,15 +354,16 @@ namespace MQTTnet.Core.Client
catch (MqttCommunicationException exception) catch (MqttCommunicationException exception)
{ {
MqttTrace.Warning(nameof(MqttClient), exception, "MQTT communication error while receiving packets."); MqttTrace.Warning(nameof(MqttClient), exception, "MQTT communication error while receiving packets.");
await DisconnectInternalAsync();
} }
catch (Exception exception) catch (Exception exception)
{ {
MqttTrace.Warning(nameof(MqttClient), exception, "Error while sending/receiving keep alive packets."); MqttTrace.Warning(nameof(MqttClient), exception, "Error while sending/receiving keep alive packets.");
await DisconnectInternalAsync();
} }
finally finally
{ {
MqttTrace.Information(nameof(MqttClient), "Stopped sending keep alive packets."); MqttTrace.Information(nameof(MqttClient), "Stopped sending keep alive packets.");
await DisconnectInternalAsync();
} }
} }


@@ -352,27 +374,47 @@ namespace MQTTnet.Core.Client
{ {
while (!cancellationToken.IsCancellationRequested) while (!cancellationToken.IsCancellationRequested)
{ {
var mqttPacket = await _adapter.ReceivePacketAsync(TimeSpan.Zero);
MqttTrace.Information(nameof(MqttClient), $"Received <<< {mqttPacket}");
var packet = await _adapter.ReceivePacketAsync(TimeSpan.Zero);
MqttTrace.Information(nameof(MqttClient), $"Received <<< {packet}");


#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
Task.Run(() => ProcessReceivedPacketAsync(mqttPacket), cancellationToken);
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
StartProcessReceivedPacket(packet, cancellationToken);
} }
} }
catch (MqttCommunicationException exception) catch (MqttCommunicationException exception)
{ {
MqttTrace.Warning(nameof(MqttClient), exception, "MQTT communication error while receiving packets.");
MqttTrace.Warning(nameof(MqttClient), exception, "MQTT communication exception while receiving packets.");
await DisconnectInternalAsync();
} }
catch (Exception exception) catch (Exception exception)
{ {
MqttTrace.Error(nameof(MqttClient), exception, "Error while receiving packets.");
MqttTrace.Error(nameof(MqttClient), exception, "Unhandled exception while receiving packets.");
await DisconnectInternalAsync();
} }
finally finally
{ {
MqttTrace.Information(nameof(MqttClient), "Stopped receiving packets."); MqttTrace.Information(nameof(MqttClient), "Stopped receiving packets.");
await DisconnectInternalAsync();
} }
} }

private void StartProcessReceivedPacket(MqttBasePacket packet, CancellationToken cancellationToken)
{
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
Task.Run(() => ProcessReceivedPacketAsync(packet), cancellationToken);
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
}

private void StartReceivePackets()
{
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
Task.Run(() => ReceivePackets(_cancellationTokenSource.Token), _cancellationTokenSource.Token);
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
}

private void StartSendKeepAliveMessages()
{
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
Task.Run(() => SendKeepAliveMessagesAsync(_cancellationTokenSource.Token), _cancellationTokenSource.Token);
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
}
} }
} }

+ 5
- 2
MQTTnet.Core/Client/MqttClientOptions.cs View File

@@ -1,4 +1,5 @@
using System; using System;
using MQTTnet.Core.Serializer;


namespace MQTTnet.Core.Client namespace MQTTnet.Core.Client
{ {
@@ -7,8 +8,8 @@ namespace MQTTnet.Core.Client
public string Server { get; set; } public string Server { get; set; }


public int? Port { get; set; } public int? Port { get; set; }
public MqttClientTlsOptions TlsOptions { get; } = new MqttClientTlsOptions();
public MqttClientTlsOptions TlsOptions { get; set; } = new MqttClientTlsOptions();


public string UserName { get; set; } public string UserName { get; set; }


@@ -21,5 +22,7 @@ namespace MQTTnet.Core.Client
public TimeSpan KeepAlivePeriod { get; set; } = TimeSpan.FromSeconds(5); public TimeSpan KeepAlivePeriod { get; set; } = TimeSpan.FromSeconds(5);


public TimeSpan DefaultCommunicationTimeout { get; set; } = TimeSpan.FromSeconds(10); public TimeSpan DefaultCommunicationTimeout { get; set; } = TimeSpan.FromSeconds(10);

public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311;
} }
} }

+ 6
- 1
MQTTnet.Core/Exceptions/MqttCommunicationException.cs View File

@@ -4,7 +4,7 @@ namespace MQTTnet.Core.Exceptions
{ {
public class MqttCommunicationException : Exception public class MqttCommunicationException : Exception
{ {
public MqttCommunicationException()
protected MqttCommunicationException()
{ {
} }


@@ -17,5 +17,10 @@ namespace MQTTnet.Core.Exceptions
: base(message) : base(message)
{ {
} }

public MqttCommunicationException(string message, Exception innerException)
: base(message, innerException)
{
}
} }
} }

+ 2
- 2
MQTTnet.Core/MQTTnet.Core.csproj View File

@@ -16,8 +16,8 @@
<PackageIconUrl></PackageIconUrl> <PackageIconUrl></PackageIconUrl>
<RepositoryUrl></RepositoryUrl> <RepositoryUrl></RepositoryUrl>
<PackageTags></PackageTags> <PackageTags></PackageTags>
<FileVersion>2.1.4.0</FileVersion>
<AssemblyVersion>2.1.4.0</AssemblyVersion>
<FileVersion>2.1.5.1</FileVersion>
<AssemblyVersion>2.1.5.1</AssemblyVersion>
<PackageLicenseUrl></PackageLicenseUrl> <PackageLicenseUrl></PackageLicenseUrl>
</PropertyGroup> </PropertyGroup>




+ 5
- 1
MQTTnet.Core/Packets/MqttConnectPacket.cs View File

@@ -1,7 +1,11 @@
namespace MQTTnet.Core.Packets
using MQTTnet.Core.Serializer;

namespace MQTTnet.Core.Packets
{ {
public sealed class MqttConnectPacket: MqttBasePacket public sealed class MqttConnectPacket: MqttBasePacket
{ {
public MqttProtocolVersion ProtocolVersion { get; set; }

public string ClientId { get; set; } public string ClientId { get; set; }


public string Username { get; set; } public string Username { get; set; }


+ 2
- 0
MQTTnet.Core/Serializer/IMqttPacketSerializer.cs View File

@@ -6,6 +6,8 @@ namespace MQTTnet.Core.Serializer
{ {
public interface IMqttPacketSerializer public interface IMqttPacketSerializer
{ {
MqttProtocolVersion ProtocolVersion { get; set; }

Task SerializeAsync(MqttBasePacket mqttPacket, IMqttCommunicationChannel destination); Task SerializeAsync(MqttBasePacket mqttPacket, IMqttCommunicationChannel destination);


Task<MqttBasePacket> DeserializeAsync(IMqttCommunicationChannel source); Task<MqttBasePacket> DeserializeAsync(IMqttCommunicationChannel source);


+ 1
- 1
MQTTnet.Core/Serializer/MqttPacketReader.cs View File

@@ -10,7 +10,7 @@ namespace MQTTnet.Core.Serializer
{ {
public sealed class MqttPacketReader : IDisposable public sealed class MqttPacketReader : IDisposable
{ {
private readonly MemoryStream _remainingData = new MemoryStream();
private readonly MemoryStream _remainingData = new MemoryStream(1024);
private readonly IMqttCommunicationChannel _source; private readonly IMqttCommunicationChannel _source;


private int _remainingLength; private int _remainingLength;


MQTTnet.Core/Serializer/DefaultMqttV311PacketSerializer.cs → MQTTnet.Core/Serializer/MqttPacketSerializer.cs View File

@@ -9,8 +9,13 @@ using MQTTnet.Core.Protocol;


namespace MQTTnet.Core.Serializer namespace MQTTnet.Core.Serializer
{ {
public sealed class DefaultMqttV311PacketSerializer : IMqttPacketSerializer
public sealed class MqttPacketSerializer : IMqttPacketSerializer
{ {
private static byte[] ProtocolVersionV311Name { get; } = Encoding.UTF8.GetBytes("MQTT");
private static byte[] ProtocolVersionV310Name { get; } = Encoding.UTF8.GetBytes("MQIs");

public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311;

public Task SerializeAsync(MqttBasePacket packet, IMqttCommunicationChannel destination) public Task SerializeAsync(MqttBasePacket packet, IMqttCommunicationChannel destination)
{ {
if (packet == null) throw new ArgumentNullException(nameof(packet)); if (packet == null) throw new ArgumentNullException(nameof(packet));
@@ -255,12 +260,21 @@ namespace MQTTnet.Core.Serializer
private static async Task<MqttBasePacket> DeserializeConnectAsync(MqttPacketReader reader) private static async Task<MqttBasePacket> DeserializeConnectAsync(MqttPacketReader reader)
{ {
await reader.ReadRemainingDataAsync(2); // Skip 2 bytes await reader.ReadRemainingDataAsync(2); // Skip 2 bytes
var protocolName = await reader.ReadRemainingDataAsync(4);


if (Encoding.UTF8.GetString(protocolName, 0, protocolName.Length) != "MQTT")
MqttProtocolVersion protocolVersion;
var protocolName = await reader.ReadRemainingDataAsync(4);
if (protocolName.SequenceEqual(ProtocolVersionV310Name))
{
await reader.ReadRemainingDataAsync(2);
protocolVersion = MqttProtocolVersion.V310;
}
else if (protocolName.SequenceEqual(ProtocolVersionV311Name))
{ {
throw new MqttProtocolViolationException("Protocol name is not 'MQTT'.");
protocolVersion = MqttProtocolVersion.V311;
}
else
{
throw new MqttProtocolViolationException("Protocol name is not supported.");
} }


var protocolLevel = await reader.ReadRemainingDataByteAsync(); var protocolLevel = await reader.ReadRemainingDataByteAsync();
@@ -271,6 +285,7 @@ namespace MQTTnet.Core.Serializer


var packet = new MqttConnectPacket var packet = new MqttConnectPacket
{ {
ProtocolVersion = protocolVersion,
CleanSession = connectFlagsReader.Read() CleanSession = connectFlagsReader.Read()
}; };


@@ -351,9 +366,7 @@ namespace MQTTnet.Core.Serializer
} }
} }


private static readonly byte[] MqttPrefix = Encoding.UTF8.GetBytes("MQTT");

private static Task SerializeAsync(MqttConnectPacket packet, IMqttCommunicationChannel destination)
private Task SerializeAsync(MqttConnectPacket packet, IMqttCommunicationChannel destination)
{ {
ValidateConnectPacket(packet); ValidateConnectPacket(packet);


@@ -361,9 +374,19 @@ namespace MQTTnet.Core.Serializer
{ {
// Write variable header // Write variable header
output.Write(0x00, 0x04); // 3.1.2.1 Protocol Name output.Write(0x00, 0x04); // 3.1.2.1 Protocol Name
output.Write(MqttPrefix);
output.Write(0x04); // 3.1.2.2 Protocol Level

if (ProtocolVersion == MqttProtocolVersion.V311)
{
output.Write(ProtocolVersionV311Name);
output.Write(0x04); // 3.1.2.2 Protocol Level (4)
}
else
{
output.Write(ProtocolVersionV310Name);
output.Write(0x64);
output.Write(0x70);
output.Write(0x03); // Protocol Level (3)
}
var connectFlags = new ByteWriter(); // 3.1.2.3 Connect Flags var connectFlags = new ByteWriter(); // 3.1.2.3 Connect Flags
connectFlags.Write(false); // Reserved connectFlags.Write(false); // Reserved
connectFlags.Write(packet.CleanSession); connectFlags.Write(packet.CleanSession);
@@ -408,13 +431,17 @@ namespace MQTTnet.Core.Serializer
} }
} }


private static Task SerializeAsync(MqttConnAckPacket packet, IMqttCommunicationChannel destination)
private Task SerializeAsync(MqttConnAckPacket packet, IMqttCommunicationChannel destination)
{ {
using (var output = new MqttPacketWriter()) using (var output = new MqttPacketWriter())
{ {
var connectAcknowledgeFlags = new ByteWriter(); var connectAcknowledgeFlags = new ByteWriter();
connectAcknowledgeFlags.Write(packet.IsSessionPresent);


if (ProtocolVersion == MqttProtocolVersion.V311)
{
connectAcknowledgeFlags.Write(packet.IsSessionPresent);
}
output.Write(connectAcknowledgeFlags); output.Write(connectAcknowledgeFlags);
output.Write((byte)packet.ConnectReturnCode); output.Write((byte)packet.ConnectReturnCode);


@@ -423,6 +450,17 @@ namespace MQTTnet.Core.Serializer
} }
} }


private static async Task SerializeAsync(MqttPubRelPacket packet, IMqttCommunicationChannel destination)
{
using (var output = new MqttPacketWriter())
{
output.Write(packet.PacketIdentifier);

output.InjectFixedHeader(MqttControlPacketType.PubRel, 0x02);
await output.WriteToAsync(destination);
}
}

private static Task SerializeAsync(MqttDisconnectPacket packet, IMqttCommunicationChannel destination) private static Task SerializeAsync(MqttDisconnectPacket packet, IMqttCommunicationChannel destination)
{ {
return SerializeEmptyPacketAsync(MqttControlPacketType.Disconnect, destination); return SerializeEmptyPacketAsync(MqttControlPacketType.Disconnect, destination);
@@ -495,17 +533,6 @@ namespace MQTTnet.Core.Serializer
} }
} }


private static async Task SerializeAsync(MqttPubRelPacket packet, IMqttCommunicationChannel destination)
{
using (var output = new MqttPacketWriter())
{
output.Write(packet.PacketIdentifier);

output.InjectFixedHeader(MqttControlPacketType.PubRel, 0x02);
await output.WriteToAsync(destination);
}
}

private static Task SerializeAsync(MqttPubCompPacket packet, IMqttCommunicationChannel destination) private static Task SerializeAsync(MqttPubCompPacket packet, IMqttCommunicationChannel destination)
{ {
using (var output = new MqttPacketWriter()) using (var output = new MqttPacketWriter())

+ 3
- 3
MQTTnet.Core/Serializer/MqttPacketWriter.cs View File

@@ -9,13 +9,13 @@ namespace MQTTnet.Core.Serializer
{ {
public sealed class MqttPacketWriter : IDisposable public sealed class MqttPacketWriter : IDisposable
{ {
private readonly MemoryStream _buffer = new MemoryStream(512);
private readonly MemoryStream _buffer = new MemoryStream(1024);


public void InjectFixedHeader(MqttControlPacketType packetType, byte flags = 0) public void InjectFixedHeader(MqttControlPacketType packetType, byte flags = 0)
{ {
var fixedHeader = (byte)((byte)packetType << 4);
var fixedHeader = (int)packetType << 4;
fixedHeader |= flags; fixedHeader |= flags;
InjectFixedHeader(fixedHeader);
InjectFixedHeader((byte)fixedHeader);
} }


public void Write(byte value) public void Write(byte value)


+ 8
- 0
MQTTnet.Core/Serializer/MqttProtocolVersion.cs View File

@@ -0,0 +1,8 @@
namespace MQTTnet.Core.Serializer
{
public enum MqttProtocolVersion
{
V311,
V310
}
}

+ 11
- 0
MQTTnet.Core/Server/ConnectedMqttClient.cs View File

@@ -0,0 +1,11 @@
using MQTTnet.Core.Serializer;

namespace MQTTnet.Core.Server
{
public class ConnectedMqttClient
{
public string ClientId { get; set; }

public MqttProtocolVersion ProtocolVersion { get; set; }
}
}

+ 1
- 1
MQTTnet.Core/Server/IMqttServer.cs View File

@@ -9,7 +9,7 @@ namespace MQTTnet.Core.Server
event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived; event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived;
event EventHandler<MqttClientConnectedEventArgs> ClientConnected; event EventHandler<MqttClientConnectedEventArgs> ClientConnected;


IList<string> GetConnectedClients();
IList<ConnectedMqttClient> GetConnectedClients();
void InjectClient(string identifier, IMqttCommunicationAdapter adapter); void InjectClient(string identifier, IMqttCommunicationAdapter adapter);
void Publish(MqttApplicationMessage applicationMessage); void Publish(MqttApplicationMessage applicationMessage);
void Start(); void Start();


+ 18
- 14
MQTTnet.Core/Server/MqttClientSession.cs View File

@@ -21,7 +21,6 @@ namespace MQTTnet.Core.Server
private readonly MqttServerOptions _options; private readonly MqttServerOptions _options;


private CancellationTokenSource _cancellationTokenSource; private CancellationTokenSource _cancellationTokenSource;
private IMqttCommunicationAdapter _adapter;
private string _identifier; private string _identifier;
private MqttApplicationMessage _willApplicationMessage; private MqttApplicationMessage _willApplicationMessage;


@@ -36,7 +35,9 @@ namespace MQTTnet.Core.Server


public string ClientId { get; } public string ClientId { get; }


public bool IsConnected => _adapter != null;
public bool IsConnected => Adapter != null;

public IMqttCommunicationAdapter Adapter { get; private set; }


public async Task RunAsync(string identifier, MqttApplicationMessage willApplicationMessage, IMqttCommunicationAdapter adapter) public async Task RunAsync(string identifier, MqttApplicationMessage willApplicationMessage, IMqttCommunicationAdapter adapter)
{ {
@@ -47,7 +48,7 @@ namespace MQTTnet.Core.Server
try try
{ {
_identifier = identifier; _identifier = identifier;
_adapter = adapter;
Adapter = adapter;
_cancellationTokenSource = new CancellationTokenSource(); _cancellationTokenSource = new CancellationTokenSource();


_messageQueue.Start(adapter); _messageQueue.Start(adapter);
@@ -73,7 +74,7 @@ namespace MQTTnet.Core.Server


_messageQueue.Stop(); _messageQueue.Stop();
_cancellationTokenSource.Cancel(); _cancellationTokenSource.Cancel();
_adapter = null;
Adapter = null;


MqttTrace.Information(nameof(MqttClientSession), $"Client '{_identifier}': Disconnected."); MqttTrace.Information(nameof(MqttClientSession), $"Client '{_identifier}': Disconnected.");
} }
@@ -102,12 +103,12 @@ namespace MQTTnet.Core.Server
{ {
if (packet is MqttSubscribePacket subscribePacket) if (packet is MqttSubscribePacket subscribePacket)
{ {
return _adapter.SendPacketAsync(_subscriptionsManager.Subscribe(subscribePacket), _options.DefaultCommunicationTimeout);
return Adapter.SendPacketAsync(_subscriptionsManager.Subscribe(subscribePacket), _options.DefaultCommunicationTimeout);
} }


if (packet is MqttUnsubscribePacket unsubscribePacket) if (packet is MqttUnsubscribePacket unsubscribePacket)
{ {
return _adapter.SendPacketAsync(_subscriptionsManager.Unsubscribe(unsubscribePacket), _options.DefaultCommunicationTimeout);
return Adapter.SendPacketAsync(_subscriptionsManager.Unsubscribe(unsubscribePacket), _options.DefaultCommunicationTimeout);
} }


if (packet is MqttPublishPacket publishPacket) if (packet is MqttPublishPacket publishPacket)
@@ -122,7 +123,7 @@ namespace MQTTnet.Core.Server


if (packet is MqttPubRecPacket pubRecPacket) if (packet is MqttPubRecPacket pubRecPacket)
{ {
return _adapter.SendPacketAsync(pubRecPacket.CreateResponse<MqttPubRelPacket>(), _options.DefaultCommunicationTimeout);
return Adapter.SendPacketAsync(pubRecPacket.CreateResponse<MqttPubRelPacket>(), _options.DefaultCommunicationTimeout);
} }


if (packet is MqttPubAckPacket || packet is MqttPubCompPacket) if (packet is MqttPubAckPacket || packet is MqttPubCompPacket)
@@ -133,7 +134,7 @@ namespace MQTTnet.Core.Server


if (packet is MqttPingReqPacket) if (packet is MqttPingReqPacket)
{ {
return _adapter.SendPacketAsync(new MqttPingRespPacket(), _options.DefaultCommunicationTimeout);
return Adapter.SendPacketAsync(new MqttPingRespPacket(), _options.DefaultCommunicationTimeout);
} }


if (packet is MqttDisconnectPacket || packet is MqttConnectPacket) if (packet is MqttDisconnectPacket || packet is MqttConnectPacket)
@@ -153,13 +154,16 @@ namespace MQTTnet.Core.Server
if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce) if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce)
{ {
_publishPacketReceivedCallback(this, publishPacket); _publishPacketReceivedCallback(this, publishPacket);
return Task.FromResult(0);
} }
else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce)

if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce)
{ {
_publishPacketReceivedCallback(this, publishPacket); _publishPacketReceivedCallback(this, publishPacket);
return _adapter.SendPacketAsync(new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout);
return Adapter.SendPacketAsync(new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout);
} }
else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce)

if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce)
{ {
// QoS 2 is implement as method "B" [4.3.3 QoS 2: Exactly once delivery] // QoS 2 is implement as method "B" [4.3.3 QoS 2: Exactly once delivery]
lock (_unacknowledgedPublishPackets) lock (_unacknowledgedPublishPackets)
@@ -169,10 +173,10 @@ namespace MQTTnet.Core.Server


_publishPacketReceivedCallback(this, publishPacket); _publishPacketReceivedCallback(this, publishPacket);


return _adapter.SendPacketAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout);
return Adapter.SendPacketAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout);
} }


throw new MqttCommunicationException("Received not supported QoS level.");
throw new MqttCommunicationException("Received a not supported QoS level.");
} }


private Task HandleIncomingPubRelPacketAsync(MqttPubRelPacket pubRelPacket) private Task HandleIncomingPubRelPacketAsync(MqttPubRelPacket pubRelPacket)
@@ -182,7 +186,7 @@ namespace MQTTnet.Core.Server
_unacknowledgedPublishPackets.Remove(pubRelPacket.PacketIdentifier); _unacknowledgedPublishPackets.Remove(pubRelPacket.PacketIdentifier);
} }


return _adapter.SendPacketAsync(new MqttPubCompPacket { PacketIdentifier = pubRelPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout);
return Adapter.SendPacketAsync(new MqttPubCompPacket { PacketIdentifier = pubRelPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout);
} }
} }
} }

+ 9
- 2
MQTTnet.Core/Server/MqttClientSessionsManager.cs View File

@@ -34,6 +34,9 @@ namespace MQTTnet.Core.Server
throw new MqttProtocolViolationException("The first packet from a client must be a 'CONNECT' packet [MQTT-3.1.0-1]."); throw new MqttProtocolViolationException("The first packet from a client must be a 'CONNECT' packet [MQTT-3.1.0-1].");
} }


// Switch to the required protocol version before sending any response.
eventArgs.ClientAdapter.PacketSerializer.ProtocolVersion = connectPacket.ProtocolVersion;

var connectReturnCode = ValidateConnection(connectPacket); var connectReturnCode = ValidateConnection(connectPacket);
if (connectReturnCode != MqttConnectReturnCode.ConnectionAccepted) if (connectReturnCode != MqttConnectReturnCode.ConnectionAccepted)
{ {
@@ -73,11 +76,15 @@ namespace MQTTnet.Core.Server
} }
} }


public IList<string> GetConnectedClients()
public IList<ConnectedMqttClient> GetConnectedClients()
{ {
lock (_syncRoot) lock (_syncRoot)
{ {
return _clientSessions.Where(s => s.Value.IsConnected).Select(s => s.Key).ToList();
return _clientSessions.Where(s => s.Value.IsConnected).Select(s => new ConnectedMqttClient
{
ClientId = s.Value.ClientId,
ProtocolVersion = s.Value.Adapter.PacketSerializer.ProtocolVersion
}).ToList();
} }
} }




+ 1
- 1
MQTTnet.Core/Server/MqttServer.cs View File

@@ -25,7 +25,7 @@ namespace MQTTnet.Core.Server
_clientSessionsManager.ApplicationMessageReceived += (s, e) => ApplicationMessageReceived?.Invoke(s, e); _clientSessionsManager.ApplicationMessageReceived += (s, e) => ApplicationMessageReceived?.Invoke(s, e);
} }


public IList<string> GetConnectedClients()
public IList<ConnectedMqttClient> GetConnectedClients()
{ {
return _clientSessionsManager.GetConnectedClients(); return _clientSessionsManager.GetConnectedClients();
} }


+ 6
- 1
MQTTnet.sln View File

@@ -1,7 +1,7 @@
 
Microsoft Visual Studio Solution File, Format Version 12.00 Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15 # Visual Studio 15
VisualStudioVersion = 15.0.26430.15
VisualStudioVersion = 15.0.26430.16
MinimumVisualStudioVersion = 10.0.40219.1 MinimumVisualStudioVersion = 10.0.40219.1
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.Core.Tests", "Tests\MQTTnet.Core.Tests\MQTTnet.Core.Tests.csproj", "{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}" Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.Core.Tests", "Tests\MQTTnet.Core.Tests\MQTTnet.Core.Tests.csproj", "{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}"
EndProject EndProject
@@ -27,6 +27,11 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Build", "Build", "{67C28AC1
Build\MQTTnet.nuspec = Build\MQTTnet.nuspec Build\MQTTnet.nuspec = Build\MQTTnet.nuspec
EndProjectSection EndProjectSection
EndProject EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{B3F60ECB-45BA-4C66-8903-8BB89CA67998}"
ProjectSection(SolutionItems) = preProject
README.md = README.md
EndProjectSection
EndProject
Global Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU Debug|Any CPU = Debug|Any CPU


+ 26
- 21
README.md View File

@@ -5,40 +5,47 @@
[![NuGet Badge](https://buildstats.info/nuget/MQTTnet)](https://www.nuget.org/packages/MQTTnet) [![NuGet Badge](https://buildstats.info/nuget/MQTTnet)](https://www.nuget.org/packages/MQTTnet)


# MQTTnet # MQTTnet
MQTTnet is a .NET library for MQTT based communication. It provides a MQTT client and a MQTT server. The implementation is based on the documentation from http://mqtt.org/.
MQTTnet is a .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker). The implementation is based on the documentation from http://mqtt.org/.


## Features
* MQTT client included
* MQTT server (broker) included
* TLS 1.2 support for client and server (but not UWP servers)
# Features

## General
* Async support * Async support
* Rx support (via another project)
* List of connected clients available (server only)
* TLS 1.2 support for client and server (but not UWP servers)
* Extensible communication channels (i.e. In-Memory, TCP, TCP+SSL, WebSockets (not included in this project)) * Extensible communication channels (i.e. In-Memory, TCP, TCP+SSL, WebSockets (not included in this project))
* Server is able to publish its own messages (no loopback client required)
* Interfaces included for mocking and testing
* Lightweight (only the low level implementation of MQTT, no overhead)
* Access to internal trace messages * Access to internal trace messages
* Extensible client credential validation (server only)
* Unit tested (50+ tests) * Unit tested (50+ tests)
* Lightweight (only the low level implementation of MQTT, no overhead)
* Interfaces included for mocking and testing


## Supported frameworks
## Client
* Rx support (via another project)

## Server (broker)
* List of connected clients available
* Supports connected clients with different protocol versions at the same time
* Able to publish its own messages (no loopback client required)
* Able to receive every messages (no loopback client required)
* Extensible client credential validation

# Supported frameworks
* .NET Standard 1.3+ * .NET Standard 1.3+
* .NET Core 1.1+ * .NET Core 1.1+
* .NET Core App 1.1+ * .NET Core App 1.1+
* .NET Framework 4.5.2+ (x86, x64, AnyCPU) * .NET Framework 4.5.2+ (x86, x64, AnyCPU)
* Universal Windows (UWP) 10.0.10240+ (x86, x64, ARM, AnyCPU) * Universal Windows (UWP) 10.0.10240+ (x86, x64, ARM, AnyCPU)


## Supported MQTT versions
# Supported MQTT versions
* 3.1.1 * 3.1.1
* 3.1.0


## Nuget
# Nuget
This library is available as a nuget package: https://www.nuget.org/packages/MQTTnet/ This library is available as a nuget package: https://www.nuget.org/packages/MQTTnet/


## Contributions
# Contributions
If you want to contribute to this project just create a pull request. If you want to contribute to this project just create a pull request.


## References
# References
This library is used in the following projects: This library is used in the following projects:


* MQTT Client Rx (Wrapper for Reactive Extensions, https://github.com/1iveowl/MQTTClient.rx) * MQTT Client Rx (Wrapper for Reactive Extensions, https://github.com/1iveowl/MQTTClient.rx)
@@ -46,8 +53,8 @@ This library is used in the following projects:


If you use this library and want to see your project here please let me know. If you use this library and want to see your project here please let me know.


# MqttClient
## Example
# Examples
## MqttClient


```csharp ```csharp
var options = new MqttClientOptions var options = new MqttClientOptions
@@ -119,9 +126,7 @@ while (true)
} }
``` ```


# MqttServer

## Example
## MqttServer


```csharp ```csharp
var options = new MqttServerOptions var options = new MqttServerOptions


+ 1
- 1
Tests/MQTTnet.Core.Tests/MQTTnet.Core.Tests.csproj View File

@@ -86,7 +86,7 @@
<ItemGroup> <ItemGroup>
<Compile Include="ByteReaderTests.cs" /> <Compile Include="ByteReaderTests.cs" />
<Compile Include="ByteWriterTests.cs" /> <Compile Include="ByteWriterTests.cs" />
<Compile Include="DefaultMqttV311PacketSerializerTests.cs" />
<Compile Include="MqttPacketSerializerTests.cs" />
<Compile Include="MqttServerTests.cs" /> <Compile Include="MqttServerTests.cs" />
<Compile Include="MqttSubscriptionsManagerTests.cs" /> <Compile Include="MqttSubscriptionsManagerTests.cs" />
<Compile Include="Properties\AssemblyInfo.cs" /> <Compile Include="Properties\AssemblyInfo.cs" />


Tests/MQTTnet.Core.Tests/DefaultMqttV311PacketSerializerTests.cs → Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs View File

@@ -12,8 +12,23 @@ using MQTTnet.Core.Serializer;
namespace MQTTnet.Core.Tests namespace MQTTnet.Core.Tests
{ {
[TestClass] [TestClass]
public class DefaultMqttV311PacketSerializerTests
public class MqttPacketSerializerTests
{ {
[TestMethod]
public void SerializeV310_MqttConnectPacket()
{
var p = new MqttConnectPacket
{
ClientId = "XYZ",
Password = "PASS",
Username = "USER",
KeepAlivePeriod = 123,
CleanSession = true
};

SerializeAndCompare(p, "EB0ABE1RSXNkcAPCAHsAA1hZWgAEVVNFUgAEUEFTUw==", MqttProtocolVersion.V310);
}

[TestMethod] [TestMethod]
public void SerializeV311_MqttConnectPacket() public void SerializeV311_MqttConnectPacket()
{ {
@@ -96,6 +111,17 @@ namespace MQTTnet.Core.Tests
SerializeAndCompare(p, "IAIBBQ=="); SerializeAndCompare(p, "IAIBBQ==");
} }


[TestMethod]
public void SerializeV310_MqttConnAckPacket()
{
var p = new MqttConnAckPacket
{
ConnectReturnCode = MqttConnectReturnCode.ConnectionAccepted
};

SerializeAndCompare(p, "IAIAAA==", MqttProtocolVersion.V310);
}

[TestMethod] [TestMethod]
public void DeserializeV311_MqttConnAckPacket() public void DeserializeV311_MqttConnAckPacket()
{ {
@@ -403,9 +429,9 @@ namespace MQTTnet.Core.Tests
} }
} }


private void SerializeAndCompare(MqttBasePacket packet, string expectedBase64Value)
private void SerializeAndCompare(MqttBasePacket packet, string expectedBase64Value, MqttProtocolVersion protocolVersion = MqttProtocolVersion.V311)
{ {
var serializer = new DefaultMqttV311PacketSerializer();
var serializer = new MqttPacketSerializer { ProtocolVersion = protocolVersion };
var channel = new TestChannel(); var channel = new TestChannel();
serializer.SerializeAsync(packet, channel).Wait(); serializer.SerializeAsync(packet, channel).Wait();
var buffer = channel.ToArray(); var buffer = channel.ToArray();
@@ -415,7 +441,7 @@ namespace MQTTnet.Core.Tests


private void DeserializeAndCompare(MqttBasePacket packet, string expectedBase64Value) private void DeserializeAndCompare(MqttBasePacket packet, string expectedBase64Value)
{ {
var serializer = new DefaultMqttV311PacketSerializer();
var serializer = new MqttPacketSerializer();


var channel1 = new TestChannel(); var channel1 = new TestChannel();
serializer.SerializeAsync(packet, channel1).Wait(); serializer.SerializeAsync(packet, channel1).Wait();

+ 3
- 0
Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs View File

@@ -4,6 +4,7 @@ using System.Threading.Tasks;
using MQTTnet.Core.Adapter; using MQTTnet.Core.Adapter;
using MQTTnet.Core.Client; using MQTTnet.Core.Client;
using MQTTnet.Core.Packets; using MQTTnet.Core.Packets;
using MQTTnet.Core.Serializer;


namespace MQTTnet.Core.Tests namespace MQTTnet.Core.Tests
{ {
@@ -13,6 +14,8 @@ namespace MQTTnet.Core.Tests


public TestMqttCommunicationAdapter Partner { get; set; } public TestMqttCommunicationAdapter Partner { get; set; }


public IMqttPacketSerializer PacketSerializer { get; } = new MqttPacketSerializer();

public async Task ConnectAsync(MqttClientOptions options, TimeSpan timeout) public async Task ConnectAsync(MqttClientOptions options, TimeSpan timeout)
{ {
await Task.FromResult(0); await Task.FromResult(0);


Loading…
Cancel
Save