diff --git a/MQTTnet.Core/Internal/AsyncAutoResetEvent.cs b/MQTTnet.Core/Internal/AsyncAutoResetEvent.cs deleted file mode 100644 index 795748d..0000000 --- a/MQTTnet.Core/Internal/AsyncAutoResetEvent.cs +++ /dev/null @@ -1,32 +0,0 @@ -using System.Collections.Generic; -using System.Threading.Tasks; - -namespace MQTTnet.Core.Internal -{ - public sealed class AsyncGate - { - private readonly Queue> _waitingTasks = new Queue>(); - - public Task WaitOneAsync() - { - var tcs = new TaskCompletionSource(); - lock (_waitingTasks) - { - _waitingTasks.Enqueue(tcs); - } - - return tcs.Task; - } - - public void Set() - { - lock (_waitingTasks) - { - if (_waitingTasks.Count > 0) - { - _waitingTasks.Dequeue().SetResult(true); - } - } - } - } -} diff --git a/MQTTnet.Core/Server/MqttClientMessageQueue.cs b/MQTTnet.Core/Server/MqttClientMessageQueue.cs index ccf779d..396366e 100644 --- a/MQTTnet.Core/Server/MqttClientMessageQueue.cs +++ b/MQTTnet.Core/Server/MqttClientMessageQueue.cs @@ -1,20 +1,17 @@ using System; -using System.Collections.Generic; -using System.Linq; +using System.Collections.Concurrent; using System.Threading; using System.Threading.Tasks; using MQTTnet.Core.Adapter; using MQTTnet.Core.Diagnostics; using MQTTnet.Core.Exceptions; -using MQTTnet.Core.Internal; using MQTTnet.Core.Packets; namespace MQTTnet.Core.Server { public sealed class MqttClientMessageQueue { - private readonly List _pendingPublishPackets = new List(); - private readonly AsyncGate _gate = new AsyncGate(); + private readonly BlockingCollection _pendingPublishPackets = new BlockingCollection(); private readonly MqttServerOptions _options; private CancellationTokenSource _cancellationTokenSource; @@ -43,55 +40,38 @@ namespace MQTTnet.Core.Server _adapter = null; _cancellationTokenSource?.Cancel(); _cancellationTokenSource = null; + _pendingPublishPackets?.Dispose(); } public void Enqueue(MqttPublishPacket publishPacket) { if (publishPacket == null) throw new ArgumentNullException(nameof(publishPacket)); - lock (_pendingPublishPackets) - { - _pendingPublishPackets.Add(new MqttClientPublishPacketContext(publishPacket)); - _gate.Set(); - } + _pendingPublishPackets.Add( new MqttClientPublishPacketContext( publishPacket ) ); } private async Task SendPendingPublishPacketsAsync(CancellationToken cancellationToken) { - while (!cancellationToken.IsCancellationRequested) + foreach (var publishPacket in _pendingPublishPackets.GetConsumingEnumerable(cancellationToken)) { try { - await _gate.WaitOneAsync().ConfigureAwait(false); - if (cancellationToken.IsCancellationRequested) + if ( cancellationToken.IsCancellationRequested ) { return; } - if (_adapter == null) + if ( _adapter == null ) { continue; } - List pendingPublishPackets; - lock (_pendingPublishPackets) - { - pendingPublishPackets = _pendingPublishPackets.ToList(); - } - - foreach (var publishPacket in pendingPublishPackets) - { - await TrySendPendingPublishPacketAsync(publishPacket).ConfigureAwait(false); - } + await TrySendPendingPublishPacketAsync( publishPacket ).ConfigureAwait( false ); } - catch (Exception e) + catch ( Exception e ) { - MqttTrace.Error(nameof(MqttClientMessageQueue), e, "Error while sending pending publish packets."); + MqttTrace.Error( nameof( MqttClientMessageQueue ), e, "Error while sending pending publish packets." ); } - finally - { - Cleanup(); - } } } @@ -112,23 +92,17 @@ namespace MQTTnet.Core.Server catch (MqttCommunicationException exception) { MqttTrace.Warning(nameof(MqttClientMessageQueue), exception, "Sending publish packet failed."); + _pendingPublishPackets.Add( publishPacketContext ); } catch (Exception exception) { MqttTrace.Error(nameof(MqttClientMessageQueue), exception, "Sending publish packet failed."); + _pendingPublishPackets.Add( publishPacketContext ); } finally { publishPacketContext.SendTries++; } } - - private void Cleanup() - { - lock (_pendingPublishPackets) - { - _pendingPublishPackets.RemoveAll(p => p.IsSent); - } - } } }