You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

TcpConnection.cs 7.6 KiB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258
  1. using System;
  2. using System.Collections.Generic;
  3. using System.IO;
  4. using System.IO.Pipelines;
  5. using System.Net;
  6. using System.Net.Sockets;
  7. using System.Threading.Tasks;
  8. using Microsoft.AspNetCore.Connections;
  9. using Microsoft.AspNetCore.Http.Features;
  10. using MQTTnet.Exceptions;
  11. namespace MQTTnet.AspNetCore.Client.Tcp
  12. {
  13. public class TcpConnection : ConnectionContext
  14. {
  // Set to true when the send loop ends; the receive loop reads it to decide
  // which error (if any) to report once the socket has been torn down.
  private volatile bool _aborted;

  // Connect target, or the remote endpoint of a socket handed to the constructor.
  private readonly EndPoint _endPoint;

  private SocketSender _sender;
  private SocketReceiver _receiver;
  private Socket _socket;

  // Application side of the pipe pair created in StartAsync; Transport is the other side.
  private IDuplexPipe _application;

  /// <summary>
  /// Set to true at the end of StartAsync and back to false by DisposeAsync.
  /// It is not updated when the socket loops end on their own.
  /// </summary>
  public bool IsConnected { get; private set; }

  public override string ConnectionId { get; set; }

  /// <summary>
  /// Feature collection of this connection. Starts out empty instead of null so
  /// that callers can query it without a null check.
  /// </summary>
  public override IFeatureCollection Features { get; } = new FeatureCollection();

  /// <summary>
  /// Per-connection item bag. Starts out empty instead of null; may be replaced by the caller.
  /// </summary>
  public override IDictionary<object, object> Items { get; set; } = new Dictionary<object, object>();

  /// <summary>
  /// Pipe end handed to the consumer of this connection; assigned by StartAsync.
  /// </summary>
  public override IDuplexPipe Transport { get; set; }
  /// <summary>
  /// Creates a connection that is not yet connected; StartAsync creates the
  /// socket and connects it to <paramref name="endPoint"/>.
  /// </summary>
  /// <param name="endPoint">Remote endpoint to connect to.</param>
  /// <exception cref="ArgumentNullException"><paramref name="endPoint"/> is null.</exception>
  public TcpConnection(EndPoint endPoint)
  {
      // Fail here rather than later inside StartAsync, where the cause is harder to trace.
      _endPoint = endPoint ?? throw new ArgumentNullException(nameof(endPoint));
  }
  /// <summary>
  /// Wraps an existing socket. StartAsync will not create or connect a socket
  /// for a connection built this way.
  /// </summary>
  /// <param name="socket">Socket to send on and receive from.</param>
  /// <exception cref="ArgumentNullException"><paramref name="socket"/> is null.</exception>
  public TcpConnection(Socket socket)
  {
      // Guard explicitly; otherwise a null argument surfaces as a
      // NullReferenceException from the RemoteEndPoint access below.
      _socket = socket ?? throw new ArgumentNullException(nameof(socket));
      _endPoint = socket.RemoteEndPoint;
      _sender = new SocketSender(_socket, PipeScheduler.ThreadPool);
      _receiver = new SocketReceiver(_socket, PipeScheduler.ThreadPool);
  }
  /// <summary>
  /// Marks the connection as disconnected, completes both halves of
  /// <see cref="Transport"/> (when one has been assigned) and disposes the socket
  /// (when one exists).
  /// </summary>
  /// <returns>An already-completed task; all work is done synchronously.</returns>
  public Task DisposeAsync()
  {
      IsConnected = false;

      var transport = Transport;
      if (transport != null)
      {
          // Writer first, then reader - same order as before.
          transport.Output.Complete();
          transport.Input.Complete();
      }

      if (_socket != null)
      {
          _socket.Dispose();
      }

      return Task.CompletedTask;
  }
  /// <summary>
  /// Connects the socket if this instance was created from an endpoint, creates
  /// the duplex pipe pair, publishes its transport side through
  /// <see cref="Transport"/> and starts the background send/receive loops.
  /// </summary>
  /// <returns>A task that completes once the loops have been started.</returns>
  /// <remarks>
  /// If connecting fails, the exception propagates, the socket that was created
  /// for the attempt is disposed and no socket is stored, so a later call
  /// performs a fresh connect.
  /// </remarks>
  public async Task StartAsync()
  {
      if (_socket == null)
      {
          // Pick the socket family from the endpoint so IPv6 targets work too.
          // Endpoints without a fixed family (AddressFamily.Unspecified, e.g. a
          // host-name based endpoint) get the family-less constructor, which
          // creates a dual-mode socket where the OS supports IPv6.
          var socket = _endPoint.AddressFamily == AddressFamily.Unspecified
              ? new Socket(SocketType.Stream, ProtocolType.Tcp)
              : new Socket(_endPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);

          try
          {
              await socket.ConnectAsync(_endPoint);
          }
          catch
          {
              // Do not leak the handle and do not leave a half-initialised
              // socket behind that a retry would mistake for a connected one.
              socket.Dispose();
              throw;
          }

          // Publish only after a successful connect.
          _socket = socket;
          _sender = new SocketSender(socket, PipeScheduler.ThreadPool);
          _receiver = new SocketReceiver(socket, PipeScheduler.ThreadPool);
      }

      var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default);

      Transport = pair.Transport;
      _application = pair.Application;

      // Fire and forget: ExecuteAsync catches and reports its own exceptions.
      _ = ExecuteAsync();

      IsConnected = true;
  }
  /// <summary>
  /// Runs the receive and send loops side by side and waits for both to end.
  /// Whatever error the send loop returned is passed on when the application
  /// input is completed; any exception thrown while waiting is written to the console.
  /// </summary>
  private async Task ExecuteAsync()
  {
      Exception sendFailure = null;

      try
      {
          // Start both directions.
          Task receiving = DoReceive();
          Task<Exception> sending = DoSend();

          // When sending ends first, dispose the socket so that the pending
          // receive is aborted. The reverse case needs no extra step here.
          Task firstFinished = await Task.WhenAny(receiving, sending);
          if (firstFinished == sending)
          {
              _socket.Dispose();
          }

          // Wait for both directions to wind down.
          await receiving;
          sendFailure = await sending;

          // Make sure the socket is gone (no-op when already disposed above).
          _socket.Dispose();
      }
      catch (Exception ex)
      {
          Console.WriteLine($"Unexpected exception in {nameof(TcpConnection)}.{nameof(StartAsync)}: " + ex);
      }
      finally
      {
          // The socket has been dealt with; now complete the reading side of the application pipe.
          _application.Input.Complete(sendFailure);
      }
  }
  /// <summary>
  /// Runs the receive loop and translates whatever ended it into the error
  /// (possibly none) with which the application output is completed.
  /// </summary>
  private async Task DoReceive()
  {
      Exception failure = null;

      try
      {
          await ProcessReceives();
      }
      catch (SocketException ex) when (ex.SocketErrorCode == SocketError.ConnectionReset)
      {
          failure = new MqttCommunicationException(ex);
      }
      catch (SocketException ex) when (IsAbortErrorCode(ex.SocketErrorCode))
      {
          // Only an error if the send side has not flagged the teardown.
          // Calling Dispose after ReceiveAsync can cause an "InvalidArgument" error on *nix.
          failure = _aborted ? null : ConnectionAborted();
      }
      catch (ObjectDisposedException)
      {
          failure = _aborted ? null : ConnectionAborted();
      }
      catch (IOException ex)
      {
          failure = ex;
      }
      catch (Exception ex)
      {
          failure = new IOException(ex.Message, ex);
      }
      finally
      {
          // Once the abort flag is set, the output is always completed with an error.
          if (_aborted && failure == null)
          {
              failure = ConnectionAborted();
          }

          _application.Output.Complete(failure);
      }
  }

  // True for the socket error codes that DoReceive treats as a connection abort.
  private static bool IsAbortErrorCode(SocketError code)
  {
      switch (code)
      {
          case SocketError.OperationAborted:
          case SocketError.ConnectionAborted:
          case SocketError.Interrupted:
          case SocketError.InvalidArgument:
              return true;
          default:
              return false;
      }
  }
  /// <summary>
  /// Receive loop: reads from the socket into the application output pipe until
  /// the receiver reports zero bytes or the flush reports that the pipe is completed.
  /// </summary>
  private async Task ProcessReceives()
  {
      while (true)
      {
          // Ask the pipe for a buffer to receive into.
          var buffer = _application.Output.GetMemory();

          var bytesReceived = await _receiver.ReceiveAsync(buffer);
          if (bytesReceived == 0)
          {
              // FIN
              break;
          }

          _application.Output.Advance(bytesReceived);

          // Await the flush exactly once. The previous code awaited the
          // ValueTask and then called GetAwaiter().GetResult() on it again,
          // which consumes the same ValueTask twice and is not supported.
          var result = await _application.Output.FlushAsync();
          if (result.IsCompleted)
          {
              // The pipe consumer is shut down, so stop writing.
              break;
          }
      }
  }
  // Builds the exception used to report an aborted connection.
  private Exception ConnectionAborted() => new MqttCommunicationException("Connection Aborted");
  /// <summary>
  /// Runs the send loop, then flags the connection as aborted and shuts the socket down.
  /// </summary>
  /// <returns>
  /// The error that ended the send loop, or null when it ended normally, with an
  /// <see cref="ObjectDisposedException"/>, or with an operation-aborted socket
  /// error. Exceptions other than <see cref="IOException"/> are wrapped in an
  /// <see cref="IOException"/>.
  /// </returns>
  private async Task<Exception> DoSend()
  {
      Exception error = null;

      try
      {
          await ProcessSends();
      }
      catch (SocketException ex) when (ex.SocketErrorCode == SocketError.OperationAborted)
      {
          // Not reported as an error.
      }
      catch (ObjectDisposedException)
      {
          // Not reported as an error.
      }
      catch (IOException ex)
      {
          error = ex;
      }
      catch (Exception ex)
      {
          error = new IOException(ex.Message, ex);
      }
      finally
      {
          // Let the receive loop know that the teardown was initiated here.
          _aborted = true;

          try
          {
              _socket.Shutdown(SocketShutdown.Both);
          }
          catch (SocketException)
          {
              // The connection is being torn down anyway. An exception escaping
              // this finally block would replace the collected error and fault
              // the task instead of returning it.
          }
          catch (ObjectDisposedException)
          {
              // Socket already disposed (e.g. by DisposeAsync): nothing left to shut down.
          }
      }

      return error;
  }
  /// <summary>
  /// Send loop: forwards everything read from the application input pipe to the
  /// socket until the read is cancelled or the pipe is completed.
  /// </summary>
  private async Task ProcessSends()
  {
      while (true)
      {
          // Wait for the pipe producer to supply data.
          var readResult = await _application.Input.ReadAsync();
          var pending = readResult.Buffer;

          if (readResult.IsCanceled)
          {
              return;
          }

          // Capture these before sending, as the loop always has.
          var consumedUpTo = pending.End;
          var producerDone = readResult.IsCompleted;

          if (!pending.IsEmpty)
          {
              await _sender.SendAsync(pending);
          }

          // Everything handed out by this read counts as consumed.
          _application.Input.AdvanceTo(consumedUpTo);

          if (producerDone)
          {
              return;
          }
      }
  }
  220. }
  221. }