25개 이상의 토픽을 선택하실 수 없습니다. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

MqttPacketSerializer.cs 19 KiB

7 년 전
7 년 전
7 년 전
7 년 전

  1. using MQTTnet.Exceptions;
  2. using MQTTnet.Packets;
  3. using MQTTnet.Protocol;
  4. using System;
  5. using System.Collections.Generic;
  6. using System.IO;
  7. using System.Linq;
  8. using System.Text;
  9. namespace MQTTnet.Serializer
  10. {
  11. public sealed class MqttPacketSerializer : IMqttPacketSerializer
  12. {
  13. private static byte[] ProtocolVersionV311Name { get; } = Encoding.UTF8.GetBytes("MQTT");
  14. private static byte[] ProtocolVersionV310Name { get; } = Encoding.UTF8.GetBytes("MQIs");
  15. public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311;
  16. public ICollection<ArraySegment<byte>> Serialize(MqttBasePacket packet)
  17. {
  18. if (packet == null) throw new ArgumentNullException(nameof(packet));
  19. using (var stream = new MemoryStream(128))
  20. using (var writer = new MqttPacketWriter(stream))
  21. {
  22. var fixedHeader = SerializePacket(packet, writer);
  23. var remainingLength = (int)stream.Length;
  24. writer.Write(fixedHeader);
  25. MqttPacketWriter.WriteRemainingLength(remainingLength, writer);
  26. var headerLength = (int)stream.Length - remainingLength;
  27. #if NET461 || NET452 || NETSTANDARD2_0
  28. var buffer = stream.GetBuffer();
  29. #else
  30. var buffer = stream.ToArray();
  31. #endif
  32. return new List<ArraySegment<byte>>
  33. {
  34. new ArraySegment<byte>(buffer, remainingLength, headerLength),
  35. new ArraySegment<byte>(buffer, 0, remainingLength)
  36. };
  37. }
  38. }
  39. public MqttBasePacket Deserialize(MqttPacketHeader header, byte[] body)
  40. {
  41. if (header == null) throw new ArgumentNullException(nameof(header));
  42. if (body == null) throw new ArgumentNullException(nameof(body));
  43. using (var bodyStream = new MemoryStream(body))
  44. using (var reader = new MqttPacketReader(header, bodyStream))
  45. {
  46. return Deserialize(header, reader);
  47. }
  48. }
  49. private byte SerializePacket(MqttBasePacket packet, MqttPacketWriter writer)
  50. {
  51. switch (packet)
  52. {
  53. case MqttConnectPacket connectPacket: return Serialize(connectPacket, writer);
  54. case MqttConnAckPacket connAckPacket: return Serialize(connAckPacket, writer);
  55. case MqttDisconnectPacket _: return SerializeEmptyPacket(MqttControlPacketType.Disconnect);
  56. case MqttPingReqPacket _: return SerializeEmptyPacket(MqttControlPacketType.PingReq);
  57. case MqttPingRespPacket _: return SerializeEmptyPacket(MqttControlPacketType.PingResp);
  58. case MqttPublishPacket publishPacket: return Serialize(publishPacket, writer);
  59. case MqttPubAckPacket pubAckPacket: return Serialize(pubAckPacket, writer);
  60. case MqttPubRecPacket pubRecPacket: return Serialize(pubRecPacket, writer);
  61. case MqttPubRelPacket pubRelPacket: return Serialize(pubRelPacket, writer);
  62. case MqttPubCompPacket pubCompPacket: return Serialize(pubCompPacket, writer);
  63. case MqttSubscribePacket subscribePacket: return Serialize(subscribePacket, writer);
  64. case MqttSubAckPacket subAckPacket: return Serialize(subAckPacket, writer);
  65. case MqttUnsubscribePacket unsubscribePacket: return Serialize(unsubscribePacket, writer);
  66. case MqttUnsubAckPacket unsubAckPacket: return Serialize(unsubAckPacket, writer);
  67. default: throw new MqttProtocolViolationException("Packet type invalid.");
  68. }
  69. }
  70. private static MqttBasePacket Deserialize(MqttPacketHeader header, MqttPacketReader reader)
  71. {
  72. switch (header.ControlPacketType)
  73. {
  74. case MqttControlPacketType.Connect: return DeserializeConnect(reader);
  75. case MqttControlPacketType.ConnAck: return DeserializeConnAck(reader);
  76. case MqttControlPacketType.Disconnect: return new MqttDisconnectPacket();
  77. case MqttControlPacketType.Publish: return DeserializePublish(reader, header);
  78. case MqttControlPacketType.PubAck: return DeserializePubAck(reader);
  79. case MqttControlPacketType.PubRec: return DeserializePubRec(reader);
  80. case MqttControlPacketType.PubRel: return DeserializePubRel(reader);
  81. case MqttControlPacketType.PubComp: return DeserializePubComp(reader);
  82. case MqttControlPacketType.PingReq: return new MqttPingReqPacket();
  83. case MqttControlPacketType.PingResp: return new MqttPingRespPacket();
  84. case MqttControlPacketType.Subscribe: return DeserializeSubscribe(reader);
  85. case MqttControlPacketType.SubAck: return DeserializeSubAck(reader);
  86. case MqttControlPacketType.Unsubscibe: return DeserializeUnsubscribe(reader);
  87. case MqttControlPacketType.UnsubAck: return DeserializeUnsubAck(reader);
  88. default: throw new MqttProtocolViolationException($"Packet type ({(int)header.ControlPacketType}) not supported.");
  89. }
  90. }
  91. private static MqttBasePacket DeserializeUnsubAck(MqttPacketReader reader)
  92. {
  93. return new MqttUnsubAckPacket
  94. {
  95. PacketIdentifier = reader.ReadUInt16()
  96. };
  97. }
  98. private static MqttBasePacket DeserializePubComp(MqttPacketReader reader)
  99. {
  100. return new MqttPubCompPacket
  101. {
  102. PacketIdentifier = reader.ReadUInt16()
  103. };
  104. }
  105. private static MqttBasePacket DeserializePubRel(MqttPacketReader reader)
  106. {
  107. return new MqttPubRelPacket
  108. {
  109. PacketIdentifier = reader.ReadUInt16()
  110. };
  111. }
  112. private static MqttBasePacket DeserializePubRec(MqttPacketReader reader)
  113. {
  114. return new MqttPubRecPacket
  115. {
  116. PacketIdentifier = reader.ReadUInt16()
  117. };
  118. }
  119. private static MqttBasePacket DeserializePubAck(MqttPacketReader reader)
  120. {
  121. return new MqttPubAckPacket
  122. {
  123. PacketIdentifier = reader.ReadUInt16()
  124. };
  125. }
  126. private static MqttBasePacket DeserializeUnsubscribe(MqttPacketReader reader)
  127. {
  128. var packet = new MqttUnsubscribePacket
  129. {
  130. PacketIdentifier = reader.ReadUInt16(),
  131. };
  132. while (!reader.EndOfRemainingData)
  133. {
  134. packet.TopicFilters.Add(reader.ReadStringWithLengthPrefix());
  135. }
  136. return packet;
  137. }
  138. private static MqttBasePacket DeserializeSubscribe(MqttPacketReader reader)
  139. {
  140. var packet = new MqttSubscribePacket
  141. {
  142. PacketIdentifier = reader.ReadUInt16()
  143. };
  144. while (!reader.EndOfRemainingData)
  145. {
  146. packet.TopicFilters.Add(new TopicFilter(
  147. reader.ReadStringWithLengthPrefix(),
  148. (MqttQualityOfServiceLevel)reader.ReadByte()));
  149. }
  150. return packet;
  151. }
  152. private static MqttBasePacket DeserializePublish(MqttPacketReader reader, MqttPacketHeader mqttPacketHeader)
  153. {
  154. var fixedHeader = new ByteReader(mqttPacketHeader.FixedHeader);
  155. var retain = fixedHeader.Read();
  156. var qualityOfServiceLevel = (MqttQualityOfServiceLevel)fixedHeader.Read(2);
  157. var dup = fixedHeader.Read();
  158. var topic = reader.ReadStringWithLengthPrefix();
  159. ushort packetIdentifier = 0;
  160. if (qualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce)
  161. {
  162. packetIdentifier = reader.ReadUInt16();
  163. }
  164. var packet = new MqttPublishPacket
  165. {
  166. Retain = retain,
  167. QualityOfServiceLevel = qualityOfServiceLevel,
  168. Dup = dup,
  169. Topic = topic,
  170. Payload = reader.ReadRemainingData(),
  171. PacketIdentifier = packetIdentifier
  172. };
  173. return packet;
  174. }
  175. private static MqttBasePacket DeserializeConnect(MqttPacketReader reader)
  176. {
  177. reader.ReadBytes(2); // Skip 2 bytes
  178. MqttProtocolVersion protocolVersion;
  179. var protocolName = reader.ReadBytes(4);
  180. if (protocolName.SequenceEqual(ProtocolVersionV310Name))
  181. {
  182. reader.ReadBytes(2);
  183. protocolVersion = MqttProtocolVersion.V310;
  184. }
  185. else if (protocolName.SequenceEqual(ProtocolVersionV311Name))
  186. {
  187. protocolVersion = MqttProtocolVersion.V311;
  188. }
  189. else
  190. {
  191. throw new MqttProtocolViolationException("Protocol name is not supported.");
  192. }
  193. reader.ReadByte(); // Skip protocol level
  194. var connectFlags = reader.ReadByte();
  195. var connectFlagsReader = new ByteReader(connectFlags);
  196. connectFlagsReader.Read(); // Reserved.
  197. var packet = new MqttConnectPacket
  198. {
  199. ProtocolVersion = protocolVersion,
  200. CleanSession = connectFlagsReader.Read()
  201. };
  202. var willFlag = connectFlagsReader.Read();
  203. var willQoS = connectFlagsReader.Read(2);
  204. var willRetain = connectFlagsReader.Read();
  205. var passwordFlag = connectFlagsReader.Read();
  206. var usernameFlag = connectFlagsReader.Read();
  207. packet.KeepAlivePeriod = reader.ReadUInt16();
  208. packet.ClientId = reader.ReadStringWithLengthPrefix();
  209. if (willFlag)
  210. {
  211. packet.WillMessage = new MqttApplicationMessage
  212. {
  213. Topic = reader.ReadStringWithLengthPrefix(),
  214. Payload = reader.ReadWithLengthPrefix(),
  215. QualityOfServiceLevel = (MqttQualityOfServiceLevel)willQoS,
  216. Retain = willRetain
  217. };
  218. }
  219. if (usernameFlag)
  220. {
  221. packet.Username = reader.ReadStringWithLengthPrefix();
  222. }
  223. if (passwordFlag)
  224. {
  225. packet.Password = reader.ReadStringWithLengthPrefix();
  226. }
  227. ValidateConnectPacket(packet);
  228. return packet;
  229. }
  230. private static MqttBasePacket DeserializeSubAck(MqttPacketReader reader)
  231. {
  232. var packet = new MqttSubAckPacket
  233. {
  234. PacketIdentifier = reader.ReadUInt16()
  235. };
  236. while (!reader.EndOfRemainingData)
  237. {
  238. packet.SubscribeReturnCodes.Add((MqttSubscribeReturnCode)reader.ReadByte());
  239. }
  240. return packet;
  241. }
  242. private static MqttBasePacket DeserializeConnAck(MqttPacketReader reader)
  243. {
  244. var variableHeader1 = reader.ReadByte();
  245. var variableHeader2 = reader.ReadByte();
  246. var packet = new MqttConnAckPacket
  247. {
  248. IsSessionPresent = new ByteReader(variableHeader1).Read(),
  249. ConnectReturnCode = (MqttConnectReturnCode)variableHeader2
  250. };
  251. return packet;
  252. }
  253. private static void ValidateConnectPacket(MqttConnectPacket packet)
  254. {
  255. if (packet == null) throw new ArgumentNullException(nameof(packet));
  256. if (string.IsNullOrEmpty(packet.ClientId) && !packet.CleanSession)
  257. {
  258. throw new MqttProtocolViolationException("CleanSession must be set if ClientId is empty [MQTT-3.1.3-7].");
  259. }
  260. }
  261. private static void ValidatePublishPacket(MqttPublishPacket packet)
  262. {
  263. if (packet == null) throw new ArgumentNullException(nameof(packet));
  264. if (packet.QualityOfServiceLevel == 0 && packet.Dup)
  265. {
  266. throw new MqttProtocolViolationException("Dup flag must be false for QoS 0 packets [MQTT-3.3.1-2].");
  267. }
  268. }
  269. private byte Serialize(MqttConnectPacket packet, MqttPacketWriter writer)
  270. {
  271. ValidateConnectPacket(packet);
  272. // Write variable header
  273. writer.Write(0x00, 0x04); // 3.1.2.1 Protocol Name
  274. if (ProtocolVersion == MqttProtocolVersion.V311)
  275. {
  276. writer.Write(ProtocolVersionV311Name);
  277. writer.Write(0x04); // 3.1.2.2 Protocol Level (4)
  278. }
  279. else
  280. {
  281. writer.Write(ProtocolVersionV310Name);
  282. writer.Write(0x64, 0x70, 0x03); // Protocol Level (0x03)
  283. }
  284. var connectFlags = new ByteWriter(); // 3.1.2.3 Connect Flags
  285. connectFlags.Write(false); // Reserved
  286. connectFlags.Write(packet.CleanSession);
  287. connectFlags.Write(packet.WillMessage != null);
  288. if (packet.WillMessage != null)
  289. {
  290. connectFlags.Write((int)packet.WillMessage.QualityOfServiceLevel, 2);
  291. connectFlags.Write(packet.WillMessage.Retain);
  292. }
  293. else
  294. {
  295. connectFlags.Write(0, 2);
  296. connectFlags.Write(false);
  297. }
  298. connectFlags.Write(packet.Password != null);
  299. connectFlags.Write(packet.Username != null);
  300. writer.Write(connectFlags);
  301. writer.Write(packet.KeepAlivePeriod);
  302. writer.WriteWithLengthPrefix(packet.ClientId);
  303. if (packet.WillMessage != null)
  304. {
  305. writer.WriteWithLengthPrefix(packet.WillMessage.Topic);
  306. writer.WriteWithLengthPrefix(packet.WillMessage.Payload);
  307. }
  308. if (packet.Username != null)
  309. {
  310. writer.WriteWithLengthPrefix(packet.Username);
  311. }
  312. if (packet.Password != null)
  313. {
  314. writer.WriteWithLengthPrefix(packet.Password);
  315. }
  316. return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.Connect);
  317. }
  318. private byte Serialize(MqttConnAckPacket packet, MqttPacketWriter writer)
  319. {
  320. var connectAcknowledgeFlags = new ByteWriter();
  321. if (ProtocolVersion == MqttProtocolVersion.V311)
  322. {
  323. connectAcknowledgeFlags.Write(packet.IsSessionPresent);
  324. }
  325. writer.Write(connectAcknowledgeFlags);
  326. writer.Write((byte)packet.ConnectReturnCode);
  327. return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.ConnAck);
  328. }
  329. private static byte Serialize(MqttPubRelPacket packet, MqttPacketWriter writer)
  330. {
  331. writer.Write(packet.PacketIdentifier);
  332. return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubRel, 0x02);
  333. }
  334. private static byte Serialize(MqttPublishPacket packet, MqttPacketWriter writer)
  335. {
  336. ValidatePublishPacket(packet);
  337. writer.WriteWithLengthPrefix(packet.Topic);
  338. if (packet.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce)
  339. {
  340. writer.Write(packet.PacketIdentifier);
  341. }
  342. else
  343. {
  344. if (packet.PacketIdentifier > 0)
  345. {
  346. throw new MqttProtocolViolationException("Packet identifier must be empty if QoS == 0 [MQTT-2.3.1-5].");
  347. }
  348. }
  349. if (packet.Payload?.Length > 0)
  350. {
  351. writer.Write(packet.Payload);
  352. }
  353. byte fixedHeader = 0;
  354. if (packet.Retain)
  355. {
  356. fixedHeader |= 0x01;
  357. }
  358. fixedHeader |= (byte)((byte)packet.QualityOfServiceLevel << 1);
  359. if (packet.Dup)
  360. {
  361. fixedHeader |= 0x08;
  362. }
  363. return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.Publish, fixedHeader);
  364. }
  365. private static byte Serialize(MqttPubAckPacket packet, MqttPacketWriter writer)
  366. {
  367. writer.Write(packet.PacketIdentifier);
  368. return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubAck);
  369. }
  370. private static byte Serialize(MqttPubRecPacket packet, MqttPacketWriter writer)
  371. {
  372. writer.Write(packet.PacketIdentifier);
  373. return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubRec);
  374. }
  375. private static byte Serialize(MqttPubCompPacket packet, MqttPacketWriter writer)
  376. {
  377. writer.Write(packet.PacketIdentifier);
  378. return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubComp);
  379. }
  380. private static byte Serialize(MqttSubscribePacket packet, MqttPacketWriter writer)
  381. {
  382. if (!packet.TopicFilters.Any()) throw new MqttProtocolViolationException("At least one topic filter must be set [MQTT-3.8.3-3].");
  383. writer.Write(packet.PacketIdentifier);
  384. if (packet.TopicFilters?.Count > 0)
  385. {
  386. foreach (var topicFilter in packet.TopicFilters)
  387. {
  388. writer.WriteWithLengthPrefix(topicFilter.Topic);
  389. writer.Write((byte)topicFilter.QualityOfServiceLevel);
  390. }
  391. }
  392. return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.Subscribe, 0x02);
  393. }
  394. private static byte Serialize(MqttSubAckPacket packet, MqttPacketWriter writer)
  395. {
  396. writer.Write(packet.PacketIdentifier);
  397. if (packet.SubscribeReturnCodes?.Any() == true)
  398. {
  399. foreach (var packetSubscribeReturnCode in packet.SubscribeReturnCodes)
  400. {
  401. writer.Write((byte)packetSubscribeReturnCode);
  402. }
  403. }
  404. return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.SubAck);
  405. }
  406. private static byte Serialize(MqttUnsubscribePacket packet, MqttPacketWriter writer)
  407. {
  408. if (!packet.TopicFilters.Any()) throw new MqttProtocolViolationException("At least one topic filter must be set [MQTT-3.10.3-2].");
  409. writer.Write(packet.PacketIdentifier);
  410. if (packet.TopicFilters?.Any() == true)
  411. {
  412. foreach (var topicFilter in packet.TopicFilters)
  413. {
  414. writer.WriteWithLengthPrefix(topicFilter);
  415. }
  416. }
  417. return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.Unsubscibe, 0x02);
  418. }
  419. private static byte Serialize(IMqttPacketWithIdentifier packet, BinaryWriter writer)
  420. {
  421. writer.Write(packet.PacketIdentifier);
  422. return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.UnsubAck);
  423. }
  424. private static byte SerializeEmptyPacket(MqttControlPacketType type)
  425. {
  426. return MqttPacketWriter.BuildFixedHeader(type);
  427. }
  428. }
  429. }