Browse Source

Refactor traces of server session handling

release/3.x.x
Christian Kratky 7 years ago
parent
commit
11ccb951aa
4 changed files with 12 additions and 9 deletions
  1. +2
    -2
      MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs
  2. +2
    -0
      MQTTnet.Core/Adapter/MqttCommunicationAdapterExtensions.cs
  3. +8
    -6
      MQTTnet.Core/Server/MqttClientPendingMessagesQueue.cs
  4. +0
    -1
      MQTTnet.Core/Server/MqttClientSession.cs

+ 2
- 2
MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs View File

@@ -85,10 +85,10 @@ namespace MQTTnet.Core.Adapter


public async Task SendPacketsAsync(TimeSpan timeout, CancellationToken cancellationToken, IEnumerable<MqttBasePacket> packets) public async Task SendPacketsAsync(TimeSpan timeout, CancellationToken cancellationToken, IEnumerable<MqttBasePacket> packets)
{ {
await _semaphore.WaitAsync(cancellationToken);

try try
{ {
await _semaphore.WaitAsync(cancellationToken);

foreach (var packet in packets) foreach (var packet in packets)
{ {
if (packet == null) if (packet == null)


+ 2
- 0
MQTTnet.Core/Adapter/MqttCommunicationAdapterExtensions.cs View File

@@ -9,6 +9,8 @@ namespace MQTTnet.Core.Adapter
{ {
public static Task SendPacketsAsync(this IMqttCommunicationAdapter adapter, TimeSpan timeout, CancellationToken cancellationToken, params MqttBasePacket[] packets) public static Task SendPacketsAsync(this IMqttCommunicationAdapter adapter, TimeSpan timeout, CancellationToken cancellationToken, params MqttBasePacket[] packets)
{ {
if (adapter == null) throw new ArgumentNullException(nameof(adapter));

return adapter.SendPacketsAsync(timeout, cancellationToken, packets); return adapter.SendPacketsAsync(timeout, cancellationToken, packets);
} }
} }

+ 8
- 6
MQTTnet.Core/Server/MqttClientPendingMessagesQueue.cs View File

@@ -13,8 +13,8 @@ namespace MQTTnet.Core.Server
public sealed class MqttClientPendingMessagesQueue public sealed class MqttClientPendingMessagesQueue
{ {
private readonly BlockingCollection<MqttPublishPacket> _pendingPublishPackets = new BlockingCollection<MqttPublishPacket>(); private readonly BlockingCollection<MqttPublishPacket> _pendingPublishPackets = new BlockingCollection<MqttPublishPacket>();
private readonly MqttClientSession _session;
private readonly MqttServerOptions _options; private readonly MqttServerOptions _options;
private readonly MqttClientSession _session;
private readonly ILogger<MqttClientPendingMessagesQueue> _logger; private readonly ILogger<MqttClientPendingMessagesQueue> _logger;


public MqttClientPendingMessagesQueue(MqttServerOptions options, MqttClientSession session, ILogger<MqttClientPendingMessagesQueue> logger) public MqttClientPendingMessagesQueue(MqttServerOptions options, MqttClientSession session, ILogger<MqttClientPendingMessagesQueue> logger)
@@ -28,7 +28,7 @@ namespace MQTTnet.Core.Server
{ {
if (adapter == null) throw new ArgumentNullException(nameof(adapter)); if (adapter == null) throw new ArgumentNullException(nameof(adapter));


Task.Factory.StartNew(async () => await SendPendingPublishPacketsAsync(adapter, cancellationToken), cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default).ConfigureAwait(false);
Task.Run(async () => await SendPendingPublishPacketsAsync(adapter, cancellationToken), cancellationToken).ConfigureAwait(false);
} }


public void Enqueue(MqttPublishPacket publishPacket) public void Enqueue(MqttPublishPacket publishPacket)
@@ -36,6 +36,7 @@ namespace MQTTnet.Core.Server
if (publishPacket == null) throw new ArgumentNullException(nameof(publishPacket)); if (publishPacket == null) throw new ArgumentNullException(nameof(publishPacket));


_pendingPublishPackets.Add(publishPacket); _pendingPublishPackets.Add(publishPacket);
_logger.LogTrace("Enqueued packet (ClientId: {0}).", _session.ClientId);
} }


private async Task SendPendingPublishPacketsAsync(IMqttCommunicationAdapter adapter, CancellationToken cancellationToken) private async Task SendPendingPublishPacketsAsync(IMqttCommunicationAdapter adapter, CancellationToken cancellationToken)
@@ -52,7 +53,7 @@ namespace MQTTnet.Core.Server
} }
catch (Exception exception) catch (Exception exception)
{ {
_logger.LogError(new EventId(), exception, "Unhandled exception while sending pending publish packets.");
_logger.LogError(new EventId(), exception, "Unhandled exception while sending enqueued packet (ClientId: {0}).", _session.ClientId);
} }
} }


@@ -63,23 +64,24 @@ namespace MQTTnet.Core.Server
try try
{ {
await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, packet).ConfigureAwait(false); await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, packet).ConfigureAwait(false);
_logger.LogTrace("Enqueued packet sent (ClientId: {0}).", _session.ClientId);
} }
catch (Exception exception) catch (Exception exception)
{ {
if (exception is MqttCommunicationTimedOutException) if (exception is MqttCommunicationTimedOutException)
{ {
_logger.LogWarning(new EventId(), exception, "Sending publish packet failed due to timeout.");
_logger.LogWarning(new EventId(), exception, "Sending publish packet failed due to timeout (ClientId: {0}).", _session.ClientId);
} }
else if (exception is MqttCommunicationException) else if (exception is MqttCommunicationException)
{ {
_logger.LogWarning(new EventId(), exception, "Sending publish packet failed due to communication exception.");
_logger.LogWarning(new EventId(), exception, "Sending publish packet failed due to communication exception (ClientId: {0}).", _session.ClientId);
} }
else if (exception is OperationCanceledException) else if (exception is OperationCanceledException)
{ {
} }
else else
{ {
_logger.LogError(new EventId(), exception, "Sending publish packet failed.");
_logger.LogError(new EventId(), exception, "Sending publish packet failed (ClientId: {0}).", _session.ClientId);
} }


if (packet.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce) if (packet.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce)


+ 0
- 1
MQTTnet.Core/Server/MqttClientSession.cs View File

@@ -108,7 +108,6 @@ namespace MQTTnet.Core.Server
} }


_pendingMessagesQueue.Enqueue(publishPacket); _pendingMessagesQueue.Enqueue(publishPacket);
_logger.LogTrace("Client '{0}': Enqueued pending publish packet.", ClientId);
} }


public void Dispose() public void Dispose()


Loading…
Cancel
Save