25개 이상의 토픽을 선택하실 수 없습니다. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

TcpConnection.cs 7.2 KiB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246
  1. using System;
  2. using System.Collections.Generic;
  3. using System.IO;
  4. using System.IO.Pipelines;
  5. using System.Net;
  6. using System.Net.Sockets;
  7. using System.Threading.Tasks;
  8. using MQTTnet.Exceptions;
  namespace MQTTnet.Benchmarks.Tcp
  {
      /// <summary>
      /// Bridges a TCP <see cref="Socket"/> to an <see cref="IDuplexPipe"/>.
      /// Bytes received from the socket are written to the application-side pipe output,
      /// and bytes read from the application-side pipe input are sent on the socket.
      /// Callers work with the transport side returned by <see cref="StartAsync"/>.
      /// </summary>
      public class TcpConnection
      {
          private readonly Socket _socket;
          private readonly EndPoint _endPoint;
          private readonly SocketSender _sender;
          private readonly SocketReceiver _receiver;

          // Both sides of the pipe pair; assigned by StartAsync, null before that.
          private IDuplexPipe _application;
          private IDuplexPipe _transport;

          /// <summary>
          /// Creates a connection around a new, not yet connected TCP stream socket.
          /// The socket is connected to <paramref name="endPoint"/> by <see cref="StartAsync"/>.
          /// </summary>
          /// <param name="endPoint">The remote end point to connect to.</param>
          public TcpConnection(EndPoint endPoint)
              : this(new Socket(SocketType.Stream, ProtocolType.Tcp), endPoint)
          {
          }

          /// <summary>
          /// Creates a connection around an existing socket, using its
          /// <see cref="Socket.RemoteEndPoint"/> as the end point.
          /// </summary>
          /// <param name="socket">The socket to wrap.</param>
          /// <exception cref="ArgumentNullException"><paramref name="socket"/> is null.</exception>
          public TcpConnection(Socket socket)
              : this(socket, socket?.RemoteEndPoint)
          {
          }

          // Shared initialisation for both public constructors.
          private TcpConnection(Socket socket, EndPoint endPoint)
          {
              _socket = socket ?? throw new ArgumentNullException(nameof(socket));
              _endPoint = endPoint;
              _sender = new SocketSender(_socket, PipeScheduler.ThreadPool);
              _receiver = new SocketReceiver(_socket, PipeScheduler.ThreadPool);
          }

          /// <summary>
          /// Completes both directions of the transport pipe (when the connection was started)
          /// and disposes the socket.
          /// </summary>
          /// <returns>An already completed task; all work is done synchronously.</returns>
          public Task DisposeAsync()
          {
              _transport?.Output.Complete();
              _transport?.Input.Complete();
              _socket?.Dispose();

              return Task.CompletedTask;
          }

          /// <summary>
          /// Connects the socket if it is not connected yet, creates the pipe pair and starts
          /// the send/receive loops without awaiting them.
          /// </summary>
          /// <returns>The transport side of the pipe pair.</returns>
          public async Task<IDuplexPipe> StartAsync()
          {
              if (!_socket.Connected)
              {
                  await _socket.ConnectAsync(_endPoint);
              }

              var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default);
              _transport = pair.Transport;
              _application = pair.Application;

              // Fire and forget: ExecuteAsync handles its own exceptions.
              _ = ExecuteAsync();

              return pair.Transport;
          }

          /// <summary>
          /// Runs the receive and send loops until both have finished, disposes the socket and
          /// completes the application-side input with the send error (if any).
          /// </summary>
          private async Task ExecuteAsync()
          {
              Exception sendError = null;
              try
              {
                  // Spawn send and receive logic.
                  var receiveTask = DoReceive();
                  var sendTask = DoSend();

                  // If sending finishes first, dispose the socket so the pending receive is
                  // aborted. The reverse direction is not handled here (the original note says
                  // the pipe consumer completes the output once the input is complete).
                  if (await Task.WhenAny(receiveTask, sendTask) == sendTask)
                  {
                      _socket.Dispose();
                  }

                  // Now wait for both to complete.
                  await receiveTask;
                  sendError = await sendTask;

                  // Dispose the socket (should no-op if already disposed above).
                  _socket.Dispose();
              }
              catch (Exception ex)
              {
                  Console.WriteLine($"Unexpected exception in {nameof(TcpConnection)}.{nameof(ExecuteAsync)}: " + ex);
              }
              finally
              {
                  // Release the application-side reader, handing over any send error.
                  _application.Input.Complete(sendError);
              }
          }

          /// <summary>
          /// Runs the receive loop and completes the application-side output, passing along
          /// the error that ended the loop unless it is one of the swallowed teardown errors.
          /// </summary>
          private async Task DoReceive()
          {
              Exception error = null;
              try
              {
                  await ProcessReceives();
              }
              catch (SocketException ex) when (ex.SocketErrorCode == SocketError.ConnectionReset)
              {
                  error = new MqttCommunicationException(ex);
              }
              catch (SocketException ex) when (IsTeardownError(ex.SocketErrorCode))
              {
                  // Deliberately not reported. Calling Dispose after ReceiveAsync can cause
                  // an "InvalidArgument" error on *nix.
              }
              catch (ObjectDisposedException)
              {
                  // Socket was disposed while receiving; deliberately not reported.
              }
              catch (IOException ex)
              {
                  error = ex;
              }
              catch (Exception ex)
              {
                  error = new IOException(ex.Message, ex);
              }
              finally
              {
                  _application.Output.Complete(error);
              }
          }

          // True for the socket error codes the receive loop swallows without reporting.
          private static bool IsTeardownError(SocketError code)
          {
              return code == SocketError.OperationAborted
                  || code == SocketError.ConnectionAborted
                  || code == SocketError.Interrupted
                  || code == SocketError.InvalidArgument;
          }

          /// <summary>
          /// Copies data from the socket into the application-side output until the socket
          /// returns zero bytes or a flush reports completion.
          /// </summary>
          private async Task ProcessReceives()
          {
              while (true)
              {
                  // Ensure we have some reasonable amount of buffer space.
                  var buffer = _application.Output.GetMemory();

                  var bytesReceived = await _receiver.ReceiveAsync(buffer);
                  if (bytesReceived == 0)
                  {
                      // FIN
                      break;
                  }

                  _application.Output.Advance(bytesReceived);

                  // Await the flush exactly once; its result must not be read a second time.
                  var flushResult = await _application.Output.FlushAsync();
                  if (flushResult.IsCompleted)
                  {
                      // Pipe consumer is shut down, so stop writing.
                      break;
                  }
              }
          }

          /// <summary>
          /// Runs the send loop and then shuts the socket down in both directions.
          /// </summary>
          /// <returns>
          /// The error that ended the send loop, or null when it ended normally, was aborted
          /// (<see cref="SocketError.OperationAborted"/>) or the socket was disposed.
          /// </returns>
          private async Task<Exception> DoSend()
          {
              Exception error = null;
              try
              {
                  await ProcessSends();
              }
              catch (SocketException ex) when (ex.SocketErrorCode == SocketError.OperationAborted)
              {
                  // Not reported as an error.
              }
              catch (ObjectDisposedException)
              {
                  // Not reported as an error.
              }
              catch (IOException ex)
              {
                  error = ex;
              }
              catch (Exception ex)
              {
                  error = new IOException(ex.Message, ex);
              }
              finally
              {
                  ShutdownSocket();
              }

              return error;
          }

          // Shuts the socket down in both directions. Failures are ignored on purpose: this
          // runs from a finally block during teardown, where the socket may already be
          // disposed or disconnected, and throwing here would discard DoSend's result.
          private void ShutdownSocket()
          {
              try
              {
                  _socket.Shutdown(SocketShutdown.Both);
              }
              catch (SocketException)
              {
              }
              catch (ObjectDisposedException)
              {
              }
          }

          /// <summary>
          /// Sends everything read from the application-side input on the socket until the
          /// read is cancelled or the pipe reports completion.
          /// </summary>
          private async Task ProcessSends()
          {
              while (true)
              {
                  // Wait for data to write from the pipe producer.
                  var result = await _application.Input.ReadAsync();
                  if (result.IsCanceled)
                  {
                      break;
                  }

                  var buffer = result.Buffer;
                  if (!buffer.IsEmpty)
                  {
                      await _sender.SendAsync(buffer);
                  }

                  // Everything handed out by this read has been sent (or was empty).
                  _application.Input.AdvanceTo(buffer.End);

                  if (result.IsCompleted)
                  {
                      break;
                  }
              }
          }
      }
  }