From 473c8e0a15a499cc47a246838d397b70992d66e5 Mon Sep 17 00:00:00 2001 From: Eggers Jan Date: Tue, 12 Sep 2017 10:11:21 +0200 Subject: [PATCH] send multiple packages at once msg/sec from 9000 to 32000 --- .../Implementations/MqttTcpChannel.cs | 22 ++++--- .../Implementations/MqttWebSocketChannel.cs | 8 ++- .../Implementations/MqttTcpChannel.cs | 3 +- .../Implementations/MqttWebSocketChannel.cs | 9 +-- .../Adapter/IMqttCommunicationAdapter.cs | 11 +++- .../MqttChannelCommunicationAdapter.cs | 22 ++++--- .../Channel/IMqttCommunicationChannel.cs | 4 +- MQTTnet.Core/Client/IMqttClient.cs | 10 +++- MQTTnet.Core/Client/MqttClient.cs | 58 +++++++++++-------- MQTTnet.Core/Server/MqttClientMessageQueue.cs | 2 +- MQTTnet.Core/Server/MqttClientSession.cs | 14 ++--- .../Server/MqttClientSessionsManager.cs | 8 +-- .../MqttPacketSerializerTests.cs | 4 +- .../TestMqttCommunicationAdapter.cs | 9 ++- .../PerformanceTest.cs | 48 ++++++++++----- 15 files changed, 151 insertions(+), 81 deletions(-) diff --git a/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs b/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs index dede268..a1ee8c5 100644 --- a/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs +++ b/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs @@ -13,11 +13,13 @@ namespace MQTTnet.Implementations { public sealed class MqttTcpChannel : IMqttCommunicationChannel, IDisposable { - private Stream _dataStream; + private Stream _receiveStream; + private Stream _sendStream; private Socket _socket; private SslStream _sslStream; - public Stream Stream => _dataStream; + public Stream ReceiveStream => _receiveStream; + public Stream SendStream => _sendStream; /// /// called on client sockets are created in connect @@ -35,7 +37,7 @@ namespace MQTTnet.Implementations { _socket = socket ?? throw new ArgumentNullException(nameof(socket)); _sslStream = sslStream; - _dataStream = (Stream)sslStream ?? new NetworkStream(socket); + CreateCommStreams( socket, sslStream ); } public async Task ConnectAsync(MqttClientOptions options) @@ -54,13 +56,10 @@ namespace MQTTnet.Implementations { _sslStream = new SslStream(new NetworkStream(_socket, true)); - _dataStream = _sslStream; await _sslStream.AuthenticateAsClientAsync(options.Server, LoadCertificates(options), SslProtocols.Tls12, options.TlsOptions.CheckCertificateRevocation).ConfigureAwait(false); } - else - { - _dataStream = new NetworkStream(_socket); - } + + CreateCommStreams( _socket, _sslStream ); } catch (SocketException exception) { @@ -90,6 +89,13 @@ namespace MQTTnet.Implementations _sslStream = null; } + private void CreateCommStreams( Socket socket, SslStream sslStream ) + { + //cannot use this as default buffering prevents from receiving the first connect message + _receiveStream = (Stream)sslStream ?? new NetworkStream( socket ); + _sendStream = new BufferedStream( _receiveStream, BufferConstants.Size ); + } + private static X509CertificateCollection LoadCertificates(MqttClientOptions options) { var certificates = new X509CertificateCollection(); diff --git a/Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs b/Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs index c3aed02..02cc32e 100644 --- a/Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs +++ b/Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs @@ -13,7 +13,9 @@ namespace MQTTnet.Implementations { private ClientWebSocket _webSocket = new ClientWebSocket(); - public Stream Stream { get; private set; } + public Stream ReceiveStream { get; private set; } + + public Stream SendStream { get; private set; } public async Task ConnectAsync(MqttClientOptions options) { @@ -24,7 +26,7 @@ namespace MQTTnet.Implementations _webSocket = new ClientWebSocket(); await _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None); - Stream = new WebSocketStream(_webSocket); + ReceiveStream = SendStream = new WebSocketStream(_webSocket); } catch (WebSocketException exception) { @@ -34,7 +36,7 @@ namespace MQTTnet.Implementations public Task DisconnectAsync() { - Stream = null; + ReceiveStream = null; return _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None); } diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs index 817ab44..b8916e0 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs @@ -18,7 +18,8 @@ namespace MQTTnet.Implementations private SslStream _sslStream; - public Stream Stream => _dataStream; + public Stream ReceiveStream => _dataStream; + public Stream SendStream => _dataStream; /// /// called on client sockets are created in connect diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs index 2e0bdf8..abce128 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs @@ -12,8 +12,9 @@ namespace MQTTnet.Implementations public sealed class MqttWebSocketChannel : IMqttCommunicationChannel, IDisposable { private ClientWebSocket _webSocket = new ClientWebSocket(); - - public Stream Stream { get; private set; } + + public Stream SendStream { get; private set; } + public Stream ReceiveStream { get; private set; } public async Task ConnectAsync(MqttClientOptions options) { @@ -24,7 +25,7 @@ namespace MQTTnet.Implementations _webSocket = new ClientWebSocket(); await _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None); - Stream = new WebSocketStream(_webSocket); + SendStream = ReceiveStream = new WebSocketStream(_webSocket); } catch (WebSocketException exception) { @@ -34,7 +35,7 @@ namespace MQTTnet.Implementations public Task DisconnectAsync() { - Stream = null; + SendStream = ReceiveStream = null; return _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None); } diff --git a/MQTTnet.Core/Adapter/IMqttCommunicationAdapter.cs b/MQTTnet.Core/Adapter/IMqttCommunicationAdapter.cs index 77ab898..2aa2594 100644 --- a/MQTTnet.Core/Adapter/IMqttCommunicationAdapter.cs +++ b/MQTTnet.Core/Adapter/IMqttCommunicationAdapter.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.Threading.Tasks; using MQTTnet.Core.Client; using MQTTnet.Core.Packets; @@ -12,10 +13,18 @@ namespace MQTTnet.Core.Adapter Task DisconnectAsync(); - Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout); + Task SendPacketsAsync( TimeSpan timeout, IEnumerable packets ); Task ReceivePacketAsync(TimeSpan timeout); IMqttPacketSerializer PacketSerializer { get; } } + + public static class IMqttCommunicationAdapterExtensions + { + public static Task SendPacketsAsync( this IMqttCommunicationAdapter adapter, TimeSpan timeout, params MqttBasePacket[] packets ) + { + return adapter.SendPacketsAsync( timeout, packets ); + } + } } diff --git a/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs b/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs index 63740b2..b8a15fa 100644 --- a/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs +++ b/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.IO; using System.Threading; using System.Threading.Tasks; @@ -34,13 +35,18 @@ namespace MQTTnet.Core.Adapter return _channel.DisconnectAsync(); } - public Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout) + public async Task SendPacketsAsync( TimeSpan timeout, IEnumerable packets ) { - MqttTrace.Information(nameof(MqttChannelCommunicationAdapter), "TX >>> {0} [Timeout={1}]", packet, timeout); + foreach (var packet in packets ) + { + MqttTrace.Information( nameof( MqttChannelCommunicationAdapter ), "TX >>> {0} [Timeout={1}]", packet, timeout ); + + var writeBuffer = PacketSerializer.Serialize(packet); + _sendTask = SendAsync( writeBuffer ); + } - var writeBuffer = PacketSerializer.Serialize(packet); - _sendTask = SendAsync( writeBuffer ); - return _sendTask.TimeoutAfter(timeout); + await _sendTask.ConfigureAwait( false ); + await _channel.SendStream.FlushAsync().TimeoutAfter( timeout ).ConfigureAwait( false ); } private Task _sendTask = Task.FromResult(0); // this task is used to prevent overlapping write @@ -48,7 +54,7 @@ namespace MQTTnet.Core.Adapter private async Task SendAsync(byte[] buffer) { await _sendTask.ConfigureAwait(false); - await _channel.Stream.WriteAsync(buffer, 0, buffer.Length).ConfigureAwait( false ); + await _channel.SendStream.WriteAsync(buffer, 0, buffer.Length).ConfigureAwait( false ); } public async Task ReceivePacketAsync(TimeSpan timeout) @@ -76,7 +82,7 @@ namespace MQTTnet.Core.Adapter private async Task> ReceiveAsync() { - var header = MqttPacketReader.ReadHeaderFromSource(_channel.Stream); + var header = MqttPacketReader.ReadHeaderFromSource(_channel.ReceiveStream); MemoryStream body = null; if (header.BodyLength > 0) @@ -85,7 +91,7 @@ namespace MQTTnet.Core.Adapter var readBuffer = new byte[header.BodyLength]; do { - var read = await _channel.Stream.ReadAsync(readBuffer, totalRead, header.BodyLength - totalRead) + var read = await _channel.ReceiveStream.ReadAsync(readBuffer, totalRead, header.BodyLength - totalRead) .ConfigureAwait( false ); totalRead += read; } while (totalRead < header.BodyLength); diff --git a/MQTTnet.Core/Channel/IMqttCommunicationChannel.cs b/MQTTnet.Core/Channel/IMqttCommunicationChannel.cs index 6fb72f3..6b0854d 100644 --- a/MQTTnet.Core/Channel/IMqttCommunicationChannel.cs +++ b/MQTTnet.Core/Channel/IMqttCommunicationChannel.cs @@ -10,6 +10,8 @@ namespace MQTTnet.Core.Channel Task DisconnectAsync(); - Stream Stream { get; } + Stream SendStream { get; } + + Stream ReceiveStream { get; } } } diff --git a/MQTTnet.Core/Client/IMqttClient.cs b/MQTTnet.Core/Client/IMqttClient.cs index 0170adf..8446e25 100644 --- a/MQTTnet.Core/Client/IMqttClient.cs +++ b/MQTTnet.Core/Client/IMqttClient.cs @@ -15,10 +15,18 @@ namespace MQTTnet.Core.Client Task ConnectAsync(MqttApplicationMessage willApplicationMessage = null); Task DisconnectAsync(); - Task PublishAsync(MqttApplicationMessage applicationMessage); + Task PublishAsync(IEnumerable applicationMessages); Task> SubscribeAsync(IList topicFilters); Task> SubscribeAsync(params TopicFilter[] topicFilters); Task Unsubscribe(IList topicFilters); Task Unsubscribe(params string[] topicFilters); } + + public static class IMqttClientExtensions + { + public static Task PublishAsync( this IMqttClient client, params MqttApplicationMessage[] applicationMessages ) + { + return client.PublishAsync( applicationMessages ); + } + } } \ No newline at end of file diff --git a/MQTTnet.Core/Client/MqttClient.cs b/MQTTnet.Core/Client/MqttClient.cs index deb0242..62c6bb3 100644 --- a/MQTTnet.Core/Client/MqttClient.cs +++ b/MQTTnet.Core/Client/MqttClient.cs @@ -161,32 +161,43 @@ namespace MQTTnet.Core.Client return SendAndReceiveAsync(unsubscribePacket); } - public Task PublishAsync(MqttApplicationMessage applicationMessage) + public async Task PublishAsync(IEnumerable applicationMessages) { - if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage)); ThrowIfNotConnected(); - var publishPacket = applicationMessage.ToPublishPacket(); + var publishPackets = applicationMessages.Select(m => m.ToPublishPacket()); - if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce) + foreach (var qosGroup in publishPackets.GroupBy(p => p.QualityOfServiceLevel)) { - // No packet identifier is used for QoS 0 [3.3.2.2 Packet Identifier] - return SendAsync(publishPacket); - } - - if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce) - { - publishPacket.PacketIdentifier = GetNewPacketIdentifier(); - return SendAndReceiveAsync(publishPacket); - } - - if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce) - { - publishPacket.PacketIdentifier = GetNewPacketIdentifier(); - return PublishExactlyOncePacketAsync(publishPacket); + var qosPackets = qosGroup.ToArray(); + switch ( qosGroup.Key ) + { + case MqttQualityOfServiceLevel.AtMostOnce: + // No packet identifier is used for QoS 0 [3.3.2.2 Packet Identifier] + await _adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, qosPackets); + break; + case MqttQualityOfServiceLevel.AtLeastOnce: + { + foreach (var publishPacket in qosPackets) + { + publishPacket.PacketIdentifier = GetNewPacketIdentifier(); + await SendAndReceiveAsync(publishPacket); + } + break; + } + case MqttQualityOfServiceLevel.ExactlyOnce: + { + foreach (var publishPacket in qosPackets) + { + publishPacket.PacketIdentifier = GetNewPacketIdentifier(); + await PublishExactlyOncePacketAsync( publishPacket ); + } + break; + } + default: + throw new InvalidOperationException(); + } } - - throw new InvalidOperationException(); } private async Task PublishExactlyOncePacketAsync(MqttBasePacket publishPacket) @@ -312,14 +323,13 @@ namespace MQTTnet.Core.Client private Task SendAsync(MqttBasePacket packet) { - return _adapter.SendPacketAsync(packet, _options.DefaultCommunicationTimeout); + return _adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, packet); } private async Task SendAndReceiveAsync(MqttBasePacket requestPacket) where TResponsePacket : MqttBasePacket { - await _adapter.SendPacketAsync(requestPacket, _options.DefaultCommunicationTimeout).ConfigureAwait(false); - - return (TResponsePacket)await _packetDispatcher.WaitForPacketAsync(requestPacket, typeof(TResponsePacket), _options.DefaultCommunicationTimeout).ConfigureAwait(false); + await _adapter.SendPacketsAsync( _options.DefaultCommunicationTimeout, requestPacket ).ConfigureAwait(false); + return (TResponsePacket)await _packetDispatcher.WaitForPacketAsync(requestPacket, typeof( TResponsePacket ), _options.DefaultCommunicationTimeout).ConfigureAwait(false); } private ushort GetNewPacketIdentifier() diff --git a/MQTTnet.Core/Server/MqttClientMessageQueue.cs b/MQTTnet.Core/Server/MqttClientMessageQueue.cs index 22fbdbb..ccf779d 100644 --- a/MQTTnet.Core/Server/MqttClientMessageQueue.cs +++ b/MQTTnet.Core/Server/MqttClientMessageQueue.cs @@ -105,7 +105,7 @@ namespace MQTTnet.Core.Server } publishPacketContext.PublishPacket.Dup = publishPacketContext.SendTries > 0; - await _adapter.SendPacketAsync(publishPacketContext.PublishPacket, _options.DefaultCommunicationTimeout).ConfigureAwait(false); + await _adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, publishPacketContext.PublishPacket).ConfigureAwait(false); publishPacketContext.IsSent = true; } diff --git a/MQTTnet.Core/Server/MqttClientSession.cs b/MQTTnet.Core/Server/MqttClientSession.cs index 9b811ec..1411dd3 100644 --- a/MQTTnet.Core/Server/MqttClientSession.cs +++ b/MQTTnet.Core/Server/MqttClientSession.cs @@ -103,12 +103,12 @@ namespace MQTTnet.Core.Server { if (packet is MqttSubscribePacket subscribePacket) { - return Adapter.SendPacketAsync(_subscriptionsManager.Subscribe(subscribePacket), _options.DefaultCommunicationTimeout); + return Adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _subscriptionsManager.Subscribe(subscribePacket)); } if (packet is MqttUnsubscribePacket unsubscribePacket) { - return Adapter.SendPacketAsync(_subscriptionsManager.Unsubscribe(unsubscribePacket), _options.DefaultCommunicationTimeout); + return Adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _subscriptionsManager.Unsubscribe(unsubscribePacket)); } if (packet is MqttPublishPacket publishPacket) @@ -123,7 +123,7 @@ namespace MQTTnet.Core.Server if (packet is MqttPubRecPacket pubRecPacket) { - return Adapter.SendPacketAsync(pubRecPacket.CreateResponse(), _options.DefaultCommunicationTimeout); + return Adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, pubRecPacket.CreateResponse()); } if (packet is MqttPubAckPacket || packet is MqttPubCompPacket) @@ -134,7 +134,7 @@ namespace MQTTnet.Core.Server if (packet is MqttPingReqPacket) { - return Adapter.SendPacketAsync(new MqttPingRespPacket(), _options.DefaultCommunicationTimeout); + return Adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, new MqttPingRespPacket()); } if (packet is MqttDisconnectPacket || packet is MqttConnectPacket) @@ -160,7 +160,7 @@ namespace MQTTnet.Core.Server if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce) { _publishPacketReceivedCallback(this, publishPacket); - return Adapter.SendPacketAsync(new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout); + return Adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier }); } if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce) @@ -173,7 +173,7 @@ namespace MQTTnet.Core.Server _publishPacketReceivedCallback(this, publishPacket); - return Adapter.SendPacketAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout); + return Adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }); } throw new MqttCommunicationException("Received a not supported QoS level."); @@ -186,7 +186,7 @@ namespace MQTTnet.Core.Server _unacknowledgedPublishPackets.Remove(pubRelPacket.PacketIdentifier); } - return Adapter.SendPacketAsync(new MqttPubCompPacket { PacketIdentifier = pubRelPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout); + return Adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, new MqttPubCompPacket { PacketIdentifier = pubRelPacket.PacketIdentifier }); } } } diff --git a/MQTTnet.Core/Server/MqttClientSessionsManager.cs b/MQTTnet.Core/Server/MqttClientSessionsManager.cs index b8ff4e6..483398d 100644 --- a/MQTTnet.Core/Server/MqttClientSessionsManager.cs +++ b/MQTTnet.Core/Server/MqttClientSessionsManager.cs @@ -40,21 +40,21 @@ namespace MQTTnet.Core.Server var connectReturnCode = ValidateConnection(connectPacket); if (connectReturnCode != MqttConnectReturnCode.ConnectionAccepted) { - await eventArgs.ClientAdapter.SendPacketAsync(new MqttConnAckPacket + await eventArgs.ClientAdapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, new MqttConnAckPacket { ConnectReturnCode = connectReturnCode - }, _options.DefaultCommunicationTimeout).ConfigureAwait(false); + }).ConfigureAwait(false); return; } var clientSession = GetOrCreateClientSession(connectPacket); - await eventArgs.ClientAdapter.SendPacketAsync(new MqttConnAckPacket + await eventArgs.ClientAdapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, new MqttConnAckPacket { ConnectReturnCode = connectReturnCode, IsSessionPresent = clientSession.IsExistingSession - }, _options.DefaultCommunicationTimeout).ConfigureAwait(false); + }).ConfigureAwait(false); await clientSession.Session.RunAsync(eventArgs.Identifier, connectPacket.WillMessage, eventArgs.ClientAdapter).ConfigureAwait(false); } diff --git a/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs b/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs index bf0d6f5..14da310 100644 --- a/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs @@ -391,7 +391,9 @@ namespace MQTTnet.Core.Tests { private readonly MemoryStream _stream = new MemoryStream(); - public Stream Stream => _stream; + public Stream ReceiveStream => _stream; + + public Stream SendStream => _stream; public bool IsConnected { get; } = true; diff --git a/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs b/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs index fe6e448..50eb8a7 100644 --- a/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs +++ b/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Concurrent; +using System.Collections.Generic; using System.Threading.Tasks; using MQTTnet.Core.Adapter; using MQTTnet.Core.Client; @@ -26,11 +27,15 @@ namespace MQTTnet.Core.Tests return Task.FromResult(0); } - public Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout) + public Task SendPacketsAsync(TimeSpan timeout, IEnumerable packets) { ThrowIfPartnerIsNull(); - Partner.SendPacketInternal(packet); + foreach (var packet in packets) + { + Partner.SendPacketInternal(packet); + } + return Task.FromResult(0); } diff --git a/Tests/MQTTnet.TestApp.NetFramework/PerformanceTest.cs b/Tests/MQTTnet.TestApp.NetFramework/PerformanceTest.cs index 6175f32..fde64d9 100644 --- a/Tests/MQTTnet.TestApp.NetFramework/PerformanceTest.cs +++ b/Tests/MQTTnet.TestApp.NetFramework/PerformanceTest.cs @@ -78,21 +78,37 @@ namespace MQTTnet.TestApp.NetFramework Console.WriteLine("### WAITING FOR APPLICATION MESSAGES ###"); var last = DateTime.Now; - var msgs = 0; + var msgCount = 0; while (true) { - var sendTasks = Enumerable.Range( 0, msgChunkSize ) - .Select( i => PublishSingleMessage( client, ref msgs ) ) + var msgs = Enumerable.Range( 0, msgChunkSize ) + .Select( i => CreateMessage() ) .ToList(); - await Task.WhenAll( sendTasks ); + if (false) + { + //send concurrent (test for raceconditions) + var sendTasks = msgs + .Select( msg => PublishSingleMessage( client, msg, ref msgCount ) ) + .ToList(); + + await Task.WhenAll( sendTasks ); + } + else + { + await client.PublishAsync( msgs ); + msgCount += msgs.Count; + //send multiple + } + + var now = DateTime.Now; if (last < now - TimeSpan.FromSeconds(1)) { - Console.WriteLine( $"sending {msgs} inteded {msgChunkSize / interval.TotalSeconds}" ); - msgs = 0; + Console.WriteLine( $"sending {msgCount} inteded {msgChunkSize / interval.TotalSeconds}" ); + msgCount = 0; last = now; } @@ -105,19 +121,21 @@ namespace MQTTnet.TestApp.NetFramework } } - private static Task PublishSingleMessage( IMqttClient client, ref int count ) + private static MqttApplicationMessage CreateMessage() + { + return new MqttApplicationMessage( + "A/B/C", + Encoding.UTF8.GetBytes( "Hello World" ), + MqttQualityOfServiceLevel.AtMostOnce, + false + ); + } + + private static Task PublishSingleMessage( IMqttClient client, MqttApplicationMessage applicationMessage, ref int count ) { Interlocked.Increment( ref count ); return Task.Run( () => { - var applicationMessage = new MqttApplicationMessage( - "A/B/C", - Encoding.UTF8.GetBytes( "Hello World" ), - MqttQualityOfServiceLevel.AtLeastOnce, - false - ); - - //do not await to send as much messages as possible return client.PublishAsync( applicationMessage ); } ); }