From a5f33bc244fe2ecdb71136f3e7869b049076fa23 Mon Sep 17 00:00:00 2001 From: Israel Lot Date: Fri, 13 Apr 2018 23:00:36 -0300 Subject: [PATCH] Replace double dictionary with a tuple key --- .../Adapter/MqttChannelAdapter.cs | 6 +++- .../Client/MqttPacketDispatcher.cs | 36 +++++++++---------- 2 files changed, 23 insertions(+), 19 deletions(-) diff --git a/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs b/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs index c2f9ba4..123b88d 100644 --- a/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs +++ b/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs @@ -115,9 +115,13 @@ namespace MQTTnet.Adapter ReceivedMqttPacket receivedMqttPacket = null; try { + if (timeout > TimeSpan.Zero) { - receivedMqttPacket = await ReceiveAsync(_channel.ReceiveStream, cancellationToken).TimeoutAfter(timeout).ConfigureAwait(false); + var timeoutCts = new CancellationTokenSource(timeout); + var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutCts.Token); + + receivedMqttPacket = await ReceiveAsync(_channel.ReceiveStream, linkedCts.Token).ConfigureAwait(false); } else { diff --git a/Frameworks/MQTTnet.NetStandard/Client/MqttPacketDispatcher.cs b/Frameworks/MQTTnet.NetStandard/Client/MqttPacketDispatcher.cs index 8b09fbc..172e53b 100644 --- a/Frameworks/MQTTnet.NetStandard/Client/MqttPacketDispatcher.cs +++ b/Frameworks/MQTTnet.NetStandard/Client/MqttPacketDispatcher.cs @@ -10,7 +10,8 @@ namespace MQTTnet.Client { public class MqttPacketDispatcher { - private readonly ConcurrentDictionary>> _awaiters = new ConcurrentDictionary>>(); + + private readonly ConcurrentDictionary, TaskCompletionSource> _awaiters = new ConcurrentDictionary, TaskCompletionSource>(); private readonly IMqttNetLogger _logger; public MqttPacketDispatcher(IMqttNetLogger logger) @@ -22,7 +23,7 @@ namespace MQTTnet.Client { var packetAwaiter = AddPacketAwaiter(responseType, identifier); try - { + { return await packetAwaiter.Task.TimeoutAfter(timeout).ConfigureAwait(false); } catch (MqttCommunicationTimedOutException) @@ -40,21 +41,20 @@ namespace MQTTnet.Client { if (packet == null) throw new ArgumentNullException(nameof(packet)); + ushort? identifier = 0; + if (packet is IMqttPacketWithIdentifier packetWithIdentifier) + { + identifier = packetWithIdentifier.PacketIdentifier; + } + var type = packet.GetType(); + var key = new Tuple(identifier, type); + - if (_awaiters.TryGetValue(type, out var byId)) + if (_awaiters.TryRemove(key, out var tcs)) { - ushort? identifier = 0; - if (packet is IMqttPacketWithIdentifier packetWithIdentifier) - { - identifier = packetWithIdentifier.PacketIdentifier; - } - - if (byId.TryRemove(identifier.Value, out var tcs)) - { - tcs.TrySetResult(packet); - return; - } + tcs.TrySetResult(packet); + return; } throw new InvalidOperationException($"Packet of type '{type.Name}' not handled or dispatched."); @@ -74,8 +74,8 @@ namespace MQTTnet.Client identifier = 0; } - var byId = _awaiters.GetOrAdd(responseType, key => new ConcurrentDictionary>()); - if (!byId.TryAdd(identifier.Value, tcs)) + var dictionaryKey = new Tuple(identifier, responseType); + if (!_awaiters.TryAdd(dictionaryKey,tcs)) { throw new InvalidOperationException($"The packet dispatcher already has an awaiter for packet of type '{responseType}' with identifier {identifier}."); } @@ -90,8 +90,8 @@ namespace MQTTnet.Client identifier = 0; } - var byId = _awaiters.GetOrAdd(responseType, key => new ConcurrentDictionary>()); - byId.TryRemove(identifier.Value, out var _); + var dictionaryKey = new Tuple(identifier, responseType); + _awaiters.TryRemove(dictionaryKey, out var _); } } } \ No newline at end of file