You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

709 lines
24 KiB

  1. using System;
  2. using System.Collections.Generic;
  3. using System.IO;
  4. using System.Linq;
  5. using System.Text;
  6. using System.Threading;
  7. using Microsoft.VisualStudio.TestTools.UnitTesting;
  8. using MQTTnet.Adapter;
  9. using MQTTnet.Diagnostics;
  10. using MQTTnet.Diagnostics.Logger;
  11. using MQTTnet.Exceptions;
  12. using MQTTnet.Formatter;
  13. using MQTTnet.Formatter.V3;
  14. using MQTTnet.Formatter.V5;
  15. using MQTTnet.Internal;
  16. using MQTTnet.Packets;
  17. using MQTTnet.Protocol;
  18. using MQTTnet.Tests.Extensions;
  19. namespace MQTTnet.Tests
  20. {
  /// <summary>
  /// Wire-format tests for the MQTT packet formatters. Packets are encoded and compared against
  /// known Base64 snapshots, or pushed through a <see cref="MqttChannelAdapter"/> backed by an
  /// in-memory <see cref="TestMqttChannel"/> and compared after decoding.
  /// </summary>
  /// <remarks>
  /// <see cref="WriterFactory"/> and <see cref="ReaderFactory"/> are virtual so that a derived test
  /// class can run the same tests against a different writer/reader implementation.
  /// </remarks>
  [TestClass]
  public class MqttPacketSerializer_Tests
  {
      /// <summary>
      /// The protocol version must be detected from a valid CONNECT packet, and malformed CONNECT
      /// bodies must be rejected with a <see cref="MqttProtocolViolationException"/>.
      /// </summary>
      [TestMethod]
      public void DetectVersionFromMqttConnectPacket()
      {
          var packet = CreateConnectPacket();

          var versions = new[] { MqttProtocolVersion.V310, MqttProtocolVersion.V311, MqttProtocolVersion.V500 };
          foreach (var version in versions)
          {
              // A fresh formatter adapter per version: detection must not depend on earlier state.
              var detectedVersion = DeserializeAndDetectVersion(
                  new MqttPacketFormatterAdapter(new MqttPacketWriter()),
                  Serialize(packet, version));

              Assert.AreEqual(version, detectedVersion);
          }

          // The same adapter is reused for all malformed inputs below.
          var adapter = new MqttPacketFormatterAdapter(new MqttPacketWriter());

          AssertMalformedConnectIsRejected(adapter, new byte[0], "CONNECT packet must have at least 7 bytes.");
          AssertMalformedConnectIsRejected(adapter, new byte[7], "Protocol '' not supported.");
          AssertMalformedConnectIsRejected(
              adapter,
              new byte[] { 255, 255, 0, 0, 0, 0, 0 },
              "Expected at least 65537 bytes but there are only 7 bytes");
      }

      [TestMethod]
      public void SerializeV310_MqttConnectPacket()
      {
          SerializeAndCompare(CreateConnectPacket(), "EB0ABk1RSXNkcAPCAHsAA1hZWgAEVVNFUgAEUEFTUw==", MqttProtocolVersion.V310);
      }

      [TestMethod]
      public void SerializeV311_MqttConnectPacket()
      {
          SerializeAndCompare(CreateConnectPacket(), "EBsABE1RVFQEwgB7AANYWVoABFVTRVIABFBBU1M=");
      }

      [TestMethod]
      public void SerializeV311_MqttConnectPacketWithWillMessage()
      {
          SerializeAndCompare(
              CreateConnectPacketWithWillMessage(),
              "EDUABE1RVFQE7gB7AANYWVoADE15L2xhc3Qvd2lsbAAKR29vZCBieXRlLgAEVVNFUgAEUEFTUw==");
      }

      [TestMethod]
      public void DeserializeV311_MqttConnectPacket()
      {
          DeserializeAndCompare(CreateConnectPacket(), "EBsABE1RVFQEwgB7AANYWVoABFVTRVIABFBBU1M=");
      }

      [TestMethod]
      public void DeserializeV311_MqttConnectPacketWithWillMessage()
      {
          DeserializeAndCompare(
              CreateConnectPacketWithWillMessage(),
              "EDUABE1RVFQE7gB7AANYWVoADE15L2xhc3Qvd2lsbAAKR29vZCBieXRlLgAEVVNFUgAEUEFTUw==");
      }

      [TestMethod]
      public void SerializeV311_MqttConnAckPacket()
      {
          var p = new MqttConnAckPacket
          {
              IsSessionPresent = true,
              ReturnCode = MqttConnectReturnCode.ConnectionRefusedNotAuthorized
          };

          SerializeAndCompare(p, "IAIBBQ==");
      }

      [TestMethod]
      public void SerializeV310_MqttConnAckPacket()
      {
          var p = new MqttConnAckPacket
          {
              ReturnCode = MqttConnectReturnCode.ConnectionRefusedNotAuthorized
          };

          SerializeAndCompare(p, "IAIABQ==", MqttProtocolVersion.V310);
      }

      [TestMethod]
      public void DeserializeV311_MqttConnAckPacket()
      {
          var p = new MqttConnAckPacket
          {
              IsSessionPresent = true,
              ReturnCode = MqttConnectReturnCode.ConnectionRefusedNotAuthorized
          };

          DeserializeAndCompare(p, "IAIBBQ==");
      }

      [TestMethod]
      public void DeserializeV310_MqttConnAckPacket()
      {
          var p = new MqttConnAckPacket
          {
              ReturnCode = MqttConnectReturnCode.ConnectionRefusedNotAuthorized
          };

          DeserializeAndCompare(p, "IAIABQ==", MqttProtocolVersion.V310);
      }

      /// <summary>
      /// Round-trips a PUBLISH packet whose payload is far larger than a typical message.
      /// </summary>
      [TestMethod]
      public void Serialize_LargePacket()
      {
          // 80,000 bytes exceeds what a two-byte MQTT "remaining length" field can express (16,383),
          // so the longer variable-length encoding is exercised.
          const int payloadLength = 80000;

          // Fill the payload with a repeating 0..255 pattern. An all-zero payload would not reveal
          // bytes that were dropped, duplicated or shifted during the round trip.
          var payload = new byte[payloadLength];
          for (var i = 0; i < payloadLength; i++)
          {
              payload[i] = (byte)(i % 256);
          }

          var publishPacket = new MqttPublishPacket
          {
              Topic = "abcdefghijklmnopqrstuvwxyz0123456789",
              Payload = payload
          };

          var publishPacketCopy = Roundtrip(publishPacket);

          Assert.AreEqual(publishPacket.Topic, publishPacketCopy.Topic);
          Assert.IsTrue(publishPacket.Payload.SequenceEqual(publishPacketCopy.Payload));
      }

      [TestMethod]
      public void SerializeV311_MqttDisconnectPacket()
      {
          SerializeAndCompare(new MqttDisconnectPacket(), "4AA=");
      }

      [TestMethod]
      public void SerializeV311_MqttPingReqPacket()
      {
          SerializeAndCompare(new MqttPingReqPacket(), "wAA=");
      }

      [TestMethod]
      public void SerializeV311_MqttPingRespPacket()
      {
          SerializeAndCompare(new MqttPingRespPacket(), "0AA=");
      }

      [TestMethod]
      public void SerializeV311_MqttPublishPacket()
      {
          SerializeAndCompare(CreatePublishPacket(), "Ow4ABUEvQi9DAHtIRUxMTw==");
      }

      /// <summary>
      /// The response topic and user properties of a PUBLISH packet must survive a V5 round trip.
      /// </summary>
      [TestMethod]
      public void SerializeV500_MqttPublishPacket()
      {
          var prop = new MqttPublishPacketProperties { UserProperties = new List<MqttUserProperty>() };
          prop.ResponseTopic = "/Response";
          prop.UserProperties.Add(new MqttUserProperty("Foo", "Bar"));

          var p = CreatePublishPacket();
          p.Properties = prop;

          var deserialized = Roundtrip(p, MqttProtocolVersion.V500);

          Assert.AreEqual(prop.ResponseTopic, deserialized.Properties.ResponseTopic);
          Assert.IsTrue(deserialized.Properties.UserProperties.Any(x => x.Name == "Foo"));
      }

      /// <summary>
      /// The payload must survive a V5 round trip when the message also carries correlation data.
      /// </summary>
      [TestMethod]
      public void SerializeV500_MqttPublishPacket_CorrelationData()
      {
          var data = "123456789";
          var req = new MqttApplicationMessageBuilder()
              .WithTopic("Foo")
              .WithResponseTopic("_")
              .WithCorrelationData(Guid.NewGuid().ToByteArray())
              .WithPayload(data)
              .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce)
              .Build();

          var p = new MqttV500DataConverter().CreatePublishPacket(req);

          var deserialized = Roundtrip(p, MqttProtocolVersion.V500);

          Assert.IsTrue(p.Payload.SequenceEqual(deserialized.Payload));
      }

      [TestMethod]
      public void DeserializeV311_MqttPublishPacket()
      {
          DeserializeAndCompare(CreatePublishPacket(), "Ow4ABUEvQi9DAHtIRUxMTw==");
      }

      // NOTE: the _Qos1/_Qos2/_Qos3 suffixes of the next three tests are ordinals, not QoS levels.
      // They cover AtMostOnce, AtLeastOnce and ExactlyOnce respectively. The names are kept as they
      // are so that existing test filters and reports keep working.

      [TestMethod]
      public void DeserializeV311_MqttPublishPacket_Qos1()
      {
          var p = new MqttPublishPacket
          {
              QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce
          };

          var p2 = Roundtrip(p);

          Assert.AreEqual(p.QualityOfServiceLevel, p2.QualityOfServiceLevel);
          Assert.AreEqual(p.Dup, p2.Dup);
      }

      [TestMethod]
      public void DeserializeV311_MqttPublishPacket_Qos2()
      {
          var p = new MqttPublishPacket
          {
              QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce,
              PacketIdentifier = 1
          };

          var p2 = Roundtrip(p);

          Assert.AreEqual(p.QualityOfServiceLevel, p2.QualityOfServiceLevel);
          Assert.AreEqual(p.Dup, p2.Dup);
      }

      [TestMethod]
      public void DeserializeV311_MqttPublishPacket_Qos3()
      {
          var p = new MqttPublishPacket
          {
              QualityOfServiceLevel = MqttQualityOfServiceLevel.ExactlyOnce,
              PacketIdentifier = 1
          };

          var p2 = Roundtrip(p);

          Assert.AreEqual(p.QualityOfServiceLevel, p2.QualityOfServiceLevel);
          Assert.AreEqual(p.Dup, p2.Dup);
      }

      [TestMethod]
      public void DeserializeV311_MqttPublishPacket_DupFalse()
      {
          var p = new MqttPublishPacket
          {
              Dup = false
          };

          var p2 = Roundtrip(p);

          Assert.AreEqual(p.Dup, p2.Dup);
      }

      [TestMethod]
      public void SerializeV311_MqttPubAckPacket()
      {
          SerializeAndCompare(new MqttPubAckPacket { PacketIdentifier = 123 }, "QAIAew==");
      }

      [TestMethod]
      public void DeserializeV311_MqttPubAckPacket()
      {
          DeserializeAndCompare(new MqttPubAckPacket { PacketIdentifier = 123 }, "QAIAew==");
      }

      [TestMethod]
      public void SerializeV311_MqttPubRecPacket()
      {
          SerializeAndCompare(new MqttPubRecPacket { PacketIdentifier = 123 }, "UAIAew==");
      }

      [TestMethod]
      public void DeserializeV311_MqttPubRecPacket()
      {
          DeserializeAndCompare(new MqttPubRecPacket { PacketIdentifier = 123 }, "UAIAew==");
      }

      [TestMethod]
      public void SerializeV311_MqttPubRelPacket()
      {
          SerializeAndCompare(new MqttPubRelPacket { PacketIdentifier = 123 }, "YgIAew==");
      }

      [TestMethod]
      public void DeserializeV311_MqttPubRelPacket()
      {
          DeserializeAndCompare(new MqttPubRelPacket { PacketIdentifier = 123 }, "YgIAew==");
      }

      [TestMethod]
      public void SerializeV311_MqttPubCompPacket()
      {
          SerializeAndCompare(new MqttPubCompPacket { PacketIdentifier = 123 }, "cAIAew==");
      }

      [TestMethod]
      public void DeserializeV311_MqttPubCompPacket()
      {
          DeserializeAndCompare(new MqttPubCompPacket { PacketIdentifier = 123 }, "cAIAew==");
      }

      [TestMethod]
      public void SerializeV311_MqttSubscribePacket()
      {
          SerializeAndCompare(CreateSubscribePacket(), "ghoAewAFQS9CL0MCAAUxLzIvMwEABXgveS96AA==");
      }

      [TestMethod]
      public void DeserializeV311_MqttSubscribePacket()
      {
          DeserializeAndCompare(CreateSubscribePacket(), "ghoAewAFQS9CL0MCAAUxLzIvMwEABXgveS96AA==");
      }

      [TestMethod]
      public void SerializeV311_MqttSubAckPacket()
      {
          SerializeAndCompare(CreateSubAckPacket(), "kAYAewABAoA=");
      }

      [TestMethod]
      public void DeserializeV311_MqttSubAckPacket()
      {
          DeserializeAndCompare(CreateSubAckPacket(), "kAYAewABAoA=");
      }

      [TestMethod]
      public void SerializeV311_MqttUnsubscribePacket()
      {
          SerializeAndCompare(CreateUnsubscribePacket(), "ohcAewAFQS9CL0MABTEvMi8zAAV4L3kveg==");
      }

      [TestMethod]
      public void DeserializeV311_MqttUnsubscribePacket()
      {
          DeserializeAndCompare(CreateUnsubscribePacket(), "ohcAewAFQS9CL0MABTEvMi8zAAV4L3kveg==");
      }

      [TestMethod]
      public void SerializeV311_MqttUnsubAckPacket()
      {
          SerializeAndCompare(new MqttUnsubAckPacket { PacketIdentifier = 123 }, "sAIAew==");
      }

      [TestMethod]
      public void DeserializeV311_MqttUnsubAckPacket()
      {
          DeserializeAndCompare(new MqttUnsubAckPacket { PacketIdentifier = 123 }, "sAIAew==");
      }

      /// <summary>Creates the packet writer used by the tests; override to test another writer.</summary>
      protected virtual IMqttPacketWriter WriterFactory()
      {
          return new MqttPacketWriter();
      }

      /// <summary>Creates a body reader spanning all of <paramref name="data"/>; override to test another reader.</summary>
      /// <param name="data">The raw packet body the reader should expose.</param>
      protected virtual IMqttPacketBodyReader ReaderFactory(byte[] data)
      {
          return new MqttPacketBodyReader(data, 0, data.Length);
      }

      /// <summary>Builds the CONNECT packet shared by the connect-related tests.</summary>
      private static MqttConnectPacket CreateConnectPacket()
      {
          return new MqttConnectPacket
          {
              ClientId = "XYZ",
              Password = Encoding.UTF8.GetBytes("PASS"),
              Username = "USER",
              KeepAlivePeriod = 123,
              CleanSession = true
          };
      }

      /// <summary>Builds the shared CONNECT packet and attaches a retained, at-least-once will message.</summary>
      private static MqttConnectPacket CreateConnectPacketWithWillMessage()
      {
          var packet = CreateConnectPacket();
          packet.WillMessage = new MqttApplicationMessage
          {
              Topic = "My/last/will",
              // The text is part of the expected Base64 snapshot and must not be altered.
              Payload = Encoding.UTF8.GetBytes("Good byte."),
              QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce,
              Retain = true
          };

          return packet;
      }

      /// <summary>Builds the PUBLISH packet shared by the publish snapshot tests.</summary>
      private static MqttPublishPacket CreatePublishPacket()
      {
          return new MqttPublishPacket
          {
              PacketIdentifier = 123,
              Dup = true,
              Retain = true,
              Payload = Encoding.ASCII.GetBytes("HELLO"),
              QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce,
              Topic = "A/B/C"
          };
      }

      /// <summary>Builds a SUBSCRIBE packet with three topic filters, one per quality of service level.</summary>
      private static MqttSubscribePacket CreateSubscribePacket()
      {
          var packet = new MqttSubscribePacket
          {
              PacketIdentifier = 123
          };

          packet.TopicFilters.Add(new MqttTopicFilter { Topic = "A/B/C", QualityOfServiceLevel = MqttQualityOfServiceLevel.ExactlyOnce });
          packet.TopicFilters.Add(new MqttTopicFilter { Topic = "1/2/3", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce });
          packet.TopicFilters.Add(new MqttTopicFilter { Topic = "x/y/z", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce });

          return packet;
      }

      /// <summary>Builds a SUBACK packet carrying each success return code plus a failure.</summary>
      private static MqttSubAckPacket CreateSubAckPacket()
      {
          var packet = new MqttSubAckPacket
          {
              PacketIdentifier = 123
          };

          packet.ReturnCodes.Add(MqttSubscribeReturnCode.SuccessMaximumQoS0);
          packet.ReturnCodes.Add(MqttSubscribeReturnCode.SuccessMaximumQoS1);
          packet.ReturnCodes.Add(MqttSubscribeReturnCode.SuccessMaximumQoS2);
          packet.ReturnCodes.Add(MqttSubscribeReturnCode.Failure);

          return packet;
      }

      /// <summary>Builds an UNSUBSCRIBE packet with three topic filters.</summary>
      private static MqttUnsubscribePacket CreateUnsubscribePacket()
      {
          var packet = new MqttUnsubscribePacket
          {
              PacketIdentifier = 123
          };

          packet.TopicFilters.Add("A/B/C");
          packet.TopicFilters.Add("1/2/3");
          packet.TopicFilters.Add("x/y/z");

          return packet;
      }

      /// <summary>
      /// Wraps <paramref name="body"/> in a CONNECT header, feeds it to the adapter and asserts that
      /// it is rejected with the expected protocol violation message.
      /// </summary>
      /// <param name="adapter">The formatter adapter that performs the version detection.</param>
      /// <param name="body">The (malformed) CONNECT packet body.</param>
      /// <param name="expectedMessage">The exact exception message that must be reported.</param>
      private void AssertMalformedConnectIsRejected(MqttPacketFormatterAdapter adapter, byte[] body, string expectedMessage)
      {
          var exception = Assert.ThrowsException<MqttProtocolViolationException>(
              () => DeserializeAndDetectVersion(adapter, WriterFactory().AddMqttHeader(MqttControlPacketType.Connect, body)));

          Assert.AreEqual(expectedMessage, exception.Message);
      }

      /// <summary>Encodes <paramref name="packet"/> and asserts that the bytes match the Base64 snapshot.</summary>
      /// <param name="packet">The packet to encode.</param>
      /// <param name="expectedBase64Value">The expected wire bytes, Base64 encoded.</param>
      /// <param name="protocolVersion">The protocol version whose formatter is used.</param>
      private void SerializeAndCompare(MqttBasePacket packet, string expectedBase64Value, MqttProtocolVersion protocolVersion = MqttProtocolVersion.V311)
      {
          Assert.AreEqual(expectedBase64Value, Convert.ToBase64String(Serialize(packet, protocolVersion)));
      }

      /// <summary>Encodes <paramref name="packet"/> with the formatter of the given protocol version.</summary>
      /// <returns>A copy of the encoded bytes.</returns>
      private byte[] Serialize(MqttBasePacket packet, MqttProtocolVersion protocolVersion)
      {
          return MqttPacketFormatterAdapter.GetMqttPacketFormatter(protocolVersion, WriterFactory()).Encode(packet).ToArray();
      }

      /// <summary>
      /// Encodes <paramref name="packet"/>, receives it back through a channel adapter, encodes the
      /// received packet again and asserts that the result matches the Base64 snapshot.
      /// </summary>
      /// <param name="packet">The packet to send through the encode/decode/encode cycle.</param>
      /// <param name="expectedBase64Value">The expected wire bytes of the re-encoded packet, Base64 encoded.</param>
      /// <param name="protocolVersion">The protocol version whose formatter is used.</param>
      private void DeserializeAndCompare(MqttBasePacket packet, string expectedBase64Value, MqttProtocolVersion protocolVersion = MqttProtocolVersion.V311)
      {
          var writer = WriterFactory();
          var serializer = MqttPacketFormatterAdapter.GetMqttPacketFormatter(protocolVersion, writer);

          // Copy the encoded bytes before the writer is used again.
          var encodedPacket = serializer.Encode(packet).ToArray();

          using (var stream = new MemoryStream(encodedPacket))
          {
              var channel = new TestMqttChannel(stream);
              var adapter = new MqttChannelAdapter(channel, new MqttPacketFormatterAdapter(protocolVersion, writer), null, new MqttNetEventLogger());

              var receivedPacket = adapter.ReceivePacketAsync(CancellationToken.None).GetAwaiter().GetResult();
              var reencodedPacket = serializer.Encode(receivedPacket);

              Assert.AreEqual(expectedBase64Value, Convert.ToBase64String(reencodedPacket.ToArray()));
          }
      }

      /// <summary>
      /// Encodes <paramref name="packet"/> and receives it back through a channel adapter.
      /// </summary>
      /// <typeparam name="TPacket">The packet type being sent and expected back.</typeparam>
      /// <param name="packet">The packet to round-trip.</param>
      /// <param name="protocolVersion">The protocol version whose formatter is used.</param>
      /// <returns>The packet instance produced by decoding the encoded bytes.</returns>
      private TPacket Roundtrip<TPacket>(TPacket packet, MqttProtocolVersion protocolVersion = MqttProtocolVersion.V311)
          where TPacket : MqttBasePacket
      {
          var writer = WriterFactory();
          var serializer = MqttPacketFormatterAdapter.GetMqttPacketFormatter(protocolVersion, writer);
          var encodedPacket = serializer.Encode(packet).ToArray();

          var channel = new TestMqttChannel(encodedPacket);
          var adapter = new MqttChannelAdapter(channel, new MqttPacketFormatterAdapter(protocolVersion, writer), null, new MqttNetEventLogger());

          return (TPacket)adapter.ReceivePacketAsync(CancellationToken.None).GetAwaiter().GetResult();
      }

      /// <summary>
      /// Receives one packet from <paramref name="buffer"/> and reports the protocol version the
      /// formatter adapter holds afterwards.
      /// </summary>
      /// <param name="packetFormatterAdapter">The formatter adapter performing the detection.</param>
      /// <param name="buffer">The raw bytes of a complete packet, including its fixed header.</param>
      /// <returns>The protocol version exposed by <paramref name="packetFormatterAdapter"/> after receiving.</returns>
      private MqttProtocolVersion DeserializeAndDetectVersion(MqttPacketFormatterAdapter packetFormatterAdapter, byte[] buffer)
      {
          var channel = new TestMqttChannel(buffer);
          var adapter = new MqttChannelAdapter(channel, packetFormatterAdapter, null, new MqttNetEventLogger());

          adapter.ReceivePacketAsync(CancellationToken.None).GetAwaiter().GetResult();

          return packetFormatterAdapter.ProtocolVersion;
      }
  }
  571. }