Browse Source

Merge pull request #37 from JanEggers/PerfMsgDispatcher

Perf msg dispatcher
release/3.x.x
Christian 7 years ago
committed by GitHub
parent
commit
c0e6e54496
3 changed files with 35 additions and 56 deletions
  1. +2
    -16
      MQTTnet.Core/Client/MqttClient.cs
  2. +0
    -16
      MQTTnet.Core/Client/MqttPacketAwaiter.cs
  3. +33
    -24
      MQTTnet.Core/Client/MqttPacketDispatcher.cs

+ 2
- 16
MQTTnet.Core/Client/MqttClient.cs View File

@@ -321,23 +321,9 @@ namespace MQTTnet.Core.Client

private async Task<TResponsePacket> SendAndReceiveAsync<TResponsePacket>(MqttBasePacket requestPacket) where TResponsePacket : MqttBasePacket
{
bool ResponsePacketSelector(MqttBasePacket p)
{
if (!(p is TResponsePacket p1))
{
return false;
}

if (!(requestPacket is IMqttPacketWithIdentifier pi1) || !(p is IMqttPacketWithIdentifier pi2))
{
return true;
}

return pi1.PacketIdentifier == pi2.PacketIdentifier;
}

await _adapter.SendPacketAsync(requestPacket, _options.DefaultCommunicationTimeout).ConfigureAwait(false);
return (TResponsePacket)await _packetDispatcher.WaitForPacketAsync(ResponsePacketSelector, _options.DefaultCommunicationTimeout).ConfigureAwait(false);

return (TResponsePacket)await _packetDispatcher.WaitForPacketAsync(requestPacket, typeof(TResponsePacket), _options.DefaultCommunicationTimeout).ConfigureAwait(false);
}

private ushort GetNewPacketIdentifier()


+ 0
- 16
MQTTnet.Core/Client/MqttPacketAwaiter.cs View File

@@ -1,16 +0,0 @@
using System;
using System.Threading.Tasks;
using MQTTnet.Core.Packets;

namespace MQTTnet.Core.Client
{
public class MqttPacketAwaiter : TaskCompletionSource<MqttBasePacket>
{
public MqttPacketAwaiter(Func<MqttBasePacket, bool> packetSelector)
{
PacketSelector = packetSelector ?? throw new ArgumentNullException(nameof(packetSelector));
}

public Func<MqttBasePacket, bool> PacketSelector { get; }
}
}

+ 33
- 24
MQTTnet.Core/Client/MqttPacketDispatcher.cs View File

@@ -4,6 +4,7 @@ using System.Threading.Tasks;
using MQTTnet.Core.Diagnostics;
using MQTTnet.Core.Exceptions;
using MQTTnet.Core.Packets;
using System.Collections.Concurrent;

namespace MQTTnet.Core.Client
{
@@ -11,17 +12,18 @@ namespace MQTTnet.Core.Client
{
private readonly object _syncRoot = new object();
private readonly HashSet<MqttBasePacket> _receivedPackets = new HashSet<MqttBasePacket>();
private readonly List<MqttPacketAwaiter> _packetAwaiters = new List<MqttPacketAwaiter>();
private readonly ConcurrentDictionary<Type, TaskCompletionSource<MqttBasePacket>> _packetByResponseType = new ConcurrentDictionary<Type, TaskCompletionSource<MqttBasePacket>>();
private readonly ConcurrentDictionary<ushort,TaskCompletionSource<MqttBasePacket>> _packetByIdentifier = new ConcurrentDictionary<ushort, TaskCompletionSource<MqttBasePacket>>();

public async Task<MqttBasePacket> WaitForPacketAsync(Func<MqttBasePacket, bool> selector, TimeSpan timeout)
public async Task<MqttBasePacket> WaitForPacketAsync(MqttBasePacket request, Type responseType, TimeSpan timeout)
{
if (selector == null) throw new ArgumentNullException(nameof(selector));
if (request == null) throw new ArgumentNullException(nameof(request));

var packetAwaiter = AddPacketAwaiter(selector);
var packetAwaiter = AddPacketAwaiter(request, responseType);
DispatchPendingPackets();

var hasTimeout = await Task.WhenAny(Task.Delay(timeout), packetAwaiter.Task).ConfigureAwait(false) != packetAwaiter.Task;
RemovePacketAwaiter(packetAwaiter);
RemovePacketAwaiter(request, responseType);

if (hasTimeout)
{
@@ -37,15 +39,20 @@ namespace MQTTnet.Core.Client
if (packet == null) throw new ArgumentNullException(nameof(packet));

var packetDispatched = false;
foreach (var packetAwaiter in GetPacketAwaiters())

if (packet is IMqttPacketWithIdentifier withIdentifier)
{
if (packetAwaiter.PacketSelector(packet))
if (_packetByIdentifier.TryRemove(withIdentifier.PacketIdentifier, out var tcs))
{
packetAwaiter.TrySetResult(packet);
tcs.TrySetResult(packet);
packetDispatched = true;
break;
}
}
else if (_packetByResponseType.TryRemove(packet.GetType(), out var tcs) )
{
tcs.TrySetResult( packet);
packetDispatched = true;
}

lock (_syncRoot)
{
@@ -64,34 +71,36 @@ namespace MQTTnet.Core.Client
{
lock (_syncRoot)
{
_packetAwaiters.Clear();
_receivedPackets.Clear();
}

_packetByIdentifier.Clear();
}

private List<MqttPacketAwaiter> GetPacketAwaiters()
private TaskCompletionSource<MqttBasePacket> AddPacketAwaiter(MqttBasePacket request, Type responseType)
{
lock (_syncRoot)
var tcs = new TaskCompletionSource<MqttBasePacket>();
if (request is IMqttPacketWithIdentifier withIdent)
{
return new List<MqttPacketAwaiter>(_packetAwaiters);
_packetByIdentifier[withIdent.PacketIdentifier] = tcs;
}
}

private MqttPacketAwaiter AddPacketAwaiter(Func<MqttBasePacket, bool> selector)
{
lock (_syncRoot)
else
{
var packetAwaiter = new MqttPacketAwaiter(selector);
_packetAwaiters.Add(packetAwaiter);
return packetAwaiter;
_packetByResponseType[responseType] = tcs;
}

return tcs;
}

private void RemovePacketAwaiter(MqttPacketAwaiter packetAwaiter)
private void RemovePacketAwaiter(MqttBasePacket request, Type responseType)
{
lock (_syncRoot)
if (request is IMqttPacketWithIdentifier withIdent)
{
_packetByIdentifier.TryRemove(withIdent.PacketIdentifier, out var tcs);
}
else
{
_packetAwaiters.Remove(packetAwaiter);
_packetByResponseType.TryRemove(responseType, out var tcs);
}
}



Loading…
Cancel
Save