From fd50e4e26022f2e797d776870b64bd28cf64ff56 Mon Sep 17 00:00:00 2001 From: JanEggers Date: Thu, 31 May 2018 22:10:04 +0200 Subject: [PATCH 1/3] more benchmarks --- .../ChannelAdapterBenchmark.cs | 107 ++++++++++++++++++ .../MqttTcpChannelBenchmark.cs | 52 +++++++++ Tests/MQTTnet.Benchmarks/Program.cs | 8 ++ 3 files changed, 167 insertions(+) create mode 100644 Tests/MQTTnet.Benchmarks/ChannelAdapterBenchmark.cs create mode 100644 Tests/MQTTnet.Benchmarks/MqttTcpChannelBenchmark.cs diff --git a/Tests/MQTTnet.Benchmarks/ChannelAdapterBenchmark.cs b/Tests/MQTTnet.Benchmarks/ChannelAdapterBenchmark.cs new file mode 100644 index 0000000..5f66e6b --- /dev/null +++ b/Tests/MQTTnet.Benchmarks/ChannelAdapterBenchmark.cs @@ -0,0 +1,107 @@ +using BenchmarkDotNet.Attributes; +using BenchmarkDotNet.Attributes.Exporters; +using BenchmarkDotNet.Attributes.Jobs; +using BenchmarkDotNet.Configs; +using BenchmarkDotNet.Jobs; +using BenchmarkDotNet.Validators; +using MQTTnet.Adapter; +using MQTTnet.Core.Internal; +using MQTTnet.Diagnostics; +using MQTTnet.Internal; +using MQTTnet.Packets; +using MQTTnet.Serializer; +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace MQTTnet.Benchmarks +{ + public class AllowNonOptimized : ManualConfig + { + public AllowNonOptimized() + { + Add(JitOptimizationsValidator.DontFailOnError); // ALLOW NON-OPTIMIZED DLLS + Add(DefaultConfig.Instance.GetLoggers().ToArray()); // manual config has no loggers by default + Add(DefaultConfig.Instance.GetExporters().ToArray()); // manual config has no exporters by default + Add(DefaultConfig.Instance.GetColumnProviders().ToArray()); // manual config has no columns by default + Add(Job.InProcess); + } + } + + [MemoryDiagnoser] + public class ChannelAdapterBenchmark + { + private MqttChannelAdapter _channelAdapter; + private int _iterations; + private MemoryStream _stream; + private MqttPublishPacket _packet; + + [GlobalSetup] + public void Setup() + { + var message = new MqttApplicationMessageBuilder() + .WithTopic("A") + .Build(); + + _packet = message.ToPublishPacket(); + var serializer = new MqttPacketSerializer(); + var serializedPacket = Join(serializer.Serialize(_packet)); + + _iterations = 10000; + + _stream = new MemoryStream(_iterations * serializedPacket.Length); + + for (var i = 0; i < _iterations; i++) + { + _stream.Write(serializedPacket, 0, serializedPacket.Length); + } + + _stream.Position = 0; + + var channel = new TestMqttChannel(_stream); + + _channelAdapter = new MqttChannelAdapter(channel, serializer, new MqttNetLogger().CreateChildLogger(nameof(MqttChannelAdapter))); + } + + [Benchmark] + public void Receive_10000_Messages() + { + _stream.Position = 0; + + for (var i = 0; i < 10000; i++) + { + _channelAdapter.ReceivePacketAsync(TimeSpan.Zero, CancellationToken.None).GetAwaiter().GetResult(); + } + + _stream.Position = 0; + } + + [Benchmark] + public void Send_10000_Messages() + { + _stream.Position = 0; + + for (var i = 0; i < 10000; i++) + { + _channelAdapter.SendPacketsAsync(TimeSpan.FromSeconds(15), new[] { _packet }, CancellationToken.None).GetAwaiter().GetResult(); + } + + _stream.Position = 0; + } + + private static byte[] Join(params ArraySegment[] chunks) + { + var buffer = new MemoryStream(); + foreach (var chunk in chunks) + { + buffer.Write(chunk.Array, chunk.Offset, chunk.Count); + } + + return buffer.ToArray(); + } + } +} diff --git a/Tests/MQTTnet.Benchmarks/MqttTcpChannelBenchmark.cs b/Tests/MQTTnet.Benchmarks/MqttTcpChannelBenchmark.cs new file mode 100644 index 0000000..6709ff7 --- /dev/null +++ b/Tests/MQTTnet.Benchmarks/MqttTcpChannelBenchmark.cs @@ -0,0 +1,52 @@ +using BenchmarkDotNet.Attributes; +using MQTTnet.Adapter; +using MQTTnet.Channel; +using MQTTnet.Client; +using MQTTnet.Diagnostics; +using MQTTnet.Implementations; +using MQTTnet.Server; +using System.Threading; + +namespace MQTTnet.Benchmarks +{ + [MemoryDiagnoser] + public class MqttTcpChannelBenchmark + { + private IMqttServer _mqttServer; + private IMqttChannel _clientChannel; + private IMqttChannel _serverChannel; + + [GlobalSetup] + public void Setup() + { + var factory = new MqttFactory(); + var tcpServer = new MqttTcpServerAdapter(new MqttNetLogger().CreateChildLogger()); + tcpServer.ClientAccepted += (sender, args) => _serverChannel = (IMqttChannel)args.Client.GetType().GetField("_channel", System.Reflection.BindingFlags.NonPublic| System.Reflection.BindingFlags.Instance).GetValue(args.Client); + + _mqttServer = factory.CreateMqttServer(new[] { tcpServer }, new MqttNetLogger()); + + var serverOptions = new MqttServerOptionsBuilder().Build(); + _mqttServer.StartAsync(serverOptions).GetAwaiter().GetResult(); + + var clientOptions = new MqttClientOptionsBuilder() + .WithTcpServer("localhost").Build(); + + _clientChannel = new MqttTcpChannel((MqttClientTcpOptions)clientOptions.ChannelOptions); + + _clientChannel.ConnectAsync(CancellationToken.None).GetAwaiter().GetResult(); + } + + [Benchmark] + public void Send_10000_Chunks() + { + var size = 5; + var iterations = 10000; + for (var i = 0; i < iterations; i++) + { + _serverChannel.WriteAsync(new byte[size], 0, size, CancellationToken.None).GetAwaiter().GetResult(); + _clientChannel.ReadAsync(new byte[size], 0, size, CancellationToken.None).GetAwaiter().GetResult(); + } + + } + } +} diff --git a/Tests/MQTTnet.Benchmarks/Program.cs b/Tests/MQTTnet.Benchmarks/Program.cs index a65de42..e643e97 100644 --- a/Tests/MQTTnet.Benchmarks/Program.cs +++ b/Tests/MQTTnet.Benchmarks/Program.cs @@ -13,6 +13,8 @@ namespace MQTTnet.Benchmarks Console.WriteLine("2 = SerializerBenchmark"); Console.WriteLine("3 = LoggerBenchmark"); Console.WriteLine("4 = TopicFilterComparerBenchmark"); + Console.WriteLine("5 = ChannelAdapterBenchmark"); + Console.WriteLine("6 = MqttTcpChannelBenchmark"); var pressedKey = Console.ReadKey(true); switch (pressedKey.KeyChar) @@ -29,6 +31,12 @@ namespace MQTTnet.Benchmarks case '4': BenchmarkRunner.Run(); break; + case '5': + BenchmarkRunner.Run(); + break; + case '6': + BenchmarkRunner.Run(); + break; } Console.ReadLine(); From cadf9f797f608a4a8f407164077d9d197580adcf Mon Sep 17 00:00:00 2001 From: Jan Eggers Date: Fri, 1 Jun 2018 08:47:00 +0200 Subject: [PATCH 2/3] added pipe based benchmak --- MQTTnet.sln | 3 + .../MQTTnet.Benchmarks.csproj | 2 + .../MqttTcpChannelBenchmark.cs | 39 ++- Tests/MQTTnet.Benchmarks/Program.cs | 4 + .../Tcp/BufferExtensions.cs | 24 ++ Tests/MQTTnet.Benchmarks/Tcp/DuplexPipe.cs | 41 +++ .../MQTTnet.Benchmarks/Tcp/SocketAwaitable.cs | 71 +++++ .../MQTTnet.Benchmarks/Tcp/SocketReceiver.cs | 39 +++ Tests/MQTTnet.Benchmarks/Tcp/SocketSender.cs | 99 +++++++ Tests/MQTTnet.Benchmarks/Tcp/TcpConnection.cs | 247 ++++++++++++++++++ Tests/MQTTnet.Benchmarks/TcpPipesBenchmark.cs | 72 +++++ 11 files changed, 633 insertions(+), 8 deletions(-) create mode 100644 Tests/MQTTnet.Benchmarks/Tcp/BufferExtensions.cs create mode 100644 Tests/MQTTnet.Benchmarks/Tcp/DuplexPipe.cs create mode 100644 Tests/MQTTnet.Benchmarks/Tcp/SocketAwaitable.cs create mode 100644 Tests/MQTTnet.Benchmarks/Tcp/SocketReceiver.cs create mode 100644 Tests/MQTTnet.Benchmarks/Tcp/SocketSender.cs create mode 100644 Tests/MQTTnet.Benchmarks/Tcp/TcpConnection.cs create mode 100644 Tests/MQTTnet.Benchmarks/TcpPipesBenchmark.cs diff --git a/MQTTnet.sln b/MQTTnet.sln index 6cb185d..a8370c6 100644 --- a/MQTTnet.sln +++ b/MQTTnet.sln @@ -41,6 +41,9 @@ EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.Benchmarks", "Tests\MQTTnet.Benchmarks\MQTTnet.Benchmarks.csproj", "{998D04DD-7CB0-45F5-A393-E2495C16399E}" EndProject Global + GlobalSection(Performance) = preSolution + HasPerformanceSessions = true + EndGlobalSection GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU Debug|ARM = Debug|ARM diff --git a/Tests/MQTTnet.Benchmarks/MQTTnet.Benchmarks.csproj b/Tests/MQTTnet.Benchmarks/MQTTnet.Benchmarks.csproj index 72af093..ad3aa55 100644 --- a/Tests/MQTTnet.Benchmarks/MQTTnet.Benchmarks.csproj +++ b/Tests/MQTTnet.Benchmarks/MQTTnet.Benchmarks.csproj @@ -4,10 +4,12 @@ Exe Full net461 + 7.2 + diff --git a/Tests/MQTTnet.Benchmarks/MqttTcpChannelBenchmark.cs b/Tests/MQTTnet.Benchmarks/MqttTcpChannelBenchmark.cs index 6709ff7..740b83e 100644 --- a/Tests/MQTTnet.Benchmarks/MqttTcpChannelBenchmark.cs +++ b/Tests/MQTTnet.Benchmarks/MqttTcpChannelBenchmark.cs @@ -1,11 +1,11 @@ using BenchmarkDotNet.Attributes; -using MQTTnet.Adapter; using MQTTnet.Channel; using MQTTnet.Client; using MQTTnet.Diagnostics; using MQTTnet.Implementations; using MQTTnet.Server; using System.Threading; +using System.Threading.Tasks; namespace MQTTnet.Benchmarks { @@ -13,8 +13,10 @@ namespace MQTTnet.Benchmarks public class MqttTcpChannelBenchmark { private IMqttServer _mqttServer; - private IMqttChannel _clientChannel; private IMqttChannel _serverChannel; + + + private IMqttChannel _clientChannel; [GlobalSetup] public void Setup() @@ -31,22 +33,43 @@ namespace MQTTnet.Benchmarks var clientOptions = new MqttClientOptionsBuilder() .WithTcpServer("localhost").Build(); - _clientChannel = new MqttTcpChannel((MqttClientTcpOptions)clientOptions.ChannelOptions); + var tcpOptions = (MqttClientTcpOptions) clientOptions.ChannelOptions; + _clientChannel = new MqttTcpChannel(tcpOptions); _clientChannel.ConnectAsync(CancellationToken.None).GetAwaiter().GetResult(); } - + [Benchmark] - public void Send_10000_Chunks() + public async Task Send_10000_Chunks() { var size = 5; var iterations = 10000; - for (var i = 0; i < iterations; i++) + + await Task.WhenAll(WriteAsync(iterations, size), ReadAsync(iterations, size)); + } + + private async Task ReadAsync(int iterations, int size) + { + await Task.Yield(); + + var expected = iterations * size; + long read = 0; + + while (read < expected) { - _serverChannel.WriteAsync(new byte[size], 0, size, CancellationToken.None).GetAwaiter().GetResult(); - _clientChannel.ReadAsync(new byte[size], 0, size, CancellationToken.None).GetAwaiter().GetResult(); + var readresult = await _clientChannel.ReadAsync(new byte[size], 0, size, CancellationToken.None).ConfigureAwait(false); + read += readresult; } + } + private async Task WriteAsync(int iterations, int size) + { + await Task.Yield(); + + for (var i = 0; i < iterations; i++) + { + await _serverChannel.WriteAsync(new byte[size], 0, size, CancellationToken.None).ConfigureAwait(false); + } } } } diff --git a/Tests/MQTTnet.Benchmarks/Program.cs b/Tests/MQTTnet.Benchmarks/Program.cs index e643e97..5306916 100644 --- a/Tests/MQTTnet.Benchmarks/Program.cs +++ b/Tests/MQTTnet.Benchmarks/Program.cs @@ -15,6 +15,7 @@ namespace MQTTnet.Benchmarks Console.WriteLine("4 = TopicFilterComparerBenchmark"); Console.WriteLine("5 = ChannelAdapterBenchmark"); Console.WriteLine("6 = MqttTcpChannelBenchmark"); + Console.WriteLine("7 = TcpPipesBenchmark"); var pressedKey = Console.ReadKey(true); switch (pressedKey.KeyChar) @@ -37,6 +38,9 @@ namespace MQTTnet.Benchmarks case '6': BenchmarkRunner.Run(); break; + case '7': + BenchmarkRunner.Run(); + break; } Console.ReadLine(); diff --git a/Tests/MQTTnet.Benchmarks/Tcp/BufferExtensions.cs b/Tests/MQTTnet.Benchmarks/Tcp/BufferExtensions.cs new file mode 100644 index 0000000..0a2c19e --- /dev/null +++ b/Tests/MQTTnet.Benchmarks/Tcp/BufferExtensions.cs @@ -0,0 +1,24 @@ +using System; +using System.Collections.Generic; +using System.Runtime.InteropServices; +using System.Text; + +namespace Playground.Client.Mqtt.Tcp +{ + public static class BufferExtensions + { + public static ArraySegment GetArray(this Memory memory) + { + return ((ReadOnlyMemory)memory).GetArray(); + } + + public static ArraySegment GetArray(this ReadOnlyMemory memory) + { + if (!MemoryMarshal.TryGetArray(memory, out var result)) + { + throw new InvalidOperationException("Buffer backed by array was expected"); + } + return result; + } + } +} \ No newline at end of file diff --git a/Tests/MQTTnet.Benchmarks/Tcp/DuplexPipe.cs b/Tests/MQTTnet.Benchmarks/Tcp/DuplexPipe.cs new file mode 100644 index 0000000..f5f3316 --- /dev/null +++ b/Tests/MQTTnet.Benchmarks/Tcp/DuplexPipe.cs @@ -0,0 +1,41 @@ +using System.IO.Pipelines; + +namespace MQTTnet.Benchmarks.Tcp +{ + public class DuplexPipe : IDuplexPipe + { + public DuplexPipe(PipeReader reader, PipeWriter writer) + { + Input = reader; + Output = writer; + } + + public PipeReader Input { get; } + + public PipeWriter Output { get; } + + public static DuplexPipePair CreateConnectionPair(PipeOptions inputOptions, PipeOptions outputOptions) + { + var input = new Pipe(inputOptions); + var output = new Pipe(outputOptions); + + var transportToApplication = new DuplexPipe(output.Reader, input.Writer); + var applicationToTransport = new DuplexPipe(input.Reader, output.Writer); + + return new DuplexPipePair(applicationToTransport, transportToApplication); + } + + // This class exists to work around issues with value tuple on .NET Framework + public readonly struct DuplexPipePair + { + public IDuplexPipe Transport { get; } + public IDuplexPipe Application { get; } + + public DuplexPipePair(IDuplexPipe transport, IDuplexPipe application) + { + Transport = transport; + Application = application; + } + } + } +} \ No newline at end of file diff --git a/Tests/MQTTnet.Benchmarks/Tcp/SocketAwaitable.cs b/Tests/MQTTnet.Benchmarks/Tcp/SocketAwaitable.cs new file mode 100644 index 0000000..dcaaaf4 --- /dev/null +++ b/Tests/MQTTnet.Benchmarks/Tcp/SocketAwaitable.cs @@ -0,0 +1,71 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.IO.Pipelines; +using System.Net.Sockets; +using System.Runtime.CompilerServices; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace Playground.Client.Mqtt.Tcp +{ + public class SocketAwaitable : ICriticalNotifyCompletion + { + private static readonly Action _callbackCompleted = () => { }; + + private readonly PipeScheduler _ioScheduler; + + private Action _callback; + private int _bytesTransferred; + private SocketError _error; + + public SocketAwaitable(PipeScheduler ioScheduler) + { + _ioScheduler = ioScheduler; + } + + public SocketAwaitable GetAwaiter() => this; + public bool IsCompleted => ReferenceEquals(_callback, _callbackCompleted); + + public int GetResult() + { + Debug.Assert(ReferenceEquals(_callback, _callbackCompleted)); + + _callback = null; + + if (_error != SocketError.Success) + { + throw new SocketException((int)_error); + } + + return _bytesTransferred; + } + + public void OnCompleted(Action continuation) + { + if (ReferenceEquals(_callback, _callbackCompleted) || + ReferenceEquals(Interlocked.CompareExchange(ref _callback, continuation, null), _callbackCompleted)) + { + Task.Run(continuation); + } + } + + public void UnsafeOnCompleted(Action continuation) + { + OnCompleted(continuation); + } + + public void Complete(int bytesTransferred, SocketError socketError) + { + _error = socketError; + _bytesTransferred = bytesTransferred; + var continuation = Interlocked.Exchange(ref _callback, _callbackCompleted); + + if (continuation != null) + { + _ioScheduler.Schedule(state => ((Action)state)(), continuation); + } + } + } +} \ No newline at end of file diff --git a/Tests/MQTTnet.Benchmarks/Tcp/SocketReceiver.cs b/Tests/MQTTnet.Benchmarks/Tcp/SocketReceiver.cs new file mode 100644 index 0000000..e2f19c0 --- /dev/null +++ b/Tests/MQTTnet.Benchmarks/Tcp/SocketReceiver.cs @@ -0,0 +1,39 @@ +using System; +using System.IO.Pipelines; +using System.Net.Sockets; +using Playground.Client.Mqtt.Tcp; + +namespace MQTTnet.Benchmarks.Tcp +{ + public class SocketReceiver + { + private readonly Socket _socket; + private readonly SocketAsyncEventArgs _eventArgs = new SocketAsyncEventArgs(); + private readonly SocketAwaitable _awaitable; + + public SocketReceiver(Socket socket, PipeScheduler scheduler) + { + _socket = socket; + _awaitable = new SocketAwaitable(scheduler); + _eventArgs.UserToken = _awaitable; + _eventArgs.Completed += (_, e) => ((SocketAwaitable)e.UserToken).Complete(e.BytesTransferred, e.SocketError); + } + + public SocketAwaitable ReceiveAsync(Memory buffer) + { +#if NETCOREAPP2_1 + _eventArgs.SetBuffer(buffer); +#else + var segment = buffer.GetArray(); + + _eventArgs.SetBuffer(segment.Array, segment.Offset, segment.Count); +#endif + if (!_socket.ReceiveAsync(_eventArgs)) + { + _awaitable.Complete(_eventArgs.BytesTransferred, _eventArgs.SocketError); + } + + return _awaitable; + } + } +} \ No newline at end of file diff --git a/Tests/MQTTnet.Benchmarks/Tcp/SocketSender.cs b/Tests/MQTTnet.Benchmarks/Tcp/SocketSender.cs new file mode 100644 index 0000000..475fa68 --- /dev/null +++ b/Tests/MQTTnet.Benchmarks/Tcp/SocketSender.cs @@ -0,0 +1,99 @@ +using System; +using System.Buffers; +using System.Collections.Generic; +using System.Diagnostics; +using System.IO.Pipelines; +using System.Net.Sockets; +using Playground.Client.Mqtt.Tcp; + +namespace MQTTnet.Benchmarks.Tcp +{ + public class SocketSender + { + private readonly Socket _socket; + private readonly SocketAsyncEventArgs _eventArgs = new SocketAsyncEventArgs(); + private readonly SocketAwaitable _awaitable; + + private List> _bufferList; + + public SocketSender(Socket socket, PipeScheduler scheduler) + { + _socket = socket; + _awaitable = new SocketAwaitable(scheduler); + _eventArgs.UserToken = _awaitable; + _eventArgs.Completed += (_, e) => ((SocketAwaitable)e.UserToken).Complete(e.BytesTransferred, e.SocketError); + } + + public SocketAwaitable SendAsync(in ReadOnlySequence buffers) + { + if (buffers.IsSingleSegment) + { + return SendAsync(buffers.First); + } + +#if NETCOREAPP2_1 + if (!_eventArgs.MemoryBuffer.Equals(Memory.Empty)) +#else + if (_eventArgs.Buffer != null) +#endif + { + _eventArgs.SetBuffer(null, 0, 0); + } + + _eventArgs.BufferList = GetBufferList(buffers); + + if (!_socket.SendAsync(_eventArgs)) + { + _awaitable.Complete(_eventArgs.BytesTransferred, _eventArgs.SocketError); + } + + return _awaitable; + } + + private SocketAwaitable SendAsync(ReadOnlyMemory memory) + { + // The BufferList getter is much less expensive then the setter. + if (_eventArgs.BufferList != null) + { + _eventArgs.BufferList = null; + } + +#if NETCOREAPP2_1 + _eventArgs.SetBuffer(MemoryMarshal.AsMemory(memory)); +#else + var segment = memory.GetArray(); + + _eventArgs.SetBuffer(segment.Array, segment.Offset, segment.Count); +#endif + if (!_socket.SendAsync(_eventArgs)) + { + _awaitable.Complete(_eventArgs.BytesTransferred, _eventArgs.SocketError); + } + + return _awaitable; + } + + private List> GetBufferList(in ReadOnlySequence buffer) + { + Debug.Assert(!buffer.IsEmpty); + Debug.Assert(!buffer.IsSingleSegment); + + if (_bufferList == null) + { + _bufferList = new List>(); + } + else + { + // Buffers are pooled, so it's OK to root them until the next multi-buffer write. + _bufferList.Clear(); + } + + foreach (var b in buffer) + { + _bufferList.Add(b.GetArray()); + } + + return _bufferList; + } + } +} \ No newline at end of file diff --git a/Tests/MQTTnet.Benchmarks/Tcp/TcpConnection.cs b/Tests/MQTTnet.Benchmarks/Tcp/TcpConnection.cs new file mode 100644 index 0000000..e73067c --- /dev/null +++ b/Tests/MQTTnet.Benchmarks/Tcp/TcpConnection.cs @@ -0,0 +1,247 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.IO.Pipelines; +using System.Net; +using System.Net.Sockets; +using System.Threading.Tasks; +using MQTTnet.Exceptions; +using Playground.Client.Mqtt.Tcp; + +namespace MQTTnet.Benchmarks.Tcp +{ + public class TcpConnection + { + private readonly Socket _socket; + private volatile bool _aborted; + private readonly EndPoint _endPoint; + private IDuplexPipe _application; + private IDuplexPipe _transport; + private readonly SocketSender _sender; + private readonly SocketReceiver _receiver; + + public TcpConnection(EndPoint endPoint) + { + _socket = new Socket(SocketType.Stream, ProtocolType.Tcp); + _endPoint = endPoint; + + _sender = new SocketSender(_socket, PipeScheduler.ThreadPool); + _receiver = new SocketReceiver(_socket, PipeScheduler.ThreadPool); + } + + public TcpConnection(Socket socket) + { + _socket = socket; + _endPoint = socket.RemoteEndPoint; + + _sender = new SocketSender(_socket, PipeScheduler.ThreadPool); + _receiver = new SocketReceiver(_socket, PipeScheduler.ThreadPool); + } + + public Task DisposeAsync() + { + _transport?.Output.Complete(); + _transport?.Input.Complete(); + + _socket?.Dispose(); + + return Task.CompletedTask; + } + + public async Task StartAsync() + { + if (!_socket.Connected) + { + await _socket.ConnectAsync(_endPoint); + } + + var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default); + + _transport = pair.Transport; + _application = pair.Application; + + _ = ExecuteAsync(); + + return pair.Transport; + } + + private async Task ExecuteAsync() + { + Exception sendError = null; + try + { + // Spawn send and receive logic + var receiveTask = DoReceive(); + var sendTask = DoSend(); + + // If the sending task completes then close the receive + // We don't need to do this in the other direction because the kestrel + // will trigger the output closing once the input is complete. + if (await Task.WhenAny(receiveTask, sendTask) == sendTask) + { + // Tell the reader it's being aborted + _socket.Dispose(); + } + + // Now wait for both to complete + await receiveTask; + sendError = await sendTask; + + // Dispose the socket(should noop if already called) + _socket.Dispose(); + } + catch (Exception ex) + { + Console.WriteLine($"Unexpected exception in {nameof(TcpConnection)}.{nameof(StartAsync)}: " + ex); + } + finally + { + // Complete the output after disposing the socket + _application.Input.Complete(sendError); + } + } + private async Task DoReceive() + { + Exception error = null; + + try + { + await ProcessReceives(); + } + catch (SocketException ex) when (ex.SocketErrorCode == SocketError.ConnectionReset) + { + error = new MqttCommunicationException(ex); + } + catch (SocketException ex) when (ex.SocketErrorCode == SocketError.OperationAborted || + ex.SocketErrorCode == SocketError.ConnectionAborted || + ex.SocketErrorCode == SocketError.Interrupted || + ex.SocketErrorCode == SocketError.InvalidArgument) + { + if (!_aborted) + { + // Calling Dispose after ReceiveAsync can cause an "InvalidArgument" error on *nix. + //error = new MqttCommunicationException(); + } + } + catch (ObjectDisposedException) + { + if (!_aborted) + { + //error = new MqttCommunicationException(); + } + } + catch (IOException ex) + { + error = ex; + } + catch (Exception ex) + { + error = new IOException(ex.Message, ex); + } + finally + { + if (_aborted) + { + //error = error ?? new MqttCommunicationException(); + } + + _application.Output.Complete(error); + } + } + + private async Task ProcessReceives() + { + while (true) + { + // Ensure we have some reasonable amount of buffer space + var buffer = _application.Output.GetMemory(); + + var bytesReceived = await _receiver.ReceiveAsync(buffer); + + if (bytesReceived == 0) + { + // FIN + break; + } + + _application.Output.Advance(bytesReceived); + + var flushTask = _application.Output.FlushAsync(); + + if (!flushTask.IsCompleted) + { + await flushTask; + } + + var result = flushTask.GetAwaiter().GetResult(); + if (result.IsCompleted) + { + // Pipe consumer is shut down, do we stop writing + break; + } + } + } + + private async Task DoSend() + { + Exception error = null; + + try + { + await ProcessSends(); + } + catch (SocketException ex) when (ex.SocketErrorCode == SocketError.OperationAborted) + { + error = null; + } + catch (ObjectDisposedException) + { + error = null; + } + catch (IOException ex) + { + error = ex; + } + catch (Exception ex) + { + error = new IOException(ex.Message, ex); + } + finally + { + _aborted = true; + _socket.Shutdown(SocketShutdown.Both); + } + + return error; + } + + private async Task ProcessSends() + { + while (true) + { + // Wait for data to write from the pipe producer + var result = await _application.Input.ReadAsync(); + var buffer = result.Buffer; + + if (result.IsCanceled) + { + break; + } + + var end = buffer.End; + var isCompleted = result.IsCompleted; + if (!buffer.IsEmpty) + { + await _sender.SendAsync(buffer); + } + + _application.Input.AdvanceTo(end); + + if (isCompleted) + { + break; + } + } + } + } +} \ No newline at end of file diff --git a/Tests/MQTTnet.Benchmarks/TcpPipesBenchmark.cs b/Tests/MQTTnet.Benchmarks/TcpPipesBenchmark.cs new file mode 100644 index 0000000..cb3ffba --- /dev/null +++ b/Tests/MQTTnet.Benchmarks/TcpPipesBenchmark.cs @@ -0,0 +1,72 @@ +using System.IO.Pipelines; +using System.Net; +using System.Net.Sockets; +using System.Threading; +using System.Threading.Tasks; +using BenchmarkDotNet.Attributes; +using MQTTnet.Benchmarks.Tcp; + +namespace MQTTnet.Benchmarks +{ + [MemoryDiagnoser] + public class TcpPipesBenchmark + { + private IDuplexPipe _client; + private IDuplexPipe _server; + + [GlobalSetup] + public void Setup() + { + var server = new TcpListener(IPAddress.Any, 1883); + + server.Start(1); + + var task = Task.Run(() => server.AcceptSocket()); + + var clientConnection = new TcpConnection(new IPEndPoint(IPAddress.Loopback, 1883)); + + _client = clientConnection.StartAsync().GetAwaiter().GetResult(); + + var serverConnection = new TcpConnection(task.GetAwaiter().GetResult()); + _server = serverConnection.StartAsync().GetAwaiter().GetResult(); + } + + + [Benchmark] + public async Task Send_10000_Chunks_Pipe() + { + var size = 5; + var iterations = 10000; + + await Task.WhenAll(WriteAsync(iterations, size), ReadAsync(iterations, size)); + } + + private async Task ReadAsync(int iterations, int size) + { + await Task.Yield(); + + var expected = iterations * size; + long read = 0; + var input = _client.Input; + + while (read < expected) + { + var readresult = await input.ReadAsync(CancellationToken.None).ConfigureAwait(false); + input.AdvanceTo(readresult.Buffer.End); + read += readresult.Buffer.Length; + } + } + + private async Task WriteAsync(int iterations, int size) + { + await Task.Yield(); + + var output = _server.Output; + + for (var i = 0; i < iterations; i++) + { + await output.WriteAsync(new byte[size], CancellationToken.None).ConfigureAwait(false); + } + } + } +} From 2578b380f952e2c42443015553fd9a5516878094 Mon Sep 17 00:00:00 2001 From: Jan Eggers Date: Fri, 1 Jun 2018 12:27:06 +0200 Subject: [PATCH 3/3] fixed sln change and moved options to seperate file --- MQTTnet.sln | 3 --- Tests/MQTTnet.Benchmarks/AllowNonOptimized.cs | 23 +++++++++++++++++++ .../ChannelAdapterBenchmark.cs | 21 ----------------- 3 files changed, 23 insertions(+), 24 deletions(-) create mode 100644 Tests/MQTTnet.Benchmarks/AllowNonOptimized.cs diff --git a/MQTTnet.sln b/MQTTnet.sln index a8370c6..6cb185d 100644 --- a/MQTTnet.sln +++ b/MQTTnet.sln @@ -41,9 +41,6 @@ EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.Benchmarks", "Tests\MQTTnet.Benchmarks\MQTTnet.Benchmarks.csproj", "{998D04DD-7CB0-45F5-A393-E2495C16399E}" EndProject Global - GlobalSection(Performance) = preSolution - HasPerformanceSessions = true - EndGlobalSection GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU Debug|ARM = Debug|ARM diff --git a/Tests/MQTTnet.Benchmarks/AllowNonOptimized.cs b/Tests/MQTTnet.Benchmarks/AllowNonOptimized.cs new file mode 100644 index 0000000..ecb31a2 --- /dev/null +++ b/Tests/MQTTnet.Benchmarks/AllowNonOptimized.cs @@ -0,0 +1,23 @@ +using System.Linq; +using BenchmarkDotNet.Configs; +using BenchmarkDotNet.Jobs; +using BenchmarkDotNet.Validators; + +namespace MQTTnet.Benchmarks +{ + /// + /// this options may be used to run benchmarks in debugmode and attach a performance profiler + /// https://benchmarkdotnet.org/Configs/Configs.htm + /// + public class AllowNonOptimized : ManualConfig + { + public AllowNonOptimized() + { + Add(JitOptimizationsValidator.DontFailOnError); // ALLOW NON-OPTIMIZED DLLS + Add(DefaultConfig.Instance.GetLoggers().ToArray()); // manual config has no loggers by default + Add(DefaultConfig.Instance.GetExporters().ToArray()); // manual config has no exporters by default + Add(DefaultConfig.Instance.GetColumnProviders().ToArray()); // manual config has no columns by default + Add(Job.InProcess); + } + } +} \ No newline at end of file diff --git a/Tests/MQTTnet.Benchmarks/ChannelAdapterBenchmark.cs b/Tests/MQTTnet.Benchmarks/ChannelAdapterBenchmark.cs index 5f66e6b..726f5e0 100644 --- a/Tests/MQTTnet.Benchmarks/ChannelAdapterBenchmark.cs +++ b/Tests/MQTTnet.Benchmarks/ChannelAdapterBenchmark.cs @@ -1,9 +1,4 @@ using BenchmarkDotNet.Attributes; -using BenchmarkDotNet.Attributes.Exporters; -using BenchmarkDotNet.Attributes.Jobs; -using BenchmarkDotNet.Configs; -using BenchmarkDotNet.Jobs; -using BenchmarkDotNet.Validators; using MQTTnet.Adapter; using MQTTnet.Core.Internal; using MQTTnet.Diagnostics; @@ -11,27 +6,11 @@ using MQTTnet.Internal; using MQTTnet.Packets; using MQTTnet.Serializer; using System; -using System.Collections.Generic; using System.IO; -using System.Linq; -using System.Text; using System.Threading; -using System.Threading.Tasks; namespace MQTTnet.Benchmarks { - public class AllowNonOptimized : ManualConfig - { - public AllowNonOptimized() - { - Add(JitOptimizationsValidator.DontFailOnError); // ALLOW NON-OPTIMIZED DLLS - Add(DefaultConfig.Instance.GetLoggers().ToArray()); // manual config has no loggers by default - Add(DefaultConfig.Instance.GetExporters().ToArray()); // manual config has no exporters by default - Add(DefaultConfig.Instance.GetColumnProviders().ToArray()); // manual config has no columns by default - Add(Job.InProcess); - } - } - [MemoryDiagnoser] public class ChannelAdapterBenchmark {