You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

617 lines
22 KiB

  1. using MQTTnet.Exceptions;
  2. using MQTTnet.Packets;
  3. using MQTTnet.Protocol;
  4. using System;
  5. using System.IO;
  6. using System.Linq;
  7. using MQTTnet.Adapter;
  8. namespace MQTTnet.Serializer
  9. {
  10. public class MqttPacketSerializer : IMqttPacketSerializer
  11. {
  /// <summary>
  /// Gets or sets the protocol version this serializer consults when writing Connect and
  /// ConnAck packets and when reading ConnAck packets. Initialized to
  /// <see cref="MqttProtocolVersion.V311"/>.
  /// </summary>
  public MqttProtocolVersion ProtocolVersion
  {
      get;
      set;
  } = MqttProtocolVersion.V311;
  /// <summary>
  /// Serializes a packet into a contiguous byte segment consisting of the fixed header byte,
  /// the encoded remaining length and the packet body.
  /// </summary>
  /// <param name="packet">The packet to serialize.</param>
  /// <returns>A segment of the stream's buffer that starts at the fixed header byte.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="packet"/> is null.</exception>
  public ArraySegment<byte> Serialize(MqttBasePacket packet)
  {
      if (packet == null)
      {
          throw new ArgumentNullException(nameof(packet));
      }

      // Head room in front of the body: 1 fixed header byte + up to 4 remaining-length bytes.
      const int reservedHeaderSize = 5;

      using (var stream = new MemoryStream(128))
      {
          // The body is written first, starting right after the reserved head room.
          stream.Position = reservedHeaderSize;
          var fixedHeader = SerializePacket(packet, stream);

          // NOTE(review): if SerializePacket wrote nothing, stream.Length is still 0 here and the
          // value passed below is negative; presumably EncodeRemainingLength copes with that - confirm.
          stream.Position = 1;
          var bodyLength = (int)stream.Length - reservedHeaderSize;
          var remainingLengthSize = MqttPacketWriter.EncodeRemainingLength(bodyLength, stream);

          // The header occupies the fixed header byte plus the value returned by the encoder;
          // it is placed directly in front of the body, leaving unused leading bytes before it.
          var headerOffset = reservedHeaderSize - (remainingLengthSize + 1);
          stream.Position = headerOffset;
          stream.WriteByte(fixedHeader);

#if NET461 || NET452 || NETSTANDARD2_0
          var buffer = stream.GetBuffer();
#else
          var buffer = stream.ToArray();
#endif

          var segmentLength = (int)stream.Length - headerOffset;
          return new ArraySegment<byte>(buffer, headerOffset, segmentLength);
      }
  }
  /// <summary>
  /// Dispatches to the serializer matching the runtime type of <paramref name="packet"/>.
  /// </summary>
  /// <param name="packet">The packet whose body should be written.</param>
  /// <param name="stream">The stream receiving the packet body.</param>
  /// <returns>The fixed header byte produced by the type specific serializer.</returns>
  /// <exception cref="MqttProtocolViolationException">The packet type is not handled here.</exception>
  private byte SerializePacket(MqttBasePacket packet, Stream stream)
  {
      // The type tests are kept in the same order as the original switch.
      if (packet is MqttConnectPacket connect) return Serialize(connect, stream);
      if (packet is MqttConnAckPacket connAck) return Serialize(connAck, stream);

      // These packets carry no body; only the fixed header byte is produced.
      if (packet is MqttDisconnectPacket) return SerializeEmptyPacket(MqttControlPacketType.Disconnect);
      if (packet is MqttPingReqPacket) return SerializeEmptyPacket(MqttControlPacketType.PingReq);
      if (packet is MqttPingRespPacket) return SerializeEmptyPacket(MqttControlPacketType.PingResp);

      if (packet is MqttPublishPacket publish) return Serialize(publish, stream);
      if (packet is MqttPubAckPacket pubAck) return Serialize(pubAck, stream);
      if (packet is MqttPubRecPacket pubRec) return Serialize(pubRec, stream);
      if (packet is MqttPubRelPacket pubRel) return Serialize(pubRel, stream);
      if (packet is MqttPubCompPacket pubComp) return Serialize(pubComp, stream);
      if (packet is MqttSubscribePacket subscribe) return Serialize(subscribe, stream);
      if (packet is MqttSubAckPacket subAck) return Serialize(subAck, stream);
      if (packet is MqttUnsubscribePacket unsubscribe) return Serialize(unsubscribe, stream);
      if (packet is MqttUnsubAckPacket unsubAck) return Serialize(unsubAck, stream);

      throw new MqttProtocolViolationException("Packet type invalid.");
  }
  /// <summary>
  /// Builds a packet object from a received fixed header and body.
  /// </summary>
  /// <param name="receivedMqttPacket">The received packet (fixed header byte and body stream).</param>
  /// <returns>The deserialized packet.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="receivedMqttPacket"/> is null.</exception>
  /// <exception cref="MqttProtocolViolationException">
  /// The packet type taken from the fixed header is outside 1..14 or is not handled.
  /// </exception>
  public MqttBasePacket Deserialize(ReceivedMqttPacket receivedMqttPacket)
  {
      if (receivedMqttPacket == null)
      {
          throw new ArgumentNullException(nameof(receivedMqttPacket));
      }

      // The packet type is stored in the upper four bits of the fixed header byte.
      var packetTypeValue = receivedMqttPacket.FixedHeader >> 4;
      var isInRange = packetTypeValue >= 1 && packetTypeValue <= 14;
      if (!isInRange)
      {
          throw new MqttProtocolViolationException($"The packet type is invalid ({packetTypeValue}).");
      }

      switch ((MqttControlPacketType)packetTypeValue)
      {
          // Packets without a body.
          case MqttControlPacketType.Disconnect:
              return new MqttDisconnectPacket();
          case MqttControlPacketType.PingReq:
              return new MqttPingReqPacket();
          case MqttControlPacketType.PingResp:
              return new MqttPingRespPacket();

          // Publish also needs the flag bits of the fixed header.
          case MqttControlPacketType.Publish:
              return DeserializePublish(receivedMqttPacket);

          // Packets that are read from the body alone.
          case MqttControlPacketType.Connect:
              return DeserializeConnect(receivedMqttPacket.Body);
          case MqttControlPacketType.ConnAck:
              return DeserializeConnAck(receivedMqttPacket.Body);
          case MqttControlPacketType.PubAck:
              return DeserializePubAck(receivedMqttPacket.Body);
          case MqttControlPacketType.PubRec:
              return DeserializePubRec(receivedMqttPacket.Body);
          case MqttControlPacketType.PubRel:
              return DeserializePubRel(receivedMqttPacket.Body);
          case MqttControlPacketType.PubComp:
              return DeserializePubComp(receivedMqttPacket.Body);
          case MqttControlPacketType.Subscribe:
              return DeserializeSubscribe(receivedMqttPacket.Body);
          case MqttControlPacketType.SubAck:
              return DeserializeSubAck(receivedMqttPacket.Body);
          case MqttControlPacketType.Unsubscibe:
              return DeserializeUnsubscribe(receivedMqttPacket.Body);
          case MqttControlPacketType.UnsubAck:
              return DeserializeUnsubAck(receivedMqttPacket.Body);

          default:
              throw new MqttProtocolViolationException($"Packet type ({packetTypeValue}) not supported.");
      }
  }
  /// <summary>
  /// Reads an UnsubAck packet: the body supplies the packet identifier.
  /// </summary>
  /// <param name="body">The packet body.</param>
  /// <returns>The deserialized <see cref="MqttUnsubAckPacket"/>.</returns>
  /// <exception cref="MqttProtocolViolationException">The body is null or empty.</exception>
  private static MqttBasePacket DeserializeUnsubAck(Stream body)
  {
      ThrowIfBodyIsEmpty(body);

      var packet = new MqttUnsubAckPacket();
      packet.PacketIdentifier = body.ReadUInt16();
      return packet;
  }
  /// <summary>
  /// Reads a PubComp packet: the body supplies the packet identifier.
  /// </summary>
  /// <param name="body">The packet body.</param>
  /// <returns>The deserialized <see cref="MqttPubCompPacket"/>.</returns>
  /// <exception cref="MqttProtocolViolationException">The body is null or empty.</exception>
  private static MqttBasePacket DeserializePubComp(Stream body)
  {
      ThrowIfBodyIsEmpty(body);

      var packet = new MqttPubCompPacket();
      packet.PacketIdentifier = body.ReadUInt16();
      return packet;
  }
  /// <summary>
  /// Reads a PubRel packet: the body supplies the packet identifier.
  /// </summary>
  /// <param name="body">The packet body.</param>
  /// <returns>The deserialized <see cref="MqttPubRelPacket"/>.</returns>
  /// <exception cref="MqttProtocolViolationException">The body is null or empty.</exception>
  private static MqttBasePacket DeserializePubRel(Stream body)
  {
      ThrowIfBodyIsEmpty(body);

      var packet = new MqttPubRelPacket();
      packet.PacketIdentifier = body.ReadUInt16();
      return packet;
  }
  /// <summary>
  /// Reads a PubRec packet: the body supplies the packet identifier.
  /// </summary>
  /// <param name="body">The packet body.</param>
  /// <returns>The deserialized <see cref="MqttPubRecPacket"/>.</returns>
  /// <exception cref="MqttProtocolViolationException">The body is null or empty.</exception>
  private static MqttBasePacket DeserializePubRec(Stream body)
  {
      ThrowIfBodyIsEmpty(body);

      var packet = new MqttPubRecPacket();
      packet.PacketIdentifier = body.ReadUInt16();
      return packet;
  }
  /// <summary>
  /// Reads a PubAck packet: the body supplies the packet identifier.
  /// </summary>
  /// <param name="body">The packet body.</param>
  /// <returns>The deserialized <see cref="MqttPubAckPacket"/>.</returns>
  /// <exception cref="MqttProtocolViolationException">The body is null or empty.</exception>
  private static MqttBasePacket DeserializePubAck(Stream body)
  {
      ThrowIfBodyIsEmpty(body);

      var packet = new MqttPubAckPacket();
      packet.PacketIdentifier = body.ReadUInt16();
      return packet;
  }
  /// <summary>
  /// Reads an Unsubscribe packet: a packet identifier followed by length-prefixed topic
  /// filters until the end of the body is reached.
  /// </summary>
  /// <param name="body">The packet body.</param>
  /// <returns>The deserialized <see cref="MqttUnsubscribePacket"/>.</returns>
  /// <exception cref="MqttProtocolViolationException">The body is null or empty.</exception>
  private static MqttBasePacket DeserializeUnsubscribe(Stream body)
  {
      ThrowIfBodyIsEmpty(body);

      var packet = new MqttUnsubscribePacket();
      packet.PacketIdentifier = body.ReadUInt16();

      // Everything after the identifier is a sequence of topic filters.
      while (body.Position != body.Length)
      {
          var topicFilter = body.ReadStringWithLengthPrefix();
          packet.TopicFilters.Add(topicFilter);
      }

      return packet;
  }
  /// <summary>
  /// Reads a Subscribe packet: a packet identifier followed by (topic filter, requested QoS)
  /// pairs until the end of the body is reached.
  /// </summary>
  /// <param name="body">The packet body.</param>
  /// <returns>The deserialized <see cref="MqttSubscribePacket"/>.</returns>
  /// <exception cref="MqttProtocolViolationException">
  /// The body is null or empty, or the body ends after a topic filter without its QoS byte.
  /// </exception>
  private static MqttBasePacket DeserializeSubscribe(Stream body)
  {
      ThrowIfBodyIsEmpty(body);

      var packet = new MqttSubscribePacket
      {
          PacketIdentifier = body.ReadUInt16()
      };

      while (body.Position != body.Length)
      {
          var topic = body.ReadStringWithLengthPrefix();

          // Stream.ReadByte returns -1 at the end of the stream. Casting that value straight to
          // the QoS enum would silently turn a truncated packet into a bogus QoS level.
          var requestedQoS = body.ReadByte();
          if (requestedQoS < 0)
          {
              throw new MqttProtocolViolationException("Subscribe packet is truncated: a topic filter has no requested QoS byte.");
          }

          packet.TopicFilters.Add(new TopicFilter(topic, (MqttQualityOfServiceLevel)requestedQoS));
      }

      return packet;
  }
  /// <summary>
  /// Reads a Publish packet. Retain, QoS and Dup come from the fixed header; topic, optional
  /// packet identifier and payload come from the body.
  /// </summary>
  /// <param name="receivedMqttPacket">The received packet (fixed header byte and body stream).</param>
  /// <returns>The deserialized <see cref="MqttPublishPacket"/>.</returns>
  /// <exception cref="MqttProtocolViolationException">The body is null or empty.</exception>
  private static MqttBasePacket DeserializePublish(ReceivedMqttPacket receivedMqttPacket)
  {
      var body = receivedMqttPacket.Body;
      ThrowIfBodyIsEmpty(body);

      // The flags are consumed in this order: retain (1 bit), QoS (2 bits), dup (1 bit).
      var flags = new ByteReader(receivedMqttPacket.FixedHeader);
      var retain = flags.Read();
      var qualityOfServiceLevel = (MqttQualityOfServiceLevel)flags.Read(2);
      var dup = flags.Read();

      var topic = body.ReadStringWithLengthPrefix();

      // A packet identifier is only read when the QoS level is above AtMostOnce.
      ushort? packetIdentifier = qualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce
          ? body.ReadUInt16()
          : (ushort?)null;

      var packet = new MqttPublishPacket();
      packet.PacketIdentifier = packetIdentifier;
      packet.Retain = retain;
      packet.Topic = topic;
      // Whatever is left in the body is the payload.
      packet.Payload = body.ReadRemainingData();
      packet.QualityOfServiceLevel = qualityOfServiceLevel;
      packet.Dup = dup;
      return packet;
  }
  /// <summary>
  /// Reads a Connect packet: protocol name and level, connect flags, keep alive period,
  /// client id and the optional will message, user name and password.
  /// </summary>
  /// <param name="body">The packet body.</param>
  /// <returns>The deserialized and validated <see cref="MqttConnectPacket"/>.</returns>
  /// <exception cref="MqttProtocolViolationException">
  /// The body is null or empty, the protocol name or level is not supported, the first
  /// connect flag bit is set, or the packet fails <c>ValidateConnectPacket</c>.
  /// </exception>
  private static MqttBasePacket DeserializeConnect(Stream body)
  {
      ThrowIfBodyIsEmpty(body);

      var protocolName = body.ReadStringWithLengthPrefix();
      MqttProtocolVersion protocolVersion;
      int protocolLevel;

      switch (protocolName)
      {
          case "MQTT":
              protocolLevel = body.ReadByte();
              if (protocolLevel != 4)
              {
                  throw new MqttProtocolViolationException($"Protocol level ({protocolLevel}) not supported for MQTT 3.1.1.");
              }

              protocolVersion = MqttProtocolVersion.V311;
              break;

          case "MQIsdp":
              protocolLevel = body.ReadByte();
              if (protocolLevel != 3)
              {
                  throw new MqttProtocolViolationException($"Protocol level ({protocolLevel}) not supported for MQTT 3.1.");
              }

              protocolVersion = MqttProtocolVersion.V310;
              break;

          default:
              throw new MqttProtocolViolationException($"Protocol name ({protocolName}) is not supported.");
      }

      // The connect flags are consumed in this order: reserved, clean session, will flag,
      // will QoS (2 bits), will retain, password flag, user name flag.
      var connectFlags = new ByteReader(body.ReadByte());

      var reservedBit = connectFlags.Read();
      if (reservedBit)
      {
          throw new MqttProtocolViolationException("The first bit of the Connect Flags must be set to 0.");
      }

      var cleanSession = connectFlags.Read();
      var willFlag = connectFlags.Read();
      var willQoS = connectFlags.Read(2);
      var willRetain = connectFlags.Read();
      var passwordFlag = connectFlags.Read();
      var usernameFlag = connectFlags.Read();

      var packet = new MqttConnectPacket
      {
          ProtocolVersion = protocolVersion,
          CleanSession = cleanSession
      };
      packet.KeepAlivePeriod = body.ReadUInt16();
      packet.ClientId = body.ReadStringWithLengthPrefix();

      if (willFlag)
      {
          // The will topic precedes the will payload in the body.
          var willTopic = body.ReadStringWithLengthPrefix();
          var willPayload = body.ReadWithLengthPrefix();

          packet.WillMessage = new MqttApplicationMessage
          {
              Topic = willTopic,
              Payload = willPayload,
              QualityOfServiceLevel = (MqttQualityOfServiceLevel)willQoS,
              Retain = willRetain
          };
      }

      // The user name is read before the password, although its flag is read last.
      if (usernameFlag)
      {
          packet.Username = body.ReadStringWithLengthPrefix();
      }

      if (passwordFlag)
      {
          packet.Password = body.ReadStringWithLengthPrefix();
      }

      ValidateConnectPacket(packet);
      return packet;
  }
  /// <summary>
  /// Reads a SubAck packet: a packet identifier followed by one return code byte per
  /// remaining byte of the body.
  /// </summary>
  /// <param name="body">The packet body.</param>
  /// <returns>The deserialized <see cref="MqttSubAckPacket"/>.</returns>
  /// <exception cref="MqttProtocolViolationException">The body is null or empty.</exception>
  private static MqttBasePacket DeserializeSubAck(Stream body)
  {
      ThrowIfBodyIsEmpty(body);

      var packet = new MqttSubAckPacket();
      packet.PacketIdentifier = body.ReadUInt16();

      while (body.Position != body.Length)
      {
          var returnCode = (MqttSubscribeReturnCode)body.ReadByte();
          packet.SubscribeReturnCodes.Add(returnCode);
      }

      return packet;
  }
  /// <summary>
  /// Reads a ConnAck packet: one flags byte followed by the connect return code.
  /// The session present flag is only taken from the flags byte when
  /// <see cref="ProtocolVersion"/> is <see cref="MqttProtocolVersion.V311"/>.
  /// </summary>
  /// <param name="body">The packet body.</param>
  /// <returns>The deserialized <see cref="MqttConnAckPacket"/>.</returns>
  /// <exception cref="MqttProtocolViolationException">The body is null or empty.</exception>
  private MqttBasePacket DeserializeConnAck(Stream body)
  {
      ThrowIfBodyIsEmpty(body);

      var packet = new MqttConnAckPacket();

      // The first byte is always consumed, even when its content is not used.
      var acknowledgeFlags = new ByteReader(body.ReadByte());
      var readsSessionPresent = ProtocolVersion == MqttProtocolVersion.V311;
      if (readsSessionPresent)
      {
          packet.IsSessionPresent = acknowledgeFlags.Read();
      }

      var returnCode = (MqttConnectReturnCode)body.ReadByte();
      packet.ConnectReturnCode = returnCode;
      return packet;
  }
  /// <summary>
  /// Ensures that a Connect packet without a client id has the clean session flag set.
  /// </summary>
  /// <param name="packet">The packet to validate.</param>
  /// <exception cref="ArgumentNullException"><paramref name="packet"/> is null.</exception>
  /// <exception cref="MqttProtocolViolationException">
  /// The client id is null or empty while clean session is not set.
  /// </exception>
  private static void ValidateConnectPacket(MqttConnectPacket packet)
  {
      if (packet == null)
      {
          throw new ArgumentNullException(nameof(packet));
      }

      var hasClientId = !string.IsNullOrEmpty(packet.ClientId);
      if (hasClientId || packet.CleanSession)
      {
          return;
      }

      throw new MqttProtocolViolationException("CleanSession must be set if ClientId is empty [MQTT-3.1.3-7].");
  }
  /// <summary>
  /// Ensures that a Publish packet whose QoS level equals 0 does not have the Dup flag set.
  /// </summary>
  /// <param name="packet">The packet to validate.</param>
  /// <exception cref="ArgumentNullException"><paramref name="packet"/> is null.</exception>
  /// <exception cref="MqttProtocolViolationException">Dup is set on a QoS 0 packet.</exception>
  private static void ValidatePublishPacket(MqttPublishPacket packet)
  {
      if (packet == null)
      {
          throw new ArgumentNullException(nameof(packet));
      }

      if (packet.QualityOfServiceLevel != 0)
      {
          return;
      }

      if (packet.Dup)
      {
          throw new MqttProtocolViolationException("Dup flag must be false for QoS 0 packets [MQTT-3.3.1-2].");
      }
  }
  /// <summary>
  /// Writes the body of a Connect packet: protocol name and level, connect flags, keep alive
  /// period, client id and the optional will message, user name and password.
  /// </summary>
  /// <param name="packet">The Connect packet to write.</param>
  /// <param name="stream">The stream receiving the packet body.</param>
  /// <returns>The fixed header byte for a Connect packet.</returns>
  /// <exception cref="MqttProtocolViolationException">
  /// The packet fails <c>ValidateConnectPacket</c>, or a password is set without a user name.
  /// </exception>
  private byte Serialize(MqttConnectPacket packet, Stream stream)
  {
      ValidateConnectPacket(packet);

      // Variable header: every version other than V311 is written as "MQIsdp" / level 3.
      var isV311 = ProtocolVersion == MqttProtocolVersion.V311;
      stream.WriteWithLengthPrefix(isV311 ? "MQTT" : "MQIsdp");
      stream.WriteByte(isV311 ? (byte)4 : (byte)3);

      var will = packet.WillMessage;
      var hasWill = will != null;

      // Connect flags, written in this order: reserved, clean session, will flag,
      // will QoS (2 bits), will retain, password flag, user name flag.
      var connectFlags = new ByteWriter();
      connectFlags.Write(false);
      connectFlags.Write(packet.CleanSession);
      connectFlags.Write(hasWill);
      connectFlags.Write(hasWill ? (int)will.QualityOfServiceLevel : 0, 2);
      connectFlags.Write(hasWill && will.Retain);

      var hasUsername = packet.Username != null;
      var hasPassword = packet.Password != null;
      if (hasPassword && !hasUsername)
      {
          throw new MqttProtocolViolationException("If the User Name Flag is set to 0, the Password Flag MUST be set to 0 [MQTT-3.1.2-22].");
      }

      connectFlags.Write(hasPassword);
      connectFlags.Write(hasUsername);

      stream.Write(connectFlags);
      stream.Write(packet.KeepAlivePeriod);

      // Payload: client id, then will topic and payload, then user name, then password.
      stream.WriteWithLengthPrefix(packet.ClientId);

      if (hasWill)
      {
          stream.WriteWithLengthPrefix(will.Topic);
          stream.WriteWithLengthPrefix(will.Payload);
      }

      if (hasUsername)
      {
          stream.WriteWithLengthPrefix(packet.Username);
      }

      if (hasPassword)
      {
          stream.WriteWithLengthPrefix(packet.Password);
      }

      return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.Connect);
  }
  /// <summary>
  /// Writes the body of a ConnAck packet: a flags byte followed by the connect return code.
  /// For V310 the flags byte is always 0; for V311 it carries the session present flag.
  /// </summary>
  /// <param name="packet">The ConnAck packet to write.</param>
  /// <param name="stream">The stream receiving the packet body.</param>
  /// <returns>The fixed header byte for a ConnAck packet.</returns>
  /// <exception cref="MqttProtocolViolationException">
  /// <see cref="ProtocolVersion"/> is neither V310 nor V311.
  /// </exception>
  private byte Serialize(MqttConnAckPacket packet, Stream stream)
  {
      switch (ProtocolVersion)
      {
          case MqttProtocolVersion.V310:
              stream.WriteByte(0);
              break;

          case MqttProtocolVersion.V311:
              var acknowledgeFlags = new ByteWriter();
              acknowledgeFlags.Write(packet.IsSessionPresent);
              stream.Write(acknowledgeFlags);
              break;

          default:
              throw new MqttProtocolViolationException("Protocol version not supported.");
      }

      var returnCode = (byte)packet.ConnectReturnCode;
      stream.WriteByte(returnCode);

      return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.ConnAck);
  }
  /// <summary>
  /// Writes the body of a PubRel packet, which consists of the packet identifier only.
  /// </summary>
  /// <param name="packet">The PubRel packet to write.</param>
  /// <param name="stream">The stream receiving the packet body.</param>
  /// <returns>The fixed header byte for a PubRel packet, built with 0x02 as second argument.</returns>
  /// <exception cref="MqttProtocolViolationException">The packet has no packet identifier.</exception>
  private static byte Serialize(MqttPubRelPacket packet, Stream stream)
  {
      var packetIdentifier = packet.PacketIdentifier;
      if (packetIdentifier.HasValue)
      {
          stream.Write(packetIdentifier.Value);
          return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubRel, 0x02);
      }

      throw new MqttProtocolViolationException("PubRel packet has no packet identifier.");
  }
  /// <summary>
  /// Writes the body of a Publish packet (topic, optional packet identifier, payload) and
  /// builds the fixed header from the Retain, QoS and Dup values.
  /// </summary>
  /// <param name="packet">The Publish packet to write.</param>
  /// <param name="stream">The stream receiving the packet body.</param>
  /// <returns>The fixed header byte for the Publish packet.</returns>
  /// <exception cref="MqttProtocolViolationException">
  /// The packet fails <c>ValidatePublishPacket</c>, the identifier is missing for a QoS level
  /// above AtMostOnce, or a non-zero identifier is present otherwise.
  /// </exception>
  private static byte Serialize(MqttPublishPacket packet, Stream stream)
  {
      ValidatePublishPacket(packet);

      stream.WriteWithLengthPrefix(packet.Topic);

      var packetIdentifier = packet.PacketIdentifier;
      if (packet.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce)
      {
          if (!packetIdentifier.HasValue)
          {
              throw new MqttProtocolViolationException("Publish packet has no packet identifier.");
          }

          stream.Write(packetIdentifier.Value);
      }
      else if (packetIdentifier.GetValueOrDefault() > 0)
      {
          // A missing identifier and an identifier of 0 are both accepted here.
          throw new MqttProtocolViolationException("Packet identifier must be empty if QoS == 0 [MQTT-2.3.1-5].");
      }

      // The payload is written as-is, without a length prefix.
      var payload = packet.Payload;
      if (payload != null && payload.Length > 0)
      {
          stream.Write(payload, 0, payload.Length);
      }

      // Flag layout: bit 0 = retain, bits 1-2 = QoS, bit 3 = dup.
      var flags = (byte)((byte)packet.QualityOfServiceLevel << 1);
      if (packet.Retain)
      {
          flags |= 0x01;
      }

      if (packet.Dup)
      {
          flags |= 0x08;
      }

      return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.Publish, flags);
  }
  /// <summary>
  /// Writes the body of a PubAck packet, which consists of the packet identifier only.
  /// </summary>
  /// <param name="packet">The PubAck packet to write.</param>
  /// <param name="stream">The stream receiving the packet body.</param>
  /// <returns>The fixed header byte for a PubAck packet.</returns>
  /// <exception cref="MqttProtocolViolationException">The packet has no packet identifier.</exception>
  private static byte Serialize(MqttPubAckPacket packet, Stream stream)
  {
      var packetIdentifier = packet.PacketIdentifier;
      if (packetIdentifier.HasValue)
      {
          stream.Write(packetIdentifier.Value);
          return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubAck);
      }

      throw new MqttProtocolViolationException("PubAck packet has no packet identifier.");
  }
  /// <summary>
  /// Writes the body of a PubRec packet, which consists of the packet identifier only.
  /// </summary>
  /// <param name="packet">The PubRec packet to write.</param>
  /// <param name="stream">The stream receiving the packet body.</param>
  /// <returns>The fixed header byte for a PubRec packet.</returns>
  /// <exception cref="MqttProtocolViolationException">The packet has no packet identifier.</exception>
  private static byte Serialize(MqttPubRecPacket packet, Stream stream)
  {
      var packetIdentifier = packet.PacketIdentifier;
      if (packetIdentifier.HasValue)
      {
          stream.Write(packetIdentifier.Value);
          return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubRec);
      }

      throw new MqttProtocolViolationException("PubRec packet has no packet identifier.");
  }
  /// <summary>
  /// Writes the body of a PubComp packet, which consists of the packet identifier only.
  /// </summary>
  /// <param name="packet">The PubComp packet to write.</param>
  /// <param name="stream">The stream receiving the packet body.</param>
  /// <returns>The fixed header byte for a PubComp packet.</returns>
  /// <exception cref="MqttProtocolViolationException">The packet has no packet identifier.</exception>
  private static byte Serialize(MqttPubCompPacket packet, Stream stream)
  {
      var packetIdentifier = packet.PacketIdentifier;
      if (packetIdentifier.HasValue)
      {
          stream.Write(packetIdentifier.Value);
          return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubComp);
      }

      throw new MqttProtocolViolationException("PubComp packet has no packet identifier.");
  }
  /// <summary>
  /// Writes the body of a Subscribe packet: the packet identifier followed by one
  /// (topic filter, requested QoS byte) pair per entry in <c>TopicFilters</c>.
  /// </summary>
  /// <param name="packet">The Subscribe packet to write.</param>
  /// <param name="stream">The stream receiving the packet body.</param>
  /// <returns>The fixed header byte for a Subscribe packet, built with 0x02 as second argument.</returns>
  /// <exception cref="MqttProtocolViolationException">
  /// The topic filter collection is null or empty, or the packet has no packet identifier.
  /// </exception>
  private static byte Serialize(MqttSubscribePacket packet, Stream stream)
  {
      // A null collection is reported as the same protocol violation as an empty one instead
      // of surfacing as an ArgumentNullException from Enumerable.Any.
      var topicFilters = packet.TopicFilters;
      if (topicFilters == null || topicFilters.Count == 0)
      {
          throw new MqttProtocolViolationException("At least one topic filter must be set [MQTT-3.8.3-3].");
      }

      if (!packet.PacketIdentifier.HasValue)
      {
          throw new MqttProtocolViolationException("Subscribe packet has no packet identifier.");
      }

      stream.Write(packet.PacketIdentifier.Value);

      foreach (var topicFilter in topicFilters)
      {
          stream.WriteWithLengthPrefix(topicFilter.Topic);
          stream.WriteByte((byte)topicFilter.QualityOfServiceLevel);
      }

      return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.Subscribe, 0x02);
  }
  /// <summary>
  /// Writes the body of a SubAck packet: the packet identifier followed by one byte per
  /// subscribe return code.
  /// </summary>
  /// <param name="packet">The SubAck packet to write.</param>
  /// <param name="stream">The stream receiving the packet body.</param>
  /// <returns>The fixed header byte for a SubAck packet.</returns>
  /// <exception cref="MqttProtocolViolationException">The packet has no packet identifier.</exception>
  private static byte Serialize(MqttSubAckPacket packet, Stream stream)
  {
      var packetIdentifier = packet.PacketIdentifier;
      if (!packetIdentifier.HasValue)
      {
          throw new MqttProtocolViolationException("SubAck packet has no packet identifier.");
      }

      stream.Write(packetIdentifier.Value);

      // A missing or empty return code collection simply results in no further bytes.
      var returnCodes = packet.SubscribeReturnCodes;
      if (returnCodes != null)
      {
          foreach (var returnCode in returnCodes)
          {
              stream.WriteByte((byte)returnCode);
          }
      }

      return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.SubAck);
  }
  /// <summary>
  /// Writes the body of an Unsubscribe packet: the packet identifier followed by the
  /// length-prefixed topic filters.
  /// </summary>
  /// <param name="packet">The Unsubscribe packet to write.</param>
  /// <param name="stream">The stream receiving the packet body.</param>
  /// <returns>The fixed header byte for an Unsubscribe packet, built with 0x02 as second argument.</returns>
  /// <exception cref="MqttProtocolViolationException">
  /// The topic filter collection is null or empty, or the packet has no packet identifier.
  /// </exception>
  private static byte Serialize(MqttUnsubscribePacket packet, Stream stream)
  {
      // A null collection is reported as the same protocol violation as an empty one instead
      // of surfacing as an ArgumentNullException from Enumerable.Any.
      var topicFilters = packet.TopicFilters;
      if (topicFilters == null || !topicFilters.Any())
      {
          throw new MqttProtocolViolationException("At least one topic filter must be set [MQTT-3.10.3-2].");
      }

      if (!packet.PacketIdentifier.HasValue)
      {
          throw new MqttProtocolViolationException("Unsubscribe packet has no packet identifier.");
      }

      stream.Write(packet.PacketIdentifier.Value);

      foreach (var topicFilter in topicFilters)
      {
          stream.WriteWithLengthPrefix(topicFilter);
      }

      return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.Unsubscibe, 0x02);
  }
  /// <summary>
  /// Writes the body of an UnsubAck packet, which consists of the packet identifier only.
  /// </summary>
  /// <param name="packet">The UnsubAck packet to write.</param>
  /// <param name="stream">The stream receiving the packet body.</param>
  /// <returns>The fixed header byte for an UnsubAck packet.</returns>
  /// <exception cref="MqttProtocolViolationException">The packet has no packet identifier.</exception>
  private static byte Serialize(MqttUnsubAckPacket packet, Stream stream)
  {
      var packetIdentifier = packet.PacketIdentifier;
      if (packetIdentifier.HasValue)
      {
          stream.Write(packetIdentifier.Value);
          return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.UnsubAck);
      }

      throw new MqttProtocolViolationException("UnsubAck packet has no packet identifier.");
  }
  /// <summary>
  /// Produces the fixed header byte for a packet type that has no body; nothing is written
  /// to any stream.
  /// </summary>
  /// <param name="type">The control packet type.</param>
  /// <returns>The fixed header byte for <paramref name="type"/>.</returns>
  private static byte SerializeEmptyPacket(MqttControlPacketType type)
      => MqttPacketWriter.BuildFixedHeader(type);
  /// <summary>
  /// Guards deserializers that need body data.
  /// </summary>
  /// <param name="body">The packet body to check.</param>
  /// <exception cref="MqttProtocolViolationException">
  /// <paramref name="body"/> is null or has a length of 0.
  /// </exception>
  private static void ThrowIfBodyIsEmpty(Stream body)
  {
      var hasData = body != null && body.Length != 0;
      if (hasData)
      {
          return;
      }

      throw new MqttProtocolViolationException("Data from the body is required but not present.");
  }
  496. }
  497. }