Parcourir la source

reduced footprint of client msg queue and numbers can go even higher

release/3.x.x
JanEggers il y a 7 ans
Parent
révision
d1b634511d
3 fichiers modifiés avec 30 ajouts et 69 suppressions
  1. +24
    -49
      MQTTnet.Core/Server/MqttClientMessageQueue.cs
  2. +0
    -19
      MQTTnet.Core/Server/MqttClientPublishPacketContext.cs
  3. +6
    -1
      Tests/MQTTnet.TestApp.NetFramework/PerformanceTest.cs

+ 24
- 49
MQTTnet.Core/Server/MqttClientMessageQueue.cs Voir le fichier

@@ -6,16 +6,16 @@ using MQTTnet.Core.Adapter;
using MQTTnet.Core.Diagnostics;
using MQTTnet.Core.Exceptions;
using MQTTnet.Core.Packets;
using System.Linq;

namespace MQTTnet.Core.Server
{
public sealed class MqttClientMessageQueue
{
private readonly BlockingCollection<MqttClientPublishPacketContext> _pendingPublishPackets = new BlockingCollection<MqttClientPublishPacketContext>();
private readonly BlockingCollection<MqttPublishPacket> _pendingPublishPackets = new BlockingCollection<MqttPublishPacket>();

private readonly MqttServerOptions _options;
private CancellationTokenSource _cancellationTokenSource;
private IMqttCommunicationAdapter _adapter;

public MqttClientMessageQueue(MqttServerOptions options)
{
@@ -29,15 +29,14 @@ namespace MQTTnet.Core.Server
throw new InvalidOperationException($"{nameof(MqttClientMessageQueue)} already started.");
}

_adapter = adapter ?? throw new ArgumentNullException(nameof(adapter));
if (adapter == null) throw new ArgumentNullException(nameof(adapter));
_cancellationTokenSource = new CancellationTokenSource();

Task.Run(() => SendPendingPublishPacketsAsync(_cancellationTokenSource.Token), _cancellationTokenSource.Token);
Task.Run(() => SendPendingPublishPacketsAsync(_cancellationTokenSource.Token, adapter), _cancellationTokenSource.Token);
}

public void Stop()
{
_adapter = null;
_cancellationTokenSource?.Cancel();
_cancellationTokenSource = null;
_pendingPublishPackets?.Dispose();
@@ -47,61 +46,37 @@ namespace MQTTnet.Core.Server
{
if (publishPacket == null) throw new ArgumentNullException(nameof(publishPacket));

_pendingPublishPackets.Add(new MqttClientPublishPacketContext(publishPacket));
_pendingPublishPackets.Add(publishPacket);
}

private async Task SendPendingPublishPacketsAsync(CancellationToken cancellationToken)
private async Task SendPendingPublishPacketsAsync(CancellationToken cancellationToken, IMqttCommunicationAdapter adapter)
{
foreach (var publishPacket in _pendingPublishPackets.GetConsumingEnumerable(cancellationToken))
var consumable = _pendingPublishPackets.GetConsumingEnumerable();
while (!cancellationToken.IsCancellationRequested)
{
var packets = consumable.Take(_pendingPublishPackets.Count).ToList();
try
{
if (cancellationToken.IsCancellationRequested)
{
return;
}
if (_adapter == null)
await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, packets).ConfigureAwait(false);
}
catch (MqttCommunicationException exception)
{
MqttTrace.Warning(nameof(MqttClientMessageQueue), exception, "Sending publish packet failed.");
foreach (var publishPacket in packets)
{
continue;
publishPacket.Dup = true;
_pendingPublishPackets.Add(publishPacket);
}

await TrySendPendingPublishPacketAsync(publishPacket).ConfigureAwait(false);
}
catch (Exception e)
catch (Exception exception)
{
MqttTrace.Error(nameof(MqttClientMessageQueue), e, "Error while sending pending publish packets.");
}
}
}

private async Task TrySendPendingPublishPacketAsync(MqttClientPublishPacketContext publishPacketContext)
{
try
{
if (_adapter == null)
{
return;
MqttTrace.Error(nameof(MqttClientMessageQueue), exception, "Sending publish packet failed.");
foreach (var publishPacket in packets)
{
publishPacket.Dup = true;
_pendingPublishPackets.Add(publishPacket);
}
}

publishPacketContext.PublishPacket.Dup = publishPacketContext.SendTries > 0;
await _adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, publishPacketContext.PublishPacket).ConfigureAwait(false);

publishPacketContext.IsSent = true;
}
catch (MqttCommunicationException exception)
{
MqttTrace.Warning(nameof(MqttClientMessageQueue), exception, "Sending publish packet failed.");
_pendingPublishPackets.Add(publishPacketContext);
}
catch (Exception exception)
{
MqttTrace.Error(nameof(MqttClientMessageQueue), exception, "Sending publish packet failed.");
_pendingPublishPackets.Add(publishPacketContext);
}
finally
{
publishPacketContext.SendTries++;
}
}
}


+ 0
- 19
MQTTnet.Core/Server/MqttClientPublishPacketContext.cs Voir le fichier

@@ -1,19 +0,0 @@
using System;
using MQTTnet.Core.Packets;

namespace MQTTnet.Core.Server
{
public sealed class MqttClientPublishPacketContext
{
public MqttClientPublishPacketContext(MqttPublishPacket publishPacket)
{
PublishPacket = publishPacket ?? throw new ArgumentNullException(nameof(publishPacket));
}

public MqttPublishPacket PublishPacket { get; }

public int SendTries { get; set; }

public bool IsSent { get; set; }
}
}

+ 6
- 1
Tests/MQTTnet.TestApp.NetFramework/PerformanceTest.cs Voir le fichier

@@ -18,11 +18,16 @@ namespace MQTTnet.TestApp.NetFramework
public static async Task RunAsync()
{
var server = Task.Run(() => RunServerAsync());
var client = Task.Run(() => RunClientAsync(300, TimeSpan.FromMilliseconds(10)));
var client = Task.Run(() => RunClientAsync(400, TimeSpan.FromMilliseconds(10)));

await Task.WhenAll(server, client).ConfigureAwait(false);
}

private static Task RunClientsAsync(int msgChunkSize, TimeSpan interval)
{
return Task.WhenAll(Enumerable.Range(0, 3).Select((i) => Task.Run(() => RunClientAsync(msgChunkSize, interval))));
}

private static async Task RunClientAsync( int msgChunkSize, TimeSpan interval )
{
try


Chargement…
Annuler
Enregistrer