You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

244 lines
8.3 KiB

  1. using System;
  2. using System.Collections.Generic;
  3. using System.IO;
  4. using System.Net.Sockets;
  5. using System.Runtime.InteropServices;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. using MQTTnet.Channel;
  9. using MQTTnet.Diagnostics;
  10. using MQTTnet.Exceptions;
  11. using MQTTnet.Internal;
  12. using MQTTnet.Packets;
  13. using MQTTnet.Serializer;
  14. namespace MQTTnet.Adapter
  15. {
  16. public class MqttChannelAdapter : IMqttChannelAdapter
  17. {
  // HRESULT compared against COMException.HResult in ExecuteAndWrapExceptionAsync;
  // a match is reported to callers as a cancellation. The value corresponds to the
  // Win32 error ERROR_OPERATION_ABORTED (995) wrapped as an HRESULT.
  private const uint ErrorOperationAborted = 0x800703E3;

  // Upper bound for the scratch buffer used while reading a packet body.
  // TODO: Move buffer size to config
  private const int ReadBufferSize = 4096;

  private readonly IMqttChannel _channel;
  private readonly IMqttNetChildLogger _logger;

  // Set by Dispose; checked by ThrowIfDisposed at the start of the public operations.
  private bool _isDisposed;

  /// <summary>
  /// Creates an adapter that sends and receives MQTT packets over the given channel.
  /// </summary>
  /// <param name="channel">The channel used for all reads and writes.</param>
  /// <param name="serializer">The serializer used to convert packets to and from bytes.</param>
  /// <param name="logger">The parent logger; a child logger named after this class is created from it.</param>
  /// <exception cref="ArgumentNullException">Any of the arguments is <c>null</c>.</exception>
  public MqttChannelAdapter(IMqttChannel channel, IMqttPacketSerializer serializer, IMqttNetChildLogger logger)
  {
      // The checks run in the same order as before: logger, channel, serializer.
      if (logger == null)
      {
          throw new ArgumentNullException(nameof(logger));
      }

      if (channel == null)
      {
          throw new ArgumentNullException(nameof(channel));
      }

      if (serializer == null)
      {
          throw new ArgumentNullException(nameof(serializer));
      }

      _channel = channel;
      PacketSerializer = serializer;
      _logger = logger.CreateChildLogger(nameof(MqttChannelAdapter));
  }
  /// <summary>
  /// Gets the endpoint reported by the underlying channel.
  /// </summary>
  public string Endpoint
  {
      get { return _channel.Endpoint; }
  }

  /// <summary>
  /// Gets the serializer supplied to the constructor.
  /// </summary>
  public IMqttPacketSerializer PacketSerializer { get; }

  /// <summary>
  /// Raised by the receive path after the fixed header of an incoming packet has been read.
  /// </summary>
  public event EventHandler ReadingPacketStarted;

  /// <summary>
  /// Raised by the receive path once reading the remainder of a packet has finished,
  /// whether it succeeded or failed.
  /// </summary>
  public event EventHandler ReadingPacketCompleted;
  /// <summary>
  /// Connects the underlying channel, bounded by <paramref name="timeout"/>.
  /// </summary>
  /// <param name="timeout">The timeout handed to <c>TimeoutAfter</c>.</param>
  /// <param name="cancellationToken">The token handed to <c>TimeoutAfter</c>.</param>
  /// <returns>A task that completes when the wrapped connect operation completes.</returns>
  /// <exception cref="ObjectDisposedException">Thrown synchronously when the adapter was disposed.</exception>
  public Task ConnectAsync(TimeSpan timeout, CancellationToken cancellationToken)
  {
      ThrowIfDisposed();

      _logger.Verbose("Connecting [Timeout={0}]", timeout);

      // Runs inside ExecuteAndWrapExceptionAsync so that failures are translated there.
      Task ConnectWithTimeout()
      {
          return Internal.TaskExtensions.TimeoutAfter(ct => _channel.ConnectAsync(ct), timeout, cancellationToken);
      }

      return ExecuteAndWrapExceptionAsync(ConnectWithTimeout);
  }
  /// <summary>
  /// Disconnects the underlying channel, bounded by <paramref name="timeout"/>.
  /// </summary>
  /// <param name="timeout">The timeout handed to <c>TimeoutAfter</c>.</param>
  /// <param name="cancellationToken">The token handed to <c>TimeoutAfter</c>.</param>
  /// <returns>A task that completes when the wrapped disconnect operation completes.</returns>
  /// <exception cref="ObjectDisposedException">Thrown synchronously when the adapter was disposed.</exception>
  public Task DisconnectAsync(TimeSpan timeout, CancellationToken cancellationToken)
  {
      ThrowIfDisposed();

      _logger.Verbose("Disconnecting [Timeout={0}]", timeout);

      Task DisconnectWithTimeout()
      {
          // The token supplied by TimeoutAfter is not forwarded: the channel's
          // DisconnectAsync is called without arguments here.
          return Internal.TaskExtensions.TimeoutAfter(ct => _channel.DisconnectAsync(), timeout, cancellationToken);
      }

      return ExecuteAndWrapExceptionAsync(DisconnectWithTimeout);
  }
  /// <summary>
  /// Sends the given packets one after another, in enumeration order.
  /// </summary>
  /// <param name="timeout">The timeout applied to each individual packet write.</param>
  /// <param name="packets">The packets to send; <c>null</c> entries are skipped.</param>
  /// <param name="cancellationToken">The token passed to each packet write.</param>
  /// <exception cref="ObjectDisposedException">The adapter was disposed.</exception>
  /// <exception cref="ArgumentNullException"><paramref name="packets"/> is <c>null</c>.</exception>
  public async Task SendPacketsAsync(TimeSpan timeout, IEnumerable<MqttBasePacket> packets, CancellationToken cancellationToken)
  {
      ThrowIfDisposed();

      // Fail with a descriptive exception instead of a NullReferenceException from the foreach.
      if (packets == null)
      {
          throw new ArgumentNullException(nameof(packets));
      }

      foreach (var packet in packets)
      {
          if (packet == null)
          {
              continue;
          }

          // Packets are awaited sequentially so they are written in order.
          await SendPacketAsync(timeout, cancellationToken, packet).ConfigureAwait(false);
      }
  }
  // Serializes a single packet and writes it to the channel, bounded by the timeout.
  // Logging and serialization both run inside ExecuteAndWrapExceptionAsync, so their
  // failures go through the same exception translation as the write itself.
  private Task SendPacketAsync(TimeSpan timeout, CancellationToken cancellationToken, MqttBasePacket packet)
  {
      Task SerializeAndWrite()
      {
          _logger.Verbose("TX >>> {0} [Timeout={1}]", packet, timeout);

          var serialized = PacketSerializer.Serialize(packet);

          return Internal.TaskExtensions.TimeoutAfter(
              ct => _channel.WriteAsync(serialized.Array, serialized.Offset, serialized.Count, ct),
              timeout,
              cancellationToken);
      }

      return ExecuteAndWrapExceptionAsync(SerializeAndWrite);
  }
  /// <summary>
  /// Reads the next packet from the channel and deserializes it.
  /// </summary>
  /// <param name="timeout">
  /// The timeout for the read. When it is not greater than <see cref="TimeSpan.Zero"/>,
  /// the read is performed without <c>TimeoutAfter</c>.
  /// </param>
  /// <param name="cancellationToken">The token used to cancel the read.</param>
  /// <returns>The deserialized packet.</returns>
  /// <exception cref="ObjectDisposedException">The adapter was disposed.</exception>
  /// <exception cref="TaskCanceledException">
  /// No packet was received or cancellation was requested after the read.
  /// </exception>
  public async Task<MqttBasePacket> ReceivePacketAsync(TimeSpan timeout, CancellationToken cancellationToken)
  {
      ThrowIfDisposed();

      MqttBasePacket result = null;

      async Task ReceiveAndDeserialize()
      {
          ReceivedMqttPacket rawPacket = null;
          try
          {
              if (timeout <= TimeSpan.Zero)
              {
                  // No usable timeout: only the caller's token bounds the read.
                  rawPacket = await ReceiveAsync(_channel, cancellationToken).ConfigureAwait(false);
              }
              else
              {
                  rawPacket = await Internal.TaskExtensions.TimeoutAfter(ct => ReceiveAsync(_channel, ct), timeout, cancellationToken).ConfigureAwait(false);
              }

              if (rawPacket == null || cancellationToken.IsCancellationRequested)
              {
                  throw new TaskCanceledException();
              }

              result = PacketSerializer.Deserialize(rawPacket);
              if (result == null)
              {
                  throw new MqttProtocolViolationException("Received malformed packet.");
              }

              _logger.Verbose("RX <<< {0}", result);
          }
          finally
          {
              // The raw packet is released on every path, including failures.
              rawPacket?.Dispose();
          }
      }

      await ExecuteAndWrapExceptionAsync(ReceiveAndDeserialize).ConfigureAwait(false);

      return result;
  }
  // Reads one raw packet (fixed header plus body) from the channel.
  //
  // ReadingPacketStarted is raised once the fixed header has been read, and
  // ReadingPacketCompleted is raised when the rest of the read finishes, whether it
  // succeeded or threw. A body length of 0 yields a packet with a null body.
  // On success the body stream is handed to the returned ReceivedMqttPacket; if reading
  // fails part-way the stream is disposed here because no packet exists to own it.
  private async Task<ReceivedMqttPacket> ReceiveAsync(IMqttChannel channel, CancellationToken cancellationToken)
  {
      var fixedHeader = await MqttPacketReader.ReadFixedHeaderAsync(channel, cancellationToken).ConfigureAwait(false);

      ReadingPacketStarted?.Invoke(this, EventArgs.Empty);
      try
      {
          var bodyLength = await MqttPacketReader.ReadBodyLengthAsync(channel, cancellationToken).ConfigureAwait(false);
          if (bodyLength == 0)
          {
              return new ReceivedMqttPacket(fixedHeader, null);
          }

          var body = new MemoryStream(bodyLength);
          try
          {
              // The scratch buffer is never larger than the body itself.
              var buffer = new byte[Math.Min(ReadBufferSize, bodyLength)];
              var totalBytesRead = 0;

              while (totalBytesRead < bodyLength)
              {
                  // Never request more than what is still missing, so bytes of a
                  // following packet are not consumed.
                  var chunkSize = Math.Min(buffer.Length, bodyLength - totalBytesRead);

                  var readBytesCount = await channel.ReadAsync(buffer, 0, chunkSize, cancellationToken).ConfigureAwait(false);
                  if (readBytesCount <= 0)
                  {
                      // NOTE(review): this helper is expected to throw; otherwise the loop would not advance.
                      ExceptionHelper.ThrowGracefulSocketClose();
                  }

                  // Here is no need to await because internally only an array is used and no real I/O operation is made.
                  // Using async here will only generate overhead.
                  body.Write(buffer, 0, readBytesCount);
                  totalBytesRead += readBytesCount;
              }

              body.Seek(0L, SeekOrigin.Begin);

              return new ReceivedMqttPacket(fixedHeader, body);
          }
          catch
          {
              // Nothing took ownership of the stream, so release it before propagating.
              body.Dispose();
              throw;
          }
      }
      finally
      {
          ReadingPacketCompleted?.Invoke(this, EventArgs.Empty);
      }
  }
  // Runs the given action and normalizes the exceptions it can surface:
  //  - cancellation and MQTT communication exceptions pass through unchanged;
  //  - a COMException whose HResult equals ErrorOperationAborted, and an IOException
  //    caused by a SocketException with SocketError.ConnectionAborted, are reported as
  //    OperationCanceledException (the original exception is kept as InnerException);
  //  - everything else is wrapped into MqttCommunicationException.
  private static async Task ExecuteAndWrapExceptionAsync(Func<Task> action)
  {
      try
      {
          await action().ConfigureAwait(false);
      }
      catch (OperationCanceledException)
      {
          // Also covers TaskCanceledException, which derives from OperationCanceledException.
          throw;
      }
      catch (MqttCommunicationTimedOutException)
      {
          throw;
      }
      catch (MqttCommunicationException)
      {
          throw;
      }
      catch (COMException comException) when ((uint)comException.HResult == ErrorOperationAborted)
      {
          throw new OperationCanceledException(comException.Message, comException);
      }
      catch (IOException ioException) when (
          ioException.InnerException is SocketException socketException
          && socketException.SocketErrorCode == SocketError.ConnectionAborted)
      {
          throw new OperationCanceledException(ioException.Message, ioException);
      }
      catch (Exception exception)
      {
          // Includes COMException and IOException instances that did not match the filters above.
          throw new MqttCommunicationException(exception);
      }
  }
  /// <summary>
  /// Marks the adapter as disposed and disposes the underlying channel.
  /// Calling this method more than once has no further effect.
  /// </summary>
  public void Dispose()
  {
      // Make repeated calls a no-op so the channel is disposed only once.
      if (_isDisposed)
      {
          return;
      }

      _isDisposed = true;

      // The constructor rejects a null channel, so no null check is needed here.
      _channel.Dispose();
  }
  // Guard used at the start of the public operations: throws ObjectDisposedException
  // once Dispose has been called.
  private void ThrowIfDisposed()
  {
      if (!_isDisposed)
      {
          return;
      }

      throw new ObjectDisposedException(nameof(MqttChannelAdapter));
  }
  203. }
  204. }