Parcourir la source

Replace double dictionary with a tuple key

release/3.x.x
Israel Lot il y a 6 ans
Parent
révision
a5f33bc244
2 fichiers modifiés avec 23 ajouts et 19 suppressions
  1. +5
    -1
      Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs
  2. +18
    -18
      Frameworks/MQTTnet.NetStandard/Client/MqttPacketDispatcher.cs

+ 5
- 1
Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs Voir le fichier

@@ -115,9 +115,13 @@ namespace MQTTnet.Adapter
ReceivedMqttPacket receivedMqttPacket = null;
try
{

if (timeout > TimeSpan.Zero)
{
receivedMqttPacket = await ReceiveAsync(_channel.ReceiveStream, cancellationToken).TimeoutAfter(timeout).ConfigureAwait(false);
var timeoutCts = new CancellationTokenSource(timeout);
var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutCts.Token);

receivedMqttPacket = await ReceiveAsync(_channel.ReceiveStream, linkedCts.Token).ConfigureAwait(false);
}
else
{


+ 18
- 18
Frameworks/MQTTnet.NetStandard/Client/MqttPacketDispatcher.cs Voir le fichier

@@ -10,7 +10,8 @@ namespace MQTTnet.Client
{
public class MqttPacketDispatcher
{
private readonly ConcurrentDictionary<Type, ConcurrentDictionary<ushort, TaskCompletionSource<MqttBasePacket>>> _awaiters = new ConcurrentDictionary<Type, ConcurrentDictionary<ushort, TaskCompletionSource<MqttBasePacket>>>();
private readonly ConcurrentDictionary<Tuple<ushort?, Type>, TaskCompletionSource<MqttBasePacket>> _awaiters = new ConcurrentDictionary<Tuple<ushort?, Type>, TaskCompletionSource<MqttBasePacket>>();
private readonly IMqttNetLogger _logger;

public MqttPacketDispatcher(IMqttNetLogger logger)
@@ -22,7 +23,7 @@ namespace MQTTnet.Client
{
var packetAwaiter = AddPacketAwaiter(responseType, identifier);
try
{
{
return await packetAwaiter.Task.TimeoutAfter(timeout).ConfigureAwait(false);
}
catch (MqttCommunicationTimedOutException)
@@ -40,21 +41,20 @@ namespace MQTTnet.Client
{
if (packet == null) throw new ArgumentNullException(nameof(packet));

ushort? identifier = 0;
if (packet is IMqttPacketWithIdentifier packetWithIdentifier)
{
identifier = packetWithIdentifier.PacketIdentifier;
}

var type = packet.GetType();
var key = new Tuple<ushort?, Type>(identifier, type);

if (_awaiters.TryGetValue(type, out var byId))
if (_awaiters.TryRemove(key, out var tcs))
{
ushort? identifier = 0;
if (packet is IMqttPacketWithIdentifier packetWithIdentifier)
{
identifier = packetWithIdentifier.PacketIdentifier;
}

if (byId.TryRemove(identifier.Value, out var tcs))
{
tcs.TrySetResult(packet);
return;
}
tcs.TrySetResult(packet);
return;
}

throw new InvalidOperationException($"Packet of type '{type.Name}' not handled or dispatched.");
@@ -74,8 +74,8 @@ namespace MQTTnet.Client
identifier = 0;
}

var byId = _awaiters.GetOrAdd(responseType, key => new ConcurrentDictionary<ushort, TaskCompletionSource<MqttBasePacket>>());
if (!byId.TryAdd(identifier.Value, tcs))
var dictionaryKey = new Tuple<ushort?,Type>(identifier, responseType);
if (!_awaiters.TryAdd(dictionaryKey,tcs))
{
throw new InvalidOperationException($"The packet dispatcher already has an awaiter for packet of type '{responseType}' with identifier {identifier}.");
}
@@ -90,8 +90,8 @@ namespace MQTTnet.Client
identifier = 0;
}

var byId = _awaiters.GetOrAdd(responseType, key => new ConcurrentDictionary<ushort, TaskCompletionSource<MqttBasePacket>>());
byId.TryRemove(identifier.Value, out var _);
var dictionaryKey = new Tuple<ushort?, Type>(identifier, responseType);
_awaiters.TryRemove(dictionaryKey, out var _);
}
}
}

Chargement…
Annuler
Enregistrer