@@ -41,6 +41,9 @@ EndProject | |||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.Benchmarks", "Tests\MQTTnet.Benchmarks\MQTTnet.Benchmarks.csproj", "{998D04DD-7CB0-45F5-A393-E2495C16399E}" | |||
EndProject | |||
Global | |||
GlobalSection(Performance) = preSolution | |||
HasPerformanceSessions = true | |||
EndGlobalSection | |||
GlobalSection(SolutionConfigurationPlatforms) = preSolution | |||
Debug|Any CPU = Debug|Any CPU | |||
Debug|ARM = Debug|ARM | |||
@@ -4,10 +4,12 @@ | |||
<OutputType>Exe</OutputType> | |||
<DebugType>Full</DebugType> | |||
<TargetFramework>net461</TargetFramework> | |||
<LangVersion>7.2</LangVersion> | |||
</PropertyGroup> | |||
<ItemGroup> | |||
<PackageReference Include="BenchmarkDotNet" Version="0.10.14" /> | |||
<PackageReference Include="System.IO.Pipelines" Version="4.5.0" /> | |||
</ItemGroup> | |||
<ItemGroup> | |||
@@ -1,11 +1,11 @@ | |||
using BenchmarkDotNet.Attributes; | |||
using MQTTnet.Adapter; | |||
using MQTTnet.Channel; | |||
using MQTTnet.Client; | |||
using MQTTnet.Diagnostics; | |||
using MQTTnet.Implementations; | |||
using MQTTnet.Server; | |||
using System.Threading; | |||
using System.Threading.Tasks; | |||
namespace MQTTnet.Benchmarks | |||
{ | |||
@@ -13,8 +13,10 @@ namespace MQTTnet.Benchmarks | |||
public class MqttTcpChannelBenchmark | |||
{ | |||
private IMqttServer _mqttServer; | |||
private IMqttChannel _clientChannel; | |||
private IMqttChannel _serverChannel; | |||
private IMqttChannel _clientChannel; | |||
[GlobalSetup] | |||
public void Setup() | |||
@@ -31,22 +33,43 @@ namespace MQTTnet.Benchmarks | |||
var clientOptions = new MqttClientOptionsBuilder() | |||
.WithTcpServer("localhost").Build(); | |||
_clientChannel = new MqttTcpChannel((MqttClientTcpOptions)clientOptions.ChannelOptions); | |||
var tcpOptions = (MqttClientTcpOptions) clientOptions.ChannelOptions; | |||
_clientChannel = new MqttTcpChannel(tcpOptions); | |||
_clientChannel.ConnectAsync(CancellationToken.None).GetAwaiter().GetResult(); | |||
} | |||
[Benchmark] | |||
public void Send_10000_Chunks() | |||
public async Task Send_10000_Chunks() | |||
{ | |||
var size = 5; | |||
var iterations = 10000; | |||
for (var i = 0; i < iterations; i++) | |||
await Task.WhenAll(WriteAsync(iterations, size), ReadAsync(iterations, size)); | |||
} | |||
private async Task ReadAsync(int iterations, int size) | |||
{ | |||
await Task.Yield(); | |||
var expected = iterations * size; | |||
long read = 0; | |||
while (read < expected) | |||
{ | |||
_serverChannel.WriteAsync(new byte[size], 0, size, CancellationToken.None).GetAwaiter().GetResult(); | |||
_clientChannel.ReadAsync(new byte[size], 0, size, CancellationToken.None).GetAwaiter().GetResult(); | |||
var readresult = await _clientChannel.ReadAsync(new byte[size], 0, size, CancellationToken.None).ConfigureAwait(false); | |||
read += readresult; | |||
} | |||
} | |||
private async Task WriteAsync(int iterations, int size) | |||
{ | |||
await Task.Yield(); | |||
for (var i = 0; i < iterations; i++) | |||
{ | |||
await _serverChannel.WriteAsync(new byte[size], 0, size, CancellationToken.None).ConfigureAwait(false); | |||
} | |||
} | |||
} | |||
} |
@@ -15,6 +15,7 @@ namespace MQTTnet.Benchmarks | |||
Console.WriteLine("4 = TopicFilterComparerBenchmark"); | |||
Console.WriteLine("5 = ChannelAdapterBenchmark"); | |||
Console.WriteLine("6 = MqttTcpChannelBenchmark"); | |||
Console.WriteLine("7 = TcpPipesBenchmark"); | |||
var pressedKey = Console.ReadKey(true); | |||
switch (pressedKey.KeyChar) | |||
@@ -37,6 +38,9 @@ namespace MQTTnet.Benchmarks | |||
case '6': | |||
BenchmarkRunner.Run<MqttTcpChannelBenchmark>(); | |||
break; | |||
case '7': | |||
BenchmarkRunner.Run<TcpPipesBenchmark>(); | |||
break; | |||
} | |||
Console.ReadLine(); | |||
@@ -0,0 +1,24 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Runtime.InteropServices; | |||
using System.Text; | |||
namespace Playground.Client.Mqtt.Tcp | |||
{ | |||
public static class BufferExtensions | |||
{ | |||
public static ArraySegment<byte> GetArray(this Memory<byte> memory) | |||
{ | |||
return ((ReadOnlyMemory<byte>)memory).GetArray(); | |||
} | |||
public static ArraySegment<byte> GetArray(this ReadOnlyMemory<byte> memory) | |||
{ | |||
if (!MemoryMarshal.TryGetArray(memory, out var result)) | |||
{ | |||
throw new InvalidOperationException("Buffer backed by array was expected"); | |||
} | |||
return result; | |||
} | |||
} | |||
} |
@@ -0,0 +1,41 @@ | |||
using System.IO.Pipelines; | |||
namespace MQTTnet.Benchmarks.Tcp | |||
{ | |||
public class DuplexPipe : IDuplexPipe | |||
{ | |||
public DuplexPipe(PipeReader reader, PipeWriter writer) | |||
{ | |||
Input = reader; | |||
Output = writer; | |||
} | |||
public PipeReader Input { get; } | |||
public PipeWriter Output { get; } | |||
public static DuplexPipePair CreateConnectionPair(PipeOptions inputOptions, PipeOptions outputOptions) | |||
{ | |||
var input = new Pipe(inputOptions); | |||
var output = new Pipe(outputOptions); | |||
var transportToApplication = new DuplexPipe(output.Reader, input.Writer); | |||
var applicationToTransport = new DuplexPipe(input.Reader, output.Writer); | |||
return new DuplexPipePair(applicationToTransport, transportToApplication); | |||
} | |||
// This class exists to work around issues with value tuple on .NET Framework | |||
public readonly struct DuplexPipePair | |||
{ | |||
public IDuplexPipe Transport { get; } | |||
public IDuplexPipe Application { get; } | |||
public DuplexPipePair(IDuplexPipe transport, IDuplexPipe application) | |||
{ | |||
Transport = transport; | |||
Application = application; | |||
} | |||
} | |||
} | |||
} |
@@ -0,0 +1,71 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Diagnostics; | |||
using System.IO.Pipelines; | |||
using System.Net.Sockets; | |||
using System.Runtime.CompilerServices; | |||
using System.Text; | |||
using System.Threading; | |||
using System.Threading.Tasks; | |||
namespace Playground.Client.Mqtt.Tcp | |||
{ | |||
public class SocketAwaitable : ICriticalNotifyCompletion | |||
{ | |||
private static readonly Action _callbackCompleted = () => { }; | |||
private readonly PipeScheduler _ioScheduler; | |||
private Action _callback; | |||
private int _bytesTransferred; | |||
private SocketError _error; | |||
public SocketAwaitable(PipeScheduler ioScheduler) | |||
{ | |||
_ioScheduler = ioScheduler; | |||
} | |||
public SocketAwaitable GetAwaiter() => this; | |||
public bool IsCompleted => ReferenceEquals(_callback, _callbackCompleted); | |||
public int GetResult() | |||
{ | |||
Debug.Assert(ReferenceEquals(_callback, _callbackCompleted)); | |||
_callback = null; | |||
if (_error != SocketError.Success) | |||
{ | |||
throw new SocketException((int)_error); | |||
} | |||
return _bytesTransferred; | |||
} | |||
public void OnCompleted(Action continuation) | |||
{ | |||
if (ReferenceEquals(_callback, _callbackCompleted) || | |||
ReferenceEquals(Interlocked.CompareExchange(ref _callback, continuation, null), _callbackCompleted)) | |||
{ | |||
Task.Run(continuation); | |||
} | |||
} | |||
public void UnsafeOnCompleted(Action continuation) | |||
{ | |||
OnCompleted(continuation); | |||
} | |||
public void Complete(int bytesTransferred, SocketError socketError) | |||
{ | |||
_error = socketError; | |||
_bytesTransferred = bytesTransferred; | |||
var continuation = Interlocked.Exchange(ref _callback, _callbackCompleted); | |||
if (continuation != null) | |||
{ | |||
_ioScheduler.Schedule(state => ((Action)state)(), continuation); | |||
} | |||
} | |||
} | |||
} |
@@ -0,0 +1,39 @@ | |||
using System; | |||
using System.IO.Pipelines; | |||
using System.Net.Sockets; | |||
using Playground.Client.Mqtt.Tcp; | |||
namespace MQTTnet.Benchmarks.Tcp | |||
{ | |||
public class SocketReceiver | |||
{ | |||
private readonly Socket _socket; | |||
private readonly SocketAsyncEventArgs _eventArgs = new SocketAsyncEventArgs(); | |||
private readonly SocketAwaitable _awaitable; | |||
public SocketReceiver(Socket socket, PipeScheduler scheduler) | |||
{ | |||
_socket = socket; | |||
_awaitable = new SocketAwaitable(scheduler); | |||
_eventArgs.UserToken = _awaitable; | |||
_eventArgs.Completed += (_, e) => ((SocketAwaitable)e.UserToken).Complete(e.BytesTransferred, e.SocketError); | |||
} | |||
public SocketAwaitable ReceiveAsync(Memory<byte> buffer) | |||
{ | |||
#if NETCOREAPP2_1 | |||
_eventArgs.SetBuffer(buffer); | |||
#else | |||
var segment = buffer.GetArray(); | |||
_eventArgs.SetBuffer(segment.Array, segment.Offset, segment.Count); | |||
#endif | |||
if (!_socket.ReceiveAsync(_eventArgs)) | |||
{ | |||
_awaitable.Complete(_eventArgs.BytesTransferred, _eventArgs.SocketError); | |||
} | |||
return _awaitable; | |||
} | |||
} | |||
} |
@@ -0,0 +1,99 @@ | |||
using System; | |||
using System.Buffers; | |||
using System.Collections.Generic; | |||
using System.Diagnostics; | |||
using System.IO.Pipelines; | |||
using System.Net.Sockets; | |||
using Playground.Client.Mqtt.Tcp; | |||
namespace MQTTnet.Benchmarks.Tcp | |||
{ | |||
public class SocketSender | |||
{ | |||
private readonly Socket _socket; | |||
private readonly SocketAsyncEventArgs _eventArgs = new SocketAsyncEventArgs(); | |||
private readonly SocketAwaitable _awaitable; | |||
private List<ArraySegment<byte>> _bufferList; | |||
public SocketSender(Socket socket, PipeScheduler scheduler) | |||
{ | |||
_socket = socket; | |||
_awaitable = new SocketAwaitable(scheduler); | |||
_eventArgs.UserToken = _awaitable; | |||
_eventArgs.Completed += (_, e) => ((SocketAwaitable)e.UserToken).Complete(e.BytesTransferred, e.SocketError); | |||
} | |||
public SocketAwaitable SendAsync(in ReadOnlySequence<byte> buffers) | |||
{ | |||
if (buffers.IsSingleSegment) | |||
{ | |||
return SendAsync(buffers.First); | |||
} | |||
#if NETCOREAPP2_1 | |||
if (!_eventArgs.MemoryBuffer.Equals(Memory<byte>.Empty)) | |||
#else | |||
if (_eventArgs.Buffer != null) | |||
#endif | |||
{ | |||
_eventArgs.SetBuffer(null, 0, 0); | |||
} | |||
_eventArgs.BufferList = GetBufferList(buffers); | |||
if (!_socket.SendAsync(_eventArgs)) | |||
{ | |||
_awaitable.Complete(_eventArgs.BytesTransferred, _eventArgs.SocketError); | |||
} | |||
return _awaitable; | |||
} | |||
private SocketAwaitable SendAsync(ReadOnlyMemory<byte> memory) | |||
{ | |||
// The BufferList getter is much less expensive then the setter. | |||
if (_eventArgs.BufferList != null) | |||
{ | |||
_eventArgs.BufferList = null; | |||
} | |||
#if NETCOREAPP2_1 | |||
_eventArgs.SetBuffer(MemoryMarshal.AsMemory(memory)); | |||
#else | |||
var segment = memory.GetArray(); | |||
_eventArgs.SetBuffer(segment.Array, segment.Offset, segment.Count); | |||
#endif | |||
if (!_socket.SendAsync(_eventArgs)) | |||
{ | |||
_awaitable.Complete(_eventArgs.BytesTransferred, _eventArgs.SocketError); | |||
} | |||
return _awaitable; | |||
} | |||
private List<ArraySegment<byte>> GetBufferList(in ReadOnlySequence<byte> buffer) | |||
{ | |||
Debug.Assert(!buffer.IsEmpty); | |||
Debug.Assert(!buffer.IsSingleSegment); | |||
if (_bufferList == null) | |||
{ | |||
_bufferList = new List<ArraySegment<byte>>(); | |||
} | |||
else | |||
{ | |||
// Buffers are pooled, so it's OK to root them until the next multi-buffer write. | |||
_bufferList.Clear(); | |||
} | |||
foreach (var b in buffer) | |||
{ | |||
_bufferList.Add(b.GetArray()); | |||
} | |||
return _bufferList; | |||
} | |||
} | |||
} |
@@ -0,0 +1,247 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.IO; | |||
using System.IO.Pipelines; | |||
using System.Net; | |||
using System.Net.Sockets; | |||
using System.Threading.Tasks; | |||
using MQTTnet.Exceptions; | |||
using Playground.Client.Mqtt.Tcp; | |||
namespace MQTTnet.Benchmarks.Tcp | |||
{ | |||
public class TcpConnection | |||
{ | |||
private readonly Socket _socket; | |||
private volatile bool _aborted; | |||
private readonly EndPoint _endPoint; | |||
private IDuplexPipe _application; | |||
private IDuplexPipe _transport; | |||
private readonly SocketSender _sender; | |||
private readonly SocketReceiver _receiver; | |||
public TcpConnection(EndPoint endPoint) | |||
{ | |||
_socket = new Socket(SocketType.Stream, ProtocolType.Tcp); | |||
_endPoint = endPoint; | |||
_sender = new SocketSender(_socket, PipeScheduler.ThreadPool); | |||
_receiver = new SocketReceiver(_socket, PipeScheduler.ThreadPool); | |||
} | |||
public TcpConnection(Socket socket) | |||
{ | |||
_socket = socket; | |||
_endPoint = socket.RemoteEndPoint; | |||
_sender = new SocketSender(_socket, PipeScheduler.ThreadPool); | |||
_receiver = new SocketReceiver(_socket, PipeScheduler.ThreadPool); | |||
} | |||
public Task DisposeAsync() | |||
{ | |||
_transport?.Output.Complete(); | |||
_transport?.Input.Complete(); | |||
_socket?.Dispose(); | |||
return Task.CompletedTask; | |||
} | |||
public async Task<IDuplexPipe> StartAsync() | |||
{ | |||
if (!_socket.Connected) | |||
{ | |||
await _socket.ConnectAsync(_endPoint); | |||
} | |||
var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default); | |||
_transport = pair.Transport; | |||
_application = pair.Application; | |||
_ = ExecuteAsync(); | |||
return pair.Transport; | |||
} | |||
private async Task ExecuteAsync() | |||
{ | |||
Exception sendError = null; | |||
try | |||
{ | |||
// Spawn send and receive logic | |||
var receiveTask = DoReceive(); | |||
var sendTask = DoSend(); | |||
// If the sending task completes then close the receive | |||
// We don't need to do this in the other direction because the kestrel | |||
// will trigger the output closing once the input is complete. | |||
if (await Task.WhenAny(receiveTask, sendTask) == sendTask) | |||
{ | |||
// Tell the reader it's being aborted | |||
_socket.Dispose(); | |||
} | |||
// Now wait for both to complete | |||
await receiveTask; | |||
sendError = await sendTask; | |||
// Dispose the socket(should noop if already called) | |||
_socket.Dispose(); | |||
} | |||
catch (Exception ex) | |||
{ | |||
Console.WriteLine($"Unexpected exception in {nameof(TcpConnection)}.{nameof(StartAsync)}: " + ex); | |||
} | |||
finally | |||
{ | |||
// Complete the output after disposing the socket | |||
_application.Input.Complete(sendError); | |||
} | |||
} | |||
private async Task DoReceive() | |||
{ | |||
Exception error = null; | |||
try | |||
{ | |||
await ProcessReceives(); | |||
} | |||
catch (SocketException ex) when (ex.SocketErrorCode == SocketError.ConnectionReset) | |||
{ | |||
error = new MqttCommunicationException(ex); | |||
} | |||
catch (SocketException ex) when (ex.SocketErrorCode == SocketError.OperationAborted || | |||
ex.SocketErrorCode == SocketError.ConnectionAborted || | |||
ex.SocketErrorCode == SocketError.Interrupted || | |||
ex.SocketErrorCode == SocketError.InvalidArgument) | |||
{ | |||
if (!_aborted) | |||
{ | |||
// Calling Dispose after ReceiveAsync can cause an "InvalidArgument" error on *nix. | |||
//error = new MqttCommunicationException(); | |||
} | |||
} | |||
catch (ObjectDisposedException) | |||
{ | |||
if (!_aborted) | |||
{ | |||
//error = new MqttCommunicationException(); | |||
} | |||
} | |||
catch (IOException ex) | |||
{ | |||
error = ex; | |||
} | |||
catch (Exception ex) | |||
{ | |||
error = new IOException(ex.Message, ex); | |||
} | |||
finally | |||
{ | |||
if (_aborted) | |||
{ | |||
//error = error ?? new MqttCommunicationException(); | |||
} | |||
_application.Output.Complete(error); | |||
} | |||
} | |||
private async Task ProcessReceives() | |||
{ | |||
while (true) | |||
{ | |||
// Ensure we have some reasonable amount of buffer space | |||
var buffer = _application.Output.GetMemory(); | |||
var bytesReceived = await _receiver.ReceiveAsync(buffer); | |||
if (bytesReceived == 0) | |||
{ | |||
// FIN | |||
break; | |||
} | |||
_application.Output.Advance(bytesReceived); | |||
var flushTask = _application.Output.FlushAsync(); | |||
if (!flushTask.IsCompleted) | |||
{ | |||
await flushTask; | |||
} | |||
var result = flushTask.GetAwaiter().GetResult(); | |||
if (result.IsCompleted) | |||
{ | |||
// Pipe consumer is shut down, do we stop writing | |||
break; | |||
} | |||
} | |||
} | |||
private async Task<Exception> DoSend() | |||
{ | |||
Exception error = null; | |||
try | |||
{ | |||
await ProcessSends(); | |||
} | |||
catch (SocketException ex) when (ex.SocketErrorCode == SocketError.OperationAborted) | |||
{ | |||
error = null; | |||
} | |||
catch (ObjectDisposedException) | |||
{ | |||
error = null; | |||
} | |||
catch (IOException ex) | |||
{ | |||
error = ex; | |||
} | |||
catch (Exception ex) | |||
{ | |||
error = new IOException(ex.Message, ex); | |||
} | |||
finally | |||
{ | |||
_aborted = true; | |||
_socket.Shutdown(SocketShutdown.Both); | |||
} | |||
return error; | |||
} | |||
private async Task ProcessSends() | |||
{ | |||
while (true) | |||
{ | |||
// Wait for data to write from the pipe producer | |||
var result = await _application.Input.ReadAsync(); | |||
var buffer = result.Buffer; | |||
if (result.IsCanceled) | |||
{ | |||
break; | |||
} | |||
var end = buffer.End; | |||
var isCompleted = result.IsCompleted; | |||
if (!buffer.IsEmpty) | |||
{ | |||
await _sender.SendAsync(buffer); | |||
} | |||
_application.Input.AdvanceTo(end); | |||
if (isCompleted) | |||
{ | |||
break; | |||
} | |||
} | |||
} | |||
} | |||
} |
@@ -0,0 +1,72 @@ | |||
using System.IO.Pipelines; | |||
using System.Net; | |||
using System.Net.Sockets; | |||
using System.Threading; | |||
using System.Threading.Tasks; | |||
using BenchmarkDotNet.Attributes; | |||
using MQTTnet.Benchmarks.Tcp; | |||
namespace MQTTnet.Benchmarks | |||
{ | |||
[MemoryDiagnoser] | |||
public class TcpPipesBenchmark | |||
{ | |||
private IDuplexPipe _client; | |||
private IDuplexPipe _server; | |||
[GlobalSetup] | |||
public void Setup() | |||
{ | |||
var server = new TcpListener(IPAddress.Any, 1883); | |||
server.Start(1); | |||
var task = Task.Run(() => server.AcceptSocket()); | |||
var clientConnection = new TcpConnection(new IPEndPoint(IPAddress.Loopback, 1883)); | |||
_client = clientConnection.StartAsync().GetAwaiter().GetResult(); | |||
var serverConnection = new TcpConnection(task.GetAwaiter().GetResult()); | |||
_server = serverConnection.StartAsync().GetAwaiter().GetResult(); | |||
} | |||
[Benchmark] | |||
public async Task Send_10000_Chunks_Pipe() | |||
{ | |||
var size = 5; | |||
var iterations = 10000; | |||
await Task.WhenAll(WriteAsync(iterations, size), ReadAsync(iterations, size)); | |||
} | |||
private async Task ReadAsync(int iterations, int size) | |||
{ | |||
await Task.Yield(); | |||
var expected = iterations * size; | |||
long read = 0; | |||
var input = _client.Input; | |||
while (read < expected) | |||
{ | |||
var readresult = await input.ReadAsync(CancellationToken.None).ConfigureAwait(false); | |||
input.AdvanceTo(readresult.Buffer.End); | |||
read += readresult.Buffer.Length; | |||
} | |||
} | |||
private async Task WriteAsync(int iterations, int size) | |||
{ | |||
await Task.Yield(); | |||
var output = _server.Output; | |||
for (var i = 0; i < iterations; i++) | |||
{ | |||
await output.WriteAsync(new byte[size], CancellationToken.None).ConfigureAwait(false); | |||
} | |||
} | |||
} | |||
} |