25개 이상의 토픽을 선택하실 수 없습니다. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

580 lines
21 KiB

  1. using MQTTnet.Exceptions;
  2. using MQTTnet.Packets;
  3. using MQTTnet.Protocol;
  4. using System;
  5. using System.IO;
  6. using System.Linq;
  7. using System.Text;
  8. namespace MQTTnet.Serializer
  9. {
  10. public sealed class MqttPacketSerializer : IMqttPacketSerializer
  11. {
  /// <summary>UTF-8 bytes of the protocol name "MQTT"; written and matched for <see cref="MqttProtocolVersion.V311"/>.</summary>
  private static readonly byte[] ProtocolVersionV311Name = Encoding.UTF8.GetBytes("MQTT");

  /// <summary>UTF-8 bytes of the protocol name "MQIsdp"; written and matched for <see cref="MqttProtocolVersion.V310"/>.</summary>
  private static readonly byte[] ProtocolVersionV310Name = Encoding.UTF8.GetBytes("MQIsdp");

  /// <summary>
  /// Protocol version consulted when writing CONNECT/CONNACK packets and when reading CONNACK packets.
  /// Defaults to <see cref="MqttProtocolVersion.V311"/>.
  /// </summary>
  public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311;
  /// <summary>
  /// Serializes a packet into one buffer segment made of the fixed header byte, the encoded
  /// remaining length and the packet body.
  /// </summary>
  /// <param name="packet">The packet to serialize.</param>
  /// <returns>A segment that starts at the fixed header byte and runs to the end of the written data.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="packet"/> is null.</exception>
  public ArraySegment<byte> Serialize(MqttBasePacket packet)
  {
      if (packet == null)
      {
          throw new ArgumentNullException(nameof(packet));
      }

      // Space kept free in front of the body: 1 fixed header byte + up to 4 remaining-length bytes.
      const int reservedHeaderSpace = 5;

      using (var memory = new MemoryStream(128))
      {
          // The body is written first, behind the reserved head space.
          memory.Position = reservedHeaderSpace;
          var fixedHeaderByte = SerializePacket(packet, memory);

          // The value returned by EncodeRemainingLength is treated as the number of bytes the
          // encoded length occupies; those bytes are assumed to end directly in front of the body.
          memory.Position = 1;
          var bodyLength = (int)memory.Length - reservedHeaderSpace;
          var encodedLengthSize = MqttPacketWriter.EncodeRemainingLength(bodyLength, memory);

          // The fixed header byte goes immediately before the encoded length; any bytes in front
          // of it stay unused and are excluded from the returned segment.
          var segmentStart = reservedHeaderSpace - (encodedLengthSize + 1);
          memory.Position = segmentStart;
          memory.WriteByte(fixedHeaderByte);

#if NET461 || NET452 || NETSTANDARD2_0
          var data = memory.GetBuffer();
#else
          var data = memory.ToArray();
#endif
          var segmentLength = (int)memory.Length - segmentStart;
          return new ArraySegment<byte>(data, segmentStart, segmentLength);
      }
  }
  /// <summary>
  /// Dispatches to the serializer that matches the runtime type of <paramref name="packet"/>.
  /// </summary>
  /// <param name="packet">The packet to write.</param>
  /// <param name="stream">The stream receiving the packet body.</param>
  /// <returns>The fixed header byte produced by the selected serializer.</returns>
  /// <exception cref="MqttProtocolViolationException">The packet type matches none of the known types.</exception>
  private byte SerializePacket(MqttBasePacket packet, Stream stream)
  {
      // The checks run in the same order as the original type switch.
      if (packet is MqttConnectPacket connect)
      {
          return Serialize(connect, stream);
      }

      if (packet is MqttConnAckPacket connAck)
      {
          return Serialize(connAck, stream);
      }

      // Disconnect and ping packets have no body; only a fixed header byte is produced.
      if (packet is MqttDisconnectPacket)
      {
          return SerializeEmptyPacket(MqttControlPacketType.Disconnect);
      }

      if (packet is MqttPingReqPacket)
      {
          return SerializeEmptyPacket(MqttControlPacketType.PingReq);
      }

      if (packet is MqttPingRespPacket)
      {
          return SerializeEmptyPacket(MqttControlPacketType.PingResp);
      }

      if (packet is MqttPublishPacket publish)
      {
          return Serialize(publish, stream);
      }

      if (packet is MqttPubAckPacket pubAck)
      {
          return Serialize(pubAck, stream);
      }

      if (packet is MqttPubRecPacket pubRec)
      {
          return Serialize(pubRec, stream);
      }

      if (packet is MqttPubRelPacket pubRel)
      {
          return Serialize(pubRel, stream);
      }

      if (packet is MqttPubCompPacket pubComp)
      {
          return Serialize(pubComp, stream);
      }

      if (packet is MqttSubscribePacket subscribe)
      {
          return Serialize(subscribe, stream);
      }

      if (packet is MqttSubAckPacket subAck)
      {
          return Serialize(subAck, stream);
      }

      if (packet is MqttUnsubscribePacket unsubscribe)
      {
          return Serialize(unsubscribe, stream);
      }

      if (packet is MqttUnsubAckPacket unsubAck)
      {
          return Serialize(unsubAck, stream);
      }

      throw new MqttProtocolViolationException("Packet type invalid.");
  }
  /// <summary>
  /// Creates the packet described by <paramref name="header"/>, reading its body from <paramref name="stream"/>.
  /// </summary>
  /// <param name="header">The packet header; its control packet type selects the deserializer.</param>
  /// <param name="stream">The stream the packet body is read from.</param>
  /// <returns>The deserialized packet.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="header"/> or <paramref name="stream"/> is null.</exception>
  /// <exception cref="MqttProtocolViolationException">The control packet type is not supported.</exception>
  public MqttBasePacket Deserialize(MqttPacketHeader header, Stream stream)
  {
      if (header == null)
      {
          throw new ArgumentNullException(nameof(header));
      }

      if (stream == null)
      {
          throw new ArgumentNullException(nameof(stream));
      }

      switch (header.ControlPacketType)
      {
          // Packets that are created without reading anything from the stream.
          case MqttControlPacketType.Disconnect:
              return new MqttDisconnectPacket();
          case MqttControlPacketType.PingReq:
              return new MqttPingReqPacket();
          case MqttControlPacketType.PingResp:
              return new MqttPingRespPacket();

          // Packets whose deserializer only needs the stream.
          case MqttControlPacketType.Connect:
              return DeserializeConnect(stream);
          case MqttControlPacketType.ConnAck:
              return DeserializeConnAck(stream);
          case MqttControlPacketType.PubAck:
              return DeserializePubAck(stream);
          case MqttControlPacketType.PubRec:
              return DeserializePubRec(stream);
          case MqttControlPacketType.PubRel:
              return DeserializePubRel(stream);
          case MqttControlPacketType.PubComp:
              return DeserializePubComp(stream);
          case MqttControlPacketType.UnsubAck:
              return DeserializeUnsubAck(stream);

          // Packets whose deserializer additionally consults the header.
          case MqttControlPacketType.Publish:
              return DeserializePublish(stream, header);
          case MqttControlPacketType.Subscribe:
              return DeserializeSubscribe(stream, header);
          case MqttControlPacketType.SubAck:
              return DeserializeSubAck(stream, header);
          case MqttControlPacketType.Unsubscibe:
              return DeserializeUnsubscribe(stream, header);

          default:
              throw new MqttProtocolViolationException($"Packet type ({(int)header.ControlPacketType}) not supported.");
      }
  }
  /// <summary>Creates an UNSUBACK packet whose identifier is the next UInt16 read from <paramref name="stream"/>.</summary>
  private static MqttBasePacket DeserializeUnsubAck(Stream stream)
  {
      var identifier = stream.ReadUInt16();
      return new MqttUnsubAckPacket { PacketIdentifier = identifier };
  }
  /// <summary>Creates a PUBCOMP packet whose identifier is the next UInt16 read from <paramref name="stream"/>.</summary>
  private static MqttBasePacket DeserializePubComp(Stream stream)
  {
      var identifier = stream.ReadUInt16();
      return new MqttPubCompPacket { PacketIdentifier = identifier };
  }
  /// <summary>Creates a PUBREL packet whose identifier is the next UInt16 read from <paramref name="stream"/>.</summary>
  private static MqttBasePacket DeserializePubRel(Stream stream)
  {
      var identifier = stream.ReadUInt16();
      return new MqttPubRelPacket { PacketIdentifier = identifier };
  }
  /// <summary>Creates a PUBREC packet whose identifier is the next UInt16 read from <paramref name="stream"/>.</summary>
  private static MqttBasePacket DeserializePubRec(Stream stream)
  {
      var identifier = stream.ReadUInt16();
      return new MqttPubRecPacket { PacketIdentifier = identifier };
  }
  /// <summary>Creates a PUBACK packet whose identifier is the next UInt16 read from <paramref name="stream"/>.</summary>
  private static MqttBasePacket DeserializePubAck(Stream stream)
  {
      var identifier = stream.ReadUInt16();
      return new MqttPubAckPacket { PacketIdentifier = identifier };
  }
  /// <summary>
  /// Reads an UNSUBSCRIBE packet: a UInt16 packet identifier followed by length-prefixed topic filters.
  /// </summary>
  /// <param name="stream">The stream positioned at the start of the packet body.</param>
  /// <param name="header">The packet header; reading stops once the stream position equals its body length.</param>
  /// <returns>The populated unsubscribe packet.</returns>
  private static MqttBasePacket DeserializeUnsubscribe(Stream stream, MqttPacketHeader header)
  {
      var unsubscribe = new MqttUnsubscribePacket();
      unsubscribe.PacketIdentifier = stream.ReadUInt16();

      // Topic filters are read until the stream position equals the body length.
      while (true)
      {
          if (stream.Position == header.BodyLength)
          {
              break;
          }

          var topicFilter = stream.ReadStringWithLengthPrefix();
          unsubscribe.TopicFilters.Add(topicFilter);
      }

      return unsubscribe;
  }
  /// <summary>
  /// Reads a SUBSCRIBE packet: a UInt16 packet identifier followed by pairs of a length-prefixed
  /// topic and a single quality-of-service byte.
  /// </summary>
  /// <param name="stream">The stream positioned at the start of the packet body.</param>
  /// <param name="header">The packet header; reading stops once the stream position equals its body length.</param>
  /// <returns>The populated subscribe packet.</returns>
  private static MqttBasePacket DeserializeSubscribe(Stream stream, MqttPacketHeader header)
  {
      var subscribe = new MqttSubscribePacket();
      subscribe.PacketIdentifier = stream.ReadUInt16();

      while (true)
      {
          if (stream.Position == header.BodyLength)
          {
              break;
          }

          // The topic is read before its quality-of-service byte.
          var topic = stream.ReadStringWithLengthPrefix();
          var qualityOfService = (MqttQualityOfServiceLevel)stream.ReadByte();
          subscribe.TopicFilters.Add(new TopicFilter(topic, qualityOfService));
      }

      return subscribe;
  }
  /// <summary>
  /// Reads a PUBLISH packet. Retain, quality of service and dup are taken from the fixed header;
  /// topic, optional packet identifier and payload are read from the stream in that order.
  /// </summary>
  /// <param name="stream">The stream positioned at the start of the packet body.</param>
  /// <param name="mqttPacketHeader">The packet header supplying the fixed header value and used to read the remaining data.</param>
  /// <returns>The populated publish packet.</returns>
  private static MqttBasePacket DeserializePublish(Stream stream, MqttPacketHeader mqttPacketHeader)
  {
      // Flags are consumed from the fixed header in the order: retain, QoS (Read(2)), dup.
      var flags = new ByteReader(mqttPacketHeader.FixedHeader);
      var retainFlag = flags.Read();
      var qos = (MqttQualityOfServiceLevel)flags.Read(2);
      var dupFlag = flags.Read();

      var topicName = stream.ReadStringWithLengthPrefix();

      // A packet identifier is only read when the QoS is above AtMostOnce.
      ushort? identifier = qos > MqttQualityOfServiceLevel.AtMostOnce
          ? stream.ReadUInt16()
          : (ushort?)null;

      // Everything left in the body is the payload.
      var payload = stream.ReadRemainingData(mqttPacketHeader);

      var publish = new MqttPublishPacket();
      publish.PacketIdentifier = identifier;
      publish.Retain = retainFlag;
      publish.Topic = topicName;
      publish.Payload = payload;
      publish.QualityOfServiceLevel = qos;
      publish.Dup = dupFlag;
      return publish;
  }
  /// <summary>
  /// Reads a CONNECT packet: protocol name, protocol level, connect flags, keep alive period,
  /// client id and the optional will message, user name and password.
  /// </summary>
  /// <param name="stream">The stream positioned at the start of the packet body.</param>
  /// <returns>The populated and validated connect packet.</returns>
  /// <exception cref="MqttProtocolViolationException">
  /// The protocol name is neither "MQTT" nor "MQIsdp", or the packet fails <see cref="ValidateConnectPacket"/>.
  /// </exception>
  private static MqttBasePacket DeserializeConnect(Stream stream)
  {
      // Skip the two bytes in front of the protocol name. The serializer writes the name with a
      // length prefix, so these are presumably that prefix (NOTE(review): confirm it is 2 bytes wide).
      stream.ReadBytes(2);

      MqttProtocolVersion protocolVersion;
      var protocolName = stream.ReadBytes(4);
      if (protocolName.SequenceEqual(ProtocolVersionV311Name))
      {
          protocolVersion = MqttProtocolVersion.V311;
      }
      else
      {
          // Not "MQTT": extend the four bytes already read by two more and test for the six-byte name.
          var extendedName = new byte[6];
          Array.Copy(protocolName, extendedName, 4);
          var nameTail = stream.ReadBytes(2);
          Array.Copy(nameTail, 0, extendedName, 4, 2);

          // The assembled six-byte name must be compared here. Comparing only the two-byte tail
          // can never equal the six-byte "MQIsdp" name, which rejected every such client.
          if (extendedName.SequenceEqual(ProtocolVersionV310Name))
          {
              protocolVersion = MqttProtocolVersion.V310;
          }
          else
          {
              throw new MqttProtocolViolationException("Protocol name is not supported.");
          }
      }

      stream.ReadByte(); // Skip protocol level

      // The connect flags are consumed in this order: reserved, clean session, will flag,
      // will QoS (Read(2)), will retain, password flag, user name flag.
      var connectFlags = stream.ReadByte();
      var connectFlagsReader = new ByteReader(connectFlags);
      connectFlagsReader.Read(); // Reserved.

      var packet = new MqttConnectPacket
      {
          ProtocolVersion = protocolVersion,
          CleanSession = connectFlagsReader.Read()
      };

      var willFlag = connectFlagsReader.Read();
      var willQoS = connectFlagsReader.Read(2);
      var willRetain = connectFlagsReader.Read();
      var passwordFlag = connectFlagsReader.Read();
      var usernameFlag = connectFlagsReader.Read();

      packet.KeepAlivePeriod = stream.ReadUInt16();
      packet.ClientId = stream.ReadStringWithLengthPrefix();

      if (willFlag)
      {
          packet.WillMessage = new MqttApplicationMessage
          {
              Topic = stream.ReadStringWithLengthPrefix(),
              Payload = stream.ReadWithLengthPrefix(),
              QualityOfServiceLevel = (MqttQualityOfServiceLevel)willQoS,
              Retain = willRetain
          };
      }

      // The user name is read before the password, matching the order used when serializing.
      if (usernameFlag)
      {
          packet.Username = stream.ReadStringWithLengthPrefix();
      }

      if (passwordFlag)
      {
          packet.Password = stream.ReadStringWithLengthPrefix();
      }

      ValidateConnectPacket(packet);
      return packet;
  }
  /// <summary>
  /// Reads a SUBACK packet: a UInt16 packet identifier followed by one return code per byte.
  /// </summary>
  /// <param name="stream">The stream positioned at the start of the packet body.</param>
  /// <param name="header">The packet header; reading stops once the stream position equals its body length.</param>
  /// <returns>The populated subscribe acknowledgement packet.</returns>
  private static MqttBasePacket DeserializeSubAck(Stream stream, MqttPacketHeader header)
  {
      var subAck = new MqttSubAckPacket();
      subAck.PacketIdentifier = stream.ReadUInt16();

      while (true)
      {
          if (stream.Position == header.BodyLength)
          {
              break;
          }

          var returnCode = (MqttSubscribeReturnCode)stream.ReadByte();
          subAck.SubscribeReturnCodes.Add(returnCode);
      }

      return subAck;
  }
  /// <summary>
  /// Reads a CONNACK packet: one acknowledge-flags byte followed by one return-code byte.
  /// The session-present flag is only taken from the first byte when <see cref="ProtocolVersion"/> is V311.
  /// </summary>
  /// <param name="stream">The stream positioned at the start of the packet body.</param>
  /// <returns>The populated connect acknowledgement packet.</returns>
  private MqttBasePacket DeserializeConnAck(Stream stream)
  {
      var connAck = new MqttConnAckPacket();

      // The first byte is always consumed, even if its content ends up unused.
      var acknowledgeFlags = stream.ReadByte();
      var acknowledgeFlagsReader = new ByteReader(acknowledgeFlags);

      var readsSessionPresent = ProtocolVersion == MqttProtocolVersion.V311;
      if (readsSessionPresent)
      {
          connAck.IsSessionPresent = acknowledgeFlagsReader.Read();
      }

      var returnCode = stream.ReadByte();
      connAck.ConnectReturnCode = (MqttConnectReturnCode)returnCode;
      return connAck;
  }
  /// <summary>
  /// Ensures that a connect packet with a null or empty client id has the clean session flag set.
  /// </summary>
  /// <param name="packet">The packet to validate.</param>
  /// <exception cref="ArgumentNullException"><paramref name="packet"/> is null.</exception>
  /// <exception cref="MqttProtocolViolationException">The client id is null or empty and clean session is not set.</exception>
  private static void ValidateConnectPacket(MqttConnectPacket packet)
  {
      if (packet == null)
      {
          throw new ArgumentNullException(nameof(packet));
      }

      var hasClientId = !string.IsNullOrEmpty(packet.ClientId);
      if (hasClientId || packet.CleanSession)
      {
          return;
      }

      throw new MqttProtocolViolationException("CleanSession must be set if ClientId is empty [MQTT-3.1.3-7].");
  }
  /// <summary>
  /// Ensures that a publish packet with quality of service 0 does not carry the dup flag.
  /// </summary>
  /// <param name="packet">The packet to validate.</param>
  /// <exception cref="ArgumentNullException"><paramref name="packet"/> is null.</exception>
  /// <exception cref="MqttProtocolViolationException">The quality of service is 0 and the dup flag is set.</exception>
  private static void ValidatePublishPacket(MqttPublishPacket packet)
  {
      if (packet == null)
      {
          throw new ArgumentNullException(nameof(packet));
      }

      var isQoS0 = packet.QualityOfServiceLevel == 0;
      if (!isQoS0 || !packet.Dup)
      {
          return;
      }

      throw new MqttProtocolViolationException("Dup flag must be false for QoS 0 packets [MQTT-3.3.1-2].");
  }
  /// <summary>
  /// Writes the body of a CONNECT packet: protocol name and level, connect flags, keep alive period,
  /// client id and the optional will message, user name and password.
  /// </summary>
  /// <param name="packet">The connect packet to write.</param>
  /// <param name="stream">The stream receiving the packet body.</param>
  /// <returns>The fixed header byte for a CONNECT packet.</returns>
  /// <exception cref="MqttProtocolViolationException">
  /// The packet fails <see cref="ValidateConnectPacket"/>, or a password is set without a user name.
  /// </exception>
  private byte Serialize(MqttConnectPacket packet, Stream stream)
  {
      ValidateConnectPacket(packet);

      // Variable header: protocol name and level depend on the configured protocol version.
      var isV311 = ProtocolVersion == MqttProtocolVersion.V311;
      if (!isV311)
      {
          stream.WriteWithLengthPrefix(ProtocolVersionV310Name);
          stream.WriteByte(0x03); // Protocol Level 3
      }
      else
      {
          stream.WriteWithLengthPrefix(ProtocolVersionV311Name);
          stream.WriteByte(0x04); // 3.1.2.2 Protocol Level 4
      }

      // 3.1.2.3 Connect Flags, written in the order: reserved, clean session, will flag,
      // will QoS (2), will retain, password flag, user name flag.
      var will = packet.WillMessage;
      var hasWill = will != null;
      var willQoS = hasWill ? (int)will.QualityOfServiceLevel : 0;
      var willRetain = hasWill && will.Retain;

      var flags = new ByteWriter();
      flags.Write(false); // Reserved
      flags.Write(packet.CleanSession);
      flags.Write(hasWill);
      flags.Write(willQoS, 2);
      flags.Write(willRetain);

      var hasPassword = packet.Password != null;
      var hasUsername = packet.Username != null;
      if (hasPassword && !hasUsername)
      {
          throw new MqttProtocolViolationException("If the User Name Flag is set to 0, the Password Flag MUST be set to 0 [MQTT-3.1.2-22].");
      }

      flags.Write(hasPassword);
      flags.Write(hasUsername);

      stream.Write(flags);
      stream.Write(packet.KeepAlivePeriod);

      // Payload: client id first, then the optional parts whose flags were set above.
      stream.WriteWithLengthPrefix(packet.ClientId);

      if (hasWill)
      {
          stream.WriteWithLengthPrefix(will.Topic);
          stream.WriteWithLengthPrefix(will.Payload);
      }

      if (hasUsername)
      {
          stream.WriteWithLengthPrefix(packet.Username);
      }

      if (hasPassword)
      {
          stream.WriteWithLengthPrefix(packet.Password);
      }

      return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.Connect);
  }
  /// <summary>
  /// Writes the body of a CONNACK packet: one flags byte followed by the connect return code.
  /// For V310 the flags byte is always 0; for V311 it carries the session-present flag.
  /// </summary>
  /// <param name="packet">The connect acknowledgement packet to write.</param>
  /// <param name="stream">The stream receiving the packet body.</param>
  /// <returns>The fixed header byte for a CONNACK packet.</returns>
  /// <exception cref="MqttProtocolViolationException">The configured protocol version is neither V310 nor V311.</exception>
  private byte Serialize(MqttConnAckPacket packet, Stream stream)
  {
      var version = ProtocolVersion;
      var isV310 = version == MqttProtocolVersion.V310;

      // Reject unknown versions before anything is written.
      if (!isV310 && version != MqttProtocolVersion.V311)
      {
          throw new MqttProtocolViolationException("Protocol version not supported.");
      }

      if (isV310)
      {
          stream.WriteByte(0);
      }
      else
      {
          var acknowledgeFlags = new ByteWriter();
          acknowledgeFlags.Write(packet.IsSessionPresent);
          stream.Write(acknowledgeFlags);
      }

      var returnCode = (byte)packet.ConnectReturnCode;
      stream.WriteByte(returnCode);

      return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.ConnAck);
  }
  /// <summary>Writes the packet identifier of a PUBREL packet.</summary>
  /// <param name="packet">The packet to write.</param>
  /// <param name="stream">The stream receiving the packet body.</param>
  /// <returns>The fixed header byte for a PUBREL packet, built with 0x02 as the second argument.</returns>
  /// <exception cref="MqttProtocolViolationException">The packet has no packet identifier.</exception>
  private static byte Serialize(MqttPubRelPacket packet, Stream stream)
  {
      var identifier = packet.PacketIdentifier;
      if (identifier == null)
      {
          throw new MqttProtocolViolationException("PubRel packet has no packet identifier.");
      }

      stream.Write(identifier.Value);

      return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubRel, 0x02);
  }
  /// <summary>
  /// Writes the body of a PUBLISH packet (topic, packet identifier when the QoS is above
  /// AtMostOnce, payload) and builds the fixed header flags from retain, QoS and dup.
  /// </summary>
  /// <param name="packet">The publish packet to write.</param>
  /// <param name="stream">The stream receiving the packet body.</param>
  /// <returns>The fixed header byte for a PUBLISH packet including its flags.</returns>
  /// <exception cref="MqttProtocolViolationException">
  /// The packet fails <see cref="ValidatePublishPacket"/>, lacks a packet identifier although the QoS
  /// is above AtMostOnce, or carries a packet identifier greater than 0 with QoS AtMostOnce.
  /// </exception>
  private static byte Serialize(MqttPublishPacket packet, Stream stream)
  {
      ValidatePublishPacket(packet);

      stream.WriteWithLengthPrefix(packet.Topic);

      var needsIdentifier = packet.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce;
      if (!needsIdentifier)
      {
          // A missing identifier and an identifier of 0 are both accepted here.
          if (packet.PacketIdentifier.GetValueOrDefault() > 0)
          {
              throw new MqttProtocolViolationException("Packet identifier must be empty if QoS == 0 [MQTT-2.3.1-5].");
          }
      }
      else
      {
          if (packet.PacketIdentifier == null)
          {
              throw new MqttProtocolViolationException("Publish packet has no packet identifier.");
          }

          stream.Write(packet.PacketIdentifier.Value);
      }

      // The payload is written raw, without a length prefix; a null or empty payload writes nothing.
      var payload = packet.Payload;
      if (payload != null && payload.Length > 0)
      {
          stream.Write(payload, 0, payload.Length);
      }

      // Flag layout: 0x01 = retain, QoS shifted left by one, 0x08 = dup.
      var flags = 0;
      if (packet.Retain)
      {
          flags |= 0x01;
      }

      flags |= (byte)((byte)packet.QualityOfServiceLevel << 1);

      if (packet.Dup)
      {
          flags |= 0x08;
      }

      return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.Publish, (byte)flags);
  }
  /// <summary>Writes the packet identifier of a PUBACK packet.</summary>
  /// <param name="packet">The packet to write.</param>
  /// <param name="stream">The stream receiving the packet body.</param>
  /// <returns>The fixed header byte for a PUBACK packet.</returns>
  /// <exception cref="MqttProtocolViolationException">The packet has no packet identifier.</exception>
  private static byte Serialize(MqttPubAckPacket packet, Stream stream)
  {
      var identifier = packet.PacketIdentifier;
      if (identifier == null)
      {
          throw new MqttProtocolViolationException("PubAck packet has no packet identifier.");
      }

      stream.Write(identifier.Value);

      return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubAck);
  }
  /// <summary>Writes the packet identifier of a PUBREC packet.</summary>
  /// <param name="packet">The packet to write.</param>
  /// <param name="stream">The stream receiving the packet body.</param>
  /// <returns>The fixed header byte for a PUBREC packet.</returns>
  /// <exception cref="MqttProtocolViolationException">The packet has no packet identifier.</exception>
  private static byte Serialize(MqttPubRecPacket packet, Stream stream)
  {
      var identifier = packet.PacketIdentifier;
      if (identifier == null)
      {
          throw new MqttProtocolViolationException("PubRec packet has no packet identifier.");
      }

      stream.Write(identifier.Value);

      return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubRec);
  }
  /// <summary>Writes the packet identifier of a PUBCOMP packet.</summary>
  /// <param name="packet">The packet to write.</param>
  /// <param name="stream">The stream receiving the packet body.</param>
  /// <returns>The fixed header byte for a PUBCOMP packet.</returns>
  /// <exception cref="MqttProtocolViolationException">The packet has no packet identifier.</exception>
  private static byte Serialize(MqttPubCompPacket packet, Stream stream)
  {
      var identifier = packet.PacketIdentifier;
      if (identifier == null)
      {
          throw new MqttProtocolViolationException("PubComp packet has no packet identifier.");
      }

      stream.Write(identifier.Value);

      return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubComp);
  }
  /// <summary>
  /// Writes the body of a SUBSCRIBE packet: the packet identifier followed by each topic filter
  /// as a length-prefixed topic and one quality-of-service byte.
  /// </summary>
  /// <param name="packet">The subscribe packet to write.</param>
  /// <param name="stream">The stream receiving the packet body.</param>
  /// <returns>The fixed header byte for a SUBSCRIBE packet, built with 0x02 as the second argument.</returns>
  /// <exception cref="MqttProtocolViolationException">
  /// The topic filter list is null or empty, or the packet has no packet identifier.
  /// </exception>
  private static byte Serialize(MqttSubscribePacket packet, Stream stream)
  {
      // A null list is reported as the same protocol violation as an empty one instead of
      // surfacing as a NullReferenceException from the guard itself.
      if (packet.TopicFilters?.Any() != true)
      {
          throw new MqttProtocolViolationException("At least one topic filter must be set [MQTT-3.8.3-3].");
      }

      if (!packet.PacketIdentifier.HasValue)
      {
          throw new MqttProtocolViolationException("Subscribe packet has no packet identifier.");
      }

      stream.Write(packet.PacketIdentifier.Value);

      // The guard above guarantees at least one filter, so no further emptiness check is needed.
      foreach (var topicFilter in packet.TopicFilters)
      {
          stream.WriteWithLengthPrefix(topicFilter.Topic);
          stream.WriteByte((byte)topicFilter.QualityOfServiceLevel);
      }

      return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.Subscribe, 0x02);
  }
  /// <summary>
  /// Writes the body of a SUBACK packet: the packet identifier followed by one byte per subscribe return code.
  /// </summary>
  /// <param name="packet">The subscribe acknowledgement packet to write.</param>
  /// <param name="stream">The stream receiving the packet body.</param>
  /// <returns>The fixed header byte for a SUBACK packet.</returns>
  /// <exception cref="MqttProtocolViolationException">The packet has no packet identifier.</exception>
  private static byte Serialize(MqttSubAckPacket packet, Stream stream)
  {
      var identifier = packet.PacketIdentifier;
      if (identifier == null)
      {
          throw new MqttProtocolViolationException("SubAck packet has no packet identifier.");
      }

      stream.Write(identifier.Value);

      // A null or empty return code list writes nothing after the identifier.
      var returnCodes = packet.SubscribeReturnCodes;
      if (returnCodes != null)
      {
          foreach (var returnCode in returnCodes)
          {
              stream.WriteByte((byte)returnCode);
          }
      }

      return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.SubAck);
  }
  /// <summary>
  /// Writes the body of an UNSUBSCRIBE packet: the packet identifier followed by each topic filter
  /// as a length-prefixed string.
  /// </summary>
  /// <param name="packet">The unsubscribe packet to write.</param>
  /// <param name="stream">The stream receiving the packet body.</param>
  /// <returns>The fixed header byte for an UNSUBSCRIBE packet, built with 0x02 as the second argument.</returns>
  /// <exception cref="MqttProtocolViolationException">
  /// The topic filter list is null or empty, or the packet has no packet identifier.
  /// </exception>
  private static byte Serialize(MqttUnsubscribePacket packet, Stream stream)
  {
      // A null list is reported as the same protocol violation as an empty one instead of
      // surfacing as a NullReferenceException from the guard itself.
      if (packet.TopicFilters?.Any() != true)
      {
          throw new MqttProtocolViolationException("At least one topic filter must be set [MQTT-3.10.3-2].");
      }

      if (!packet.PacketIdentifier.HasValue)
      {
          throw new MqttProtocolViolationException("Unsubscribe packet has no packet identifier.");
      }

      stream.Write(packet.PacketIdentifier.Value);

      // The guard above guarantees at least one filter, so no further emptiness check is needed.
      foreach (var topicFilter in packet.TopicFilters)
      {
          stream.WriteWithLengthPrefix(topicFilter);
      }

      return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.Unsubscibe, 0x02);
  }
  /// <summary>Writes the packet identifier of an UNSUBACK packet.</summary>
  /// <param name="packet">The packet to write.</param>
  /// <param name="stream">The stream receiving the packet body.</param>
  /// <returns>The fixed header byte for an UNSUBACK packet.</returns>
  /// <exception cref="MqttProtocolViolationException">The packet has no packet identifier.</exception>
  private static byte Serialize(MqttUnsubAckPacket packet, Stream stream)
  {
      var identifier = packet.PacketIdentifier;
      if (identifier == null)
      {
          throw new MqttProtocolViolationException("UnsubAck packet has no packet identifier.");
      }

      stream.Write(identifier.Value);

      return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.UnsubAck);
  }
  /// <summary>Builds the fixed header byte for a packet of the given type without writing any body.</summary>
  /// <param name="type">The control packet type of the body-less packet.</param>
  /// <returns>The fixed header byte for <paramref name="type"/>.</returns>
  private static byte SerializeEmptyPacket(MqttControlPacketType type) =>
      MqttPacketWriter.BuildFixedHeader(type);
  472. }
  473. }