You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

212 lines
6.7 KiB

  1. using System;
  2. using System.Collections.Generic;
  3. using System.IO;
  4. using System.Threading;
  5. using System.Threading.Tasks;
  6. using MQTTnet.Core.Channel;
  7. using MQTTnet.Core.Exceptions;
  8. using MQTTnet.Core.Internal;
  9. using MQTTnet.Core.Packets;
  10. using MQTTnet.Core.Serializer;
  11. using Microsoft.Extensions.Logging;
  12. namespace MQTTnet.Core.Adapter
  13. {
  14. public class MqttChannelCommunicationAdapter : IMqttCommunicationAdapter
  15. {
  16. private readonly ILogger<MqttChannelCommunicationAdapter> _logger;
  17. private readonly IMqttCommunicationChannel _channel;
  18. private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
  19. public MqttChannelCommunicationAdapter(IMqttCommunicationChannel channel, IMqttPacketSerializer serializer, ILogger<MqttChannelCommunicationAdapter> logger)
  20. {
  21. _logger = logger ?? throw new ArgumentNullException(nameof(logger));
  22. _channel = channel ?? throw new ArgumentNullException(nameof(channel));
  23. PacketSerializer = serializer ?? throw new ArgumentNullException(nameof(serializer));
  24. }
  25. public IMqttPacketSerializer PacketSerializer { get; }
  26. public async Task ConnectAsync(TimeSpan timeout)
  27. {
  28. try
  29. {
  30. await _channel.ConnectAsync().TimeoutAfter(timeout).ConfigureAwait(false);
  31. }
  32. catch (TaskCanceledException)
  33. {
  34. throw;
  35. }
  36. catch (OperationCanceledException)
  37. {
  38. throw;
  39. }
  40. catch (MqttCommunicationTimedOutException)
  41. {
  42. throw;
  43. }
  44. catch (MqttCommunicationException)
  45. {
  46. throw;
  47. }
  48. catch (Exception exception)
  49. {
  50. throw new MqttCommunicationException(exception);
  51. }
  52. }
  53. public async Task DisconnectAsync(TimeSpan timeout)
  54. {
  55. try
  56. {
  57. await _channel.DisconnectAsync().TimeoutAfter(timeout).ConfigureAwait(false);
  58. }
  59. catch (TaskCanceledException)
  60. {
  61. throw;
  62. }
  63. catch (OperationCanceledException)
  64. {
  65. throw;
  66. }
  67. catch (MqttCommunicationTimedOutException)
  68. {
  69. throw;
  70. }
  71. catch (MqttCommunicationException)
  72. {
  73. throw;
  74. }
  75. catch (Exception exception)
  76. {
  77. throw new MqttCommunicationException(exception);
  78. }
  79. }
  80. public async Task SendPacketsAsync(TimeSpan timeout, CancellationToken cancellationToken, IEnumerable<MqttBasePacket> packets)
  81. {
  82. await _semaphore.WaitAsync(cancellationToken);
  83. try
  84. {
  85. foreach (var packet in packets)
  86. {
  87. if (packet == null)
  88. {
  89. continue;
  90. }
  91. _logger.LogInformation("TX >>> {0} [Timeout={1}]", packet, timeout);
  92. var writeBuffer = PacketSerializer.Serialize(packet);
  93. await _channel.SendStream.WriteAsync(writeBuffer, 0, writeBuffer.Length, cancellationToken).ConfigureAwait(false);
  94. }
  95. if (timeout > TimeSpan.Zero)
  96. {
  97. await _channel.SendStream.FlushAsync(cancellationToken).TimeoutAfter(timeout).ConfigureAwait(false);
  98. }
  99. else
  100. {
  101. await _channel.SendStream.FlushAsync(cancellationToken).ConfigureAwait(false);
  102. }
  103. }
  104. catch (TaskCanceledException)
  105. {
  106. throw;
  107. }
  108. catch (OperationCanceledException)
  109. {
  110. throw;
  111. }
  112. catch (MqttCommunicationTimedOutException)
  113. {
  114. throw;
  115. }
  116. catch (MqttCommunicationException)
  117. {
  118. throw;
  119. }
  120. catch (Exception exception)
  121. {
  122. throw new MqttCommunicationException(exception);
  123. }
  124. finally
  125. {
  126. _semaphore.Release();
  127. }
  128. }
  129. public async Task<MqttBasePacket> ReceivePacketAsync(TimeSpan timeout, CancellationToken cancellationToken)
  130. {
  131. try
  132. {
  133. ReceivedMqttPacket receivedMqttPacket;
  134. if (timeout > TimeSpan.Zero)
  135. {
  136. receivedMqttPacket = await ReceiveAsync(_channel.RawReceiveStream, cancellationToken).TimeoutAfter(timeout).ConfigureAwait(false);
  137. }
  138. else
  139. {
  140. receivedMqttPacket = await ReceiveAsync(_channel.ReceiveStream, cancellationToken).ConfigureAwait(false);
  141. }
  142. if (cancellationToken.IsCancellationRequested)
  143. {
  144. throw new TaskCanceledException();
  145. }
  146. var packet = PacketSerializer.Deserialize(receivedMqttPacket);
  147. if (packet == null)
  148. {
  149. throw new MqttProtocolViolationException("Received malformed packet.");
  150. }
  151. _logger.LogInformation("RX <<< {0}", packet);
  152. return packet;
  153. }
  154. catch (TaskCanceledException)
  155. {
  156. throw;
  157. }
  158. catch (OperationCanceledException)
  159. {
  160. throw;
  161. }
  162. catch (MqttCommunicationTimedOutException)
  163. {
  164. throw;
  165. }
  166. catch (MqttCommunicationException)
  167. {
  168. throw;
  169. }
  170. catch (Exception exception)
  171. {
  172. throw new MqttCommunicationException(exception);
  173. }
  174. }
  175. private static async Task<ReceivedMqttPacket> ReceiveAsync(Stream stream, CancellationToken cancellationToken)
  176. {
  177. var header = MqttPacketReader.ReadHeaderFromSource(stream, cancellationToken);
  178. if (header.BodyLength == 0)
  179. {
  180. return new ReceivedMqttPacket(header, new MemoryStream(0));
  181. }
  182. var body = new byte[header.BodyLength];
  183. var offset = 0;
  184. do
  185. {
  186. var readBytesCount = await stream.ReadAsync(body, offset, body.Length - offset, cancellationToken).ConfigureAwait(false);
  187. offset += readBytesCount;
  188. } while (offset < header.BodyLength);
  189. return new ReceivedMqttPacket(header, new MemoryStream(body, 0, body.Length));
  190. }
  191. }
  192. }