Преглед изворни кода

send multiple packages at once msg/sec from 9000 to 32000

release/3.x.x
Eggers Jan пре 7 година
родитељ
комит
473c8e0a15
15 измењених фајлова са 151 додато и 81 уклоњено
  1. +14
    -8
      Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs
  2. +5
    -3
      Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs
  3. +2
    -1
      Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs
  4. +5
    -4
      Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs
  5. +10
    -1
      MQTTnet.Core/Adapter/IMqttCommunicationAdapter.cs
  6. +14
    -8
      MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs
  7. +3
    -1
      MQTTnet.Core/Channel/IMqttCommunicationChannel.cs
  8. +9
    -1
      MQTTnet.Core/Client/IMqttClient.cs
  9. +34
    -24
      MQTTnet.Core/Client/MqttClient.cs
  10. +1
    -1
      MQTTnet.Core/Server/MqttClientMessageQueue.cs
  11. +7
    -7
      MQTTnet.Core/Server/MqttClientSession.cs
  12. +4
    -4
      MQTTnet.Core/Server/MqttClientSessionsManager.cs
  13. +3
    -1
      Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs
  14. +7
    -2
      Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs
  15. +33
    -15
      Tests/MQTTnet.TestApp.NetFramework/PerformanceTest.cs

+ 14
- 8
Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs Прегледај датотеку

@@ -13,11 +13,13 @@ namespace MQTTnet.Implementations
{
public sealed class MqttTcpChannel : IMqttCommunicationChannel, IDisposable
{
private Stream _dataStream;
private Stream _receiveStream;
private Stream _sendStream;
private Socket _socket;
private SslStream _sslStream;

public Stream Stream => _dataStream;
public Stream ReceiveStream => _receiveStream;
public Stream SendStream => _sendStream;

/// <summary>
/// called on client sockets are created in connect
@@ -35,7 +37,7 @@ namespace MQTTnet.Implementations
{
_socket = socket ?? throw new ArgumentNullException(nameof(socket));
_sslStream = sslStream;
_dataStream = (Stream)sslStream ?? new NetworkStream(socket);
CreateCommStreams( socket, sslStream );
}

public async Task ConnectAsync(MqttClientOptions options)
@@ -54,13 +56,10 @@ namespace MQTTnet.Implementations
{
_sslStream = new SslStream(new NetworkStream(_socket, true));
_dataStream = _sslStream;
await _sslStream.AuthenticateAsClientAsync(options.Server, LoadCertificates(options), SslProtocols.Tls12, options.TlsOptions.CheckCertificateRevocation).ConfigureAwait(false);
}
else
{
_dataStream = new NetworkStream(_socket);
}

CreateCommStreams( _socket, _sslStream );
}
catch (SocketException exception)
{
@@ -90,6 +89,13 @@ namespace MQTTnet.Implementations
_sslStream = null;
}

private void CreateCommStreams( Socket socket, SslStream sslStream )
{
//cannot use this as default buffering prevents from receiving the first connect message
_receiveStream = (Stream)sslStream ?? new NetworkStream( socket );
_sendStream = new BufferedStream( _receiveStream, BufferConstants.Size );
}

private static X509CertificateCollection LoadCertificates(MqttClientOptions options)
{
var certificates = new X509CertificateCollection();


+ 5
- 3
Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs Прегледај датотеку

@@ -13,7 +13,9 @@ namespace MQTTnet.Implementations
{
private ClientWebSocket _webSocket = new ClientWebSocket();
public Stream Stream { get; private set; }
public Stream ReceiveStream { get; private set; }

public Stream SendStream { get; private set; }

public async Task ConnectAsync(MqttClientOptions options)
{
@@ -24,7 +26,7 @@ namespace MQTTnet.Implementations
_webSocket = new ClientWebSocket();
await _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None);

Stream = new WebSocketStream(_webSocket);
ReceiveStream = SendStream = new WebSocketStream(_webSocket);
}
catch (WebSocketException exception)
{
@@ -34,7 +36,7 @@ namespace MQTTnet.Implementations

public Task DisconnectAsync()
{
Stream = null;
ReceiveStream = null;
return _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None);
}



+ 2
- 1
Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs Прегледај датотеку

@@ -18,7 +18,8 @@ namespace MQTTnet.Implementations
private SslStream _sslStream;


public Stream Stream => _dataStream;
public Stream ReceiveStream => _dataStream;
public Stream SendStream => _dataStream;

/// <summary>
/// called on client sockets are created in connect


+ 5
- 4
Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs Прегледај датотеку

@@ -12,8 +12,9 @@ namespace MQTTnet.Implementations
public sealed class MqttWebSocketChannel : IMqttCommunicationChannel, IDisposable
{
private ClientWebSocket _webSocket = new ClientWebSocket();

public Stream Stream { get; private set; }
public Stream SendStream { get; private set; }
public Stream ReceiveStream { get; private set; }

public async Task ConnectAsync(MqttClientOptions options)
{
@@ -24,7 +25,7 @@ namespace MQTTnet.Implementations
_webSocket = new ClientWebSocket();
await _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None);

Stream = new WebSocketStream(_webSocket);
SendStream = ReceiveStream = new WebSocketStream(_webSocket);
}
catch (WebSocketException exception)
{
@@ -34,7 +35,7 @@ namespace MQTTnet.Implementations

public Task DisconnectAsync()
{
Stream = null;
SendStream = ReceiveStream = null;
return _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None);
}



+ 10
- 1
MQTTnet.Core/Adapter/IMqttCommunicationAdapter.cs Прегледај датотеку

@@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using MQTTnet.Core.Client;
using MQTTnet.Core.Packets;
@@ -12,10 +13,18 @@ namespace MQTTnet.Core.Adapter

Task DisconnectAsync();

Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout);
Task SendPacketsAsync( TimeSpan timeout, IEnumerable<MqttBasePacket> packets );

Task<MqttBasePacket> ReceivePacketAsync(TimeSpan timeout);

IMqttPacketSerializer PacketSerializer { get; }
}

public static class IMqttCommunicationAdapterExtensions
{
public static Task SendPacketsAsync( this IMqttCommunicationAdapter adapter, TimeSpan timeout, params MqttBasePacket[] packets )
{
return adapter.SendPacketsAsync( timeout, packets );
}
}
}

+ 14
- 8
MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs Прегледај датотеку

@@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
@@ -34,13 +35,18 @@ namespace MQTTnet.Core.Adapter
return _channel.DisconnectAsync();
}

public Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout)
public async Task SendPacketsAsync( TimeSpan timeout, IEnumerable<MqttBasePacket> packets )
{
MqttTrace.Information(nameof(MqttChannelCommunicationAdapter), "TX >>> {0} [Timeout={1}]", packet, timeout);
foreach (var packet in packets )
{
MqttTrace.Information( nameof( MqttChannelCommunicationAdapter ), "TX >>> {0} [Timeout={1}]", packet, timeout );

var writeBuffer = PacketSerializer.Serialize(packet);
_sendTask = SendAsync( writeBuffer );
}

var writeBuffer = PacketSerializer.Serialize(packet);
_sendTask = SendAsync( writeBuffer );
return _sendTask.TimeoutAfter(timeout);
await _sendTask.ConfigureAwait( false );
await _channel.SendStream.FlushAsync().TimeoutAfter( timeout ).ConfigureAwait( false );
}

private Task _sendTask = Task.FromResult(0); // this task is used to prevent overlapping write
@@ -48,7 +54,7 @@ namespace MQTTnet.Core.Adapter
private async Task SendAsync(byte[] buffer)
{
await _sendTask.ConfigureAwait(false);
await _channel.Stream.WriteAsync(buffer, 0, buffer.Length).ConfigureAwait( false );
await _channel.SendStream.WriteAsync(buffer, 0, buffer.Length).ConfigureAwait( false );
}

public async Task<MqttBasePacket> ReceivePacketAsync(TimeSpan timeout)
@@ -76,7 +82,7 @@ namespace MQTTnet.Core.Adapter

private async Task<Tuple<MqttPacketHeader, MemoryStream>> ReceiveAsync()
{
var header = MqttPacketReader.ReadHeaderFromSource(_channel.Stream);
var header = MqttPacketReader.ReadHeaderFromSource(_channel.ReceiveStream);

MemoryStream body = null;
if (header.BodyLength > 0)
@@ -85,7 +91,7 @@ namespace MQTTnet.Core.Adapter
var readBuffer = new byte[header.BodyLength];
do
{
var read = await _channel.Stream.ReadAsync(readBuffer, totalRead, header.BodyLength - totalRead)
var read = await _channel.ReceiveStream.ReadAsync(readBuffer, totalRead, header.BodyLength - totalRead)
.ConfigureAwait( false );
totalRead += read;
} while (totalRead < header.BodyLength);


+ 3
- 1
MQTTnet.Core/Channel/IMqttCommunicationChannel.cs Прегледај датотеку

@@ -10,6 +10,8 @@ namespace MQTTnet.Core.Channel

Task DisconnectAsync();
Stream Stream { get; }
Stream SendStream { get; }

Stream ReceiveStream { get; }
}
}

+ 9
- 1
MQTTnet.Core/Client/IMqttClient.cs Прегледај датотеку

@@ -15,10 +15,18 @@ namespace MQTTnet.Core.Client

Task ConnectAsync(MqttApplicationMessage willApplicationMessage = null);
Task DisconnectAsync();
Task PublishAsync(MqttApplicationMessage applicationMessage);
Task PublishAsync(IEnumerable<MqttApplicationMessage> applicationMessages);
Task<IList<MqttSubscribeResult>> SubscribeAsync(IList<TopicFilter> topicFilters);
Task<IList<MqttSubscribeResult>> SubscribeAsync(params TopicFilter[] topicFilters);
Task Unsubscribe(IList<string> topicFilters);
Task Unsubscribe(params string[] topicFilters);
}

public static class IMqttClientExtensions
{
public static Task PublishAsync( this IMqttClient client, params MqttApplicationMessage[] applicationMessages )
{
return client.PublishAsync( applicationMessages );
}
}
}

+ 34
- 24
MQTTnet.Core/Client/MqttClient.cs Прегледај датотеку

@@ -161,32 +161,43 @@ namespace MQTTnet.Core.Client
return SendAndReceiveAsync<MqttUnsubAckPacket>(unsubscribePacket);
}

public Task PublishAsync(MqttApplicationMessage applicationMessage)
public async Task PublishAsync(IEnumerable<MqttApplicationMessage> applicationMessages)
{
if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage));
ThrowIfNotConnected();

var publishPacket = applicationMessage.ToPublishPacket();
var publishPackets = applicationMessages.Select(m => m.ToPublishPacket());

if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce)
foreach (var qosGroup in publishPackets.GroupBy(p => p.QualityOfServiceLevel))
{
// No packet identifier is used for QoS 0 [3.3.2.2 Packet Identifier]
return SendAsync(publishPacket);
}

if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce)
{
publishPacket.PacketIdentifier = GetNewPacketIdentifier();
return SendAndReceiveAsync<MqttPubAckPacket>(publishPacket);
}

if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce)
{
publishPacket.PacketIdentifier = GetNewPacketIdentifier();
return PublishExactlyOncePacketAsync(publishPacket);
var qosPackets = qosGroup.ToArray();
switch ( qosGroup.Key )
{
case MqttQualityOfServiceLevel.AtMostOnce:
// No packet identifier is used for QoS 0 [3.3.2.2 Packet Identifier]
await _adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, qosPackets);
break;
case MqttQualityOfServiceLevel.AtLeastOnce:
{
foreach (var publishPacket in qosPackets)
{
publishPacket.PacketIdentifier = GetNewPacketIdentifier();
await SendAndReceiveAsync<MqttPubAckPacket>(publishPacket);
}
break;
}
case MqttQualityOfServiceLevel.ExactlyOnce:
{
foreach (var publishPacket in qosPackets)
{
publishPacket.PacketIdentifier = GetNewPacketIdentifier();
await PublishExactlyOncePacketAsync( publishPacket );
}
break;
}
default:
throw new InvalidOperationException();
}
}

throw new InvalidOperationException();
}

private async Task PublishExactlyOncePacketAsync(MqttBasePacket publishPacket)
@@ -312,14 +323,13 @@ namespace MQTTnet.Core.Client

private Task SendAsync(MqttBasePacket packet)
{
return _adapter.SendPacketAsync(packet, _options.DefaultCommunicationTimeout);
return _adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, packet);
}

private async Task<TResponsePacket> SendAndReceiveAsync<TResponsePacket>(MqttBasePacket requestPacket) where TResponsePacket : MqttBasePacket
{
await _adapter.SendPacketAsync(requestPacket, _options.DefaultCommunicationTimeout).ConfigureAwait(false);

return (TResponsePacket)await _packetDispatcher.WaitForPacketAsync(requestPacket, typeof(TResponsePacket), _options.DefaultCommunicationTimeout).ConfigureAwait(false);
await _adapter.SendPacketsAsync( _options.DefaultCommunicationTimeout, requestPacket ).ConfigureAwait(false);
return (TResponsePacket)await _packetDispatcher.WaitForPacketAsync(requestPacket, typeof( TResponsePacket ), _options.DefaultCommunicationTimeout).ConfigureAwait(false);
}

private ushort GetNewPacketIdentifier()


+ 1
- 1
MQTTnet.Core/Server/MqttClientMessageQueue.cs Прегледај датотеку

@@ -105,7 +105,7 @@ namespace MQTTnet.Core.Server
}

publishPacketContext.PublishPacket.Dup = publishPacketContext.SendTries > 0;
await _adapter.SendPacketAsync(publishPacketContext.PublishPacket, _options.DefaultCommunicationTimeout).ConfigureAwait(false);
await _adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, publishPacketContext.PublishPacket).ConfigureAwait(false);

publishPacketContext.IsSent = true;
}


+ 7
- 7
MQTTnet.Core/Server/MqttClientSession.cs Прегледај датотеку

@@ -103,12 +103,12 @@ namespace MQTTnet.Core.Server
{
if (packet is MqttSubscribePacket subscribePacket)
{
return Adapter.SendPacketAsync(_subscriptionsManager.Subscribe(subscribePacket), _options.DefaultCommunicationTimeout);
return Adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _subscriptionsManager.Subscribe(subscribePacket));
}

if (packet is MqttUnsubscribePacket unsubscribePacket)
{
return Adapter.SendPacketAsync(_subscriptionsManager.Unsubscribe(unsubscribePacket), _options.DefaultCommunicationTimeout);
return Adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _subscriptionsManager.Unsubscribe(unsubscribePacket));
}

if (packet is MqttPublishPacket publishPacket)
@@ -123,7 +123,7 @@ namespace MQTTnet.Core.Server

if (packet is MqttPubRecPacket pubRecPacket)
{
return Adapter.SendPacketAsync(pubRecPacket.CreateResponse<MqttPubRelPacket>(), _options.DefaultCommunicationTimeout);
return Adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, pubRecPacket.CreateResponse<MqttPubRelPacket>());
}

if (packet is MqttPubAckPacket || packet is MqttPubCompPacket)
@@ -134,7 +134,7 @@ namespace MQTTnet.Core.Server

if (packet is MqttPingReqPacket)
{
return Adapter.SendPacketAsync(new MqttPingRespPacket(), _options.DefaultCommunicationTimeout);
return Adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, new MqttPingRespPacket());
}

if (packet is MqttDisconnectPacket || packet is MqttConnectPacket)
@@ -160,7 +160,7 @@ namespace MQTTnet.Core.Server
if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce)
{
_publishPacketReceivedCallback(this, publishPacket);
return Adapter.SendPacketAsync(new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout);
return Adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier });
}

if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce)
@@ -173,7 +173,7 @@ namespace MQTTnet.Core.Server

_publishPacketReceivedCallback(this, publishPacket);

return Adapter.SendPacketAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout);
return Adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier });
}

throw new MqttCommunicationException("Received a not supported QoS level.");
@@ -186,7 +186,7 @@ namespace MQTTnet.Core.Server
_unacknowledgedPublishPackets.Remove(pubRelPacket.PacketIdentifier);
}

return Adapter.SendPacketAsync(new MqttPubCompPacket { PacketIdentifier = pubRelPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout);
return Adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, new MqttPubCompPacket { PacketIdentifier = pubRelPacket.PacketIdentifier });
}
}
}

+ 4
- 4
MQTTnet.Core/Server/MqttClientSessionsManager.cs Прегледај датотеку

@@ -40,21 +40,21 @@ namespace MQTTnet.Core.Server
var connectReturnCode = ValidateConnection(connectPacket);
if (connectReturnCode != MqttConnectReturnCode.ConnectionAccepted)
{
await eventArgs.ClientAdapter.SendPacketAsync(new MqttConnAckPacket
await eventArgs.ClientAdapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, new MqttConnAckPacket
{
ConnectReturnCode = connectReturnCode
}, _options.DefaultCommunicationTimeout).ConfigureAwait(false);
}).ConfigureAwait(false);

return;
}

var clientSession = GetOrCreateClientSession(connectPacket);

await eventArgs.ClientAdapter.SendPacketAsync(new MqttConnAckPacket
await eventArgs.ClientAdapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, new MqttConnAckPacket
{
ConnectReturnCode = connectReturnCode,
IsSessionPresent = clientSession.IsExistingSession
}, _options.DefaultCommunicationTimeout).ConfigureAwait(false);
}).ConfigureAwait(false);

await clientSession.Session.RunAsync(eventArgs.Identifier, connectPacket.WillMessage, eventArgs.ClientAdapter).ConfigureAwait(false);
}


+ 3
- 1
Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs Прегледај датотеку

@@ -391,7 +391,9 @@ namespace MQTTnet.Core.Tests
{
private readonly MemoryStream _stream = new MemoryStream();

public Stream Stream => _stream;
public Stream ReceiveStream => _stream;

public Stream SendStream => _stream;

public bool IsConnected { get; } = true;



+ 7
- 2
Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs Прегледај датотеку

@@ -1,5 +1,6 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
using MQTTnet.Core.Adapter;
using MQTTnet.Core.Client;
@@ -26,11 +27,15 @@ namespace MQTTnet.Core.Tests
return Task.FromResult(0);
}

public Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout)
public Task SendPacketsAsync(TimeSpan timeout, IEnumerable<MqttBasePacket> packets)
{
ThrowIfPartnerIsNull();

Partner.SendPacketInternal(packet);
foreach (var packet in packets)
{
Partner.SendPacketInternal(packet);
}

return Task.FromResult(0);
}



+ 33
- 15
Tests/MQTTnet.TestApp.NetFramework/PerformanceTest.cs Прегледај датотеку

@@ -78,21 +78,37 @@ namespace MQTTnet.TestApp.NetFramework
Console.WriteLine("### WAITING FOR APPLICATION MESSAGES ###");

var last = DateTime.Now;
var msgs = 0;
var msgCount = 0;

while (true)
{
var sendTasks = Enumerable.Range( 0, msgChunkSize )
.Select( i => PublishSingleMessage( client, ref msgs ) )
var msgs = Enumerable.Range( 0, msgChunkSize )
.Select( i => CreateMessage() )
.ToList();

await Task.WhenAll( sendTasks );
if (false)
{
//send concurrent (test for raceconditions)
var sendTasks = msgs
.Select( msg => PublishSingleMessage( client, msg, ref msgCount ) )
.ToList();

await Task.WhenAll( sendTasks );
}
else
{
await client.PublishAsync( msgs );
msgCount += msgs.Count;
//send multiple
}


var now = DateTime.Now;
if (last < now - TimeSpan.FromSeconds(1))
{
Console.WriteLine( $"sending {msgs} inteded {msgChunkSize / interval.TotalSeconds}" );
msgs = 0;
Console.WriteLine( $"sending {msgCount} inteded {msgChunkSize / interval.TotalSeconds}" );
msgCount = 0;
last = now;
}

@@ -105,19 +121,21 @@ namespace MQTTnet.TestApp.NetFramework
}
}

private static Task PublishSingleMessage( IMqttClient client, ref int count )
private static MqttApplicationMessage CreateMessage()
{
return new MqttApplicationMessage(
"A/B/C",
Encoding.UTF8.GetBytes( "Hello World" ),
MqttQualityOfServiceLevel.AtMostOnce,
false
);
}

private static Task PublishSingleMessage( IMqttClient client, MqttApplicationMessage applicationMessage, ref int count )
{
Interlocked.Increment( ref count );
return Task.Run( () =>
{
var applicationMessage = new MqttApplicationMessage(
"A/B/C",
Encoding.UTF8.GetBytes( "Hello World" ),
MqttQualityOfServiceLevel.AtLeastOnce,
false
);

//do not await to send as much messages as possible
return client.PublishAsync( applicationMessage );
} );
}


Loading…
Откажи
Сачувај