You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

183 regels
6.2 KiB

  1. using Microsoft.AspNetCore.Connections;
  2. using Microsoft.AspNetCore.Http.Connections.Features;
  3. using MQTTnet.Adapter;
  4. using MQTTnet.AspNetCore.Client.Tcp;
  5. using MQTTnet.Exceptions;
  6. using MQTTnet.Formatter;
  7. using MQTTnet.Packets;
  8. using System;
  9. using System.IO.Pipelines;
  10. using System.Security.Cryptography.X509Certificates;
  11. using System.Threading;
  12. using System.Threading.Tasks;
  13. namespace MQTTnet.AspNetCore
  14. {
  /// <summary>
  /// <see cref="IMqttChannelAdapter"/> implementation that reads and writes MQTT packets over the
  /// <see cref="IDuplexPipe"/> exposed by a <see cref="ConnectionContext"/>.
  /// </summary>
  public class MqttConnectionContext : IMqttChannelAdapter
  {
      private readonly SpanBasedMqttPacketBodyReader _reader;

      // Serializes senders so that encoded packets are never interleaved on the output pipe.
      private readonly SemaphoreSlim _writerSemaphore = new SemaphoreSlim(1, 1);

      private PipeReader _input;
      private PipeWriter _output;

      /// <summary>
      /// Creates an adapter on top of <paramref name="connection"/>.
      /// </summary>
      /// <param name="packetFormatterAdapter">Formatter used to encode and decode packets.</param>
      /// <param name="connection">The underlying connection.</param>
      /// <exception cref="ArgumentNullException">Either argument is <c>null</c>.</exception>
      public MqttConnectionContext(MqttPacketFormatterAdapter packetFormatterAdapter, ConnectionContext connection)
      {
          PacketFormatterAdapter = packetFormatterAdapter ?? throw new ArgumentNullException(nameof(packetFormatterAdapter));
          Connection = connection ?? throw new ArgumentNullException(nameof(connection));

          // The transport may not exist yet; in that case the pipes are picked up in ConnectAsync.
          if (Connection.Transport != null)
          {
              _input = Connection.Transport.Input;
              _output = Connection.Transport.Output;
          }

          _reader = new SpanBasedMqttPacketBodyReader();
      }

      /// <summary>
      /// Gets "remote-ip:remote-port" taken from the HTTP context feature when the connection has one;
      /// otherwise the connection id.
      /// </summary>
      public string Endpoint
      {
          get
          {
              var connection = Http?.HttpContext?.Connection;
              if (connection == null)
              {
                  return Connection.ConnectionId;
              }

              return $"{connection.RemoteIpAddress}:{connection.RemotePort}";
          }
      }

      /// <summary>
      /// Gets whether the associated HTTP request is HTTPS; <c>false</c> when there is no HTTP context.
      /// </summary>
      public bool IsSecureConnection => Http?.HttpContext?.Request?.IsHttps ?? false;

      /// <summary>
      /// Gets the client certificate of the associated HTTP connection, or <c>null</c> when there is none.
      /// </summary>
      public X509Certificate2 ClientCertificate => Http?.HttpContext?.Connection?.ClientCertificate;

      /// <summary>Gets the underlying connection.</summary>
      public ConnectionContext Connection { get; }

      /// <summary>Gets the formatter used to encode and decode packets.</summary>
      public MqttPacketFormatterAdapter PacketFormatterAdapter { get; }

      /// <summary>Gets or sets the number of bytes written since the last <see cref="ResetStatistics"/>.</summary>
      public long BytesSent { get; set; }

      /// <summary>Gets or sets the number of bytes decoded since the last <see cref="ResetStatistics"/>.</summary>
      public long BytesReceived { get; set; }

      /// <summary>
      /// Gets or sets a callback invoked each time data has arrived but does not yet form a complete packet.
      /// </summary>
      public Action ReadingPacketStartedCallback { get; set; }

      /// <summary>
      /// Gets or sets a callback invoked whenever <see cref="ReceivePacketAsync"/> exits, normally or by exception.
      /// </summary>
      public Action ReadingPacketCompletedCallback { get; set; }

      // Looked up on every access; the feature is null for connections without an HTTP context.
      private IHttpContextFeature Http => Connection.Features.Get<IHttpContextFeature>();

      /// <summary>
      /// Starts the connection when it is a not-yet-connected <see cref="TcpConnection"/> and captures the
      /// transport pipes.
      /// </summary>
      /// <param name="timeout">Not used by this implementation.</param>
      /// <param name="cancellationToken">Not used by this implementation.</param>
      public async Task ConnectAsync(TimeSpan timeout, CancellationToken cancellationToken)
      {
          if (Connection is TcpConnection tcp && !tcp.IsConnected)
          {
              await tcp.StartAsync().ConfigureAwait(false);
          }

          _input = Connection.Transport.Input;
          _output = Connection.Transport.Output;
      }

      /// <summary>
      /// Completes both transport pipes, if they have been captured.
      /// </summary>
      /// <param name="timeout">Not used by this implementation.</param>
      /// <param name="cancellationToken">Not used by this implementation.</param>
      public Task DisconnectAsync(TimeSpan timeout, CancellationToken cancellationToken)
      {
          _input?.Complete();
          _output?.Complete();

          return Task.CompletedTask;
      }

      /// <summary>
      /// Reads from the input pipe until one complete packet has been decoded.
      /// </summary>
      /// <param name="timeout">Not used by this implementation.</param>
      /// <param name="cancellationToken">Token that stops the read loop.</param>
      /// <returns>The decoded packet.</returns>
      /// <exception cref="MqttCommunicationException">
      /// The pipe was completed before a full packet could be decoded.
      /// </exception>
      /// <exception cref="OperationCanceledException">
      /// <paramref name="cancellationToken"/> was cancelled.
      /// </exception>
      public async Task<MqttBasePacket> ReceivePacketAsync(TimeSpan timeout, CancellationToken cancellationToken)
      {
          var input = Connection.Transport.Input;

          try
          {
              while (!cancellationToken.IsCancellationRequested)
              {
                  // Skip the await machinery when the read has already completed synchronously.
                  var readTask = input.ReadAsync(cancellationToken);
                  var readResult = readTask.IsCompleted
                      ? readTask.Result
                      : await readTask.ConfigureAwait(false);

                  var buffer = readResult.Buffer;

                  // Defaults used when nothing is decoded; TryDecode overwrites both positions.
                  var consumed = buffer.Start;
                  var observed = buffer.Start;

                  try
                  {
                      if (!buffer.IsEmpty)
                      {
                          if (PacketFormatterAdapter.TryDecode(_reader, buffer, out var packet, out consumed, out observed, out var received))
                          {
                              BytesReceived += received;
                              return packet;
                          }

                          // A completed pipe will never deliver the rest of a partial packet. Without this
                          // check ReadAsync keeps returning the same completed result and the loop never ends.
                          if (readResult.IsCompleted)
                          {
                              throw new MqttCommunicationException("Connection Aborted");
                          }

                          // We did receive something but the message is not yet complete.
                          ReadingPacketStartedCallback?.Invoke();
                      }
                      else if (readResult.IsCompleted)
                      {
                          throw new MqttCommunicationException("Connection Aborted");
                      }
                  }
                  finally
                  {
                      // Always hand the buffer back to the pipe, including on the return and throw paths,
                      // using the consumed/examined positions reported by the decoder.
                      input.AdvanceTo(consumed, observed);
                  }
              }
          }
          finally
          {
              ReadingPacketCompletedCallback?.Invoke();
          }

          // The loop only exits without returning when cancellation was requested.
          cancellationToken.ThrowIfCancellationRequested();
          return null;
      }

      /// <summary>
      /// Resets <see cref="BytesReceived"/> and <see cref="BytesSent"/> to zero.
      /// </summary>
      public void ResetStatistics()
      {
          BytesReceived = 0;
          BytesSent = 0;
      }

      /// <summary>
      /// Encodes <paramref name="packet"/> and writes it to the output pipe. Concurrent callers are serialized.
      /// </summary>
      /// <param name="packet">The packet to send.</param>
      /// <param name="timeout">Not used by this implementation.</param>
      /// <param name="cancellationToken">Token that cancels waiting for the send slot and the write.</param>
      public async Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout, CancellationToken cancellationToken)
      {
          await _writerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
          try
          {
              var encoded = PacketFormatterAdapter.Encode(packet);
              try
              {
                  var payload = encoded.AsMemory();
                  await _output.WriteAsync(payload, cancellationToken).ConfigureAwait(false);

                  // Count every write that returned without throwing. FlushResult.IsCompleted only reports
                  // that the reading side has stopped consuming, so it must not gate the statistics.
                  BytesSent += payload.Length;
              }
              finally
              {
                  // Release the formatter's buffer even when the write throws or is cancelled.
                  PacketFormatterAdapter.FreeBuffer();
              }
          }
          finally
          {
              _writerSemaphore.Release();
          }
      }

      /// <summary>
      /// Currently a no-op: this type does not dispose the wrapped connection or its pipes.
      /// </summary>
      public void Dispose()
      {
      }
  }
  154. }