You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

192 lines
6.6 KiB

  1. using Microsoft.AspNetCore.Connections;
  2. using Microsoft.AspNetCore.Http.Connections.Features;
  3. using MQTTnet.Adapter;
  4. using MQTTnet.AspNetCore.Client.Tcp;
  5. using MQTTnet.Exceptions;
  6. using MQTTnet.Formatter;
  7. using MQTTnet.Packets;
  8. using System;
  9. using System.IO.Pipelines;
  10. using System.Security.Cryptography.X509Certificates;
  11. using System.Threading;
  12. using System.Threading.Tasks;
  13. using MQTTnet.AspNetCore.Extensions;
  14. using MQTTnet.Internal;
  15. namespace MQTTnet.AspNetCore
  16. {
  17. public sealed class MqttConnectionContext : IMqttChannelAdapter
  18. {
  19. readonly AsyncLock _writerLock = new AsyncLock();
  20. readonly SpanBasedMqttPacketBodyReader _reader;
  21. PipeReader _input;
  22. PipeWriter _output;
  23. public MqttConnectionContext(MqttPacketFormatterAdapter packetFormatterAdapter, ConnectionContext connection)
  24. {
  25. PacketFormatterAdapter = packetFormatterAdapter ?? throw new ArgumentNullException(nameof(packetFormatterAdapter));
  26. Connection = connection ?? throw new ArgumentNullException(nameof(connection));
  27. if (Connection.Transport != null)
  28. {
  29. _input = Connection.Transport.Input;
  30. _output = Connection.Transport.Output;
  31. }
  32. _reader = new SpanBasedMqttPacketBodyReader();
  33. }
  34. public string Endpoint
  35. {
  36. get
  37. {
  38. #if NETCOREAPP3_1
  39. if (Connection?.RemoteEndPoint != null)
  40. {
  41. return Connection.RemoteEndPoint.ToString();
  42. }
  43. #endif
  44. var connection = Http?.HttpContext?.Connection;
  45. if (connection == null)
  46. {
  47. return Connection.ConnectionId;
  48. }
  49. return $"{connection.RemoteIpAddress}:{connection.RemotePort}";
  50. }
  51. }
  52. public bool IsSecureConnection => Http?.HttpContext?.Request?.IsHttps ?? false;
  53. public X509Certificate2 ClientCertificate => Http?.HttpContext?.Connection?.ClientCertificate;
  54. public ConnectionContext Connection { get; }
  55. public MqttPacketFormatterAdapter PacketFormatterAdapter { get; }
  56. public long BytesSent { get; set; }
  57. public long BytesReceived { get; set; }
  58. public Action ReadingPacketStartedCallback { get; set; }
  59. public Action ReadingPacketCompletedCallback { get; set; }
  60. IHttpContextFeature Http => Connection.Features.Get<IHttpContextFeature>();
  61. public async Task ConnectAsync(TimeSpan timeout, CancellationToken cancellationToken)
  62. {
  63. if (Connection is TcpConnection tcp && !tcp.IsConnected)
  64. {
  65. await tcp.StartAsync().ConfigureAwait(false);
  66. }
  67. _input = Connection.Transport.Input;
  68. _output = Connection.Transport.Output;
  69. }
  70. public Task DisconnectAsync(TimeSpan timeout, CancellationToken cancellationToken)
  71. {
  72. _input?.Complete();
  73. _output?.Complete();
  74. return Task.CompletedTask;
  75. }
  76. public async Task<MqttBasePacket> ReceivePacketAsync(TimeSpan timeout, CancellationToken cancellationToken)
  77. {
  78. var input = Connection.Transport.Input;
  79. try
  80. {
  81. while (!cancellationToken.IsCancellationRequested)
  82. {
  83. ReadResult readResult;
  84. var readTask = input.ReadAsync(cancellationToken);
  85. if (readTask.IsCompleted)
  86. {
  87. readResult = readTask.Result;
  88. }
  89. else
  90. {
  91. readResult = await readTask.ConfigureAwait(false);
  92. }
  93. var buffer = readResult.Buffer;
  94. var consumed = buffer.Start;
  95. var observed = buffer.Start;
  96. try
  97. {
  98. if (!buffer.IsEmpty)
  99. {
  100. if (PacketFormatterAdapter.TryDecode(_reader, buffer, out var packet, out consumed, out observed, out var received))
  101. {
  102. BytesReceived += received;
  103. return packet;
  104. }
  105. else
  106. {
  107. // we did receive something but the message is not yet complete
  108. ReadingPacketStartedCallback?.Invoke();
  109. }
  110. }
  111. else if (readResult.IsCompleted)
  112. {
  113. throw new MqttCommunicationException("Connection Aborted");
  114. }
  115. }
  116. finally
  117. {
  118. // The buffer was sliced up to where it was consumed, so we can just advance to the start.
  119. // We mark examined as buffer.End so that if we didn't receive a full frame, we'll wait for more data
  120. // before yielding the read again.
  121. input.AdvanceTo(consumed, observed);
  122. }
  123. }
  124. }
  125. catch (Exception e)
  126. {
  127. // completing the cannels makes sure that there is no more data read after a protocol error
  128. _input?.Complete(e);
  129. _output?.Complete(e);
  130. throw;
  131. }
  132. finally
  133. {
  134. ReadingPacketCompletedCallback?.Invoke();
  135. }
  136. cancellationToken.ThrowIfCancellationRequested();
  137. return null;
  138. }
  139. public void ResetStatistics()
  140. {
  141. BytesReceived = 0;
  142. BytesSent = 0;
  143. }
  144. public async Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout, CancellationToken cancellationToken)
  145. {
  146. var formatter = PacketFormatterAdapter;
  147. using (await _writerLock.WaitAsync(cancellationToken).ConfigureAwait(false))
  148. {
  149. var buffer = formatter.Encode(packet);
  150. var msg = buffer.AsMemory();
  151. var output = _output;
  152. var result = await output.WriteAsync(msg, cancellationToken).ConfigureAwait(false);
  153. if (result.IsCompleted)
  154. {
  155. BytesSent += msg.Length;
  156. }
  157. PacketFormatterAdapter.FreeBuffer();
  158. }
  159. }
  160. public void Dispose()
  161. {
  162. }
  163. }
  164. }