Browse Source

Merge pull request #220 from israellot/optmizations

Replace double dictionary with a tuple key
release/3.x.x
Christian 6 years ago
committed by GitHub
parent
commit
63a12d4d40
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 25 additions and 21 deletions
  1. +5
    -1
      Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs
  2. +18
    -18
      Frameworks/MQTTnet.NetStandard/Client/MqttPacketDispatcher.cs
  3. +1
    -1
      README.md
  4. +1
    -1
      Tests/MQTTnet.Core.Tests/MqttServerTests.cs

+ 5
- 1
Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs View File

@@ -115,9 +115,13 @@ namespace MQTTnet.Adapter
ReceivedMqttPacket receivedMqttPacket = null; ReceivedMqttPacket receivedMqttPacket = null;
try try
{ {

if (timeout > TimeSpan.Zero) if (timeout > TimeSpan.Zero)
{ {
receivedMqttPacket = await ReceiveAsync(_channel.ReceiveStream, cancellationToken).TimeoutAfter(timeout).ConfigureAwait(false);
var timeoutCts = new CancellationTokenSource(timeout);
var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutCts.Token);

receivedMqttPacket = await ReceiveAsync(_channel.ReceiveStream, linkedCts.Token).ConfigureAwait(false);
} }
else else
{ {


+ 18
- 18
Frameworks/MQTTnet.NetStandard/Client/MqttPacketDispatcher.cs View File

@@ -10,7 +10,8 @@ namespace MQTTnet.Client
{ {
public class MqttPacketDispatcher public class MqttPacketDispatcher
{ {
private readonly ConcurrentDictionary<Type, ConcurrentDictionary<ushort, TaskCompletionSource<MqttBasePacket>>> _awaiters = new ConcurrentDictionary<Type, ConcurrentDictionary<ushort, TaskCompletionSource<MqttBasePacket>>>();
private readonly ConcurrentDictionary<Tuple<ushort?, Type>, TaskCompletionSource<MqttBasePacket>> _awaiters = new ConcurrentDictionary<Tuple<ushort?, Type>, TaskCompletionSource<MqttBasePacket>>();
private readonly IMqttNetLogger _logger; private readonly IMqttNetLogger _logger;


public MqttPacketDispatcher(IMqttNetLogger logger) public MqttPacketDispatcher(IMqttNetLogger logger)
@@ -22,7 +23,7 @@ namespace MQTTnet.Client
{ {
var packetAwaiter = AddPacketAwaiter(responseType, identifier); var packetAwaiter = AddPacketAwaiter(responseType, identifier);
try try
{
{
return await packetAwaiter.Task.TimeoutAfter(timeout).ConfigureAwait(false); return await packetAwaiter.Task.TimeoutAfter(timeout).ConfigureAwait(false);
} }
catch (MqttCommunicationTimedOutException) catch (MqttCommunicationTimedOutException)
@@ -40,21 +41,20 @@ namespace MQTTnet.Client
{ {
if (packet == null) throw new ArgumentNullException(nameof(packet)); if (packet == null) throw new ArgumentNullException(nameof(packet));


ushort? identifier = 0;
if (packet is IMqttPacketWithIdentifier packetWithIdentifier)
{
identifier = packetWithIdentifier.PacketIdentifier;
}

var type = packet.GetType(); var type = packet.GetType();
var key = new Tuple<ushort?, Type>(identifier, type);


if (_awaiters.TryGetValue(type, out var byId))
if (_awaiters.TryRemove(key, out var tcs))
{ {
ushort? identifier = 0;
if (packet is IMqttPacketWithIdentifier packetWithIdentifier)
{
identifier = packetWithIdentifier.PacketIdentifier;
}

if (byId.TryRemove(identifier.Value, out var tcs))
{
tcs.TrySetResult(packet);
return;
}
tcs.TrySetResult(packet);
return;
} }


throw new InvalidOperationException($"Packet of type '{type.Name}' not handled or dispatched."); throw new InvalidOperationException($"Packet of type '{type.Name}' not handled or dispatched.");
@@ -74,8 +74,8 @@ namespace MQTTnet.Client
identifier = 0; identifier = 0;
} }


var byId = _awaiters.GetOrAdd(responseType, key => new ConcurrentDictionary<ushort, TaskCompletionSource<MqttBasePacket>>());
if (!byId.TryAdd(identifier.Value, tcs))
var dictionaryKey = new Tuple<ushort?,Type>(identifier, responseType);
if (!_awaiters.TryAdd(dictionaryKey,tcs))
{ {
throw new InvalidOperationException($"The packet dispatcher already has an awaiter for packet of type '{responseType}' with identifier {identifier}."); throw new InvalidOperationException($"The packet dispatcher already has an awaiter for packet of type '{responseType}' with identifier {identifier}.");
} }
@@ -90,8 +90,8 @@ namespace MQTTnet.Client
identifier = 0; identifier = 0;
} }


var byId = _awaiters.GetOrAdd(responseType, key => new ConcurrentDictionary<ushort, TaskCompletionSource<MqttBasePacket>>());
byId.TryRemove(identifier.Value, out var _);
var dictionaryKey = new Tuple<ushort?, Type>(identifier, responseType);
_awaiters.TryRemove(dictionaryKey, out var _);
} }
} }
} }

+ 1
- 1
README.md View File

@@ -78,7 +78,7 @@ If you want to contribute to this project just create a pull request. But only p
This library is used in the following projects: This library is used in the following projects:


* MQTT Client Rx (Wrapper for Reactive Extensions, <https://github.com/1iveowl/MQTTClient.rx>) * MQTT Client Rx (Wrapper for Reactive Extensions, <https://github.com/1iveowl/MQTTClient.rx>)
* MQTT Tester (MQTT client test app for [Android](https://play.google.com/store/apps/details?id=com.liveowl.mqtttester), [iOS](https://itunes.apple.com/us/app/mqtt-tester/id1278621826?mt=8), and [Windows 10](https://www.microsoft.com/en-us/store/p/mqtt-tester/9wzdncrd272c))
* MQTT Tester (MQTT client test app for [Android](https://play.google.com/store/apps/details?id=com.liveowl.mqtttester) and [iOS](https://itunes.apple.com/us/app/mqtt-tester/id1278621826?mt=8))
* Wirehome (Open Source Home Automation system for .NET, <https://github.com/chkr1011/Wirehome>) * Wirehome (Open Source Home Automation system for .NET, <https://github.com/chkr1011/Wirehome>)






+ 1
- 1
Tests/MQTTnet.Core.Tests/MqttServerTests.cs View File

@@ -75,7 +75,7 @@ namespace MQTTnet.Core.Tests
await s.StopAsync(); await s.StopAsync();
} }


Assert.AreEqual(1, receivedMessagesCount);
Assert.AreEqual(0, receivedMessagesCount);
} }


[TestMethod] [TestMethod]


Loading…
Cancel
Save