diff --git a/MQTTnet.Core/Server/MqttClientMessageQueue.cs b/MQTTnet.Core/Server/MqttClientMessageQueue.cs index 3cbe882..a8845e0 100644 --- a/MQTTnet.Core/Server/MqttClientMessageQueue.cs +++ b/MQTTnet.Core/Server/MqttClientMessageQueue.cs @@ -6,16 +6,16 @@ using MQTTnet.Core.Adapter; using MQTTnet.Core.Diagnostics; using MQTTnet.Core.Exceptions; using MQTTnet.Core.Packets; +using System.Linq; namespace MQTTnet.Core.Server { public sealed class MqttClientMessageQueue { - private readonly BlockingCollection _pendingPublishPackets = new BlockingCollection(); + private readonly BlockingCollection _pendingPublishPackets = new BlockingCollection(); private readonly MqttServerOptions _options; private CancellationTokenSource _cancellationTokenSource; - private IMqttCommunicationAdapter _adapter; public MqttClientMessageQueue(MqttServerOptions options) { @@ -29,15 +29,14 @@ namespace MQTTnet.Core.Server throw new InvalidOperationException($"{nameof(MqttClientMessageQueue)} already started."); } - _adapter = adapter ?? throw new ArgumentNullException(nameof(adapter)); + if (adapter == null) throw new ArgumentNullException(nameof(adapter)); _cancellationTokenSource = new CancellationTokenSource(); - Task.Run(() => SendPendingPublishPacketsAsync(_cancellationTokenSource.Token), _cancellationTokenSource.Token); + Task.Run(() => SendPendingPublishPacketsAsync(_cancellationTokenSource.Token, adapter), _cancellationTokenSource.Token); } public void Stop() { - _adapter = null; _cancellationTokenSource?.Cancel(); _cancellationTokenSource = null; _pendingPublishPackets?.Dispose(); @@ -47,61 +46,37 @@ namespace MQTTnet.Core.Server { if (publishPacket == null) throw new ArgumentNullException(nameof(publishPacket)); - _pendingPublishPackets.Add(new MqttClientPublishPacketContext(publishPacket)); + _pendingPublishPackets.Add(publishPacket); } - private async Task SendPendingPublishPacketsAsync(CancellationToken cancellationToken) + private async Task SendPendingPublishPacketsAsync(CancellationToken cancellationToken, IMqttCommunicationAdapter adapter) { - foreach (var publishPacket in _pendingPublishPackets.GetConsumingEnumerable(cancellationToken)) + var consumable = _pendingPublishPackets.GetConsumingEnumerable(); + while (!cancellationToken.IsCancellationRequested) { + var packets = consumable.Take(_pendingPublishPackets.Count).ToList(); try { - if (cancellationToken.IsCancellationRequested) - { - return; - } - - if (_adapter == null) + await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, packets).ConfigureAwait(false); + } + catch (MqttCommunicationException exception) + { + MqttTrace.Warning(nameof(MqttClientMessageQueue), exception, "Sending publish packet failed."); + foreach (var publishPacket in packets) { - continue; + publishPacket.Dup = true; + _pendingPublishPackets.Add(publishPacket); } - - await TrySendPendingPublishPacketAsync(publishPacket).ConfigureAwait(false); } - catch (Exception e) + catch (Exception exception) { - MqttTrace.Error(nameof(MqttClientMessageQueue), e, "Error while sending pending publish packets."); - } - } - } - - private async Task TrySendPendingPublishPacketAsync(MqttClientPublishPacketContext publishPacketContext) - { - try - { - if (_adapter == null) - { - return; + MqttTrace.Error(nameof(MqttClientMessageQueue), exception, "Sending publish packet failed."); + foreach (var publishPacket in packets) + { + publishPacket.Dup = true; + _pendingPublishPackets.Add(publishPacket); + } } - - publishPacketContext.PublishPacket.Dup = publishPacketContext.SendTries > 0; - await _adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, publishPacketContext.PublishPacket).ConfigureAwait(false); - - publishPacketContext.IsSent = true; - } - catch (MqttCommunicationException exception) - { - MqttTrace.Warning(nameof(MqttClientMessageQueue), exception, "Sending publish packet failed."); - _pendingPublishPackets.Add(publishPacketContext); - } - catch (Exception exception) - { - MqttTrace.Error(nameof(MqttClientMessageQueue), exception, "Sending publish packet failed."); - _pendingPublishPackets.Add(publishPacketContext); - } - finally - { - publishPacketContext.SendTries++; } } } diff --git a/MQTTnet.Core/Server/MqttClientPublishPacketContext.cs b/MQTTnet.Core/Server/MqttClientPublishPacketContext.cs deleted file mode 100644 index 9855390..0000000 --- a/MQTTnet.Core/Server/MqttClientPublishPacketContext.cs +++ /dev/null @@ -1,19 +0,0 @@ -using System; -using MQTTnet.Core.Packets; - -namespace MQTTnet.Core.Server -{ - public sealed class MqttClientPublishPacketContext - { - public MqttClientPublishPacketContext(MqttPublishPacket publishPacket) - { - PublishPacket = publishPacket ?? throw new ArgumentNullException(nameof(publishPacket)); - } - - public MqttPublishPacket PublishPacket { get; } - - public int SendTries { get; set; } - - public bool IsSent { get; set; } - } -} diff --git a/Tests/MQTTnet.TestApp.NetFramework/PerformanceTest.cs b/Tests/MQTTnet.TestApp.NetFramework/PerformanceTest.cs index 8e0f430..052778a 100644 --- a/Tests/MQTTnet.TestApp.NetFramework/PerformanceTest.cs +++ b/Tests/MQTTnet.TestApp.NetFramework/PerformanceTest.cs @@ -18,11 +18,16 @@ namespace MQTTnet.TestApp.NetFramework public static async Task RunAsync() { var server = Task.Run(() => RunServerAsync()); - var client = Task.Run(() => RunClientAsync(300, TimeSpan.FromMilliseconds(10))); + var client = Task.Run(() => RunClientAsync(400, TimeSpan.FromMilliseconds(10))); await Task.WhenAll(server, client).ConfigureAwait(false); } + private static Task RunClientsAsync(int msgChunkSize, TimeSpan interval) + { + return Task.WhenAll(Enumerable.Range(0, 3).Select((i) => Task.Run(() => RunClientAsync(msgChunkSize, interval)))); + } + private static async Task RunClientAsync( int msgChunkSize, TimeSpan interval ) { try