From 11ccb951aa7e3141b0d92d5eadf1b79a29d6a221 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Wed, 1 Nov 2017 13:34:39 +0100 Subject: [PATCH] Refactor traces of server session handling --- .../Adapter/MqttChannelCommunicationAdapter.cs | 4 ++-- .../Adapter/MqttCommunicationAdapterExtensions.cs | 2 ++ .../Server/MqttClientPendingMessagesQueue.cs | 14 ++++++++------ MQTTnet.Core/Server/MqttClientSession.cs | 1 - 4 files changed, 12 insertions(+), 9 deletions(-) diff --git a/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs b/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs index 7084227..04857b5 100644 --- a/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs +++ b/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs @@ -85,10 +85,10 @@ namespace MQTTnet.Core.Adapter public async Task SendPacketsAsync(TimeSpan timeout, CancellationToken cancellationToken, IEnumerable packets) { - await _semaphore.WaitAsync(cancellationToken); - try { + await _semaphore.WaitAsync(cancellationToken); + foreach (var packet in packets) { if (packet == null) diff --git a/MQTTnet.Core/Adapter/MqttCommunicationAdapterExtensions.cs b/MQTTnet.Core/Adapter/MqttCommunicationAdapterExtensions.cs index 0fa9ab5..2142fd0 100644 --- a/MQTTnet.Core/Adapter/MqttCommunicationAdapterExtensions.cs +++ b/MQTTnet.Core/Adapter/MqttCommunicationAdapterExtensions.cs @@ -9,6 +9,8 @@ namespace MQTTnet.Core.Adapter { public static Task SendPacketsAsync(this IMqttCommunicationAdapter adapter, TimeSpan timeout, CancellationToken cancellationToken, params MqttBasePacket[] packets) { + if (adapter == null) throw new ArgumentNullException(nameof(adapter)); + return adapter.SendPacketsAsync(timeout, cancellationToken, packets); } } diff --git a/MQTTnet.Core/Server/MqttClientPendingMessagesQueue.cs b/MQTTnet.Core/Server/MqttClientPendingMessagesQueue.cs index 446e0a8..599f4ee 100644 --- a/MQTTnet.Core/Server/MqttClientPendingMessagesQueue.cs +++ b/MQTTnet.Core/Server/MqttClientPendingMessagesQueue.cs @@ -13,8 +13,8 @@ namespace MQTTnet.Core.Server public sealed class MqttClientPendingMessagesQueue { private readonly BlockingCollection _pendingPublishPackets = new BlockingCollection(); - private readonly MqttClientSession _session; private readonly MqttServerOptions _options; + private readonly MqttClientSession _session; private readonly ILogger _logger; public MqttClientPendingMessagesQueue(MqttServerOptions options, MqttClientSession session, ILogger logger) @@ -28,7 +28,7 @@ namespace MQTTnet.Core.Server { if (adapter == null) throw new ArgumentNullException(nameof(adapter)); - Task.Factory.StartNew(async () => await SendPendingPublishPacketsAsync(adapter, cancellationToken), cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default).ConfigureAwait(false); + Task.Run(async () => await SendPendingPublishPacketsAsync(adapter, cancellationToken), cancellationToken).ConfigureAwait(false); } public void Enqueue(MqttPublishPacket publishPacket) @@ -36,6 +36,7 @@ namespace MQTTnet.Core.Server if (publishPacket == null) throw new ArgumentNullException(nameof(publishPacket)); _pendingPublishPackets.Add(publishPacket); + _logger.LogTrace("Enqueued packet (ClientId: {0}).", _session.ClientId); } private async Task SendPendingPublishPacketsAsync(IMqttCommunicationAdapter adapter, CancellationToken cancellationToken) @@ -52,7 +53,7 @@ namespace MQTTnet.Core.Server } catch (Exception exception) { - _logger.LogError(new EventId(), exception, "Unhandled exception while sending pending publish packets."); + _logger.LogError(new EventId(), exception, "Unhandled exception while sending enqueued packet (ClientId: {0}).", _session.ClientId); } } @@ -63,23 +64,24 @@ namespace MQTTnet.Core.Server try { await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, packet).ConfigureAwait(false); + _logger.LogTrace("Enqueued packet sent (ClientId: {0}).", _session.ClientId); } catch (Exception exception) { if (exception is MqttCommunicationTimedOutException) { - _logger.LogWarning(new EventId(), exception, "Sending publish packet failed due to timeout."); + _logger.LogWarning(new EventId(), exception, "Sending publish packet failed due to timeout (ClientId: {0}).", _session.ClientId); } else if (exception is MqttCommunicationException) { - _logger.LogWarning(new EventId(), exception, "Sending publish packet failed due to communication exception."); + _logger.LogWarning(new EventId(), exception, "Sending publish packet failed due to communication exception (ClientId: {0}).", _session.ClientId); } else if (exception is OperationCanceledException) { } else { - _logger.LogError(new EventId(), exception, "Sending publish packet failed."); + _logger.LogError(new EventId(), exception, "Sending publish packet failed (ClientId: {0}).", _session.ClientId); } if (packet.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce) diff --git a/MQTTnet.Core/Server/MqttClientSession.cs b/MQTTnet.Core/Server/MqttClientSession.cs index 903b330..52f6393 100644 --- a/MQTTnet.Core/Server/MqttClientSession.cs +++ b/MQTTnet.Core/Server/MqttClientSession.cs @@ -108,7 +108,6 @@ namespace MQTTnet.Core.Server } _pendingMessagesQueue.Enqueue(publishPacket); - _logger.LogTrace("Client '{0}': Enqueued pending publish packet.", ClientId); } public void Dispose()