You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

193 lines
6.4 KiB

  1. using Microsoft.AspNetCore.Connections;
  2. using Microsoft.AspNetCore.Http.Connections.Features;
  3. using MQTTnet.Adapter;
  4. using MQTTnet.AspNetCore.Client.Tcp;
  5. using MQTTnet.Exceptions;
  6. using MQTTnet.Formatter;
  7. using MQTTnet.Packets;
  8. using System;
  9. using System.IO.Pipelines;
  10. using System.Security.Cryptography.X509Certificates;
  11. using System.Threading;
  12. using System.Threading.Tasks;
  13. using MQTTnet.AspNetCore.Extensions;
  14. using MQTTnet.Internal;
  15. namespace MQTTnet.AspNetCore
  16. {
  17. public sealed class MqttConnectionContext : IMqttChannelAdapter
  18. {
  19. readonly AsyncLock _writerLock = new AsyncLock();
  20. readonly SpanBasedMqttPacketBodyReader _reader;
  21. PipeReader _input;
  22. PipeWriter _output;
  23. public MqttConnectionContext(MqttPacketFormatterAdapter packetFormatterAdapter, ConnectionContext connection)
  24. {
  25. PacketFormatterAdapter = packetFormatterAdapter ?? throw new ArgumentNullException(nameof(packetFormatterAdapter));
  26. Connection = connection ?? throw new ArgumentNullException(nameof(connection));
  27. if (Connection.Transport != null)
  28. {
  29. _input = Connection.Transport.Input;
  30. _output = Connection.Transport.Output;
  31. }
  32. _reader = new SpanBasedMqttPacketBodyReader();
  33. }
  34. public string Endpoint
  35. {
  36. get
  37. {
  38. #if NETCOREAPP3_1
  39. if (Connection?.RemoteEndPoint != null)
  40. {
  41. return Connection.RemoteEndPoint.ToString();
  42. }
  43. #endif
  44. var connection = Http?.HttpContext?.Connection;
  45. if (connection == null)
  46. {
  47. return Connection.ConnectionId;
  48. }
  49. return $"{connection.RemoteIpAddress}:{connection.RemotePort}";
  50. }
  51. }
  52. public bool IsSecureConnection => Http?.HttpContext?.Request?.IsHttps ?? false;
  53. public X509Certificate2 ClientCertificate => Http?.HttpContext?.Connection?.ClientCertificate;
  54. public ConnectionContext Connection { get; }
  55. public MqttPacketFormatterAdapter PacketFormatterAdapter { get; }
  56. public long BytesSent { get; set; }
  57. public long BytesReceived { get; set; }
  58. public bool IsReadingPacket { get; private set; }
  59. IHttpContextFeature Http => Connection.Features.Get<IHttpContextFeature>();
  60. public async Task ConnectAsync(TimeSpan timeout, CancellationToken cancellationToken)
  61. {
  62. if (Connection is TcpConnection tcp && !tcp.IsConnected)
  63. {
  64. await tcp.StartAsync().ConfigureAwait(false);
  65. }
  66. _input = Connection.Transport.Input;
  67. _output = Connection.Transport.Output;
  68. }
  69. public Task DisconnectAsync(TimeSpan timeout, CancellationToken cancellationToken)
  70. {
  71. _input?.Complete();
  72. _output?.Complete();
  73. return Task.CompletedTask;
  74. }
  75. public async Task<MqttBasePacket> ReceivePacketAsync(CancellationToken cancellationToken)
  76. {
  77. var input = Connection.Transport.Input;
  78. try
  79. {
  80. while (!cancellationToken.IsCancellationRequested)
  81. {
  82. ReadResult readResult;
  83. var readTask = input.ReadAsync(cancellationToken);
  84. if (readTask.IsCompleted)
  85. {
  86. readResult = readTask.Result;
  87. }
  88. else
  89. {
  90. readResult = await readTask.ConfigureAwait(false);
  91. }
  92. var buffer = readResult.Buffer;
  93. var consumed = buffer.Start;
  94. var observed = buffer.Start;
  95. try
  96. {
  97. if (!buffer.IsEmpty)
  98. {
  99. if (PacketFormatterAdapter.TryDecode(_reader, buffer, out var packet, out consumed, out observed, out var received))
  100. {
  101. BytesReceived += received;
  102. return packet;
  103. }
  104. else
  105. {
  106. // we did receive something but the message is not yet complete
  107. IsReadingPacket = true;
  108. }
  109. }
  110. else if (readResult.IsCompleted)
  111. {
  112. throw new MqttCommunicationException("Connection Aborted");
  113. }
  114. }
  115. finally
  116. {
  117. // The buffer was sliced up to where it was consumed, so we can just advance to the start.
  118. // We mark examined as buffer.End so that if we didn't receive a full frame, we'll wait for more data
  119. // before yielding the read again.
  120. input.AdvanceTo(consumed, observed);
  121. }
  122. }
  123. }
  124. catch (Exception e)
  125. {
  126. // completing the channel makes sure that there is no more data read after a protocol error
  127. _input?.Complete(e);
  128. _output?.Complete(e);
  129. throw;
  130. }
  131. finally
  132. {
  133. IsReadingPacket = false;
  134. }
  135. cancellationToken.ThrowIfCancellationRequested();
  136. return null;
  137. }
  138. public void ResetStatistics()
  139. {
  140. BytesReceived = 0;
  141. BytesSent = 0;
  142. }
  143. public async Task SendPacketAsync(MqttBasePacket packet, CancellationToken cancellationToken)
  144. {
  145. var formatter = PacketFormatterAdapter;
  146. using (await _writerLock.WaitAsync(cancellationToken).ConfigureAwait(false))
  147. {
  148. var buffer = formatter.Encode(packet);
  149. var msg = buffer.AsMemory();
  150. var output = _output;
  151. var result = await output.WriteAsync(msg, cancellationToken).ConfigureAwait(false);
  152. if (result.IsCompleted)
  153. {
  154. BytesSent += msg.Length;
  155. }
  156. PacketFormatterAdapter.FreeBuffer();
  157. }
  158. }
  159. public void Dispose()
  160. {
  161. }
  162. }
  163. }