diff --git a/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs b/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs index c2f9ba4..123b88d 100644 --- a/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs +++ b/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs @@ -115,9 +115,13 @@ namespace MQTTnet.Adapter ReceivedMqttPacket receivedMqttPacket = null; try { + if (timeout > TimeSpan.Zero) { - receivedMqttPacket = await ReceiveAsync(_channel.ReceiveStream, cancellationToken).TimeoutAfter(timeout).ConfigureAwait(false); + var timeoutCts = new CancellationTokenSource(timeout); + var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutCts.Token); + + receivedMqttPacket = await ReceiveAsync(_channel.ReceiveStream, linkedCts.Token).ConfigureAwait(false); } else { diff --git a/Frameworks/MQTTnet.NetStandard/Client/MqttPacketDispatcher.cs b/Frameworks/MQTTnet.NetStandard/Client/MqttPacketDispatcher.cs index 8b09fbc..172e53b 100644 --- a/Frameworks/MQTTnet.NetStandard/Client/MqttPacketDispatcher.cs +++ b/Frameworks/MQTTnet.NetStandard/Client/MqttPacketDispatcher.cs @@ -10,7 +10,8 @@ namespace MQTTnet.Client { public class MqttPacketDispatcher { - private readonly ConcurrentDictionary>> _awaiters = new ConcurrentDictionary>>(); + + private readonly ConcurrentDictionary, TaskCompletionSource> _awaiters = new ConcurrentDictionary, TaskCompletionSource>(); private readonly IMqttNetLogger _logger; public MqttPacketDispatcher(IMqttNetLogger logger) @@ -22,7 +23,7 @@ namespace MQTTnet.Client { var packetAwaiter = AddPacketAwaiter(responseType, identifier); try - { + { return await packetAwaiter.Task.TimeoutAfter(timeout).ConfigureAwait(false); } catch (MqttCommunicationTimedOutException) @@ -40,21 +41,20 @@ namespace MQTTnet.Client { if (packet == null) throw new ArgumentNullException(nameof(packet)); + ushort? identifier = 0; + if (packet is IMqttPacketWithIdentifier packetWithIdentifier) + { + identifier = packetWithIdentifier.PacketIdentifier; + } + var type = packet.GetType(); + var key = new Tuple(identifier, type); + - if (_awaiters.TryGetValue(type, out var byId)) + if (_awaiters.TryRemove(key, out var tcs)) { - ushort? identifier = 0; - if (packet is IMqttPacketWithIdentifier packetWithIdentifier) - { - identifier = packetWithIdentifier.PacketIdentifier; - } - - if (byId.TryRemove(identifier.Value, out var tcs)) - { - tcs.TrySetResult(packet); - return; - } + tcs.TrySetResult(packet); + return; } throw new InvalidOperationException($"Packet of type '{type.Name}' not handled or dispatched."); @@ -74,8 +74,8 @@ namespace MQTTnet.Client identifier = 0; } - var byId = _awaiters.GetOrAdd(responseType, key => new ConcurrentDictionary>()); - if (!byId.TryAdd(identifier.Value, tcs)) + var dictionaryKey = new Tuple(identifier, responseType); + if (!_awaiters.TryAdd(dictionaryKey,tcs)) { throw new InvalidOperationException($"The packet dispatcher already has an awaiter for packet of type '{responseType}' with identifier {identifier}."); } @@ -90,8 +90,8 @@ namespace MQTTnet.Client identifier = 0; } - var byId = _awaiters.GetOrAdd(responseType, key => new ConcurrentDictionary>()); - byId.TryRemove(identifier.Value, out var _); + var dictionaryKey = new Tuple(identifier, responseType); + _awaiters.TryRemove(dictionaryKey, out var _); } } } \ No newline at end of file diff --git a/README.md b/README.md index b2e40ed..6a3111d 100644 --- a/README.md +++ b/README.md @@ -78,7 +78,7 @@ If you want to contribute to this project just create a pull request. But only p This library is used in the following projects: * MQTT Client Rx (Wrapper for Reactive Extensions, ) -* MQTT Tester (MQTT client test app for [Android](https://play.google.com/store/apps/details?id=com.liveowl.mqtttester), [iOS](https://itunes.apple.com/us/app/mqtt-tester/id1278621826?mt=8), and [Windows 10](https://www.microsoft.com/en-us/store/p/mqtt-tester/9wzdncrd272c)) +* MQTT Tester (MQTT client test app for [Android](https://play.google.com/store/apps/details?id=com.liveowl.mqtttester) and [iOS](https://itunes.apple.com/us/app/mqtt-tester/id1278621826?mt=8)) * Wirehome (Open Source Home Automation system for .NET, ) diff --git a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs index f0c74ea..dc2f525 100644 --- a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs @@ -75,7 +75,7 @@ namespace MQTTnet.Core.Tests await s.StopAsync(); } - Assert.AreEqual(1, receivedMessagesCount); + Assert.AreEqual(0, receivedMessagesCount); } [TestMethod]