Browse Source

Add support for V3.1.0; Performance improvements

release/3.x.x
Christian Kratky 7 years ago
parent
commit
598ed66a3b
27 changed files with 199 additions and 88 deletions
  1. +6
    -3
      Build/MQTTnet.nuspec
  2. +2
    -2
      Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs
  3. +1
    -1
      Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs
  4. +2
    -2
      Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs
  5. +1
    -1
      Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs
  6. +1
    -1
      Frameworks/MQTTnet.UniversalWindows/Implementations/MqttServerAdapter.cs
  7. +1
    -1
      Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs
  8. +3
    -0
      MQTTnet.Core/Adapter/IMqttCommunicationAdapter.cs
  9. +6
    -5
      MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs
  10. +2
    -0
      MQTTnet.Core/Client/MqttClient.cs
  11. +5
    -2
      MQTTnet.Core/Client/MqttClientOptions.cs
  12. +5
    -1
      MQTTnet.Core/Packets/MqttConnectPacket.cs
  13. +2
    -0
      MQTTnet.Core/Serializer/IMqttPacketSerializer.cs
  14. +1
    -1
      MQTTnet.Core/Serializer/MqttPacketReader.cs
  15. +50
    -23
      MQTTnet.Core/Serializer/MqttPacketSerializer.cs
  16. +3
    -3
      MQTTnet.Core/Serializer/MqttPacketWriter.cs
  17. +8
    -0
      MQTTnet.Core/Serializer/MqttProtocolVersion.cs
  18. +11
    -0
      MQTTnet.Core/Server/ConnectedMqttClient.cs
  19. +1
    -1
      MQTTnet.Core/Server/IMqttServer.cs
  20. +12
    -11
      MQTTnet.Core/Server/MqttClientSession.cs
  21. +9
    -2
      MQTTnet.Core/Server/MqttClientSessionsManager.cs
  22. +1
    -1
      MQTTnet.Core/Server/MqttServer.cs
  23. +6
    -1
      MQTTnet.sln
  24. +26
    -21
      README.md
  25. +1
    -1
      Tests/MQTTnet.Core.Tests/MQTTnet.Core.Tests.csproj
  26. +30
    -4
      Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs
  27. +3
    -0
      Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs

+ 6
- 3
Build/MQTTnet.nuspec View File

@@ -2,7 +2,7 @@
<package > <package >
<metadata> <metadata>
<id>MQTTnet</id> <id>MQTTnet</id>
<version>2.1.5.1</version>
<version>2.2.0</version>
<authors>Christian Kratky</authors> <authors>Christian Kratky</authors>
<owners>Christian Kratky</owners> <owners>Christian Kratky</owners>
<licenseUrl>https://github.com/chkr1011/MQTTnet/blob/master/LICENSE</licenseUrl> <licenseUrl>https://github.com/chkr1011/MQTTnet/blob/master/LICENSE</licenseUrl>
@@ -10,10 +10,13 @@
<iconUrl>https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png</iconUrl> <iconUrl>https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png</iconUrl>
<requireLicenseAcceptance>false</requireLicenseAcceptance> <requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>MQTTnet is a .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker).</description> <description>MQTTnet is a .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker).</description>
<releaseNotes>* [Server] Fixed wrong handling of QoS levels
<releaseNotes>* [Server] Added support for MQTT protocol version 3.1.0
* [Server] Providing the used protocol version of connected clients
* [Client] Added support for protocol version 3.1.0
* [Core] Several minor performance improvements
</releaseNotes> </releaseNotes>
<copyright>Copyright Christian Kratky 2016-2017</copyright> <copyright>Copyright Christian Kratky 2016-2017</copyright>
<tags>MQTT MQTTClient MQTTServer MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Queue Hardware</tags>
<tags>MQTT MQTTClient MQTTServer MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Queue Hardware Arduino</tags>
<dependencies> <dependencies>


<group targetFramework="netstandard1.3"> <group targetFramework="netstandard1.3">


+ 2
- 2
Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs View File

@@ -86,7 +86,7 @@ namespace MQTTnet.Implementations
try try
{ {
var clientSocket = await Task.Factory.FromAsync(_defaultEndpointSocket.BeginAccept, _defaultEndpointSocket.EndAccept, null); var clientSocket = await Task.Factory.FromAsync(_defaultEndpointSocket.BeginAccept, _defaultEndpointSocket.EndAccept, null);
var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new MqttV311PacketSerializer());
var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new MqttPacketSerializer());
ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter));
} }
catch (Exception exception) when (!(exception is ObjectDisposedException)) catch (Exception exception) when (!(exception is ObjectDisposedException))
@@ -107,7 +107,7 @@ namespace MQTTnet.Implementations
var sslStream = new SslStream(new NetworkStream(clientSocket)); var sslStream = new SslStream(new NetworkStream(clientSocket));
await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false); await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false);
var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new MqttV311PacketSerializer());
var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new MqttPacketSerializer());
ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter));
} }
catch (Exception exception) catch (Exception exception)


+ 1
- 1
Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs View File

@@ -12,7 +12,7 @@ namespace MQTTnet
{ {
if (options == null) throw new ArgumentNullException(nameof(options)); if (options == null) throw new ArgumentNullException(nameof(options));


return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new MqttV311PacketSerializer()));
return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new MqttPacketSerializer()));
} }
} }
} }

+ 2
- 2
Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs View File

@@ -84,7 +84,7 @@ namespace MQTTnet.Implementations
try try
{ {
var clientSocket = await _defaultEndpointSocket.AcceptAsync(); var clientSocket = await _defaultEndpointSocket.AcceptAsync();
var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new MqttV311PacketSerializer());
var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new MqttPacketSerializer());
ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter));
} }
catch (Exception exception) catch (Exception exception)
@@ -105,7 +105,7 @@ namespace MQTTnet.Implementations
var sslStream = new SslStream(new NetworkStream(clientSocket)); var sslStream = new SslStream(new NetworkStream(clientSocket));
await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false); await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false);
var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new MqttV311PacketSerializer());
var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new MqttPacketSerializer());
ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter));
} }
catch (Exception exception) catch (Exception exception)


+ 1
- 1
Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs View File

@@ -12,7 +12,7 @@ namespace MQTTnet
{ {
if (options == null) throw new ArgumentNullException(nameof(options)); if (options == null) throw new ArgumentNullException(nameof(options));


return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new MqttV311PacketSerializer()));
return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new MqttPacketSerializer()));
} }
} }
} }

+ 1
- 1
Frameworks/MQTTnet.UniversalWindows/Implementations/MqttServerAdapter.cs View File

@@ -52,7 +52,7 @@ namespace MQTTnet.Implementations
{ {
try try
{ {
var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(args.Socket), new MqttV311PacketSerializer());
var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(args.Socket), new MqttPacketSerializer());
ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(args.Socket.Information.RemoteAddress.ToString(), clientAdapter)); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(args.Socket.Information.RemoteAddress.ToString(), clientAdapter));
} }
catch (Exception exception) catch (Exception exception)


+ 1
- 1
Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs View File

@@ -12,7 +12,7 @@ namespace MQTTnet
{ {
if (options == null) throw new ArgumentNullException(nameof(options)); if (options == null) throw new ArgumentNullException(nameof(options));


return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new MqttV311PacketSerializer()));
return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new MqttPacketSerializer()));
} }
} }
} }

+ 3
- 0
MQTTnet.Core/Adapter/IMqttCommunicationAdapter.cs View File

@@ -2,6 +2,7 @@
using System.Threading.Tasks; using System.Threading.Tasks;
using MQTTnet.Core.Client; using MQTTnet.Core.Client;
using MQTTnet.Core.Packets; using MQTTnet.Core.Packets;
using MQTTnet.Core.Serializer;


namespace MQTTnet.Core.Adapter namespace MQTTnet.Core.Adapter
{ {
@@ -14,5 +15,7 @@ namespace MQTTnet.Core.Adapter
Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout); Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout);


Task<MqttBasePacket> ReceivePacketAsync(TimeSpan timeout); Task<MqttBasePacket> ReceivePacketAsync(TimeSpan timeout);

IMqttPacketSerializer PacketSerializer { get; }
} }
} }

+ 6
- 5
MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs View File

@@ -11,15 +11,16 @@ namespace MQTTnet.Core.Adapter
{ {
public class MqttChannelCommunicationAdapter : IMqttCommunicationAdapter public class MqttChannelCommunicationAdapter : IMqttCommunicationAdapter
{ {
private readonly IMqttPacketSerializer _serializer;
private readonly IMqttCommunicationChannel _channel; private readonly IMqttCommunicationChannel _channel;


public MqttChannelCommunicationAdapter(IMqttCommunicationChannel channel, IMqttPacketSerializer serializer) public MqttChannelCommunicationAdapter(IMqttCommunicationChannel channel, IMqttPacketSerializer serializer)
{ {
_channel = channel ?? throw new ArgumentNullException(nameof(channel)); _channel = channel ?? throw new ArgumentNullException(nameof(channel));
_serializer = serializer ?? throw new ArgumentNullException(nameof(serializer));
PacketSerializer = serializer ?? throw new ArgumentNullException(nameof(serializer));
} }


public IMqttPacketSerializer PacketSerializer { get; }

public async Task ConnectAsync(MqttClientOptions options, TimeSpan timeout) public async Task ConnectAsync(MqttClientOptions options, TimeSpan timeout)
{ {
var task = _channel.ConnectAsync(options); var task = _channel.ConnectAsync(options);
@@ -41,7 +42,7 @@ namespace MQTTnet.Core.Adapter
bool hasTimeout; bool hasTimeout;
try try
{ {
var task = _serializer.SerializeAsync(packet, _channel);
var task = PacketSerializer.SerializeAsync(packet, _channel);
hasTimeout = await Task.WhenAny(Task.Delay(timeout), task) != task; hasTimeout = await Task.WhenAny(Task.Delay(timeout), task) != task;
} }
catch (Exception exception) catch (Exception exception)
@@ -60,7 +61,7 @@ namespace MQTTnet.Core.Adapter
MqttBasePacket packet; MqttBasePacket packet;
if (timeout > TimeSpan.Zero) if (timeout > TimeSpan.Zero)
{ {
var workerTask = _serializer.DeserializeAsync(_channel);
var workerTask = PacketSerializer.DeserializeAsync(_channel);
var timeoutTask = Task.Delay(timeout); var timeoutTask = Task.Delay(timeout);
var hasTimeout = Task.WhenAny(timeoutTask, workerTask) == timeoutTask; var hasTimeout = Task.WhenAny(timeoutTask, workerTask) == timeoutTask;


@@ -73,7 +74,7 @@ namespace MQTTnet.Core.Adapter
} }
else else
{ {
packet = await _serializer.DeserializeAsync(_channel);
packet = await PacketSerializer.DeserializeAsync(_channel);
} }


if (packet == null) if (packet == null)


+ 2
- 0
MQTTnet.Core/Client/MqttClient.cs View File

@@ -27,6 +27,8 @@ namespace MQTTnet.Core.Client
{ {
_options = options ?? throw new ArgumentNullException(nameof(options)); _options = options ?? throw new ArgumentNullException(nameof(options));
_adapter = adapter ?? throw new ArgumentNullException(nameof(adapter)); _adapter = adapter ?? throw new ArgumentNullException(nameof(adapter));

_adapter.PacketSerializer.ProtocolVersion = options.ProtocolVersion;
} }


public event EventHandler Connected; public event EventHandler Connected;


+ 5
- 2
MQTTnet.Core/Client/MqttClientOptions.cs View File

@@ -1,4 +1,5 @@
using System; using System;
using MQTTnet.Core.Serializer;


namespace MQTTnet.Core.Client namespace MQTTnet.Core.Client
{ {
@@ -7,8 +8,8 @@ namespace MQTTnet.Core.Client
public string Server { get; set; } public string Server { get; set; }


public int? Port { get; set; } public int? Port { get; set; }
public MqttClientTlsOptions TlsOptions { get; } = new MqttClientTlsOptions();
public MqttClientTlsOptions TlsOptions { get; set; } = new MqttClientTlsOptions();


public string UserName { get; set; } public string UserName { get; set; }


@@ -21,5 +22,7 @@ namespace MQTTnet.Core.Client
public TimeSpan KeepAlivePeriod { get; set; } = TimeSpan.FromSeconds(5); public TimeSpan KeepAlivePeriod { get; set; } = TimeSpan.FromSeconds(5);


public TimeSpan DefaultCommunicationTimeout { get; set; } = TimeSpan.FromSeconds(10); public TimeSpan DefaultCommunicationTimeout { get; set; } = TimeSpan.FromSeconds(10);

public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311;
} }
} }

+ 5
- 1
MQTTnet.Core/Packets/MqttConnectPacket.cs View File

@@ -1,7 +1,11 @@
namespace MQTTnet.Core.Packets
using MQTTnet.Core.Serializer;

namespace MQTTnet.Core.Packets
{ {
public sealed class MqttConnectPacket: MqttBasePacket public sealed class MqttConnectPacket: MqttBasePacket
{ {
public MqttProtocolVersion ProtocolVersion { get; set; }

public string ClientId { get; set; } public string ClientId { get; set; }


public string Username { get; set; } public string Username { get; set; }


+ 2
- 0
MQTTnet.Core/Serializer/IMqttPacketSerializer.cs View File

@@ -6,6 +6,8 @@ namespace MQTTnet.Core.Serializer
{ {
public interface IMqttPacketSerializer public interface IMqttPacketSerializer
{ {
MqttProtocolVersion ProtocolVersion { get; set; }

Task SerializeAsync(MqttBasePacket mqttPacket, IMqttCommunicationChannel destination); Task SerializeAsync(MqttBasePacket mqttPacket, IMqttCommunicationChannel destination);


Task<MqttBasePacket> DeserializeAsync(IMqttCommunicationChannel source); Task<MqttBasePacket> DeserializeAsync(IMqttCommunicationChannel source);


+ 1
- 1
MQTTnet.Core/Serializer/MqttPacketReader.cs View File

@@ -10,7 +10,7 @@ namespace MQTTnet.Core.Serializer
{ {
public sealed class MqttPacketReader : IDisposable public sealed class MqttPacketReader : IDisposable
{ {
private readonly MemoryStream _remainingData = new MemoryStream();
private readonly MemoryStream _remainingData = new MemoryStream(1024);
private readonly IMqttCommunicationChannel _source; private readonly IMqttCommunicationChannel _source;


private int _remainingLength; private int _remainingLength;


MQTTnet.Core/Serializer/MqttV311PacketSerializer.cs → MQTTnet.Core/Serializer/MqttPacketSerializer.cs View File

@@ -9,9 +9,12 @@ using MQTTnet.Core.Protocol;


namespace MQTTnet.Core.Serializer namespace MQTTnet.Core.Serializer
{ {
public sealed class MqttV311PacketSerializer : IMqttPacketSerializer
public sealed class MqttPacketSerializer : IMqttPacketSerializer
{ {
private static readonly byte[] MqttV311Prefix = Encoding.UTF8.GetBytes("MQTT");
private static byte[] ProtocolVersionV311Name { get; } = Encoding.UTF8.GetBytes("MQTT");
private static byte[] ProtocolVersionV310Name { get; } = Encoding.UTF8.GetBytes("MQIs");

public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311;


public Task SerializeAsync(MqttBasePacket packet, IMqttCommunicationChannel destination) public Task SerializeAsync(MqttBasePacket packet, IMqttCommunicationChannel destination)
{ {
@@ -257,12 +260,21 @@ namespace MQTTnet.Core.Serializer
private static async Task<MqttBasePacket> DeserializeConnectAsync(MqttPacketReader reader) private static async Task<MqttBasePacket> DeserializeConnectAsync(MqttPacketReader reader)
{ {
await reader.ReadRemainingDataAsync(2); // Skip 2 bytes await reader.ReadRemainingDataAsync(2); // Skip 2 bytes
var protocolName = await reader.ReadRemainingDataAsync(4);


if (Encoding.UTF8.GetString(protocolName, 0, protocolName.Length) != "MQTT")
MqttProtocolVersion protocolVersion;
var protocolName = await reader.ReadRemainingDataAsync(4);
if (protocolName.SequenceEqual(ProtocolVersionV310Name))
{
await reader.ReadRemainingDataAsync(2);
protocolVersion = MqttProtocolVersion.V310;
}
else if (protocolName.SequenceEqual(ProtocolVersionV311Name))
{ {
throw new MqttProtocolViolationException("Protocol name is not 'MQTT'.");
protocolVersion = MqttProtocolVersion.V311;
}
else
{
throw new MqttProtocolViolationException("Protocol name is not supported.");
} }


var protocolLevel = await reader.ReadRemainingDataByteAsync(); var protocolLevel = await reader.ReadRemainingDataByteAsync();
@@ -273,6 +285,7 @@ namespace MQTTnet.Core.Serializer


var packet = new MqttConnectPacket var packet = new MqttConnectPacket
{ {
ProtocolVersion = protocolVersion,
CleanSession = connectFlagsReader.Read() CleanSession = connectFlagsReader.Read()
}; };


@@ -353,7 +366,7 @@ namespace MQTTnet.Core.Serializer
} }
} }


private static Task SerializeAsync(MqttConnectPacket packet, IMqttCommunicationChannel destination)
private Task SerializeAsync(MqttConnectPacket packet, IMqttCommunicationChannel destination)
{ {
ValidateConnectPacket(packet); ValidateConnectPacket(packet);


@@ -361,9 +374,19 @@ namespace MQTTnet.Core.Serializer
{ {
// Write variable header // Write variable header
output.Write(0x00, 0x04); // 3.1.2.1 Protocol Name output.Write(0x00, 0x04); // 3.1.2.1 Protocol Name
output.Write(MqttV311Prefix);
output.Write(0x04); // 3.1.2.2 Protocol Level

if (ProtocolVersion == MqttProtocolVersion.V311)
{
output.Write(ProtocolVersionV311Name);
output.Write(0x04); // 3.1.2.2 Protocol Level (4)
}
else
{
output.Write(ProtocolVersionV310Name);
output.Write(0x64);
output.Write(0x70);
output.Write(0x03); // Protocol Level (3)
}
var connectFlags = new ByteWriter(); // 3.1.2.3 Connect Flags var connectFlags = new ByteWriter(); // 3.1.2.3 Connect Flags
connectFlags.Write(false); // Reserved connectFlags.Write(false); // Reserved
connectFlags.Write(packet.CleanSession); connectFlags.Write(packet.CleanSession);
@@ -408,13 +431,17 @@ namespace MQTTnet.Core.Serializer
} }
} }


private static Task SerializeAsync(MqttConnAckPacket packet, IMqttCommunicationChannel destination)
private Task SerializeAsync(MqttConnAckPacket packet, IMqttCommunicationChannel destination)
{ {
using (var output = new MqttPacketWriter()) using (var output = new MqttPacketWriter())
{ {
var connectAcknowledgeFlags = new ByteWriter(); var connectAcknowledgeFlags = new ByteWriter();
connectAcknowledgeFlags.Write(packet.IsSessionPresent);


if (ProtocolVersion == MqttProtocolVersion.V311)
{
connectAcknowledgeFlags.Write(packet.IsSessionPresent);
}
output.Write(connectAcknowledgeFlags); output.Write(connectAcknowledgeFlags);
output.Write((byte)packet.ConnectReturnCode); output.Write((byte)packet.ConnectReturnCode);


@@ -423,6 +450,17 @@ namespace MQTTnet.Core.Serializer
} }
} }


private static async Task SerializeAsync(MqttPubRelPacket packet, IMqttCommunicationChannel destination)
{
using (var output = new MqttPacketWriter())
{
output.Write(packet.PacketIdentifier);

output.InjectFixedHeader(MqttControlPacketType.PubRel, 0x02);
await output.WriteToAsync(destination);
}
}

private static Task SerializeAsync(MqttDisconnectPacket packet, IMqttCommunicationChannel destination) private static Task SerializeAsync(MqttDisconnectPacket packet, IMqttCommunicationChannel destination)
{ {
return SerializeEmptyPacketAsync(MqttControlPacketType.Disconnect, destination); return SerializeEmptyPacketAsync(MqttControlPacketType.Disconnect, destination);
@@ -495,17 +533,6 @@ namespace MQTTnet.Core.Serializer
} }
} }


private static async Task SerializeAsync(MqttPubRelPacket packet, IMqttCommunicationChannel destination)
{
using (var output = new MqttPacketWriter())
{
output.Write(packet.PacketIdentifier);

output.InjectFixedHeader(MqttControlPacketType.PubRel, 0x02);
await output.WriteToAsync(destination);
}
}

private static Task SerializeAsync(MqttPubCompPacket packet, IMqttCommunicationChannel destination) private static Task SerializeAsync(MqttPubCompPacket packet, IMqttCommunicationChannel destination)
{ {
using (var output = new MqttPacketWriter()) using (var output = new MqttPacketWriter())

+ 3
- 3
MQTTnet.Core/Serializer/MqttPacketWriter.cs View File

@@ -9,13 +9,13 @@ namespace MQTTnet.Core.Serializer
{ {
public sealed class MqttPacketWriter : IDisposable public sealed class MqttPacketWriter : IDisposable
{ {
private readonly MemoryStream _buffer = new MemoryStream(512);
private readonly MemoryStream _buffer = new MemoryStream(1024);


public void InjectFixedHeader(MqttControlPacketType packetType, byte flags = 0) public void InjectFixedHeader(MqttControlPacketType packetType, byte flags = 0)
{ {
var fixedHeader = (byte)((byte)packetType << 4);
var fixedHeader = (int)packetType << 4;
fixedHeader |= flags; fixedHeader |= flags;
InjectFixedHeader(fixedHeader);
InjectFixedHeader((byte)fixedHeader);
} }


public void Write(byte value) public void Write(byte value)


+ 8
- 0
MQTTnet.Core/Serializer/MqttProtocolVersion.cs View File

@@ -0,0 +1,8 @@
namespace MQTTnet.Core.Serializer
{
public enum MqttProtocolVersion
{
V311,
V310
}
}

+ 11
- 0
MQTTnet.Core/Server/ConnectedMqttClient.cs View File

@@ -0,0 +1,11 @@
using MQTTnet.Core.Serializer;

namespace MQTTnet.Core.Server
{
public class ConnectedMqttClient
{
public string ClientId { get; set; }

public MqttProtocolVersion ProtocolVersion { get; set; }
}
}

+ 1
- 1
MQTTnet.Core/Server/IMqttServer.cs View File

@@ -9,7 +9,7 @@ namespace MQTTnet.Core.Server
event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived; event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived;
event EventHandler<MqttClientConnectedEventArgs> ClientConnected; event EventHandler<MqttClientConnectedEventArgs> ClientConnected;


IList<string> GetConnectedClients();
IList<ConnectedMqttClient> GetConnectedClients();
void InjectClient(string identifier, IMqttCommunicationAdapter adapter); void InjectClient(string identifier, IMqttCommunicationAdapter adapter);
void Publish(MqttApplicationMessage applicationMessage); void Publish(MqttApplicationMessage applicationMessage);
void Start(); void Start();


+ 12
- 11
MQTTnet.Core/Server/MqttClientSession.cs View File

@@ -21,7 +21,6 @@ namespace MQTTnet.Core.Server
private readonly MqttServerOptions _options; private readonly MqttServerOptions _options;


private CancellationTokenSource _cancellationTokenSource; private CancellationTokenSource _cancellationTokenSource;
private IMqttCommunicationAdapter _adapter;
private string _identifier; private string _identifier;
private MqttApplicationMessage _willApplicationMessage; private MqttApplicationMessage _willApplicationMessage;


@@ -36,7 +35,9 @@ namespace MQTTnet.Core.Server


public string ClientId { get; } public string ClientId { get; }


public bool IsConnected => _adapter != null;
public bool IsConnected => Adapter != null;

public IMqttCommunicationAdapter Adapter { get; private set; }


public async Task RunAsync(string identifier, MqttApplicationMessage willApplicationMessage, IMqttCommunicationAdapter adapter) public async Task RunAsync(string identifier, MqttApplicationMessage willApplicationMessage, IMqttCommunicationAdapter adapter)
{ {
@@ -47,7 +48,7 @@ namespace MQTTnet.Core.Server
try try
{ {
_identifier = identifier; _identifier = identifier;
_adapter = adapter;
Adapter = adapter;
_cancellationTokenSource = new CancellationTokenSource(); _cancellationTokenSource = new CancellationTokenSource();


_messageQueue.Start(adapter); _messageQueue.Start(adapter);
@@ -73,7 +74,7 @@ namespace MQTTnet.Core.Server


_messageQueue.Stop(); _messageQueue.Stop();
_cancellationTokenSource.Cancel(); _cancellationTokenSource.Cancel();
_adapter = null;
Adapter = null;


MqttTrace.Information(nameof(MqttClientSession), $"Client '{_identifier}': Disconnected."); MqttTrace.Information(nameof(MqttClientSession), $"Client '{_identifier}': Disconnected.");
} }
@@ -102,12 +103,12 @@ namespace MQTTnet.Core.Server
{ {
if (packet is MqttSubscribePacket subscribePacket) if (packet is MqttSubscribePacket subscribePacket)
{ {
return _adapter.SendPacketAsync(_subscriptionsManager.Subscribe(subscribePacket), _options.DefaultCommunicationTimeout);
return Adapter.SendPacketAsync(_subscriptionsManager.Subscribe(subscribePacket), _options.DefaultCommunicationTimeout);
} }


if (packet is MqttUnsubscribePacket unsubscribePacket) if (packet is MqttUnsubscribePacket unsubscribePacket)
{ {
return _adapter.SendPacketAsync(_subscriptionsManager.Unsubscribe(unsubscribePacket), _options.DefaultCommunicationTimeout);
return Adapter.SendPacketAsync(_subscriptionsManager.Unsubscribe(unsubscribePacket), _options.DefaultCommunicationTimeout);
} }


if (packet is MqttPublishPacket publishPacket) if (packet is MqttPublishPacket publishPacket)
@@ -122,7 +123,7 @@ namespace MQTTnet.Core.Server


if (packet is MqttPubRecPacket pubRecPacket) if (packet is MqttPubRecPacket pubRecPacket)
{ {
return _adapter.SendPacketAsync(pubRecPacket.CreateResponse<MqttPubRelPacket>(), _options.DefaultCommunicationTimeout);
return Adapter.SendPacketAsync(pubRecPacket.CreateResponse<MqttPubRelPacket>(), _options.DefaultCommunicationTimeout);
} }


if (packet is MqttPubAckPacket || packet is MqttPubCompPacket) if (packet is MqttPubAckPacket || packet is MqttPubCompPacket)
@@ -133,7 +134,7 @@ namespace MQTTnet.Core.Server


if (packet is MqttPingReqPacket) if (packet is MqttPingReqPacket)
{ {
return _adapter.SendPacketAsync(new MqttPingRespPacket(), _options.DefaultCommunicationTimeout);
return Adapter.SendPacketAsync(new MqttPingRespPacket(), _options.DefaultCommunicationTimeout);
} }


if (packet is MqttDisconnectPacket || packet is MqttConnectPacket) if (packet is MqttDisconnectPacket || packet is MqttConnectPacket)
@@ -159,7 +160,7 @@ namespace MQTTnet.Core.Server
if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce) if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce)
{ {
_publishPacketReceivedCallback(this, publishPacket); _publishPacketReceivedCallback(this, publishPacket);
return _adapter.SendPacketAsync(new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout);
return Adapter.SendPacketAsync(new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout);
} }


if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce) if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce)
@@ -172,7 +173,7 @@ namespace MQTTnet.Core.Server


_publishPacketReceivedCallback(this, publishPacket); _publishPacketReceivedCallback(this, publishPacket);


return _adapter.SendPacketAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout);
return Adapter.SendPacketAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout);
} }


throw new MqttCommunicationException("Received a not supported QoS level."); throw new MqttCommunicationException("Received a not supported QoS level.");
@@ -185,7 +186,7 @@ namespace MQTTnet.Core.Server
_unacknowledgedPublishPackets.Remove(pubRelPacket.PacketIdentifier); _unacknowledgedPublishPackets.Remove(pubRelPacket.PacketIdentifier);
} }


return _adapter.SendPacketAsync(new MqttPubCompPacket { PacketIdentifier = pubRelPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout);
return Adapter.SendPacketAsync(new MqttPubCompPacket { PacketIdentifier = pubRelPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout);
} }
} }
} }

+ 9
- 2
MQTTnet.Core/Server/MqttClientSessionsManager.cs View File

@@ -34,6 +34,9 @@ namespace MQTTnet.Core.Server
throw new MqttProtocolViolationException("The first packet from a client must be a 'CONNECT' packet [MQTT-3.1.0-1]."); throw new MqttProtocolViolationException("The first packet from a client must be a 'CONNECT' packet [MQTT-3.1.0-1].");
} }


// Switch to the required protocol version before sending any response.
eventArgs.ClientAdapter.PacketSerializer.ProtocolVersion = connectPacket.ProtocolVersion;

var connectReturnCode = ValidateConnection(connectPacket); var connectReturnCode = ValidateConnection(connectPacket);
if (connectReturnCode != MqttConnectReturnCode.ConnectionAccepted) if (connectReturnCode != MqttConnectReturnCode.ConnectionAccepted)
{ {
@@ -73,11 +76,15 @@ namespace MQTTnet.Core.Server
} }
} }


public IList<string> GetConnectedClients()
public IList<ConnectedMqttClient> GetConnectedClients()
{ {
lock (_syncRoot) lock (_syncRoot)
{ {
return _clientSessions.Where(s => s.Value.IsConnected).Select(s => s.Key).ToList();
return _clientSessions.Where(s => s.Value.IsConnected).Select(s => new ConnectedMqttClient
{
ClientId = s.Value.ClientId,
ProtocolVersion = s.Value.Adapter.PacketSerializer.ProtocolVersion
}).ToList();
} }
} }




+ 1
- 1
MQTTnet.Core/Server/MqttServer.cs View File

@@ -25,7 +25,7 @@ namespace MQTTnet.Core.Server
_clientSessionsManager.ApplicationMessageReceived += (s, e) => ApplicationMessageReceived?.Invoke(s, e); _clientSessionsManager.ApplicationMessageReceived += (s, e) => ApplicationMessageReceived?.Invoke(s, e);
} }


public IList<string> GetConnectedClients()
public IList<ConnectedMqttClient> GetConnectedClients()
{ {
return _clientSessionsManager.GetConnectedClients(); return _clientSessionsManager.GetConnectedClients();
} }


+ 6
- 1
MQTTnet.sln View File

@@ -1,7 +1,7 @@
 
Microsoft Visual Studio Solution File, Format Version 12.00 Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15 # Visual Studio 15
VisualStudioVersion = 15.0.26430.15
VisualStudioVersion = 15.0.26430.16
MinimumVisualStudioVersion = 10.0.40219.1 MinimumVisualStudioVersion = 10.0.40219.1
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.Core.Tests", "Tests\MQTTnet.Core.Tests\MQTTnet.Core.Tests.csproj", "{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}" Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.Core.Tests", "Tests\MQTTnet.Core.Tests\MQTTnet.Core.Tests.csproj", "{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}"
EndProject EndProject
@@ -27,6 +27,11 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Build", "Build", "{67C28AC1
Build\MQTTnet.nuspec = Build\MQTTnet.nuspec Build\MQTTnet.nuspec = Build\MQTTnet.nuspec
EndProjectSection EndProjectSection
EndProject EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{B3F60ECB-45BA-4C66-8903-8BB89CA67998}"
ProjectSection(SolutionItems) = preProject
README.md = README.md
EndProjectSection
EndProject
Global Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU Debug|Any CPU = Debug|Any CPU


+ 26
- 21
README.md View File

@@ -5,40 +5,47 @@
[![NuGet Badge](https://buildstats.info/nuget/MQTTnet)](https://www.nuget.org/packages/MQTTnet) [![NuGet Badge](https://buildstats.info/nuget/MQTTnet)](https://www.nuget.org/packages/MQTTnet)


# MQTTnet # MQTTnet
MQTTnet is a .NET library for MQTT based communication. It provides a MQTT client and a MQTT server. The implementation is based on the documentation from http://mqtt.org/.
MQTTnet is a .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker). The implementation is based on the documentation from http://mqtt.org/.


## Features
* MQTT client included
* MQTT server (broker) included
* TLS 1.2 support for client and server (but not UWP servers)
# Features

## General
* Async support * Async support
* Rx support (via another project)
* List of connected clients available (server only)
* TLS 1.2 support for client and server (but not UWP servers)
* Extensible communication channels (i.e. In-Memory, TCP, TCP+SSL, WebSockets (not included in this project)) * Extensible communication channels (i.e. In-Memory, TCP, TCP+SSL, WebSockets (not included in this project))
* Server is able to publish its own messages (no loopback client required)
* Interfaces included for mocking and testing
* Lightweight (only the low level implementation of MQTT, no overhead)
* Access to internal trace messages * Access to internal trace messages
* Extensible client credential validation (server only)
* Unit tested (50+ tests) * Unit tested (50+ tests)
* Lightweight (only the low level implementation of MQTT, no overhead)
* Interfaces included for mocking and testing


## Supported frameworks
## Client
* Rx support (via another project)

## Server (broker)
* List of connected clients available
* Supports connected clients with different protocol versions at the same time
* Able to publish its own messages (no loopback client required)
* Able to receive every messages (no loopback client required)
* Extensible client credential validation

# Supported frameworks
* .NET Standard 1.3+ * .NET Standard 1.3+
* .NET Core 1.1+ * .NET Core 1.1+
* .NET Core App 1.1+ * .NET Core App 1.1+
* .NET Framework 4.5.2+ (x86, x64, AnyCPU) * .NET Framework 4.5.2+ (x86, x64, AnyCPU)
* Universal Windows (UWP) 10.0.10240+ (x86, x64, ARM, AnyCPU) * Universal Windows (UWP) 10.0.10240+ (x86, x64, ARM, AnyCPU)


## Supported MQTT versions
# Supported MQTT versions
* 3.1.1 * 3.1.1
* 3.1.0


## Nuget
# Nuget
This library is available as a nuget package: https://www.nuget.org/packages/MQTTnet/ This library is available as a nuget package: https://www.nuget.org/packages/MQTTnet/


## Contributions
# Contributions
If you want to contribute to this project just create a pull request. If you want to contribute to this project just create a pull request.


## References
# References
This library is used in the following projects: This library is used in the following projects:


* MQTT Client Rx (Wrapper for Reactive Extensions, https://github.com/1iveowl/MQTTClient.rx) * MQTT Client Rx (Wrapper for Reactive Extensions, https://github.com/1iveowl/MQTTClient.rx)
@@ -46,8 +53,8 @@ This library is used in the following projects:


If you use this library and want to see your project here please let me know. If you use this library and want to see your project here please let me know.


# MqttClient
## Example
# Examples
## MqttClient


```csharp ```csharp
var options = new MqttClientOptions var options = new MqttClientOptions
@@ -119,9 +126,7 @@ while (true)
} }
``` ```


# MqttServer

## Example
## MqttServer


```csharp ```csharp
var options = new MqttServerOptions var options = new MqttServerOptions


+ 1
- 1
Tests/MQTTnet.Core.Tests/MQTTnet.Core.Tests.csproj View File

@@ -86,7 +86,7 @@
<ItemGroup> <ItemGroup>
<Compile Include="ByteReaderTests.cs" /> <Compile Include="ByteReaderTests.cs" />
<Compile Include="ByteWriterTests.cs" /> <Compile Include="ByteWriterTests.cs" />
<Compile Include="DefaultMqttV311PacketSerializerTests.cs" />
<Compile Include="MqttPacketSerializerTests.cs" />
<Compile Include="MqttServerTests.cs" /> <Compile Include="MqttServerTests.cs" />
<Compile Include="MqttSubscriptionsManagerTests.cs" /> <Compile Include="MqttSubscriptionsManagerTests.cs" />
<Compile Include="Properties\AssemblyInfo.cs" /> <Compile Include="Properties\AssemblyInfo.cs" />


Tests/MQTTnet.Core.Tests/DefaultMqttV311PacketSerializerTests.cs → Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs View File

@@ -12,8 +12,23 @@ using MQTTnet.Core.Serializer;
namespace MQTTnet.Core.Tests namespace MQTTnet.Core.Tests
{ {
[TestClass] [TestClass]
public class DefaultMqttV311PacketSerializerTests
public class MqttPacketSerializerTests
{ {
[TestMethod]
public void SerializeV310_MqttConnectPacket()
{
var p = new MqttConnectPacket
{
ClientId = "XYZ",
Password = "PASS",
Username = "USER",
KeepAlivePeriod = 123,
CleanSession = true
};

SerializeAndCompare(p, "EB0ABE1RSXNkcAPCAHsAA1hZWgAEVVNFUgAEUEFTUw==", MqttProtocolVersion.V310);
}

[TestMethod] [TestMethod]
public void SerializeV311_MqttConnectPacket() public void SerializeV311_MqttConnectPacket()
{ {
@@ -96,6 +111,17 @@ namespace MQTTnet.Core.Tests
SerializeAndCompare(p, "IAIBBQ=="); SerializeAndCompare(p, "IAIBBQ==");
} }


[TestMethod]
public void SerializeV310_MqttConnAckPacket()
{
var p = new MqttConnAckPacket
{
ConnectReturnCode = MqttConnectReturnCode.ConnectionAccepted
};

SerializeAndCompare(p, "IAIAAA==", MqttProtocolVersion.V310);
}

[TestMethod] [TestMethod]
public void DeserializeV311_MqttConnAckPacket() public void DeserializeV311_MqttConnAckPacket()
{ {
@@ -403,9 +429,9 @@ namespace MQTTnet.Core.Tests
} }
} }


private void SerializeAndCompare(MqttBasePacket packet, string expectedBase64Value)
private void SerializeAndCompare(MqttBasePacket packet, string expectedBase64Value, MqttProtocolVersion protocolVersion = MqttProtocolVersion.V311)
{ {
var serializer = new MqttV311PacketSerializer();
var serializer = new MqttPacketSerializer { ProtocolVersion = protocolVersion };
var channel = new TestChannel(); var channel = new TestChannel();
serializer.SerializeAsync(packet, channel).Wait(); serializer.SerializeAsync(packet, channel).Wait();
var buffer = channel.ToArray(); var buffer = channel.ToArray();
@@ -415,7 +441,7 @@ namespace MQTTnet.Core.Tests


private void DeserializeAndCompare(MqttBasePacket packet, string expectedBase64Value) private void DeserializeAndCompare(MqttBasePacket packet, string expectedBase64Value)
{ {
var serializer = new MqttV311PacketSerializer();
var serializer = new MqttPacketSerializer();


var channel1 = new TestChannel(); var channel1 = new TestChannel();
serializer.SerializeAsync(packet, channel1).Wait(); serializer.SerializeAsync(packet, channel1).Wait();

+ 3
- 0
Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs View File

@@ -4,6 +4,7 @@ using System.Threading.Tasks;
using MQTTnet.Core.Adapter; using MQTTnet.Core.Adapter;
using MQTTnet.Core.Client; using MQTTnet.Core.Client;
using MQTTnet.Core.Packets; using MQTTnet.Core.Packets;
using MQTTnet.Core.Serializer;


namespace MQTTnet.Core.Tests namespace MQTTnet.Core.Tests
{ {
@@ -13,6 +14,8 @@ namespace MQTTnet.Core.Tests


public TestMqttCommunicationAdapter Partner { get; set; } public TestMqttCommunicationAdapter Partner { get; set; }


public IMqttPacketSerializer PacketSerializer { get; } = new MqttPacketSerializer();

public async Task ConnectAsync(MqttClientOptions options, TimeSpan timeout) public async Task ConnectAsync(MqttClientOptions options, TimeSpan timeout)
{ {
await Task.FromResult(0); await Task.FromResult(0);


Loading…
Cancel
Save