diff --git a/Tests/MQTTnet.Benchmarks/AllowNonOptimized.cs b/Tests/MQTTnet.Benchmarks/AllowNonOptimized.cs
new file mode 100644
index 0000000..ecb31a2
--- /dev/null
+++ b/Tests/MQTTnet.Benchmarks/AllowNonOptimized.cs
@@ -0,0 +1,23 @@
+using System.Linq;
+using BenchmarkDotNet.Configs;
+using BenchmarkDotNet.Jobs;
+using BenchmarkDotNet.Validators;
+
+namespace MQTTnet.Benchmarks
+{
+ ///
+ /// this options may be used to run benchmarks in debugmode and attach a performance profiler
+ /// https://benchmarkdotnet.org/Configs/Configs.htm
+ ///
+ public class AllowNonOptimized : ManualConfig
+ {
+ public AllowNonOptimized()
+ {
+ Add(JitOptimizationsValidator.DontFailOnError); // ALLOW NON-OPTIMIZED DLLS
+ Add(DefaultConfig.Instance.GetLoggers().ToArray()); // manual config has no loggers by default
+ Add(DefaultConfig.Instance.GetExporters().ToArray()); // manual config has no exporters by default
+ Add(DefaultConfig.Instance.GetColumnProviders().ToArray()); // manual config has no columns by default
+ Add(Job.InProcess);
+ }
+ }
+}
\ No newline at end of file
diff --git a/Tests/MQTTnet.Benchmarks/ChannelAdapterBenchmark.cs b/Tests/MQTTnet.Benchmarks/ChannelAdapterBenchmark.cs
new file mode 100644
index 0000000..726f5e0
--- /dev/null
+++ b/Tests/MQTTnet.Benchmarks/ChannelAdapterBenchmark.cs
@@ -0,0 +1,86 @@
+using BenchmarkDotNet.Attributes;
+using MQTTnet.Adapter;
+using MQTTnet.Core.Internal;
+using MQTTnet.Diagnostics;
+using MQTTnet.Internal;
+using MQTTnet.Packets;
+using MQTTnet.Serializer;
+using System;
+using System.IO;
+using System.Threading;
+
+namespace MQTTnet.Benchmarks
+{
+ [MemoryDiagnoser]
+ public class ChannelAdapterBenchmark
+ {
+ private MqttChannelAdapter _channelAdapter;
+ private int _iterations;
+ private MemoryStream _stream;
+ private MqttPublishPacket _packet;
+
+ [GlobalSetup]
+ public void Setup()
+ {
+ var message = new MqttApplicationMessageBuilder()
+ .WithTopic("A")
+ .Build();
+
+ _packet = message.ToPublishPacket();
+ var serializer = new MqttPacketSerializer();
+ var serializedPacket = Join(serializer.Serialize(_packet));
+
+ _iterations = 10000;
+
+ _stream = new MemoryStream(_iterations * serializedPacket.Length);
+
+ for (var i = 0; i < _iterations; i++)
+ {
+ _stream.Write(serializedPacket, 0, serializedPacket.Length);
+ }
+
+ _stream.Position = 0;
+
+ var channel = new TestMqttChannel(_stream);
+
+ _channelAdapter = new MqttChannelAdapter(channel, serializer, new MqttNetLogger().CreateChildLogger(nameof(MqttChannelAdapter)));
+ }
+
+ [Benchmark]
+ public void Receive_10000_Messages()
+ {
+ _stream.Position = 0;
+
+ for (var i = 0; i < 10000; i++)
+ {
+ _channelAdapter.ReceivePacketAsync(TimeSpan.Zero, CancellationToken.None).GetAwaiter().GetResult();
+ }
+
+ _stream.Position = 0;
+ }
+
+ [Benchmark]
+ public void Send_10000_Messages()
+ {
+ _stream.Position = 0;
+
+ for (var i = 0; i < 10000; i++)
+ {
+ _channelAdapter.SendPacketsAsync(TimeSpan.FromSeconds(15), new[] { _packet }, CancellationToken.None).GetAwaiter().GetResult();
+ }
+
+ _stream.Position = 0;
+ }
+
+ private static byte[] Join(params ArraySegment[] chunks)
+ {
+ var buffer = new MemoryStream();
+ foreach (var chunk in chunks)
+ {
+ buffer.Write(chunk.Array, chunk.Offset, chunk.Count);
+ }
+
+ return buffer.ToArray();
+ }
+ }
+}
diff --git a/Tests/MQTTnet.Benchmarks/MQTTnet.Benchmarks.csproj b/Tests/MQTTnet.Benchmarks/MQTTnet.Benchmarks.csproj
index 72af093..ad3aa55 100644
--- a/Tests/MQTTnet.Benchmarks/MQTTnet.Benchmarks.csproj
+++ b/Tests/MQTTnet.Benchmarks/MQTTnet.Benchmarks.csproj
@@ -4,10 +4,12 @@
Exe
Full
net461
+ 7.2
+
diff --git a/Tests/MQTTnet.Benchmarks/MqttTcpChannelBenchmark.cs b/Tests/MQTTnet.Benchmarks/MqttTcpChannelBenchmark.cs
new file mode 100644
index 0000000..740b83e
--- /dev/null
+++ b/Tests/MQTTnet.Benchmarks/MqttTcpChannelBenchmark.cs
@@ -0,0 +1,75 @@
+using BenchmarkDotNet.Attributes;
+using MQTTnet.Channel;
+using MQTTnet.Client;
+using MQTTnet.Diagnostics;
+using MQTTnet.Implementations;
+using MQTTnet.Server;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace MQTTnet.Benchmarks
+{
+ [MemoryDiagnoser]
+ public class MqttTcpChannelBenchmark
+ {
+ private IMqttServer _mqttServer;
+ private IMqttChannel _serverChannel;
+
+
+ private IMqttChannel _clientChannel;
+
+ [GlobalSetup]
+ public void Setup()
+ {
+ var factory = new MqttFactory();
+ var tcpServer = new MqttTcpServerAdapter(new MqttNetLogger().CreateChildLogger());
+ tcpServer.ClientAccepted += (sender, args) => _serverChannel = (IMqttChannel)args.Client.GetType().GetField("_channel", System.Reflection.BindingFlags.NonPublic| System.Reflection.BindingFlags.Instance).GetValue(args.Client);
+
+ _mqttServer = factory.CreateMqttServer(new[] { tcpServer }, new MqttNetLogger());
+
+ var serverOptions = new MqttServerOptionsBuilder().Build();
+ _mqttServer.StartAsync(serverOptions).GetAwaiter().GetResult();
+
+ var clientOptions = new MqttClientOptionsBuilder()
+ .WithTcpServer("localhost").Build();
+
+ var tcpOptions = (MqttClientTcpOptions) clientOptions.ChannelOptions;
+ _clientChannel = new MqttTcpChannel(tcpOptions);
+
+ _clientChannel.ConnectAsync(CancellationToken.None).GetAwaiter().GetResult();
+ }
+
+ [Benchmark]
+ public async Task Send_10000_Chunks()
+ {
+ var size = 5;
+ var iterations = 10000;
+
+ await Task.WhenAll(WriteAsync(iterations, size), ReadAsync(iterations, size));
+ }
+
+ private async Task ReadAsync(int iterations, int size)
+ {
+ await Task.Yield();
+
+ var expected = iterations * size;
+ long read = 0;
+
+ while (read < expected)
+ {
+ var readresult = await _clientChannel.ReadAsync(new byte[size], 0, size, CancellationToken.None).ConfigureAwait(false);
+ read += readresult;
+ }
+ }
+
+ private async Task WriteAsync(int iterations, int size)
+ {
+ await Task.Yield();
+
+ for (var i = 0; i < iterations; i++)
+ {
+ await _serverChannel.WriteAsync(new byte[size], 0, size, CancellationToken.None).ConfigureAwait(false);
+ }
+ }
+ }
+}
diff --git a/Tests/MQTTnet.Benchmarks/Program.cs b/Tests/MQTTnet.Benchmarks/Program.cs
index a65de42..5306916 100644
--- a/Tests/MQTTnet.Benchmarks/Program.cs
+++ b/Tests/MQTTnet.Benchmarks/Program.cs
@@ -13,6 +13,9 @@ namespace MQTTnet.Benchmarks
Console.WriteLine("2 = SerializerBenchmark");
Console.WriteLine("3 = LoggerBenchmark");
Console.WriteLine("4 = TopicFilterComparerBenchmark");
+ Console.WriteLine("5 = ChannelAdapterBenchmark");
+ Console.WriteLine("6 = MqttTcpChannelBenchmark");
+ Console.WriteLine("7 = TcpPipesBenchmark");
var pressedKey = Console.ReadKey(true);
switch (pressedKey.KeyChar)
@@ -29,6 +32,15 @@ namespace MQTTnet.Benchmarks
case '4':
BenchmarkRunner.Run();
break;
+ case '5':
+ BenchmarkRunner.Run();
+ break;
+ case '6':
+ BenchmarkRunner.Run();
+ break;
+ case '7':
+ BenchmarkRunner.Run();
+ break;
}
Console.ReadLine();
diff --git a/Tests/MQTTnet.Benchmarks/Tcp/BufferExtensions.cs b/Tests/MQTTnet.Benchmarks/Tcp/BufferExtensions.cs
new file mode 100644
index 0000000..0a2c19e
--- /dev/null
+++ b/Tests/MQTTnet.Benchmarks/Tcp/BufferExtensions.cs
@@ -0,0 +1,24 @@
+using System;
+using System.Collections.Generic;
+using System.Runtime.InteropServices;
+using System.Text;
+
+namespace Playground.Client.Mqtt.Tcp
+{
+ public static class BufferExtensions
+ {
+ public static ArraySegment GetArray(this Memory memory)
+ {
+ return ((ReadOnlyMemory)memory).GetArray();
+ }
+
+ public static ArraySegment GetArray(this ReadOnlyMemory memory)
+ {
+ if (!MemoryMarshal.TryGetArray(memory, out var result))
+ {
+ throw new InvalidOperationException("Buffer backed by array was expected");
+ }
+ return result;
+ }
+ }
+}
\ No newline at end of file
diff --git a/Tests/MQTTnet.Benchmarks/Tcp/DuplexPipe.cs b/Tests/MQTTnet.Benchmarks/Tcp/DuplexPipe.cs
new file mode 100644
index 0000000..f5f3316
--- /dev/null
+++ b/Tests/MQTTnet.Benchmarks/Tcp/DuplexPipe.cs
@@ -0,0 +1,41 @@
+using System.IO.Pipelines;
+
+namespace MQTTnet.Benchmarks.Tcp
+{
+ public class DuplexPipe : IDuplexPipe
+ {
+ public DuplexPipe(PipeReader reader, PipeWriter writer)
+ {
+ Input = reader;
+ Output = writer;
+ }
+
+ public PipeReader Input { get; }
+
+ public PipeWriter Output { get; }
+
+ public static DuplexPipePair CreateConnectionPair(PipeOptions inputOptions, PipeOptions outputOptions)
+ {
+ var input = new Pipe(inputOptions);
+ var output = new Pipe(outputOptions);
+
+ var transportToApplication = new DuplexPipe(output.Reader, input.Writer);
+ var applicationToTransport = new DuplexPipe(input.Reader, output.Writer);
+
+ return new DuplexPipePair(applicationToTransport, transportToApplication);
+ }
+
+ // This class exists to work around issues with value tuple on .NET Framework
+ public readonly struct DuplexPipePair
+ {
+ public IDuplexPipe Transport { get; }
+ public IDuplexPipe Application { get; }
+
+ public DuplexPipePair(IDuplexPipe transport, IDuplexPipe application)
+ {
+ Transport = transport;
+ Application = application;
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/Tests/MQTTnet.Benchmarks/Tcp/SocketAwaitable.cs b/Tests/MQTTnet.Benchmarks/Tcp/SocketAwaitable.cs
new file mode 100644
index 0000000..dcaaaf4
--- /dev/null
+++ b/Tests/MQTTnet.Benchmarks/Tcp/SocketAwaitable.cs
@@ -0,0 +1,71 @@
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.IO.Pipelines;
+using System.Net.Sockets;
+using System.Runtime.CompilerServices;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Playground.Client.Mqtt.Tcp
+{
+ public class SocketAwaitable : ICriticalNotifyCompletion
+ {
+ private static readonly Action _callbackCompleted = () => { };
+
+ private readonly PipeScheduler _ioScheduler;
+
+ private Action _callback;
+ private int _bytesTransferred;
+ private SocketError _error;
+
+ public SocketAwaitable(PipeScheduler ioScheduler)
+ {
+ _ioScheduler = ioScheduler;
+ }
+
+ public SocketAwaitable GetAwaiter() => this;
+ public bool IsCompleted => ReferenceEquals(_callback, _callbackCompleted);
+
+ public int GetResult()
+ {
+ Debug.Assert(ReferenceEquals(_callback, _callbackCompleted));
+
+ _callback = null;
+
+ if (_error != SocketError.Success)
+ {
+ throw new SocketException((int)_error);
+ }
+
+ return _bytesTransferred;
+ }
+
+ public void OnCompleted(Action continuation)
+ {
+ if (ReferenceEquals(_callback, _callbackCompleted) ||
+ ReferenceEquals(Interlocked.CompareExchange(ref _callback, continuation, null), _callbackCompleted))
+ {
+ Task.Run(continuation);
+ }
+ }
+
+ public void UnsafeOnCompleted(Action continuation)
+ {
+ OnCompleted(continuation);
+ }
+
+ public void Complete(int bytesTransferred, SocketError socketError)
+ {
+ _error = socketError;
+ _bytesTransferred = bytesTransferred;
+ var continuation = Interlocked.Exchange(ref _callback, _callbackCompleted);
+
+ if (continuation != null)
+ {
+ _ioScheduler.Schedule(state => ((Action)state)(), continuation);
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/Tests/MQTTnet.Benchmarks/Tcp/SocketReceiver.cs b/Tests/MQTTnet.Benchmarks/Tcp/SocketReceiver.cs
new file mode 100644
index 0000000..e2f19c0
--- /dev/null
+++ b/Tests/MQTTnet.Benchmarks/Tcp/SocketReceiver.cs
@@ -0,0 +1,39 @@
+using System;
+using System.IO.Pipelines;
+using System.Net.Sockets;
+using Playground.Client.Mqtt.Tcp;
+
+namespace MQTTnet.Benchmarks.Tcp
+{
+ public class SocketReceiver
+ {
+ private readonly Socket _socket;
+ private readonly SocketAsyncEventArgs _eventArgs = new SocketAsyncEventArgs();
+ private readonly SocketAwaitable _awaitable;
+
+ public SocketReceiver(Socket socket, PipeScheduler scheduler)
+ {
+ _socket = socket;
+ _awaitable = new SocketAwaitable(scheduler);
+ _eventArgs.UserToken = _awaitable;
+ _eventArgs.Completed += (_, e) => ((SocketAwaitable)e.UserToken).Complete(e.BytesTransferred, e.SocketError);
+ }
+
+ public SocketAwaitable ReceiveAsync(Memory buffer)
+ {
+#if NETCOREAPP2_1
+ _eventArgs.SetBuffer(buffer);
+#else
+ var segment = buffer.GetArray();
+
+ _eventArgs.SetBuffer(segment.Array, segment.Offset, segment.Count);
+#endif
+ if (!_socket.ReceiveAsync(_eventArgs))
+ {
+ _awaitable.Complete(_eventArgs.BytesTransferred, _eventArgs.SocketError);
+ }
+
+ return _awaitable;
+ }
+ }
+}
\ No newline at end of file
diff --git a/Tests/MQTTnet.Benchmarks/Tcp/SocketSender.cs b/Tests/MQTTnet.Benchmarks/Tcp/SocketSender.cs
new file mode 100644
index 0000000..475fa68
--- /dev/null
+++ b/Tests/MQTTnet.Benchmarks/Tcp/SocketSender.cs
@@ -0,0 +1,99 @@
+using System;
+using System.Buffers;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.IO.Pipelines;
+using System.Net.Sockets;
+using Playground.Client.Mqtt.Tcp;
+
+namespace MQTTnet.Benchmarks.Tcp
+{
+ public class SocketSender
+ {
+ private readonly Socket _socket;
+ private readonly SocketAsyncEventArgs _eventArgs = new SocketAsyncEventArgs();
+ private readonly SocketAwaitable _awaitable;
+
+ private List> _bufferList;
+
+ public SocketSender(Socket socket, PipeScheduler scheduler)
+ {
+ _socket = socket;
+ _awaitable = new SocketAwaitable(scheduler);
+ _eventArgs.UserToken = _awaitable;
+ _eventArgs.Completed += (_, e) => ((SocketAwaitable)e.UserToken).Complete(e.BytesTransferred, e.SocketError);
+ }
+
+ public SocketAwaitable SendAsync(in ReadOnlySequence buffers)
+ {
+ if (buffers.IsSingleSegment)
+ {
+ return SendAsync(buffers.First);
+ }
+
+#if NETCOREAPP2_1
+ if (!_eventArgs.MemoryBuffer.Equals(Memory.Empty))
+#else
+ if (_eventArgs.Buffer != null)
+#endif
+ {
+ _eventArgs.SetBuffer(null, 0, 0);
+ }
+
+ _eventArgs.BufferList = GetBufferList(buffers);
+
+ if (!_socket.SendAsync(_eventArgs))
+ {
+ _awaitable.Complete(_eventArgs.BytesTransferred, _eventArgs.SocketError);
+ }
+
+ return _awaitable;
+ }
+
+ private SocketAwaitable SendAsync(ReadOnlyMemory memory)
+ {
+ // The BufferList getter is much less expensive then the setter.
+ if (_eventArgs.BufferList != null)
+ {
+ _eventArgs.BufferList = null;
+ }
+
+#if NETCOREAPP2_1
+ _eventArgs.SetBuffer(MemoryMarshal.AsMemory(memory));
+#else
+ var segment = memory.GetArray();
+
+ _eventArgs.SetBuffer(segment.Array, segment.Offset, segment.Count);
+#endif
+ if (!_socket.SendAsync(_eventArgs))
+ {
+ _awaitable.Complete(_eventArgs.BytesTransferred, _eventArgs.SocketError);
+ }
+
+ return _awaitable;
+ }
+
+ private List> GetBufferList(in ReadOnlySequence buffer)
+ {
+ Debug.Assert(!buffer.IsEmpty);
+ Debug.Assert(!buffer.IsSingleSegment);
+
+ if (_bufferList == null)
+ {
+ _bufferList = new List>();
+ }
+ else
+ {
+ // Buffers are pooled, so it's OK to root them until the next multi-buffer write.
+ _bufferList.Clear();
+ }
+
+ foreach (var b in buffer)
+ {
+ _bufferList.Add(b.GetArray());
+ }
+
+ return _bufferList;
+ }
+ }
+}
\ No newline at end of file
diff --git a/Tests/MQTTnet.Benchmarks/Tcp/TcpConnection.cs b/Tests/MQTTnet.Benchmarks/Tcp/TcpConnection.cs
new file mode 100644
index 0000000..e73067c
--- /dev/null
+++ b/Tests/MQTTnet.Benchmarks/Tcp/TcpConnection.cs
@@ -0,0 +1,247 @@
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.IO.Pipelines;
+using System.Net;
+using System.Net.Sockets;
+using System.Threading.Tasks;
+using MQTTnet.Exceptions;
+using Playground.Client.Mqtt.Tcp;
+
+namespace MQTTnet.Benchmarks.Tcp
+{
+ public class TcpConnection
+ {
+ private readonly Socket _socket;
+ private volatile bool _aborted;
+ private readonly EndPoint _endPoint;
+ private IDuplexPipe _application;
+ private IDuplexPipe _transport;
+ private readonly SocketSender _sender;
+ private readonly SocketReceiver _receiver;
+
+ public TcpConnection(EndPoint endPoint)
+ {
+ _socket = new Socket(SocketType.Stream, ProtocolType.Tcp);
+ _endPoint = endPoint;
+
+ _sender = new SocketSender(_socket, PipeScheduler.ThreadPool);
+ _receiver = new SocketReceiver(_socket, PipeScheduler.ThreadPool);
+ }
+
+ public TcpConnection(Socket socket)
+ {
+ _socket = socket;
+ _endPoint = socket.RemoteEndPoint;
+
+ _sender = new SocketSender(_socket, PipeScheduler.ThreadPool);
+ _receiver = new SocketReceiver(_socket, PipeScheduler.ThreadPool);
+ }
+
+ public Task DisposeAsync()
+ {
+ _transport?.Output.Complete();
+ _transport?.Input.Complete();
+
+ _socket?.Dispose();
+
+ return Task.CompletedTask;
+ }
+
+ public async Task StartAsync()
+ {
+ if (!_socket.Connected)
+ {
+ await _socket.ConnectAsync(_endPoint);
+ }
+
+ var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default);
+
+ _transport = pair.Transport;
+ _application = pair.Application;
+
+ _ = ExecuteAsync();
+
+ return pair.Transport;
+ }
+
+ private async Task ExecuteAsync()
+ {
+ Exception sendError = null;
+ try
+ {
+ // Spawn send and receive logic
+ var receiveTask = DoReceive();
+ var sendTask = DoSend();
+
+ // If the sending task completes then close the receive
+ // We don't need to do this in the other direction because the kestrel
+ // will trigger the output closing once the input is complete.
+ if (await Task.WhenAny(receiveTask, sendTask) == sendTask)
+ {
+ // Tell the reader it's being aborted
+ _socket.Dispose();
+ }
+
+ // Now wait for both to complete
+ await receiveTask;
+ sendError = await sendTask;
+
+ // Dispose the socket(should noop if already called)
+ _socket.Dispose();
+ }
+ catch (Exception ex)
+ {
+ Console.WriteLine($"Unexpected exception in {nameof(TcpConnection)}.{nameof(StartAsync)}: " + ex);
+ }
+ finally
+ {
+ // Complete the output after disposing the socket
+ _application.Input.Complete(sendError);
+ }
+ }
+ private async Task DoReceive()
+ {
+ Exception error = null;
+
+ try
+ {
+ await ProcessReceives();
+ }
+ catch (SocketException ex) when (ex.SocketErrorCode == SocketError.ConnectionReset)
+ {
+ error = new MqttCommunicationException(ex);
+ }
+ catch (SocketException ex) when (ex.SocketErrorCode == SocketError.OperationAborted ||
+ ex.SocketErrorCode == SocketError.ConnectionAborted ||
+ ex.SocketErrorCode == SocketError.Interrupted ||
+ ex.SocketErrorCode == SocketError.InvalidArgument)
+ {
+ if (!_aborted)
+ {
+ // Calling Dispose after ReceiveAsync can cause an "InvalidArgument" error on *nix.
+ //error = new MqttCommunicationException();
+ }
+ }
+ catch (ObjectDisposedException)
+ {
+ if (!_aborted)
+ {
+ //error = new MqttCommunicationException();
+ }
+ }
+ catch (IOException ex)
+ {
+ error = ex;
+ }
+ catch (Exception ex)
+ {
+ error = new IOException(ex.Message, ex);
+ }
+ finally
+ {
+ if (_aborted)
+ {
+ //error = error ?? new MqttCommunicationException();
+ }
+
+ _application.Output.Complete(error);
+ }
+ }
+
+ private async Task ProcessReceives()
+ {
+ while (true)
+ {
+ // Ensure we have some reasonable amount of buffer space
+ var buffer = _application.Output.GetMemory();
+
+ var bytesReceived = await _receiver.ReceiveAsync(buffer);
+
+ if (bytesReceived == 0)
+ {
+ // FIN
+ break;
+ }
+
+ _application.Output.Advance(bytesReceived);
+
+ var flushTask = _application.Output.FlushAsync();
+
+ if (!flushTask.IsCompleted)
+ {
+ await flushTask;
+ }
+
+ var result = flushTask.GetAwaiter().GetResult();
+ if (result.IsCompleted)
+ {
+ // Pipe consumer is shut down, do we stop writing
+ break;
+ }
+ }
+ }
+
+ private async Task DoSend()
+ {
+ Exception error = null;
+
+ try
+ {
+ await ProcessSends();
+ }
+ catch (SocketException ex) when (ex.SocketErrorCode == SocketError.OperationAborted)
+ {
+ error = null;
+ }
+ catch (ObjectDisposedException)
+ {
+ error = null;
+ }
+ catch (IOException ex)
+ {
+ error = ex;
+ }
+ catch (Exception ex)
+ {
+ error = new IOException(ex.Message, ex);
+ }
+ finally
+ {
+ _aborted = true;
+ _socket.Shutdown(SocketShutdown.Both);
+ }
+
+ return error;
+ }
+
+ private async Task ProcessSends()
+ {
+ while (true)
+ {
+ // Wait for data to write from the pipe producer
+ var result = await _application.Input.ReadAsync();
+ var buffer = result.Buffer;
+
+ if (result.IsCanceled)
+ {
+ break;
+ }
+
+ var end = buffer.End;
+ var isCompleted = result.IsCompleted;
+ if (!buffer.IsEmpty)
+ {
+ await _sender.SendAsync(buffer);
+ }
+
+ _application.Input.AdvanceTo(end);
+
+ if (isCompleted)
+ {
+ break;
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/Tests/MQTTnet.Benchmarks/TcpPipesBenchmark.cs b/Tests/MQTTnet.Benchmarks/TcpPipesBenchmark.cs
new file mode 100644
index 0000000..cb3ffba
--- /dev/null
+++ b/Tests/MQTTnet.Benchmarks/TcpPipesBenchmark.cs
@@ -0,0 +1,72 @@
+using System.IO.Pipelines;
+using System.Net;
+using System.Net.Sockets;
+using System.Threading;
+using System.Threading.Tasks;
+using BenchmarkDotNet.Attributes;
+using MQTTnet.Benchmarks.Tcp;
+
+namespace MQTTnet.Benchmarks
+{
+ [MemoryDiagnoser]
+ public class TcpPipesBenchmark
+ {
+ private IDuplexPipe _client;
+ private IDuplexPipe _server;
+
+ [GlobalSetup]
+ public void Setup()
+ {
+ var server = new TcpListener(IPAddress.Any, 1883);
+
+ server.Start(1);
+
+ var task = Task.Run(() => server.AcceptSocket());
+
+ var clientConnection = new TcpConnection(new IPEndPoint(IPAddress.Loopback, 1883));
+
+ _client = clientConnection.StartAsync().GetAwaiter().GetResult();
+
+ var serverConnection = new TcpConnection(task.GetAwaiter().GetResult());
+ _server = serverConnection.StartAsync().GetAwaiter().GetResult();
+ }
+
+
+ [Benchmark]
+ public async Task Send_10000_Chunks_Pipe()
+ {
+ var size = 5;
+ var iterations = 10000;
+
+ await Task.WhenAll(WriteAsync(iterations, size), ReadAsync(iterations, size));
+ }
+
+ private async Task ReadAsync(int iterations, int size)
+ {
+ await Task.Yield();
+
+ var expected = iterations * size;
+ long read = 0;
+ var input = _client.Input;
+
+ while (read < expected)
+ {
+ var readresult = await input.ReadAsync(CancellationToken.None).ConfigureAwait(false);
+ input.AdvanceTo(readresult.Buffer.End);
+ read += readresult.Buffer.Length;
+ }
+ }
+
+ private async Task WriteAsync(int iterations, int size)
+ {
+ await Task.Yield();
+
+ var output = _server.Output;
+
+ for (var i = 0; i < iterations; i++)
+ {
+ await output.WriteAsync(new byte[size], CancellationToken.None).ConfigureAwait(false);
+ }
+ }
+ }
+}