From 59b0b79a3235feb0d9a99a56ab1680d1bb9f481c Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Mon, 7 Aug 2017 20:28:49 +0200 Subject: [PATCH] Fix wrong QoS level handling for server. --- Build/MQTTnet.nuspec | 8 ++------ .../Implementations/MqttServerAdapter.cs | 4 ++-- Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs | 2 +- .../MQTTnet.NetFramework/Properties/AssemblyInfo.cs | 4 ++-- .../Implementations/MqttServerAdapter.cs | 4 ++-- .../MQTTnet.NetStandard/MQTTnet.Netstandard.csproj | 4 ++-- Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs | 2 +- .../Implementations/MqttServerAdapter.cs | 2 +- Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs | 2 +- .../MQTTnet.UniversalWindows/Properties/AssemblyInfo.cs | 4 ++-- MQTTnet.Core/Client/MqttClient.cs | 2 +- MQTTnet.Core/MQTTnet.Core.csproj | 4 ++-- ...11PacketSerializer.cs => MqttV311PacketSerializer.cs} | 8 ++++---- MQTTnet.Core/Server/MqttClientSession.cs | 9 ++++++--- .../DefaultMqttV311PacketSerializerTests.cs | 4 ++-- 15 files changed, 31 insertions(+), 32 deletions(-) rename MQTTnet.Core/Serializer/{DefaultMqttV311PacketSerializer.cs => MqttV311PacketSerializer.cs} (99%) diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index 3687879..89dcc91 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -2,7 +2,7 @@ MQTTnet - 2.1.5 + 2.1.5.1 Christian Kratky Christian Kratky https://github.com/chkr1011/MQTTnet/blob/master/LICENSE @@ -10,11 +10,7 @@ https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png false MQTTnet is a .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker). - * [MqttServer] Added support for publishing application messages -* [Core] Fixed QoS level 2 handling -* [Core] Performance optimizations -* [MqttClient/MqttServer] Errors while handline application messages are now catched and traced -* [MqttClient/MqttServer] Added interfaces + * [Server] Fixed wrong handling of QoS levels Copyright Christian Kratky 2016-2017 MQTT MQTTClient MQTTServer MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Queue Hardware diff --git a/Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs b/Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs index cbfcf66..694eec7 100644 --- a/Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs +++ b/Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs @@ -86,7 +86,7 @@ namespace MQTTnet.Implementations try { var clientSocket = await Task.Factory.FromAsync(_defaultEndpointSocket.BeginAccept, _defaultEndpointSocket.EndAccept, null); - var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new DefaultMqttV311PacketSerializer()); + var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new MqttV311PacketSerializer()); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); } catch (Exception exception) when (!(exception is ObjectDisposedException)) @@ -107,7 +107,7 @@ namespace MQTTnet.Implementations var sslStream = new SslStream(new NetworkStream(clientSocket)); await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false); - var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new DefaultMqttV311PacketSerializer()); + var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new MqttV311PacketSerializer()); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); } catch (Exception exception) diff --git a/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs b/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs index ac12f97..b85edab 100644 --- a/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs +++ b/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs @@ -12,7 +12,7 @@ namespace MQTTnet { if (options == null) throw new ArgumentNullException(nameof(options)); - return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new DefaultMqttV311PacketSerializer())); + return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new MqttV311PacketSerializer())); } } } \ No newline at end of file diff --git a/Frameworks/MQTTnet.NetFramework/Properties/AssemblyInfo.cs b/Frameworks/MQTTnet.NetFramework/Properties/AssemblyInfo.cs index 4d0a57f..486eca7 100644 --- a/Frameworks/MQTTnet.NetFramework/Properties/AssemblyInfo.cs +++ b/Frameworks/MQTTnet.NetFramework/Properties/AssemblyInfo.cs @@ -11,5 +11,5 @@ using System.Runtime.InteropServices; [assembly: AssemblyCulture("")] [assembly: ComVisible(false)] [assembly: Guid("a480ef90-0eaa-4d9a-b271-47a9c47f6f7d")] -[assembly: AssemblyVersion("2.1.4.0")] -[assembly: AssemblyFileVersion("2.1.4.0")] \ No newline at end of file +[assembly: AssemblyVersion("2.1.5.1")] +[assembly: AssemblyFileVersion("2.1.5.1")] \ No newline at end of file diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs index 863d3e7..7c2561d 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs @@ -84,7 +84,7 @@ namespace MQTTnet.Implementations try { var clientSocket = await _defaultEndpointSocket.AcceptAsync(); - var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new DefaultMqttV311PacketSerializer()); + var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new MqttV311PacketSerializer()); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); } catch (Exception exception) @@ -105,7 +105,7 @@ namespace MQTTnet.Implementations var sslStream = new SslStream(new NetworkStream(clientSocket)); await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false); - var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new DefaultMqttV311PacketSerializer()); + var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new MqttV311PacketSerializer()); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); } catch (Exception exception) diff --git a/Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj b/Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj index 2a60e55..1a1249e 100644 --- a/Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj +++ b/Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj @@ -4,8 +4,8 @@ netstandard1.3 MQTTnet MQTTnet - 2.1.4.0 - 2.1.4.0 + 2.1.5.1 + 2.1.5.1 0.0.0.0 diff --git a/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs b/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs index 6073583..c5f409f 100644 --- a/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs +++ b/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs @@ -12,7 +12,7 @@ namespace MQTTnet { if (options == null) throw new ArgumentNullException(nameof(options)); - return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new DefaultMqttV311PacketSerializer())); + return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new MqttV311PacketSerializer())); } } } diff --git a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttServerAdapter.cs b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttServerAdapter.cs index ea251ca..b0bfcbe 100644 --- a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttServerAdapter.cs +++ b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttServerAdapter.cs @@ -52,7 +52,7 @@ namespace MQTTnet.Implementations { try { - var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(args.Socket), new DefaultMqttV311PacketSerializer()); + var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(args.Socket), new MqttV311PacketSerializer()); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(args.Socket.Information.RemoteAddress.ToString(), clientAdapter)); } catch (Exception exception) diff --git a/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs b/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs index ac12f97..b85edab 100644 --- a/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs +++ b/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs @@ -12,7 +12,7 @@ namespace MQTTnet { if (options == null) throw new ArgumentNullException(nameof(options)); - return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new DefaultMqttV311PacketSerializer())); + return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new MqttV311PacketSerializer())); } } } \ No newline at end of file diff --git a/Frameworks/MQTTnet.UniversalWindows/Properties/AssemblyInfo.cs b/Frameworks/MQTTnet.UniversalWindows/Properties/AssemblyInfo.cs index dee61f7..05204eb 100644 --- a/Frameworks/MQTTnet.UniversalWindows/Properties/AssemblyInfo.cs +++ b/Frameworks/MQTTnet.UniversalWindows/Properties/AssemblyInfo.cs @@ -10,5 +10,5 @@ using System.Runtime.InteropServices; [assembly: AssemblyTrademark("")] [assembly: AssemblyCulture("")] [assembly: ComVisible(false)] -[assembly: AssemblyVersion("2.1.4.0")] -[assembly: AssemblyFileVersion("2.1.4.0")] \ No newline at end of file +[assembly: AssemblyVersion("2.1.5.1")] +[assembly: AssemblyFileVersion("2.1.5.1")] \ No newline at end of file diff --git a/MQTTnet.Core/Client/MqttClient.cs b/MQTTnet.Core/Client/MqttClient.cs index 384de1f..f90adab 100644 --- a/MQTTnet.Core/Client/MqttClient.cs +++ b/MQTTnet.Core/Client/MqttClient.cs @@ -267,7 +267,7 @@ namespace MQTTnet.Core.Client return SendAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }); } - throw new InvalidOperationException(); + throw new MqttCommunicationException("Received a not supported QoS level."); } private async Task ProcessReceivedPubRelPacket(MqttPubRelPacket pubRelPacket) diff --git a/MQTTnet.Core/MQTTnet.Core.csproj b/MQTTnet.Core/MQTTnet.Core.csproj index 4d12864..c938551 100644 --- a/MQTTnet.Core/MQTTnet.Core.csproj +++ b/MQTTnet.Core/MQTTnet.Core.csproj @@ -16,8 +16,8 @@ - 2.1.4.0 - 2.1.4.0 + 2.1.5.1 + 2.1.5.1 diff --git a/MQTTnet.Core/Serializer/DefaultMqttV311PacketSerializer.cs b/MQTTnet.Core/Serializer/MqttV311PacketSerializer.cs similarity index 99% rename from MQTTnet.Core/Serializer/DefaultMqttV311PacketSerializer.cs rename to MQTTnet.Core/Serializer/MqttV311PacketSerializer.cs index 4100b6c..523a2dc 100644 --- a/MQTTnet.Core/Serializer/DefaultMqttV311PacketSerializer.cs +++ b/MQTTnet.Core/Serializer/MqttV311PacketSerializer.cs @@ -9,8 +9,10 @@ using MQTTnet.Core.Protocol; namespace MQTTnet.Core.Serializer { - public sealed class DefaultMqttV311PacketSerializer : IMqttPacketSerializer + public sealed class MqttV311PacketSerializer : IMqttPacketSerializer { + private static readonly byte[] MqttV311Prefix = Encoding.UTF8.GetBytes("MQTT"); + public Task SerializeAsync(MqttBasePacket packet, IMqttCommunicationChannel destination) { if (packet == null) throw new ArgumentNullException(nameof(packet)); @@ -351,8 +353,6 @@ namespace MQTTnet.Core.Serializer } } - private static readonly byte[] MqttPrefix = Encoding.UTF8.GetBytes("MQTT"); - private static Task SerializeAsync(MqttConnectPacket packet, IMqttCommunicationChannel destination) { ValidateConnectPacket(packet); @@ -361,7 +361,7 @@ namespace MQTTnet.Core.Serializer { // Write variable header output.Write(0x00, 0x04); // 3.1.2.1 Protocol Name - output.Write(MqttPrefix); + output.Write(MqttV311Prefix); output.Write(0x04); // 3.1.2.2 Protocol Level var connectFlags = new ByteWriter(); // 3.1.2.3 Connect Flags diff --git a/MQTTnet.Core/Server/MqttClientSession.cs b/MQTTnet.Core/Server/MqttClientSession.cs index a3919f9..f84b847 100644 --- a/MQTTnet.Core/Server/MqttClientSession.cs +++ b/MQTTnet.Core/Server/MqttClientSession.cs @@ -153,13 +153,16 @@ namespace MQTTnet.Core.Server if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce) { _publishPacketReceivedCallback(this, publishPacket); + return Task.FromResult(0); } - else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce) + + if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce) { _publishPacketReceivedCallback(this, publishPacket); return _adapter.SendPacketAsync(new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout); } - else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce) + + if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce) { // QoS 2 is implement as method "B" [4.3.3 QoS 2: Exactly once delivery] lock (_unacknowledgedPublishPackets) @@ -172,7 +175,7 @@ namespace MQTTnet.Core.Server return _adapter.SendPacketAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout); } - throw new MqttCommunicationException("Received not supported QoS level."); + throw new MqttCommunicationException("Received a not supported QoS level."); } private Task HandleIncomingPubRelPacketAsync(MqttPubRelPacket pubRelPacket) diff --git a/Tests/MQTTnet.Core.Tests/DefaultMqttV311PacketSerializerTests.cs b/Tests/MQTTnet.Core.Tests/DefaultMqttV311PacketSerializerTests.cs index 62d0f30..4b458ed 100644 --- a/Tests/MQTTnet.Core.Tests/DefaultMqttV311PacketSerializerTests.cs +++ b/Tests/MQTTnet.Core.Tests/DefaultMqttV311PacketSerializerTests.cs @@ -405,7 +405,7 @@ namespace MQTTnet.Core.Tests private void SerializeAndCompare(MqttBasePacket packet, string expectedBase64Value) { - var serializer = new DefaultMqttV311PacketSerializer(); + var serializer = new MqttV311PacketSerializer(); var channel = new TestChannel(); serializer.SerializeAsync(packet, channel).Wait(); var buffer = channel.ToArray(); @@ -415,7 +415,7 @@ namespace MQTTnet.Core.Tests private void DeserializeAndCompare(MqttBasePacket packet, string expectedBase64Value) { - var serializer = new DefaultMqttV311PacketSerializer(); + var serializer = new MqttV311PacketSerializer(); var channel1 = new TestChannel(); serializer.SerializeAsync(packet, channel1).Wait();