From e021d5a56b479d516469673a2fa334a98fe3098c Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Fri, 28 Jul 2017 22:18:39 +0200 Subject: [PATCH 1/8] Handle exceptions whicha re thrown while processing application messages. --- MQTTnet.Core/Client/MqttClient.cs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/MQTTnet.Core/Client/MqttClient.cs b/MQTTnet.Core/Client/MqttClient.cs index df3c61f..3cec8d8 100644 --- a/MQTTnet.Core/Client/MqttClient.cs +++ b/MQTTnet.Core/Client/MqttClient.cs @@ -238,7 +238,15 @@ namespace MQTTnet.Core.Client } var applicationMessage = publishPacket.ToApplicationMessage(); - ApplicationMessageReceived?.Invoke(this, new MqttApplicationMessageReceivedEventArgs(applicationMessage)); + + try + { + ApplicationMessageReceived?.Invoke(this, new MqttApplicationMessageReceivedEventArgs(applicationMessage)); + } + catch (Exception exception) + { + MqttTrace.Error(nameof(MqttClient), exception, "Unhandled exception while handling application message."); + } } private Task ProcessReceivedPublishPacket(MqttPublishPacket publishPacket) From 0b0239bcf150254032392b958ef0b0087a280a0b Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Fri, 28 Jul 2017 22:21:48 +0200 Subject: [PATCH 2/8] Performance refactoring. --- MQTTnet.Core/Serializer/ByteReader.cs | 4 +- .../DefaultMqttV311PacketSerializer.cs | 102 ++++++++---------- MQTTnet.Core/Serializer/MqttPacketWriter.cs | 66 ++++++------ 3 files changed, 80 insertions(+), 92 deletions(-) diff --git a/MQTTnet.Core/Serializer/ByteReader.cs b/MQTTnet.Core/Serializer/ByteReader.cs index 935c777..ce7f709 100644 --- a/MQTTnet.Core/Serializer/ByteReader.cs +++ b/MQTTnet.Core/Serializer/ByteReader.cs @@ -24,7 +24,7 @@ namespace MQTTnet.Core.Serializer return result; } - public byte Read(int count) + public int Read(int count) { if (_index + count > 8) { @@ -42,7 +42,7 @@ namespace MQTTnet.Core.Serializer _index++; } - return (byte)result; + return result; } } } diff --git a/MQTTnet.Core/Serializer/DefaultMqttV311PacketSerializer.cs b/MQTTnet.Core/Serializer/DefaultMqttV311PacketSerializer.cs index ffac41b..4100b6c 100644 --- a/MQTTnet.Core/Serializer/DefaultMqttV311PacketSerializer.cs +++ b/MQTTnet.Core/Serializer/DefaultMqttV311PacketSerializer.cs @@ -16,86 +16,72 @@ namespace MQTTnet.Core.Serializer if (packet == null) throw new ArgumentNullException(nameof(packet)); if (destination == null) throw new ArgumentNullException(nameof(destination)); - var connectPacket = packet as MqttConnectPacket; - if (connectPacket != null) + if (packet is MqttConnectPacket connectPacket) { return SerializeAsync(connectPacket, destination); } - var connAckPacket = packet as MqttConnAckPacket; - if (connAckPacket != null) + if (packet is MqttConnAckPacket connAckPacket) { return SerializeAsync(connAckPacket, destination); } - var disconnectPacket = packet as MqttDisconnectPacket; - if (disconnectPacket != null) + if (packet is MqttDisconnectPacket disconnectPacket) { return SerializeAsync(disconnectPacket, destination); } - var pingReqPacket = packet as MqttPingReqPacket; - if (pingReqPacket != null) + if (packet is MqttPingReqPacket pingReqPacket) { return SerializeAsync(pingReqPacket, destination); } - var pingRespPacket = packet as MqttPingRespPacket; - if (pingRespPacket != null) + if (packet is MqttPingRespPacket pingRespPacket) { return SerializeAsync(pingRespPacket, destination); } - var publishPacket = packet as MqttPublishPacket; - if (publishPacket != null) + if (packet is MqttPublishPacket publishPacket) { return SerializeAsync(publishPacket, destination); } - var pubAckPacket = packet as MqttPubAckPacket; - if (pubAckPacket != null) + if (packet is MqttPubAckPacket pubAckPacket) { return SerializeAsync(pubAckPacket, destination); } - var pubRecPacket = packet as MqttPubRecPacket; - if (pubRecPacket != null) + if (packet is MqttPubRecPacket pubRecPacket) { return SerializeAsync(pubRecPacket, destination); } - var pubRelPacket = packet as MqttPubRelPacket; - if (pubRelPacket != null) + if (packet is MqttPubRelPacket pubRelPacket) { return SerializeAsync(pubRelPacket, destination); } - var pubCompPacket = packet as MqttPubCompPacket; - if (pubCompPacket != null) + if (packet is MqttPubCompPacket pubCompPacket) { return SerializeAsync(pubCompPacket, destination); } - var subscribePacket = packet as MqttSubscribePacket; - if (subscribePacket != null) + if (packet is MqttSubscribePacket subscribePacket) { return SerializeAsync(subscribePacket, destination); } - var subAckPacket = packet as MqttSubAckPacket; - if (subAckPacket != null) + if (packet is MqttSubAckPacket subAckPacket) { return SerializeAsync(subAckPacket, destination); } - var unsubscribePacket = packet as MqttUnsubscribePacket; - if (unsubscribePacket != null) + if (packet is MqttUnsubscribePacket unsubscribePacket) { return SerializeAsync(unsubscribePacket, destination); } - var unsubAckPacket = packet as MqttUnsubAckPacket; - if (unsubAckPacket != null) + if (packet is MqttUnsubAckPacket unsubAckPacket) { return SerializeAsync(unsubAckPacket, destination); } @@ -206,7 +192,7 @@ namespace MQTTnet.Core.Serializer } } - private async Task DeserializeUnsubscribeAsync(MqttPacketReader reader) + private static async Task DeserializeUnsubscribeAsync(MqttPacketReader reader) { var packet = new MqttUnsubscribePacket { @@ -221,7 +207,7 @@ namespace MQTTnet.Core.Serializer return packet; } - private async Task DeserializeSubscribeAsync(MqttPacketReader reader) + private static async Task DeserializeSubscribeAsync(MqttPacketReader reader) { var packet = new MqttSubscribePacket { @@ -238,7 +224,7 @@ namespace MQTTnet.Core.Serializer return packet; } - private async Task DeserializePublishAsync(MqttPacketReader reader) + private static async Task DeserializePublishAsync(MqttPacketReader reader) { var fixedHeader = new ByteReader(reader.FixedHeader); var retain = fixedHeader.Read(); @@ -266,13 +252,10 @@ namespace MQTTnet.Core.Serializer return packet; } - private async Task DeserializeConnectAsync(MqttPacketReader reader) + private static async Task DeserializeConnectAsync(MqttPacketReader reader) { - var packet = new MqttConnectPacket(); - - await reader.ReadRemainingDataByteAsync(); - await reader.ReadRemainingDataByteAsync(); - + await reader.ReadRemainingDataAsync(2); // Skip 2 bytes + var protocolName = await reader.ReadRemainingDataAsync(4); if (Encoding.UTF8.GetString(protocolName, 0, protocolName.Length) != "MQTT") @@ -285,7 +268,12 @@ namespace MQTTnet.Core.Serializer var connectFlagsReader = new ByteReader(connectFlags); connectFlagsReader.Read(); // Reserved. - packet.CleanSession = connectFlagsReader.Read(); + + var packet = new MqttConnectPacket + { + CleanSession = connectFlagsReader.Read() + }; + var willFlag = connectFlagsReader.Read(); var willQoS = connectFlagsReader.Read(2); var willRetain = connectFlagsReader.Read(); @@ -318,7 +306,7 @@ namespace MQTTnet.Core.Serializer return packet; } - private async Task DeserializeSubAck(MqttPacketReader reader) + private static async Task DeserializeSubAck(MqttPacketReader reader) { var packet = new MqttSubAckPacket { @@ -333,7 +321,7 @@ namespace MQTTnet.Core.Serializer return packet; } - private async Task DeserializeConnAck(MqttPacketReader reader) + private static async Task DeserializeConnAck(MqttPacketReader reader) { var variableHeader1 = await reader.ReadRemainingDataByteAsync(); var variableHeader2 = await reader.ReadRemainingDataByteAsync(); @@ -347,7 +335,7 @@ namespace MQTTnet.Core.Serializer return packet; } - private void ValidateConnectPacket(MqttConnectPacket packet) + private static void ValidateConnectPacket(MqttConnectPacket packet) { if (string.IsNullOrEmpty(packet.ClientId) && !packet.CleanSession) { @@ -355,7 +343,7 @@ namespace MQTTnet.Core.Serializer } } - private void ValidatePublishPacket(MqttPublishPacket packet) + private static void ValidatePublishPacket(MqttPublishPacket packet) { if (packet.QualityOfServiceLevel == 0 && packet.Dup) { @@ -365,7 +353,7 @@ namespace MQTTnet.Core.Serializer private static readonly byte[] MqttPrefix = Encoding.UTF8.GetBytes("MQTT"); - private Task SerializeAsync(MqttConnectPacket packet, IMqttCommunicationChannel destination) + private static Task SerializeAsync(MqttConnectPacket packet, IMqttCommunicationChannel destination) { ValidateConnectPacket(packet); @@ -420,7 +408,7 @@ namespace MQTTnet.Core.Serializer } } - private Task SerializeAsync(MqttConnAckPacket packet, IMqttCommunicationChannel destination) + private static Task SerializeAsync(MqttConnAckPacket packet, IMqttCommunicationChannel destination) { using (var output = new MqttPacketWriter()) { @@ -435,22 +423,22 @@ namespace MQTTnet.Core.Serializer } } - private Task SerializeAsync(MqttDisconnectPacket packet, IMqttCommunicationChannel destination) + private static Task SerializeAsync(MqttDisconnectPacket packet, IMqttCommunicationChannel destination) { return SerializeEmptyPacketAsync(MqttControlPacketType.Disconnect, destination); } - private Task SerializeAsync(MqttPingReqPacket packet, IMqttCommunicationChannel destination) + private static Task SerializeAsync(MqttPingReqPacket packet, IMqttCommunicationChannel destination) { return SerializeEmptyPacketAsync(MqttControlPacketType.PingReq, destination); } - private Task SerializeAsync(MqttPingRespPacket packet, IMqttCommunicationChannel destination) + private static Task SerializeAsync(MqttPingRespPacket packet, IMqttCommunicationChannel destination) { return SerializeEmptyPacketAsync(MqttControlPacketType.PingResp, destination); } - private Task SerializeAsync(MqttPublishPacket packet, IMqttCommunicationChannel destination) + private static Task SerializeAsync(MqttPublishPacket packet, IMqttCommunicationChannel destination) { ValidatePublishPacket(packet); @@ -485,7 +473,7 @@ namespace MQTTnet.Core.Serializer } } - private Task SerializeAsync(MqttPubAckPacket packet, IMqttCommunicationChannel destination) + private static Task SerializeAsync(MqttPubAckPacket packet, IMqttCommunicationChannel destination) { using (var output = new MqttPacketWriter()) { @@ -496,7 +484,7 @@ namespace MQTTnet.Core.Serializer } } - private Task SerializeAsync(MqttPubRecPacket packet, IMqttCommunicationChannel destination) + private static Task SerializeAsync(MqttPubRecPacket packet, IMqttCommunicationChannel destination) { using (var output = new MqttPacketWriter()) { @@ -507,7 +495,7 @@ namespace MQTTnet.Core.Serializer } } - private async Task SerializeAsync(MqttPubRelPacket packet, IMqttCommunicationChannel destination) + private static async Task SerializeAsync(MqttPubRelPacket packet, IMqttCommunicationChannel destination) { using (var output = new MqttPacketWriter()) { @@ -518,7 +506,7 @@ namespace MQTTnet.Core.Serializer } } - private Task SerializeAsync(MqttPubCompPacket packet, IMqttCommunicationChannel destination) + private static Task SerializeAsync(MqttPubCompPacket packet, IMqttCommunicationChannel destination) { using (var output = new MqttPacketWriter()) { @@ -529,7 +517,7 @@ namespace MQTTnet.Core.Serializer } } - private Task SerializeAsync(MqttSubscribePacket packet, IMqttCommunicationChannel destination) + private static Task SerializeAsync(MqttSubscribePacket packet, IMqttCommunicationChannel destination) { using (var output = new MqttPacketWriter()) { @@ -549,7 +537,7 @@ namespace MQTTnet.Core.Serializer } } - private Task SerializeAsync(MqttSubAckPacket packet, IMqttCommunicationChannel destination) + private static Task SerializeAsync(MqttSubAckPacket packet, IMqttCommunicationChannel destination) { using (var output = new MqttPacketWriter()) { @@ -568,7 +556,7 @@ namespace MQTTnet.Core.Serializer } } - private Task SerializeAsync(MqttUnsubscribePacket packet, IMqttCommunicationChannel destination) + private static Task SerializeAsync(MqttUnsubscribePacket packet, IMqttCommunicationChannel destination) { using (var output = new MqttPacketWriter()) { @@ -587,7 +575,7 @@ namespace MQTTnet.Core.Serializer } } - private Task SerializeAsync(MqttUnsubAckPacket packet, IMqttCommunicationChannel destination) + private static Task SerializeAsync(MqttUnsubAckPacket packet, IMqttCommunicationChannel destination) { using (var output = new MqttPacketWriter()) { @@ -598,7 +586,7 @@ namespace MQTTnet.Core.Serializer } } - private Task SerializeEmptyPacketAsync(MqttControlPacketType type, IMqttCommunicationChannel destination) + private static Task SerializeEmptyPacketAsync(MqttControlPacketType type, IMqttCommunicationChannel destination) { using (var output = new MqttPacketWriter()) { diff --git a/MQTTnet.Core/Serializer/MqttPacketWriter.cs b/MQTTnet.Core/Serializer/MqttPacketWriter.cs index eb9bfcf..4e2a3cb 100644 --- a/MQTTnet.Core/Serializer/MqttPacketWriter.cs +++ b/MQTTnet.Core/Serializer/MqttPacketWriter.cs @@ -11,39 +11,6 @@ namespace MQTTnet.Core.Serializer { private readonly MemoryStream _buffer = new MemoryStream(512); - public void InjectFixedHeader(byte fixedHeader) - { - if (_buffer.Length == 0) - { - Write(fixedHeader); - Write(0); - return; - } - - var backupBuffer = _buffer.ToArray(); - var remainingLength = (int)_buffer.Length; - - _buffer.SetLength(0); - - _buffer.WriteByte(fixedHeader); - - // Alorithm taken from http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html. - var x = remainingLength; - do - { - var encodedByte = x % 128; - x = x / 128; - if (x > 0) - { - encodedByte = encodedByte | 128; - } - - _buffer.WriteByte((byte)encodedByte); - } while (x > 0); - - _buffer.Write(backupBuffer, 0, backupBuffer.Length); - } - public void InjectFixedHeader(MqttControlPacketType packetType, byte flags = 0) { var fixedHeader = (byte)((byte)packetType << 4); @@ -101,5 +68,38 @@ namespace MQTTnet.Core.Serializer { _buffer?.Dispose(); } + + private void InjectFixedHeader(byte fixedHeader) + { + if (_buffer.Length == 0) + { + Write(fixedHeader); + Write(0); + return; + } + + var backupBuffer = _buffer.ToArray(); + var remainingLength = (int)_buffer.Length; + + _buffer.SetLength(0); + + _buffer.WriteByte(fixedHeader); + + // Alorithm taken from http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html. + var x = remainingLength; + do + { + var encodedByte = x % 128; + x = x / 128; + if (x > 0) + { + encodedByte = encodedByte | 128; + } + + _buffer.WriteByte((byte)encodedByte); + } while (x > 0); + + _buffer.Write(backupBuffer, 0, backupBuffer.Length); + } } } From 447ca6cabbc9f5512d147ef38873638ff9db053d Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Sat, 5 Aug 2017 11:37:52 +0200 Subject: [PATCH 3/8] Add interfaces, optimize performance, fix QoS level 2 --- .../MQTTnet.NetFramework/MqttClientFactory.cs | 2 +- .../MQTTnet.NetFramework/MqttServerFactory.cs | 2 +- .../MQTTnet.NetStandard/MqttClientFactory.cs | 2 +- .../MQTTnet.NetStandard/MqttServerFactory.cs | 2 +- .../MqttClientFactory.cs | 2 +- .../MqttServerFactory.cs | 2 +- MQTTnet.Core/Client/IMqttClient.cs | 24 +++++++ MQTTnet.Core/Client/MqttClient.cs | 47 ++++++------- ...tifier.cs => IMqttPacketWithIdentifier.cs} | 2 +- MQTTnet.Core/Packets/MqttBasePacket.cs | 21 +----- MQTTnet.Core/Packets/MqttBasePublishPacket.cs | 2 +- MQTTnet.Core/Packets/MqttPacketExtensions.cs | 27 ++++++++ MQTTnet.Core/Packets/MqttSubAckPacket.cs | 2 +- MQTTnet.Core/Packets/MqttSubscribePacket.cs | 2 +- MQTTnet.Core/Packets/MqttUnsubAckPacket.cs | 2 +- MQTTnet.Core/Packets/MqttUnsubscribe.cs | 2 +- MQTTnet.Core/Server/IMqttServer.cs | 18 +++++ MQTTnet.Core/Server/MqttClientMessageQueue.cs | 7 +- .../Server/MqttClientPublishPacketContext.cs | 5 +- MQTTnet.Core/Server/MqttClientSession.cs | 68 ++++++++++--------- .../Server/MqttClientSessionsManager.cs | 22 ++++-- .../Server/MqttClientSubscriptionsManager.cs | 45 +++++++----- MQTTnet.Core/Server/MqttServer.cs | 10 ++- ...cs => MqttServerDefaultEndpointOptions.cs} | 2 +- MQTTnet.Core/Server/MqttServerOptions.cs | 4 +- .../MqttServerTlsEndpointOptionsExtensions.cs | 19 ------ .../MainPage.xaml.cs | 2 +- 27 files changed, 198 insertions(+), 147 deletions(-) create mode 100644 MQTTnet.Core/Client/IMqttClient.cs rename MQTTnet.Core/Packets/{IPacketWithPacketIdentifier.cs => IMqttPacketWithIdentifier.cs} (67%) create mode 100644 MQTTnet.Core/Packets/MqttPacketExtensions.cs create mode 100644 MQTTnet.Core/Server/IMqttServer.cs rename MQTTnet.Core/Server/{DefaultEndpointOptions.cs => MqttServerDefaultEndpointOptions.cs} (71%) delete mode 100644 MQTTnet.Core/Server/MqttServerTlsEndpointOptionsExtensions.cs diff --git a/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs b/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs index 7b84239..ac12f97 100644 --- a/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs +++ b/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs @@ -8,7 +8,7 @@ namespace MQTTnet { public class MqttClientFactory { - public MqttClient CreateMqttClient(MqttClientOptions options) + public IMqttClient CreateMqttClient(MqttClientOptions options) { if (options == null) throw new ArgumentNullException(nameof(options)); diff --git a/Frameworks/MQTTnet.NetFramework/MqttServerFactory.cs b/Frameworks/MQTTnet.NetFramework/MqttServerFactory.cs index eb7441c..00da03a 100644 --- a/Frameworks/MQTTnet.NetFramework/MqttServerFactory.cs +++ b/Frameworks/MQTTnet.NetFramework/MqttServerFactory.cs @@ -8,7 +8,7 @@ namespace MQTTnet { public class MqttServerFactory { - public MqttServer CreateMqttServer(MqttServerOptions options) + public IMqttServer CreateMqttServer(MqttServerOptions options) { if (options == null) throw new ArgumentNullException(nameof(options)); diff --git a/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs b/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs index ac6f611..6073583 100644 --- a/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs +++ b/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs @@ -8,7 +8,7 @@ namespace MQTTnet { public class MqttClientFactory { - public MqttClient CreateMqttClient(MqttClientOptions options) + public IMqttClient CreateMqttClient(MqttClientOptions options) { if (options == null) throw new ArgumentNullException(nameof(options)); diff --git a/Frameworks/MQTTnet.NetStandard/MqttServerFactory.cs b/Frameworks/MQTTnet.NetStandard/MqttServerFactory.cs index eb7441c..00da03a 100644 --- a/Frameworks/MQTTnet.NetStandard/MqttServerFactory.cs +++ b/Frameworks/MQTTnet.NetStandard/MqttServerFactory.cs @@ -8,7 +8,7 @@ namespace MQTTnet { public class MqttServerFactory { - public MqttServer CreateMqttServer(MqttServerOptions options) + public IMqttServer CreateMqttServer(MqttServerOptions options) { if (options == null) throw new ArgumentNullException(nameof(options)); diff --git a/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs b/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs index 7b84239..ac12f97 100644 --- a/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs +++ b/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs @@ -8,7 +8,7 @@ namespace MQTTnet { public class MqttClientFactory { - public MqttClient CreateMqttClient(MqttClientOptions options) + public IMqttClient CreateMqttClient(MqttClientOptions options) { if (options == null) throw new ArgumentNullException(nameof(options)); diff --git a/Frameworks/MQTTnet.UniversalWindows/MqttServerFactory.cs b/Frameworks/MQTTnet.UniversalWindows/MqttServerFactory.cs index eb7441c..00da03a 100644 --- a/Frameworks/MQTTnet.UniversalWindows/MqttServerFactory.cs +++ b/Frameworks/MQTTnet.UniversalWindows/MqttServerFactory.cs @@ -8,7 +8,7 @@ namespace MQTTnet { public class MqttServerFactory { - public MqttServer CreateMqttServer(MqttServerOptions options) + public IMqttServer CreateMqttServer(MqttServerOptions options) { if (options == null) throw new ArgumentNullException(nameof(options)); diff --git a/MQTTnet.Core/Client/IMqttClient.cs b/MQTTnet.Core/Client/IMqttClient.cs new file mode 100644 index 0000000..0170adf --- /dev/null +++ b/MQTTnet.Core/Client/IMqttClient.cs @@ -0,0 +1,24 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using MQTTnet.Core.Packets; + +namespace MQTTnet.Core.Client +{ + public interface IMqttClient + { + bool IsConnected { get; } + + event EventHandler ApplicationMessageReceived; + event EventHandler Connected; + event EventHandler Disconnected; + + Task ConnectAsync(MqttApplicationMessage willApplicationMessage = null); + Task DisconnectAsync(); + Task PublishAsync(MqttApplicationMessage applicationMessage); + Task> SubscribeAsync(IList topicFilters); + Task> SubscribeAsync(params TopicFilter[] topicFilters); + Task Unsubscribe(IList topicFilters); + Task Unsubscribe(params string[] topicFilters); + } +} \ No newline at end of file diff --git a/MQTTnet.Core/Client/MqttClient.cs b/MQTTnet.Core/Client/MqttClient.cs index 3cec8d8..384de1f 100644 --- a/MQTTnet.Core/Client/MqttClient.cs +++ b/MQTTnet.Core/Client/MqttClient.cs @@ -1,5 +1,4 @@ using System; -using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Threading; @@ -13,10 +12,9 @@ using MQTTnet.Core.Protocol; namespace MQTTnet.Core.Client { - public class MqttClient + public class MqttClient : IMqttClient { - private readonly ConcurrentDictionary _pendingExactlyOncePublishPackets = new ConcurrentDictionary(); - private readonly HashSet _processedPublishPackets = new HashSet(); + private readonly HashSet _unacknowledgedPublishPackets = new HashSet(); private readonly MqttPacketDispatcher _packetDispatcher = new MqttPacketDispatcher(); private readonly MqttClientOptions _options; @@ -63,7 +61,6 @@ namespace MQTTnet.Core.Client _cancellationTokenSource = new CancellationTokenSource(); _latestPacketIdentifier = 0; - _processedPublishPackets.Clear(); _packetDispatcher.Reset(); IsConnected = true; @@ -105,6 +102,7 @@ namespace MQTTnet.Core.Client { if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); if (!topicFilters.Any()) throw new MqttProtocolViolationException("At least one topic filter must be set [MQTT-3.8.3-3]."); + ThrowIfNotConnected(); var subscribePacket = new MqttSubscribePacket @@ -154,6 +152,7 @@ namespace MQTTnet.Core.Client if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce) { + // No packet identifier is used for QoS 0 [3.3.2.2 Packet Identifier] await SendAsync(publishPacket); } else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce) @@ -164,8 +163,8 @@ namespace MQTTnet.Core.Client else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce) { publishPacket.PacketIdentifier = GetNewPacketIdentifier(); - await SendAndReceiveAsync(publishPacket); - await SendAsync(publishPacket.CreateResponse()); + var pubRecPacket = await SendAndReceiveAsync(publishPacket); + await SendAndReceiveAsync(pubRecPacket.CreateResponse()); } } @@ -208,14 +207,12 @@ namespace MQTTnet.Core.Client return DisconnectAsync(); } - var publishPacket = mqttPacket as MqttPublishPacket; - if (publishPacket != null) + if (mqttPacket is MqttPublishPacket publishPacket) { return ProcessReceivedPublishPacket(publishPacket); } - var pubRelPacket = mqttPacket as MqttPubRelPacket; - if (pubRelPacket != null) + if (mqttPacket is MqttPubRelPacket pubRelPacket) { return ProcessReceivedPubRelPacket(pubRelPacket); } @@ -232,11 +229,6 @@ namespace MQTTnet.Core.Client private void FireApplicationMessageReceivedEvent(MqttPublishPacket publishPacket) { - if (publishPacket.QualityOfServiceLevel != MqttQualityOfServiceLevel.AtMostOnce) - { - _processedPublishPackets.Add(publishPacket.PacketIdentifier); - } - var applicationMessage = publishPacket.ToApplicationMessage(); try @@ -265,7 +257,13 @@ namespace MQTTnet.Core.Client if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce) { - _pendingExactlyOncePublishPackets[publishPacket.PacketIdentifier] = publishPacket; + // QoS 2 is implement as method "B" [4.3.3 QoS 2: Exactly once delivery] + lock (_unacknowledgedPublishPackets) + { + _unacknowledgedPublishPackets.Add(publishPacket.PacketIdentifier); + } + + FireApplicationMessageReceivedEvent(publishPacket); return SendAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }); } @@ -274,15 +272,12 @@ namespace MQTTnet.Core.Client private async Task ProcessReceivedPubRelPacket(MqttPubRelPacket pubRelPacket) { - MqttPublishPacket originalPublishPacket; - if (!_pendingExactlyOncePublishPackets.TryRemove(pubRelPacket.PacketIdentifier, out originalPublishPacket)) + lock (_unacknowledgedPublishPackets) { - throw new MqttCommunicationException(); + _unacknowledgedPublishPackets.Remove(pubRelPacket.PacketIdentifier); } - - await SendAsync(originalPublishPacket.CreateResponse()); - - FireApplicationMessageReceivedEvent(originalPublishPacket); + + await SendAsync(pubRelPacket.CreateResponse()); } private Task SendAsync(MqttBasePacket packet) @@ -300,8 +295,8 @@ namespace MQTTnet.Core.Client return false; } - var pi1 = requestPacket as IPacketWithIdentifier; - var pi2 = p as IPacketWithIdentifier; + var pi1 = requestPacket as IMqttPacketWithIdentifier; + var pi2 = p as IMqttPacketWithIdentifier; if (pi1 != null && pi2 != null) { diff --git a/MQTTnet.Core/Packets/IPacketWithPacketIdentifier.cs b/MQTTnet.Core/Packets/IMqttPacketWithIdentifier.cs similarity index 67% rename from MQTTnet.Core/Packets/IPacketWithPacketIdentifier.cs rename to MQTTnet.Core/Packets/IMqttPacketWithIdentifier.cs index 128f3f5..420955c 100644 --- a/MQTTnet.Core/Packets/IPacketWithPacketIdentifier.cs +++ b/MQTTnet.Core/Packets/IMqttPacketWithIdentifier.cs @@ -1,6 +1,6 @@ namespace MQTTnet.Core.Packets { - public interface IPacketWithIdentifier + public interface IMqttPacketWithIdentifier { ushort PacketIdentifier { get; set; } } diff --git a/MQTTnet.Core/Packets/MqttBasePacket.cs b/MQTTnet.Core/Packets/MqttBasePacket.cs index 2a167cb..41901e5 100644 --- a/MQTTnet.Core/Packets/MqttBasePacket.cs +++ b/MQTTnet.Core/Packets/MqttBasePacket.cs @@ -1,25 +1,6 @@ -using System; - -namespace MQTTnet.Core.Packets +namespace MQTTnet.Core.Packets { public abstract class MqttBasePacket { - public TResponsePacket CreateResponse() - { - var responsePacket = Activator.CreateInstance(); - var responsePacketWithIdentifier = responsePacket as IPacketWithIdentifier; - if (responsePacketWithIdentifier != null) - { - var requestPacketWithIdentifier = this as IPacketWithIdentifier; - if (requestPacketWithIdentifier == null) - { - throw new InvalidOperationException("Response packet has PacketIdentifier but request packet does not."); - } - - responsePacketWithIdentifier.PacketIdentifier = requestPacketWithIdentifier.PacketIdentifier; - } - - return responsePacket; - } } } diff --git a/MQTTnet.Core/Packets/MqttBasePublishPacket.cs b/MQTTnet.Core/Packets/MqttBasePublishPacket.cs index ff57003..67d55b9 100644 --- a/MQTTnet.Core/Packets/MqttBasePublishPacket.cs +++ b/MQTTnet.Core/Packets/MqttBasePublishPacket.cs @@ -1,6 +1,6 @@ namespace MQTTnet.Core.Packets { - public class MqttBasePublishPacket : MqttBasePacket, IPacketWithIdentifier + public class MqttBasePublishPacket : MqttBasePacket, IMqttPacketWithIdentifier { public ushort PacketIdentifier { get; set; } } diff --git a/MQTTnet.Core/Packets/MqttPacketExtensions.cs b/MQTTnet.Core/Packets/MqttPacketExtensions.cs new file mode 100644 index 0000000..fef1054 --- /dev/null +++ b/MQTTnet.Core/Packets/MqttPacketExtensions.cs @@ -0,0 +1,27 @@ +using System; + +namespace MQTTnet.Core.Packets +{ + public static class MqttPacketExtensions + { + public static TResponsePacket CreateResponse(this MqttBasePacket packet) + { + if (packet == null) throw new ArgumentNullException(nameof(packet)); + + var responsePacket = Activator.CreateInstance(); + + if (responsePacket is IMqttPacketWithIdentifier responsePacketWithIdentifier) + { + var requestPacketWithIdentifier = packet as IMqttPacketWithIdentifier; + if (requestPacketWithIdentifier == null) + { + throw new InvalidOperationException("Response packet has PacketIdentifier but request packet does not."); + } + + responsePacketWithIdentifier.PacketIdentifier = requestPacketWithIdentifier.PacketIdentifier; + } + + return responsePacket; + } + } +} diff --git a/MQTTnet.Core/Packets/MqttSubAckPacket.cs b/MQTTnet.Core/Packets/MqttSubAckPacket.cs index 3a7265f..f63d0ca 100644 --- a/MQTTnet.Core/Packets/MqttSubAckPacket.cs +++ b/MQTTnet.Core/Packets/MqttSubAckPacket.cs @@ -4,7 +4,7 @@ using MQTTnet.Core.Protocol; namespace MQTTnet.Core.Packets { - public sealed class MqttSubAckPacket : MqttBasePacket, IPacketWithIdentifier + public sealed class MqttSubAckPacket : MqttBasePacket, IMqttPacketWithIdentifier { public ushort PacketIdentifier { get; set; } diff --git a/MQTTnet.Core/Packets/MqttSubscribePacket.cs b/MQTTnet.Core/Packets/MqttSubscribePacket.cs index a0949e8..007bde7 100644 --- a/MQTTnet.Core/Packets/MqttSubscribePacket.cs +++ b/MQTTnet.Core/Packets/MqttSubscribePacket.cs @@ -3,7 +3,7 @@ using System.Linq; namespace MQTTnet.Core.Packets { - public sealed class MqttSubscribePacket : MqttBasePacket, IPacketWithIdentifier + public sealed class MqttSubscribePacket : MqttBasePacket, IMqttPacketWithIdentifier { public ushort PacketIdentifier { get; set; } diff --git a/MQTTnet.Core/Packets/MqttUnsubAckPacket.cs b/MQTTnet.Core/Packets/MqttUnsubAckPacket.cs index 5f33095..57a5a7d 100644 --- a/MQTTnet.Core/Packets/MqttUnsubAckPacket.cs +++ b/MQTTnet.Core/Packets/MqttUnsubAckPacket.cs @@ -1,6 +1,6 @@ namespace MQTTnet.Core.Packets { - public sealed class MqttUnsubAckPacket : MqttBasePacket, IPacketWithIdentifier + public sealed class MqttUnsubAckPacket : MqttBasePacket, IMqttPacketWithIdentifier { public ushort PacketIdentifier { get; set; } } diff --git a/MQTTnet.Core/Packets/MqttUnsubscribe.cs b/MQTTnet.Core/Packets/MqttUnsubscribe.cs index 3e3f36f..b6cfab6 100644 --- a/MQTTnet.Core/Packets/MqttUnsubscribe.cs +++ b/MQTTnet.Core/Packets/MqttUnsubscribe.cs @@ -2,7 +2,7 @@ namespace MQTTnet.Core.Packets { - public sealed class MqttUnsubscribePacket : MqttBasePacket, IPacketWithIdentifier + public sealed class MqttUnsubscribePacket : MqttBasePacket, IMqttPacketWithIdentifier { public ushort PacketIdentifier { get; set; } diff --git a/MQTTnet.Core/Server/IMqttServer.cs b/MQTTnet.Core/Server/IMqttServer.cs new file mode 100644 index 0000000..0b91c26 --- /dev/null +++ b/MQTTnet.Core/Server/IMqttServer.cs @@ -0,0 +1,18 @@ +using System; +using System.Collections.Generic; +using MQTTnet.Core.Adapter; + +namespace MQTTnet.Core.Server +{ + public interface IMqttServer + { + event EventHandler ApplicationMessageReceived; + event EventHandler ClientConnected; + + IList GetConnectedClients(); + void InjectClient(string identifier, IMqttCommunicationAdapter adapter); + void Publish(MqttApplicationMessage applicationMessage); + void Start(); + void Stop(); + } +} \ No newline at end of file diff --git a/MQTTnet.Core/Server/MqttClientMessageQueue.cs b/MQTTnet.Core/Server/MqttClientMessageQueue.cs index 926e581..07c6ea5 100644 --- a/MQTTnet.Core/Server/MqttClientMessageQueue.cs +++ b/MQTTnet.Core/Server/MqttClientMessageQueue.cs @@ -35,7 +35,7 @@ namespace MQTTnet.Core.Server _adapter = adapter ?? throw new ArgumentNullException(nameof(adapter)); _cancellationTokenSource = new CancellationTokenSource(); - Task.Run(() => SendPendingPublishPacketsAsync(_cancellationTokenSource.Token)); + Task.Run(() => SendPendingPublishPacketsAsync(_cancellationTokenSource.Token), _cancellationTokenSource.Token); } public void Stop() @@ -45,14 +45,13 @@ namespace MQTTnet.Core.Server _cancellationTokenSource = null; } - public void Enqueue(MqttClientSession senderClientSession, MqttPublishPacket publishPacket) + public void Enqueue(MqttPublishPacket publishPacket) { - if (senderClientSession == null) throw new ArgumentNullException(nameof(senderClientSession)); if (publishPacket == null) throw new ArgumentNullException(nameof(publishPacket)); lock (_pendingPublishPackets) { - _pendingPublishPackets.Add(new MqttClientPublishPacketContext(senderClientSession, publishPacket)); + _pendingPublishPackets.Add(new MqttClientPublishPacketContext(publishPacket)); _gate.Set(); } } diff --git a/MQTTnet.Core/Server/MqttClientPublishPacketContext.cs b/MQTTnet.Core/Server/MqttClientPublishPacketContext.cs index 8847c70..9855390 100644 --- a/MQTTnet.Core/Server/MqttClientPublishPacketContext.cs +++ b/MQTTnet.Core/Server/MqttClientPublishPacketContext.cs @@ -5,14 +5,11 @@ namespace MQTTnet.Core.Server { public sealed class MqttClientPublishPacketContext { - public MqttClientPublishPacketContext(MqttClientSession senderClientSession, MqttPublishPacket publishPacket) + public MqttClientPublishPacketContext(MqttPublishPacket publishPacket) { - SenderClientSession = senderClientSession ?? throw new ArgumentNullException(nameof(senderClientSession)); PublishPacket = publishPacket ?? throw new ArgumentNullException(nameof(publishPacket)); } - public MqttClientSession SenderClientSession { get; } - public MqttPublishPacket PublishPacket { get; } public int SendTries { get; set; } diff --git a/MQTTnet.Core/Server/MqttClientSession.cs b/MQTTnet.Core/Server/MqttClientSession.cs index b89e73e..a3919f9 100644 --- a/MQTTnet.Core/Server/MqttClientSession.cs +++ b/MQTTnet.Core/Server/MqttClientSession.cs @@ -1,5 +1,5 @@ using System; -using System.Collections.Concurrent; +using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using MQTTnet.Core.Adapter; @@ -13,13 +13,13 @@ namespace MQTTnet.Core.Server { public sealed class MqttClientSession : IDisposable { - private readonly ConcurrentDictionary _pendingIncomingPublications = new ConcurrentDictionary(); + private readonly HashSet _unacknowledgedPublishPackets = new HashSet(); private readonly MqttClientSubscriptionsManager _subscriptionsManager = new MqttClientSubscriptionsManager(); private readonly MqttClientMessageQueue _messageQueue; private readonly Action _publishPacketReceivedCallback; private readonly MqttServerOptions _options; - + private CancellationTokenSource _cancellationTokenSource; private IMqttCommunicationAdapter _adapter; private string _identifier; @@ -79,17 +79,16 @@ namespace MQTTnet.Core.Server } } - public void EnqueuePublishPacket(MqttClientSession senderClientSession, MqttPublishPacket publishPacket) + public void EnqueuePublishPacket(MqttPublishPacket publishPacket) { - if (senderClientSession == null) throw new ArgumentNullException(nameof(senderClientSession)); if (publishPacket == null) throw new ArgumentNullException(nameof(publishPacket)); - if (!_subscriptionsManager.IsTopicSubscribed(publishPacket)) + if (!_subscriptionsManager.IsSubscribed(publishPacket)) { return; } - _messageQueue.Enqueue(senderClientSession, publishPacket); + _messageQueue.Enqueue(publishPacket); MqttTrace.Verbose(nameof(MqttClientSession), $"Client '{_identifier}: Enqueued pending publish packet."); } @@ -101,34 +100,35 @@ namespace MQTTnet.Core.Server private Task HandleIncomingPacketAsync(MqttBasePacket packet) { - var subscribePacket = packet as MqttSubscribePacket; - if (subscribePacket != null) + if (packet is MqttSubscribePacket subscribePacket) { return _adapter.SendPacketAsync(_subscriptionsManager.Subscribe(subscribePacket), _options.DefaultCommunicationTimeout); } - var unsubscribePacket = packet as MqttUnsubscribePacket; - if (unsubscribePacket != null) + if (packet is MqttUnsubscribePacket unsubscribePacket) { return _adapter.SendPacketAsync(_subscriptionsManager.Unsubscribe(unsubscribePacket), _options.DefaultCommunicationTimeout); } - var publishPacket = packet as MqttPublishPacket; - if (publishPacket != null) + if (packet is MqttPublishPacket publishPacket) { return HandleIncomingPublishPacketAsync(publishPacket); } - var pubRelPacket = packet as MqttPubRelPacket; - if (pubRelPacket != null) + if (packet is MqttPubRelPacket pubRelPacket) { return HandleIncomingPubRelPacketAsync(pubRelPacket); } - var pubAckPacket = packet as MqttPubAckPacket; - if (pubAckPacket != null) + if (packet is MqttPubRecPacket pubRecPacket) + { + return _adapter.SendPacketAsync(pubRecPacket.CreateResponse(), _options.DefaultCommunicationTimeout); + } + + if (packet is MqttPubAckPacket || packet is MqttPubCompPacket) { - return HandleIncomingPubAckPacketAsync(pubAckPacket); + // Discard message. + return Task.FromResult((object)null); } if (packet is MqttPingReqPacket) @@ -148,12 +148,7 @@ namespace MQTTnet.Core.Server return Task.FromResult((object)null); } - private async Task HandleIncomingPubAckPacketAsync(MqttPubAckPacket pubAckPacket) - { - await Task.FromResult((object)null); - } - - private async Task HandleIncomingPublishPacketAsync(MqttPublishPacket publishPacket) + private Task HandleIncomingPublishPacketAsync(MqttPublishPacket publishPacket) { if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce) { @@ -161,26 +156,33 @@ namespace MQTTnet.Core.Server } else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce) { - await _adapter.SendPacketAsync(new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout); _publishPacketReceivedCallback(this, publishPacket); + return _adapter.SendPacketAsync(new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout); } else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce) { - _pendingIncomingPublications[publishPacket.PacketIdentifier] = publishPacket; - await _adapter.SendPacketAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout); + // QoS 2 is implement as method "B" [4.3.3 QoS 2: Exactly once delivery] + lock (_unacknowledgedPublishPackets) + { + _unacknowledgedPublishPackets.Add(publishPacket.PacketIdentifier); + } + + _publishPacketReceivedCallback(this, publishPacket); + + return _adapter.SendPacketAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout); } + + throw new MqttCommunicationException("Received not supported QoS level."); } - private async Task HandleIncomingPubRelPacketAsync(MqttPubRelPacket pubRelPacket) + private Task HandleIncomingPubRelPacketAsync(MqttPubRelPacket pubRelPacket) { - MqttPublishPacket publishPacket; - if (!_pendingIncomingPublications.TryRemove(pubRelPacket.PacketIdentifier, out publishPacket)) + lock (_unacknowledgedPublishPackets) { - return; + _unacknowledgedPublishPackets.Remove(pubRelPacket.PacketIdentifier); } - await _adapter.SendPacketAsync(new MqttPubCompPacket { PacketIdentifier = publishPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout); - _publishPacketReceivedCallback(this, publishPacket); + return _adapter.SendPacketAsync(new MqttPubCompPacket { PacketIdentifier = pubRelPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout); } } } diff --git a/MQTTnet.Core/Server/MqttClientSessionsManager.cs b/MQTTnet.Core/Server/MqttClientSessionsManager.cs index ae4d865..f78be0a 100644 --- a/MQTTnet.Core/Server/MqttClientSessionsManager.cs +++ b/MQTTnet.Core/Server/MqttClientSessionsManager.cs @@ -22,7 +22,7 @@ namespace MQTTnet.Core.Server _options = options ?? throw new ArgumentNullException(nameof(options)); } - public event EventHandler ApplicationMessageReceived; + public event EventHandler ApplicationMessageReceived; public async Task RunClientSessionAsync(MqttClientConnectedEventArgs eventArgs) { @@ -127,14 +127,24 @@ namespace MQTTnet.Core.Server } } - private void DispatchPublishPacket(MqttClientSession senderClientSession, MqttPublishPacket publishPacket) + public void DispatchPublishPacket(MqttClientSession senderClientSession, MqttPublishPacket publishPacket) { - var eventArgs = new MqttApplicationMessageReceivedEventArgs(senderClientSession.ClientId, publishPacket.ToApplicationMessage()); - ApplicationMessageReceived?.Invoke(this, eventArgs); + try + { + var eventArgs = new MqttApplicationMessageReceivedEventArgs(senderClientSession?.ClientId, publishPacket.ToApplicationMessage()); + ApplicationMessageReceived?.Invoke(this, eventArgs); + } + catch (Exception exception) + { + MqttTrace.Error(nameof(MqttClientSessionsManager), exception, "Error while processing application message"); + } - foreach (var clientSession in _clientSessions.Values.ToList()) + lock (_syncRoot) { - clientSession.EnqueuePublishPacket(senderClientSession, publishPacket); + foreach (var clientSession in _clientSessions.Values.ToList()) + { + clientSession.EnqueuePublishPacket(publishPacket); + } } } } diff --git a/MQTTnet.Core/Server/MqttClientSubscriptionsManager.cs b/MQTTnet.Core/Server/MqttClientSubscriptionsManager.cs index 8cb9563..390f8f3 100644 --- a/MQTTnet.Core/Server/MqttClientSubscriptionsManager.cs +++ b/MQTTnet.Core/Server/MqttClientSubscriptionsManager.cs @@ -1,5 +1,5 @@ using System; -using System.Collections.Concurrent; +using System.Collections.Generic; using MQTTnet.Core.Packets; using MQTTnet.Core.Protocol; @@ -7,17 +7,21 @@ namespace MQTTnet.Core.Server { public sealed class MqttClientSubscriptionsManager { - private readonly ConcurrentDictionary _subscribedTopics = new ConcurrentDictionary(); + private readonly Dictionary _subscribedTopics = new Dictionary(); public MqttSubAckPacket Subscribe(MqttSubscribePacket subscribePacket) { if (subscribePacket == null) throw new ArgumentNullException(nameof(subscribePacket)); var responsePacket = subscribePacket.CreateResponse(); - foreach (var topicFilter in subscribePacket.TopicFilters) + + lock (_subscribedTopics) { - _subscribedTopics[topicFilter.Topic] = topicFilter.QualityOfServiceLevel; - responsePacket.SubscribeReturnCodes.Add(MqttSubscribeReturnCode.SuccessMaximumQoS1); // TODO: Add support for QoS 2. + foreach (var topicFilter in subscribePacket.TopicFilters) + { + _subscribedTopics[topicFilter.Topic] = topicFilter.QualityOfServiceLevel; + responsePacket.SubscribeReturnCodes.Add(MqttSubscribeReturnCode.SuccessMaximumQoS1); // TODO: Add support for QoS 2. + } } return responsePacket; @@ -27,32 +31,37 @@ namespace MQTTnet.Core.Server { if (unsubscribePacket == null) throw new ArgumentNullException(nameof(unsubscribePacket)); - foreach (var topicFilter in unsubscribePacket.TopicFilters) + lock (_subscribedTopics) { - MqttQualityOfServiceLevel _; - _subscribedTopics.TryRemove(topicFilter, out _); + foreach (var topicFilter in unsubscribePacket.TopicFilters) + { + _subscribedTopics.Remove(topicFilter); + } } return unsubscribePacket.CreateResponse(); } - public bool IsTopicSubscribed(MqttPublishPacket publishPacket) + public bool IsSubscribed(MqttPublishPacket publishPacket) { if (publishPacket == null) throw new ArgumentNullException(nameof(publishPacket)); - foreach (var subscribedTopic in _subscribedTopics) + lock (_subscribedTopics) { - if (!MqttTopicFilterComparer.IsMatch(publishPacket.Topic, subscribedTopic.Key)) + foreach (var subscribedTopic in _subscribedTopics) { - continue; - } + if (publishPacket.QualityOfServiceLevel > subscribedTopic.Value) + { + continue; + } - if (subscribedTopic.Value < publishPacket.QualityOfServiceLevel) - { - continue; - } + if (!MqttTopicFilterComparer.IsMatch(publishPacket.Topic, subscribedTopic.Key)) + { + continue; + } - return true; + return true; + } } return false; diff --git a/MQTTnet.Core/Server/MqttServer.cs b/MQTTnet.Core/Server/MqttServer.cs index ca55d7a..98543b6 100644 --- a/MQTTnet.Core/Server/MqttServer.cs +++ b/MQTTnet.Core/Server/MqttServer.cs @@ -4,10 +4,11 @@ using System.Threading; using System.Threading.Tasks; using MQTTnet.Core.Adapter; using MQTTnet.Core.Diagnostics; +using MQTTnet.Core.Internal; namespace MQTTnet.Core.Server { - public sealed class MqttServer + public sealed class MqttServer : IMqttServer { private readonly MqttClientSessionsManager _clientSessionsManager; private readonly ICollection _adapters; @@ -33,6 +34,13 @@ namespace MQTTnet.Core.Server public event EventHandler ApplicationMessageReceived; + public void Publish(MqttApplicationMessage applicationMessage) + { + if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage)); + + _clientSessionsManager.DispatchPublishPacket(null, applicationMessage.ToPublishPacket()); + } + public void InjectClient(string identifier, IMqttCommunicationAdapter adapter) { if (adapter == null) throw new ArgumentNullException(nameof(adapter)); diff --git a/MQTTnet.Core/Server/DefaultEndpointOptions.cs b/MQTTnet.Core/Server/MqttServerDefaultEndpointOptions.cs similarity index 71% rename from MQTTnet.Core/Server/DefaultEndpointOptions.cs rename to MQTTnet.Core/Server/MqttServerDefaultEndpointOptions.cs index 203ed28..04c4d73 100644 --- a/MQTTnet.Core/Server/DefaultEndpointOptions.cs +++ b/MQTTnet.Core/Server/MqttServerDefaultEndpointOptions.cs @@ -1,6 +1,6 @@ namespace MQTTnet.Core.Server { - public sealed class DefaultEndpointOptions + public sealed class MqttServerDefaultEndpointOptions { public bool IsEnabled { get; set; } = true; diff --git a/MQTTnet.Core/Server/MqttServerOptions.cs b/MQTTnet.Core/Server/MqttServerOptions.cs index 18e284d..5edf500 100644 --- a/MQTTnet.Core/Server/MqttServerOptions.cs +++ b/MQTTnet.Core/Server/MqttServerOptions.cs @@ -6,13 +6,13 @@ namespace MQTTnet.Core.Server { public sealed class MqttServerOptions { - public DefaultEndpointOptions DefaultEndpointOptions { get; } = new DefaultEndpointOptions(); + public MqttServerDefaultEndpointOptions DefaultEndpointOptions { get; } = new MqttServerDefaultEndpointOptions(); public MqttServerTlsEndpointOptions TlsEndpointOptions { get; } = new MqttServerTlsEndpointOptions(); public int ConnectionBacklog { get; set; } = 10; - public TimeSpan DefaultCommunicationTimeout { get; set; } = TimeSpan.FromSeconds(10); + public TimeSpan DefaultCommunicationTimeout { get; set; } = TimeSpan.FromSeconds(15); public Func ConnectionValidator { get; set; } } diff --git a/MQTTnet.Core/Server/MqttServerTlsEndpointOptionsExtensions.cs b/MQTTnet.Core/Server/MqttServerTlsEndpointOptionsExtensions.cs deleted file mode 100644 index d790526..0000000 --- a/MQTTnet.Core/Server/MqttServerTlsEndpointOptionsExtensions.cs +++ /dev/null @@ -1,19 +0,0 @@ -using System; - -namespace MQTTnet.Core.Server -{ - public static class MqttServerTlsEndpointOptionsExtensions - { - public static int GetPort(this DefaultEndpointOptions options) - { - if (options == null) throw new ArgumentNullException(nameof(options)); - - if (!options.Port.HasValue) - { - return 1883; - } - - return options.Port.Value; - } - } -} diff --git a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs index 1104a33..aa40c8b 100644 --- a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs +++ b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs @@ -8,7 +8,7 @@ namespace MQTTnet.TestApp.UniversalWindows { public sealed partial class MainPage { - private MqttClient _mqttClient; + private IMqttClient _mqttClient; public MainPage() { From 68926cef2082784bfe1f32ead1b3ad303d45f6f6 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Sat, 5 Aug 2017 11:38:03 +0200 Subject: [PATCH 4/8] Add unit tests --- Tests/MQTTnet.Core.Tests/MqttServerTests.cs | 59 ++++++++++++++++++- .../MqttSubscriptionsManagerTests.cs | 8 +-- 2 files changed, 62 insertions(+), 5 deletions(-) diff --git a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs index 1ca8f82..8454c76 100644 --- a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs @@ -68,7 +68,64 @@ namespace MQTTnet.Core.Tests Assert.AreEqual(1, receivedMessagesCount); } - private MqttClient ConnectTestClient(string clientId, MqttApplicationMessage willMessage, MqttServer server) + [TestMethod] + public async Task MqttServer_Unsubscribe() + { + var s = new MqttServer(new MqttServerOptions(), new List { new TestMqttServerAdapter() }); + s.Start(); + + var c1 = ConnectTestClient("c1", null, s); + var c2 = ConnectTestClient("c2", null, s); + + var receivedMessagesCount = 0; + c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; + + var message = new MqttApplicationMessage("a", new byte[0], MqttQualityOfServiceLevel.AtLeastOnce, false); + + await c2.PublishAsync(message); + Assert.AreEqual(0, receivedMessagesCount); + + await c1.SubscribeAsync(new TopicFilter("a", MqttQualityOfServiceLevel.AtLeastOnce)); + await c2.PublishAsync(message); + + await Task.Delay(500); + Assert.AreEqual(1, receivedMessagesCount); + + await c1.Unsubscribe("a"); + await c2.PublishAsync(message); + + await Task.Delay(500); + Assert.AreEqual(1, receivedMessagesCount); + + s.Stop(); + await Task.Delay(500); + + Assert.AreEqual(1, receivedMessagesCount); + } + + [TestMethod] + public async Task MqttServer_Publish() + { + var s = new MqttServer(new MqttServerOptions(), new List { new TestMqttServerAdapter() }); + s.Start(); + + var c1 = ConnectTestClient("c1", null, s); + + var receivedMessagesCount = 0; + c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; + + var message = new MqttApplicationMessage("a", new byte[0], MqttQualityOfServiceLevel.AtLeastOnce, false); + await c1.SubscribeAsync(new TopicFilter("a", MqttQualityOfServiceLevel.AtLeastOnce)); + + s.Publish(message); + await Task.Delay(500); + + s.Stop(); + + Assert.AreEqual(1, receivedMessagesCount); + } + + private static MqttClient ConnectTestClient(string clientId, MqttApplicationMessage willMessage, MqttServer server) { var adapterA = new TestMqttCommunicationAdapter(); var adapterB = new TestMqttCommunicationAdapter(); diff --git a/Tests/MQTTnet.Core.Tests/MqttSubscriptionsManagerTests.cs b/Tests/MQTTnet.Core.Tests/MqttSubscriptionsManagerTests.cs index 9452b10..e477731 100644 --- a/Tests/MQTTnet.Core.Tests/MqttSubscriptionsManagerTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttSubscriptionsManagerTests.cs @@ -24,7 +24,7 @@ namespace MQTTnet.Core.Tests QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce }; - Assert.IsTrue(sm.IsTopicSubscribed(pp)); + Assert.IsTrue(sm.IsSubscribed(pp)); } [TestMethod] @@ -43,7 +43,7 @@ namespace MQTTnet.Core.Tests QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce }; - Assert.IsFalse(sm.IsTopicSubscribed(pp)); + Assert.IsFalse(sm.IsSubscribed(pp)); } [TestMethod] @@ -62,13 +62,13 @@ namespace MQTTnet.Core.Tests QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce }; - Assert.IsTrue(sm.IsTopicSubscribed(pp)); + Assert.IsTrue(sm.IsSubscribed(pp)); var up = new MqttUnsubscribePacket(); up.TopicFilters.Add("A/B/C"); sm.Unsubscribe(up); - Assert.IsFalse(sm.IsTopicSubscribed(pp)); + Assert.IsFalse(sm.IsSubscribed(pp)); } } } From 65c2a0fd7c97569f7b180b03c06be22f944fc570 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Sat, 5 Aug 2017 11:38:24 +0200 Subject: [PATCH 5/8] Update documentation --- Build/MQTTnet.nuspec | 6 +++++- README.md | 4 +++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index 7825ae6..3687879 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -10,7 +10,11 @@ https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png false MQTTnet is a .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker). - * + * [MqttServer] Added support for publishing application messages +* [Core] Fixed QoS level 2 handling +* [Core] Performance optimizations +* [MqttClient/MqttServer] Errors while handline application messages are now catched and traced +* [MqttClient/MqttServer] Added interfaces Copyright Christian Kratky 2016-2017 MQTT MQTTClient MQTTServer MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Queue Hardware diff --git a/README.md b/README.md index 379df3b..5df9d0f 100644 --- a/README.md +++ b/README.md @@ -15,10 +15,12 @@ MQTTnet is a .NET library for MQTT based communication. It provides a MQTT clien * Rx support (via another project) * List of connected clients available (server only) * Extensible communication channels (i.e. In-Memory, TCP, TCP+SSL, WebSockets (not included in this project)) +* Server is able to publish its own messages (no loopback client required) * Access to internal trace messages * Extensible client credential validation (server only) -* Unit tested (48+ tests) +* Unit tested (50+ tests) * Lightweight (only the low level implementation of MQTT, no overhead) +* Interfaces included for mocking and testing ## Supported frameworks * .NET Standard 1.3+ From 59b0b79a3235feb0d9a99a56ab1680d1bb9f481c Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Mon, 7 Aug 2017 20:28:49 +0200 Subject: [PATCH 6/8] Fix wrong QoS level handling for server. --- Build/MQTTnet.nuspec | 8 ++------ .../Implementations/MqttServerAdapter.cs | 4 ++-- Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs | 2 +- .../MQTTnet.NetFramework/Properties/AssemblyInfo.cs | 4 ++-- .../Implementations/MqttServerAdapter.cs | 4 ++-- .../MQTTnet.NetStandard/MQTTnet.Netstandard.csproj | 4 ++-- Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs | 2 +- .../Implementations/MqttServerAdapter.cs | 2 +- Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs | 2 +- .../MQTTnet.UniversalWindows/Properties/AssemblyInfo.cs | 4 ++-- MQTTnet.Core/Client/MqttClient.cs | 2 +- MQTTnet.Core/MQTTnet.Core.csproj | 4 ++-- ...11PacketSerializer.cs => MqttV311PacketSerializer.cs} | 8 ++++---- MQTTnet.Core/Server/MqttClientSession.cs | 9 ++++++--- .../DefaultMqttV311PacketSerializerTests.cs | 4 ++-- 15 files changed, 31 insertions(+), 32 deletions(-) rename MQTTnet.Core/Serializer/{DefaultMqttV311PacketSerializer.cs => MqttV311PacketSerializer.cs} (99%) diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index 3687879..89dcc91 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -2,7 +2,7 @@ MQTTnet - 2.1.5 + 2.1.5.1 Christian Kratky Christian Kratky https://github.com/chkr1011/MQTTnet/blob/master/LICENSE @@ -10,11 +10,7 @@ https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png false MQTTnet is a .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker). - * [MqttServer] Added support for publishing application messages -* [Core] Fixed QoS level 2 handling -* [Core] Performance optimizations -* [MqttClient/MqttServer] Errors while handline application messages are now catched and traced -* [MqttClient/MqttServer] Added interfaces + * [Server] Fixed wrong handling of QoS levels Copyright Christian Kratky 2016-2017 MQTT MQTTClient MQTTServer MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Queue Hardware diff --git a/Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs b/Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs index cbfcf66..694eec7 100644 --- a/Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs +++ b/Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs @@ -86,7 +86,7 @@ namespace MQTTnet.Implementations try { var clientSocket = await Task.Factory.FromAsync(_defaultEndpointSocket.BeginAccept, _defaultEndpointSocket.EndAccept, null); - var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new DefaultMqttV311PacketSerializer()); + var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new MqttV311PacketSerializer()); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); } catch (Exception exception) when (!(exception is ObjectDisposedException)) @@ -107,7 +107,7 @@ namespace MQTTnet.Implementations var sslStream = new SslStream(new NetworkStream(clientSocket)); await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false); - var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new DefaultMqttV311PacketSerializer()); + var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new MqttV311PacketSerializer()); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); } catch (Exception exception) diff --git a/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs b/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs index ac12f97..b85edab 100644 --- a/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs +++ b/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs @@ -12,7 +12,7 @@ namespace MQTTnet { if (options == null) throw new ArgumentNullException(nameof(options)); - return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new DefaultMqttV311PacketSerializer())); + return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new MqttV311PacketSerializer())); } } } \ No newline at end of file diff --git a/Frameworks/MQTTnet.NetFramework/Properties/AssemblyInfo.cs b/Frameworks/MQTTnet.NetFramework/Properties/AssemblyInfo.cs index 4d0a57f..486eca7 100644 --- a/Frameworks/MQTTnet.NetFramework/Properties/AssemblyInfo.cs +++ b/Frameworks/MQTTnet.NetFramework/Properties/AssemblyInfo.cs @@ -11,5 +11,5 @@ using System.Runtime.InteropServices; [assembly: AssemblyCulture("")] [assembly: ComVisible(false)] [assembly: Guid("a480ef90-0eaa-4d9a-b271-47a9c47f6f7d")] -[assembly: AssemblyVersion("2.1.4.0")] -[assembly: AssemblyFileVersion("2.1.4.0")] \ No newline at end of file +[assembly: AssemblyVersion("2.1.5.1")] +[assembly: AssemblyFileVersion("2.1.5.1")] \ No newline at end of file diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs index 863d3e7..7c2561d 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs @@ -84,7 +84,7 @@ namespace MQTTnet.Implementations try { var clientSocket = await _defaultEndpointSocket.AcceptAsync(); - var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new DefaultMqttV311PacketSerializer()); + var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new MqttV311PacketSerializer()); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); } catch (Exception exception) @@ -105,7 +105,7 @@ namespace MQTTnet.Implementations var sslStream = new SslStream(new NetworkStream(clientSocket)); await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false); - var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new DefaultMqttV311PacketSerializer()); + var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new MqttV311PacketSerializer()); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); } catch (Exception exception) diff --git a/Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj b/Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj index 2a60e55..1a1249e 100644 --- a/Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj +++ b/Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj @@ -4,8 +4,8 @@ netstandard1.3 MQTTnet MQTTnet - 2.1.4.0 - 2.1.4.0 + 2.1.5.1 + 2.1.5.1 0.0.0.0 diff --git a/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs b/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs index 6073583..c5f409f 100644 --- a/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs +++ b/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs @@ -12,7 +12,7 @@ namespace MQTTnet { if (options == null) throw new ArgumentNullException(nameof(options)); - return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new DefaultMqttV311PacketSerializer())); + return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new MqttV311PacketSerializer())); } } } diff --git a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttServerAdapter.cs b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttServerAdapter.cs index ea251ca..b0bfcbe 100644 --- a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttServerAdapter.cs +++ b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttServerAdapter.cs @@ -52,7 +52,7 @@ namespace MQTTnet.Implementations { try { - var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(args.Socket), new DefaultMqttV311PacketSerializer()); + var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(args.Socket), new MqttV311PacketSerializer()); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(args.Socket.Information.RemoteAddress.ToString(), clientAdapter)); } catch (Exception exception) diff --git a/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs b/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs index ac12f97..b85edab 100644 --- a/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs +++ b/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs @@ -12,7 +12,7 @@ namespace MQTTnet { if (options == null) throw new ArgumentNullException(nameof(options)); - return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new DefaultMqttV311PacketSerializer())); + return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new MqttV311PacketSerializer())); } } } \ No newline at end of file diff --git a/Frameworks/MQTTnet.UniversalWindows/Properties/AssemblyInfo.cs b/Frameworks/MQTTnet.UniversalWindows/Properties/AssemblyInfo.cs index dee61f7..05204eb 100644 --- a/Frameworks/MQTTnet.UniversalWindows/Properties/AssemblyInfo.cs +++ b/Frameworks/MQTTnet.UniversalWindows/Properties/AssemblyInfo.cs @@ -10,5 +10,5 @@ using System.Runtime.InteropServices; [assembly: AssemblyTrademark("")] [assembly: AssemblyCulture("")] [assembly: ComVisible(false)] -[assembly: AssemblyVersion("2.1.4.0")] -[assembly: AssemblyFileVersion("2.1.4.0")] \ No newline at end of file +[assembly: AssemblyVersion("2.1.5.1")] +[assembly: AssemblyFileVersion("2.1.5.1")] \ No newline at end of file diff --git a/MQTTnet.Core/Client/MqttClient.cs b/MQTTnet.Core/Client/MqttClient.cs index 384de1f..f90adab 100644 --- a/MQTTnet.Core/Client/MqttClient.cs +++ b/MQTTnet.Core/Client/MqttClient.cs @@ -267,7 +267,7 @@ namespace MQTTnet.Core.Client return SendAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }); } - throw new InvalidOperationException(); + throw new MqttCommunicationException("Received a not supported QoS level."); } private async Task ProcessReceivedPubRelPacket(MqttPubRelPacket pubRelPacket) diff --git a/MQTTnet.Core/MQTTnet.Core.csproj b/MQTTnet.Core/MQTTnet.Core.csproj index 4d12864..c938551 100644 --- a/MQTTnet.Core/MQTTnet.Core.csproj +++ b/MQTTnet.Core/MQTTnet.Core.csproj @@ -16,8 +16,8 @@ - 2.1.4.0 - 2.1.4.0 + 2.1.5.1 + 2.1.5.1 diff --git a/MQTTnet.Core/Serializer/DefaultMqttV311PacketSerializer.cs b/MQTTnet.Core/Serializer/MqttV311PacketSerializer.cs similarity index 99% rename from MQTTnet.Core/Serializer/DefaultMqttV311PacketSerializer.cs rename to MQTTnet.Core/Serializer/MqttV311PacketSerializer.cs index 4100b6c..523a2dc 100644 --- a/MQTTnet.Core/Serializer/DefaultMqttV311PacketSerializer.cs +++ b/MQTTnet.Core/Serializer/MqttV311PacketSerializer.cs @@ -9,8 +9,10 @@ using MQTTnet.Core.Protocol; namespace MQTTnet.Core.Serializer { - public sealed class DefaultMqttV311PacketSerializer : IMqttPacketSerializer + public sealed class MqttV311PacketSerializer : IMqttPacketSerializer { + private static readonly byte[] MqttV311Prefix = Encoding.UTF8.GetBytes("MQTT"); + public Task SerializeAsync(MqttBasePacket packet, IMqttCommunicationChannel destination) { if (packet == null) throw new ArgumentNullException(nameof(packet)); @@ -351,8 +353,6 @@ namespace MQTTnet.Core.Serializer } } - private static readonly byte[] MqttPrefix = Encoding.UTF8.GetBytes("MQTT"); - private static Task SerializeAsync(MqttConnectPacket packet, IMqttCommunicationChannel destination) { ValidateConnectPacket(packet); @@ -361,7 +361,7 @@ namespace MQTTnet.Core.Serializer { // Write variable header output.Write(0x00, 0x04); // 3.1.2.1 Protocol Name - output.Write(MqttPrefix); + output.Write(MqttV311Prefix); output.Write(0x04); // 3.1.2.2 Protocol Level var connectFlags = new ByteWriter(); // 3.1.2.3 Connect Flags diff --git a/MQTTnet.Core/Server/MqttClientSession.cs b/MQTTnet.Core/Server/MqttClientSession.cs index a3919f9..f84b847 100644 --- a/MQTTnet.Core/Server/MqttClientSession.cs +++ b/MQTTnet.Core/Server/MqttClientSession.cs @@ -153,13 +153,16 @@ namespace MQTTnet.Core.Server if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce) { _publishPacketReceivedCallback(this, publishPacket); + return Task.FromResult(0); } - else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce) + + if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce) { _publishPacketReceivedCallback(this, publishPacket); return _adapter.SendPacketAsync(new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout); } - else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce) + + if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce) { // QoS 2 is implement as method "B" [4.3.3 QoS 2: Exactly once delivery] lock (_unacknowledgedPublishPackets) @@ -172,7 +175,7 @@ namespace MQTTnet.Core.Server return _adapter.SendPacketAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout); } - throw new MqttCommunicationException("Received not supported QoS level."); + throw new MqttCommunicationException("Received a not supported QoS level."); } private Task HandleIncomingPubRelPacketAsync(MqttPubRelPacket pubRelPacket) diff --git a/Tests/MQTTnet.Core.Tests/DefaultMqttV311PacketSerializerTests.cs b/Tests/MQTTnet.Core.Tests/DefaultMqttV311PacketSerializerTests.cs index 62d0f30..4b458ed 100644 --- a/Tests/MQTTnet.Core.Tests/DefaultMqttV311PacketSerializerTests.cs +++ b/Tests/MQTTnet.Core.Tests/DefaultMqttV311PacketSerializerTests.cs @@ -405,7 +405,7 @@ namespace MQTTnet.Core.Tests private void SerializeAndCompare(MqttBasePacket packet, string expectedBase64Value) { - var serializer = new DefaultMqttV311PacketSerializer(); + var serializer = new MqttV311PacketSerializer(); var channel = new TestChannel(); serializer.SerializeAsync(packet, channel).Wait(); var buffer = channel.ToArray(); @@ -415,7 +415,7 @@ namespace MQTTnet.Core.Tests private void DeserializeAndCompare(MqttBasePacket packet, string expectedBase64Value) { - var serializer = new DefaultMqttV311PacketSerializer(); + var serializer = new MqttV311PacketSerializer(); var channel1 = new TestChannel(); serializer.SerializeAsync(packet, channel1).Wait(); From 598ed66a3beae4e804687d9b701da55108ef1d83 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Sun, 13 Aug 2017 19:46:32 +0200 Subject: [PATCH 7/8] Add support for V3.1.0; Performance improvements --- Build/MQTTnet.nuspec | 9 ++- .../Implementations/MqttServerAdapter.cs | 4 +- .../MQTTnet.NetFramework/MqttClientFactory.cs | 2 +- .../Implementations/MqttServerAdapter.cs | 4 +- .../MQTTnet.NetStandard/MqttClientFactory.cs | 2 +- .../Implementations/MqttServerAdapter.cs | 2 +- .../MqttClientFactory.cs | 2 +- .../Adapter/IMqttCommunicationAdapter.cs | 3 + .../MqttChannelCommunicationAdapter.cs | 11 +-- MQTTnet.Core/Client/MqttClient.cs | 2 + MQTTnet.Core/Client/MqttClientOptions.cs | 7 +- MQTTnet.Core/Packets/MqttConnectPacket.cs | 6 +- .../Serializer/IMqttPacketSerializer.cs | 2 + MQTTnet.Core/Serializer/MqttPacketReader.cs | 2 +- ...tSerializer.cs => MqttPacketSerializer.cs} | 73 +++++++++++++------ MQTTnet.Core/Serializer/MqttPacketWriter.cs | 6 +- .../Serializer/MqttProtocolVersion.cs | 8 ++ MQTTnet.Core/Server/ConnectedMqttClient.cs | 11 +++ MQTTnet.Core/Server/IMqttServer.cs | 2 +- MQTTnet.Core/Server/MqttClientSession.cs | 23 +++--- .../Server/MqttClientSessionsManager.cs | 11 ++- MQTTnet.Core/Server/MqttServer.cs | 2 +- MQTTnet.sln | 7 +- README.md | 47 ++++++------ .../MQTTnet.Core.Tests.csproj | 2 +- ...rTests.cs => MqttPacketSerializerTests.cs} | 34 ++++++++- .../TestMqttCommunicationAdapter.cs | 3 + 27 files changed, 199 insertions(+), 88 deletions(-) rename MQTTnet.Core/Serializer/{MqttV311PacketSerializer.cs => MqttPacketSerializer.cs} (92%) create mode 100644 MQTTnet.Core/Serializer/MqttProtocolVersion.cs create mode 100644 MQTTnet.Core/Server/ConnectedMqttClient.cs rename Tests/MQTTnet.Core.Tests/{DefaultMqttV311PacketSerializerTests.cs => MqttPacketSerializerTests.cs} (92%) diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index 89dcc91..bb81a8d 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -2,7 +2,7 @@ MQTTnet - 2.1.5.1 + 2.2.0 Christian Kratky Christian Kratky https://github.com/chkr1011/MQTTnet/blob/master/LICENSE @@ -10,10 +10,13 @@ https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png false MQTTnet is a .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker). - * [Server] Fixed wrong handling of QoS levels + * [Server] Added support for MQTT protocol version 3.1.0 +* [Server] Providing the used protocol version of connected clients +* [Client] Added support for protocol version 3.1.0 +* [Core] Several minor performance improvements Copyright Christian Kratky 2016-2017 - MQTT MQTTClient MQTTServer MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Queue Hardware + MQTT MQTTClient MQTTServer MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Queue Hardware Arduino diff --git a/Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs b/Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs index 694eec7..63fd0d8 100644 --- a/Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs +++ b/Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs @@ -86,7 +86,7 @@ namespace MQTTnet.Implementations try { var clientSocket = await Task.Factory.FromAsync(_defaultEndpointSocket.BeginAccept, _defaultEndpointSocket.EndAccept, null); - var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new MqttV311PacketSerializer()); + var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new MqttPacketSerializer()); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); } catch (Exception exception) when (!(exception is ObjectDisposedException)) @@ -107,7 +107,7 @@ namespace MQTTnet.Implementations var sslStream = new SslStream(new NetworkStream(clientSocket)); await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false); - var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new MqttV311PacketSerializer()); + var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new MqttPacketSerializer()); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); } catch (Exception exception) diff --git a/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs b/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs index b85edab..7dd7500 100644 --- a/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs +++ b/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs @@ -12,7 +12,7 @@ namespace MQTTnet { if (options == null) throw new ArgumentNullException(nameof(options)); - return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new MqttV311PacketSerializer())); + return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new MqttPacketSerializer())); } } } \ No newline at end of file diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs index 7c2561d..a2e86b1 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs @@ -84,7 +84,7 @@ namespace MQTTnet.Implementations try { var clientSocket = await _defaultEndpointSocket.AcceptAsync(); - var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new MqttV311PacketSerializer()); + var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new MqttPacketSerializer()); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); } catch (Exception exception) @@ -105,7 +105,7 @@ namespace MQTTnet.Implementations var sslStream = new SslStream(new NetworkStream(clientSocket)); await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false); - var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new MqttV311PacketSerializer()); + var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new MqttPacketSerializer()); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); } catch (Exception exception) diff --git a/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs b/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs index c5f409f..f47c494 100644 --- a/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs +++ b/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs @@ -12,7 +12,7 @@ namespace MQTTnet { if (options == null) throw new ArgumentNullException(nameof(options)); - return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new MqttV311PacketSerializer())); + return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new MqttPacketSerializer())); } } } diff --git a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttServerAdapter.cs b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttServerAdapter.cs index b0bfcbe..eddd671 100644 --- a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttServerAdapter.cs +++ b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttServerAdapter.cs @@ -52,7 +52,7 @@ namespace MQTTnet.Implementations { try { - var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(args.Socket), new MqttV311PacketSerializer()); + var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(args.Socket), new MqttPacketSerializer()); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(args.Socket.Information.RemoteAddress.ToString(), clientAdapter)); } catch (Exception exception) diff --git a/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs b/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs index b85edab..7dd7500 100644 --- a/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs +++ b/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs @@ -12,7 +12,7 @@ namespace MQTTnet { if (options == null) throw new ArgumentNullException(nameof(options)); - return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new MqttV311PacketSerializer())); + return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new MqttPacketSerializer())); } } } \ No newline at end of file diff --git a/MQTTnet.Core/Adapter/IMqttCommunicationAdapter.cs b/MQTTnet.Core/Adapter/IMqttCommunicationAdapter.cs index 6651e05..77ab898 100644 --- a/MQTTnet.Core/Adapter/IMqttCommunicationAdapter.cs +++ b/MQTTnet.Core/Adapter/IMqttCommunicationAdapter.cs @@ -2,6 +2,7 @@ using System.Threading.Tasks; using MQTTnet.Core.Client; using MQTTnet.Core.Packets; +using MQTTnet.Core.Serializer; namespace MQTTnet.Core.Adapter { @@ -14,5 +15,7 @@ namespace MQTTnet.Core.Adapter Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout); Task ReceivePacketAsync(TimeSpan timeout); + + IMqttPacketSerializer PacketSerializer { get; } } } diff --git a/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs b/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs index ae90984..d504d89 100644 --- a/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs +++ b/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs @@ -11,15 +11,16 @@ namespace MQTTnet.Core.Adapter { public class MqttChannelCommunicationAdapter : IMqttCommunicationAdapter { - private readonly IMqttPacketSerializer _serializer; private readonly IMqttCommunicationChannel _channel; public MqttChannelCommunicationAdapter(IMqttCommunicationChannel channel, IMqttPacketSerializer serializer) { _channel = channel ?? throw new ArgumentNullException(nameof(channel)); - _serializer = serializer ?? throw new ArgumentNullException(nameof(serializer)); + PacketSerializer = serializer ?? throw new ArgumentNullException(nameof(serializer)); } + public IMqttPacketSerializer PacketSerializer { get; } + public async Task ConnectAsync(MqttClientOptions options, TimeSpan timeout) { var task = _channel.ConnectAsync(options); @@ -41,7 +42,7 @@ namespace MQTTnet.Core.Adapter bool hasTimeout; try { - var task = _serializer.SerializeAsync(packet, _channel); + var task = PacketSerializer.SerializeAsync(packet, _channel); hasTimeout = await Task.WhenAny(Task.Delay(timeout), task) != task; } catch (Exception exception) @@ -60,7 +61,7 @@ namespace MQTTnet.Core.Adapter MqttBasePacket packet; if (timeout > TimeSpan.Zero) { - var workerTask = _serializer.DeserializeAsync(_channel); + var workerTask = PacketSerializer.DeserializeAsync(_channel); var timeoutTask = Task.Delay(timeout); var hasTimeout = Task.WhenAny(timeoutTask, workerTask) == timeoutTask; @@ -73,7 +74,7 @@ namespace MQTTnet.Core.Adapter } else { - packet = await _serializer.DeserializeAsync(_channel); + packet = await PacketSerializer.DeserializeAsync(_channel); } if (packet == null) diff --git a/MQTTnet.Core/Client/MqttClient.cs b/MQTTnet.Core/Client/MqttClient.cs index f90adab..70e8c76 100644 --- a/MQTTnet.Core/Client/MqttClient.cs +++ b/MQTTnet.Core/Client/MqttClient.cs @@ -27,6 +27,8 @@ namespace MQTTnet.Core.Client { _options = options ?? throw new ArgumentNullException(nameof(options)); _adapter = adapter ?? throw new ArgumentNullException(nameof(adapter)); + + _adapter.PacketSerializer.ProtocolVersion = options.ProtocolVersion; } public event EventHandler Connected; diff --git a/MQTTnet.Core/Client/MqttClientOptions.cs b/MQTTnet.Core/Client/MqttClientOptions.cs index 0aaf43c..f9b75fa 100644 --- a/MQTTnet.Core/Client/MqttClientOptions.cs +++ b/MQTTnet.Core/Client/MqttClientOptions.cs @@ -1,4 +1,5 @@ using System; +using MQTTnet.Core.Serializer; namespace MQTTnet.Core.Client { @@ -7,8 +8,8 @@ namespace MQTTnet.Core.Client public string Server { get; set; } public int? Port { get; set; } - - public MqttClientTlsOptions TlsOptions { get; } = new MqttClientTlsOptions(); + + public MqttClientTlsOptions TlsOptions { get; set; } = new MqttClientTlsOptions(); public string UserName { get; set; } @@ -21,5 +22,7 @@ namespace MQTTnet.Core.Client public TimeSpan KeepAlivePeriod { get; set; } = TimeSpan.FromSeconds(5); public TimeSpan DefaultCommunicationTimeout { get; set; } = TimeSpan.FromSeconds(10); + + public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311; } } diff --git a/MQTTnet.Core/Packets/MqttConnectPacket.cs b/MQTTnet.Core/Packets/MqttConnectPacket.cs index aadfe52..6db92a2 100644 --- a/MQTTnet.Core/Packets/MqttConnectPacket.cs +++ b/MQTTnet.Core/Packets/MqttConnectPacket.cs @@ -1,7 +1,11 @@ -namespace MQTTnet.Core.Packets +using MQTTnet.Core.Serializer; + +namespace MQTTnet.Core.Packets { public sealed class MqttConnectPacket: MqttBasePacket { + public MqttProtocolVersion ProtocolVersion { get; set; } + public string ClientId { get; set; } public string Username { get; set; } diff --git a/MQTTnet.Core/Serializer/IMqttPacketSerializer.cs b/MQTTnet.Core/Serializer/IMqttPacketSerializer.cs index db53a13..801b7ea 100644 --- a/MQTTnet.Core/Serializer/IMqttPacketSerializer.cs +++ b/MQTTnet.Core/Serializer/IMqttPacketSerializer.cs @@ -6,6 +6,8 @@ namespace MQTTnet.Core.Serializer { public interface IMqttPacketSerializer { + MqttProtocolVersion ProtocolVersion { get; set; } + Task SerializeAsync(MqttBasePacket mqttPacket, IMqttCommunicationChannel destination); Task DeserializeAsync(IMqttCommunicationChannel source); diff --git a/MQTTnet.Core/Serializer/MqttPacketReader.cs b/MQTTnet.Core/Serializer/MqttPacketReader.cs index f59d807..9abea6d 100644 --- a/MQTTnet.Core/Serializer/MqttPacketReader.cs +++ b/MQTTnet.Core/Serializer/MqttPacketReader.cs @@ -10,7 +10,7 @@ namespace MQTTnet.Core.Serializer { public sealed class MqttPacketReader : IDisposable { - private readonly MemoryStream _remainingData = new MemoryStream(); + private readonly MemoryStream _remainingData = new MemoryStream(1024); private readonly IMqttCommunicationChannel _source; private int _remainingLength; diff --git a/MQTTnet.Core/Serializer/MqttV311PacketSerializer.cs b/MQTTnet.Core/Serializer/MqttPacketSerializer.cs similarity index 92% rename from MQTTnet.Core/Serializer/MqttV311PacketSerializer.cs rename to MQTTnet.Core/Serializer/MqttPacketSerializer.cs index 523a2dc..f9ec70c 100644 --- a/MQTTnet.Core/Serializer/MqttV311PacketSerializer.cs +++ b/MQTTnet.Core/Serializer/MqttPacketSerializer.cs @@ -9,9 +9,12 @@ using MQTTnet.Core.Protocol; namespace MQTTnet.Core.Serializer { - public sealed class MqttV311PacketSerializer : IMqttPacketSerializer + public sealed class MqttPacketSerializer : IMqttPacketSerializer { - private static readonly byte[] MqttV311Prefix = Encoding.UTF8.GetBytes("MQTT"); + private static byte[] ProtocolVersionV311Name { get; } = Encoding.UTF8.GetBytes("MQTT"); + private static byte[] ProtocolVersionV310Name { get; } = Encoding.UTF8.GetBytes("MQIs"); + + public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311; public Task SerializeAsync(MqttBasePacket packet, IMqttCommunicationChannel destination) { @@ -257,12 +260,21 @@ namespace MQTTnet.Core.Serializer private static async Task DeserializeConnectAsync(MqttPacketReader reader) { await reader.ReadRemainingDataAsync(2); // Skip 2 bytes - - var protocolName = await reader.ReadRemainingDataAsync(4); - if (Encoding.UTF8.GetString(protocolName, 0, protocolName.Length) != "MQTT") + MqttProtocolVersion protocolVersion; + var protocolName = await reader.ReadRemainingDataAsync(4); + if (protocolName.SequenceEqual(ProtocolVersionV310Name)) + { + await reader.ReadRemainingDataAsync(2); + protocolVersion = MqttProtocolVersion.V310; + } + else if (protocolName.SequenceEqual(ProtocolVersionV311Name)) { - throw new MqttProtocolViolationException("Protocol name is not 'MQTT'."); + protocolVersion = MqttProtocolVersion.V311; + } + else + { + throw new MqttProtocolViolationException("Protocol name is not supported."); } var protocolLevel = await reader.ReadRemainingDataByteAsync(); @@ -273,6 +285,7 @@ namespace MQTTnet.Core.Serializer var packet = new MqttConnectPacket { + ProtocolVersion = protocolVersion, CleanSession = connectFlagsReader.Read() }; @@ -353,7 +366,7 @@ namespace MQTTnet.Core.Serializer } } - private static Task SerializeAsync(MqttConnectPacket packet, IMqttCommunicationChannel destination) + private Task SerializeAsync(MqttConnectPacket packet, IMqttCommunicationChannel destination) { ValidateConnectPacket(packet); @@ -361,9 +374,19 @@ namespace MQTTnet.Core.Serializer { // Write variable header output.Write(0x00, 0x04); // 3.1.2.1 Protocol Name - output.Write(MqttV311Prefix); - output.Write(0x04); // 3.1.2.2 Protocol Level - + if (ProtocolVersion == MqttProtocolVersion.V311) + { + output.Write(ProtocolVersionV311Name); + output.Write(0x04); // 3.1.2.2 Protocol Level (4) + } + else + { + output.Write(ProtocolVersionV310Name); + output.Write(0x64); + output.Write(0x70); + output.Write(0x03); // Protocol Level (3) + } + var connectFlags = new ByteWriter(); // 3.1.2.3 Connect Flags connectFlags.Write(false); // Reserved connectFlags.Write(packet.CleanSession); @@ -408,13 +431,17 @@ namespace MQTTnet.Core.Serializer } } - private static Task SerializeAsync(MqttConnAckPacket packet, IMqttCommunicationChannel destination) + private Task SerializeAsync(MqttConnAckPacket packet, IMqttCommunicationChannel destination) { using (var output = new MqttPacketWriter()) { var connectAcknowledgeFlags = new ByteWriter(); - connectAcknowledgeFlags.Write(packet.IsSessionPresent); + if (ProtocolVersion == MqttProtocolVersion.V311) + { + connectAcknowledgeFlags.Write(packet.IsSessionPresent); + } + output.Write(connectAcknowledgeFlags); output.Write((byte)packet.ConnectReturnCode); @@ -423,6 +450,17 @@ namespace MQTTnet.Core.Serializer } } + private static async Task SerializeAsync(MqttPubRelPacket packet, IMqttCommunicationChannel destination) + { + using (var output = new MqttPacketWriter()) + { + output.Write(packet.PacketIdentifier); + + output.InjectFixedHeader(MqttControlPacketType.PubRel, 0x02); + await output.WriteToAsync(destination); + } + } + private static Task SerializeAsync(MqttDisconnectPacket packet, IMqttCommunicationChannel destination) { return SerializeEmptyPacketAsync(MqttControlPacketType.Disconnect, destination); @@ -495,17 +533,6 @@ namespace MQTTnet.Core.Serializer } } - private static async Task SerializeAsync(MqttPubRelPacket packet, IMqttCommunicationChannel destination) - { - using (var output = new MqttPacketWriter()) - { - output.Write(packet.PacketIdentifier); - - output.InjectFixedHeader(MqttControlPacketType.PubRel, 0x02); - await output.WriteToAsync(destination); - } - } - private static Task SerializeAsync(MqttPubCompPacket packet, IMqttCommunicationChannel destination) { using (var output = new MqttPacketWriter()) diff --git a/MQTTnet.Core/Serializer/MqttPacketWriter.cs b/MQTTnet.Core/Serializer/MqttPacketWriter.cs index 4e2a3cb..778f52e 100644 --- a/MQTTnet.Core/Serializer/MqttPacketWriter.cs +++ b/MQTTnet.Core/Serializer/MqttPacketWriter.cs @@ -9,13 +9,13 @@ namespace MQTTnet.Core.Serializer { public sealed class MqttPacketWriter : IDisposable { - private readonly MemoryStream _buffer = new MemoryStream(512); + private readonly MemoryStream _buffer = new MemoryStream(1024); public void InjectFixedHeader(MqttControlPacketType packetType, byte flags = 0) { - var fixedHeader = (byte)((byte)packetType << 4); + var fixedHeader = (int)packetType << 4; fixedHeader |= flags; - InjectFixedHeader(fixedHeader); + InjectFixedHeader((byte)fixedHeader); } public void Write(byte value) diff --git a/MQTTnet.Core/Serializer/MqttProtocolVersion.cs b/MQTTnet.Core/Serializer/MqttProtocolVersion.cs new file mode 100644 index 0000000..6b8814a --- /dev/null +++ b/MQTTnet.Core/Serializer/MqttProtocolVersion.cs @@ -0,0 +1,8 @@ +namespace MQTTnet.Core.Serializer +{ + public enum MqttProtocolVersion + { + V311, + V310 + } +} diff --git a/MQTTnet.Core/Server/ConnectedMqttClient.cs b/MQTTnet.Core/Server/ConnectedMqttClient.cs new file mode 100644 index 0000000..d363351 --- /dev/null +++ b/MQTTnet.Core/Server/ConnectedMqttClient.cs @@ -0,0 +1,11 @@ +using MQTTnet.Core.Serializer; + +namespace MQTTnet.Core.Server +{ + public class ConnectedMqttClient + { + public string ClientId { get; set; } + + public MqttProtocolVersion ProtocolVersion { get; set; } + } +} diff --git a/MQTTnet.Core/Server/IMqttServer.cs b/MQTTnet.Core/Server/IMqttServer.cs index 0b91c26..7a75624 100644 --- a/MQTTnet.Core/Server/IMqttServer.cs +++ b/MQTTnet.Core/Server/IMqttServer.cs @@ -9,7 +9,7 @@ namespace MQTTnet.Core.Server event EventHandler ApplicationMessageReceived; event EventHandler ClientConnected; - IList GetConnectedClients(); + IList GetConnectedClients(); void InjectClient(string identifier, IMqttCommunicationAdapter adapter); void Publish(MqttApplicationMessage applicationMessage); void Start(); diff --git a/MQTTnet.Core/Server/MqttClientSession.cs b/MQTTnet.Core/Server/MqttClientSession.cs index f84b847..bd99fe3 100644 --- a/MQTTnet.Core/Server/MqttClientSession.cs +++ b/MQTTnet.Core/Server/MqttClientSession.cs @@ -21,7 +21,6 @@ namespace MQTTnet.Core.Server private readonly MqttServerOptions _options; private CancellationTokenSource _cancellationTokenSource; - private IMqttCommunicationAdapter _adapter; private string _identifier; private MqttApplicationMessage _willApplicationMessage; @@ -36,7 +35,9 @@ namespace MQTTnet.Core.Server public string ClientId { get; } - public bool IsConnected => _adapter != null; + public bool IsConnected => Adapter != null; + + public IMqttCommunicationAdapter Adapter { get; private set; } public async Task RunAsync(string identifier, MqttApplicationMessage willApplicationMessage, IMqttCommunicationAdapter adapter) { @@ -47,7 +48,7 @@ namespace MQTTnet.Core.Server try { _identifier = identifier; - _adapter = adapter; + Adapter = adapter; _cancellationTokenSource = new CancellationTokenSource(); _messageQueue.Start(adapter); @@ -73,7 +74,7 @@ namespace MQTTnet.Core.Server _messageQueue.Stop(); _cancellationTokenSource.Cancel(); - _adapter = null; + Adapter = null; MqttTrace.Information(nameof(MqttClientSession), $"Client '{_identifier}': Disconnected."); } @@ -102,12 +103,12 @@ namespace MQTTnet.Core.Server { if (packet is MqttSubscribePacket subscribePacket) { - return _adapter.SendPacketAsync(_subscriptionsManager.Subscribe(subscribePacket), _options.DefaultCommunicationTimeout); + return Adapter.SendPacketAsync(_subscriptionsManager.Subscribe(subscribePacket), _options.DefaultCommunicationTimeout); } if (packet is MqttUnsubscribePacket unsubscribePacket) { - return _adapter.SendPacketAsync(_subscriptionsManager.Unsubscribe(unsubscribePacket), _options.DefaultCommunicationTimeout); + return Adapter.SendPacketAsync(_subscriptionsManager.Unsubscribe(unsubscribePacket), _options.DefaultCommunicationTimeout); } if (packet is MqttPublishPacket publishPacket) @@ -122,7 +123,7 @@ namespace MQTTnet.Core.Server if (packet is MqttPubRecPacket pubRecPacket) { - return _adapter.SendPacketAsync(pubRecPacket.CreateResponse(), _options.DefaultCommunicationTimeout); + return Adapter.SendPacketAsync(pubRecPacket.CreateResponse(), _options.DefaultCommunicationTimeout); } if (packet is MqttPubAckPacket || packet is MqttPubCompPacket) @@ -133,7 +134,7 @@ namespace MQTTnet.Core.Server if (packet is MqttPingReqPacket) { - return _adapter.SendPacketAsync(new MqttPingRespPacket(), _options.DefaultCommunicationTimeout); + return Adapter.SendPacketAsync(new MqttPingRespPacket(), _options.DefaultCommunicationTimeout); } if (packet is MqttDisconnectPacket || packet is MqttConnectPacket) @@ -159,7 +160,7 @@ namespace MQTTnet.Core.Server if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce) { _publishPacketReceivedCallback(this, publishPacket); - return _adapter.SendPacketAsync(new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout); + return Adapter.SendPacketAsync(new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout); } if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce) @@ -172,7 +173,7 @@ namespace MQTTnet.Core.Server _publishPacketReceivedCallback(this, publishPacket); - return _adapter.SendPacketAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout); + return Adapter.SendPacketAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout); } throw new MqttCommunicationException("Received a not supported QoS level."); @@ -185,7 +186,7 @@ namespace MQTTnet.Core.Server _unacknowledgedPublishPackets.Remove(pubRelPacket.PacketIdentifier); } - return _adapter.SendPacketAsync(new MqttPubCompPacket { PacketIdentifier = pubRelPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout); + return Adapter.SendPacketAsync(new MqttPubCompPacket { PacketIdentifier = pubRelPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout); } } } diff --git a/MQTTnet.Core/Server/MqttClientSessionsManager.cs b/MQTTnet.Core/Server/MqttClientSessionsManager.cs index f78be0a..6165143 100644 --- a/MQTTnet.Core/Server/MqttClientSessionsManager.cs +++ b/MQTTnet.Core/Server/MqttClientSessionsManager.cs @@ -34,6 +34,9 @@ namespace MQTTnet.Core.Server throw new MqttProtocolViolationException("The first packet from a client must be a 'CONNECT' packet [MQTT-3.1.0-1]."); } + // Switch to the required protocol version before sending any response. + eventArgs.ClientAdapter.PacketSerializer.ProtocolVersion = connectPacket.ProtocolVersion; + var connectReturnCode = ValidateConnection(connectPacket); if (connectReturnCode != MqttConnectReturnCode.ConnectionAccepted) { @@ -73,11 +76,15 @@ namespace MQTTnet.Core.Server } } - public IList GetConnectedClients() + public IList GetConnectedClients() { lock (_syncRoot) { - return _clientSessions.Where(s => s.Value.IsConnected).Select(s => s.Key).ToList(); + return _clientSessions.Where(s => s.Value.IsConnected).Select(s => new ConnectedMqttClient + { + ClientId = s.Value.ClientId, + ProtocolVersion = s.Value.Adapter.PacketSerializer.ProtocolVersion + }).ToList(); } } diff --git a/MQTTnet.Core/Server/MqttServer.cs b/MQTTnet.Core/Server/MqttServer.cs index 98543b6..231cfcf 100644 --- a/MQTTnet.Core/Server/MqttServer.cs +++ b/MQTTnet.Core/Server/MqttServer.cs @@ -25,7 +25,7 @@ namespace MQTTnet.Core.Server _clientSessionsManager.ApplicationMessageReceived += (s, e) => ApplicationMessageReceived?.Invoke(s, e); } - public IList GetConnectedClients() + public IList GetConnectedClients() { return _clientSessionsManager.GetConnectedClients(); } diff --git a/MQTTnet.sln b/MQTTnet.sln index 697f504..c17884c 100644 --- a/MQTTnet.sln +++ b/MQTTnet.sln @@ -1,7 +1,7 @@  Microsoft Visual Studio Solution File, Format Version 12.00 # Visual Studio 15 -VisualStudioVersion = 15.0.26430.15 +VisualStudioVersion = 15.0.26430.16 MinimumVisualStudioVersion = 10.0.40219.1 Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.Core.Tests", "Tests\MQTTnet.Core.Tests\MQTTnet.Core.Tests.csproj", "{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}" EndProject @@ -27,6 +27,11 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Build", "Build", "{67C28AC1 Build\MQTTnet.nuspec = Build\MQTTnet.nuspec EndProjectSection EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{B3F60ECB-45BA-4C66-8903-8BB89CA67998}" + ProjectSection(SolutionItems) = preProject + README.md = README.md + EndProjectSection +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU diff --git a/README.md b/README.md index 5df9d0f..4668d6c 100644 --- a/README.md +++ b/README.md @@ -5,40 +5,47 @@ [![NuGet Badge](https://buildstats.info/nuget/MQTTnet)](https://www.nuget.org/packages/MQTTnet) # MQTTnet -MQTTnet is a .NET library for MQTT based communication. It provides a MQTT client and a MQTT server. The implementation is based on the documentation from http://mqtt.org/. +MQTTnet is a .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker). The implementation is based on the documentation from http://mqtt.org/. -## Features -* MQTT client included -* MQTT server (broker) included -* TLS 1.2 support for client and server (but not UWP servers) +# Features + +## General * Async support -* Rx support (via another project) -* List of connected clients available (server only) +* TLS 1.2 support for client and server (but not UWP servers) * Extensible communication channels (i.e. In-Memory, TCP, TCP+SSL, WebSockets (not included in this project)) -* Server is able to publish its own messages (no loopback client required) +* Interfaces included for mocking and testing +* Lightweight (only the low level implementation of MQTT, no overhead) * Access to internal trace messages -* Extensible client credential validation (server only) * Unit tested (50+ tests) -* Lightweight (only the low level implementation of MQTT, no overhead) -* Interfaces included for mocking and testing -## Supported frameworks +## Client +* Rx support (via another project) + +## Server (broker) +* List of connected clients available +* Supports connected clients with different protocol versions at the same time +* Able to publish its own messages (no loopback client required) +* Able to receive every messages (no loopback client required) +* Extensible client credential validation + +# Supported frameworks * .NET Standard 1.3+ * .NET Core 1.1+ * .NET Core App 1.1+ * .NET Framework 4.5.2+ (x86, x64, AnyCPU) * Universal Windows (UWP) 10.0.10240+ (x86, x64, ARM, AnyCPU) -## Supported MQTT versions +# Supported MQTT versions * 3.1.1 +* 3.1.0 -## Nuget +# Nuget This library is available as a nuget package: https://www.nuget.org/packages/MQTTnet/ -## Contributions +# Contributions If you want to contribute to this project just create a pull request. -## References +# References This library is used in the following projects: * MQTT Client Rx (Wrapper for Reactive Extensions, https://github.com/1iveowl/MQTTClient.rx) @@ -46,8 +53,8 @@ This library is used in the following projects: If you use this library and want to see your project here please let me know. -# MqttClient -## Example +# Examples +## MqttClient ```csharp var options = new MqttClientOptions @@ -119,9 +126,7 @@ while (true) } ``` -# MqttServer - -## Example +## MqttServer ```csharp var options = new MqttServerOptions diff --git a/Tests/MQTTnet.Core.Tests/MQTTnet.Core.Tests.csproj b/Tests/MQTTnet.Core.Tests/MQTTnet.Core.Tests.csproj index 62ddb72..88bf8f2 100644 --- a/Tests/MQTTnet.Core.Tests/MQTTnet.Core.Tests.csproj +++ b/Tests/MQTTnet.Core.Tests/MQTTnet.Core.Tests.csproj @@ -86,7 +86,7 @@ - + diff --git a/Tests/MQTTnet.Core.Tests/DefaultMqttV311PacketSerializerTests.cs b/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs similarity index 92% rename from Tests/MQTTnet.Core.Tests/DefaultMqttV311PacketSerializerTests.cs rename to Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs index 4b458ed..c03fa7a 100644 --- a/Tests/MQTTnet.Core.Tests/DefaultMqttV311PacketSerializerTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs @@ -12,8 +12,23 @@ using MQTTnet.Core.Serializer; namespace MQTTnet.Core.Tests { [TestClass] - public class DefaultMqttV311PacketSerializerTests + public class MqttPacketSerializerTests { + [TestMethod] + public void SerializeV310_MqttConnectPacket() + { + var p = new MqttConnectPacket + { + ClientId = "XYZ", + Password = "PASS", + Username = "USER", + KeepAlivePeriod = 123, + CleanSession = true + }; + + SerializeAndCompare(p, "EB0ABE1RSXNkcAPCAHsAA1hZWgAEVVNFUgAEUEFTUw==", MqttProtocolVersion.V310); + } + [TestMethod] public void SerializeV311_MqttConnectPacket() { @@ -96,6 +111,17 @@ namespace MQTTnet.Core.Tests SerializeAndCompare(p, "IAIBBQ=="); } + [TestMethod] + public void SerializeV310_MqttConnAckPacket() + { + var p = new MqttConnAckPacket + { + ConnectReturnCode = MqttConnectReturnCode.ConnectionAccepted + }; + + SerializeAndCompare(p, "IAIAAA==", MqttProtocolVersion.V310); + } + [TestMethod] public void DeserializeV311_MqttConnAckPacket() { @@ -403,9 +429,9 @@ namespace MQTTnet.Core.Tests } } - private void SerializeAndCompare(MqttBasePacket packet, string expectedBase64Value) + private void SerializeAndCompare(MqttBasePacket packet, string expectedBase64Value, MqttProtocolVersion protocolVersion = MqttProtocolVersion.V311) { - var serializer = new MqttV311PacketSerializer(); + var serializer = new MqttPacketSerializer { ProtocolVersion = protocolVersion }; var channel = new TestChannel(); serializer.SerializeAsync(packet, channel).Wait(); var buffer = channel.ToArray(); @@ -415,7 +441,7 @@ namespace MQTTnet.Core.Tests private void DeserializeAndCompare(MqttBasePacket packet, string expectedBase64Value) { - var serializer = new MqttV311PacketSerializer(); + var serializer = new MqttPacketSerializer(); var channel1 = new TestChannel(); serializer.SerializeAsync(packet, channel1).Wait(); diff --git a/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs b/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs index 35bd92d..c8c6da6 100644 --- a/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs +++ b/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs @@ -4,6 +4,7 @@ using System.Threading.Tasks; using MQTTnet.Core.Adapter; using MQTTnet.Core.Client; using MQTTnet.Core.Packets; +using MQTTnet.Core.Serializer; namespace MQTTnet.Core.Tests { @@ -13,6 +14,8 @@ namespace MQTTnet.Core.Tests public TestMqttCommunicationAdapter Partner { get; set; } + public IMqttPacketSerializer PacketSerializer { get; } = new MqttPacketSerializer(); + public async Task ConnectAsync(MqttClientOptions options, TimeSpan timeout) { await Task.FromResult(0); From c90bdbfb9352ba79da03ce690ea922fdf2d5d5ae Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Thu, 17 Aug 2017 22:49:16 +0200 Subject: [PATCH 8/8] Fix issue with connection management. --- Build/MQTTnet.nuspec | 1 + .../Implementations/MqttTcpChannel.cs | 14 +- .../Implementations/MqttTcpChannel.cs | 14 +- .../Implementations/MqttTcpChannel.cs | 12 +- .../MqttChannelCommunicationAdapter.cs | 63 ++++---- MQTTnet.Core/Client/MqttClient.cs | 140 +++++++++++------- .../Exceptions/MqttCommunicationException.cs | 7 +- 7 files changed, 159 insertions(+), 92 deletions(-) diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index bb81a8d..a905b00 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -14,6 +14,7 @@ * [Server] Providing the used protocol version of connected clients * [Client] Added support for protocol version 3.1.0 * [Core] Several minor performance improvements +* [Core] Fixed an issue with connection management (Thanks to wuzhenda; Zuendelmeister) Copyright Christian Kratky 2016-2017 MQTT MQTTClient MQTTServer MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Queue Hardware Arduino diff --git a/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs b/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs index 3a43a3e..c9c080d 100644 --- a/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs +++ b/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs @@ -12,12 +12,11 @@ namespace MQTTnet.Implementations { public sealed class MqttTcpChannel : IMqttCommunicationChannel, IDisposable { - private readonly Socket _socket; + private Socket _socket; private SslStream _sslStream; public MqttTcpChannel() { - _socket = new Socket(SocketType.Stream, ProtocolType.Tcp); } public MqttTcpChannel(Socket socket, SslStream sslStream) @@ -31,6 +30,11 @@ namespace MQTTnet.Implementations if (options == null) throw new ArgumentNullException(nameof(options)); try { + if (_socket == null) + { + _socket = new Socket(SocketType.Stream, ProtocolType.Tcp); + } + await Task.Factory.FromAsync(_socket.BeginConnect, _socket.EndConnect, options.Server, options.GetPort(), null); if (options.TlsOptions.UseTls) @@ -49,8 +53,7 @@ namespace MQTTnet.Implementations { try { - _sslStream.Dispose(); - _socket.Dispose(); + Dispose(); return Task.FromResult(0); } catch (SocketException exception) @@ -108,6 +111,9 @@ namespace MQTTnet.Implementations { _socket?.Dispose(); _sslStream?.Dispose(); + + _socket = null; + _sslStream = null; } private static X509CertificateCollection LoadCertificates(MqttClientOptions options) diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs index e78cd98..a4247b0 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs @@ -12,12 +12,11 @@ namespace MQTTnet.Implementations { public sealed class MqttTcpChannel : IMqttCommunicationChannel, IDisposable { - private readonly Socket _socket; + private Socket _socket; private SslStream _sslStream; public MqttTcpChannel() { - _socket = new Socket(SocketType.Stream, ProtocolType.Tcp); } public MqttTcpChannel(Socket socket, SslStream sslStream) @@ -31,6 +30,11 @@ namespace MQTTnet.Implementations if (options == null) throw new ArgumentNullException(nameof(options)); try { + if (_socket == null) + { + _socket = new Socket(SocketType.Stream, ProtocolType.Tcp); + } + await _socket.ConnectAsync(options.Server, options.GetPort()); if (options.TlsOptions.UseTls) @@ -49,8 +53,7 @@ namespace MQTTnet.Implementations { try { - _sslStream.Dispose(); - _socket.Dispose(); + Dispose(); return Task.FromResult(0); } catch (SocketException exception) @@ -101,6 +104,9 @@ namespace MQTTnet.Implementations { _socket?.Dispose(); _sslStream?.Dispose(); + + _socket = null; + _sslStream = null; } private static X509CertificateCollection LoadCertificates(MqttClientOptions options) diff --git a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs index 99681b7..482ce32 100644 --- a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs +++ b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs @@ -15,11 +15,10 @@ namespace MQTTnet.Implementations { public sealed class MqttTcpChannel : IMqttCommunicationChannel, IDisposable { - private readonly StreamSocket _socket; + private StreamSocket _socket; public MqttTcpChannel() { - _socket = new StreamSocket(); } public MqttTcpChannel(StreamSocket socket) @@ -32,6 +31,11 @@ namespace MQTTnet.Implementations if (options == null) throw new ArgumentNullException(nameof(options)); try { + if (_socket == null) + { + _socket = new StreamSocket(); + } + if (!options.TlsOptions.UseTls) { await _socket.ConnectAsync(new HostName(options.Server), options.GetPort().ToString()); @@ -59,7 +63,7 @@ namespace MQTTnet.Implementations { try { - _socket.Dispose(); + Dispose(); return Task.FromResult(0); } catch (SocketException exception) @@ -100,6 +104,8 @@ namespace MQTTnet.Implementations public void Dispose() { _socket?.Dispose(); + + _socket = null; } private static Certificate LoadCertificate(MqttClientOptions options) diff --git a/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs b/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs index d504d89..04b5ca7 100644 --- a/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs +++ b/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs @@ -23,11 +23,7 @@ namespace MQTTnet.Core.Adapter public async Task ConnectAsync(MqttClientOptions options, TimeSpan timeout) { - var task = _channel.ConnectAsync(options); - if (await Task.WhenAny(Task.Delay(timeout), task) != task) - { - throw new MqttCommunicationTimedOutException(); - } + await ExecuteWithTimeoutAsync(_channel.ConnectAsync(options), timeout); } public async Task DisconnectAsync() @@ -39,21 +35,7 @@ namespace MQTTnet.Core.Adapter { MqttTrace.Information(nameof(MqttChannelCommunicationAdapter), $"TX >>> {packet} [Timeout={timeout}]"); - bool hasTimeout; - try - { - var task = PacketSerializer.SerializeAsync(packet, _channel); - hasTimeout = await Task.WhenAny(Task.Delay(timeout), task) != task; - } - catch (Exception exception) - { - throw new MqttCommunicationException(exception); - } - - if (hasTimeout) - { - throw new MqttCommunicationTimedOutException(); - } + await ExecuteWithTimeoutAsync(PacketSerializer.SerializeAsync(packet, _channel), timeout); } public async Task ReceivePacketAsync(TimeSpan timeout) @@ -61,16 +43,7 @@ namespace MQTTnet.Core.Adapter MqttBasePacket packet; if (timeout > TimeSpan.Zero) { - var workerTask = PacketSerializer.DeserializeAsync(_channel); - var timeoutTask = Task.Delay(timeout); - var hasTimeout = Task.WhenAny(timeoutTask, workerTask) == timeoutTask; - - if (hasTimeout) - { - throw new MqttCommunicationTimedOutException(); - } - - packet = workerTask.Result; + packet = await ExecuteWithTimeoutAsync(PacketSerializer.DeserializeAsync(_channel), timeout); } else { @@ -85,5 +58,35 @@ namespace MQTTnet.Core.Adapter MqttTrace.Information(nameof(MqttChannelCommunicationAdapter), $"RX <<< {packet}"); return packet; } + + private static async Task ExecuteWithTimeoutAsync(Task task, TimeSpan timeout) + { + var timeoutTask = Task.Delay(timeout); + if (await Task.WhenAny(timeoutTask, task) == timeoutTask) + { + throw new MqttCommunicationTimedOutException(); + } + + if (task.IsFaulted) + { + throw new MqttCommunicationException(task.Exception); + } + + return task.Result; + } + + private static async Task ExecuteWithTimeoutAsync(Task task, TimeSpan timeout) + { + var timeoutTask = Task.Delay(timeout); + if (await Task.WhenAny(timeoutTask, task) == timeoutTask) + { + throw new MqttCommunicationTimedOutException(); + } + + if (task.IsFaulted) + { + throw new MqttCommunicationException(task.Exception); + } + } } } \ No newline at end of file diff --git a/MQTTnet.Core/Client/MqttClient.cs b/MQTTnet.Core/Client/MqttClient.cs index 70e8c76..3481cbe 100644 --- a/MQTTnet.Core/Client/MqttClient.cs +++ b/MQTTnet.Core/Client/MqttClient.cs @@ -20,6 +20,7 @@ namespace MQTTnet.Core.Client private readonly MqttClientOptions _options; private readonly IMqttCommunicationAdapter _adapter; + private bool _disconnectedEventSuspended; private int _latestPacketIdentifier; private CancellationTokenSource _cancellationTokenSource; @@ -48,49 +49,64 @@ namespace MQTTnet.Core.Client throw new MqttProtocolViolationException("It is not allowed to connect with a server after the connection is established."); } - var connectPacket = new MqttConnectPacket + try { - ClientId = _options.ClientId, - Username = _options.UserName, - Password = _options.Password, - CleanSession = _options.CleanSession, - KeepAlivePeriod = (ushort)_options.KeepAlivePeriod.TotalSeconds, - WillMessage = willApplicationMessage - }; + _disconnectedEventSuspended = false; - await _adapter.ConnectAsync(_options, _options.DefaultCommunicationTimeout); - MqttTrace.Verbose(nameof(MqttClient), "Connection with server established."); + await _adapter.ConnectAsync(_options, _options.DefaultCommunicationTimeout); - _cancellationTokenSource = new CancellationTokenSource(); - _latestPacketIdentifier = 0; - _packetDispatcher.Reset(); - IsConnected = true; + MqttTrace.Verbose(nameof(MqttClient), "Connection with server established."); -#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - Task.Run(() => ReceivePackets(_cancellationTokenSource.Token), _cancellationTokenSource.Token); -#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + var connectPacket = new MqttConnectPacket + { + ClientId = _options.ClientId, + Username = _options.UserName, + Password = _options.Password, + CleanSession = _options.CleanSession, + KeepAlivePeriod = (ushort)_options.KeepAlivePeriod.TotalSeconds, + WillMessage = willApplicationMessage + }; + + _cancellationTokenSource = new CancellationTokenSource(); + _latestPacketIdentifier = 0; + _packetDispatcher.Reset(); + + StartReceivePackets(); + + var response = await SendAndReceiveAsync(connectPacket); + if (response.ConnectReturnCode != MqttConnectReturnCode.ConnectionAccepted) + { + await DisconnectInternalAsync(); + throw new MqttConnectingFailedException(response.ConnectReturnCode); + } - var response = await SendAndReceiveAsync(connectPacket); - if (response.ConnectReturnCode != MqttConnectReturnCode.ConnectionAccepted) - { - await DisconnectAsync(); - throw new MqttConnectingFailedException(response.ConnectReturnCode); - } + if (_options.KeepAlivePeriod != TimeSpan.Zero) + { + StartSendKeepAliveMessages(); + } + + MqttTrace.Verbose(nameof(MqttClient), "MQTT connection with server established."); - if (_options.KeepAlivePeriod != TimeSpan.Zero) + IsConnected = true; + Connected?.Invoke(this, EventArgs.Empty); + } + catch (Exception) { -#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - Task.Run(() => SendKeepAliveMessagesAsync(_cancellationTokenSource.Token), _cancellationTokenSource.Token); -#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + await DisconnectInternalAsync(); + throw; } - - Connected?.Invoke(this, EventArgs.Empty); } public async Task DisconnectAsync() { - await SendAsync(new MqttDisconnectPacket()); - await DisconnectInternalAsync(); + try + { + await SendAsync(new MqttDisconnectPacket()); + } + finally + { + await DisconnectInternalAsync(); + } } public Task> SubscribeAsync(params TopicFilter[] topicFilters) @@ -181,8 +197,9 @@ namespace MQTTnet.Core.Client { await _adapter.DisconnectAsync(); } - catch + catch (Exception exception) { + MqttTrace.Warning(nameof(MqttClient), exception, "Error while disconnecting."); } finally { @@ -191,7 +208,12 @@ namespace MQTTnet.Core.Client _cancellationTokenSource = null; IsConnected = false; - Disconnected?.Invoke(this, EventArgs.Empty); + + if (!_disconnectedEventSuspended) + { + _disconnectedEventSuspended = true; + Disconnected?.Invoke(this, EventArgs.Empty); + } } } @@ -239,7 +261,7 @@ namespace MQTTnet.Core.Client } catch (Exception exception) { - MqttTrace.Error(nameof(MqttClient), exception, "Unhandled exception while handling application message."); + MqttTrace.Error(nameof(MqttClient), exception, "Unhandled exception while handling application message."); } } @@ -278,7 +300,7 @@ namespace MQTTnet.Core.Client { _unacknowledgedPublishPackets.Remove(pubRelPacket.PacketIdentifier); } - + await SendAsync(pubRelPacket.CreateResponse()); } @@ -300,15 +322,12 @@ namespace MQTTnet.Core.Client var pi1 = requestPacket as IMqttPacketWithIdentifier; var pi2 = p as IMqttPacketWithIdentifier; - if (pi1 != null && pi2 != null) + if (pi1 == null || pi2 == null) { - if (pi1.PacketIdentifier != pi2.PacketIdentifier) - { - return false; - } + return true; } - return true; + return pi1.PacketIdentifier == pi2.PacketIdentifier; } await _adapter.SendPacketAsync(requestPacket, _options.DefaultCommunicationTimeout); @@ -335,15 +354,16 @@ namespace MQTTnet.Core.Client catch (MqttCommunicationException exception) { MqttTrace.Warning(nameof(MqttClient), exception, "MQTT communication error while receiving packets."); + await DisconnectInternalAsync(); } catch (Exception exception) { MqttTrace.Warning(nameof(MqttClient), exception, "Error while sending/receiving keep alive packets."); + await DisconnectInternalAsync(); } finally { MqttTrace.Information(nameof(MqttClient), "Stopped sending keep alive packets."); - await DisconnectInternalAsync(); } } @@ -354,27 +374,47 @@ namespace MQTTnet.Core.Client { while (!cancellationToken.IsCancellationRequested) { - var mqttPacket = await _adapter.ReceivePacketAsync(TimeSpan.Zero); - MqttTrace.Information(nameof(MqttClient), $"Received <<< {mqttPacket}"); + var packet = await _adapter.ReceivePacketAsync(TimeSpan.Zero); + MqttTrace.Information(nameof(MqttClient), $"Received <<< {packet}"); -#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - Task.Run(() => ProcessReceivedPacketAsync(mqttPacket), cancellationToken); -#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + StartProcessReceivedPacket(packet, cancellationToken); } } catch (MqttCommunicationException exception) { - MqttTrace.Warning(nameof(MqttClient), exception, "MQTT communication error while receiving packets."); + MqttTrace.Warning(nameof(MqttClient), exception, "MQTT communication exception while receiving packets."); + await DisconnectInternalAsync(); } catch (Exception exception) { - MqttTrace.Error(nameof(MqttClient), exception, "Error while receiving packets."); + MqttTrace.Error(nameof(MqttClient), exception, "Unhandled exception while receiving packets."); + await DisconnectInternalAsync(); } finally { MqttTrace.Information(nameof(MqttClient), "Stopped receiving packets."); - await DisconnectInternalAsync(); } } + + private void StartProcessReceivedPacket(MqttBasePacket packet, CancellationToken cancellationToken) + { +#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + Task.Run(() => ProcessReceivedPacketAsync(packet), cancellationToken); +#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + } + + private void StartReceivePackets() + { +#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + Task.Run(() => ReceivePackets(_cancellationTokenSource.Token), _cancellationTokenSource.Token); +#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + } + + private void StartSendKeepAliveMessages() + { +#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + Task.Run(() => SendKeepAliveMessagesAsync(_cancellationTokenSource.Token), _cancellationTokenSource.Token); +#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + } } } \ No newline at end of file diff --git a/MQTTnet.Core/Exceptions/MqttCommunicationException.cs b/MQTTnet.Core/Exceptions/MqttCommunicationException.cs index 8b471a0..2fc578e 100644 --- a/MQTTnet.Core/Exceptions/MqttCommunicationException.cs +++ b/MQTTnet.Core/Exceptions/MqttCommunicationException.cs @@ -4,7 +4,7 @@ namespace MQTTnet.Core.Exceptions { public class MqttCommunicationException : Exception { - public MqttCommunicationException() + protected MqttCommunicationException() { } @@ -17,5 +17,10 @@ namespace MQTTnet.Core.Exceptions : base(message) { } + + public MqttCommunicationException(string message, Exception innerException) + : base(message, innerException) + { + } } }