From 59b0b79a3235feb0d9a99a56ab1680d1bb9f481c Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Mon, 7 Aug 2017 20:28:49 +0200 Subject: [PATCH 1/3] Fix wrong QoS level handling for server. --- Build/MQTTnet.nuspec | 8 ++------ .../Implementations/MqttServerAdapter.cs | 4 ++-- Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs | 2 +- .../MQTTnet.NetFramework/Properties/AssemblyInfo.cs | 4 ++-- .../Implementations/MqttServerAdapter.cs | 4 ++-- .../MQTTnet.NetStandard/MQTTnet.Netstandard.csproj | 4 ++-- Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs | 2 +- .../Implementations/MqttServerAdapter.cs | 2 +- Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs | 2 +- .../MQTTnet.UniversalWindows/Properties/AssemblyInfo.cs | 4 ++-- MQTTnet.Core/Client/MqttClient.cs | 2 +- MQTTnet.Core/MQTTnet.Core.csproj | 4 ++-- ...11PacketSerializer.cs => MqttV311PacketSerializer.cs} | 8 ++++---- MQTTnet.Core/Server/MqttClientSession.cs | 9 ++++++--- .../DefaultMqttV311PacketSerializerTests.cs | 4 ++-- 15 files changed, 31 insertions(+), 32 deletions(-) rename MQTTnet.Core/Serializer/{DefaultMqttV311PacketSerializer.cs => MqttV311PacketSerializer.cs} (99%) diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index 3687879..89dcc91 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -2,7 +2,7 @@ MQTTnet - 2.1.5 + 2.1.5.1 Christian Kratky Christian Kratky https://github.com/chkr1011/MQTTnet/blob/master/LICENSE @@ -10,11 +10,7 @@ https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png false MQTTnet is a .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker). - * [MqttServer] Added support for publishing application messages -* [Core] Fixed QoS level 2 handling -* [Core] Performance optimizations -* [MqttClient/MqttServer] Errors while handline application messages are now catched and traced -* [MqttClient/MqttServer] Added interfaces + * [Server] Fixed wrong handling of QoS levels Copyright Christian Kratky 2016-2017 MQTT MQTTClient MQTTServer MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Queue Hardware diff --git a/Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs b/Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs index cbfcf66..694eec7 100644 --- a/Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs +++ b/Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs @@ -86,7 +86,7 @@ namespace MQTTnet.Implementations try { var clientSocket = await Task.Factory.FromAsync(_defaultEndpointSocket.BeginAccept, _defaultEndpointSocket.EndAccept, null); - var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new DefaultMqttV311PacketSerializer()); + var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new MqttV311PacketSerializer()); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); } catch (Exception exception) when (!(exception is ObjectDisposedException)) @@ -107,7 +107,7 @@ namespace MQTTnet.Implementations var sslStream = new SslStream(new NetworkStream(clientSocket)); await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false); - var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new DefaultMqttV311PacketSerializer()); + var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new MqttV311PacketSerializer()); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); } catch (Exception exception) diff --git a/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs b/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs index ac12f97..b85edab 100644 --- a/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs +++ b/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs @@ -12,7 +12,7 @@ namespace MQTTnet { if (options == null) throw new ArgumentNullException(nameof(options)); - return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new DefaultMqttV311PacketSerializer())); + return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new MqttV311PacketSerializer())); } } } \ No newline at end of file diff --git a/Frameworks/MQTTnet.NetFramework/Properties/AssemblyInfo.cs b/Frameworks/MQTTnet.NetFramework/Properties/AssemblyInfo.cs index 4d0a57f..486eca7 100644 --- a/Frameworks/MQTTnet.NetFramework/Properties/AssemblyInfo.cs +++ b/Frameworks/MQTTnet.NetFramework/Properties/AssemblyInfo.cs @@ -11,5 +11,5 @@ using System.Runtime.InteropServices; [assembly: AssemblyCulture("")] [assembly: ComVisible(false)] [assembly: Guid("a480ef90-0eaa-4d9a-b271-47a9c47f6f7d")] -[assembly: AssemblyVersion("2.1.4.0")] -[assembly: AssemblyFileVersion("2.1.4.0")] \ No newline at end of file +[assembly: AssemblyVersion("2.1.5.1")] +[assembly: AssemblyFileVersion("2.1.5.1")] \ No newline at end of file diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs index 863d3e7..7c2561d 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs @@ -84,7 +84,7 @@ namespace MQTTnet.Implementations try { var clientSocket = await _defaultEndpointSocket.AcceptAsync(); - var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new DefaultMqttV311PacketSerializer()); + var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new MqttV311PacketSerializer()); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); } catch (Exception exception) @@ -105,7 +105,7 @@ namespace MQTTnet.Implementations var sslStream = new SslStream(new NetworkStream(clientSocket)); await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false); - var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new DefaultMqttV311PacketSerializer()); + var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new MqttV311PacketSerializer()); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); } catch (Exception exception) diff --git a/Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj b/Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj index 2a60e55..1a1249e 100644 --- a/Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj +++ b/Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj @@ -4,8 +4,8 @@ netstandard1.3 MQTTnet MQTTnet - 2.1.4.0 - 2.1.4.0 + 2.1.5.1 + 2.1.5.1 0.0.0.0 diff --git a/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs b/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs index 6073583..c5f409f 100644 --- a/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs +++ b/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs @@ -12,7 +12,7 @@ namespace MQTTnet { if (options == null) throw new ArgumentNullException(nameof(options)); - return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new DefaultMqttV311PacketSerializer())); + return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new MqttV311PacketSerializer())); } } } diff --git a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttServerAdapter.cs b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttServerAdapter.cs index ea251ca..b0bfcbe 100644 --- a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttServerAdapter.cs +++ b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttServerAdapter.cs @@ -52,7 +52,7 @@ namespace MQTTnet.Implementations { try { - var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(args.Socket), new DefaultMqttV311PacketSerializer()); + var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(args.Socket), new MqttV311PacketSerializer()); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(args.Socket.Information.RemoteAddress.ToString(), clientAdapter)); } catch (Exception exception) diff --git a/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs b/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs index ac12f97..b85edab 100644 --- a/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs +++ b/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs @@ -12,7 +12,7 @@ namespace MQTTnet { if (options == null) throw new ArgumentNullException(nameof(options)); - return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new DefaultMqttV311PacketSerializer())); + return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new MqttV311PacketSerializer())); } } } \ No newline at end of file diff --git a/Frameworks/MQTTnet.UniversalWindows/Properties/AssemblyInfo.cs b/Frameworks/MQTTnet.UniversalWindows/Properties/AssemblyInfo.cs index dee61f7..05204eb 100644 --- a/Frameworks/MQTTnet.UniversalWindows/Properties/AssemblyInfo.cs +++ b/Frameworks/MQTTnet.UniversalWindows/Properties/AssemblyInfo.cs @@ -10,5 +10,5 @@ using System.Runtime.InteropServices; [assembly: AssemblyTrademark("")] [assembly: AssemblyCulture("")] [assembly: ComVisible(false)] -[assembly: AssemblyVersion("2.1.4.0")] -[assembly: AssemblyFileVersion("2.1.4.0")] \ No newline at end of file +[assembly: AssemblyVersion("2.1.5.1")] +[assembly: AssemblyFileVersion("2.1.5.1")] \ No newline at end of file diff --git a/MQTTnet.Core/Client/MqttClient.cs b/MQTTnet.Core/Client/MqttClient.cs index 384de1f..f90adab 100644 --- a/MQTTnet.Core/Client/MqttClient.cs +++ b/MQTTnet.Core/Client/MqttClient.cs @@ -267,7 +267,7 @@ namespace MQTTnet.Core.Client return SendAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }); } - throw new InvalidOperationException(); + throw new MqttCommunicationException("Received a not supported QoS level."); } private async Task ProcessReceivedPubRelPacket(MqttPubRelPacket pubRelPacket) diff --git a/MQTTnet.Core/MQTTnet.Core.csproj b/MQTTnet.Core/MQTTnet.Core.csproj index 4d12864..c938551 100644 --- a/MQTTnet.Core/MQTTnet.Core.csproj +++ b/MQTTnet.Core/MQTTnet.Core.csproj @@ -16,8 +16,8 @@ - 2.1.4.0 - 2.1.4.0 + 2.1.5.1 + 2.1.5.1 diff --git a/MQTTnet.Core/Serializer/DefaultMqttV311PacketSerializer.cs b/MQTTnet.Core/Serializer/MqttV311PacketSerializer.cs similarity index 99% rename from MQTTnet.Core/Serializer/DefaultMqttV311PacketSerializer.cs rename to MQTTnet.Core/Serializer/MqttV311PacketSerializer.cs index 4100b6c..523a2dc 100644 --- a/MQTTnet.Core/Serializer/DefaultMqttV311PacketSerializer.cs +++ b/MQTTnet.Core/Serializer/MqttV311PacketSerializer.cs @@ -9,8 +9,10 @@ using MQTTnet.Core.Protocol; namespace MQTTnet.Core.Serializer { - public sealed class DefaultMqttV311PacketSerializer : IMqttPacketSerializer + public sealed class MqttV311PacketSerializer : IMqttPacketSerializer { + private static readonly byte[] MqttV311Prefix = Encoding.UTF8.GetBytes("MQTT"); + public Task SerializeAsync(MqttBasePacket packet, IMqttCommunicationChannel destination) { if (packet == null) throw new ArgumentNullException(nameof(packet)); @@ -351,8 +353,6 @@ namespace MQTTnet.Core.Serializer } } - private static readonly byte[] MqttPrefix = Encoding.UTF8.GetBytes("MQTT"); - private static Task SerializeAsync(MqttConnectPacket packet, IMqttCommunicationChannel destination) { ValidateConnectPacket(packet); @@ -361,7 +361,7 @@ namespace MQTTnet.Core.Serializer { // Write variable header output.Write(0x00, 0x04); // 3.1.2.1 Protocol Name - output.Write(MqttPrefix); + output.Write(MqttV311Prefix); output.Write(0x04); // 3.1.2.2 Protocol Level var connectFlags = new ByteWriter(); // 3.1.2.3 Connect Flags diff --git a/MQTTnet.Core/Server/MqttClientSession.cs b/MQTTnet.Core/Server/MqttClientSession.cs index a3919f9..f84b847 100644 --- a/MQTTnet.Core/Server/MqttClientSession.cs +++ b/MQTTnet.Core/Server/MqttClientSession.cs @@ -153,13 +153,16 @@ namespace MQTTnet.Core.Server if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce) { _publishPacketReceivedCallback(this, publishPacket); + return Task.FromResult(0); } - else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce) + + if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce) { _publishPacketReceivedCallback(this, publishPacket); return _adapter.SendPacketAsync(new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout); } - else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce) + + if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce) { // QoS 2 is implement as method "B" [4.3.3 QoS 2: Exactly once delivery] lock (_unacknowledgedPublishPackets) @@ -172,7 +175,7 @@ namespace MQTTnet.Core.Server return _adapter.SendPacketAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout); } - throw new MqttCommunicationException("Received not supported QoS level."); + throw new MqttCommunicationException("Received a not supported QoS level."); } private Task HandleIncomingPubRelPacketAsync(MqttPubRelPacket pubRelPacket) diff --git a/Tests/MQTTnet.Core.Tests/DefaultMqttV311PacketSerializerTests.cs b/Tests/MQTTnet.Core.Tests/DefaultMqttV311PacketSerializerTests.cs index 62d0f30..4b458ed 100644 --- a/Tests/MQTTnet.Core.Tests/DefaultMqttV311PacketSerializerTests.cs +++ b/Tests/MQTTnet.Core.Tests/DefaultMqttV311PacketSerializerTests.cs @@ -405,7 +405,7 @@ namespace MQTTnet.Core.Tests private void SerializeAndCompare(MqttBasePacket packet, string expectedBase64Value) { - var serializer = new DefaultMqttV311PacketSerializer(); + var serializer = new MqttV311PacketSerializer(); var channel = new TestChannel(); serializer.SerializeAsync(packet, channel).Wait(); var buffer = channel.ToArray(); @@ -415,7 +415,7 @@ namespace MQTTnet.Core.Tests private void DeserializeAndCompare(MqttBasePacket packet, string expectedBase64Value) { - var serializer = new DefaultMqttV311PacketSerializer(); + var serializer = new MqttV311PacketSerializer(); var channel1 = new TestChannel(); serializer.SerializeAsync(packet, channel1).Wait(); From 598ed66a3beae4e804687d9b701da55108ef1d83 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Sun, 13 Aug 2017 19:46:32 +0200 Subject: [PATCH 2/3] Add support for V3.1.0; Performance improvements --- Build/MQTTnet.nuspec | 9 ++- .../Implementations/MqttServerAdapter.cs | 4 +- .../MQTTnet.NetFramework/MqttClientFactory.cs | 2 +- .../Implementations/MqttServerAdapter.cs | 4 +- .../MQTTnet.NetStandard/MqttClientFactory.cs | 2 +- .../Implementations/MqttServerAdapter.cs | 2 +- .../MqttClientFactory.cs | 2 +- .../Adapter/IMqttCommunicationAdapter.cs | 3 + .../MqttChannelCommunicationAdapter.cs | 11 +-- MQTTnet.Core/Client/MqttClient.cs | 2 + MQTTnet.Core/Client/MqttClientOptions.cs | 7 +- MQTTnet.Core/Packets/MqttConnectPacket.cs | 6 +- .../Serializer/IMqttPacketSerializer.cs | 2 + MQTTnet.Core/Serializer/MqttPacketReader.cs | 2 +- ...tSerializer.cs => MqttPacketSerializer.cs} | 73 +++++++++++++------ MQTTnet.Core/Serializer/MqttPacketWriter.cs | 6 +- .../Serializer/MqttProtocolVersion.cs | 8 ++ MQTTnet.Core/Server/ConnectedMqttClient.cs | 11 +++ MQTTnet.Core/Server/IMqttServer.cs | 2 +- MQTTnet.Core/Server/MqttClientSession.cs | 23 +++--- .../Server/MqttClientSessionsManager.cs | 11 ++- MQTTnet.Core/Server/MqttServer.cs | 2 +- MQTTnet.sln | 7 +- README.md | 47 ++++++------ .../MQTTnet.Core.Tests.csproj | 2 +- ...rTests.cs => MqttPacketSerializerTests.cs} | 34 ++++++++- .../TestMqttCommunicationAdapter.cs | 3 + 27 files changed, 199 insertions(+), 88 deletions(-) rename MQTTnet.Core/Serializer/{MqttV311PacketSerializer.cs => MqttPacketSerializer.cs} (92%) create mode 100644 MQTTnet.Core/Serializer/MqttProtocolVersion.cs create mode 100644 MQTTnet.Core/Server/ConnectedMqttClient.cs rename Tests/MQTTnet.Core.Tests/{DefaultMqttV311PacketSerializerTests.cs => MqttPacketSerializerTests.cs} (92%) diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index 89dcc91..bb81a8d 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -2,7 +2,7 @@ MQTTnet - 2.1.5.1 + 2.2.0 Christian Kratky Christian Kratky https://github.com/chkr1011/MQTTnet/blob/master/LICENSE @@ -10,10 +10,13 @@ https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png false MQTTnet is a .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker). - * [Server] Fixed wrong handling of QoS levels + * [Server] Added support for MQTT protocol version 3.1.0 +* [Server] Providing the used protocol version of connected clients +* [Client] Added support for protocol version 3.1.0 +* [Core] Several minor performance improvements Copyright Christian Kratky 2016-2017 - MQTT MQTTClient MQTTServer MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Queue Hardware + MQTT MQTTClient MQTTServer MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Queue Hardware Arduino diff --git a/Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs b/Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs index 694eec7..63fd0d8 100644 --- a/Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs +++ b/Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs @@ -86,7 +86,7 @@ namespace MQTTnet.Implementations try { var clientSocket = await Task.Factory.FromAsync(_defaultEndpointSocket.BeginAccept, _defaultEndpointSocket.EndAccept, null); - var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new MqttV311PacketSerializer()); + var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new MqttPacketSerializer()); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); } catch (Exception exception) when (!(exception is ObjectDisposedException)) @@ -107,7 +107,7 @@ namespace MQTTnet.Implementations var sslStream = new SslStream(new NetworkStream(clientSocket)); await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false); - var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new MqttV311PacketSerializer()); + var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new MqttPacketSerializer()); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); } catch (Exception exception) diff --git a/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs b/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs index b85edab..7dd7500 100644 --- a/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs +++ b/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs @@ -12,7 +12,7 @@ namespace MQTTnet { if (options == null) throw new ArgumentNullException(nameof(options)); - return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new MqttV311PacketSerializer())); + return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new MqttPacketSerializer())); } } } \ No newline at end of file diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs index 7c2561d..a2e86b1 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs @@ -84,7 +84,7 @@ namespace MQTTnet.Implementations try { var clientSocket = await _defaultEndpointSocket.AcceptAsync(); - var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new MqttV311PacketSerializer()); + var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new MqttPacketSerializer()); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); } catch (Exception exception) @@ -105,7 +105,7 @@ namespace MQTTnet.Implementations var sslStream = new SslStream(new NetworkStream(clientSocket)); await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false); - var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new MqttV311PacketSerializer()); + var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new MqttPacketSerializer()); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); } catch (Exception exception) diff --git a/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs b/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs index c5f409f..f47c494 100644 --- a/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs +++ b/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs @@ -12,7 +12,7 @@ namespace MQTTnet { if (options == null) throw new ArgumentNullException(nameof(options)); - return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new MqttV311PacketSerializer())); + return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new MqttPacketSerializer())); } } } diff --git a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttServerAdapter.cs b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttServerAdapter.cs index b0bfcbe..eddd671 100644 --- a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttServerAdapter.cs +++ b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttServerAdapter.cs @@ -52,7 +52,7 @@ namespace MQTTnet.Implementations { try { - var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(args.Socket), new MqttV311PacketSerializer()); + var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(args.Socket), new MqttPacketSerializer()); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(args.Socket.Information.RemoteAddress.ToString(), clientAdapter)); } catch (Exception exception) diff --git a/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs b/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs index b85edab..7dd7500 100644 --- a/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs +++ b/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs @@ -12,7 +12,7 @@ namespace MQTTnet { if (options == null) throw new ArgumentNullException(nameof(options)); - return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new MqttV311PacketSerializer())); + return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new MqttPacketSerializer())); } } } \ No newline at end of file diff --git a/MQTTnet.Core/Adapter/IMqttCommunicationAdapter.cs b/MQTTnet.Core/Adapter/IMqttCommunicationAdapter.cs index 6651e05..77ab898 100644 --- a/MQTTnet.Core/Adapter/IMqttCommunicationAdapter.cs +++ b/MQTTnet.Core/Adapter/IMqttCommunicationAdapter.cs @@ -2,6 +2,7 @@ using System.Threading.Tasks; using MQTTnet.Core.Client; using MQTTnet.Core.Packets; +using MQTTnet.Core.Serializer; namespace MQTTnet.Core.Adapter { @@ -14,5 +15,7 @@ namespace MQTTnet.Core.Adapter Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout); Task ReceivePacketAsync(TimeSpan timeout); + + IMqttPacketSerializer PacketSerializer { get; } } } diff --git a/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs b/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs index ae90984..d504d89 100644 --- a/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs +++ b/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs @@ -11,15 +11,16 @@ namespace MQTTnet.Core.Adapter { public class MqttChannelCommunicationAdapter : IMqttCommunicationAdapter { - private readonly IMqttPacketSerializer _serializer; private readonly IMqttCommunicationChannel _channel; public MqttChannelCommunicationAdapter(IMqttCommunicationChannel channel, IMqttPacketSerializer serializer) { _channel = channel ?? throw new ArgumentNullException(nameof(channel)); - _serializer = serializer ?? throw new ArgumentNullException(nameof(serializer)); + PacketSerializer = serializer ?? throw new ArgumentNullException(nameof(serializer)); } + public IMqttPacketSerializer PacketSerializer { get; } + public async Task ConnectAsync(MqttClientOptions options, TimeSpan timeout) { var task = _channel.ConnectAsync(options); @@ -41,7 +42,7 @@ namespace MQTTnet.Core.Adapter bool hasTimeout; try { - var task = _serializer.SerializeAsync(packet, _channel); + var task = PacketSerializer.SerializeAsync(packet, _channel); hasTimeout = await Task.WhenAny(Task.Delay(timeout), task) != task; } catch (Exception exception) @@ -60,7 +61,7 @@ namespace MQTTnet.Core.Adapter MqttBasePacket packet; if (timeout > TimeSpan.Zero) { - var workerTask = _serializer.DeserializeAsync(_channel); + var workerTask = PacketSerializer.DeserializeAsync(_channel); var timeoutTask = Task.Delay(timeout); var hasTimeout = Task.WhenAny(timeoutTask, workerTask) == timeoutTask; @@ -73,7 +74,7 @@ namespace MQTTnet.Core.Adapter } else { - packet = await _serializer.DeserializeAsync(_channel); + packet = await PacketSerializer.DeserializeAsync(_channel); } if (packet == null) diff --git a/MQTTnet.Core/Client/MqttClient.cs b/MQTTnet.Core/Client/MqttClient.cs index f90adab..70e8c76 100644 --- a/MQTTnet.Core/Client/MqttClient.cs +++ b/MQTTnet.Core/Client/MqttClient.cs @@ -27,6 +27,8 @@ namespace MQTTnet.Core.Client { _options = options ?? throw new ArgumentNullException(nameof(options)); _adapter = adapter ?? throw new ArgumentNullException(nameof(adapter)); + + _adapter.PacketSerializer.ProtocolVersion = options.ProtocolVersion; } public event EventHandler Connected; diff --git a/MQTTnet.Core/Client/MqttClientOptions.cs b/MQTTnet.Core/Client/MqttClientOptions.cs index 0aaf43c..f9b75fa 100644 --- a/MQTTnet.Core/Client/MqttClientOptions.cs +++ b/MQTTnet.Core/Client/MqttClientOptions.cs @@ -1,4 +1,5 @@ using System; +using MQTTnet.Core.Serializer; namespace MQTTnet.Core.Client { @@ -7,8 +8,8 @@ namespace MQTTnet.Core.Client public string Server { get; set; } public int? Port { get; set; } - - public MqttClientTlsOptions TlsOptions { get; } = new MqttClientTlsOptions(); + + public MqttClientTlsOptions TlsOptions { get; set; } = new MqttClientTlsOptions(); public string UserName { get; set; } @@ -21,5 +22,7 @@ namespace MQTTnet.Core.Client public TimeSpan KeepAlivePeriod { get; set; } = TimeSpan.FromSeconds(5); public TimeSpan DefaultCommunicationTimeout { get; set; } = TimeSpan.FromSeconds(10); + + public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311; } } diff --git a/MQTTnet.Core/Packets/MqttConnectPacket.cs b/MQTTnet.Core/Packets/MqttConnectPacket.cs index aadfe52..6db92a2 100644 --- a/MQTTnet.Core/Packets/MqttConnectPacket.cs +++ b/MQTTnet.Core/Packets/MqttConnectPacket.cs @@ -1,7 +1,11 @@ -namespace MQTTnet.Core.Packets +using MQTTnet.Core.Serializer; + +namespace MQTTnet.Core.Packets { public sealed class MqttConnectPacket: MqttBasePacket { + public MqttProtocolVersion ProtocolVersion { get; set; } + public string ClientId { get; set; } public string Username { get; set; } diff --git a/MQTTnet.Core/Serializer/IMqttPacketSerializer.cs b/MQTTnet.Core/Serializer/IMqttPacketSerializer.cs index db53a13..801b7ea 100644 --- a/MQTTnet.Core/Serializer/IMqttPacketSerializer.cs +++ b/MQTTnet.Core/Serializer/IMqttPacketSerializer.cs @@ -6,6 +6,8 @@ namespace MQTTnet.Core.Serializer { public interface IMqttPacketSerializer { + MqttProtocolVersion ProtocolVersion { get; set; } + Task SerializeAsync(MqttBasePacket mqttPacket, IMqttCommunicationChannel destination); Task DeserializeAsync(IMqttCommunicationChannel source); diff --git a/MQTTnet.Core/Serializer/MqttPacketReader.cs b/MQTTnet.Core/Serializer/MqttPacketReader.cs index f59d807..9abea6d 100644 --- a/MQTTnet.Core/Serializer/MqttPacketReader.cs +++ b/MQTTnet.Core/Serializer/MqttPacketReader.cs @@ -10,7 +10,7 @@ namespace MQTTnet.Core.Serializer { public sealed class MqttPacketReader : IDisposable { - private readonly MemoryStream _remainingData = new MemoryStream(); + private readonly MemoryStream _remainingData = new MemoryStream(1024); private readonly IMqttCommunicationChannel _source; private int _remainingLength; diff --git a/MQTTnet.Core/Serializer/MqttV311PacketSerializer.cs b/MQTTnet.Core/Serializer/MqttPacketSerializer.cs similarity index 92% rename from MQTTnet.Core/Serializer/MqttV311PacketSerializer.cs rename to MQTTnet.Core/Serializer/MqttPacketSerializer.cs index 523a2dc..f9ec70c 100644 --- a/MQTTnet.Core/Serializer/MqttV311PacketSerializer.cs +++ b/MQTTnet.Core/Serializer/MqttPacketSerializer.cs @@ -9,9 +9,12 @@ using MQTTnet.Core.Protocol; namespace MQTTnet.Core.Serializer { - public sealed class MqttV311PacketSerializer : IMqttPacketSerializer + public sealed class MqttPacketSerializer : IMqttPacketSerializer { - private static readonly byte[] MqttV311Prefix = Encoding.UTF8.GetBytes("MQTT"); + private static byte[] ProtocolVersionV311Name { get; } = Encoding.UTF8.GetBytes("MQTT"); + private static byte[] ProtocolVersionV310Name { get; } = Encoding.UTF8.GetBytes("MQIs"); + + public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311; public Task SerializeAsync(MqttBasePacket packet, IMqttCommunicationChannel destination) { @@ -257,12 +260,21 @@ namespace MQTTnet.Core.Serializer private static async Task DeserializeConnectAsync(MqttPacketReader reader) { await reader.ReadRemainingDataAsync(2); // Skip 2 bytes - - var protocolName = await reader.ReadRemainingDataAsync(4); - if (Encoding.UTF8.GetString(protocolName, 0, protocolName.Length) != "MQTT") + MqttProtocolVersion protocolVersion; + var protocolName = await reader.ReadRemainingDataAsync(4); + if (protocolName.SequenceEqual(ProtocolVersionV310Name)) + { + await reader.ReadRemainingDataAsync(2); + protocolVersion = MqttProtocolVersion.V310; + } + else if (protocolName.SequenceEqual(ProtocolVersionV311Name)) { - throw new MqttProtocolViolationException("Protocol name is not 'MQTT'."); + protocolVersion = MqttProtocolVersion.V311; + } + else + { + throw new MqttProtocolViolationException("Protocol name is not supported."); } var protocolLevel = await reader.ReadRemainingDataByteAsync(); @@ -273,6 +285,7 @@ namespace MQTTnet.Core.Serializer var packet = new MqttConnectPacket { + ProtocolVersion = protocolVersion, CleanSession = connectFlagsReader.Read() }; @@ -353,7 +366,7 @@ namespace MQTTnet.Core.Serializer } } - private static Task SerializeAsync(MqttConnectPacket packet, IMqttCommunicationChannel destination) + private Task SerializeAsync(MqttConnectPacket packet, IMqttCommunicationChannel destination) { ValidateConnectPacket(packet); @@ -361,9 +374,19 @@ namespace MQTTnet.Core.Serializer { // Write variable header output.Write(0x00, 0x04); // 3.1.2.1 Protocol Name - output.Write(MqttV311Prefix); - output.Write(0x04); // 3.1.2.2 Protocol Level - + if (ProtocolVersion == MqttProtocolVersion.V311) + { + output.Write(ProtocolVersionV311Name); + output.Write(0x04); // 3.1.2.2 Protocol Level (4) + } + else + { + output.Write(ProtocolVersionV310Name); + output.Write(0x64); + output.Write(0x70); + output.Write(0x03); // Protocol Level (3) + } + var connectFlags = new ByteWriter(); // 3.1.2.3 Connect Flags connectFlags.Write(false); // Reserved connectFlags.Write(packet.CleanSession); @@ -408,13 +431,17 @@ namespace MQTTnet.Core.Serializer } } - private static Task SerializeAsync(MqttConnAckPacket packet, IMqttCommunicationChannel destination) + private Task SerializeAsync(MqttConnAckPacket packet, IMqttCommunicationChannel destination) { using (var output = new MqttPacketWriter()) { var connectAcknowledgeFlags = new ByteWriter(); - connectAcknowledgeFlags.Write(packet.IsSessionPresent); + if (ProtocolVersion == MqttProtocolVersion.V311) + { + connectAcknowledgeFlags.Write(packet.IsSessionPresent); + } + output.Write(connectAcknowledgeFlags); output.Write((byte)packet.ConnectReturnCode); @@ -423,6 +450,17 @@ namespace MQTTnet.Core.Serializer } } + private static async Task SerializeAsync(MqttPubRelPacket packet, IMqttCommunicationChannel destination) + { + using (var output = new MqttPacketWriter()) + { + output.Write(packet.PacketIdentifier); + + output.InjectFixedHeader(MqttControlPacketType.PubRel, 0x02); + await output.WriteToAsync(destination); + } + } + private static Task SerializeAsync(MqttDisconnectPacket packet, IMqttCommunicationChannel destination) { return SerializeEmptyPacketAsync(MqttControlPacketType.Disconnect, destination); @@ -495,17 +533,6 @@ namespace MQTTnet.Core.Serializer } } - private static async Task SerializeAsync(MqttPubRelPacket packet, IMqttCommunicationChannel destination) - { - using (var output = new MqttPacketWriter()) - { - output.Write(packet.PacketIdentifier); - - output.InjectFixedHeader(MqttControlPacketType.PubRel, 0x02); - await output.WriteToAsync(destination); - } - } - private static Task SerializeAsync(MqttPubCompPacket packet, IMqttCommunicationChannel destination) { using (var output = new MqttPacketWriter()) diff --git a/MQTTnet.Core/Serializer/MqttPacketWriter.cs b/MQTTnet.Core/Serializer/MqttPacketWriter.cs index 4e2a3cb..778f52e 100644 --- a/MQTTnet.Core/Serializer/MqttPacketWriter.cs +++ b/MQTTnet.Core/Serializer/MqttPacketWriter.cs @@ -9,13 +9,13 @@ namespace MQTTnet.Core.Serializer { public sealed class MqttPacketWriter : IDisposable { - private readonly MemoryStream _buffer = new MemoryStream(512); + private readonly MemoryStream _buffer = new MemoryStream(1024); public void InjectFixedHeader(MqttControlPacketType packetType, byte flags = 0) { - var fixedHeader = (byte)((byte)packetType << 4); + var fixedHeader = (int)packetType << 4; fixedHeader |= flags; - InjectFixedHeader(fixedHeader); + InjectFixedHeader((byte)fixedHeader); } public void Write(byte value) diff --git a/MQTTnet.Core/Serializer/MqttProtocolVersion.cs b/MQTTnet.Core/Serializer/MqttProtocolVersion.cs new file mode 100644 index 0000000..6b8814a --- /dev/null +++ b/MQTTnet.Core/Serializer/MqttProtocolVersion.cs @@ -0,0 +1,8 @@ +namespace MQTTnet.Core.Serializer +{ + public enum MqttProtocolVersion + { + V311, + V310 + } +} diff --git a/MQTTnet.Core/Server/ConnectedMqttClient.cs b/MQTTnet.Core/Server/ConnectedMqttClient.cs new file mode 100644 index 0000000..d363351 --- /dev/null +++ b/MQTTnet.Core/Server/ConnectedMqttClient.cs @@ -0,0 +1,11 @@ +using MQTTnet.Core.Serializer; + +namespace MQTTnet.Core.Server +{ + public class ConnectedMqttClient + { + public string ClientId { get; set; } + + public MqttProtocolVersion ProtocolVersion { get; set; } + } +} diff --git a/MQTTnet.Core/Server/IMqttServer.cs b/MQTTnet.Core/Server/IMqttServer.cs index 0b91c26..7a75624 100644 --- a/MQTTnet.Core/Server/IMqttServer.cs +++ b/MQTTnet.Core/Server/IMqttServer.cs @@ -9,7 +9,7 @@ namespace MQTTnet.Core.Server event EventHandler ApplicationMessageReceived; event EventHandler ClientConnected; - IList GetConnectedClients(); + IList GetConnectedClients(); void InjectClient(string identifier, IMqttCommunicationAdapter adapter); void Publish(MqttApplicationMessage applicationMessage); void Start(); diff --git a/MQTTnet.Core/Server/MqttClientSession.cs b/MQTTnet.Core/Server/MqttClientSession.cs index f84b847..bd99fe3 100644 --- a/MQTTnet.Core/Server/MqttClientSession.cs +++ b/MQTTnet.Core/Server/MqttClientSession.cs @@ -21,7 +21,6 @@ namespace MQTTnet.Core.Server private readonly MqttServerOptions _options; private CancellationTokenSource _cancellationTokenSource; - private IMqttCommunicationAdapter _adapter; private string _identifier; private MqttApplicationMessage _willApplicationMessage; @@ -36,7 +35,9 @@ namespace MQTTnet.Core.Server public string ClientId { get; } - public bool IsConnected => _adapter != null; + public bool IsConnected => Adapter != null; + + public IMqttCommunicationAdapter Adapter { get; private set; } public async Task RunAsync(string identifier, MqttApplicationMessage willApplicationMessage, IMqttCommunicationAdapter adapter) { @@ -47,7 +48,7 @@ namespace MQTTnet.Core.Server try { _identifier = identifier; - _adapter = adapter; + Adapter = adapter; _cancellationTokenSource = new CancellationTokenSource(); _messageQueue.Start(adapter); @@ -73,7 +74,7 @@ namespace MQTTnet.Core.Server _messageQueue.Stop(); _cancellationTokenSource.Cancel(); - _adapter = null; + Adapter = null; MqttTrace.Information(nameof(MqttClientSession), $"Client '{_identifier}': Disconnected."); } @@ -102,12 +103,12 @@ namespace MQTTnet.Core.Server { if (packet is MqttSubscribePacket subscribePacket) { - return _adapter.SendPacketAsync(_subscriptionsManager.Subscribe(subscribePacket), _options.DefaultCommunicationTimeout); + return Adapter.SendPacketAsync(_subscriptionsManager.Subscribe(subscribePacket), _options.DefaultCommunicationTimeout); } if (packet is MqttUnsubscribePacket unsubscribePacket) { - return _adapter.SendPacketAsync(_subscriptionsManager.Unsubscribe(unsubscribePacket), _options.DefaultCommunicationTimeout); + return Adapter.SendPacketAsync(_subscriptionsManager.Unsubscribe(unsubscribePacket), _options.DefaultCommunicationTimeout); } if (packet is MqttPublishPacket publishPacket) @@ -122,7 +123,7 @@ namespace MQTTnet.Core.Server if (packet is MqttPubRecPacket pubRecPacket) { - return _adapter.SendPacketAsync(pubRecPacket.CreateResponse(), _options.DefaultCommunicationTimeout); + return Adapter.SendPacketAsync(pubRecPacket.CreateResponse(), _options.DefaultCommunicationTimeout); } if (packet is MqttPubAckPacket || packet is MqttPubCompPacket) @@ -133,7 +134,7 @@ namespace MQTTnet.Core.Server if (packet is MqttPingReqPacket) { - return _adapter.SendPacketAsync(new MqttPingRespPacket(), _options.DefaultCommunicationTimeout); + return Adapter.SendPacketAsync(new MqttPingRespPacket(), _options.DefaultCommunicationTimeout); } if (packet is MqttDisconnectPacket || packet is MqttConnectPacket) @@ -159,7 +160,7 @@ namespace MQTTnet.Core.Server if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce) { _publishPacketReceivedCallback(this, publishPacket); - return _adapter.SendPacketAsync(new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout); + return Adapter.SendPacketAsync(new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout); } if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce) @@ -172,7 +173,7 @@ namespace MQTTnet.Core.Server _publishPacketReceivedCallback(this, publishPacket); - return _adapter.SendPacketAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout); + return Adapter.SendPacketAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout); } throw new MqttCommunicationException("Received a not supported QoS level."); @@ -185,7 +186,7 @@ namespace MQTTnet.Core.Server _unacknowledgedPublishPackets.Remove(pubRelPacket.PacketIdentifier); } - return _adapter.SendPacketAsync(new MqttPubCompPacket { PacketIdentifier = pubRelPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout); + return Adapter.SendPacketAsync(new MqttPubCompPacket { PacketIdentifier = pubRelPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout); } } } diff --git a/MQTTnet.Core/Server/MqttClientSessionsManager.cs b/MQTTnet.Core/Server/MqttClientSessionsManager.cs index f78be0a..6165143 100644 --- a/MQTTnet.Core/Server/MqttClientSessionsManager.cs +++ b/MQTTnet.Core/Server/MqttClientSessionsManager.cs @@ -34,6 +34,9 @@ namespace MQTTnet.Core.Server throw new MqttProtocolViolationException("The first packet from a client must be a 'CONNECT' packet [MQTT-3.1.0-1]."); } + // Switch to the required protocol version before sending any response. + eventArgs.ClientAdapter.PacketSerializer.ProtocolVersion = connectPacket.ProtocolVersion; + var connectReturnCode = ValidateConnection(connectPacket); if (connectReturnCode != MqttConnectReturnCode.ConnectionAccepted) { @@ -73,11 +76,15 @@ namespace MQTTnet.Core.Server } } - public IList GetConnectedClients() + public IList GetConnectedClients() { lock (_syncRoot) { - return _clientSessions.Where(s => s.Value.IsConnected).Select(s => s.Key).ToList(); + return _clientSessions.Where(s => s.Value.IsConnected).Select(s => new ConnectedMqttClient + { + ClientId = s.Value.ClientId, + ProtocolVersion = s.Value.Adapter.PacketSerializer.ProtocolVersion + }).ToList(); } } diff --git a/MQTTnet.Core/Server/MqttServer.cs b/MQTTnet.Core/Server/MqttServer.cs index 98543b6..231cfcf 100644 --- a/MQTTnet.Core/Server/MqttServer.cs +++ b/MQTTnet.Core/Server/MqttServer.cs @@ -25,7 +25,7 @@ namespace MQTTnet.Core.Server _clientSessionsManager.ApplicationMessageReceived += (s, e) => ApplicationMessageReceived?.Invoke(s, e); } - public IList GetConnectedClients() + public IList GetConnectedClients() { return _clientSessionsManager.GetConnectedClients(); } diff --git a/MQTTnet.sln b/MQTTnet.sln index 697f504..c17884c 100644 --- a/MQTTnet.sln +++ b/MQTTnet.sln @@ -1,7 +1,7 @@  Microsoft Visual Studio Solution File, Format Version 12.00 # Visual Studio 15 -VisualStudioVersion = 15.0.26430.15 +VisualStudioVersion = 15.0.26430.16 MinimumVisualStudioVersion = 10.0.40219.1 Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.Core.Tests", "Tests\MQTTnet.Core.Tests\MQTTnet.Core.Tests.csproj", "{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}" EndProject @@ -27,6 +27,11 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Build", "Build", "{67C28AC1 Build\MQTTnet.nuspec = Build\MQTTnet.nuspec EndProjectSection EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{B3F60ECB-45BA-4C66-8903-8BB89CA67998}" + ProjectSection(SolutionItems) = preProject + README.md = README.md + EndProjectSection +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU diff --git a/README.md b/README.md index 5df9d0f..4668d6c 100644 --- a/README.md +++ b/README.md @@ -5,40 +5,47 @@ [![NuGet Badge](https://buildstats.info/nuget/MQTTnet)](https://www.nuget.org/packages/MQTTnet) # MQTTnet -MQTTnet is a .NET library for MQTT based communication. It provides a MQTT client and a MQTT server. The implementation is based on the documentation from http://mqtt.org/. +MQTTnet is a .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker). The implementation is based on the documentation from http://mqtt.org/. -## Features -* MQTT client included -* MQTT server (broker) included -* TLS 1.2 support for client and server (but not UWP servers) +# Features + +## General * Async support -* Rx support (via another project) -* List of connected clients available (server only) +* TLS 1.2 support for client and server (but not UWP servers) * Extensible communication channels (i.e. In-Memory, TCP, TCP+SSL, WebSockets (not included in this project)) -* Server is able to publish its own messages (no loopback client required) +* Interfaces included for mocking and testing +* Lightweight (only the low level implementation of MQTT, no overhead) * Access to internal trace messages -* Extensible client credential validation (server only) * Unit tested (50+ tests) -* Lightweight (only the low level implementation of MQTT, no overhead) -* Interfaces included for mocking and testing -## Supported frameworks +## Client +* Rx support (via another project) + +## Server (broker) +* List of connected clients available +* Supports connected clients with different protocol versions at the same time +* Able to publish its own messages (no loopback client required) +* Able to receive every messages (no loopback client required) +* Extensible client credential validation + +# Supported frameworks * .NET Standard 1.3+ * .NET Core 1.1+ * .NET Core App 1.1+ * .NET Framework 4.5.2+ (x86, x64, AnyCPU) * Universal Windows (UWP) 10.0.10240+ (x86, x64, ARM, AnyCPU) -## Supported MQTT versions +# Supported MQTT versions * 3.1.1 +* 3.1.0 -## Nuget +# Nuget This library is available as a nuget package: https://www.nuget.org/packages/MQTTnet/ -## Contributions +# Contributions If you want to contribute to this project just create a pull request. -## References +# References This library is used in the following projects: * MQTT Client Rx (Wrapper for Reactive Extensions, https://github.com/1iveowl/MQTTClient.rx) @@ -46,8 +53,8 @@ This library is used in the following projects: If you use this library and want to see your project here please let me know. -# MqttClient -## Example +# Examples +## MqttClient ```csharp var options = new MqttClientOptions @@ -119,9 +126,7 @@ while (true) } ``` -# MqttServer - -## Example +## MqttServer ```csharp var options = new MqttServerOptions diff --git a/Tests/MQTTnet.Core.Tests/MQTTnet.Core.Tests.csproj b/Tests/MQTTnet.Core.Tests/MQTTnet.Core.Tests.csproj index 62ddb72..88bf8f2 100644 --- a/Tests/MQTTnet.Core.Tests/MQTTnet.Core.Tests.csproj +++ b/Tests/MQTTnet.Core.Tests/MQTTnet.Core.Tests.csproj @@ -86,7 +86,7 @@ - + diff --git a/Tests/MQTTnet.Core.Tests/DefaultMqttV311PacketSerializerTests.cs b/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs similarity index 92% rename from Tests/MQTTnet.Core.Tests/DefaultMqttV311PacketSerializerTests.cs rename to Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs index 4b458ed..c03fa7a 100644 --- a/Tests/MQTTnet.Core.Tests/DefaultMqttV311PacketSerializerTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs @@ -12,8 +12,23 @@ using MQTTnet.Core.Serializer; namespace MQTTnet.Core.Tests { [TestClass] - public class DefaultMqttV311PacketSerializerTests + public class MqttPacketSerializerTests { + [TestMethod] + public void SerializeV310_MqttConnectPacket() + { + var p = new MqttConnectPacket + { + ClientId = "XYZ", + Password = "PASS", + Username = "USER", + KeepAlivePeriod = 123, + CleanSession = true + }; + + SerializeAndCompare(p, "EB0ABE1RSXNkcAPCAHsAA1hZWgAEVVNFUgAEUEFTUw==", MqttProtocolVersion.V310); + } + [TestMethod] public void SerializeV311_MqttConnectPacket() { @@ -96,6 +111,17 @@ namespace MQTTnet.Core.Tests SerializeAndCompare(p, "IAIBBQ=="); } + [TestMethod] + public void SerializeV310_MqttConnAckPacket() + { + var p = new MqttConnAckPacket + { + ConnectReturnCode = MqttConnectReturnCode.ConnectionAccepted + }; + + SerializeAndCompare(p, "IAIAAA==", MqttProtocolVersion.V310); + } + [TestMethod] public void DeserializeV311_MqttConnAckPacket() { @@ -403,9 +429,9 @@ namespace MQTTnet.Core.Tests } } - private void SerializeAndCompare(MqttBasePacket packet, string expectedBase64Value) + private void SerializeAndCompare(MqttBasePacket packet, string expectedBase64Value, MqttProtocolVersion protocolVersion = MqttProtocolVersion.V311) { - var serializer = new MqttV311PacketSerializer(); + var serializer = new MqttPacketSerializer { ProtocolVersion = protocolVersion }; var channel = new TestChannel(); serializer.SerializeAsync(packet, channel).Wait(); var buffer = channel.ToArray(); @@ -415,7 +441,7 @@ namespace MQTTnet.Core.Tests private void DeserializeAndCompare(MqttBasePacket packet, string expectedBase64Value) { - var serializer = new MqttV311PacketSerializer(); + var serializer = new MqttPacketSerializer(); var channel1 = new TestChannel(); serializer.SerializeAsync(packet, channel1).Wait(); diff --git a/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs b/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs index 35bd92d..c8c6da6 100644 --- a/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs +++ b/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs @@ -4,6 +4,7 @@ using System.Threading.Tasks; using MQTTnet.Core.Adapter; using MQTTnet.Core.Client; using MQTTnet.Core.Packets; +using MQTTnet.Core.Serializer; namespace MQTTnet.Core.Tests { @@ -13,6 +14,8 @@ namespace MQTTnet.Core.Tests public TestMqttCommunicationAdapter Partner { get; set; } + public IMqttPacketSerializer PacketSerializer { get; } = new MqttPacketSerializer(); + public async Task ConnectAsync(MqttClientOptions options, TimeSpan timeout) { await Task.FromResult(0); From c90bdbfb9352ba79da03ce690ea922fdf2d5d5ae Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Thu, 17 Aug 2017 22:49:16 +0200 Subject: [PATCH 3/3] Fix issue with connection management. --- Build/MQTTnet.nuspec | 1 + .../Implementations/MqttTcpChannel.cs | 14 +- .../Implementations/MqttTcpChannel.cs | 14 +- .../Implementations/MqttTcpChannel.cs | 12 +- .../MqttChannelCommunicationAdapter.cs | 63 ++++---- MQTTnet.Core/Client/MqttClient.cs | 140 +++++++++++------- .../Exceptions/MqttCommunicationException.cs | 7 +- 7 files changed, 159 insertions(+), 92 deletions(-) diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index bb81a8d..a905b00 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -14,6 +14,7 @@ * [Server] Providing the used protocol version of connected clients * [Client] Added support for protocol version 3.1.0 * [Core] Several minor performance improvements +* [Core] Fixed an issue with connection management (Thanks to wuzhenda; Zuendelmeister) Copyright Christian Kratky 2016-2017 MQTT MQTTClient MQTTServer MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Queue Hardware Arduino diff --git a/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs b/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs index 3a43a3e..c9c080d 100644 --- a/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs +++ b/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs @@ -12,12 +12,11 @@ namespace MQTTnet.Implementations { public sealed class MqttTcpChannel : IMqttCommunicationChannel, IDisposable { - private readonly Socket _socket; + private Socket _socket; private SslStream _sslStream; public MqttTcpChannel() { - _socket = new Socket(SocketType.Stream, ProtocolType.Tcp); } public MqttTcpChannel(Socket socket, SslStream sslStream) @@ -31,6 +30,11 @@ namespace MQTTnet.Implementations if (options == null) throw new ArgumentNullException(nameof(options)); try { + if (_socket == null) + { + _socket = new Socket(SocketType.Stream, ProtocolType.Tcp); + } + await Task.Factory.FromAsync(_socket.BeginConnect, _socket.EndConnect, options.Server, options.GetPort(), null); if (options.TlsOptions.UseTls) @@ -49,8 +53,7 @@ namespace MQTTnet.Implementations { try { - _sslStream.Dispose(); - _socket.Dispose(); + Dispose(); return Task.FromResult(0); } catch (SocketException exception) @@ -108,6 +111,9 @@ namespace MQTTnet.Implementations { _socket?.Dispose(); _sslStream?.Dispose(); + + _socket = null; + _sslStream = null; } private static X509CertificateCollection LoadCertificates(MqttClientOptions options) diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs index e78cd98..a4247b0 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs @@ -12,12 +12,11 @@ namespace MQTTnet.Implementations { public sealed class MqttTcpChannel : IMqttCommunicationChannel, IDisposable { - private readonly Socket _socket; + private Socket _socket; private SslStream _sslStream; public MqttTcpChannel() { - _socket = new Socket(SocketType.Stream, ProtocolType.Tcp); } public MqttTcpChannel(Socket socket, SslStream sslStream) @@ -31,6 +30,11 @@ namespace MQTTnet.Implementations if (options == null) throw new ArgumentNullException(nameof(options)); try { + if (_socket == null) + { + _socket = new Socket(SocketType.Stream, ProtocolType.Tcp); + } + await _socket.ConnectAsync(options.Server, options.GetPort()); if (options.TlsOptions.UseTls) @@ -49,8 +53,7 @@ namespace MQTTnet.Implementations { try { - _sslStream.Dispose(); - _socket.Dispose(); + Dispose(); return Task.FromResult(0); } catch (SocketException exception) @@ -101,6 +104,9 @@ namespace MQTTnet.Implementations { _socket?.Dispose(); _sslStream?.Dispose(); + + _socket = null; + _sslStream = null; } private static X509CertificateCollection LoadCertificates(MqttClientOptions options) diff --git a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs index 99681b7..482ce32 100644 --- a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs +++ b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs @@ -15,11 +15,10 @@ namespace MQTTnet.Implementations { public sealed class MqttTcpChannel : IMqttCommunicationChannel, IDisposable { - private readonly StreamSocket _socket; + private StreamSocket _socket; public MqttTcpChannel() { - _socket = new StreamSocket(); } public MqttTcpChannel(StreamSocket socket) @@ -32,6 +31,11 @@ namespace MQTTnet.Implementations if (options == null) throw new ArgumentNullException(nameof(options)); try { + if (_socket == null) + { + _socket = new StreamSocket(); + } + if (!options.TlsOptions.UseTls) { await _socket.ConnectAsync(new HostName(options.Server), options.GetPort().ToString()); @@ -59,7 +63,7 @@ namespace MQTTnet.Implementations { try { - _socket.Dispose(); + Dispose(); return Task.FromResult(0); } catch (SocketException exception) @@ -100,6 +104,8 @@ namespace MQTTnet.Implementations public void Dispose() { _socket?.Dispose(); + + _socket = null; } private static Certificate LoadCertificate(MqttClientOptions options) diff --git a/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs b/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs index d504d89..04b5ca7 100644 --- a/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs +++ b/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs @@ -23,11 +23,7 @@ namespace MQTTnet.Core.Adapter public async Task ConnectAsync(MqttClientOptions options, TimeSpan timeout) { - var task = _channel.ConnectAsync(options); - if (await Task.WhenAny(Task.Delay(timeout), task) != task) - { - throw new MqttCommunicationTimedOutException(); - } + await ExecuteWithTimeoutAsync(_channel.ConnectAsync(options), timeout); } public async Task DisconnectAsync() @@ -39,21 +35,7 @@ namespace MQTTnet.Core.Adapter { MqttTrace.Information(nameof(MqttChannelCommunicationAdapter), $"TX >>> {packet} [Timeout={timeout}]"); - bool hasTimeout; - try - { - var task = PacketSerializer.SerializeAsync(packet, _channel); - hasTimeout = await Task.WhenAny(Task.Delay(timeout), task) != task; - } - catch (Exception exception) - { - throw new MqttCommunicationException(exception); - } - - if (hasTimeout) - { - throw new MqttCommunicationTimedOutException(); - } + await ExecuteWithTimeoutAsync(PacketSerializer.SerializeAsync(packet, _channel), timeout); } public async Task ReceivePacketAsync(TimeSpan timeout) @@ -61,16 +43,7 @@ namespace MQTTnet.Core.Adapter MqttBasePacket packet; if (timeout > TimeSpan.Zero) { - var workerTask = PacketSerializer.DeserializeAsync(_channel); - var timeoutTask = Task.Delay(timeout); - var hasTimeout = Task.WhenAny(timeoutTask, workerTask) == timeoutTask; - - if (hasTimeout) - { - throw new MqttCommunicationTimedOutException(); - } - - packet = workerTask.Result; + packet = await ExecuteWithTimeoutAsync(PacketSerializer.DeserializeAsync(_channel), timeout); } else { @@ -85,5 +58,35 @@ namespace MQTTnet.Core.Adapter MqttTrace.Information(nameof(MqttChannelCommunicationAdapter), $"RX <<< {packet}"); return packet; } + + private static async Task ExecuteWithTimeoutAsync(Task task, TimeSpan timeout) + { + var timeoutTask = Task.Delay(timeout); + if (await Task.WhenAny(timeoutTask, task) == timeoutTask) + { + throw new MqttCommunicationTimedOutException(); + } + + if (task.IsFaulted) + { + throw new MqttCommunicationException(task.Exception); + } + + return task.Result; + } + + private static async Task ExecuteWithTimeoutAsync(Task task, TimeSpan timeout) + { + var timeoutTask = Task.Delay(timeout); + if (await Task.WhenAny(timeoutTask, task) == timeoutTask) + { + throw new MqttCommunicationTimedOutException(); + } + + if (task.IsFaulted) + { + throw new MqttCommunicationException(task.Exception); + } + } } } \ No newline at end of file diff --git a/MQTTnet.Core/Client/MqttClient.cs b/MQTTnet.Core/Client/MqttClient.cs index 70e8c76..3481cbe 100644 --- a/MQTTnet.Core/Client/MqttClient.cs +++ b/MQTTnet.Core/Client/MqttClient.cs @@ -20,6 +20,7 @@ namespace MQTTnet.Core.Client private readonly MqttClientOptions _options; private readonly IMqttCommunicationAdapter _adapter; + private bool _disconnectedEventSuspended; private int _latestPacketIdentifier; private CancellationTokenSource _cancellationTokenSource; @@ -48,49 +49,64 @@ namespace MQTTnet.Core.Client throw new MqttProtocolViolationException("It is not allowed to connect with a server after the connection is established."); } - var connectPacket = new MqttConnectPacket + try { - ClientId = _options.ClientId, - Username = _options.UserName, - Password = _options.Password, - CleanSession = _options.CleanSession, - KeepAlivePeriod = (ushort)_options.KeepAlivePeriod.TotalSeconds, - WillMessage = willApplicationMessage - }; + _disconnectedEventSuspended = false; - await _adapter.ConnectAsync(_options, _options.DefaultCommunicationTimeout); - MqttTrace.Verbose(nameof(MqttClient), "Connection with server established."); + await _adapter.ConnectAsync(_options, _options.DefaultCommunicationTimeout); - _cancellationTokenSource = new CancellationTokenSource(); - _latestPacketIdentifier = 0; - _packetDispatcher.Reset(); - IsConnected = true; + MqttTrace.Verbose(nameof(MqttClient), "Connection with server established."); -#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - Task.Run(() => ReceivePackets(_cancellationTokenSource.Token), _cancellationTokenSource.Token); -#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + var connectPacket = new MqttConnectPacket + { + ClientId = _options.ClientId, + Username = _options.UserName, + Password = _options.Password, + CleanSession = _options.CleanSession, + KeepAlivePeriod = (ushort)_options.KeepAlivePeriod.TotalSeconds, + WillMessage = willApplicationMessage + }; + + _cancellationTokenSource = new CancellationTokenSource(); + _latestPacketIdentifier = 0; + _packetDispatcher.Reset(); + + StartReceivePackets(); + + var response = await SendAndReceiveAsync(connectPacket); + if (response.ConnectReturnCode != MqttConnectReturnCode.ConnectionAccepted) + { + await DisconnectInternalAsync(); + throw new MqttConnectingFailedException(response.ConnectReturnCode); + } - var response = await SendAndReceiveAsync(connectPacket); - if (response.ConnectReturnCode != MqttConnectReturnCode.ConnectionAccepted) - { - await DisconnectAsync(); - throw new MqttConnectingFailedException(response.ConnectReturnCode); - } + if (_options.KeepAlivePeriod != TimeSpan.Zero) + { + StartSendKeepAliveMessages(); + } + + MqttTrace.Verbose(nameof(MqttClient), "MQTT connection with server established."); - if (_options.KeepAlivePeriod != TimeSpan.Zero) + IsConnected = true; + Connected?.Invoke(this, EventArgs.Empty); + } + catch (Exception) { -#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - Task.Run(() => SendKeepAliveMessagesAsync(_cancellationTokenSource.Token), _cancellationTokenSource.Token); -#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + await DisconnectInternalAsync(); + throw; } - - Connected?.Invoke(this, EventArgs.Empty); } public async Task DisconnectAsync() { - await SendAsync(new MqttDisconnectPacket()); - await DisconnectInternalAsync(); + try + { + await SendAsync(new MqttDisconnectPacket()); + } + finally + { + await DisconnectInternalAsync(); + } } public Task> SubscribeAsync(params TopicFilter[] topicFilters) @@ -181,8 +197,9 @@ namespace MQTTnet.Core.Client { await _adapter.DisconnectAsync(); } - catch + catch (Exception exception) { + MqttTrace.Warning(nameof(MqttClient), exception, "Error while disconnecting."); } finally { @@ -191,7 +208,12 @@ namespace MQTTnet.Core.Client _cancellationTokenSource = null; IsConnected = false; - Disconnected?.Invoke(this, EventArgs.Empty); + + if (!_disconnectedEventSuspended) + { + _disconnectedEventSuspended = true; + Disconnected?.Invoke(this, EventArgs.Empty); + } } } @@ -239,7 +261,7 @@ namespace MQTTnet.Core.Client } catch (Exception exception) { - MqttTrace.Error(nameof(MqttClient), exception, "Unhandled exception while handling application message."); + MqttTrace.Error(nameof(MqttClient), exception, "Unhandled exception while handling application message."); } } @@ -278,7 +300,7 @@ namespace MQTTnet.Core.Client { _unacknowledgedPublishPackets.Remove(pubRelPacket.PacketIdentifier); } - + await SendAsync(pubRelPacket.CreateResponse()); } @@ -300,15 +322,12 @@ namespace MQTTnet.Core.Client var pi1 = requestPacket as IMqttPacketWithIdentifier; var pi2 = p as IMqttPacketWithIdentifier; - if (pi1 != null && pi2 != null) + if (pi1 == null || pi2 == null) { - if (pi1.PacketIdentifier != pi2.PacketIdentifier) - { - return false; - } + return true; } - return true; + return pi1.PacketIdentifier == pi2.PacketIdentifier; } await _adapter.SendPacketAsync(requestPacket, _options.DefaultCommunicationTimeout); @@ -335,15 +354,16 @@ namespace MQTTnet.Core.Client catch (MqttCommunicationException exception) { MqttTrace.Warning(nameof(MqttClient), exception, "MQTT communication error while receiving packets."); + await DisconnectInternalAsync(); } catch (Exception exception) { MqttTrace.Warning(nameof(MqttClient), exception, "Error while sending/receiving keep alive packets."); + await DisconnectInternalAsync(); } finally { MqttTrace.Information(nameof(MqttClient), "Stopped sending keep alive packets."); - await DisconnectInternalAsync(); } } @@ -354,27 +374,47 @@ namespace MQTTnet.Core.Client { while (!cancellationToken.IsCancellationRequested) { - var mqttPacket = await _adapter.ReceivePacketAsync(TimeSpan.Zero); - MqttTrace.Information(nameof(MqttClient), $"Received <<< {mqttPacket}"); + var packet = await _adapter.ReceivePacketAsync(TimeSpan.Zero); + MqttTrace.Information(nameof(MqttClient), $"Received <<< {packet}"); -#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - Task.Run(() => ProcessReceivedPacketAsync(mqttPacket), cancellationToken); -#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + StartProcessReceivedPacket(packet, cancellationToken); } } catch (MqttCommunicationException exception) { - MqttTrace.Warning(nameof(MqttClient), exception, "MQTT communication error while receiving packets."); + MqttTrace.Warning(nameof(MqttClient), exception, "MQTT communication exception while receiving packets."); + await DisconnectInternalAsync(); } catch (Exception exception) { - MqttTrace.Error(nameof(MqttClient), exception, "Error while receiving packets."); + MqttTrace.Error(nameof(MqttClient), exception, "Unhandled exception while receiving packets."); + await DisconnectInternalAsync(); } finally { MqttTrace.Information(nameof(MqttClient), "Stopped receiving packets."); - await DisconnectInternalAsync(); } } + + private void StartProcessReceivedPacket(MqttBasePacket packet, CancellationToken cancellationToken) + { +#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + Task.Run(() => ProcessReceivedPacketAsync(packet), cancellationToken); +#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + } + + private void StartReceivePackets() + { +#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + Task.Run(() => ReceivePackets(_cancellationTokenSource.Token), _cancellationTokenSource.Token); +#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + } + + private void StartSendKeepAliveMessages() + { +#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + Task.Run(() => SendKeepAliveMessagesAsync(_cancellationTokenSource.Token), _cancellationTokenSource.Token); +#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + } } } \ No newline at end of file diff --git a/MQTTnet.Core/Exceptions/MqttCommunicationException.cs b/MQTTnet.Core/Exceptions/MqttCommunicationException.cs index 8b471a0..2fc578e 100644 --- a/MQTTnet.Core/Exceptions/MqttCommunicationException.cs +++ b/MQTTnet.Core/Exceptions/MqttCommunicationException.cs @@ -4,7 +4,7 @@ namespace MQTTnet.Core.Exceptions { public class MqttCommunicationException : Exception { - public MqttCommunicationException() + protected MqttCommunicationException() { } @@ -17,5 +17,10 @@ namespace MQTTnet.Core.Exceptions : base(message) { } + + public MqttCommunicationException(string message, Exception innerException) + : base(message, innerException) + { + } } }