You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

140 rivejä
4.1 KiB

  1. using System;
  2. using System.IO;
  3. using System.Text;
  4. using System.Threading.Tasks;
  5. using MQTTnet.Core.Channel;
  6. using MQTTnet.Core.Exceptions;
  7. using MQTTnet.Core.Protocol;
  8. namespace MQTTnet.Core.Serializer
  9. {
  10. public sealed class MqttPacketReader : IDisposable
  11. {
  12. private readonly MemoryStream _remainingData = new MemoryStream(1024);
  13. private readonly IMqttCommunicationChannel _source;
  14. private int _remainingLength;
  15. public MqttPacketReader(IMqttCommunicationChannel source)
  16. {
  17. _source = source ?? throw new ArgumentNullException(nameof(source));
  18. }
  19. public MqttControlPacketType ControlPacketType { get; private set; }
  20. public byte FixedHeader { get; private set; }
  21. public bool EndOfRemainingData => _remainingData.Position == _remainingData.Length;
  22. public async Task ReadToEndAsync()
  23. {
  24. await ReadFixedHeaderAsync().ConfigureAwait(false);
  25. await ReadRemainingLengthAsync().ConfigureAwait(false);
  26. if (_remainingLength == 0)
  27. {
  28. return;
  29. }
  30. var buffer = new byte[_remainingLength];
  31. await ReadFromSourceAsync(buffer).ConfigureAwait(false);
  32. _remainingData.Write(buffer, 0, buffer.Length);
  33. _remainingData.Position = 0;
  34. }
  35. public byte ReadRemainingDataByte()
  36. {
  37. return ReadRemainingData(1)[0];
  38. }
  39. public ushort ReadRemainingDataUShort()
  40. {
  41. var buffer = ReadRemainingData(2);
  42. var temp = buffer[0];
  43. buffer[0] = buffer[1];
  44. buffer[1] = temp;
  45. return BitConverter.ToUInt16(buffer, 0);
  46. }
  47. public string ReadRemainingDataStringWithLengthPrefix()
  48. {
  49. var buffer = ReadRemainingDataWithLengthPrefix();
  50. return Encoding.UTF8.GetString(buffer, 0, buffer.Length);
  51. }
  52. public byte[] ReadRemainingDataWithLengthPrefix()
  53. {
  54. var length = ReadRemainingDataUShort();
  55. return ReadRemainingData(length);
  56. }
  57. public byte[] ReadRemainingData()
  58. {
  59. return ReadRemainingData(_remainingLength - (int)_remainingData.Position);
  60. }
  61. public byte[] ReadRemainingData(int length)
  62. {
  63. var buffer = new byte[length];
  64. _remainingData.Read(buffer, 0, buffer.Length);
  65. return buffer;
  66. }
  67. private async Task ReadRemainingLengthAsync()
  68. {
  69. // Alorithm taken from http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html.
  70. var multiplier = 1;
  71. var value = 0;
  72. byte encodedByte;
  73. do
  74. {
  75. encodedByte = await ReadStreamByteAsync().ConfigureAwait(false);
  76. value += (encodedByte & 127) * multiplier;
  77. multiplier *= 128;
  78. if (multiplier > 128 * 128 * 128)
  79. {
  80. throw new MqttProtocolViolationException("Remaining length is ivalid.");
  81. }
  82. } while ((encodedByte & 128) != 0);
  83. _remainingLength = value;
  84. }
  85. private Task ReadFromSourceAsync(byte[] buffer)
  86. {
  87. try
  88. {
  89. return _source.ReadAsync(buffer);
  90. }
  91. catch (Exception exception)
  92. {
  93. throw new MqttCommunicationException(exception);
  94. }
  95. }
  96. private async Task<byte> ReadStreamByteAsync()
  97. {
  98. var buffer = new byte[1];
  99. await ReadFromSourceAsync(buffer).ConfigureAwait(false);
  100. return buffer[0];
  101. }
  102. private async Task ReadFixedHeaderAsync()
  103. {
  104. FixedHeader = await ReadStreamByteAsync().ConfigureAwait(false);
  105. var byteReader = new ByteReader(FixedHeader);
  106. byteReader.Read(4);
  107. ControlPacketType = (MqttControlPacketType)byteReader.Read(4);
  108. }
  109. public void Dispose()
  110. {
  111. _remainingData?.Dispose();
  112. }
  113. }
  114. }