Browse Source

Refactor thread management for incoming packets.

release/3.x.x
Christian Kratky 6 years ago
parent
commit
39edd22a9d
8 changed files with 29 additions and 79 deletions
  1. +1
    -1
      Build/MQTTnet.nuspec
  2. +2
    -1
      Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs
  3. +1
    -2
      Frameworks/MQTTnet.NetStandard/Client/IMqttClientOptions.cs
  4. +19
    -54
      Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs
  5. +1
    -2
      Frameworks/MQTTnet.NetStandard/Client/MqttClientOptions.cs
  6. +0
    -7
      Frameworks/MQTTnet.NetStandard/Client/MqttClientOptionsBuilder.cs
  7. +5
    -4
      Frameworks/MQTTnet.NetStandard/Client/MqttPacketDispatcher.cs
  8. +0
    -8
      Frameworks/MQTTnet.NetStandard/Client/MqttReceivedApplicationMessageProcessingMode.cs

+ 1
- 1
Build/MQTTnet.nuspec View File

@@ -15,7 +15,7 @@
* [Core] Added several packet validations.
* [Core] Log messages now contain the complete source path including parent components.
* [Core] The adapter now has an _Endpoint_ definition as string containing remote IP and port.
* [Client] Received messages are now processed in the worker thread by default. Added a new setting for switching back to dedicated threads.
* [Client] Received messages are now processed completely in the worker thread without creating new Tasks.
* [Client] Fixed wrong calculation for sending keep alive packets (thanks to @cstichlberger)
* [ManagedClient] Fixed a loading issue of stored application messages (thanks to @JTrotta).
* [Server] Added support for other WebSocket sub protocol formats like mqttv-3.1.1 (thanks to @israellot).


+ 2
- 1
Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs View File

@@ -135,9 +135,10 @@ namespace MQTTnet.Adapter
{
var fixedHeader = await MqttPacketReader.ReadFixedHeaderAsync(channel, cancellationToken).ConfigureAwait(false);
ReadingPacketStarted?.Invoke(this, EventArgs.Empty);
try
{
ReadingPacketStarted?.Invoke(this, EventArgs.Empty);

var bodyLength = await MqttPacketReader.ReadBodyLengthAsync(channel, cancellationToken).ConfigureAwait(false);
if (bodyLength == 0)
{


+ 1
- 2
Frameworks/MQTTnet.NetStandard/Client/IMqttClientOptions.cs View File

@@ -14,8 +14,7 @@ namespace MQTTnet.Client
TimeSpan CommunicationTimeout { get; }
TimeSpan KeepAlivePeriod { get; }
TimeSpan? KeepAliveSendInterval { get; }
MqttReceivedApplicationMessageProcessingMode ReceivedApplicationMessageProcessingMode { get; }

MqttApplicationMessage WillMessage { get; }
}
}

+ 19
- 54
Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs View File

@@ -19,7 +19,7 @@ namespace MQTTnet.Client
private readonly Stopwatch _sendTracker = new Stopwatch();
private readonly SemaphoreSlim _disconnectLock = new SemaphoreSlim(1, 1);
private readonly MqttPacketDispatcher _packetDispatcher = new MqttPacketDispatcher();
private readonly IMqttClientAdapterFactory _adapterFactory;
private readonly IMqttNetChildLogger _logger;

@@ -390,25 +390,7 @@ namespace MQTTnet.Client
while (!cancellationToken.IsCancellationRequested)
{
var packet = await _adapter.ReceivePacketAsync(TimeSpan.Zero, cancellationToken).ConfigureAwait(false);

if (cancellationToken.IsCancellationRequested)
{
return;
}

if (packet == null)
{
continue;
}

if (_options.ReceivedApplicationMessageProcessingMode == MqttReceivedApplicationMessageProcessingMode.SingleThread)
{
await ProcessReceivedPacketAsync(packet, cancellationToken).ConfigureAwait(false);
}
else if (_options.ReceivedApplicationMessageProcessingMode == MqttReceivedApplicationMessageProcessingMode.DedicatedThread)
{
StartProcessReceivedPacketAsync(packet, cancellationToken);
}
await ProcessReceivedPacketAsync(packet, cancellationToken).ConfigureAwait(false);
}
}
catch (Exception exception)
@@ -434,40 +416,30 @@ namespace MQTTnet.Client
}
}

private async Task ProcessReceivedPacketAsync(MqttBasePacket packet, CancellationToken cancellationToken)
private Task ProcessReceivedPacketAsync(MqttBasePacket packet, CancellationToken cancellationToken)
{
try
if (packet is MqttPublishPacket publishPacket)
{
if (packet is MqttPublishPacket publishPacket)
{
await ProcessReceivedPublishPacketAsync(publishPacket, cancellationToken).ConfigureAwait(false);
return;
}

if (packet is MqttPingReqPacket)
{
await SendAsync(new MqttPingRespPacket(), cancellationToken).ConfigureAwait(false);
return;
}

if (packet is MqttDisconnectPacket)
{
await DisconnectAsync().ConfigureAwait(false);
return;
}
return ProcessReceivedPublishPacketAsync(publishPacket, cancellationToken);
}

if (packet is MqttPubRelPacket pubRelPacket)
{
await ProcessReceivedPubRelPacket(pubRelPacket, cancellationToken).ConfigureAwait(false);
return;
}
if (packet is MqttPingReqPacket)
{
return SendAsync(new MqttPingRespPacket(), cancellationToken);
}

_packetDispatcher.Dispatch(packet);
if (packet is MqttDisconnectPacket)
{
return DisconnectAsync();
}
catch (Exception exception)

if (packet is MqttPubRelPacket pubRelPacket)
{
_logger.Error(exception, "Unhandled exception while processing received packet.");
return ProcessReceivedPubRelPacket(pubRelPacket, cancellationToken);
}

_packetDispatcher.Dispatch(packet);
return Task.FromResult(0);
}

private Task ProcessReceivedPublishPacketAsync(MqttPublishPacket publishPacket, CancellationToken cancellationToken)
@@ -518,13 +490,6 @@ namespace MQTTnet.Client
cancellationToken);
}

private void StartProcessReceivedPacketAsync(MqttBasePacket packet, CancellationToken cancellationToken)
{
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
Task.Run(() => ProcessReceivedPacketAsync(packet, cancellationToken), cancellationToken);
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
}

private void FireApplicationMessageReceivedEvent(MqttPublishPacket publishPacket)
{
try


+ 1
- 2
Frameworks/MQTTnet.NetStandard/Client/MqttClientOptions.cs View File

@@ -14,8 +14,7 @@ namespace MQTTnet.Client
public TimeSpan CommunicationTimeout { get; set; } = TimeSpan.FromSeconds(10);
public TimeSpan KeepAlivePeriod { get; set; } = TimeSpan.FromSeconds(15);
public TimeSpan? KeepAliveSendInterval { get; set; }
public MqttReceivedApplicationMessageProcessingMode ReceivedApplicationMessageProcessingMode { get; set; } = MqttReceivedApplicationMessageProcessingMode.SingleThread;

public MqttApplicationMessage WillMessage { get; set; }
}
}

+ 0
- 7
Frameworks/MQTTnet.NetStandard/Client/MqttClientOptionsBuilder.cs View File

@@ -114,13 +114,6 @@ namespace MQTTnet.Client
return this;
}

public MqttClientOptionsBuilder WithReceivedApplicationMessageProcessingMode(
MqttReceivedApplicationMessageProcessingMode mode)
{
_options.ReceivedApplicationMessageProcessingMode = mode;
return this;
}

public IMqttClientOptions Build()
{
if (_tlsOptions != null)


+ 5
- 4
Frameworks/MQTTnet.NetStandard/Client/MqttPacketDispatcher.cs View File

@@ -13,7 +13,7 @@ namespace MQTTnet.Client
{
foreach (var awaiter in _awaiters)
{
awaiter.Value.SetException(exception);
Task.Run(() => awaiter.Value.SetException(exception)); // Task.Run fixes a dead lock. Without this the client only receives one message.
}
}

@@ -30,9 +30,10 @@ namespace MQTTnet.Client
var type = packet.GetType();
var key = new Tuple<ushort, Type>(identifier, type);
if (_awaiters.TryRemove(key, out var tcs))
if (_awaiters.TryRemove(key, out var awaiter))
{
tcs.TrySetResult(packet);
awaiter.SetResult(packet);
Task.Run(() => awaiter.SetResult(packet)); // Task.Run fixes a dead lock. Without this the client only receives one message.
return;
}

@@ -70,7 +71,7 @@ namespace MQTTnet.Client
}

var key = new Tuple<ushort, Type>(identifier ?? 0, typeof(TResponsePacket));
_awaiters.TryRemove(key, out var _);
_awaiters.TryRemove(key, out _);
}
}
}

+ 0
- 8
Frameworks/MQTTnet.NetStandard/Client/MqttReceivedApplicationMessageProcessingMode.cs View File

@@ -1,8 +0,0 @@
namespace MQTTnet.Client
{
public enum MqttReceivedApplicationMessageProcessingMode
{
SingleThread,
DedicatedThread
}
}

Loading…
Cancel
Save