Browse Source

Merge branch 'develop' of https://github.com/chkr1011/MQTTnet into develop

release/3.x.x
Christian 6 years ago
parent
commit
98fbbc2864
4 changed files with 25 additions and 21 deletions
  1. +5
    -1
      Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs
  2. +18
    -18
      Frameworks/MQTTnet.NetStandard/Client/MqttPacketDispatcher.cs
  3. +1
    -1
      README.md
  4. +1
    -1
      Tests/MQTTnet.Core.Tests/MqttServerTests.cs

+ 5
- 1
Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs View File

@@ -115,9 +115,13 @@ namespace MQTTnet.Adapter
ReceivedMqttPacket receivedMqttPacket = null;
try
{

if (timeout > TimeSpan.Zero)
{
receivedMqttPacket = await ReceiveAsync(_channel.ReceiveStream, cancellationToken).TimeoutAfter(timeout).ConfigureAwait(false);
var timeoutCts = new CancellationTokenSource(timeout);
var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutCts.Token);

receivedMqttPacket = await ReceiveAsync(_channel.ReceiveStream, linkedCts.Token).ConfigureAwait(false);
}
else
{


+ 18
- 18
Frameworks/MQTTnet.NetStandard/Client/MqttPacketDispatcher.cs View File

@@ -10,7 +10,8 @@ namespace MQTTnet.Client
{
public class MqttPacketDispatcher
{
private readonly ConcurrentDictionary<Type, ConcurrentDictionary<ushort, TaskCompletionSource<MqttBasePacket>>> _awaiters = new ConcurrentDictionary<Type, ConcurrentDictionary<ushort, TaskCompletionSource<MqttBasePacket>>>();
private readonly ConcurrentDictionary<Tuple<ushort?, Type>, TaskCompletionSource<MqttBasePacket>> _awaiters = new ConcurrentDictionary<Tuple<ushort?, Type>, TaskCompletionSource<MqttBasePacket>>();
private readonly IMqttNetLogger _logger;

public MqttPacketDispatcher(IMqttNetLogger logger)
@@ -22,7 +23,7 @@ namespace MQTTnet.Client
{
var packetAwaiter = AddPacketAwaiter(responseType, identifier);
try
{
{
return await packetAwaiter.Task.TimeoutAfter(timeout).ConfigureAwait(false);
}
catch (MqttCommunicationTimedOutException)
@@ -40,21 +41,20 @@ namespace MQTTnet.Client
{
if (packet == null) throw new ArgumentNullException(nameof(packet));

ushort? identifier = 0;
if (packet is IMqttPacketWithIdentifier packetWithIdentifier)
{
identifier = packetWithIdentifier.PacketIdentifier;
}

var type = packet.GetType();
var key = new Tuple<ushort?, Type>(identifier, type);

if (_awaiters.TryGetValue(type, out var byId))
if (_awaiters.TryRemove(key, out var tcs))
{
ushort? identifier = 0;
if (packet is IMqttPacketWithIdentifier packetWithIdentifier)
{
identifier = packetWithIdentifier.PacketIdentifier;
}

if (byId.TryRemove(identifier.Value, out var tcs))
{
tcs.TrySetResult(packet);
return;
}
tcs.TrySetResult(packet);
return;
}

throw new InvalidOperationException($"Packet of type '{type.Name}' not handled or dispatched.");
@@ -74,8 +74,8 @@ namespace MQTTnet.Client
identifier = 0;
}

var byId = _awaiters.GetOrAdd(responseType, key => new ConcurrentDictionary<ushort, TaskCompletionSource<MqttBasePacket>>());
if (!byId.TryAdd(identifier.Value, tcs))
var dictionaryKey = new Tuple<ushort?,Type>(identifier, responseType);
if (!_awaiters.TryAdd(dictionaryKey,tcs))
{
throw new InvalidOperationException($"The packet dispatcher already has an awaiter for packet of type '{responseType}' with identifier {identifier}.");
}
@@ -90,8 +90,8 @@ namespace MQTTnet.Client
identifier = 0;
}

var byId = _awaiters.GetOrAdd(responseType, key => new ConcurrentDictionary<ushort, TaskCompletionSource<MqttBasePacket>>());
byId.TryRemove(identifier.Value, out var _);
var dictionaryKey = new Tuple<ushort?, Type>(identifier, responseType);
_awaiters.TryRemove(dictionaryKey, out var _);
}
}
}

+ 1
- 1
README.md View File

@@ -78,7 +78,7 @@ If you want to contribute to this project just create a pull request. But only p
This library is used in the following projects:

* MQTT Client Rx (Wrapper for Reactive Extensions, <https://github.com/1iveowl/MQTTClient.rx>)
* MQTT Tester (MQTT client test app for [Android](https://play.google.com/store/apps/details?id=com.liveowl.mqtttester), [iOS](https://itunes.apple.com/us/app/mqtt-tester/id1278621826?mt=8), and [Windows 10](https://www.microsoft.com/en-us/store/p/mqtt-tester/9wzdncrd272c))
* MQTT Tester (MQTT client test app for [Android](https://play.google.com/store/apps/details?id=com.liveowl.mqtttester) and [iOS](https://itunes.apple.com/us/app/mqtt-tester/id1278621826?mt=8))
* Wirehome (Open Source Home Automation system for .NET, <https://github.com/chkr1011/Wirehome>)




+ 1
- 1
Tests/MQTTnet.Core.Tests/MqttServerTests.cs View File

@@ -75,7 +75,7 @@ namespace MQTTnet.Core.Tests
await s.StopAsync();
}

Assert.AreEqual(1, receivedMessagesCount);
Assert.AreEqual(0, receivedMessagesCount);
}

[TestMethod]


Loading…
Cancel
Save