You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

573 lines
21 KiB

  1. using MQTTnet.Exceptions;
  2. using MQTTnet.Packets;
  3. using MQTTnet.Protocol;
  4. using System;
  5. using System.Collections.Generic;
  6. using System.IO;
  7. using System.Linq;
  8. using System.Text;
  9. namespace MQTTnet.Serializer
  10. {
  11. public sealed class MqttPacketSerializer : IMqttPacketSerializer
  12. {
  13. private static byte[] ProtocolVersionV311Name { get; } = Encoding.UTF8.GetBytes("MQTT");
  14. private static byte[] ProtocolVersionV310Name { get; } = Encoding.UTF8.GetBytes("MQIs");
  15. public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311;
  16. public ICollection<ArraySegment<byte>> Serialize(MqttBasePacket packet)
  17. {
  18. if (packet == null) throw new ArgumentNullException(nameof(packet));
  19. using (var stream = new MemoryStream(128))
  20. using (var writer = new MqttPacketWriter(stream))
  21. {
  22. var fixedHeader = SerializePacket(packet, writer);
  23. var remainingLength = (int)stream.Length;
  24. writer.Write(fixedHeader);
  25. MqttPacketWriter.WriteRemainingLength(remainingLength, writer);
  26. var headerLength = (int)stream.Length - remainingLength;
  27. #if NET461 || NET452 || NETSTANDARD2_0
  28. var buffer = stream.GetBuffer();
  29. #else
  30. var buffer = stream.ToArray();
  31. #endif
  32. return new List<ArraySegment<byte>>
  33. {
  34. new ArraySegment<byte>(buffer, remainingLength, headerLength),
  35. new ArraySegment<byte>(buffer, 0, remainingLength)
  36. };
  37. }
  38. }
  39. public MqttBasePacket Deserialize(MqttPacketHeader header, MemoryStream body)
  40. {
  41. if (header == null) throw new ArgumentNullException(nameof(header));
  42. if (body == null) throw new ArgumentNullException(nameof(body));
  43. using (var reader = new MqttPacketReader(header, body))
  44. {
  45. return Deserialize(header, reader);
  46. }
  47. }
  48. private byte SerializePacket(MqttBasePacket packet, MqttPacketWriter writer)
  49. {
  50. switch (packet)
  51. {
  52. case MqttConnectPacket connectPacket: return Serialize(connectPacket, writer);
  53. case MqttConnAckPacket connAckPacket: return Serialize(connAckPacket, writer);
  54. case MqttDisconnectPacket _: return SerializeEmptyPacket(MqttControlPacketType.Disconnect);
  55. case MqttPingReqPacket _: return SerializeEmptyPacket(MqttControlPacketType.PingReq);
  56. case MqttPingRespPacket _: return SerializeEmptyPacket(MqttControlPacketType.PingResp);
  57. case MqttPublishPacket publishPacket: return Serialize(publishPacket, writer);
  58. case MqttPubAckPacket pubAckPacket: return Serialize(pubAckPacket, writer);
  59. case MqttPubRecPacket pubRecPacket: return Serialize(pubRecPacket, writer);
  60. case MqttPubRelPacket pubRelPacket: return Serialize(pubRelPacket, writer);
  61. case MqttPubCompPacket pubCompPacket: return Serialize(pubCompPacket, writer);
  62. case MqttSubscribePacket subscribePacket: return Serialize(subscribePacket, writer);
  63. case MqttSubAckPacket subAckPacket: return Serialize(subAckPacket, writer);
  64. case MqttUnsubscribePacket unsubscribePacket: return Serialize(unsubscribePacket, writer);
  65. case MqttUnsubAckPacket unsubAckPacket: return Serialize(unsubAckPacket, writer);
  66. default: throw new MqttProtocolViolationException("Packet type invalid.");
  67. }
  68. }
  69. private MqttBasePacket Deserialize(MqttPacketHeader header, MqttPacketReader reader)
  70. {
  71. switch (header.ControlPacketType)
  72. {
  73. case MqttControlPacketType.Connect: return DeserializeConnect(reader);
  74. case MqttControlPacketType.ConnAck: return DeserializeConnAck(reader);
  75. case MqttControlPacketType.Disconnect: return new MqttDisconnectPacket();
  76. case MqttControlPacketType.Publish: return DeserializePublish(reader, header);
  77. case MqttControlPacketType.PubAck: return DeserializePubAck(reader);
  78. case MqttControlPacketType.PubRec: return DeserializePubRec(reader);
  79. case MqttControlPacketType.PubRel: return DeserializePubRel(reader);
  80. case MqttControlPacketType.PubComp: return DeserializePubComp(reader);
  81. case MqttControlPacketType.PingReq: return new MqttPingReqPacket();
  82. case MqttControlPacketType.PingResp: return new MqttPingRespPacket();
  83. case MqttControlPacketType.Subscribe: return DeserializeSubscribe(reader);
  84. case MqttControlPacketType.SubAck: return DeserializeSubAck(reader);
  85. case MqttControlPacketType.Unsubscibe: return DeserializeUnsubscribe(reader);
  86. case MqttControlPacketType.UnsubAck: return DeserializeUnsubAck(reader);
  87. default: throw new MqttProtocolViolationException($"Packet type ({(int)header.ControlPacketType}) not supported.");
  88. }
  89. }
  90. private static MqttBasePacket DeserializeUnsubAck(MqttPacketReader reader)
  91. {
  92. return new MqttUnsubAckPacket
  93. {
  94. PacketIdentifier = reader.ReadUInt16()
  95. };
  96. }
  97. private static MqttBasePacket DeserializePubComp(MqttPacketReader reader)
  98. {
  99. return new MqttPubCompPacket
  100. {
  101. PacketIdentifier = reader.ReadUInt16()
  102. };
  103. }
  104. private static MqttBasePacket DeserializePubRel(MqttPacketReader reader)
  105. {
  106. return new MqttPubRelPacket
  107. {
  108. PacketIdentifier = reader.ReadUInt16()
  109. };
  110. }
  111. private static MqttBasePacket DeserializePubRec(MqttPacketReader reader)
  112. {
  113. return new MqttPubRecPacket
  114. {
  115. PacketIdentifier = reader.ReadUInt16()
  116. };
  117. }
  118. private static MqttBasePacket DeserializePubAck(MqttPacketReader reader)
  119. {
  120. return new MqttPubAckPacket
  121. {
  122. PacketIdentifier = reader.ReadUInt16()
  123. };
  124. }
  125. private static MqttBasePacket DeserializeUnsubscribe(MqttPacketReader reader)
  126. {
  127. var packet = new MqttUnsubscribePacket
  128. {
  129. PacketIdentifier = reader.ReadUInt16(),
  130. };
  131. while (!reader.EndOfRemainingData)
  132. {
  133. packet.TopicFilters.Add(reader.ReadStringWithLengthPrefix());
  134. }
  135. return packet;
  136. }
  137. private static MqttBasePacket DeserializeSubscribe(MqttPacketReader reader)
  138. {
  139. var packet = new MqttSubscribePacket
  140. {
  141. PacketIdentifier = reader.ReadUInt16()
  142. };
  143. while (!reader.EndOfRemainingData)
  144. {
  145. packet.TopicFilters.Add(new TopicFilter(
  146. reader.ReadStringWithLengthPrefix(),
  147. (MqttQualityOfServiceLevel)reader.ReadByte()));
  148. }
  149. return packet;
  150. }
  151. private static MqttBasePacket DeserializePublish(MqttPacketReader reader, MqttPacketHeader mqttPacketHeader)
  152. {
  153. var fixedHeader = new ByteReader(mqttPacketHeader.FixedHeader);
  154. var retain = fixedHeader.Read();
  155. var qualityOfServiceLevel = (MqttQualityOfServiceLevel)fixedHeader.Read(2);
  156. var dup = fixedHeader.Read();
  157. var topic = reader.ReadStringWithLengthPrefix();
  158. ushort packetIdentifier = 0;
  159. if (qualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce)
  160. {
  161. packetIdentifier = reader.ReadUInt16();
  162. }
  163. var packet = new MqttPublishPacket
  164. {
  165. Retain = retain,
  166. QualityOfServiceLevel = qualityOfServiceLevel,
  167. Dup = dup,
  168. Topic = topic,
  169. Payload = reader.ReadRemainingData(),
  170. PacketIdentifier = packetIdentifier
  171. };
  172. return packet;
  173. }
  174. private static MqttBasePacket DeserializeConnect(MqttPacketReader reader)
  175. {
  176. reader.ReadBytes(2); // Skip 2 bytes
  177. MqttProtocolVersion protocolVersion;
  178. var protocolName = reader.ReadBytes(4);
  179. if (protocolName.SequenceEqual(ProtocolVersionV310Name))
  180. {
  181. reader.ReadBytes(2);
  182. protocolVersion = MqttProtocolVersion.V310;
  183. }
  184. else if (protocolName.SequenceEqual(ProtocolVersionV311Name))
  185. {
  186. protocolVersion = MqttProtocolVersion.V311;
  187. }
  188. else
  189. {
  190. throw new MqttProtocolViolationException("Protocol name is not supported.");
  191. }
  192. reader.ReadByte(); // Skip protocol level
  193. var connectFlags = reader.ReadByte();
  194. var connectFlagsReader = new ByteReader(connectFlags);
  195. connectFlagsReader.Read(); // Reserved.
  196. var packet = new MqttConnectPacket
  197. {
  198. ProtocolVersion = protocolVersion,
  199. CleanSession = connectFlagsReader.Read()
  200. };
  201. var willFlag = connectFlagsReader.Read();
  202. var willQoS = connectFlagsReader.Read(2);
  203. var willRetain = connectFlagsReader.Read();
  204. var passwordFlag = connectFlagsReader.Read();
  205. var usernameFlag = connectFlagsReader.Read();
  206. packet.KeepAlivePeriod = reader.ReadUInt16();
  207. packet.ClientId = reader.ReadStringWithLengthPrefix();
  208. if (willFlag)
  209. {
  210. packet.WillMessage = new MqttApplicationMessage
  211. {
  212. Topic = reader.ReadStringWithLengthPrefix(),
  213. Payload = reader.ReadWithLengthPrefix(),
  214. QualityOfServiceLevel = (MqttQualityOfServiceLevel)willQoS,
  215. Retain = willRetain
  216. };
  217. }
  218. if (usernameFlag)
  219. {
  220. packet.Username = reader.ReadStringWithLengthPrefix();
  221. }
  222. if (passwordFlag)
  223. {
  224. packet.Password = reader.ReadStringWithLengthPrefix();
  225. }
  226. ValidateConnectPacket(packet);
  227. return packet;
  228. }
  229. private static MqttBasePacket DeserializeSubAck(MqttPacketReader reader)
  230. {
  231. var packet = new MqttSubAckPacket
  232. {
  233. PacketIdentifier = reader.ReadUInt16()
  234. };
  235. while (!reader.EndOfRemainingData)
  236. {
  237. packet.SubscribeReturnCodes.Add((MqttSubscribeReturnCode)reader.ReadByte());
  238. }
  239. return packet;
  240. }
  241. private MqttBasePacket DeserializeConnAck(MqttPacketReader reader)
  242. {
  243. var packet = new MqttConnAckPacket();
  244. var firstByteReader = new ByteReader(reader.ReadByte());
  245. if (ProtocolVersion == MqttProtocolVersion.V311)
  246. {
  247. packet.IsSessionPresent = firstByteReader.Read();
  248. }
  249. packet.ConnectReturnCode = (MqttConnectReturnCode)reader.ReadByte();
  250. return packet;
  251. }
  252. private static void ValidateConnectPacket(MqttConnectPacket packet)
  253. {
  254. if (packet == null) throw new ArgumentNullException(nameof(packet));
  255. if (string.IsNullOrEmpty(packet.ClientId) && !packet.CleanSession)
  256. {
  257. throw new MqttProtocolViolationException("CleanSession must be set if ClientId is empty [MQTT-3.1.3-7].");
  258. }
  259. }
  260. private static void ValidatePublishPacket(MqttPublishPacket packet)
  261. {
  262. if (packet == null) throw new ArgumentNullException(nameof(packet));
  263. if (packet.QualityOfServiceLevel == 0 && packet.Dup)
  264. {
  265. throw new MqttProtocolViolationException("Dup flag must be false for QoS 0 packets [MQTT-3.3.1-2].");
  266. }
  267. }
  268. private byte Serialize(MqttConnectPacket packet, MqttPacketWriter writer)
  269. {
  270. ValidateConnectPacket(packet);
  271. // Write variable header
  272. writer.Write(0x00, 0x04); // 3.1.2.1 Protocol Name
  273. if (ProtocolVersion == MqttProtocolVersion.V311)
  274. {
  275. writer.Write(ProtocolVersionV311Name);
  276. writer.Write(0x04); // 3.1.2.2 Protocol Level (4)
  277. }
  278. else
  279. {
  280. writer.Write(ProtocolVersionV310Name);
  281. writer.Write(0x64, 0x70, 0x03); // Protocol Level (0x03)
  282. }
  283. var connectFlags = new ByteWriter(); // 3.1.2.3 Connect Flags
  284. connectFlags.Write(false); // Reserved
  285. connectFlags.Write(packet.CleanSession);
  286. connectFlags.Write(packet.WillMessage != null);
  287. if (packet.WillMessage != null)
  288. {
  289. connectFlags.Write((int)packet.WillMessage.QualityOfServiceLevel, 2);
  290. connectFlags.Write(packet.WillMessage.Retain);
  291. }
  292. else
  293. {
  294. connectFlags.Write(0, 2);
  295. connectFlags.Write(false);
  296. }
  297. connectFlags.Write(packet.Password != null);
  298. connectFlags.Write(packet.Username != null);
  299. writer.Write(connectFlags);
  300. writer.Write(packet.KeepAlivePeriod);
  301. writer.WriteWithLengthPrefix(packet.ClientId);
  302. if (packet.WillMessage != null)
  303. {
  304. writer.WriteWithLengthPrefix(packet.WillMessage.Topic);
  305. writer.WriteWithLengthPrefix(packet.WillMessage.Payload);
  306. }
  307. if (packet.Username != null)
  308. {
  309. writer.WriteWithLengthPrefix(packet.Username);
  310. }
  311. if (packet.Password != null)
  312. {
  313. writer.WriteWithLengthPrefix(packet.Password);
  314. }
  315. return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.Connect);
  316. }
  317. private byte Serialize(MqttConnAckPacket packet, MqttPacketWriter writer)
  318. {
  319. if (ProtocolVersion == MqttProtocolVersion.V310)
  320. {
  321. writer.Write(0);
  322. }
  323. else if (ProtocolVersion == MqttProtocolVersion.V311)
  324. {
  325. var connectAcknowledgeFlags = new ByteWriter();
  326. connectAcknowledgeFlags.Write(packet.IsSessionPresent);
  327. writer.Write(connectAcknowledgeFlags);
  328. }
  329. else
  330. {
  331. throw new MqttProtocolViolationException("Protocol version not supported.");
  332. }
  333. writer.Write((byte)packet.ConnectReturnCode);
  334. return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.ConnAck);
  335. }
  336. private static byte Serialize(MqttPubRelPacket packet, MqttPacketWriter writer)
  337. {
  338. if (!packet.PacketIdentifier.HasValue)
  339. {
  340. throw new MqttProtocolViolationException("PubRel packet has no packet identifier.");
  341. }
  342. writer.Write(packet.PacketIdentifier.Value);
  343. return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubRel, 0x02);
  344. }
  345. private static byte Serialize(MqttPublishPacket packet, MqttPacketWriter writer)
  346. {
  347. ValidatePublishPacket(packet);
  348. writer.WriteWithLengthPrefix(packet.Topic);
  349. if (packet.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce)
  350. {
  351. if (!packet.PacketIdentifier.HasValue)
  352. {
  353. throw new MqttProtocolViolationException("Publish packet has no packet identifier.");
  354. }
  355. writer.Write(packet.PacketIdentifier.Value);
  356. }
  357. else
  358. {
  359. if (packet.PacketIdentifier > 0)
  360. {
  361. throw new MqttProtocolViolationException("Packet identifier must be empty if QoS == 0 [MQTT-2.3.1-5].");
  362. }
  363. }
  364. if (packet.Payload?.Length > 0)
  365. {
  366. writer.Write(packet.Payload);
  367. }
  368. byte fixedHeader = 0;
  369. if (packet.Retain)
  370. {
  371. fixedHeader |= 0x01;
  372. }
  373. fixedHeader |= (byte)((byte)packet.QualityOfServiceLevel << 1);
  374. if (packet.Dup)
  375. {
  376. fixedHeader |= 0x08;
  377. }
  378. return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.Publish, fixedHeader);
  379. }
  380. private static byte Serialize(MqttPubAckPacket packet, MqttPacketWriter writer)
  381. {
  382. if (!packet.PacketIdentifier.HasValue)
  383. {
  384. throw new MqttProtocolViolationException("PubAck packet has no packet identifier.");
  385. }
  386. writer.Write(packet.PacketIdentifier.Value);
  387. return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubAck);
  388. }
  389. private static byte Serialize(MqttPubRecPacket packet, MqttPacketWriter writer)
  390. {
  391. if (!packet.PacketIdentifier.HasValue)
  392. {
  393. throw new MqttProtocolViolationException("PubRec packet has no packet identifier.");
  394. }
  395. writer.Write(packet.PacketIdentifier.Value);
  396. return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubRec);
  397. }
  398. private static byte Serialize(MqttPubCompPacket packet, MqttPacketWriter writer)
  399. {
  400. if (!packet.PacketIdentifier.HasValue)
  401. {
  402. throw new MqttProtocolViolationException("PubComp packet has no packet identifier.");
  403. }
  404. writer.Write(packet.PacketIdentifier.Value);
  405. return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubComp);
  406. }
  407. private static byte Serialize(MqttSubscribePacket packet, MqttPacketWriter writer)
  408. {
  409. if (!packet.TopicFilters.Any()) throw new MqttProtocolViolationException("At least one topic filter must be set [MQTT-3.8.3-3].");
  410. if (!packet.PacketIdentifier.HasValue)
  411. {
  412. throw new MqttProtocolViolationException("Subscribe packet has no packet identifier.");
  413. }
  414. writer.Write(packet.PacketIdentifier.Value);
  415. if (packet.TopicFilters?.Count > 0)
  416. {
  417. foreach (var topicFilter in packet.TopicFilters)
  418. {
  419. writer.WriteWithLengthPrefix(topicFilter.Topic);
  420. writer.Write((byte)topicFilter.QualityOfServiceLevel);
  421. }
  422. }
  423. return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.Subscribe, 0x02);
  424. }
  425. private static byte Serialize(MqttSubAckPacket packet, MqttPacketWriter writer)
  426. {
  427. if (!packet.PacketIdentifier.HasValue)
  428. {
  429. throw new MqttProtocolViolationException("SubAck packet has no packet identifier.");
  430. }
  431. writer.Write(packet.PacketIdentifier.Value);
  432. if (packet.SubscribeReturnCodes?.Any() == true)
  433. {
  434. foreach (var packetSubscribeReturnCode in packet.SubscribeReturnCodes)
  435. {
  436. writer.Write((byte)packetSubscribeReturnCode);
  437. }
  438. }
  439. return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.SubAck);
  440. }
  441. private static byte Serialize(MqttUnsubscribePacket packet, MqttPacketWriter writer)
  442. {
  443. if (!packet.TopicFilters.Any()) throw new MqttProtocolViolationException("At least one topic filter must be set [MQTT-3.10.3-2].");
  444. if (!packet.PacketIdentifier.HasValue)
  445. {
  446. throw new MqttProtocolViolationException("Unsubscribe packet has no packet identifier.");
  447. }
  448. writer.Write(packet.PacketIdentifier.Value);
  449. if (packet.TopicFilters?.Any() == true)
  450. {
  451. foreach (var topicFilter in packet.TopicFilters)
  452. {
  453. writer.WriteWithLengthPrefix(topicFilter);
  454. }
  455. }
  456. return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.Unsubscibe, 0x02);
  457. }
  458. private static byte Serialize(MqttUnsubAckPacket packet, BinaryWriter writer)
  459. {
  460. if (!packet.PacketIdentifier.HasValue)
  461. {
  462. throw new MqttProtocolViolationException("UnsubAck packet has no packet identifier.");
  463. }
  464. writer.Write(packet.PacketIdentifier.Value);
  465. return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.UnsubAck);
  466. }
  467. private static byte SerializeEmptyPacket(MqttControlPacketType type)
  468. {
  469. return MqttPacketWriter.BuildFixedHeader(type);
  470. }
  471. }
  472. }