You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

220 lines
6.9 KiB

  1. using System;
  2. using System.Collections.Generic;
  3. using System.IO;
  4. using System.Threading;
  5. using System.Threading.Tasks;
  6. using MQTTnet.Core.Channel;
  7. using MQTTnet.Core.Exceptions;
  8. using MQTTnet.Core.Internal;
  9. using MQTTnet.Core.Packets;
  10. using MQTTnet.Core.Serializer;
  11. using Microsoft.Extensions.Logging;
  12. namespace MQTTnet.Core.Adapter
  13. {
  /// <summary>
  /// Communication adapter that sends and receives MQTT packets over an
  /// <see cref="IMqttCommunicationChannel"/>, using an <see cref="IMqttPacketSerializer"/>
  /// to convert between packets and their wire representation.
  /// </summary>
  /// <remarks>
  /// Every public operation lets <see cref="OperationCanceledException"/> (including
  /// <see cref="TaskCanceledException"/>), <see cref="MqttCommunicationTimedOutException"/> and
  /// <see cref="MqttCommunicationException"/> propagate unchanged; any other exception is wrapped
  /// in a new <see cref="MqttCommunicationException"/>.
  /// </remarks>
  public class MqttChannelCommunicationAdapter : IMqttCommunicationAdapter
  {
      // Serializes senders so the bytes of packets from concurrent SendPacketsAsync calls do not interleave.
      private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
      private readonly ILogger<MqttChannelCommunicationAdapter> _logger;
      private readonly IMqttCommunicationChannel _channel;

      /// <summary>
      /// Creates a new adapter on top of the given channel.
      /// </summary>
      /// <param name="channel">The channel providing the send and receive streams.</param>
      /// <param name="serializer">The serializer used for outgoing and incoming packets.</param>
      /// <param name="logger">The logger used for packet tracing.</param>
      /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
      public MqttChannelCommunicationAdapter(IMqttCommunicationChannel channel, IMqttPacketSerializer serializer, ILogger<MqttChannelCommunicationAdapter> logger)
      {
          _logger = logger ?? throw new ArgumentNullException(nameof(logger));
          _channel = channel ?? throw new ArgumentNullException(nameof(channel));
          PacketSerializer = serializer ?? throw new ArgumentNullException(nameof(serializer));
      }

      /// <summary>
      /// Gets the serializer used to serialize outgoing and deserialize incoming packets.
      /// </summary>
      public IMqttPacketSerializer PacketSerializer { get; }

      /// <summary>
      /// Connects the underlying channel, bounding the operation with <paramref name="timeout"/>
      /// via <c>TimeoutAfter</c>.
      /// </summary>
      /// <param name="timeout">The time limit passed to <c>TimeoutAfter</c>.</param>
      public async Task ConnectAsync(TimeSpan timeout)
      {
          try
          {
              await _channel.ConnectAsync().TimeoutAfter(timeout).ConfigureAwait(false);
          }
          catch (Exception exception) when (!IsPassThroughException(exception))
          {
              throw new MqttCommunicationException(exception);
          }
      }

      /// <summary>
      /// Disconnects the underlying channel, bounding the operation with <paramref name="timeout"/>
      /// via <c>TimeoutAfter</c>.
      /// </summary>
      /// <param name="timeout">The time limit passed to <c>TimeoutAfter</c>.</param>
      public async Task DisconnectAsync(TimeSpan timeout)
      {
          try
          {
              await _channel.DisconnectAsync().TimeoutAfter(timeout).ConfigureAwait(false);
          }
          catch (Exception exception) when (!IsPassThroughException(exception))
          {
              throw new MqttCommunicationException(exception);
          }
      }

      /// <summary>
      /// Serializes the given packets, writes them to the channel's send stream in order and
      /// flushes the stream afterwards. <c>null</c> entries in <paramref name="packets"/> are skipped.
      /// </summary>
      /// <param name="timeout">
      /// Time limit applied to the final flush only; a value of <see cref="TimeSpan.Zero"/> or less
      /// flushes without a time limit. The individual writes are not bounded by this value.
      /// </param>
      /// <param name="cancellationToken">Token observed while waiting for the send lock, writing and flushing.</param>
      /// <param name="packets">The packets to send.</param>
      public async Task SendPacketsAsync(TimeSpan timeout, CancellationToken cancellationToken, IEnumerable<MqttBasePacket> packets)
      {
          try
          {
              // The wait happens outside the try/finally that releases the semaphore: when the wait is
              // cancelled or fails the semaphore was never taken, so releasing it would either throw
              // SemaphoreFullException or allow a second sender in concurrently.
              await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
              try
              {
                  foreach (var packet in packets)
                  {
                      if (packet == null)
                      {
                          continue;
                      }

                      _logger.LogInformation("TX >>> {0} [Timeout={1}]", packet, timeout);

                      var writeBuffer = PacketSerializer.Serialize(packet);
                      await _channel.SendStream.WriteAsync(writeBuffer, 0, writeBuffer.Length, cancellationToken).ConfigureAwait(false);
                  }

                  if (timeout > TimeSpan.Zero)
                  {
                      await _channel.SendStream.FlushAsync(cancellationToken).TimeoutAfter(timeout).ConfigureAwait(false);
                  }
                  else
                  {
                      await _channel.SendStream.FlushAsync(cancellationToken).ConfigureAwait(false);
                  }
              }
              finally
              {
                  _semaphore.Release();
              }
          }
          catch (Exception exception) when (!IsPassThroughException(exception))
          {
              throw new MqttCommunicationException(exception);
          }
      }

      /// <summary>
      /// Reads the next packet from the channel's receive stream and deserializes it.
      /// </summary>
      /// <param name="timeout">
      /// Time limit for receiving the raw packet; a value of <see cref="TimeSpan.Zero"/> or less
      /// waits without a time limit.
      /// </param>
      /// <param name="cancellationToken">Token observed while reading from the stream.</param>
      /// <returns>The deserialized packet; never <c>null</c>.</returns>
      /// <exception cref="TaskCanceledException">
      /// No packet header could be read, or cancellation was requested by the time the raw packet arrived.
      /// </exception>
      /// <exception cref="MqttCommunicationException">
      /// Any failure that is not one of the pass-through exception types, e.g. the stream ending
      /// in the middle of a packet body.
      /// </exception>
      public async Task<MqttBasePacket> ReceivePacketAsync(TimeSpan timeout, CancellationToken cancellationToken)
      {
          ReceivedMqttPacket receivedMqttPacket = null;
          try
          {
              if (timeout > TimeSpan.Zero)
              {
                  receivedMqttPacket = await ReceiveAsync(_channel.ReceiveStream, cancellationToken).TimeoutAfter(timeout).ConfigureAwait(false);
              }
              else
              {
                  receivedMqttPacket = await ReceiveAsync(_channel.ReceiveStream, cancellationToken).ConfigureAwait(false);
              }

              if (receivedMqttPacket == null || cancellationToken.IsCancellationRequested)
              {
                  throw new TaskCanceledException();
              }

              var packet = PacketSerializer.Deserialize(receivedMqttPacket);
              if (packet == null)
              {
                  throw new MqttProtocolViolationException("Received malformed packet.");
              }

              _logger.LogInformation("RX <<< {0}", packet);
              return packet;
          }
          catch (Exception exception) when (!IsPassThroughException(exception))
          {
              throw new MqttCommunicationException(exception);
          }
          finally
          {
              // The raw packet is only needed for deserialization; release it on every path.
              receivedMqttPacket?.Dispose();
          }
      }

      /// <summary>
      /// Reads one raw packet (header plus body) from <paramref name="stream"/>.
      /// </summary>
      /// <param name="stream">The stream to read from.</param>
      /// <param name="cancellationToken">Token passed to the header reader and to every body read.</param>
      /// <returns>The raw packet, or <c>null</c> when the header reader returned no header.</returns>
      /// <exception cref="EndOfStreamException">
      /// The stream reported end-of-stream before the complete body was read.
      /// </exception>
      private static async Task<ReceivedMqttPacket> ReceiveAsync(Stream stream, CancellationToken cancellationToken)
      {
          var header = MqttPacketReader.ReadHeaderFromSource(stream, cancellationToken);
          if (header == null)
          {
              return null;
          }

          if (header.BodyLength == 0)
          {
              return new ReceivedMqttPacket(header, new MemoryStream(0));
          }

          var body = new byte[header.BodyLength];
          var offset = 0;
          while (offset < body.Length)
          {
              var readBytesCount = await stream.ReadAsync(body, offset, body.Length - offset, cancellationToken).ConfigureAwait(false);
              if (readBytesCount <= 0)
              {
                  // A read of zero bytes means the stream has ended; without this check the loop
                  // would spin forever once the remote side closes the connection mid-packet.
                  throw new EndOfStreamException("Connection closed while reading the remaining packet body.");
              }

              offset += readBytesCount;
          }

          return new ReceivedMqttPacket(header, new MemoryStream(body, 0, body.Length));
      }

      /// <summary>
      /// Tells whether <paramref name="exception"/> must reach the caller unchanged instead of
      /// being wrapped in a <see cref="MqttCommunicationException"/>.
      /// </summary>
      private static bool IsPassThroughException(Exception exception)
      {
          // TaskCanceledException derives from OperationCanceledException, so one check covers both.
          return exception is OperationCanceledException
              || exception is MqttCommunicationTimedOutException
              || exception is MqttCommunicationException;
      }
  }
  200. }