@@ -1,5 +1,4 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using MQTTnet.Core.Diagnostics;
using MQTTnet.Core.Exceptions;
@@ -11,8 +10,6 @@ namespace MQTTnet.Core.Client
{
public class MqttPacketDispatcher
{
private readonly object _syncRoot = new object();
private readonly HashSet<MqttBasePacket> _receivedPackets = new HashSet<MqttBasePacket>();
private readonly ConcurrentDictionary<Type, TaskCompletionSource<MqttBasePacket>> _packetByResponseType = new ConcurrentDictionary<Type, TaskCompletionSource<MqttBasePacket>>();
private readonly ConcurrentDictionary<Type, ConcurrentDictionary<ushort,TaskCompletionSource<MqttBasePacket>>> _packetByResponseTypeAndIdentifier = new ConcurrentDictionary<Type, ConcurrentDictionary<ushort,TaskCompletionSource<MqttBasePacket>>>();
@@ -21,8 +18,6 @@ namespace MQTTnet.Core.Client
if (request == null) throw new ArgumentNullException(nameof(request));
var packetAwaiter = AddPacketAwaiter(request, responseType);
DispatchPendingPackets();
try
{
return await packetAwaiter.Task.TimeoutAfter(timeout);
@@ -43,8 +38,6 @@ namespace MQTTnet.Core.Client
if (packet == null) throw new ArgumentNullException(nameof(packet));
var type = packet.GetType();
var packetDispatched = false;
if (packet is IMqttPacketWithIdentifier withIdentifier)
{
if (_packetByResponseTypeAndIdentifier.TryGetValue(type, out var byid))
@@ -52,37 +45,19 @@ namespace MQTTnet.Core.Client
if (byid.TryRemove( withIdentifier.PacketIdentifier, out var tcs))
{
tcs.TrySetResult( packet );
packetDispatched = true;
}
}
}
else if (_packetByResponseType.TryRemove(type, out var tcs))
{
tcs.TrySetResult(packet);
packetDispatched = true;
}
lock (_syncRoot)
{
if (!packetDispatched)
{
_receivedPackets.Add(packet);
}
else
{
_receivedPackets.Remove(packet);
}
}
}
public void Reset()
{
lock (_syncRoot)
{
_receivedPackets.Clear();
}
_packetByResponseTypeAndIdentifier.Clear();
_packetByResponseType.Clear();
}
private TaskCompletionSource<MqttBasePacket> AddPacketAwaiter(MqttBasePacket request, Type responseType)
@@ -113,19 +88,5 @@ namespace MQTTnet.Core.Client
_packetByResponseType.TryRemove(responseType, out var _);
}
}
private void DispatchPendingPackets()
{
List<MqttBasePacket> receivedPackets;
lock (_syncRoot)
{
receivedPackets = new List<MqttBasePacket>(_receivedPackets);
}
foreach (var pendingPacket in receivedPackets)
{
Dispatch(pendingPacket);
}
}
}
}