瀏覽代碼

use blockingcollection for pipelining

release/3.x.x
Eggers Jan 7 年之前
父節點
當前提交
6c7e70f78a
共有 2 個檔案被更改,包括 12 行新增70 行删除
  1. +0
    -32
      MQTTnet.Core/Internal/AsyncAutoResetEvent.cs
  2. +12
    -38
      MQTTnet.Core/Server/MqttClientMessageQueue.cs

+ 0
- 32
MQTTnet.Core/Internal/AsyncAutoResetEvent.cs 查看文件

@@ -1,32 +0,0 @@
using System.Collections.Generic;
using System.Threading.Tasks;

namespace MQTTnet.Core.Internal
{
public sealed class AsyncGate
{
private readonly Queue<TaskCompletionSource<bool>> _waitingTasks = new Queue<TaskCompletionSource<bool>>();

public Task WaitOneAsync()
{
var tcs = new TaskCompletionSource<bool>();
lock (_waitingTasks)
{
_waitingTasks.Enqueue(tcs);
}

return tcs.Task;
}

public void Set()
{
lock (_waitingTasks)
{
if (_waitingTasks.Count > 0)
{
_waitingTasks.Dequeue().SetResult(true);
}
}
}
}
}

+ 12
- 38
MQTTnet.Core/Server/MqttClientMessageQueue.cs 查看文件

@@ -1,20 +1,17 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Core.Adapter;
using MQTTnet.Core.Diagnostics;
using MQTTnet.Core.Exceptions;
using MQTTnet.Core.Internal;
using MQTTnet.Core.Packets;

namespace MQTTnet.Core.Server
{
public sealed class MqttClientMessageQueue
{
private readonly List<MqttClientPublishPacketContext> _pendingPublishPackets = new List<MqttClientPublishPacketContext>();
private readonly AsyncGate _gate = new AsyncGate();
private readonly BlockingCollection<MqttClientPublishPacketContext> _pendingPublishPackets = new BlockingCollection<MqttClientPublishPacketContext>();

private readonly MqttServerOptions _options;
private CancellationTokenSource _cancellationTokenSource;
@@ -43,55 +40,38 @@ namespace MQTTnet.Core.Server
_adapter = null;
_cancellationTokenSource?.Cancel();
_cancellationTokenSource = null;
_pendingPublishPackets?.Dispose();
}

public void Enqueue(MqttPublishPacket publishPacket)
{
if (publishPacket == null) throw new ArgumentNullException(nameof(publishPacket));

lock (_pendingPublishPackets)
{
_pendingPublishPackets.Add(new MqttClientPublishPacketContext(publishPacket));
_gate.Set();
}
_pendingPublishPackets.Add( new MqttClientPublishPacketContext( publishPacket ) );
}

private async Task SendPendingPublishPacketsAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
foreach (var publishPacket in _pendingPublishPackets.GetConsumingEnumerable(cancellationToken))
{
try
{
await _gate.WaitOneAsync().ConfigureAwait(false);
if (cancellationToken.IsCancellationRequested)
if ( cancellationToken.IsCancellationRequested )
{
return;
}

if (_adapter == null)
if ( _adapter == null )
{
continue;
}

List<MqttClientPublishPacketContext> pendingPublishPackets;
lock (_pendingPublishPackets)
{
pendingPublishPackets = _pendingPublishPackets.ToList();
}

foreach (var publishPacket in pendingPublishPackets)
{
await TrySendPendingPublishPacketAsync(publishPacket).ConfigureAwait(false);
}
await TrySendPendingPublishPacketAsync( publishPacket ).ConfigureAwait( false );
}
catch (Exception e)
catch ( Exception e )
{
MqttTrace.Error(nameof(MqttClientMessageQueue), e, "Error while sending pending publish packets.");
MqttTrace.Error( nameof( MqttClientMessageQueue ), e, "Error while sending pending publish packets." );
}
finally
{
Cleanup();
}
}
}

@@ -112,23 +92,17 @@ namespace MQTTnet.Core.Server
catch (MqttCommunicationException exception)
{
MqttTrace.Warning(nameof(MqttClientMessageQueue), exception, "Sending publish packet failed.");
_pendingPublishPackets.Add( publishPacketContext );
}
catch (Exception exception)
{
MqttTrace.Error(nameof(MqttClientMessageQueue), exception, "Sending publish packet failed.");
_pendingPublishPackets.Add( publishPacketContext );
}
finally
{
publishPacketContext.SendTries++;
}
}

private void Cleanup()
{
lock (_pendingPublishPackets)
{
_pendingPublishPackets.RemoveAll(p => p.IsSent);
}
}
}
}

Loading…
取消
儲存