From 9b720d2325ddeda153fff14020e8dcdb980da360 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Sun, 19 Mar 2017 18:41:29 +0100 Subject: [PATCH] Initial commit. --- .gitattributes | 63 ++ Images/Logo_128x128.png | Bin 0 -> 2382 bytes MQTT.NET.Core.Tests/ByteReaderTests.cs | 30 + MQTT.NET.Core.Tests/ByteWriterTests.cs | 51 ++ .../DefaultMqttV311PacketSerializerTests.cs | 434 ++++++++++++ MQTT.NET.Core.Tests/MQTTnet.Core.Tests.csproj | 66 ++ .../Properties/AssemblyInfo.cs | 15 + MQTT.NET.Core/Adapter/IMqttAdapter.cs | 18 + MQTT.NET.Core/Adapter/MqttChannelAdapter.cs | 72 ++ .../Adapter/MqttConnectingFailedException.cs | 16 + .../Channel/IMqttTransportChannel.cs | 16 + MQTT.NET.Core/Client/MqttClient.cs | 350 ++++++++++ MQTT.NET.Core/Client/MqttClientOptions.cs | 21 + MQTT.NET.Core/Client/MqttClientStatistics.cs | 7 + MQTT.NET.Core/Client/MqttPacketAwaiter.cs | 32 + MQTT.NET.Core/Client/MqttPacketDispatcher.cs | 84 +++ MQTT.NET.Core/Client/MqttSubscribeResult.cs | 18 + MQTT.NET.Core/Diagnostics/MqttTrace.cs | 39 ++ MQTT.NET.Core/Diagnostics/MqttTraceLevel.cs | 10 + .../MqttTraceMessagePublishedEventArgs.cs | 26 + MQTT.NET.Core/DictionaryExtensions.cs | 21 + .../Exceptions/MqttCommunicationException.cs | 21 + .../MqttCommunicationTimedOutException.cs | 6 + .../MqttProtocolViolationException.cs | 12 + MQTT.NET.Core/MQTTnet.Core.csproj | 96 +++ MQTT.NET.Core/MqttApplicationMessage.cs | 27 + ...MqttApplicationMessageReceivedEventArgs.cs | 16 + MQTT.NET.Core/Packets/MqttBasePacket.cs | 6 + MQTT.NET.Core/Packets/MqttConnAckPacket.cs | 16 + MQTT.NET.Core/Packets/MqttConnectPacket.cs | 17 + MQTT.NET.Core/Packets/MqttDisconnectPacket.cs | 6 + MQTT.NET.Core/Packets/MqttPingReqPacket.cs | 12 + MQTT.NET.Core/Packets/MqttPingRespPacket.cs | 10 + MQTT.NET.Core/Packets/MqttPubAckPacket.cs | 7 + MQTT.NET.Core/Packets/MqttPubCompPacket.cs | 7 + MQTT.NET.Core/Packets/MqttPubRecPacket.cs | 7 + MQTT.NET.Core/Packets/MqttPubRelPacket.cs | 7 + MQTT.NET.Core/Packets/MqttPublishPacket.cs | 19 + MQTT.NET.Core/Packets/MqttSubAckPacket.cs | 12 + MQTT.NET.Core/Packets/MqttSubscribePacket.cs | 11 + MQTT.NET.Core/Packets/MqttUnsubAckPacket.cs | 7 + MQTT.NET.Core/Packets/MqttUnsubscribe.cs | 11 + MQTT.NET.Core/Packets/TopicFilter.cs | 17 + MQTT.NET.Core/Properties/AssemblyInfo.cs | 15 + .../Protocol/MqttConnectReturnCode.cs | 12 + .../Protocol/MqttControlPacketType.cs | 20 + .../Protocol/MqttQualityOfServiceLevel.cs | 9 + .../Protocol/MqttSubscribeReturnCode.cs | 10 + MQTT.NET.Core/Serializer/ByteReader.cs | 42 ++ MQTT.NET.Core/Serializer/ByteWriter.cs | 36 + .../DefaultMqttV311PacketSerializer.cs | 632 ++++++++++++++++++ .../Serializer/IMqttPacketSerializer.cs | 13 + MQTT.NET.Core/Serializer/MqttPacketReader.cs | 130 ++++ MQTT.NET.Core/Serializer/MqttPacketWriter.cs | 107 +++ MQTT.NET.Core/TaskExtensions.cs | 11 + MQTT.NET.TestConsole/App.config | 6 + .../MQTTnet.TestConsole.csproj | 71 ++ MQTT.NET.TestConsole/Program.cs | 109 +++ .../Properties/AssemblyInfo.cs | 15 + MQTTnet.NET/MQTTnet.NETFramework.csproj | 56 ++ MQTTnet.NET/MqttClientFactory.cs | 17 + MQTTnet.NET/MqttTcpChannel.cs | 46 ++ MQTTnet.NET/Properties/AssemblyInfo.cs | 15 + MQTTnet.Universal/MQTTnet.Universal.csproj | 132 ++++ MQTTnet.Universal/MqttClientFactory.cs | 17 + MQTTnet.Universal/MqttTcpChannel.cs | 52 ++ MQTTnet.Universal/Properties/AssemblyInfo.cs | 14 + .../Properties/MQTTnet.Universal.rd.xml | 33 + MQTTnet.Universal/project.json | 16 + MQTTnet.sln | 112 ++++ 70 files changed, 3487 insertions(+) create mode 100644 .gitattributes create mode 100644 Images/Logo_128x128.png create mode 100644 MQTT.NET.Core.Tests/ByteReaderTests.cs create mode 100644 MQTT.NET.Core.Tests/ByteWriterTests.cs create mode 100644 MQTT.NET.Core.Tests/DefaultMqttV311PacketSerializerTests.cs create mode 100644 MQTT.NET.Core.Tests/MQTTnet.Core.Tests.csproj create mode 100644 MQTT.NET.Core.Tests/Properties/AssemblyInfo.cs create mode 100644 MQTT.NET.Core/Adapter/IMqttAdapter.cs create mode 100644 MQTT.NET.Core/Adapter/MqttChannelAdapter.cs create mode 100644 MQTT.NET.Core/Adapter/MqttConnectingFailedException.cs create mode 100644 MQTT.NET.Core/Channel/IMqttTransportChannel.cs create mode 100644 MQTT.NET.Core/Client/MqttClient.cs create mode 100644 MQTT.NET.Core/Client/MqttClientOptions.cs create mode 100644 MQTT.NET.Core/Client/MqttClientStatistics.cs create mode 100644 MQTT.NET.Core/Client/MqttPacketAwaiter.cs create mode 100644 MQTT.NET.Core/Client/MqttPacketDispatcher.cs create mode 100644 MQTT.NET.Core/Client/MqttSubscribeResult.cs create mode 100644 MQTT.NET.Core/Diagnostics/MqttTrace.cs create mode 100644 MQTT.NET.Core/Diagnostics/MqttTraceLevel.cs create mode 100644 MQTT.NET.Core/Diagnostics/MqttTraceMessagePublishedEventArgs.cs create mode 100644 MQTT.NET.Core/DictionaryExtensions.cs create mode 100644 MQTT.NET.Core/Exceptions/MqttCommunicationException.cs create mode 100644 MQTT.NET.Core/Exceptions/MqttCommunicationTimedOutException.cs create mode 100644 MQTT.NET.Core/Exceptions/MqttProtocolViolationException.cs create mode 100644 MQTT.NET.Core/MQTTnet.Core.csproj create mode 100644 MQTT.NET.Core/MqttApplicationMessage.cs create mode 100644 MQTT.NET.Core/MqttApplicationMessageReceivedEventArgs.cs create mode 100644 MQTT.NET.Core/Packets/MqttBasePacket.cs create mode 100644 MQTT.NET.Core/Packets/MqttConnAckPacket.cs create mode 100644 MQTT.NET.Core/Packets/MqttConnectPacket.cs create mode 100644 MQTT.NET.Core/Packets/MqttDisconnectPacket.cs create mode 100644 MQTT.NET.Core/Packets/MqttPingReqPacket.cs create mode 100644 MQTT.NET.Core/Packets/MqttPingRespPacket.cs create mode 100644 MQTT.NET.Core/Packets/MqttPubAckPacket.cs create mode 100644 MQTT.NET.Core/Packets/MqttPubCompPacket.cs create mode 100644 MQTT.NET.Core/Packets/MqttPubRecPacket.cs create mode 100644 MQTT.NET.Core/Packets/MqttPubRelPacket.cs create mode 100644 MQTT.NET.Core/Packets/MqttPublishPacket.cs create mode 100644 MQTT.NET.Core/Packets/MqttSubAckPacket.cs create mode 100644 MQTT.NET.Core/Packets/MqttSubscribePacket.cs create mode 100644 MQTT.NET.Core/Packets/MqttUnsubAckPacket.cs create mode 100644 MQTT.NET.Core/Packets/MqttUnsubscribe.cs create mode 100644 MQTT.NET.Core/Packets/TopicFilter.cs create mode 100644 MQTT.NET.Core/Properties/AssemblyInfo.cs create mode 100644 MQTT.NET.Core/Protocol/MqttConnectReturnCode.cs create mode 100644 MQTT.NET.Core/Protocol/MqttControlPacketType.cs create mode 100644 MQTT.NET.Core/Protocol/MqttQualityOfServiceLevel.cs create mode 100644 MQTT.NET.Core/Protocol/MqttSubscribeReturnCode.cs create mode 100644 MQTT.NET.Core/Serializer/ByteReader.cs create mode 100644 MQTT.NET.Core/Serializer/ByteWriter.cs create mode 100644 MQTT.NET.Core/Serializer/DefaultMqttV311PacketSerializer.cs create mode 100644 MQTT.NET.Core/Serializer/IMqttPacketSerializer.cs create mode 100644 MQTT.NET.Core/Serializer/MqttPacketReader.cs create mode 100644 MQTT.NET.Core/Serializer/MqttPacketWriter.cs create mode 100644 MQTT.NET.Core/TaskExtensions.cs create mode 100644 MQTT.NET.TestConsole/App.config create mode 100644 MQTT.NET.TestConsole/MQTTnet.TestConsole.csproj create mode 100644 MQTT.NET.TestConsole/Program.cs create mode 100644 MQTT.NET.TestConsole/Properties/AssemblyInfo.cs create mode 100644 MQTTnet.NET/MQTTnet.NETFramework.csproj create mode 100644 MQTTnet.NET/MqttClientFactory.cs create mode 100644 MQTTnet.NET/MqttTcpChannel.cs create mode 100644 MQTTnet.NET/Properties/AssemblyInfo.cs create mode 100644 MQTTnet.Universal/MQTTnet.Universal.csproj create mode 100644 MQTTnet.Universal/MqttClientFactory.cs create mode 100644 MQTTnet.Universal/MqttTcpChannel.cs create mode 100644 MQTTnet.Universal/Properties/AssemblyInfo.cs create mode 100644 MQTTnet.Universal/Properties/MQTTnet.Universal.rd.xml create mode 100644 MQTTnet.Universal/project.json create mode 100644 MQTTnet.sln diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000..1ff0c42 --- /dev/null +++ b/.gitattributes @@ -0,0 +1,63 @@ +############################################################################### +# Set default behavior to automatically normalize line endings. +############################################################################### +* text=auto + +############################################################################### +# Set default behavior for command prompt diff. +# +# This is need for earlier builds of msysgit that does not have it on by +# default for csharp files. +# Note: This is only used by command line +############################################################################### +#*.cs diff=csharp + +############################################################################### +# Set the merge driver for project and solution files +# +# Merging from the command prompt will add diff markers to the files if there +# are conflicts (Merging from VS is not affected by the settings below, in VS +# the diff markers are never inserted). Diff markers may cause the following +# file extensions to fail to load in VS. An alternative would be to treat +# these files as binary and thus will always conflict and require user +# intervention with every merge. To do so, just uncomment the entries below +############################################################################### +#*.sln merge=binary +#*.csproj merge=binary +#*.vbproj merge=binary +#*.vcxproj merge=binary +#*.vcproj merge=binary +#*.dbproj merge=binary +#*.fsproj merge=binary +#*.lsproj merge=binary +#*.wixproj merge=binary +#*.modelproj merge=binary +#*.sqlproj merge=binary +#*.wwaproj merge=binary + +############################################################################### +# behavior for image files +# +# image files are treated as binary by default. +############################################################################### +#*.jpg binary +#*.png binary +#*.gif binary + +############################################################################### +# diff behavior for common document formats +# +# Convert binary document formats to text before diffing them. This feature +# is only available from the command line. Turn it on by uncommenting the +# entries below. +############################################################################### +#*.doc diff=astextplain +#*.DOC diff=astextplain +#*.docx diff=astextplain +#*.DOCX diff=astextplain +#*.dot diff=astextplain +#*.DOT diff=astextplain +#*.pdf diff=astextplain +#*.PDF diff=astextplain +#*.rtf diff=astextplain +#*.RTF diff=astextplain diff --git a/Images/Logo_128x128.png b/Images/Logo_128x128.png new file mode 100644 index 0000000000000000000000000000000000000000..5b88e134b0364528eb767fcb359dcf219362734a GIT binary patch literal 2382 zcmd6o`#0467RM)4cK^rUz~9+8yXnUP2gkr6Y-C1RM|a|$)h0k@-oy901i%rU^j5%6>ZP8|oloB`SifaU^ty8x$M0Us*h;|BV=gLHS0 zaS~*BfPS8!|0&Sl3k;xvf!<)yY4D5>80-s%(803|Fw_qWWrAV;Aj=;N4*RLat^bML9@OE>fI}l;k1T z@{!U4TM)QOyHUZI!6*q3FpY(Ni%sqGz?D`p2UBI??kd=!>W5i)UyB4%uu)eogUpMxy2kY;} z-iztO2Hs(V{n+4pY-kW0`hX38#72j((P8Y<2*w-1$42q7PxxmZK0b!?$MA`9{0kqS zoWQ5P;M0@%^c4Pe3ja2ZfB%Yq|Aq^`<1^p!Sphyfga4Q#ev0`)%>N`7=81&`;@2Ya z`xo*1HzE8@073!;2oNM7h=3sife8d5L?Qx32}}$|;5b1LOG``QA^d+km~T(C_)Rh) z*3>W(>CfinSkWF_=_4*Guq<3zPQi2*n-=Oz3ZXI2u(Z$kvXu8}@731P+b8KBMIx=N zwJ|q6e$MkpG0LGhZ)tP6VSaIkQ^!?*P5;o{Y;NBA9J`;~mP>8QY*__8uKiy*Qw8qU z+T8mx8&sz8lBnzKmuR=4lA+1t3PCD>$IJl@eV;!@mA(Ysq93-dlt?(NyiQtbSr+1b zv-0!T)aj;Q;P%WGU)okvo8x}1P1S(V;?E^s|9YOf`$VWba+I39WB6XD3rtzCI85y2X9AB<*Uz&{*m9(AlHP?>jec@s_s?jI#YvsC8-B zXxQBRt4#OiUHb$@%DMw;@oT3Y7VC=?-#QntD_%K-tGmB4_*ZtMM0>~+8@Vr%8e>XT zfmo8s%%g;U;7=0?U`xpfqa7xvjhtNDtq02E9dD-$qMxWzcizEODHeO$B9K3wU@|@xyHgm)Husy_YLqP}f;WJf^1d>N>d1Td&-T zNz`2ASkLM-7-jpXfXixiluFZpaZ4mDkz8Ij}rPn?sXD|FRFIm=qnmfUw5(~R^Uton?uhqh-9X!gWx z%!oW@-n)q`yLQ4;|JIC5$cZt_8~Ql&!#2~AE>6S8OBYthp5}1=JyhfbNDVImW4`)Q z@xM~m6dYM&8JVTAR<6A%c$BXkT$KAy0*AtAk;PJmc5U}|KJ3w$9FthPxwO3K*SRcqbCohmth!5wea0BeuZU0S}5fRyHLKg(U0W7ds?!)jemjrC1e zn8o+yeOiA`e(|N+!k+3wjHpAxh=CY8fBWp%6}CRHwDAYmJe@T6S(8t)t@-r=GC%Cu z8`gGV*d1g5y^=&PQ3;+_Ke+PVwWb{>IrXg-Q)#;ll!%xC3xg5cmP~bH%RYMkhW+Pf zHzWnI1%X`$<>Fs4A(%HAn7VNi2+`=W1MpoMO*(q71NLSX_a;C1jO?x&PbiOM1(|ZD zn3b(1s-Ho!zfon zLbKzV2mC4Dm`oWUwZp}~b*iCkv<7m!6S-f~GLuMmxBqJzyvt@Kf396?&r` z4%hQ25#{x~RXX@hyL^von{=_Ozpmoequ#+A4SRupEuLANRVOzzJ6~gR<>duaRkBxN zb)QGXlc7hE$UA#Owte}aPeWP0pI*KU`F%dmgW#ec9|mYLav~PISN_;oP_6HoqV1v zXkL3@w2_@FtdchKZK$gZk}zi8ejmY#>o03a)!K16DznHrj~;LEzZJSH&n9_?bmM4s U-EO8s%YTuLg}r(85wFDm08p92HUIzs literal 0 HcmV?d00001 diff --git a/MQTT.NET.Core.Tests/ByteReaderTests.cs b/MQTT.NET.Core.Tests/ByteReaderTests.cs new file mode 100644 index 0000000..76ff90b --- /dev/null +++ b/MQTT.NET.Core.Tests/ByteReaderTests.cs @@ -0,0 +1,30 @@ +using Microsoft.VisualStudio.TestTools.UnitTesting; +using MQTTnet.Core.Serializer; + +namespace MQTTnet.Core.Tests +{ + [TestClass] + public class ByteReaderTests + { + [TestMethod] + public void ByteReader_ReadToEnd() + { + var reader = new ByteReader(85); + Assert.IsTrue(reader.Read()); + Assert.IsFalse(reader.Read()); + Assert.IsTrue(reader.Read()); + Assert.IsFalse(reader.Read()); + Assert.IsTrue(reader.Read()); + Assert.IsFalse(reader.Read()); + Assert.IsTrue(reader.Read()); + Assert.IsFalse(reader.Read()); + } + + [TestMethod] + public void ByteReader_ReadPartial() + { + var reader = new ByteReader(15); + Assert.AreEqual(3, reader.Read(2)); + } + } +} diff --git a/MQTT.NET.Core.Tests/ByteWriterTests.cs b/MQTT.NET.Core.Tests/ByteWriterTests.cs new file mode 100644 index 0000000..6599193 --- /dev/null +++ b/MQTT.NET.Core.Tests/ByteWriterTests.cs @@ -0,0 +1,51 @@ +using Microsoft.VisualStudio.TestTools.UnitTesting; +using MQTTnet.Core.Serializer; + +namespace MQTTnet.Core.Tests +{ + [TestClass] + public class ByteWriterTests + { + [TestMethod] + public void ByteWriter_WriteMultipleAll() + { + var b = new ByteWriter(); + Assert.AreEqual(0, b.Value); + b.Write(3, 2); + Assert.AreEqual(3, b.Value); + } + + [TestMethod] + public void ByteWriter_WriteMultiplePartial() + { + var b = new ByteWriter(); + Assert.AreEqual(0, b.Value); + b.Write(255, 2); + Assert.AreEqual(3, b.Value); + } + + [TestMethod] + public void ByteWriter_WriteTo0xFF() + { + var b = new ByteWriter(); + + Assert.AreEqual(0, b.Value); + b.Write(true); + Assert.AreEqual(1, b.Value); + b.Write(true); + Assert.AreEqual(3, b.Value); + b.Write(true); + Assert.AreEqual(7, b.Value); + b.Write(true); + Assert.AreEqual(15, b.Value); + b.Write(true); + Assert.AreEqual(31, b.Value); + b.Write(true); + Assert.AreEqual(63, b.Value); + b.Write(true); + Assert.AreEqual(127, b.Value); + b.Write(true); + Assert.AreEqual(255, b.Value); + } + } +} diff --git a/MQTT.NET.Core.Tests/DefaultMqttV311PacketSerializerTests.cs b/MQTT.NET.Core.Tests/DefaultMqttV311PacketSerializerTests.cs new file mode 100644 index 0000000..5354e94 --- /dev/null +++ b/MQTT.NET.Core.Tests/DefaultMqttV311PacketSerializerTests.cs @@ -0,0 +1,434 @@ +using System; +using System.IO; +using System.Text; +using System.Threading.Tasks; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using MQTTnet.Core.Channel; +using MQTTnet.Core.Client; +using MQTTnet.Core.Packets; +using MQTTnet.Core.Protocol; +using MQTTnet.Core.Serializer; + +namespace MQTTnet.Core.Tests +{ + [TestClass] + public class DefaultMqttV311PacketSerializerTests + { + [TestMethod] + public void SerializeV311_MqttConnectPacket() + { + var p = new MqttConnectPacket + { + ClientId = "XYZ", + Password = "PASS", + Username = "USER", + KeepAlivePeriod = 123, + CleanSession = true + }; + + SerializeAndCompare(p, "EBsABE1RVFQEwgB7AANYWVoABFVTRVIABFBBU1M="); + } + + [TestMethod] + public void SerializeV311_MqttConnectPacketWithWillMessage() + { + var p = new MqttConnectPacket + { + ClientId = "XYZ", + Password = "PASS", + Username = "USER", + KeepAlivePeriod = 123, + CleanSession = true, + WillMessage = new MqttApplicationMessage( + "My/last/will", + Encoding.UTF8.GetBytes("Good byte."), + MqttQualityOfServiceLevel.AtLeastOnce, + true) + }; + + SerializeAndCompare(p, "EDUABE1RVFQE7gB7AANYWVoADE15L2xhc3Qvd2lsbAAKR29vZCBieXRlLgAEVVNFUgAEUEFTUw=="); + } + + [TestMethod] + public void DeserializeV311_MqttConnectPacket() + { + var p = new MqttConnectPacket + { + ClientId = "XYZ", + Password = "PASS", + Username = "USER", + KeepAlivePeriod = 123, + CleanSession = true + }; + + DeserializeAndCompare(p, "EBsABE1RVFQEwgB7AANYWVoABFVTRVIABFBBU1M="); + } + + [TestMethod] + public void DeserializeV311_MqttConnectPacketWithWillMessage() + { + var p = new MqttConnectPacket + { + ClientId = "XYZ", + Password = "PASS", + Username = "USER", + KeepAlivePeriod = 123, + CleanSession = true, + WillMessage = new MqttApplicationMessage( + "My/last/will", + Encoding.UTF8.GetBytes("Good byte."), + MqttQualityOfServiceLevel.AtLeastOnce, + true) + }; + + DeserializeAndCompare(p, "EDUABE1RVFQE7gB7AANYWVoADE15L2xhc3Qvd2lsbAAKR29vZCBieXRlLgAEVVNFUgAEUEFTUw=="); + } + + [TestMethod] + public void SerializeV311_MqttConnAckPacket() + { + var p = new MqttConnAckPacket + { + IsSessionPresent = true, + ConnectReturnCode = MqttConnectReturnCode.ConnectionRefusedNotAuthorized + }; + + SerializeAndCompare(p, "IAIBBQ=="); + } + + [TestMethod] + public void DeserializeV311_MqttConnAckPacket() + { + var p = new MqttConnAckPacket + { + IsSessionPresent = true, + ConnectReturnCode = MqttConnectReturnCode.ConnectionRefusedNotAuthorized + }; + + DeserializeAndCompare(p, "IAIBBQ=="); + } + + [TestMethod] + public void SerializeV311_MqttDisconnectPacket() + { + SerializeAndCompare(new MqttDisconnectPacket(), "4AA="); + } + + [TestMethod] + public void SerializeV311_MqttPingReqPacket() + { + SerializeAndCompare(new MqttPingReqPacket(), "wAA="); + } + + [TestMethod] + public void SerializeV311_MqttPingRespPacket() + { + SerializeAndCompare(new MqttPingRespPacket(), "0AA="); + } + + [TestMethod] + public void SerializeV311_MqttPublishPacket() + { + var p = new MqttPublishPacket + { + PacketIdentifier = 123, + Dup = true, + Retain = true, + Payload = Encoding.ASCII.GetBytes("HELLO"), + QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce, + Topic = "A/B/C" + }; + + SerializeAndCompare(p, "Ow4ABUEvQi9DAHtIRUxMTw=="); + } + + [TestMethod] + public void DeserializeV311_MqttPublishPacket() + { + var p = new MqttPublishPacket + { + PacketIdentifier = 123, + Dup = true, + Retain = true, + Payload = Encoding.ASCII.GetBytes("HELLO"), + QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce, + Topic = "A/B/C" + }; + + DeserializeAndCompare(p, "Ow4ABUEvQi9DAHtIRUxMTw=="); + } + + [TestMethod] + public void SerializeV311_MqttPubAckPacket() + { + var p = new MqttPubAckPacket + { + PacketIdentifier = 123 + }; + + SerializeAndCompare(p, "QAIAew=="); + } + + [TestMethod] + public void DeserializeV311_MqttPubAckPacket() + { + var p = new MqttPubAckPacket + { + PacketIdentifier = 123 + }; + + DeserializeAndCompare(p, "QAIAew=="); + } + + [TestMethod] + public void SerializeV311_MqttPubRecPacket() + { + var p = new MqttPubRecPacket + { + PacketIdentifier = 123 + }; + + SerializeAndCompare(p, "UAIAew=="); + } + + [TestMethod] + public void DeserializeV311_MqttPubRecPacket() + { + var p = new MqttPubRecPacket + { + PacketIdentifier = 123 + }; + + DeserializeAndCompare(p, "UAIAew=="); + } + + [TestMethod] + public void SerializeV311_MqttPubRelPacket() + { + var p = new MqttPubRelPacket + { + PacketIdentifier = 123 + }; + + SerializeAndCompare(p, "YgIAew=="); + } + + [TestMethod] + public void DeserializeV311_MqttPubRelPacket() + { + var p = new MqttPubRelPacket + { + PacketIdentifier = 123 + }; + + DeserializeAndCompare(p, "YgIAew=="); + } + + [TestMethod] + public void SerializeV311_MqttPubCompPacket() + { + var p = new MqttPubCompPacket + { + PacketIdentifier = 123 + }; + + SerializeAndCompare(p, "cAIAew=="); + } + + [TestMethod] + public void DeserializeV311_MqttPubCompPacket() + { + var p = new MqttPubCompPacket + { + PacketIdentifier = 123 + }; + + DeserializeAndCompare(p, "cAIAew=="); + } + + [TestMethod] + public void SerializeV311_MqttSubscribePacket() + { + var p = new MqttSubscribePacket + { + PacketIdentifier = 123 + }; + + p.TopicFilters.Add(new TopicFilter("A/B/C", MqttQualityOfServiceLevel.ExactlyOnce)); + p.TopicFilters.Add(new TopicFilter("1/2/3", MqttQualityOfServiceLevel.AtLeastOnce)); + p.TopicFilters.Add(new TopicFilter("x/y/z", MqttQualityOfServiceLevel.AtMostOnce)); + + SerializeAndCompare(p, "ghoAewAFQS9CL0MCAAUxLzIvMwEABXgveS96AA=="); + } + + [TestMethod] + public void DeserializeV311_MqttSubscribePacket() + { + var p = new MqttSubscribePacket + { + PacketIdentifier = 123 + }; + + p.TopicFilters.Add(new TopicFilter("A/B/C", MqttQualityOfServiceLevel.ExactlyOnce)); + p.TopicFilters.Add(new TopicFilter("1/2/3", MqttQualityOfServiceLevel.AtLeastOnce)); + p.TopicFilters.Add(new TopicFilter("x/y/z", MqttQualityOfServiceLevel.AtMostOnce)); + + DeserializeAndCompare(p, "ghoAewAFQS9CL0MCAAUxLzIvMwEABXgveS96AA=="); + } + + [TestMethod] + public void SerializeV311_MqttSubAckPacket() + { + var p = new MqttSubAckPacket + { + PacketIdentifier = 123 + }; + + p.SubscribeReturnCodes.Add(MqttSubscribeReturnCode.SuccessMaximumQoS0); + p.SubscribeReturnCodes.Add(MqttSubscribeReturnCode.SuccessMaximumQoS1); + p.SubscribeReturnCodes.Add(MqttSubscribeReturnCode.SuccessMaximumQoS2); + p.SubscribeReturnCodes.Add(MqttSubscribeReturnCode.Failure); + + SerializeAndCompare(p, "kAYAewABAoA="); + } + + [TestMethod] + public void DeserializeV311_MqttSubAckPacket() + { + var p = new MqttSubAckPacket + { + PacketIdentifier = 123 + }; + + p.SubscribeReturnCodes.Add(MqttSubscribeReturnCode.SuccessMaximumQoS0); + p.SubscribeReturnCodes.Add(MqttSubscribeReturnCode.SuccessMaximumQoS1); + p.SubscribeReturnCodes.Add(MqttSubscribeReturnCode.SuccessMaximumQoS2); + p.SubscribeReturnCodes.Add(MqttSubscribeReturnCode.Failure); + + DeserializeAndCompare(p, "kAYAewABAoA="); + } + + [TestMethod] + public void SerializeV311_MqttUnsubscribePacket() + { + var p = new MqttUnsubscribePacket + { + PacketIdentifier = 123 + }; + + p.TopicFilters.Add("A/B/C"); + p.TopicFilters.Add("1/2/3"); + p.TopicFilters.Add("x/y/z"); + + SerializeAndCompare(p, "ohcAewAFQS9CL0MABTEvMi8zAAV4L3kveg=="); + } + + [TestMethod] + public void DeserializeV311_MqttUnsubscribePacket() + { + var p = new MqttUnsubscribePacket + { + PacketIdentifier = 123 + }; + + p.TopicFilters.Add("A/B/C"); + p.TopicFilters.Add("1/2/3"); + p.TopicFilters.Add("x/y/z"); + + DeserializeAndCompare(p, "ohcAewAFQS9CL0MABTEvMi8zAAV4L3kveg=="); + } + + [TestMethod] + public void SerializeV311_MqttUnsubAckPacket() + { + var p = new MqttUnsubAckPacket + { + PacketIdentifier = 123 + }; + + SerializeAndCompare(p, "sAIAew=="); + } + + [TestMethod] + public void DeserializeV311_MqttUnsubAckPacket() + { + var p = new MqttUnsubAckPacket + { + PacketIdentifier = 123 + }; + + DeserializeAndCompare(p, "sAIAew=="); + } + + + public class TestChannel : IMqttTransportChannel + { + private readonly MemoryStream _stream = new MemoryStream(); + + public bool IsConnected { get; } = true; + + public TestChannel() + { + } + + public TestChannel(byte[] initialData) + { + _stream.Write(initialData, 0, initialData.Length); + _stream.Position = 0; + } + + public async Task ConnectAsync(MqttClientOptions options) + { + await Task.FromResult(0); + } + + public async Task DisconnectAsync() + { + await Task.FromResult(0); + } + + public async Task WriteAsync(byte[] buffer) + { + await _stream.WriteAsync(buffer, 0, buffer.Length); + } + + public async Task ReadAsync(byte[] buffer) + { + await _stream.ReadAsync(buffer, 0, buffer.Length); + } + + public byte[] ToArray() + { + return _stream.ToArray(); + } + } + + private void SerializeAndCompare(MqttBasePacket packet, string expectedBase64Value) + { + var serializer = new DefaultMqttV311PacketSerializer(); + var channel = new TestChannel(); + serializer.SerializeAsync(packet, channel).Wait(); + var buffer = channel.ToArray(); + + Assert.AreEqual(expectedBase64Value, Convert.ToBase64String(buffer)); + } + + private void DeserializeAndCompare(MqttBasePacket packet, string expectedBase64Value) + { + var serializer = new DefaultMqttV311PacketSerializer(); + + var channel1 = new TestChannel(); + serializer.SerializeAsync(packet, channel1).Wait(); + var buffer1 = channel1.ToArray(); + + var channel2 = new TestChannel(buffer1); + var deserializedPacket = serializer.DeserializeAsync(channel2).Result; + var buffer2 = channel2.ToArray(); + + var channel3 = new TestChannel(buffer2); + serializer.SerializeAsync(deserializedPacket, channel3).Wait(); + + Assert.AreEqual(expectedBase64Value, Convert.ToBase64String(channel3.ToArray())); + } + } +} diff --git a/MQTT.NET.Core.Tests/MQTTnet.Core.Tests.csproj b/MQTT.NET.Core.Tests/MQTTnet.Core.Tests.csproj new file mode 100644 index 0000000..4fde26b --- /dev/null +++ b/MQTT.NET.Core.Tests/MQTTnet.Core.Tests.csproj @@ -0,0 +1,66 @@ + + + + + Debug + AnyCPU + {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC} + Library + Properties + MQTTnet.Core.Tests + MQTTnet.Core.Tests + v4.6.1 + 512 + + + true + full + false + bin\Debug\ + DEBUG;TRACE + prompt + 4 + + + pdbonly + true + bin\Release\ + TRACE + prompt + 4 + + + + + + + + + + + + + + + + + + + + + {99C884F3-B4B9-417D-AA92-DC7DD1C4CFEE} + MQTTnet.Core + + + + + + + + \ No newline at end of file diff --git a/MQTT.NET.Core.Tests/Properties/AssemblyInfo.cs b/MQTT.NET.Core.Tests/Properties/AssemblyInfo.cs new file mode 100644 index 0000000..23aa962 --- /dev/null +++ b/MQTT.NET.Core.Tests/Properties/AssemblyInfo.cs @@ -0,0 +1,15 @@ +using System.Reflection; +using System.Runtime.InteropServices; + +[assembly: AssemblyTitle("MQTTnet.Core.Tests")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("Christian Kratky")] +[assembly: AssemblyProduct("MQTTnet")] +[assembly: AssemblyCopyright("Copyright © Christian Kratky 2015-2017")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] +[assembly: ComVisible(false)] +[assembly: Guid("a7ff0c91-25de-4ba6-b39e-f54e8dadf1cc")] +[assembly: AssemblyVersion("2.1.0.4")] +[assembly: AssemblyFileVersion("2.1.0.4")] diff --git a/MQTT.NET.Core/Adapter/IMqttAdapter.cs b/MQTT.NET.Core/Adapter/IMqttAdapter.cs new file mode 100644 index 0000000..d51e8af --- /dev/null +++ b/MQTT.NET.Core/Adapter/IMqttAdapter.cs @@ -0,0 +1,18 @@ +using System; +using System.Threading.Tasks; +using MQTTnet.Core.Client; +using MQTTnet.Core.Packets; + +namespace MQTTnet.Core.Adapter +{ + public interface IMqttAdapter + { + Task ConnectAsync(MqttClientOptions options, TimeSpan timeout); + + Task DisconnectAsync(); + + Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout); + + Task ReceivePacket(); + } +} diff --git a/MQTT.NET.Core/Adapter/MqttChannelAdapter.cs b/MQTT.NET.Core/Adapter/MqttChannelAdapter.cs new file mode 100644 index 0000000..ef6e59f --- /dev/null +++ b/MQTT.NET.Core/Adapter/MqttChannelAdapter.cs @@ -0,0 +1,72 @@ +using System; +using System.Threading.Tasks; +using MQTTnet.Core.Channel; +using MQTTnet.Core.Client; +using MQTTnet.Core.Diagnostics; +using MQTTnet.Core.Exceptions; +using MQTTnet.Core.Packets; +using MQTTnet.Core.Serializer; + +namespace MQTTnet.Core.Adapter +{ + public class MqttChannelAdapter : IMqttAdapter + { + private readonly IMqttPacketSerializer _serializer; + private readonly IMqttTransportChannel _channel; + + public MqttChannelAdapter(IMqttTransportChannel channel, IMqttPacketSerializer serializer) + { + if (channel == null) throw new ArgumentNullException(nameof(channel)); + if (serializer == null) throw new ArgumentNullException(nameof(serializer)); + + _channel = channel; + _serializer = serializer; + } + + public async Task ConnectAsync(MqttClientOptions options, TimeSpan timeout) + { + var task = _channel.ConnectAsync(options); + if (await Task.WhenAny(Task.Delay(timeout), task) != task) + { + throw new MqttCommunicationTimedOutException(); + } + } + + public async Task DisconnectAsync() + { + await _channel.DisconnectAsync(); + } + + public async Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout) + { + MqttTrace.Information(nameof(MqttChannelAdapter), $"Sending with timeout {timeout} >>> {packet}"); + + bool hasTimeout; + try + { + var task = _serializer.SerializeAsync(packet, _channel); + hasTimeout = await Task.WhenAny(Task.Delay(timeout), task) != task; + } + catch (Exception exception) + { + throw new MqttCommunicationException(exception); + } + + if (hasTimeout) + { + throw new MqttCommunicationTimedOutException(); + } + } + + public async Task ReceivePacket() + { + var mqttPacket = await _serializer.DeserializeAsync(_channel); + if (mqttPacket == null) + { + throw new MqttProtocolViolationException("Received malformed packet."); + } + + return mqttPacket; + } + } +} \ No newline at end of file diff --git a/MQTT.NET.Core/Adapter/MqttConnectingFailedException.cs b/MQTT.NET.Core/Adapter/MqttConnectingFailedException.cs new file mode 100644 index 0000000..aaf94f4 --- /dev/null +++ b/MQTT.NET.Core/Adapter/MqttConnectingFailedException.cs @@ -0,0 +1,16 @@ +using MQTTnet.Core.Exceptions; +using MQTTnet.Core.Protocol; + +namespace MQTTnet.Core.Adapter +{ + public class MqttConnectingFailedException : MqttCommunicationException + { + public MqttConnectingFailedException(MqttConnectReturnCode returnCode) + : base($"Connecting with MQTT server failed ({returnCode}).") + { + ReturnCode = returnCode; + } + + public MqttConnectReturnCode ReturnCode { get; } + } +} diff --git a/MQTT.NET.Core/Channel/IMqttTransportChannel.cs b/MQTT.NET.Core/Channel/IMqttTransportChannel.cs new file mode 100644 index 0000000..218c2e4 --- /dev/null +++ b/MQTT.NET.Core/Channel/IMqttTransportChannel.cs @@ -0,0 +1,16 @@ +using System.Threading.Tasks; +using MQTTnet.Core.Client; + +namespace MQTTnet.Core.Channel +{ + public interface IMqttTransportChannel + { + Task ConnectAsync(MqttClientOptions options); + + Task DisconnectAsync(); + + Task WriteAsync(byte[] buffer); + + Task ReadAsync(byte[] buffer); + } +} diff --git a/MQTT.NET.Core/Client/MqttClient.cs b/MQTT.NET.Core/Client/MqttClient.cs new file mode 100644 index 0000000..1b0ad50 --- /dev/null +++ b/MQTT.NET.Core/Client/MqttClient.cs @@ -0,0 +1,350 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using MQTTnet.Core.Adapter; +using MQTTnet.Core.Diagnostics; +using MQTTnet.Core.Exceptions; +using MQTTnet.Core.Packets; +using MQTTnet.Core.Protocol; + +namespace MQTTnet.Core.Client +{ + public class MqttClient + { + private readonly Dictionary _pendingExactlyOncePublishPackets = new Dictionary(); + private readonly HashSet _processedPublishPackets = new HashSet(); + private readonly MqttPacketDispatcher _packetDispatcher = new MqttPacketDispatcher(); + private readonly MqttClientOptions _options; + private readonly IMqttAdapter _adapter; + + private int _latestPacketIdentifier; + private CancellationTokenSource _cancellationTokenSource; + + public MqttClient(MqttClientOptions options, IMqttAdapter adapter) + { + if (options == null) throw new ArgumentNullException(nameof(options)); + if (adapter == null) throw new ArgumentNullException(nameof(adapter)); + + _options = options; + _adapter = adapter; + } + + public event EventHandler Connected; + + public event EventHandler Disconnected; + + public event EventHandler ApplicationMessageReceived; + + public bool IsConnected { get; private set; } + + public async Task ConnectAsync(MqttApplicationMessage willApplicationMessage = null) + { + MqttTrace.Verbose(nameof(MqttClient), "Trying to connect."); + + if (IsConnected) + { + throw new MqttProtocolViolationException("It is not allowed to connect with a server after the connection is established."); + } + + var connectPacket = new MqttConnectPacket + { + ClientId = _options.ClientId, + Username = _options.UserName, + Password = _options.Password, + KeepAlivePeriod = (ushort)_options.KeepAlivePeriod.TotalSeconds, + WillMessage = willApplicationMessage + }; + + await _adapter.ConnectAsync(_options, _options.DefaultCommunicationTimeout); + MqttTrace.Verbose(nameof(MqttClient), "Connection with server established."); + + _cancellationTokenSource = new CancellationTokenSource(); + _latestPacketIdentifier = 0; + _processedPublishPackets.Clear(); + _packetDispatcher.Reset(); + IsConnected = true; + + Task.Factory.StartNew(async () => await ReceivePackets( + _cancellationTokenSource.Token), _cancellationTokenSource.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default).Forget(); + + var response = await SendAndReceiveAsync(connectPacket, p => true); + if (response.ConnectReturnCode != MqttConnectReturnCode.ConnectionAccepted) + { + throw new MqttConnectingFailedException(response.ConnectReturnCode); + } + + if (_options.KeepAlivePeriod != TimeSpan.Zero) + { + Task.Factory.StartNew(async () => await SendKeepAliveMessagesAsync( + _cancellationTokenSource.Token), _cancellationTokenSource.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default).Forget(); + } + + Connected?.Invoke(this, EventArgs.Empty); + } + + public async Task DisconnectAsync() + { + await SendAsync(new MqttDisconnectPacket()); + await DisconnectInternalAsync(); + } + + private void ThrowIfNotConnected() + { + if (!IsConnected) + { + throw new MqttCommunicationException("The client is not connected."); + } + } + + public async Task> SubscribeAsync(IList topicFilters) + { + if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); + if (!topicFilters.Any()) throw new MqttProtocolViolationException("At least one topic filter must be set [MQTT-3.8.3-3]."); + ThrowIfNotConnected(); + + var subscribePacket = new MqttSubscribePacket + { + PacketIdentifier = GetNewPacketIdentifier(), + TopicFilters = topicFilters + }; + + Func packetSelector = p => p.PacketIdentifier == subscribePacket.PacketIdentifier; + var response = await SendAndReceiveAsync(subscribePacket, packetSelector); + + if (response.SubscribeReturnCodes.Count != topicFilters.Count) + { + throw new MqttProtocolViolationException("The return codes are not matching the topic filters [MQTT-3.9.3-1]."); + } + + var result = new List(); + for (var i = 0; i < topicFilters.Count; i++) + { + result.Add(new MqttSubscribeResult(topicFilters[i], response.SubscribeReturnCodes[i])); + } + + return result; + } + + public async Task Unsubscribe(IList topicFilters) + { + if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); + if (!topicFilters.Any()) throw new MqttProtocolViolationException("At least one topic filter must be set [MQTT-3.10.3-2]."); + ThrowIfNotConnected(); + + var unsubscribePacket = new MqttUnsubscribePacket + { + PacketIdentifier = GetNewPacketIdentifier(), + TopicFilters = topicFilters + }; + + Func packetSelector = p => p.PacketIdentifier == unsubscribePacket.PacketIdentifier; + await SendAndReceiveAsync(unsubscribePacket, packetSelector); + } + + public async Task PublishAsync(MqttApplicationMessage applicationMessage) + { + if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage)); + ThrowIfNotConnected(); + + var publishPacket = new MqttPublishPacket + { + Topic = applicationMessage.Topic, + Payload = applicationMessage.Payload, + QualityOfServiceLevel = applicationMessage.QualityOfServiceLevel, + Retain = applicationMessage.Retain, + Dup = false + }; + + if (publishPacket.QualityOfServiceLevel != MqttQualityOfServiceLevel.AtMostOnce) + { + publishPacket.PacketIdentifier = GetNewPacketIdentifier(); + } + + if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce) + { + if (!publishPacket.PacketIdentifier.HasValue) throw new InvalidOperationException(); + + Func packageSelector = p => p.PacketIdentifier == publishPacket.PacketIdentifier.Value; + await SendAndReceiveAsync(publishPacket, packageSelector); + } + else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce) + { + if (!publishPacket.PacketIdentifier.HasValue) throw new InvalidOperationException(); + + Func packageSelector = p => p.PacketIdentifier == publishPacket.PacketIdentifier.Value; + await SendAndReceiveAsync(publishPacket, packageSelector); + await SendAsync(new MqttPubCompPacket { PacketIdentifier = publishPacket.PacketIdentifier.Value }); + } + } + + private async Task DisconnectInternalAsync() + { + try + { + await _adapter.DisconnectAsync(); + } + catch + { + } + finally + { + _cancellationTokenSource?.Cancel(); + IsConnected = false; + Disconnected?.Invoke(this, EventArgs.Empty); + } + } + + private async void ProcessIncomingPacket(MqttBasePacket mqttPacket) + { + var publishPacket = mqttPacket as MqttPublishPacket; + if (publishPacket != null) + { + await ProcessReceivedPublishPacket(publishPacket); + return; + } + + var pingReqPacket = mqttPacket as MqttPingReqPacket; + if (pingReqPacket != null) + { + await SendAsync(new MqttPingRespPacket()); + return; + } + + var pubRelPacket = mqttPacket as MqttPubRelPacket; + if (pubRelPacket != null) + { + await ProcessReceivedPubRelPacket(pubRelPacket); + return; + } + + _packetDispatcher.Dispatch(mqttPacket); + } + + private void FireApplicationMessageReceivedEvent(MqttPublishPacket publishPacket) + { + if (publishPacket.QualityOfServiceLevel != MqttQualityOfServiceLevel.AtMostOnce) + { + if (publishPacket.PacketIdentifier == null) throw new InvalidOperationException(); + _processedPublishPackets.Add(publishPacket.PacketIdentifier.Value); + } + + var applicationMessage = new MqttApplicationMessage( + publishPacket.Topic, + publishPacket.Payload, + publishPacket.QualityOfServiceLevel, + publishPacket.Retain + ); + + ApplicationMessageReceived?.Invoke(this, new MqttApplicationMessageReceivedEventArgs(applicationMessage)); + } + + private async Task ProcessReceivedPubRelPacket(MqttPubRelPacket pubRelPacket) + { + var originalPublishPacket = _pendingExactlyOncePublishPackets.Take(pubRelPacket.PacketIdentifier); + if (originalPublishPacket == null) throw new MqttCommunicationException(); + await SendAsync(new MqttPubCompPacket { PacketIdentifier = pubRelPacket.PacketIdentifier }); + + FireApplicationMessageReceivedEvent(originalPublishPacket); + } + + private async Task ProcessReceivedPublishPacket(MqttPublishPacket publishPacket) + { + if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce) + { + FireApplicationMessageReceivedEvent(publishPacket); + } + else + { + if (!publishPacket.PacketIdentifier.HasValue) { throw new InvalidOperationException(); } + + if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce) + { + FireApplicationMessageReceivedEvent(publishPacket); + await SendAsync(new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier.Value }); + } + else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce) + { + _pendingExactlyOncePublishPackets.Add(publishPacket.PacketIdentifier.Value, publishPacket); + await SendAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier.Value }); + } + } + } + + private async Task SendAsync(MqttBasePacket packet) + { + await _adapter.SendPacketAsync(packet, _options.DefaultCommunicationTimeout); + } + + private async Task SendAndReceiveAsync( + MqttBasePacket requestPacket, Func responsePacketSelector) where TResponsePacket : MqttBasePacket + { + Func selector = p => + { + var p1 = p as TResponsePacket; + return p1 != null && responsePacketSelector(p1); + }; + + return (TResponsePacket)await SendAndReceiveAsync(requestPacket, selector); + } + + private async Task SendAndReceiveAsync(MqttBasePacket requestPacket, Func responsePacketSelector) + { + var waitTask = _packetDispatcher.WaitForPacketAsync(responsePacketSelector, _options.DefaultCommunicationTimeout); + await _adapter.SendPacketAsync(requestPacket, _options.DefaultCommunicationTimeout); + return await waitTask; + } + + private ushort GetNewPacketIdentifier() + { + return (ushort)Interlocked.Increment(ref _latestPacketIdentifier); + } + + private async Task SendKeepAliveMessagesAsync(CancellationToken cancellationToken) + { + MqttTrace.Information(nameof(MqttClient), "Start sending keep alive packets."); + + try + { + while (!cancellationToken.IsCancellationRequested) + { + await Task.Delay(_options.KeepAlivePeriod, cancellationToken); + await SendAndReceiveAsync(new MqttPingReqPacket(), p => true); + } + } + catch (Exception exception) + { + MqttTrace.Error(nameof(MqttClient), exception, "Error while sending keep alive packets."); + } + finally + { + MqttTrace.Information(nameof(MqttClient), "Stopped sending keep alive packets."); + } + } + + private async Task ReceivePackets(CancellationToken cancellationToken) + { + MqttTrace.Information(nameof(MqttClient), "Start receiving packets."); + try + { + while (!cancellationToken.IsCancellationRequested) + { + var mqttPacket = await _adapter.ReceivePacket(); + MqttTrace.Information(nameof(MqttChannelAdapter), $"Received <<< {mqttPacket}"); + + Task.Run(() => ProcessIncomingPacket(mqttPacket), cancellationToken).Forget(); + } + } + catch (Exception exception) + { + MqttTrace.Error(nameof(MqttClient), exception, "Error while receiving packets."); + await DisconnectInternalAsync(); + } + finally + { + MqttTrace.Information(nameof(MqttClient), "Stopped receiving packets."); + } + } + } +} \ No newline at end of file diff --git a/MQTT.NET.Core/Client/MqttClientOptions.cs b/MQTT.NET.Core/Client/MqttClientOptions.cs new file mode 100644 index 0000000..5410527 --- /dev/null +++ b/MQTT.NET.Core/Client/MqttClientOptions.cs @@ -0,0 +1,21 @@ +using System; + +namespace MQTTnet.Core.Client +{ + public class MqttClientOptions + { + public string Server { get; set; } + + public int Port { get; set; } = 1883; + + public string UserName { get; set; } + + public string Password { get; set; } + + public string ClientId { get; set; } = Guid.NewGuid().ToString().Replace("-", string.Empty); + + public TimeSpan KeepAlivePeriod { get; set; } = TimeSpan.FromSeconds(5); + + public TimeSpan DefaultCommunicationTimeout { get; set; } = TimeSpan.FromSeconds(10); + } +} diff --git a/MQTT.NET.Core/Client/MqttClientStatistics.cs b/MQTT.NET.Core/Client/MqttClientStatistics.cs new file mode 100644 index 0000000..c284e84 --- /dev/null +++ b/MQTT.NET.Core/Client/MqttClientStatistics.cs @@ -0,0 +1,7 @@ +namespace MQTTnet.Core.Client +{ + public class MqttClientStatistics + { + public int SentPackets { get; set; } + } +} diff --git a/MQTT.NET.Core/Client/MqttPacketAwaiter.cs b/MQTT.NET.Core/Client/MqttPacketAwaiter.cs new file mode 100644 index 0000000..4ee415f --- /dev/null +++ b/MQTT.NET.Core/Client/MqttPacketAwaiter.cs @@ -0,0 +1,32 @@ +using System; +using System.Threading.Tasks; +using MQTTnet.Core.Packets; + +namespace MQTTnet.Core.Client +{ + public class MqttPacketAwaiter + { + private readonly TaskCompletionSource _taskCompletionSource = new TaskCompletionSource(); + private readonly Func _packetSelector; + + public MqttPacketAwaiter(Func packetSelector) + { + if (packetSelector == null) throw new ArgumentNullException(nameof(packetSelector)); + + _packetSelector = packetSelector; + } + + public Task Task => _taskCompletionSource.Task; + + public bool CheckPacket(MqttBasePacket packet) + { + if (!_packetSelector(packet)) + { + return false; + } + + _taskCompletionSource.SetResult(packet); + return true; + } + } +} \ No newline at end of file diff --git a/MQTT.NET.Core/Client/MqttPacketDispatcher.cs b/MQTT.NET.Core/Client/MqttPacketDispatcher.cs new file mode 100644 index 0000000..830aee4 --- /dev/null +++ b/MQTT.NET.Core/Client/MqttPacketDispatcher.cs @@ -0,0 +1,84 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using MQTTnet.Core.Diagnostics; +using MQTTnet.Core.Exceptions; +using MQTTnet.Core.Packets; + +namespace MQTTnet.Core.Client +{ + public class MqttPacketDispatcher + { + private readonly List _packetAwaiters = new List(); + + public async Task WaitForPacketAsync(Func selector, TimeSpan timeout) + { + if (selector == null) throw new ArgumentNullException(nameof(selector)); + + var waitHandle = new MqttPacketAwaiter(selector); + AddPacketAwaiter(waitHandle); + + var hasTimeout = await Task.WhenAny(Task.Delay(timeout), waitHandle.Task) != waitHandle.Task; + RemovePacketAwaiter(waitHandle); + + if (hasTimeout) + { + MqttTrace.Error(nameof(MqttPacketDispatcher), $"Timeout while waiting for packet."); + throw new MqttCommunicationTimedOutException(); + } + + return waitHandle.Task.Result; + } + + public void Dispatch(MqttBasePacket packet) + { + if (packet == null) throw new ArgumentNullException(nameof(packet)); + + var packetDispatched = false; + foreach (var packetAwaiter in GetPacketAwaiters()) + { + if (packetAwaiter.CheckPacket(packet)) + { + packetDispatched = true; + } + } + + if (!packetDispatched) + { + MqttTrace.Warning(nameof(MqttPacketDispatcher), $"Received packet '{packet}' not dispatched."); + } + } + + private List GetPacketAwaiters() + { + lock (_packetAwaiters) + { + return new List(_packetAwaiters); + } + } + + private void AddPacketAwaiter(MqttPacketAwaiter packetAwaiter) + { + lock (_packetAwaiters) + { + _packetAwaiters.Add(packetAwaiter); + } + } + + private void RemovePacketAwaiter(MqttPacketAwaiter packetAwaiter) + { + lock (_packetAwaiters) + { + _packetAwaiters.Remove(packetAwaiter); + } + } + + public void Reset() + { + lock (_packetAwaiters) + { + _packetAwaiters.Clear(); + } + } + } +} \ No newline at end of file diff --git a/MQTT.NET.Core/Client/MqttSubscribeResult.cs b/MQTT.NET.Core/Client/MqttSubscribeResult.cs new file mode 100644 index 0000000..9dd4944 --- /dev/null +++ b/MQTT.NET.Core/Client/MqttSubscribeResult.cs @@ -0,0 +1,18 @@ +using MQTTnet.Core.Packets; +using MQTTnet.Core.Protocol; + +namespace MQTTnet.Core.Client +{ + public class MqttSubscribeResult + { + public MqttSubscribeResult(TopicFilter topicFilter, MqttSubscribeReturnCode returnCode) + { + TopicFilter = topicFilter; + ReturnCode = returnCode; + } + + public TopicFilter TopicFilter { get; } + + public MqttSubscribeReturnCode ReturnCode { get; } + } +} diff --git a/MQTT.NET.Core/Diagnostics/MqttTrace.cs b/MQTT.NET.Core/Diagnostics/MqttTrace.cs new file mode 100644 index 0000000..ba72b36 --- /dev/null +++ b/MQTT.NET.Core/Diagnostics/MqttTrace.cs @@ -0,0 +1,39 @@ +using System; + +namespace MQTTnet.Core.Diagnostics +{ + public static class MqttTrace + { + public static event EventHandler TraceMessagePublished; + + public static void Verbose(string source, string message) + { + TraceMessagePublished?.Invoke(null, new MqttTraceMessagePublishedEventArgs(Environment.CurrentManagedThreadId, source, MqttTraceLevel.Verbose, message, null)); + } + + public static void Information(string source, string message) + { + TraceMessagePublished?.Invoke(null, new MqttTraceMessagePublishedEventArgs(Environment.CurrentManagedThreadId, source, MqttTraceLevel.Information, message, null)); + } + + public static void Warning(string source, string message) + { + TraceMessagePublished?.Invoke(null, new MqttTraceMessagePublishedEventArgs(Environment.CurrentManagedThreadId, source, MqttTraceLevel.Warning, message, null)); + } + + public static void Warning(string source, Exception exception, string message) + { + TraceMessagePublished?.Invoke(null, new MqttTraceMessagePublishedEventArgs(Environment.CurrentManagedThreadId, source, MqttTraceLevel.Warning, message, exception)); + } + + public static void Error(string source, string message) + { + TraceMessagePublished?.Invoke(null, new MqttTraceMessagePublishedEventArgs(Environment.CurrentManagedThreadId, source, MqttTraceLevel.Error, message, null)); + } + + public static void Error(string source, Exception exception, string message) + { + TraceMessagePublished?.Invoke(null, new MqttTraceMessagePublishedEventArgs(Environment.CurrentManagedThreadId, source, MqttTraceLevel.Error, message, exception)); + } + } +} diff --git a/MQTT.NET.Core/Diagnostics/MqttTraceLevel.cs b/MQTT.NET.Core/Diagnostics/MqttTraceLevel.cs new file mode 100644 index 0000000..0e7463f --- /dev/null +++ b/MQTT.NET.Core/Diagnostics/MqttTraceLevel.cs @@ -0,0 +1,10 @@ +namespace MQTTnet.Core.Diagnostics +{ + public enum MqttTraceLevel + { + Verbose, + Information, + Warning, + Error + } +} diff --git a/MQTT.NET.Core/Diagnostics/MqttTraceMessagePublishedEventArgs.cs b/MQTT.NET.Core/Diagnostics/MqttTraceMessagePublishedEventArgs.cs new file mode 100644 index 0000000..eb73c30 --- /dev/null +++ b/MQTT.NET.Core/Diagnostics/MqttTraceMessagePublishedEventArgs.cs @@ -0,0 +1,26 @@ +using System; + +namespace MQTTnet.Core.Diagnostics +{ + public class MqttTraceMessagePublishedEventArgs : EventArgs + { + public MqttTraceMessagePublishedEventArgs(int threadId, string source, MqttTraceLevel level, string message, Exception exception) + { + ThreadId = threadId; + Source = source; + Level = level; + Message = message; + Exception = exception; + } + + public int ThreadId { get; } + + public string Source { get; } + + public MqttTraceLevel Level { get; } + + public string Message { get; } + + public Exception Exception { get; } + } +} diff --git a/MQTT.NET.Core/DictionaryExtensions.cs b/MQTT.NET.Core/DictionaryExtensions.cs new file mode 100644 index 0000000..da405ae --- /dev/null +++ b/MQTT.NET.Core/DictionaryExtensions.cs @@ -0,0 +1,21 @@ +using System; +using System.Collections.Generic; + +namespace MQTTnet.Core +{ + public static class DictionaryExtensions + { + public static TValue Take(this IDictionary dictionary, TKey key) + { + if (dictionary == null) throw new ArgumentNullException(nameof(dictionary)); + + TValue value; + if (dictionary.TryGetValue(key, out value)) + { + dictionary.Remove(key); + } + + return value; + } + } +} diff --git a/MQTT.NET.Core/Exceptions/MqttCommunicationException.cs b/MQTT.NET.Core/Exceptions/MqttCommunicationException.cs new file mode 100644 index 0000000..8b471a0 --- /dev/null +++ b/MQTT.NET.Core/Exceptions/MqttCommunicationException.cs @@ -0,0 +1,21 @@ +using System; + +namespace MQTTnet.Core.Exceptions +{ + public class MqttCommunicationException : Exception + { + public MqttCommunicationException() + { + } + + public MqttCommunicationException(Exception innerException) + : base(innerException.Message, innerException) + { + } + + public MqttCommunicationException(string message) + : base(message) + { + } + } +} diff --git a/MQTT.NET.Core/Exceptions/MqttCommunicationTimedOutException.cs b/MQTT.NET.Core/Exceptions/MqttCommunicationTimedOutException.cs new file mode 100644 index 0000000..2e707d7 --- /dev/null +++ b/MQTT.NET.Core/Exceptions/MqttCommunicationTimedOutException.cs @@ -0,0 +1,6 @@ +namespace MQTTnet.Core.Exceptions +{ + public class MqttCommunicationTimedOutException : MqttCommunicationException + { + } +} diff --git a/MQTT.NET.Core/Exceptions/MqttProtocolViolationException.cs b/MQTT.NET.Core/Exceptions/MqttProtocolViolationException.cs new file mode 100644 index 0000000..153e867 --- /dev/null +++ b/MQTT.NET.Core/Exceptions/MqttProtocolViolationException.cs @@ -0,0 +1,12 @@ +using System; + +namespace MQTTnet.Core.Exceptions +{ + public class MqttProtocolViolationException : Exception + { + public MqttProtocolViolationException(string message) : base(message) + { + + } + } +} diff --git a/MQTT.NET.Core/MQTTnet.Core.csproj b/MQTT.NET.Core/MQTTnet.Core.csproj new file mode 100644 index 0000000..826d38b --- /dev/null +++ b/MQTT.NET.Core/MQTTnet.Core.csproj @@ -0,0 +1,96 @@ + + + + + 10.0 + Debug + AnyCPU + {99C884F3-B4B9-417D-AA92-DC7DD1C4CFEE} + Library + Properties + MQTTnet.Core + MQTTnet.Core + en-US + 512 + {786C830F-07A1-408B-BD7F-6EE04809D6DB};{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC} + Profile111 + v4.5 + + + true + full + false + bin\Debug\ + DEBUG;TRACE + prompt + 4 + + + pdbonly + true + bin\Release\ + TRACE + prompt + 4 + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/MQTT.NET.Core/MqttApplicationMessage.cs b/MQTT.NET.Core/MqttApplicationMessage.cs new file mode 100644 index 0000000..1dbd056 --- /dev/null +++ b/MQTT.NET.Core/MqttApplicationMessage.cs @@ -0,0 +1,27 @@ +using System; +using MQTTnet.Core.Protocol; + +namespace MQTTnet.Core +{ + public class MqttApplicationMessage + { + public MqttApplicationMessage(string topic, byte[] payload, MqttQualityOfServiceLevel qualityOfServiceLevel, bool retain) + { + if (topic == null) throw new ArgumentNullException(nameof(topic)); + if (payload == null) throw new ArgumentNullException(nameof(payload)); + + Topic = topic; + Payload = payload; + QualityOfServiceLevel = qualityOfServiceLevel; + Retain = retain; + } + + public string Topic { get; } + + public byte[] Payload { get; } + + public MqttQualityOfServiceLevel QualityOfServiceLevel { get; } + + public bool Retain { get; } + } +} diff --git a/MQTT.NET.Core/MqttApplicationMessageReceivedEventArgs.cs b/MQTT.NET.Core/MqttApplicationMessageReceivedEventArgs.cs new file mode 100644 index 0000000..8167b8d --- /dev/null +++ b/MQTT.NET.Core/MqttApplicationMessageReceivedEventArgs.cs @@ -0,0 +1,16 @@ +using System; + +namespace MQTTnet.Core +{ + public class MqttApplicationMessageReceivedEventArgs : EventArgs + { + public MqttApplicationMessageReceivedEventArgs(MqttApplicationMessage applicationMessage) + { + if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage)); + + ApplicationMessage = applicationMessage; + } + + public MqttApplicationMessage ApplicationMessage { get; } + } +} diff --git a/MQTT.NET.Core/Packets/MqttBasePacket.cs b/MQTT.NET.Core/Packets/MqttBasePacket.cs new file mode 100644 index 0000000..41901e5 --- /dev/null +++ b/MQTT.NET.Core/Packets/MqttBasePacket.cs @@ -0,0 +1,6 @@ +namespace MQTTnet.Core.Packets +{ + public abstract class MqttBasePacket + { + } +} diff --git a/MQTT.NET.Core/Packets/MqttConnAckPacket.cs b/MQTT.NET.Core/Packets/MqttConnAckPacket.cs new file mode 100644 index 0000000..3d262dc --- /dev/null +++ b/MQTT.NET.Core/Packets/MqttConnAckPacket.cs @@ -0,0 +1,16 @@ +using MQTTnet.Core.Protocol; + +namespace MQTTnet.Core.Packets +{ + public class MqttConnAckPacket : MqttBasePacket + { + public bool IsSessionPresent { get; set; } + + public MqttConnectReturnCode ConnectReturnCode { get; set; } + + public override string ToString() + { + return $"{nameof(MqttConnAckPacket)}: [ConnectReturnCode={ConnectReturnCode}] [IsSessionPresent={IsSessionPresent}]"; + } + } +} diff --git a/MQTT.NET.Core/Packets/MqttConnectPacket.cs b/MQTT.NET.Core/Packets/MqttConnectPacket.cs new file mode 100644 index 0000000..e18a646 --- /dev/null +++ b/MQTT.NET.Core/Packets/MqttConnectPacket.cs @@ -0,0 +1,17 @@ +namespace MQTTnet.Core.Packets +{ + public class MqttConnectPacket: MqttBasePacket + { + public string ClientId { get; set; } + + public string Username { get; set; } + + public string Password { get; set; } + + public ushort KeepAlivePeriod { get; set; } + + public bool CleanSession { get; set; } + + public MqttApplicationMessage WillMessage { get; set; } + } +} diff --git a/MQTT.NET.Core/Packets/MqttDisconnectPacket.cs b/MQTT.NET.Core/Packets/MqttDisconnectPacket.cs new file mode 100644 index 0000000..4e3659a --- /dev/null +++ b/MQTT.NET.Core/Packets/MqttDisconnectPacket.cs @@ -0,0 +1,6 @@ +namespace MQTTnet.Core.Packets +{ + public class MqttDisconnectPacket : MqttBasePacket + { + } +} diff --git a/MQTT.NET.Core/Packets/MqttPingReqPacket.cs b/MQTT.NET.Core/Packets/MqttPingReqPacket.cs new file mode 100644 index 0000000..5c0fb57 --- /dev/null +++ b/MQTT.NET.Core/Packets/MqttPingReqPacket.cs @@ -0,0 +1,12 @@ +using System.Xml; + +namespace MQTTnet.Core.Packets +{ + public class MqttPingReqPacket : MqttBasePacket + { + public override string ToString() + { + return nameof(MqttPingReqPacket); + } + } +} diff --git a/MQTT.NET.Core/Packets/MqttPingRespPacket.cs b/MQTT.NET.Core/Packets/MqttPingRespPacket.cs new file mode 100644 index 0000000..d7a837c --- /dev/null +++ b/MQTT.NET.Core/Packets/MqttPingRespPacket.cs @@ -0,0 +1,10 @@ +namespace MQTTnet.Core.Packets +{ + public class MqttPingRespPacket : MqttBasePacket + { + public override string ToString() + { + return nameof(MqttPingRespPacket); + } + } +} diff --git a/MQTT.NET.Core/Packets/MqttPubAckPacket.cs b/MQTT.NET.Core/Packets/MqttPubAckPacket.cs new file mode 100644 index 0000000..da61768 --- /dev/null +++ b/MQTT.NET.Core/Packets/MqttPubAckPacket.cs @@ -0,0 +1,7 @@ +namespace MQTTnet.Core.Packets +{ + public class MqttPubAckPacket : MqttBasePacket + { + public ushort PacketIdentifier { get; set; } + } +} diff --git a/MQTT.NET.Core/Packets/MqttPubCompPacket.cs b/MQTT.NET.Core/Packets/MqttPubCompPacket.cs new file mode 100644 index 0000000..9912ae8 --- /dev/null +++ b/MQTT.NET.Core/Packets/MqttPubCompPacket.cs @@ -0,0 +1,7 @@ +namespace MQTTnet.Core.Packets +{ + public class MqttPubCompPacket : MqttBasePacket + { + public ushort PacketIdentifier { get; set; } + } +} diff --git a/MQTT.NET.Core/Packets/MqttPubRecPacket.cs b/MQTT.NET.Core/Packets/MqttPubRecPacket.cs new file mode 100644 index 0000000..8d81cfb --- /dev/null +++ b/MQTT.NET.Core/Packets/MqttPubRecPacket.cs @@ -0,0 +1,7 @@ +namespace MQTTnet.Core.Packets +{ + public class MqttPubRecPacket : MqttBasePacket + { + public ushort PacketIdentifier { get; set; } + } +} diff --git a/MQTT.NET.Core/Packets/MqttPubRelPacket.cs b/MQTT.NET.Core/Packets/MqttPubRelPacket.cs new file mode 100644 index 0000000..787be0a --- /dev/null +++ b/MQTT.NET.Core/Packets/MqttPubRelPacket.cs @@ -0,0 +1,7 @@ +namespace MQTTnet.Core.Packets +{ + public class MqttPubRelPacket : MqttBasePacket + { + public ushort PacketIdentifier { get; set; } + } +} diff --git a/MQTT.NET.Core/Packets/MqttPublishPacket.cs b/MQTT.NET.Core/Packets/MqttPublishPacket.cs new file mode 100644 index 0000000..912e1a8 --- /dev/null +++ b/MQTT.NET.Core/Packets/MqttPublishPacket.cs @@ -0,0 +1,19 @@ +using MQTTnet.Core.Protocol; + +namespace MQTTnet.Core.Packets +{ + public class MqttPublishPacket : MqttBasePacket + { + public ushort? PacketIdentifier { get; set; } + + public bool Retain { get; set; } + + public MqttQualityOfServiceLevel QualityOfServiceLevel { get; set; } + + public bool Dup { get; set; } + + public string Topic { get; set; } + + public byte[] Payload { get; set; } + } +} diff --git a/MQTT.NET.Core/Packets/MqttSubAckPacket.cs b/MQTT.NET.Core/Packets/MqttSubAckPacket.cs new file mode 100644 index 0000000..785af90 --- /dev/null +++ b/MQTT.NET.Core/Packets/MqttSubAckPacket.cs @@ -0,0 +1,12 @@ +using System.Collections.Generic; +using MQTTnet.Core.Protocol; + +namespace MQTTnet.Core.Packets +{ + public class MqttSubAckPacket : MqttBasePacket + { + public ushort PacketIdentifier { get; set; } + + public List SubscribeReturnCodes { get; set; } = new List(); + } +} diff --git a/MQTT.NET.Core/Packets/MqttSubscribePacket.cs b/MQTT.NET.Core/Packets/MqttSubscribePacket.cs new file mode 100644 index 0000000..6157156 --- /dev/null +++ b/MQTT.NET.Core/Packets/MqttSubscribePacket.cs @@ -0,0 +1,11 @@ +using System.Collections.Generic; + +namespace MQTTnet.Core.Packets +{ + public class MqttSubscribePacket : MqttBasePacket + { + public ushort PacketIdentifier { get; set; } + + public IList TopicFilters { get; set; } = new List(); + } +} diff --git a/MQTT.NET.Core/Packets/MqttUnsubAckPacket.cs b/MQTT.NET.Core/Packets/MqttUnsubAckPacket.cs new file mode 100644 index 0000000..59538e4 --- /dev/null +++ b/MQTT.NET.Core/Packets/MqttUnsubAckPacket.cs @@ -0,0 +1,7 @@ +namespace MQTTnet.Core.Packets +{ + public class MqttUnsubAckPacket : MqttBasePacket + { + public ushort PacketIdentifier { get; set; } + } +} diff --git a/MQTT.NET.Core/Packets/MqttUnsubscribe.cs b/MQTT.NET.Core/Packets/MqttUnsubscribe.cs new file mode 100644 index 0000000..c874311 --- /dev/null +++ b/MQTT.NET.Core/Packets/MqttUnsubscribe.cs @@ -0,0 +1,11 @@ +using System.Collections.Generic; + +namespace MQTTnet.Core.Packets +{ + public class MqttUnsubscribePacket : MqttBasePacket + { + public ushort PacketIdentifier { get; set; } + + public IList TopicFilters { get; set; } = new List(); + } +} diff --git a/MQTT.NET.Core/Packets/TopicFilter.cs b/MQTT.NET.Core/Packets/TopicFilter.cs new file mode 100644 index 0000000..569f9e6 --- /dev/null +++ b/MQTT.NET.Core/Packets/TopicFilter.cs @@ -0,0 +1,17 @@ +using MQTTnet.Core.Protocol; + +namespace MQTTnet.Core.Packets +{ + public class TopicFilter + { + public TopicFilter(string topic, MqttQualityOfServiceLevel qualityOfServiceLevel) + { + Topic = topic; + QualityOfServiceLevel = qualityOfServiceLevel; + } + + public string Topic { get; } + + public MqttQualityOfServiceLevel QualityOfServiceLevel { get; } + } +} \ No newline at end of file diff --git a/MQTT.NET.Core/Properties/AssemblyInfo.cs b/MQTT.NET.Core/Properties/AssemblyInfo.cs new file mode 100644 index 0000000..70c3047 --- /dev/null +++ b/MQTT.NET.Core/Properties/AssemblyInfo.cs @@ -0,0 +1,15 @@ +using System.Reflection; +using System.Runtime.InteropServices; + +[assembly: AssemblyTitle("MQTTnet.Core")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("Christian Kratky")] +[assembly: AssemblyProduct("MQTTnet")] +[assembly: AssemblyCopyright("Copyright © Christian Kratky 2016-2017")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] +[assembly: ComVisible(false)] +[assembly: Guid("0e10d0ea-8e4b-4903-ab9e-93a187d07922")] +[assembly: AssemblyVersion("2.1.0.4")] +[assembly: AssemblyFileVersion("2.1.0.4")] diff --git a/MQTT.NET.Core/Protocol/MqttConnectReturnCode.cs b/MQTT.NET.Core/Protocol/MqttConnectReturnCode.cs new file mode 100644 index 0000000..604cd48 --- /dev/null +++ b/MQTT.NET.Core/Protocol/MqttConnectReturnCode.cs @@ -0,0 +1,12 @@ +namespace MQTTnet.Core.Protocol +{ + public enum MqttConnectReturnCode + { + ConnectionAccepted = 0x00, + ConnectionRefusedUnacceptableProtocolVersion = 0x01, + ConnectionRefusedIdentifierRejected = 0x02, + ConnectionRefusedServerUnavailable = 0x03, + ConnectionRefusedBadUsernameOrPassword = 0x04, + ConnectionRefusedNotAuthorized = 0x05 + } +} diff --git a/MQTT.NET.Core/Protocol/MqttControlPacketType.cs b/MQTT.NET.Core/Protocol/MqttControlPacketType.cs new file mode 100644 index 0000000..aa46b9d --- /dev/null +++ b/MQTT.NET.Core/Protocol/MqttControlPacketType.cs @@ -0,0 +1,20 @@ +namespace MQTTnet.Core.Protocol +{ + public enum MqttControlPacketType + { + Connect = 1, + ConnAck = 2, + Publish = 3, + PubAck = 4, + PubRec = 5, + PubRel = 6, + PubComp = 7, + Subscribe = 8, + SubAck = 9, + Unsubscibe = 10, + UnsubAck = 11, + PingReq = 12, + PingResp = 13, + Disconnect = 14 + } +} diff --git a/MQTT.NET.Core/Protocol/MqttQualityOfServiceLevel.cs b/MQTT.NET.Core/Protocol/MqttQualityOfServiceLevel.cs new file mode 100644 index 0000000..c19496a --- /dev/null +++ b/MQTT.NET.Core/Protocol/MqttQualityOfServiceLevel.cs @@ -0,0 +1,9 @@ +namespace MQTTnet.Core.Protocol +{ + public enum MqttQualityOfServiceLevel + { + AtMostOnce = 0x00, + AtLeastOnce = 0x01, + ExactlyOnce = 0x02 + } +} diff --git a/MQTT.NET.Core/Protocol/MqttSubscribeReturnCode.cs b/MQTT.NET.Core/Protocol/MqttSubscribeReturnCode.cs new file mode 100644 index 0000000..a561423 --- /dev/null +++ b/MQTT.NET.Core/Protocol/MqttSubscribeReturnCode.cs @@ -0,0 +1,10 @@ +namespace MQTTnet.Core.Protocol +{ + public enum MqttSubscribeReturnCode + { + SuccessMaximumQoS0 = 0x00, + SuccessMaximumQoS1 = 0x01, + SuccessMaximumQoS2 = 0x02, + Failure = 0x80 + } +} diff --git a/MQTT.NET.Core/Serializer/ByteReader.cs b/MQTT.NET.Core/Serializer/ByteReader.cs new file mode 100644 index 0000000..06dae2f --- /dev/null +++ b/MQTT.NET.Core/Serializer/ByteReader.cs @@ -0,0 +1,42 @@ +using System; + +namespace MQTTnet.Core.Serializer +{ + public class ByteReader + { + private int _index; + private readonly int _byte; + + public ByteReader(byte @byte) + { + _byte = @byte; + } + + public bool Read() + { + if (_index >= 8) + { + throw new InvalidOperationException("End of the byte reached."); + } + + var result = ((1 << _index) & _byte) > 0; + _index++; + + return result; + } + + public byte Read(int count) + { + var result = 0; + for (var i = 0; i < count; i++) + { + if (Read()) + { + result |= 1 << i; + } + } + + return (byte)result; + } + } +} diff --git a/MQTT.NET.Core/Serializer/ByteWriter.cs b/MQTT.NET.Core/Serializer/ByteWriter.cs new file mode 100644 index 0000000..cc19ce0 --- /dev/null +++ b/MQTT.NET.Core/Serializer/ByteWriter.cs @@ -0,0 +1,36 @@ +using System; + +namespace MQTTnet.Core.Serializer +{ + public class ByteWriter + { + private int _index; + private int _byte; + + public byte Value => (byte)_byte; + + public void Write(byte @byte, int count) + { + for (var i = 0; i < count; i++) + { + var value = ((1 << i) & @byte) > 0; + Write(value); + } + } + + public void Write(bool bit) + { + if (_index >= 8) + { + throw new InvalidOperationException("End of the byte reached."); + } + + if (bit) + { + _byte |= 1 << _index; + } + + _index++; + } + } +} diff --git a/MQTT.NET.Core/Serializer/DefaultMqttV311PacketSerializer.cs b/MQTT.NET.Core/Serializer/DefaultMqttV311PacketSerializer.cs new file mode 100644 index 0000000..20f3f36 --- /dev/null +++ b/MQTT.NET.Core/Serializer/DefaultMqttV311PacketSerializer.cs @@ -0,0 +1,632 @@ +using System; +using System.Linq; +using System.Net; +using System.Text; +using System.Text.RegularExpressions; +using System.Threading.Tasks; +using MQTTnet.Core.Channel; +using MQTTnet.Core.Exceptions; +using MQTTnet.Core.Packets; +using MQTTnet.Core.Protocol; + +namespace MQTTnet.Core.Serializer +{ + public class DefaultMqttV311PacketSerializer : IMqttPacketSerializer + { + public async Task SerializeAsync(MqttBasePacket packet, IMqttTransportChannel destination) + { + if (packet == null) throw new ArgumentNullException(nameof(packet)); + if (destination == null) throw new ArgumentNullException(nameof(destination)); + + var connectPacket = packet as MqttConnectPacket; + if (connectPacket != null) + { + await SerializeAsync(connectPacket, destination); + return; + } + + var connAckPacket = packet as MqttConnAckPacket; + if (connAckPacket != null) + { + await SerializeAsync(connAckPacket, destination); + return; + } + + var disconnectPacket = packet as MqttDisconnectPacket; + if (disconnectPacket != null) + { + await SerializeAsync(disconnectPacket, destination); + return; + } + + var pingReqPacket = packet as MqttPingReqPacket; + if (pingReqPacket != null) + { + await SerializeAsync(pingReqPacket, destination); + return; + } + + var pingRespPacket = packet as MqttPingRespPacket; + if (pingRespPacket != null) + { + await SerializeAsync(pingRespPacket, destination); + return; + } + + var publishPacket = packet as MqttPublishPacket; + if (publishPacket != null) + { + await SerializeAsync(publishPacket, destination); + return; + } + + var pubAckPacket = packet as MqttPubAckPacket; + if (pubAckPacket != null) + { + await SerializeAsync(pubAckPacket, destination); + return; + } + + var pubRecPacket = packet as MqttPubRecPacket; + if (pubRecPacket != null) + { + await SerializeAsync(pubRecPacket, destination); + return; + } + + var pubRelPacket = packet as MqttPubRelPacket; + if (pubRelPacket != null) + { + await SerializeAsync(pubRelPacket, destination); + return; + } + + var pubCompPacket = packet as MqttPubCompPacket; + if (pubCompPacket != null) + { + await SerializeAsync(pubCompPacket, destination); + return; + } + + var subscribePacket = packet as MqttSubscribePacket; + if (subscribePacket != null) + { + await SerializeAsync(subscribePacket, destination); + return; + } + + var subAckPacket = packet as MqttSubAckPacket; + if (subAckPacket != null) + { + await SerializeAsync(subAckPacket, destination); + return; + } + + var unsubscribePacket = packet as MqttUnsubscribePacket; + if (unsubscribePacket != null) + { + await SerializeAsync(unsubscribePacket, destination); + return; + } + + var unsubAckPacket = packet as MqttUnsubAckPacket; + if (unsubAckPacket != null) + { + await SerializeAsync(unsubAckPacket, destination); + return; + } + + throw new MqttProtocolViolationException("Packet type invalid."); + } + + public async Task DeserializeAsync(IMqttTransportChannel source) + { + using (var mqttPacketReader = new MqttPacketReader(source)) + { + await mqttPacketReader.ReadToEndAsync(); + + switch (mqttPacketReader.ControlPacketType) + { + case MqttControlPacketType.Connect: + { + return await DeserializeConnectAsync(mqttPacketReader); + } + + case MqttControlPacketType.ConnAck: + { + return await DeserializeConnAck(mqttPacketReader); + } + + case MqttControlPacketType.Disconnect: + { + return new MqttDisconnectPacket(); + } + + case MqttControlPacketType.Publish: + { + return await DeserializePublishAsync(mqttPacketReader); + } + + case MqttControlPacketType.PubAck: + { + return new MqttPubAckPacket + { + PacketIdentifier = await mqttPacketReader.ReadRemainingDataUShortAsync() + }; + } + + case MqttControlPacketType.PubRec: + { + return new MqttPubRecPacket + { + PacketIdentifier = await mqttPacketReader.ReadRemainingDataUShortAsync() + }; + } + + case MqttControlPacketType.PubRel: + { + return new MqttPubRelPacket + { + PacketIdentifier = await mqttPacketReader.ReadRemainingDataUShortAsync() + }; + } + + case MqttControlPacketType.PubComp: + { + return new MqttPubCompPacket + { + PacketIdentifier = await mqttPacketReader.ReadRemainingDataUShortAsync() + }; + } + + case MqttControlPacketType.PingReq: + { + return new MqttPingReqPacket(); + } + + case MqttControlPacketType.PingResp: + { + return new MqttPingRespPacket(); + } + + case MqttControlPacketType.Subscribe: + { + return await DeserializeSubscribeAsync(mqttPacketReader); + } + + case MqttControlPacketType.SubAck: + { + return await DeserializeSubAck(mqttPacketReader); + } + + case MqttControlPacketType.Unsubscibe: + { + return await DeserializeUnsubscribeAsync(mqttPacketReader); + } + + case MqttControlPacketType.UnsubAck: + { + return new MqttUnsubAckPacket + { + PacketIdentifier = await mqttPacketReader.ReadRemainingDataUShortAsync() + }; + } + } + } + + throw new ProtocolViolationException(); + } + + private async Task DeserializeUnsubscribeAsync(MqttPacketReader reader) + { + var packet = new MqttUnsubscribePacket + { + PacketIdentifier = await reader.ReadRemainingDataUShortAsync(), + }; + + while (!reader.EndOfRemainingData) + { + packet.TopicFilters.Add(await reader.ReadRemainingDataStringWithLengthPrefixAsync()); + } + + return packet; + } + + private async Task DeserializeSubscribeAsync(MqttPacketReader reader) + { + var packet = new MqttSubscribePacket + { + PacketIdentifier = await reader.ReadRemainingDataUShortAsync(), + }; + + while (!reader.EndOfRemainingData) + { + packet.TopicFilters.Add(new TopicFilter( + await reader.ReadRemainingDataStringWithLengthPrefixAsync(), + (MqttQualityOfServiceLevel)await reader.ReadRemainingDataByteAsync())); + } + + return packet; + } + + private async Task DeserializePublishAsync(MqttPacketReader reader) + { + var fixedHeader = new ByteReader(reader.FixedHeader); + var retain = fixedHeader.Read(); + var qualityOfServiceLevel = (MqttQualityOfServiceLevel)fixedHeader.Read(2); + var dup = fixedHeader.Read(); + + var topic = await reader.ReadRemainingDataStringWithLengthPrefixAsync(); + + ushort? packetIdentifier = null; + if (qualityOfServiceLevel > 0) + { + packetIdentifier = await reader.ReadRemainingDataUShortAsync(); + } + + var packet = new MqttPublishPacket + { + Retain = retain, + QualityOfServiceLevel = qualityOfServiceLevel, + Dup = dup, + Topic = topic, + Payload = await reader.ReadRemainingDataAsync(), + PacketIdentifier = packetIdentifier + }; + + return packet; + } + + private async Task DeserializeConnectAsync(MqttPacketReader reader) + { + var packet = new MqttConnectPacket(); + + await reader.ReadRemainingDataByteAsync(); + await reader.ReadRemainingDataByteAsync(); + var protocolName = await reader.ReadRemainingDataAsync(4); + + if (Encoding.UTF8.GetString(protocolName, 0, protocolName.Length) != "MQTT") + { + throw new ProtocolViolationException("Protocol name is not 'MQTT'."); + } + + var protocolLevel = await reader.ReadRemainingDataByteAsync(); + var connectFlags = await reader.ReadRemainingDataByteAsync(); + + var connectFlagsReader = new ByteReader(connectFlags); + connectFlagsReader.Read(); // Reserved. + packet.CleanSession = connectFlagsReader.Read(); + var willFlag = connectFlagsReader.Read(); + var willQoS = connectFlagsReader.Read(2); + var willRetain = connectFlagsReader.Read(); + var passwordFlag = connectFlagsReader.Read(); + var usernameFlag = connectFlagsReader.Read(); + + packet.KeepAlivePeriod = await reader.ReadRemainingDataUShortAsync(); + packet.ClientId = await reader.ReadRemainingDataStringWithLengthPrefixAsync(); + + if (willFlag) + { + packet.WillMessage = new MqttApplicationMessage( + await reader.ReadRemainingDataStringWithLengthPrefixAsync(), + await reader.ReadRemainingDataWithLengthPrefixAsync(), + (MqttQualityOfServiceLevel)willQoS, + willRetain); + } + + if (usernameFlag) + { + packet.Username = await reader.ReadRemainingDataStringWithLengthPrefixAsync(); + } + + if (passwordFlag) + { + packet.Password = await reader.ReadRemainingDataStringWithLengthPrefixAsync(); + } + + ValidateConnectPacket(packet); + return packet; + } + + private async Task DeserializeSubAck(MqttPacketReader reader) + { + var packet = new MqttSubAckPacket + { + PacketIdentifier = await reader.ReadRemainingDataUShortAsync() + }; + + while (!reader.EndOfRemainingData) + { + packet.SubscribeReturnCodes.Add((MqttSubscribeReturnCode)await reader.ReadRemainingDataByteAsync()); + } + + return packet; + } + + private async Task DeserializeConnAck(MqttPacketReader reader) + { + var variableHeader1 = await reader.ReadRemainingDataByteAsync(); + var variableHeader2 = await reader.ReadRemainingDataByteAsync(); + + var packet = new MqttConnAckPacket + { + IsSessionPresent = new ByteReader(variableHeader1).Read(), + ConnectReturnCode = (MqttConnectReturnCode)variableHeader2 + }; + + return packet; + } + + private void ValidateConnectPacket(MqttConnectPacket packet) + { + if (string.IsNullOrEmpty(packet.ClientId) && !packet.CleanSession) + { + throw new ProtocolViolationException("CleanSession must be set if ClientId is empty [MQTT-3.1.3-7]."); + } + + if (!string.IsNullOrEmpty(packet.ClientId) && !Regex.IsMatch(packet.ClientId, "^[a-zA-Z0-9]*$")) + { + throw new ProtocolViolationException("ClientId contains invalid characters [MQTT-3.1.3-5]."); + } + } + + private void ValidatePublishPacket(MqttPublishPacket packet) + { + if (packet.QualityOfServiceLevel == 0 && packet.Dup) + { + throw new ProtocolViolationException("Dup flag must be false for QoS 0 packets [MQTT-3.3.1-2]."); + } + } + + private async Task SerializeAsync(MqttConnectPacket packet, IMqttTransportChannel destination) + { + ValidateConnectPacket(packet); + + using (var output = new MqttPacketWriter()) + { + // Write variable header + output.Write(0x00); // 3.1.2.1 Protocol Name + output.Write(0x04); // "" + output.Write('M'); + output.Write('Q'); + output.Write('T'); + output.Write('T'); + output.Write(0x04); // 3.1.2.2 Protocol Level + + var connectFlags = new ByteWriter(); // 3.1.2.3 Connect Flags + connectFlags.Write(false); // Reserved + connectFlags.Write(packet.CleanSession); + connectFlags.Write(packet.WillMessage != null); + + if (packet.WillMessage != null) + { + connectFlags.Write((byte)packet.WillMessage.QualityOfServiceLevel, 2); + connectFlags.Write(packet.WillMessage.Retain); + } + else + { + connectFlags.Write(0, 2); + connectFlags.Write(false); + } + + connectFlags.Write(packet.Password != null); + connectFlags.Write(packet.Username != null); + + output.Write(connectFlags); + output.Write(packet.KeepAlivePeriod); + output.WriteWithLengthPrefix(packet.ClientId); + + if (packet.WillMessage != null) + { + output.WriteWithLengthPrefix(packet.WillMessage.Topic); + output.WriteWithLengthPrefix(packet.WillMessage.Payload); + } + + if (packet.Username != null) + { + output.WriteWithLengthPrefix(packet.Username); + } + + if (packet.Password != null) + { + output.WriteWithLengthPrefix(packet.Password); + } + + output.InjectFixedHeader(MqttControlPacketType.Connect); + await output.WriteToAsync(destination); + } + } + + private async Task SerializeAsync(MqttConnAckPacket packet, IMqttTransportChannel destination) + { + using (var output = new MqttPacketWriter()) + { + var connectAcknowledgeFlags = new ByteWriter(); + connectAcknowledgeFlags.Write(packet.IsSessionPresent); + + output.Write(connectAcknowledgeFlags); + output.Write((byte)packet.ConnectReturnCode); + + output.InjectFixedHeader(MqttControlPacketType.ConnAck); + await output.WriteToAsync(destination); + } + } + + private async Task SerializeAsync(MqttDisconnectPacket packet, IMqttTransportChannel destination) + { + await SerializeEmptyPacketAsync(MqttControlPacketType.Disconnect, destination); + } + + private async Task SerializeAsync(MqttPingReqPacket packet, IMqttTransportChannel destination) + { + await SerializeEmptyPacketAsync(MqttControlPacketType.PingReq, destination); + } + + private async Task SerializeAsync(MqttPingRespPacket packet, IMqttTransportChannel destination) + { + await SerializeEmptyPacketAsync(MqttControlPacketType.PingResp, destination); + } + + private async Task SerializeAsync(MqttPublishPacket packet, IMqttTransportChannel destination) + { + ValidatePublishPacket(packet); + + using (var output = new MqttPacketWriter()) + { + output.WriteWithLengthPrefix(packet.Topic); + + if (packet.QualityOfServiceLevel > 0) + { + if (!packet.PacketIdentifier.HasValue) + { + throw new MqttProtocolViolationException("Packet identifier must be set if QoS > 0 [MQTT-2.3.1-1]."); + } + + output.Write(packet.PacketIdentifier.Value); + } + else + { + if (packet.PacketIdentifier.HasValue) + { + throw new MqttProtocolViolationException("Packet identifier must be empty if QoS == 0 [MQTT-2.3.1-5]."); + } + } + + if (packet.Payload?.Length > 0) + { + output.Write(packet.Payload); + } + + var fixedHeader = new ByteWriter(); + fixedHeader.Write(packet.Retain); + fixedHeader.Write((byte)packet.QualityOfServiceLevel, 2); + fixedHeader.Write(packet.Dup); + + output.InjectFixedHeader(MqttControlPacketType.Publish, fixedHeader.Value); + await output.WriteToAsync(destination); + } + } + + private async Task SerializeAsync(MqttPubAckPacket packet, IMqttTransportChannel destination) + { + using (var output = new MqttPacketWriter()) + { + output.Write(packet.PacketIdentifier); + + output.InjectFixedHeader(MqttControlPacketType.PubAck); + await output.WriteToAsync(destination); + } + } + + private async Task SerializeAsync(MqttPubRecPacket packet, IMqttTransportChannel destination) + { + using (var output = new MqttPacketWriter()) + { + output.Write(packet.PacketIdentifier); + + output.InjectFixedHeader(MqttControlPacketType.PubRec); + await output.WriteToAsync(destination); + } + } + + private async Task SerializeAsync(MqttPubRelPacket packet, IMqttTransportChannel destination) + { + using (var output = new MqttPacketWriter()) + { + output.Write(packet.PacketIdentifier); + + output.InjectFixedHeader(MqttControlPacketType.PubRel, 0x02); + await output.WriteToAsync(destination); + } + } + + private async Task SerializeAsync(MqttPubCompPacket packet, IMqttTransportChannel destination) + { + using (var output = new MqttPacketWriter()) + { + output.Write(packet.PacketIdentifier); + + output.InjectFixedHeader(MqttControlPacketType.PubComp); + await output.WriteToAsync(destination); + } + } + + private async Task SerializeAsync(MqttSubscribePacket packet, IMqttTransportChannel destination) + { + using (var output = new MqttPacketWriter()) + { + output.Write(packet.PacketIdentifier); + + if (packet.TopicFilters?.Any() == true) + { + foreach (var topicFilter in packet.TopicFilters) + { + output.WriteWithLengthPrefix(topicFilter.Topic); + output.Write((byte)topicFilter.QualityOfServiceLevel); + } + } + + output.InjectFixedHeader(MqttControlPacketType.Subscribe, 0x02); + await output.WriteToAsync(destination); + } + } + + private async Task SerializeAsync(MqttSubAckPacket packet, IMqttTransportChannel destination) + { + using (var output = new MqttPacketWriter()) + { + output.Write(packet.PacketIdentifier); + + if (packet.SubscribeReturnCodes?.Any() == true) + { + foreach (var packetSubscribeReturnCode in packet.SubscribeReturnCodes) + { + output.Write((byte)packetSubscribeReturnCode); + } + } + + output.InjectFixedHeader(MqttControlPacketType.SubAck); + await output.WriteToAsync(destination); + } + } + + private async Task SerializeAsync(MqttUnsubscribePacket packet, IMqttTransportChannel destination) + { + using (var output = new MqttPacketWriter()) + { + output.Write(packet.PacketIdentifier); + + if (packet.TopicFilters?.Any() == true) + { + foreach (var topicFilter in packet.TopicFilters) + { + output.WriteWithLengthPrefix(topicFilter); + } + } + + output.InjectFixedHeader(MqttControlPacketType.Unsubscibe, 0x02); + await output.WriteToAsync(destination); + } + } + + private async Task SerializeAsync(MqttUnsubAckPacket packet, IMqttTransportChannel destination) + { + using (var output = new MqttPacketWriter()) + { + output.Write(packet.PacketIdentifier); + + output.InjectFixedHeader(MqttControlPacketType.UnsubAck); + await output.WriteToAsync(destination); + } + } + + private async Task SerializeEmptyPacketAsync(MqttControlPacketType type, IMqttTransportChannel destination) + { + using (var output = new MqttPacketWriter()) + { + output.InjectFixedHeader(type); + await output.WriteToAsync(destination); + } + } + } +} diff --git a/MQTT.NET.Core/Serializer/IMqttPacketSerializer.cs b/MQTT.NET.Core/Serializer/IMqttPacketSerializer.cs new file mode 100644 index 0000000..0be0bd7 --- /dev/null +++ b/MQTT.NET.Core/Serializer/IMqttPacketSerializer.cs @@ -0,0 +1,13 @@ +using System.Threading.Tasks; +using MQTTnet.Core.Channel; +using MQTTnet.Core.Packets; + +namespace MQTTnet.Core.Serializer +{ + public interface IMqttPacketSerializer + { + Task SerializeAsync(MqttBasePacket mqttPacket, IMqttTransportChannel destination); + + Task DeserializeAsync(IMqttTransportChannel source); + } +} \ No newline at end of file diff --git a/MQTT.NET.Core/Serializer/MqttPacketReader.cs b/MQTT.NET.Core/Serializer/MqttPacketReader.cs new file mode 100644 index 0000000..c504449 --- /dev/null +++ b/MQTT.NET.Core/Serializer/MqttPacketReader.cs @@ -0,0 +1,130 @@ +using System; +using System.Diagnostics; +using System.IO; +using System.Text; +using System.Threading.Tasks; +using MQTTnet.Core.Channel; +using MQTTnet.Core.Exceptions; +using MQTTnet.Core.Protocol; + +namespace MQTTnet.Core.Serializer +{ + public sealed class MqttPacketReader : IDisposable + { + private readonly MemoryStream _remainingData = new MemoryStream(); + private readonly IMqttTransportChannel _source; + + public MqttPacketReader(IMqttTransportChannel source) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + + _source = source; + } + + public MqttControlPacketType ControlPacketType { get; private set; } + + public byte FixedHeader { get; private set; } + + public int RemainingLength { get; private set; } + + public bool EndOfRemainingData => _remainingData.Position == _remainingData.Length; + + public async Task ReadToEndAsync() + { + await ReadFixedHeaderAsync(); + await ReadRemainingLengthAsync(); + + if (RemainingLength == 0) + { + return; + } + + var buffer = new byte[RemainingLength]; + await _source.ReadAsync(buffer); + + _remainingData.Write(buffer, 0, buffer.Length); + _remainingData.Position = 0; + } + + private async Task ReadFixedHeaderAsync() + { + FixedHeader = await ReadStreamByteAsync(); + + var byteReader = new ByteReader(FixedHeader); + byteReader.Read(4); + ControlPacketType = (MqttControlPacketType)byteReader.Read(4); + } + + private async Task ReadStreamByteAsync() + { + var buffer = new byte[1]; + await _source.ReadAsync(buffer); + return buffer[0]; + } + + private async Task ReadRemainingLengthAsync() + { + // Alorithm taken from http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html. + var multiplier = 1; + var value = 0; + byte encodedByte; + do + { + encodedByte = await ReadStreamByteAsync(); + value += (encodedByte & 127) * multiplier; + multiplier *= 128; + if (multiplier > 128 * 128 * 128) + { + throw new MqttProtocolViolationException("Remaining length is ivalid."); + } + } while ((encodedByte & 128) != 0); + + RemainingLength = value; + } + + public async Task ReadRemainingDataByteAsync() + { + return (await ReadRemainingDataAsync(1))[0]; + } + + public async Task ReadRemainingDataUShortAsync() + { + var buffer = await ReadRemainingDataAsync(2); + + var temp = buffer[0]; + buffer[0] = buffer[1]; + buffer[1] = temp; + + return BitConverter.ToUInt16(buffer, 0); + } + + public async Task ReadRemainingDataStringWithLengthPrefixAsync() + { + var buffer = await ReadRemainingDataWithLengthPrefixAsync(); + return Encoding.UTF8.GetString(buffer, 0, buffer.Length); + } + + public async Task ReadRemainingDataWithLengthPrefixAsync() + { + var length = await ReadRemainingDataUShortAsync(); + return await ReadRemainingDataAsync(length); + } + + public async Task ReadRemainingDataAsync() + { + return await ReadRemainingDataAsync(RemainingLength - (int)_remainingData.Position); + } + + public async Task ReadRemainingDataAsync(int length) + { + var buffer = new byte[length]; + await _remainingData.ReadAsync(buffer, 0, buffer.Length); + return buffer; + } + + public void Dispose() + { + _remainingData?.Dispose(); + } + } +} diff --git a/MQTT.NET.Core/Serializer/MqttPacketWriter.cs b/MQTT.NET.Core/Serializer/MqttPacketWriter.cs new file mode 100644 index 0000000..f382dab --- /dev/null +++ b/MQTT.NET.Core/Serializer/MqttPacketWriter.cs @@ -0,0 +1,107 @@ +using System; +using System.IO; +using System.Text; +using System.Threading.Tasks; +using MQTTnet.Core.Channel; +using MQTTnet.Core.Protocol; + +namespace MQTTnet.Core.Serializer +{ + public sealed class MqttPacketWriter : IDisposable + { + private readonly MemoryStream _buffer = new MemoryStream(); + + public void InjectFixedHeader(byte fixedHeader) + { + if (_buffer.Length == 0) + { + Write(fixedHeader); + Write(0); + return; + } + + var remainingLength = (int)_buffer.Length; + using (var buffer = new MemoryStream()) + { + _buffer.WriteTo(buffer); + _buffer.SetLength(0); + + _buffer.WriteByte(fixedHeader); + + // Alorithm taken from http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html. + var x = remainingLength; + do + { + var encodedByte = (byte)(x % 128); + x = x / 128; + if (x > 0) + { + encodedByte = (byte)(encodedByte | 128); + } + + _buffer.WriteByte(encodedByte); + } while (x > 0); + + buffer.Position = 0; + buffer.WriteTo(_buffer); + } + } + + public void InjectFixedHeader(MqttControlPacketType packetType, byte flags = 0) + { + var fixedHeader = (byte)((byte)packetType << 4); + fixedHeader |= flags; + InjectFixedHeader(fixedHeader); + } + + public void Write(byte value) + { + _buffer.WriteByte(value); + } + + public void Write(char value) + { + _buffer.WriteByte((byte)value); + } + + public void Write(ushort value) + { + var buffer = BitConverter.GetBytes(value); + _buffer.WriteByte(buffer[1]); + _buffer.WriteByte(buffer[0]); + } + + public void Write(ByteWriter value) + { + _buffer.WriteByte(value.Value); + } + + public void Write(byte[] value) + { + _buffer.Write(value, 0, value.Length); + } + + public void WriteWithLengthPrefix(string value) + { + WriteWithLengthPrefix(Encoding.UTF8.GetBytes(value ?? string.Empty)); + } + + public void WriteWithLengthPrefix(byte[] value) + { + var length = (ushort)value.Length; + + Write(length); + Write(value); + } + + public void Dispose() + { + _buffer?.Dispose(); + } + + public async Task WriteToAsync(IMqttTransportChannel destination) + { + await destination.WriteAsync(_buffer.ToArray()); + } + } +} diff --git a/MQTT.NET.Core/TaskExtensions.cs b/MQTT.NET.Core/TaskExtensions.cs new file mode 100644 index 0000000..d04213a --- /dev/null +++ b/MQTT.NET.Core/TaskExtensions.cs @@ -0,0 +1,11 @@ +using System.Threading.Tasks; + +namespace MQTTnet.Core +{ + public static class TaskExtensions + { + public static void Forget(this Task task) + { + } + } +} diff --git a/MQTT.NET.TestConsole/App.config b/MQTT.NET.TestConsole/App.config new file mode 100644 index 0000000..731f6de --- /dev/null +++ b/MQTT.NET.TestConsole/App.config @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/MQTT.NET.TestConsole/MQTTnet.TestConsole.csproj b/MQTT.NET.TestConsole/MQTTnet.TestConsole.csproj new file mode 100644 index 0000000..57c6ecb --- /dev/null +++ b/MQTT.NET.TestConsole/MQTTnet.TestConsole.csproj @@ -0,0 +1,71 @@ + + + + + Debug + AnyCPU + {7B19B139-2E9D-4F1D-88B4-6180B4CF872A} + Exe + Properties + MQTTnet.TestConsole + MQTTnet.TestConsole + v4.6.1 + 512 + true + + + AnyCPU + true + full + false + bin\Debug\ + DEBUG;TRACE + prompt + 4 + + + AnyCPU + pdbonly + true + bin\Release\ + TRACE + prompt + 4 + + + + + + + + + + + + + + + + + + + + + + {99c884f3-b4b9-417d-aa92-dc7dd1c4cfee} + MQTTnet.Core + + + {A480EF90-0EAA-4D9A-B271-47A9C47F6F7D} + MQTTnet.NETFramework + + + + + \ No newline at end of file diff --git a/MQTT.NET.TestConsole/Program.cs b/MQTT.NET.TestConsole/Program.cs new file mode 100644 index 0000000..fa9c99a --- /dev/null +++ b/MQTT.NET.TestConsole/Program.cs @@ -0,0 +1,109 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; +using MQTTnet.Core; +using MQTTnet.Core.Adapter; +using MQTTnet.Core.Client; +using MQTTnet.Core.Diagnostics; +using MQTTnet.Core.Packets; +using MQTTnet.Core.Protocol; +using MQTTnet.NETFramework; + +namespace MQTTnet.TestConsole +{ + public static class Program + { + public static void Main(string[] arguments) + { + Task.Run(() => Run(arguments)).Wait(); + } + + private static async Task Run(string[] arguments) + { + MqttTrace.TraceMessagePublished += (s, e) => + { + Console.WriteLine($">> [{e.ThreadId}] [{e.Source}] [{e.Level}]: {e.Message}"); + if (e.Exception != null) + { + // Console.WriteLine(e.Exception); + } + }; + + try + { + var options = new MqttClientOptions + { + Server = "localhost" + }; + + var client = new MqttClientFactory().CreateMqttClient(options); + client.ApplicationMessageReceived += (s, e) => + { + Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###"); + Console.WriteLine($"+ Topic = {e.ApplicationMessage.Topic}"); + Console.WriteLine($"+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}"); + Console.WriteLine($"+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}"); + Console.WriteLine($"+ Retain = {e.ApplicationMessage.Retain}"); + Console.WriteLine(); + }; + + client.Connected += async (s, e) => + { + Console.WriteLine("### CONNECTED WITH SERVER ###"); + + await client.SubscribeAsync(new List + { + new TopicFilter("#", MqttQualityOfServiceLevel.AtMostOnce) + }); + + Console.WriteLine("### SUBSCRIBED ###"); + }; + + client.Disconnected += async (s, e) => + { + Console.WriteLine("### DISCONNECTED FROM SERVER ###"); + await Task.Delay(TimeSpan.FromSeconds(5)); + + try + { + await client.ConnectAsync(); + } + catch + { + Console.WriteLine("### RECONNECTING FAILED ###"); + } + }; + + try + { + await client.ConnectAsync(); + } + catch + { + Console.WriteLine("### CONNECTING FAILED ###"); + } + + Console.WriteLine("### WAITING FOR APPLICATION MESSAGES ###"); + + while (true) + { + Console.ReadLine(); + + var applicationMessage = new MqttApplicationMessage( + "A/B/C", + Encoding.UTF8.GetBytes("Hello World"), + MqttQualityOfServiceLevel.AtLeastOnce, + false + ); + + await client.PublishAsync(applicationMessage); + } + } + catch (Exception exception) + { + Console.WriteLine(exception); + } + } + } +} diff --git a/MQTT.NET.TestConsole/Properties/AssemblyInfo.cs b/MQTT.NET.TestConsole/Properties/AssemblyInfo.cs new file mode 100644 index 0000000..1de2952 --- /dev/null +++ b/MQTT.NET.TestConsole/Properties/AssemblyInfo.cs @@ -0,0 +1,15 @@ +using System.Reflection; +using System.Runtime.InteropServices; + +[assembly: AssemblyTitle("MQTTnet.TestConsole")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("Christian Kratky")] +[assembly: AssemblyProduct("MQTTnet")] +[assembly: AssemblyCopyright("Copyright © Christian Kratky 2016-2017")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] +[assembly: ComVisible(false)] +[assembly: Guid("7b19b139-2e9d-4f1d-88b4-6180b4cf872a")] +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] \ No newline at end of file diff --git a/MQTTnet.NET/MQTTnet.NETFramework.csproj b/MQTTnet.NET/MQTTnet.NETFramework.csproj new file mode 100644 index 0000000..54d0160 --- /dev/null +++ b/MQTTnet.NET/MQTTnet.NETFramework.csproj @@ -0,0 +1,56 @@ + + + + + Debug + AnyCPU + {A480EF90-0EAA-4D9A-B271-47A9C47F6F7D} + Library + Properties + MQTTnet.NETFramework + MQTTnet.NETFramework + v4.6.1 + 512 + + + true + full + false + bin\Debug\ + DEBUG;TRACE + prompt + 4 + + + pdbonly + true + bin\Release\ + TRACE + prompt + 4 + + + + + + + + + + + + + + {99C884F3-B4B9-417D-AA92-DC7DD1C4CFEE} + MQTTnet.Core + + + + + \ No newline at end of file diff --git a/MQTTnet.NET/MqttClientFactory.cs b/MQTTnet.NET/MqttClientFactory.cs new file mode 100644 index 0000000..2a1600b --- /dev/null +++ b/MQTTnet.NET/MqttClientFactory.cs @@ -0,0 +1,17 @@ +using System; +using MQTTnet.Core.Adapter; +using MQTTnet.Core.Client; +using MQTTnet.Core.Serializer; + +namespace MQTTnet.NETFramework +{ + public class MqttClientFactory + { + public MqttClient CreateMqttClient(MqttClientOptions options) + { + if (options == null) throw new ArgumentNullException(nameof(options)); + + return new MqttClient(options, new MqttChannelAdapter(new MqttTcpChannel(), new DefaultMqttV311PacketSerializer())); + } + } +} diff --git a/MQTTnet.NET/MqttTcpChannel.cs b/MQTTnet.NET/MqttTcpChannel.cs new file mode 100644 index 0000000..e00aed0 --- /dev/null +++ b/MQTTnet.NET/MqttTcpChannel.cs @@ -0,0 +1,46 @@ +using System; +using System.Net.Sockets; +using System.Threading.Tasks; +using MQTTnet.Core.Channel; +using MQTTnet.Core.Client; + +namespace MQTTnet.NETFramework +{ + public class MqttTcpChannel : IMqttTransportChannel, IDisposable + { + private readonly Socket _socket = new Socket(SocketType.Stream, ProtocolType.Tcp); + + public async Task ConnectAsync(MqttClientOptions options) + { + await Task.Factory.FromAsync(_socket.BeginConnect, _socket.EndConnect, options.Server, options.Port, null); + } + + public async Task DisconnectAsync() + { + await Task.Factory.FromAsync(_socket.BeginDisconnect, _socket.EndDisconnect, true, null); + } + + public async Task WriteAsync(byte[] buffer) + { + if (buffer == null) throw new ArgumentNullException(nameof(buffer)); + + await Task.Factory.FromAsync( + // ReSharper disable once AssignNullToNotNullAttribute + _socket.BeginSend(buffer, 0, buffer.Length, SocketFlags.None, null, null), + _socket.EndSend); + } + + public async Task ReadAsync(byte[] buffer) + { + await Task.Factory.FromAsync( + // ReSharper disable once AssignNullToNotNullAttribute + _socket.BeginReceive(buffer, 0, buffer.Length, SocketFlags.None, null, null), + _socket.EndReceive); + } + + public void Dispose() + { + _socket?.Dispose(); + } + } +} \ No newline at end of file diff --git a/MQTTnet.NET/Properties/AssemblyInfo.cs b/MQTTnet.NET/Properties/AssemblyInfo.cs new file mode 100644 index 0000000..a61a3e6 --- /dev/null +++ b/MQTTnet.NET/Properties/AssemblyInfo.cs @@ -0,0 +1,15 @@ +using System.Reflection; +using System.Runtime.InteropServices; + +[assembly: AssemblyTitle("MQTTnet.NETFramework")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("Christian Kratky")] +[assembly: AssemblyProduct("MQTTnet")] +[assembly: AssemblyCopyright("Copyright © Christian Kratky 2016-2017")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] +[assembly: ComVisible(false)] +[assembly: Guid("a480ef90-0eaa-4d9a-b271-47a9c47f6f7d")] +[assembly: AssemblyVersion("2.1.0.4")] +[assembly: AssemblyFileVersion("2.1.0.4")] \ No newline at end of file diff --git a/MQTTnet.Universal/MQTTnet.Universal.csproj b/MQTTnet.Universal/MQTTnet.Universal.csproj new file mode 100644 index 0000000..b661288 --- /dev/null +++ b/MQTTnet.Universal/MQTTnet.Universal.csproj @@ -0,0 +1,132 @@ + + + + + Debug + AnyCPU + {BD60C727-D8E8-40C3-B8E3-C95A864AE611} + Library + Properties + MQTTnet.Universal + MQTTnet.Universal + en-US + UAP + 10.0.14393.0 + 10.0.10586.0 + 14 + 512 + {A5A43C5B-DE2A-4C0C-9213-0A381AF9435A};{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC} + + + AnyCPU + true + full + false + bin\Debug\ + DEBUG;TRACE;NETFX_CORE;WINDOWS_UWP + prompt + 4 + + + AnyCPU + pdbonly + true + bin\Release\ + TRACE;NETFX_CORE;WINDOWS_UWP + prompt + 4 + + + x86 + true + bin\x86\Debug\ + DEBUG;TRACE;NETFX_CORE;WINDOWS_UWP + ;2008 + full + x86 + false + prompt + + + x86 + bin\x86\Release\ + TRACE;NETFX_CORE;WINDOWS_UWP + true + ;2008 + pdbonly + x86 + false + prompt + + + ARM + true + bin\ARM\Debug\ + DEBUG;TRACE;NETFX_CORE;WINDOWS_UWP + ;2008 + full + ARM + false + prompt + + + ARM + bin\ARM\Release\ + TRACE;NETFX_CORE;WINDOWS_UWP + true + ;2008 + pdbonly + ARM + false + prompt + + + x64 + true + bin\x64\Debug\ + DEBUG;TRACE;NETFX_CORE;WINDOWS_UWP + ;2008 + full + x64 + false + prompt + + + x64 + bin\x64\Release\ + TRACE;NETFX_CORE;WINDOWS_UWP + true + ;2008 + pdbonly + x64 + false + prompt + + + + + + + + + + + + + + {99C884F3-B4B9-417D-AA92-DC7DD1C4CFEE} + MQTTnet.Core + + + + 14.0 + + + + \ No newline at end of file diff --git a/MQTTnet.Universal/MqttClientFactory.cs b/MQTTnet.Universal/MqttClientFactory.cs new file mode 100644 index 0000000..36b8643 --- /dev/null +++ b/MQTTnet.Universal/MqttClientFactory.cs @@ -0,0 +1,17 @@ +using System; +using MQTTnet.Core.Adapter; +using MQTTnet.Core.Client; +using MQTTnet.Core.Serializer; + +namespace MQTTnet.Universal +{ + public class MqttClientFactory + { + public MqttClient CreateMqttClient(MqttClientOptions options) + { + if (options == null) throw new ArgumentNullException(nameof(options)); + + return new MqttClient(options, new MqttChannelAdapter(new MqttTcpChannel(), new DefaultMqttV311PacketSerializer())); + } + } +} diff --git a/MQTTnet.Universal/MqttTcpChannel.cs b/MQTTnet.Universal/MqttTcpChannel.cs new file mode 100644 index 0000000..350c4b2 --- /dev/null +++ b/MQTTnet.Universal/MqttTcpChannel.cs @@ -0,0 +1,52 @@ +using System; +using System.Runtime.InteropServices.WindowsRuntime; +using System.Threading.Tasks; +using Windows.Networking; +using Windows.Networking.Sockets; +using Windows.Storage.Streams; +using MQTTnet.Core.Channel; +using MQTTnet.Core.Client; +using Buffer = Windows.Storage.Streams.Buffer; + +namespace MQTTnet.Universal +{ + public sealed class MqttTcpChannel : IMqttTransportChannel, IDisposable + { + private readonly StreamSocket _socket = new StreamSocket(); + + public async Task ConnectAsync(MqttClientOptions options) + { + if (options == null) throw new ArgumentNullException(nameof(options)); + + await _socket.ConnectAsync(new HostName(options.Server), options.Port.ToString()); + } + + public async Task DisconnectAsync() + { + await _socket.CancelIOAsync(); + _socket.Dispose(); + } + + public async Task WriteAsync(byte[] buffer) + { + if (buffer == null) throw new ArgumentNullException(nameof(buffer)); + + await _socket.OutputStream.WriteAsync(buffer.AsBuffer()); + await _socket.OutputStream.FlushAsync(); + } + + public async Task ReadAsync(byte[] buffer) + { + var buffer2 = new Buffer((uint)buffer.Length); + await _socket.InputStream.ReadAsync(buffer2, (uint)buffer.Length, InputStreamOptions.None); + + var array2 = buffer2.ToArray(); + Array.Copy(array2, buffer, array2.Length); + } + + public void Dispose() + { + _socket?.Dispose(); + } + } +} \ No newline at end of file diff --git a/MQTTnet.Universal/Properties/AssemblyInfo.cs b/MQTTnet.Universal/Properties/AssemblyInfo.cs new file mode 100644 index 0000000..3b16f2b --- /dev/null +++ b/MQTTnet.Universal/Properties/AssemblyInfo.cs @@ -0,0 +1,14 @@ +using System.Reflection; +using System.Runtime.InteropServices; + +[assembly: AssemblyTitle("MQTTnet.Universal")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("Christian Kratky")] +[assembly: AssemblyProduct("MQTTnet")] +[assembly: AssemblyCopyright("Copyright © Christian Kratky 2016-2017")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] +[assembly: ComVisible(false)] +[assembly: AssemblyVersion("2.1.0.4")] +[assembly: AssemblyFileVersion("2.1.0.4")] \ No newline at end of file diff --git a/MQTTnet.Universal/Properties/MQTTnet.Universal.rd.xml b/MQTTnet.Universal/Properties/MQTTnet.Universal.rd.xml new file mode 100644 index 0000000..0b2946c --- /dev/null +++ b/MQTTnet.Universal/Properties/MQTTnet.Universal.rd.xml @@ -0,0 +1,33 @@ + + + + + + + + + diff --git a/MQTTnet.Universal/project.json b/MQTTnet.Universal/project.json new file mode 100644 index 0000000..92d1456 --- /dev/null +++ b/MQTTnet.Universal/project.json @@ -0,0 +1,16 @@ +{ + "dependencies": { + "Microsoft.NETCore.UniversalWindowsPlatform": "5.1.0" + }, + "frameworks": { + "uap10.0": {} + }, + "runtimes": { + "win10-arm": {}, + "win10-arm-aot": {}, + "win10-x86": {}, + "win10-x86-aot": {}, + "win10-x64": {}, + "win10-x64-aot": {} + } +} \ No newline at end of file diff --git a/MQTTnet.sln b/MQTTnet.sln new file mode 100644 index 0000000..3cd97f5 --- /dev/null +++ b/MQTTnet.sln @@ -0,0 +1,112 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio 14 +VisualStudioVersion = 14.0.25420.1 +MinimumVisualStudioVersion = 10.0.40219.1 +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.Core", "MQTT.NET.Core\MQTTnet.Core.csproj", "{99C884F3-B4B9-417D-AA92-DC7DD1C4CFEE}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.TestConsole", "MQTT.NET.TestConsole\MQTTnet.TestConsole.csproj", "{7B19B139-2E9D-4F1D-88B4-6180B4CF872A}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.Core.Tests", "MQTT.NET.Core.Tests\MQTTnet.Core.Tests.csproj", "{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.NETFramework", "MQTTnet.NET\MQTTnet.NETFramework.csproj", "{A480EF90-0EAA-4D9A-B271-47A9C47F6F7D}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.Universal", "MQTTnet.Universal\MQTTnet.Universal.csproj", "{BD60C727-D8E8-40C3-B8E3-C95A864AE611}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Debug|ARM = Debug|ARM + Debug|x64 = Debug|x64 + Debug|x86 = Debug|x86 + Release|Any CPU = Release|Any CPU + Release|ARM = Release|ARM + Release|x64 = Release|x64 + Release|x86 = Release|x86 + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {99C884F3-B4B9-417D-AA92-DC7DD1C4CFEE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {99C884F3-B4B9-417D-AA92-DC7DD1C4CFEE}.Debug|Any CPU.Build.0 = Debug|Any CPU + {99C884F3-B4B9-417D-AA92-DC7DD1C4CFEE}.Debug|ARM.ActiveCfg = Debug|Any CPU + {99C884F3-B4B9-417D-AA92-DC7DD1C4CFEE}.Debug|ARM.Build.0 = Debug|Any CPU + {99C884F3-B4B9-417D-AA92-DC7DD1C4CFEE}.Debug|x64.ActiveCfg = Debug|Any CPU + {99C884F3-B4B9-417D-AA92-DC7DD1C4CFEE}.Debug|x64.Build.0 = Debug|Any CPU + {99C884F3-B4B9-417D-AA92-DC7DD1C4CFEE}.Debug|x86.ActiveCfg = Debug|Any CPU + {99C884F3-B4B9-417D-AA92-DC7DD1C4CFEE}.Debug|x86.Build.0 = Debug|Any CPU + {99C884F3-B4B9-417D-AA92-DC7DD1C4CFEE}.Release|Any CPU.ActiveCfg = Release|Any CPU + {99C884F3-B4B9-417D-AA92-DC7DD1C4CFEE}.Release|Any CPU.Build.0 = Release|Any CPU + {99C884F3-B4B9-417D-AA92-DC7DD1C4CFEE}.Release|ARM.ActiveCfg = Release|Any CPU + {99C884F3-B4B9-417D-AA92-DC7DD1C4CFEE}.Release|ARM.Build.0 = Release|Any CPU + {99C884F3-B4B9-417D-AA92-DC7DD1C4CFEE}.Release|x64.ActiveCfg = Release|Any CPU + {99C884F3-B4B9-417D-AA92-DC7DD1C4CFEE}.Release|x64.Build.0 = Release|Any CPU + {99C884F3-B4B9-417D-AA92-DC7DD1C4CFEE}.Release|x86.ActiveCfg = Release|Any CPU + {99C884F3-B4B9-417D-AA92-DC7DD1C4CFEE}.Release|x86.Build.0 = Release|Any CPU + {7B19B139-2E9D-4F1D-88B4-6180B4CF872A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {7B19B139-2E9D-4F1D-88B4-6180B4CF872A}.Debug|Any CPU.Build.0 = Debug|Any CPU + {7B19B139-2E9D-4F1D-88B4-6180B4CF872A}.Debug|ARM.ActiveCfg = Debug|Any CPU + {7B19B139-2E9D-4F1D-88B4-6180B4CF872A}.Debug|ARM.Build.0 = Debug|Any CPU + {7B19B139-2E9D-4F1D-88B4-6180B4CF872A}.Debug|x64.ActiveCfg = Debug|Any CPU + {7B19B139-2E9D-4F1D-88B4-6180B4CF872A}.Debug|x64.Build.0 = Debug|Any CPU + {7B19B139-2E9D-4F1D-88B4-6180B4CF872A}.Debug|x86.ActiveCfg = Debug|Any CPU + {7B19B139-2E9D-4F1D-88B4-6180B4CF872A}.Debug|x86.Build.0 = Debug|Any CPU + {7B19B139-2E9D-4F1D-88B4-6180B4CF872A}.Release|Any CPU.ActiveCfg = Release|Any CPU + {7B19B139-2E9D-4F1D-88B4-6180B4CF872A}.Release|Any CPU.Build.0 = Release|Any CPU + {7B19B139-2E9D-4F1D-88B4-6180B4CF872A}.Release|ARM.ActiveCfg = Release|Any CPU + {7B19B139-2E9D-4F1D-88B4-6180B4CF872A}.Release|ARM.Build.0 = Release|Any CPU + {7B19B139-2E9D-4F1D-88B4-6180B4CF872A}.Release|x64.ActiveCfg = Release|Any CPU + {7B19B139-2E9D-4F1D-88B4-6180B4CF872A}.Release|x64.Build.0 = Release|Any CPU + {7B19B139-2E9D-4F1D-88B4-6180B4CF872A}.Release|x86.ActiveCfg = Release|Any CPU + {7B19B139-2E9D-4F1D-88B4-6180B4CF872A}.Release|x86.Build.0 = Release|Any CPU + {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|Any CPU.Build.0 = Debug|Any CPU + {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|ARM.ActiveCfg = Debug|Any CPU + {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|ARM.Build.0 = Debug|Any CPU + {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|x64.ActiveCfg = Debug|Any CPU + {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|x64.Build.0 = Debug|Any CPU + {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|x86.ActiveCfg = Debug|Any CPU + {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|x86.Build.0 = Debug|Any CPU + {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|Any CPU.ActiveCfg = Release|Any CPU + {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|Any CPU.Build.0 = Release|Any CPU + {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|ARM.ActiveCfg = Release|Any CPU + {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|ARM.Build.0 = Release|Any CPU + {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|x64.ActiveCfg = Release|Any CPU + {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|x64.Build.0 = Release|Any CPU + {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|x86.ActiveCfg = Release|Any CPU + {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|x86.Build.0 = Release|Any CPU + {A480EF90-0EAA-4D9A-B271-47A9C47F6F7D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {A480EF90-0EAA-4D9A-B271-47A9C47F6F7D}.Debug|Any CPU.Build.0 = Debug|Any CPU + {A480EF90-0EAA-4D9A-B271-47A9C47F6F7D}.Debug|ARM.ActiveCfg = Debug|Any CPU + {A480EF90-0EAA-4D9A-B271-47A9C47F6F7D}.Debug|ARM.Build.0 = Debug|Any CPU + {A480EF90-0EAA-4D9A-B271-47A9C47F6F7D}.Debug|x64.ActiveCfg = Debug|Any CPU + {A480EF90-0EAA-4D9A-B271-47A9C47F6F7D}.Debug|x64.Build.0 = Debug|Any CPU + {A480EF90-0EAA-4D9A-B271-47A9C47F6F7D}.Debug|x86.ActiveCfg = Debug|Any CPU + {A480EF90-0EAA-4D9A-B271-47A9C47F6F7D}.Debug|x86.Build.0 = Debug|Any CPU + {A480EF90-0EAA-4D9A-B271-47A9C47F6F7D}.Release|Any CPU.ActiveCfg = Release|Any CPU + {A480EF90-0EAA-4D9A-B271-47A9C47F6F7D}.Release|Any CPU.Build.0 = Release|Any CPU + {A480EF90-0EAA-4D9A-B271-47A9C47F6F7D}.Release|ARM.ActiveCfg = Release|Any CPU + {A480EF90-0EAA-4D9A-B271-47A9C47F6F7D}.Release|ARM.Build.0 = Release|Any CPU + {A480EF90-0EAA-4D9A-B271-47A9C47F6F7D}.Release|x64.ActiveCfg = Release|Any CPU + {A480EF90-0EAA-4D9A-B271-47A9C47F6F7D}.Release|x64.Build.0 = Release|Any CPU + {A480EF90-0EAA-4D9A-B271-47A9C47F6F7D}.Release|x86.ActiveCfg = Release|Any CPU + {A480EF90-0EAA-4D9A-B271-47A9C47F6F7D}.Release|x86.Build.0 = Release|Any CPU + {BD60C727-D8E8-40C3-B8E3-C95A864AE611}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {BD60C727-D8E8-40C3-B8E3-C95A864AE611}.Debug|Any CPU.Build.0 = Debug|Any CPU + {BD60C727-D8E8-40C3-B8E3-C95A864AE611}.Debug|ARM.ActiveCfg = Debug|ARM + {BD60C727-D8E8-40C3-B8E3-C95A864AE611}.Debug|ARM.Build.0 = Debug|ARM + {BD60C727-D8E8-40C3-B8E3-C95A864AE611}.Debug|x64.ActiveCfg = Debug|x64 + {BD60C727-D8E8-40C3-B8E3-C95A864AE611}.Debug|x64.Build.0 = Debug|x64 + {BD60C727-D8E8-40C3-B8E3-C95A864AE611}.Debug|x86.ActiveCfg = Debug|x86 + {BD60C727-D8E8-40C3-B8E3-C95A864AE611}.Debug|x86.Build.0 = Debug|x86 + {BD60C727-D8E8-40C3-B8E3-C95A864AE611}.Release|Any CPU.ActiveCfg = Release|Any CPU + {BD60C727-D8E8-40C3-B8E3-C95A864AE611}.Release|Any CPU.Build.0 = Release|Any CPU + {BD60C727-D8E8-40C3-B8E3-C95A864AE611}.Release|ARM.ActiveCfg = Release|ARM + {BD60C727-D8E8-40C3-B8E3-C95A864AE611}.Release|ARM.Build.0 = Release|ARM + {BD60C727-D8E8-40C3-B8E3-C95A864AE611}.Release|x64.ActiveCfg = Release|x64 + {BD60C727-D8E8-40C3-B8E3-C95A864AE611}.Release|x64.Build.0 = Release|x64 + {BD60C727-D8E8-40C3-B8E3-C95A864AE611}.Release|x86.ActiveCfg = Release|x86 + {BD60C727-D8E8-40C3-B8E3-C95A864AE611}.Release|x86.Build.0 = Release|x86 + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection +EndGlobal