You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

629 lines
23 KiB

  1. using MQTTnet.Exceptions;
  2. using MQTTnet.Packets;
  3. using MQTTnet.Protocol;
  4. using System;
  5. using System.Linq;
  6. using MQTTnet.Adapter;
  7. namespace MQTTnet.Serializer
  8. {
  9. public class MqttPacketSerializer : IMqttPacketSerializer
  10. {
  11. private const int FixedHeaderSize = 1;
  12. private readonly MqttPacketWriter _packetWriter = new MqttPacketWriter();
  13. public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311;
  14. public ArraySegment<byte> Serialize(MqttBasePacket packet)
  15. {
  16. if (packet == null) throw new ArgumentNullException(nameof(packet));
  17. // Leave enough head space for max header size (fixed + 4 variable remaining length = 5 bytes)
  18. _packetWriter.Reset();
  19. _packetWriter.Seek(5);
  20. var fixedHeader = SerializePacket(packet, _packetWriter);
  21. var remainingLength = _packetWriter.Length - 5;
  22. var remainingLengthBuffer = MqttPacketWriter.EncodeRemainingLength(remainingLength);
  23. var headerSize = FixedHeaderSize + remainingLengthBuffer.Count;
  24. var headerOffset = 5 - headerSize;
  25. // Position cursor on correct offset on beginining of array (has leading 0x0)
  26. _packetWriter.Seek(headerOffset);
  27. _packetWriter.Write(fixedHeader);
  28. _packetWriter.Write(remainingLengthBuffer.Array, remainingLengthBuffer.Offset, remainingLengthBuffer.Count);
  29. var buffer = _packetWriter.GetBuffer();
  30. return new ArraySegment<byte>(buffer, headerOffset, _packetWriter.Length - headerOffset);
  31. }
  32. public MqttBasePacket Deserialize(ReceivedMqttPacket receivedMqttPacket)
  33. {
  34. if (receivedMqttPacket == null) throw new ArgumentNullException(nameof(receivedMqttPacket));
  35. var controlPacketType = receivedMqttPacket.FixedHeader >> 4;
  36. if (controlPacketType < 1 || controlPacketType > 14)
  37. {
  38. throw new MqttProtocolViolationException($"The packet type is invalid ({controlPacketType}).");
  39. }
  40. switch ((MqttControlPacketType)controlPacketType)
  41. {
  42. case MqttControlPacketType.Connect: return DeserializeConnect(receivedMqttPacket.Body);
  43. case MqttControlPacketType.ConnAck: return DeserializeConnAck(receivedMqttPacket.Body);
  44. case MqttControlPacketType.Disconnect: return new MqttDisconnectPacket();
  45. case MqttControlPacketType.Publish: return DeserializePublish(receivedMqttPacket);
  46. case MqttControlPacketType.PubAck: return DeserializePubAck(receivedMqttPacket.Body);
  47. case MqttControlPacketType.PubRec: return DeserializePubRec(receivedMqttPacket.Body);
  48. case MqttControlPacketType.PubRel: return DeserializePubRel(receivedMqttPacket.Body);
  49. case MqttControlPacketType.PubComp: return DeserializePubComp(receivedMqttPacket.Body);
  50. case MqttControlPacketType.PingReq: return new MqttPingReqPacket();
  51. case MqttControlPacketType.PingResp: return new MqttPingRespPacket();
  52. case MqttControlPacketType.Subscribe: return DeserializeSubscribe(receivedMqttPacket.Body);
  53. case MqttControlPacketType.SubAck: return DeserializeSubAck(receivedMqttPacket.Body);
  54. case MqttControlPacketType.Unsubscibe: return DeserializeUnsubscribe(receivedMqttPacket.Body);
  55. case MqttControlPacketType.UnsubAck: return DeserializeUnsubAck(receivedMqttPacket.Body);
  56. default: throw new MqttProtocolViolationException($"Packet type ({controlPacketType}) not supported.");
  57. }
  58. }
  59. public void FreeBuffer()
  60. {
  61. _packetWriter.FreeBuffer();
  62. }
  63. private byte SerializePacket(MqttBasePacket packet, MqttPacketWriter packetWriter)
  64. {
  65. switch (packet)
  66. {
  67. case MqttConnectPacket connectPacket: return Serialize(connectPacket, packetWriter);
  68. case MqttConnAckPacket connAckPacket: return Serialize(connAckPacket, packetWriter);
  69. case MqttDisconnectPacket _: return SerializeEmptyPacket(MqttControlPacketType.Disconnect);
  70. case MqttPingReqPacket _: return SerializeEmptyPacket(MqttControlPacketType.PingReq);
  71. case MqttPingRespPacket _: return SerializeEmptyPacket(MqttControlPacketType.PingResp);
  72. case MqttPublishPacket publishPacket: return Serialize(publishPacket, packetWriter);
  73. case MqttPubAckPacket pubAckPacket: return Serialize(pubAckPacket, packetWriter);
  74. case MqttPubRecPacket pubRecPacket: return Serialize(pubRecPacket, packetWriter);
  75. case MqttPubRelPacket pubRelPacket: return Serialize(pubRelPacket, packetWriter);
  76. case MqttPubCompPacket pubCompPacket: return Serialize(pubCompPacket, packetWriter);
  77. case MqttSubscribePacket subscribePacket: return Serialize(subscribePacket, packetWriter);
  78. case MqttSubAckPacket subAckPacket: return Serialize(subAckPacket, packetWriter);
  79. case MqttUnsubscribePacket unsubscribePacket: return Serialize(unsubscribePacket, packetWriter);
  80. case MqttUnsubAckPacket unsubAckPacket: return Serialize(unsubAckPacket, packetWriter);
  81. default: throw new MqttProtocolViolationException("Packet type invalid.");
  82. }
  83. }
  84. private static MqttBasePacket DeserializeUnsubAck(MqttPacketBodyReader body)
  85. {
  86. ThrowIfBodyIsEmpty(body);
  87. return new MqttUnsubAckPacket
  88. {
  89. PacketIdentifier = body.ReadUInt16()
  90. };
  91. }
  92. private static MqttBasePacket DeserializePubComp(MqttPacketBodyReader body)
  93. {
  94. ThrowIfBodyIsEmpty(body);
  95. return new MqttPubCompPacket
  96. {
  97. PacketIdentifier = body.ReadUInt16()
  98. };
  99. }
  100. private static MqttBasePacket DeserializePubRel(MqttPacketBodyReader body)
  101. {
  102. ThrowIfBodyIsEmpty(body);
  103. return new MqttPubRelPacket
  104. {
  105. PacketIdentifier = body.ReadUInt16()
  106. };
  107. }
  108. private static MqttBasePacket DeserializePubRec(MqttPacketBodyReader body)
  109. {
  110. ThrowIfBodyIsEmpty(body);
  111. return new MqttPubRecPacket
  112. {
  113. PacketIdentifier = body.ReadUInt16()
  114. };
  115. }
  116. private static MqttBasePacket DeserializePubAck(MqttPacketBodyReader body)
  117. {
  118. ThrowIfBodyIsEmpty(body);
  119. return new MqttPubAckPacket
  120. {
  121. PacketIdentifier = body.ReadUInt16()
  122. };
  123. }
  124. private static MqttBasePacket DeserializeUnsubscribe(MqttPacketBodyReader body)
  125. {
  126. ThrowIfBodyIsEmpty(body);
  127. var packet = new MqttUnsubscribePacket
  128. {
  129. PacketIdentifier = body.ReadUInt16(),
  130. };
  131. while (!body.EndOfStream)
  132. {
  133. packet.TopicFilters.Add(body.ReadStringWithLengthPrefix());
  134. }
  135. return packet;
  136. }
  137. private static MqttBasePacket DeserializeSubscribe(MqttPacketBodyReader body)
  138. {
  139. ThrowIfBodyIsEmpty(body);
  140. var packet = new MqttSubscribePacket
  141. {
  142. PacketIdentifier = body.ReadUInt16()
  143. };
  144. while (!body.EndOfStream)
  145. {
  146. packet.TopicFilters.Add(new TopicFilter(
  147. body.ReadStringWithLengthPrefix(),
  148. (MqttQualityOfServiceLevel)body.ReadByte()));
  149. }
  150. return packet;
  151. }
  152. private static MqttBasePacket DeserializePublish(ReceivedMqttPacket receivedMqttPacket)
  153. {
  154. ThrowIfBodyIsEmpty(receivedMqttPacket.Body);
  155. var retain = (receivedMqttPacket.FixedHeader & 0x1) > 0;
  156. var qualityOfServiceLevel = (MqttQualityOfServiceLevel)(receivedMqttPacket.FixedHeader >> 1 & 0x3);
  157. var dup = (receivedMqttPacket.FixedHeader & 0x8) > 0;
  158. var topic = receivedMqttPacket.Body.ReadStringWithLengthPrefix();
  159. ushort? packetIdentifier = null;
  160. if (qualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce)
  161. {
  162. packetIdentifier = receivedMqttPacket.Body.ReadUInt16();
  163. }
  164. var packet = new MqttPublishPacket
  165. {
  166. PacketIdentifier = packetIdentifier,
  167. Retain = retain,
  168. Topic = topic,
  169. Payload = receivedMqttPacket.Body.ReadRemainingData().ToArray(),
  170. QualityOfServiceLevel = qualityOfServiceLevel,
  171. Dup = dup
  172. };
  173. return packet;
  174. }
  175. private static MqttBasePacket DeserializeConnect(MqttPacketBodyReader body)
  176. {
  177. ThrowIfBodyIsEmpty(body);
  178. var protocolName = body.ReadStringWithLengthPrefix();
  179. MqttProtocolVersion protocolVersion;
  180. if (protocolName == "MQTT")
  181. {
  182. var protocolLevel = body.ReadByte();
  183. if (protocolLevel != 4)
  184. {
  185. throw new MqttProtocolViolationException($"Protocol level ({protocolLevel}) not supported for MQTT 3.1.1.");
  186. }
  187. protocolVersion = MqttProtocolVersion.V311;
  188. }
  189. else if (protocolName == "MQIsdp")
  190. {
  191. var protocolLevel = body.ReadByte();
  192. if (protocolLevel != 3)
  193. {
  194. throw new MqttProtocolViolationException($"Protocol level ({protocolLevel}) not supported for MQTT 3.1.");
  195. }
  196. protocolVersion = MqttProtocolVersion.V310;
  197. }
  198. else
  199. {
  200. throw new MqttProtocolViolationException($"Protocol name ({protocolName}) is not supported.");
  201. }
  202. var connectFlags = body.ReadByte();
  203. if ((connectFlags & 0x1) > 0)
  204. {
  205. throw new MqttProtocolViolationException("The first bit of the Connect Flags must be set to 0.");
  206. }
  207. var packet = new MqttConnectPacket
  208. {
  209. ProtocolVersion = protocolVersion,
  210. CleanSession = (connectFlags & 0x2) > 0
  211. };
  212. var willFlag = (connectFlags & 0x4) > 0;
  213. var willQoS = (connectFlags & 0x18) >> 3;
  214. var willRetain = (connectFlags & 0x20) > 0;
  215. var passwordFlag = (connectFlags & 0x40) > 0;
  216. var usernameFlag = (connectFlags & 0x80) > 0;
  217. packet.KeepAlivePeriod = body.ReadUInt16();
  218. packet.ClientId = body.ReadStringWithLengthPrefix();
  219. if (willFlag)
  220. {
  221. packet.WillMessage = new MqttApplicationMessage
  222. {
  223. Topic = body.ReadStringWithLengthPrefix(),
  224. Payload = body.ReadWithLengthPrefix().ToArray(),
  225. QualityOfServiceLevel = (MqttQualityOfServiceLevel)willQoS,
  226. Retain = willRetain
  227. };
  228. }
  229. if (usernameFlag)
  230. {
  231. packet.Username = body.ReadStringWithLengthPrefix();
  232. }
  233. if (passwordFlag)
  234. {
  235. packet.Password = body.ReadStringWithLengthPrefix();
  236. }
  237. ValidateConnectPacket(packet);
  238. return packet;
  239. }
  240. private static MqttBasePacket DeserializeSubAck(MqttPacketBodyReader body)
  241. {
  242. ThrowIfBodyIsEmpty(body);
  243. var packet = new MqttSubAckPacket
  244. {
  245. PacketIdentifier = body.ReadUInt16()
  246. };
  247. while (!body.EndOfStream)
  248. {
  249. packet.SubscribeReturnCodes.Add((MqttSubscribeReturnCode)body.ReadByte());
  250. }
  251. return packet;
  252. }
  253. private MqttBasePacket DeserializeConnAck(MqttPacketBodyReader body)
  254. {
  255. ThrowIfBodyIsEmpty(body);
  256. var packet = new MqttConnAckPacket();
  257. var acknowledgeFlags = body.ReadByte();
  258. if (ProtocolVersion == MqttProtocolVersion.V311)
  259. {
  260. packet.IsSessionPresent = (acknowledgeFlags & 0x1) > 0;
  261. }
  262. packet.ConnectReturnCode = (MqttConnectReturnCode)body.ReadByte();
  263. return packet;
  264. }
  265. private static void ValidateConnectPacket(MqttConnectPacket packet)
  266. {
  267. if (packet == null) throw new ArgumentNullException(nameof(packet));
  268. if (string.IsNullOrEmpty(packet.ClientId) && !packet.CleanSession)
  269. {
  270. throw new MqttProtocolViolationException("CleanSession must be set if ClientId is empty [MQTT-3.1.3-7].");
  271. }
  272. }
  273. // ReSharper disable once ParameterOnlyUsedForPreconditionCheck.Local
  274. private static void ValidatePublishPacket(MqttPublishPacket packet)
  275. {
  276. if (packet.QualityOfServiceLevel == 0 && packet.Dup)
  277. {
  278. throw new MqttProtocolViolationException("Dup flag must be false for QoS 0 packets [MQTT-3.3.1-2].");
  279. }
  280. }
  281. private byte Serialize(MqttConnectPacket packet, MqttPacketWriter packetWriter)
  282. {
  283. ValidateConnectPacket(packet);
  284. // Write variable header
  285. if (ProtocolVersion == MqttProtocolVersion.V311)
  286. {
  287. packetWriter.WriteWithLengthPrefix("MQTT");
  288. packetWriter.Write(4); // 3.1.2.2 Protocol Level 4
  289. }
  290. else
  291. {
  292. packetWriter.WriteWithLengthPrefix("MQIsdp");
  293. packetWriter.Write(3); // Protocol Level 3
  294. }
  295. byte connectFlags = 0x0;
  296. if (packet.CleanSession)
  297. {
  298. connectFlags |= 0x2;
  299. }
  300. if (packet.WillMessage != null)
  301. {
  302. connectFlags |= 0x4;
  303. connectFlags |= (byte)((byte)packet.WillMessage.QualityOfServiceLevel << 3);
  304. if (packet.WillMessage.Retain)
  305. {
  306. connectFlags |= 0x20;
  307. }
  308. }
  309. if (packet.Password != null && packet.Username == null)
  310. {
  311. throw new MqttProtocolViolationException("If the User Name Flag is set to 0, the Password Flag MUST be set to 0 [MQTT-3.1.2-22].");
  312. }
  313. if (packet.Password != null)
  314. {
  315. connectFlags |= 0x40;
  316. }
  317. if (packet.Username != null)
  318. {
  319. connectFlags |= 0x80;
  320. }
  321. packetWriter.Write(connectFlags);
  322. packetWriter.Write(packet.KeepAlivePeriod);
  323. packetWriter.WriteWithLengthPrefix(packet.ClientId);
  324. if (packet.WillMessage != null)
  325. {
  326. packetWriter.WriteWithLengthPrefix(packet.WillMessage.Topic);
  327. packetWriter.WriteWithLengthPrefix(packet.WillMessage.Payload);
  328. }
  329. if (packet.Username != null)
  330. {
  331. packetWriter.WriteWithLengthPrefix(packet.Username);
  332. }
  333. if (packet.Password != null)
  334. {
  335. packetWriter.WriteWithLengthPrefix(packet.Password);
  336. }
  337. return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.Connect);
  338. }
  339. private byte Serialize(MqttConnAckPacket packet, MqttPacketWriter packetWriter)
  340. {
  341. if (ProtocolVersion == MqttProtocolVersion.V310)
  342. {
  343. packetWriter.Write(0);
  344. }
  345. else if (ProtocolVersion == MqttProtocolVersion.V311)
  346. {
  347. byte connectAcknowledgeFlags = 0x0;
  348. if (packet.IsSessionPresent)
  349. {
  350. connectAcknowledgeFlags |= 0x1;
  351. }
  352. packetWriter.Write(connectAcknowledgeFlags);
  353. }
  354. else
  355. {
  356. throw new MqttProtocolViolationException("Protocol version not supported.");
  357. }
  358. packetWriter.Write((byte)packet.ConnectReturnCode);
  359. return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.ConnAck);
  360. }
  361. private static byte Serialize(MqttPubRelPacket packet, MqttPacketWriter packetWriter)
  362. {
  363. if (!packet.PacketIdentifier.HasValue)
  364. {
  365. throw new MqttProtocolViolationException("PubRel packet has no packet identifier.");
  366. }
  367. packetWriter.Write(packet.PacketIdentifier.Value);
  368. return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubRel, 0x02);
  369. }
  370. private static byte Serialize(MqttPublishPacket packet, MqttPacketWriter packetWriter)
  371. {
  372. ValidatePublishPacket(packet);
  373. packetWriter.WriteWithLengthPrefix(packet.Topic);
  374. if (packet.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce)
  375. {
  376. if (!packet.PacketIdentifier.HasValue)
  377. {
  378. throw new MqttProtocolViolationException("Publish packet has no packet identifier.");
  379. }
  380. packetWriter.Write(packet.PacketIdentifier.Value);
  381. }
  382. else
  383. {
  384. if (packet.PacketIdentifier > 0)
  385. {
  386. throw new MqttProtocolViolationException("Packet identifier must be empty if QoS == 0 [MQTT-2.3.1-5].");
  387. }
  388. }
  389. if (packet.Payload?.Length > 0)
  390. {
  391. packetWriter.Write(packet.Payload, 0, packet.Payload.Length);
  392. }
  393. byte fixedHeader = 0;
  394. if (packet.Retain)
  395. {
  396. fixedHeader |= 0x01;
  397. }
  398. fixedHeader |= (byte)((byte)packet.QualityOfServiceLevel << 1);
  399. if (packet.Dup)
  400. {
  401. fixedHeader |= 0x08;
  402. }
  403. return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.Publish, fixedHeader);
  404. }
  405. private static byte Serialize(MqttPubAckPacket packet, MqttPacketWriter packetWriter)
  406. {
  407. if (!packet.PacketIdentifier.HasValue)
  408. {
  409. throw new MqttProtocolViolationException("PubAck packet has no packet identifier.");
  410. }
  411. packetWriter.Write(packet.PacketIdentifier.Value);
  412. return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubAck);
  413. }
  414. private static byte Serialize(MqttPubRecPacket packet, MqttPacketWriter packetWriter)
  415. {
  416. if (!packet.PacketIdentifier.HasValue)
  417. {
  418. throw new MqttProtocolViolationException("PubRec packet has no packet identifier.");
  419. }
  420. packetWriter.Write(packet.PacketIdentifier.Value);
  421. return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubRec);
  422. }
  423. private static byte Serialize(MqttPubCompPacket packet, MqttPacketWriter packetWriter)
  424. {
  425. if (!packet.PacketIdentifier.HasValue)
  426. {
  427. throw new MqttProtocolViolationException("PubComp packet has no packet identifier.");
  428. }
  429. packetWriter.Write(packet.PacketIdentifier.Value);
  430. return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubComp);
  431. }
  432. private static byte Serialize(MqttSubscribePacket packet, MqttPacketWriter packetWriter)
  433. {
  434. if (!packet.TopicFilters.Any()) throw new MqttProtocolViolationException("At least one topic filter must be set [MQTT-3.8.3-3].");
  435. if (!packet.PacketIdentifier.HasValue)
  436. {
  437. throw new MqttProtocolViolationException("Subscribe packet has no packet identifier.");
  438. }
  439. packetWriter.Write(packet.PacketIdentifier.Value);
  440. if (packet.TopicFilters?.Count > 0)
  441. {
  442. foreach (var topicFilter in packet.TopicFilters)
  443. {
  444. packetWriter.WriteWithLengthPrefix(topicFilter.Topic);
  445. packetWriter.Write((byte)topicFilter.QualityOfServiceLevel);
  446. }
  447. }
  448. return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.Subscribe, 0x02);
  449. }
  450. private static byte Serialize(MqttSubAckPacket packet, MqttPacketWriter packetWriter)
  451. {
  452. if (!packet.PacketIdentifier.HasValue)
  453. {
  454. throw new MqttProtocolViolationException("SubAck packet has no packet identifier.");
  455. }
  456. packetWriter.Write(packet.PacketIdentifier.Value);
  457. if (packet.SubscribeReturnCodes?.Any() == true)
  458. {
  459. foreach (var packetSubscribeReturnCode in packet.SubscribeReturnCodes)
  460. {
  461. packetWriter.Write((byte)packetSubscribeReturnCode);
  462. }
  463. }
  464. return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.SubAck);
  465. }
  466. private static byte Serialize(MqttUnsubscribePacket packet, MqttPacketWriter packetWriter)
  467. {
  468. if (!packet.TopicFilters.Any()) throw new MqttProtocolViolationException("At least one topic filter must be set [MQTT-3.10.3-2].");
  469. if (!packet.PacketIdentifier.HasValue)
  470. {
  471. throw new MqttProtocolViolationException("Unsubscribe packet has no packet identifier.");
  472. }
  473. packetWriter.Write(packet.PacketIdentifier.Value);
  474. if (packet.TopicFilters?.Any() == true)
  475. {
  476. foreach (var topicFilter in packet.TopicFilters)
  477. {
  478. packetWriter.WriteWithLengthPrefix(topicFilter);
  479. }
  480. }
  481. return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.Unsubscibe, 0x02);
  482. }
  483. private static byte Serialize(MqttUnsubAckPacket packet, MqttPacketWriter packetWriter)
  484. {
  485. if (!packet.PacketIdentifier.HasValue)
  486. {
  487. throw new MqttProtocolViolationException("UnsubAck packet has no packet identifier.");
  488. }
  489. packetWriter.Write(packet.PacketIdentifier.Value);
  490. return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.UnsubAck);
  491. }
  492. private static byte SerializeEmptyPacket(MqttControlPacketType type)
  493. {
  494. return MqttPacketWriter.BuildFixedHeader(type);
  495. }
  496. // ReSharper disable once ParameterOnlyUsedForPreconditionCheck.Local
  497. private static void ThrowIfBodyIsEmpty(MqttPacketBodyReader body)
  498. {
  499. if (body == null || body.Length == 0)
  500. {
  501. throw new MqttProtocolViolationException("Data from the body is required but not present.");
  502. }
  503. }
  504. }
  505. }