You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

189 lines
6.4 KiB

  1. using Microsoft.AspNetCore.Connections;
  2. using Microsoft.AspNetCore.Http.Connections.Features;
  3. using MQTTnet.Adapter;
  4. using MQTTnet.AspNetCore.Client.Tcp;
  5. using MQTTnet.Exceptions;
  6. using MQTTnet.Formatter;
  7. using MQTTnet.Packets;
  8. using System;
  9. using System.IO.Pipelines;
  10. using System.Security.Cryptography.X509Certificates;
  11. using System.Threading;
  12. using System.Threading.Tasks;
  13. namespace MQTTnet.AspNetCore
  14. {
  /// <summary>
  /// <see cref="IMqttChannelAdapter"/> implementation that sends and receives MQTT packets through the
  /// duplex pipe (<c>Transport.Input</c> / <c>Transport.Output</c>) of a <see cref="ConnectionContext"/>.
  /// </summary>
  public class MqttConnectionContext : IMqttChannelAdapter
  {
      private readonly SpanBasedMqttPacketBodyReader _reader;

      // Serializes writers so that the bytes of two packets are never interleaved on the output pipe.
      private readonly SemaphoreSlim _writerSemaphore = new SemaphoreSlim(1, 1);

      // Captured in the constructor when the transport already exists, and refreshed by ConnectAsync.
      private PipeReader _input;
      private PipeWriter _output;

      /// <summary>
      /// Creates an adapter for the given connection.
      /// </summary>
      /// <param name="packetFormatterAdapter">Formatter used to encode and decode packets.</param>
      /// <param name="connection">The underlying connection.</param>
      /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
      public MqttConnectionContext(MqttPacketFormatterAdapter packetFormatterAdapter, ConnectionContext connection)
      {
          PacketFormatterAdapter = packetFormatterAdapter ?? throw new ArgumentNullException(nameof(packetFormatterAdapter));
          Connection = connection ?? throw new ArgumentNullException(nameof(connection));

          // The transport may not exist yet (it is read again in ConnectAsync).
          if (Connection.Transport != null)
          {
              _input = Connection.Transport.Input;
              _output = Connection.Transport.Output;
          }

          _reader = new SpanBasedMqttPacketBodyReader();
      }

      /// <summary>
      /// Gets a textual description of the remote peer: the remote endpoint (when compiled for
      /// NETCOREAPP3_1 and available), otherwise "ip:port" taken from the HTTP connection, otherwise the
      /// connection id.
      /// </summary>
      public string Endpoint
      {
          get
          {
#if NETCOREAPP3_1
              if (Connection?.RemoteEndPoint != null)
              {
                  return Connection.RemoteEndPoint.ToString();
              }
#endif
              var connection = Http?.HttpContext?.Connection;
              if (connection == null)
              {
                  return Connection.ConnectionId;
              }

              return $"{connection.RemoteIpAddress}:{connection.RemotePort}";
          }
      }

      /// <summary>
      /// Gets whether the associated HTTP request is HTTPS; <c>false</c> when there is no HTTP context.
      /// </summary>
      public bool IsSecureConnection => Http?.HttpContext?.Request?.IsHttps ?? false;

      /// <summary>
      /// Gets the client certificate of the associated HTTP connection, or <c>null</c> when there is none.
      /// </summary>
      public X509Certificate2 ClientCertificate => Http?.HttpContext?.Connection?.ClientCertificate;

      // Looked up on every access from the connection's feature collection.
      private IHttpContextFeature Http => Connection.Features.Get<IHttpContextFeature>();

      /// <summary>Gets the underlying connection.</summary>
      public ConnectionContext Connection { get; }

      /// <summary>Gets the formatter used to encode and decode packets.</summary>
      public MqttPacketFormatterAdapter PacketFormatterAdapter { get; }

      /// <summary>Gets or sets the number of bytes written since the last <see cref="ResetStatistics"/>.</summary>
      public long BytesSent { get; set; }

      /// <summary>Gets or sets the number of bytes decoded since the last <see cref="ResetStatistics"/>.</summary>
      public long BytesReceived { get; set; }

      /// <summary>Invoked when data was received but does not yet form a complete packet.</summary>
      public Action ReadingPacketStartedCallback { get; set; }

      /// <summary>Invoked whenever <see cref="ReceivePacketAsync"/> exits, normally or by exception.</summary>
      public Action ReadingPacketCompletedCallback { get; set; }

      /// <summary>
      /// Starts the connection when it is a <see cref="TcpConnection"/> that is not yet connected, then
      /// captures the transport's input and output pipes.
      /// </summary>
      /// <param name="timeout">Not used by this implementation.</param>
      /// <param name="cancellationToken">Not used by this implementation.</param>
      public async Task ConnectAsync(TimeSpan timeout, CancellationToken cancellationToken)
      {
          if (Connection is TcpConnection tcp && !tcp.IsConnected)
          {
              await tcp.StartAsync().ConfigureAwait(false);
          }

          _input = Connection.Transport.Input;
          _output = Connection.Transport.Output;
      }

      /// <summary>
      /// Completes both pipe ends that were captured, if any.
      /// </summary>
      /// <param name="timeout">Not used by this implementation.</param>
      /// <param name="cancellationToken">Not used by this implementation.</param>
      public Task DisconnectAsync(TimeSpan timeout, CancellationToken cancellationToken)
      {
          _input?.Complete();
          _output?.Complete();
          return Task.CompletedTask;
      }

      /// <summary>
      /// Reads from the input pipe until one complete packet has been decoded.
      /// </summary>
      /// <param name="timeout">Not used by this implementation.</param>
      /// <param name="cancellationToken">Stops the read loop when cancellation is requested.</param>
      /// <returns>The decoded packet.</returns>
      /// <exception cref="MqttCommunicationException">
      /// The pipe was completed before a whole packet could be decoded.
      /// </exception>
      /// <exception cref="OperationCanceledException">Cancellation was requested.</exception>
      public async Task<MqttBasePacket> ReceivePacketAsync(TimeSpan timeout, CancellationToken cancellationToken)
      {
          var input = Connection.Transport.Input;

          try
          {
              while (!cancellationToken.IsCancellationRequested)
              {
                  // Awaiting an already completed ValueTask continues synchronously, so no manual
                  // IsCompleted/Result fast path is needed.
                  var readResult = await input.ReadAsync(cancellationToken).ConfigureAwait(false);

                  var buffer = readResult.Buffer;
                  var consumed = buffer.Start;
                  var observed = buffer.Start;

                  try
                  {
                      if (!buffer.IsEmpty)
                      {
                          if (PacketFormatterAdapter.TryDecode(_reader, buffer, out var packet, out consumed, out observed, out var received))
                          {
                              BytesReceived += received;
                              return packet;
                          }

                          if (readResult.IsCompleted)
                          {
                              // No more data will ever arrive, so the partial packet can never be
                              // finished. Without this check the loop would spin on the same buffer.
                              throw new MqttCommunicationException("Connection Aborted");
                          }

                          // We did receive something but the message is not yet complete.
                          ReadingPacketStartedCallback?.Invoke();
                      }
                      else if (readResult.IsCompleted)
                      {
                          throw new MqttCommunicationException("Connection Aborted");
                      }
                  }
                  finally
                  {
                      // Always hand the buffer back to the pipe, using the positions reported by
                      // TryDecode (or the buffer start when nothing was decoded).
                      input.AdvanceTo(consumed, observed);
                  }
              }
          }
          finally
          {
              ReadingPacketCompletedCallback?.Invoke();
          }

          // The loop only ends without returning when cancellation was requested.
          cancellationToken.ThrowIfCancellationRequested();
          return null;
      }

      /// <summary>
      /// Resets <see cref="BytesReceived"/> and <see cref="BytesSent"/> to zero.
      /// </summary>
      public void ResetStatistics()
      {
          BytesReceived = 0;
          BytesSent = 0;
      }

      /// <summary>
      /// Encodes the packet and writes it to the output pipe. Concurrent callers are serialized.
      /// </summary>
      /// <param name="packet">The packet to send.</param>
      /// <param name="timeout">Not used by this implementation.</param>
      /// <param name="cancellationToken">Cancels waiting for the write lock and the write itself.</param>
      public async Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout, CancellationToken cancellationToken)
      {
          await _writerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
          try
          {
              var buffer = PacketFormatterAdapter.Encode(packet);
              try
              {
                  var msg = buffer.AsMemory();
                  await _output.WriteAsync(msg, cancellationToken).ConfigureAwait(false);

                  // FlushResult.IsCompleted reports that the reader stopped reading, not that the
                  // write succeeded, so the bytes are counted for every write that did not throw.
                  BytesSent += msg.Length;
              }
              finally
              {
                  // Release the encode buffer even when the write fails or is canceled.
                  PacketFormatterAdapter.FreeBuffer();
              }
          }
          finally
          {
              _writerSemaphore.Release();
          }
      }

      /// <summary>
      /// Does nothing in this implementation.
      /// </summary>
      public void Dispose()
      {
      }
  }
  160. }