Browse Source

Merge pull request #39 from JanEggers/develop

fixed unittest server and client both generated id 2 for differnt pac…
release/3.x.x
Christian 7 years ago
committed by GitHub
parent
commit
da7adc1068
4 changed files with 44 additions and 116 deletions
  1. +14
    -47
      MQTTnet.Core/Client/MqttPacketDispatcher.cs
  2. +24
    -49
      MQTTnet.Core/Server/MqttClientMessageQueue.cs
  3. +0
    -19
      MQTTnet.Core/Server/MqttClientPublishPacketContext.cs
  4. +6
    -1
      Tests/MQTTnet.TestApp.NetFramework/PerformanceTest.cs

+ 14
- 47
MQTTnet.Core/Client/MqttPacketDispatcher.cs View File

@@ -1,5 +1,4 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using MQTTnet.Core.Diagnostics;
using MQTTnet.Core.Exceptions;
@@ -11,18 +10,14 @@ namespace MQTTnet.Core.Client
{
public class MqttPacketDispatcher
{
private readonly object _syncRoot = new object();
private readonly HashSet<MqttBasePacket> _receivedPackets = new HashSet<MqttBasePacket>();
private readonly ConcurrentDictionary<Type, TaskCompletionSource<MqttBasePacket>> _packetByResponseType = new ConcurrentDictionary<Type, TaskCompletionSource<MqttBasePacket>>();
private readonly ConcurrentDictionary<ushort, TaskCompletionSource<MqttBasePacket>> _packetByIdentifier = new ConcurrentDictionary<ushort, TaskCompletionSource<MqttBasePacket>>();
private readonly ConcurrentDictionary<Type, ConcurrentDictionary<ushort,TaskCompletionSource<MqttBasePacket>>> _packetByResponseTypeAndIdentifier = new ConcurrentDictionary<Type, ConcurrentDictionary<ushort,TaskCompletionSource<MqttBasePacket>>>();

public async Task<MqttBasePacket> WaitForPacketAsync(MqttBasePacket request, Type responseType, TimeSpan timeout)
{
if (request == null) throw new ArgumentNullException(nameof(request));

var packetAwaiter = AddPacketAwaiter(request, responseType);
DispatchPendingPackets();

try
{
return await packetAwaiter.Task.TimeoutAfter(timeout);
@@ -42,43 +37,27 @@ namespace MQTTnet.Core.Client
{
if (packet == null) throw new ArgumentNullException(nameof(packet));

var packetDispatched = false;

var type = packet.GetType();
if (packet is IMqttPacketWithIdentifier withIdentifier)
{
if (_packetByIdentifier.TryRemove(withIdentifier.PacketIdentifier, out var tcs))
if (_packetByResponseTypeAndIdentifier.TryGetValue(type, out var byid))
{
tcs.TrySetResult(packet);
packetDispatched = true;
if (byid.TryRemove( withIdentifier.PacketIdentifier, out var tcs))
{
tcs.TrySetResult( packet );
}
}
}
else if (_packetByResponseType.TryRemove(packet.GetType(), out var tcs))
else if (_packetByResponseType.TryRemove(type, out var tcs))
{
tcs.TrySetResult(packet);
packetDispatched = true;
}

lock (_syncRoot)
{
if (!packetDispatched)
{
_receivedPackets.Add(packet);
}
else
{
_receivedPackets.Remove(packet);
}
}
}

public void Reset()
{
lock (_syncRoot)
{
_receivedPackets.Clear();
}

_packetByIdentifier.Clear();
_packetByResponseTypeAndIdentifier.Clear();
_packetByResponseType.Clear();
}

private TaskCompletionSource<MqttBasePacket> AddPacketAwaiter(MqttBasePacket request, Type responseType)
@@ -86,7 +65,8 @@ namespace MQTTnet.Core.Client
var tcs = new TaskCompletionSource<MqttBasePacket>();
if (request is IMqttPacketWithIdentifier withIdent)
{
_packetByIdentifier[withIdent.PacketIdentifier] = tcs;
var byId = _packetByResponseTypeAndIdentifier.GetOrAdd(responseType, key => new ConcurrentDictionary<ushort, TaskCompletionSource<MqttBasePacket>>());
byId[withIdent.PacketIdentifier] = tcs;
}
else
{
@@ -100,26 +80,13 @@ namespace MQTTnet.Core.Client
{
if (request is IMqttPacketWithIdentifier withIdent)
{
_packetByIdentifier.TryRemove(withIdent.PacketIdentifier, out var _);
var byId = _packetByResponseTypeAndIdentifier.GetOrAdd(responseType, key => new ConcurrentDictionary<ushort, TaskCompletionSource<MqttBasePacket>>());
byId.TryRemove(withIdent.PacketIdentifier, out var _);
}
else
{
_packetByResponseType.TryRemove(responseType, out var _);
}
}

private void DispatchPendingPackets()
{
List<MqttBasePacket> receivedPackets;
lock (_syncRoot)
{
receivedPackets = new List<MqttBasePacket>(_receivedPackets);
}

foreach (var pendingPacket in receivedPackets)
{
Dispatch(pendingPacket);
}
}
}
}

+ 24
- 49
MQTTnet.Core/Server/MqttClientMessageQueue.cs View File

@@ -6,16 +6,16 @@ using MQTTnet.Core.Adapter;
using MQTTnet.Core.Diagnostics;
using MQTTnet.Core.Exceptions;
using MQTTnet.Core.Packets;
using System.Linq;

namespace MQTTnet.Core.Server
{
public sealed class MqttClientMessageQueue
{
private readonly BlockingCollection<MqttClientPublishPacketContext> _pendingPublishPackets = new BlockingCollection<MqttClientPublishPacketContext>();
private readonly BlockingCollection<MqttPublishPacket> _pendingPublishPackets = new BlockingCollection<MqttPublishPacket>();

private readonly MqttServerOptions _options;
private CancellationTokenSource _cancellationTokenSource;
private IMqttCommunicationAdapter _adapter;

public MqttClientMessageQueue(MqttServerOptions options)
{
@@ -29,15 +29,14 @@ namespace MQTTnet.Core.Server
throw new InvalidOperationException($"{nameof(MqttClientMessageQueue)} already started.");
}

_adapter = adapter ?? throw new ArgumentNullException(nameof(adapter));
if (adapter == null) throw new ArgumentNullException(nameof(adapter));
_cancellationTokenSource = new CancellationTokenSource();

Task.Run(() => SendPendingPublishPacketsAsync(_cancellationTokenSource.Token), _cancellationTokenSource.Token);
Task.Run(() => SendPendingPublishPacketsAsync(_cancellationTokenSource.Token, adapter), _cancellationTokenSource.Token);
}

public void Stop()
{
_adapter = null;
_cancellationTokenSource?.Cancel();
_cancellationTokenSource = null;
_pendingPublishPackets?.Dispose();
@@ -47,61 +46,37 @@ namespace MQTTnet.Core.Server
{
if (publishPacket == null) throw new ArgumentNullException(nameof(publishPacket));

_pendingPublishPackets.Add(new MqttClientPublishPacketContext(publishPacket));
_pendingPublishPackets.Add(publishPacket);
}

private async Task SendPendingPublishPacketsAsync(CancellationToken cancellationToken)
private async Task SendPendingPublishPacketsAsync(CancellationToken cancellationToken, IMqttCommunicationAdapter adapter)
{
foreach (var publishPacket in _pendingPublishPackets.GetConsumingEnumerable(cancellationToken))
var consumable = _pendingPublishPackets.GetConsumingEnumerable();
while (!cancellationToken.IsCancellationRequested)
{
var packets = consumable.Take(_pendingPublishPackets.Count).ToList();
try
{
if (cancellationToken.IsCancellationRequested)
{
return;
}
if (_adapter == null)
await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, packets).ConfigureAwait(false);
}
catch (MqttCommunicationException exception)
{
MqttTrace.Warning(nameof(MqttClientMessageQueue), exception, "Sending publish packet failed.");
foreach (var publishPacket in packets)
{
continue;
publishPacket.Dup = true;
_pendingPublishPackets.Add(publishPacket);
}

await TrySendPendingPublishPacketAsync(publishPacket).ConfigureAwait(false);
}
catch (Exception e)
catch (Exception exception)
{
MqttTrace.Error(nameof(MqttClientMessageQueue), e, "Error while sending pending publish packets.");
}
}
}

private async Task TrySendPendingPublishPacketAsync(MqttClientPublishPacketContext publishPacketContext)
{
try
{
if (_adapter == null)
{
return;
MqttTrace.Error(nameof(MqttClientMessageQueue), exception, "Sending publish packet failed.");
foreach (var publishPacket in packets)
{
publishPacket.Dup = true;
_pendingPublishPackets.Add(publishPacket);
}
}

publishPacketContext.PublishPacket.Dup = publishPacketContext.SendTries > 0;
await _adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, publishPacketContext.PublishPacket).ConfigureAwait(false);

publishPacketContext.IsSent = true;
}
catch (MqttCommunicationException exception)
{
MqttTrace.Warning(nameof(MqttClientMessageQueue), exception, "Sending publish packet failed.");
_pendingPublishPackets.Add(publishPacketContext);
}
catch (Exception exception)
{
MqttTrace.Error(nameof(MqttClientMessageQueue), exception, "Sending publish packet failed.");
_pendingPublishPackets.Add(publishPacketContext);
}
finally
{
publishPacketContext.SendTries++;
}
}
}


+ 0
- 19
MQTTnet.Core/Server/MqttClientPublishPacketContext.cs View File

@@ -1,19 +0,0 @@
using System;
using MQTTnet.Core.Packets;

namespace MQTTnet.Core.Server
{
public sealed class MqttClientPublishPacketContext
{
public MqttClientPublishPacketContext(MqttPublishPacket publishPacket)
{
PublishPacket = publishPacket ?? throw new ArgumentNullException(nameof(publishPacket));
}

public MqttPublishPacket PublishPacket { get; }

public int SendTries { get; set; }

public bool IsSent { get; set; }
}
}

+ 6
- 1
Tests/MQTTnet.TestApp.NetFramework/PerformanceTest.cs View File

@@ -18,11 +18,16 @@ namespace MQTTnet.TestApp.NetFramework
public static async Task RunAsync()
{
var server = Task.Run(() => RunServerAsync());
var client = Task.Run(() => RunClientAsync(300, TimeSpan.FromMilliseconds(10)));
var client = Task.Run(() => RunClientAsync(2000, TimeSpan.FromMilliseconds(10)));

await Task.WhenAll(server, client).ConfigureAwait(false);
}

private static Task RunClientsAsync(int msgChunkSize, TimeSpan interval)
{
return Task.WhenAll(Enumerable.Range(0, 3).Select((i) => Task.Run(() => RunClientAsync(msgChunkSize, interval))));
}

private static async Task RunClientAsync( int msgChunkSize, TimeSpan interval )
{
try


Loading…
Cancel
Save