using System.IO.Pipelines; using System.Net; using System.Net.Sockets; using System.Threading; using System.Threading.Tasks; using BenchmarkDotNet.Attributes; using MQTTnet.Benchmarks.Tcp; namespace MQTTnet.Benchmarks { [MemoryDiagnoser] public class TcpPipesBenchmark { private IDuplexPipe _client; private IDuplexPipe _server; [GlobalSetup] public void Setup() { var server = new TcpListener(IPAddress.Any, 1883); server.Start(1); var task = Task.Run(() => server.AcceptSocket()); var clientConnection = new TcpConnection(new IPEndPoint(IPAddress.Loopback, 1883)); _client = clientConnection.StartAsync().GetAwaiter().GetResult(); var serverConnection = new TcpConnection(task.GetAwaiter().GetResult()); _server = serverConnection.StartAsync().GetAwaiter().GetResult(); } [Benchmark] public async Task Send_10000_Chunks_Pipe() { var size = 5; var iterations = 10000; await Task.WhenAll(WriteAsync(iterations, size), ReadAsync(iterations, size)); } private async Task ReadAsync(int iterations, int size) { await Task.Yield(); var expected = iterations * size; long read = 0; var input = _client.Input; while (read < expected) { var readresult = await input.ReadAsync(CancellationToken.None).ConfigureAwait(false); input.AdvanceTo(readresult.Buffer.End); read += readresult.Buffer.Length; } } private async Task WriteAsync(int iterations, int size) { await Task.Yield(); var output = _server.Output; for (var i = 0; i < iterations; i++) { await output.WriteAsync(new byte[size], CancellationToken.None).ConfigureAwait(false); } } } }