diff --git a/MQTTnet.Core/Server/MqttClientPendingMessagesQueue.cs b/MQTTnet.Core/Server/MqttClientPendingMessagesQueue.cs index 551dc8e..f16f689 100644 --- a/MQTTnet.Core/Server/MqttClientPendingMessagesQueue.cs +++ b/MQTTnet.Core/Server/MqttClientPendingMessagesQueue.cs @@ -28,7 +28,7 @@ namespace MQTTnet.Core.Server { if (adapter == null) throw new ArgumentNullException(nameof(adapter)); - Task.Run(() => SendPendingPublishPacketsAsync(adapter, cancellationToken), cancellationToken); + Task.Factory.StartNew(async () => await SendPendingPublishPacketsAsync(adapter, cancellationToken), cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default).ConfigureAwait(false); } public void Enqueue(MqttPublishPacket publishPacket) @@ -74,7 +74,7 @@ namespace MQTTnet.Core.Server { _trace.Warning(nameof(MqttClientPendingMessagesQueue), exception, "Sending publish packet failed due to communication exception."); } - if (exception is OperationCanceledException) + else if (exception is OperationCanceledException) { } else diff --git a/MQTTnet.Core/Server/MqttClientSession.cs b/MQTTnet.Core/Server/MqttClientSession.cs index f913901..5941a4f 100644 --- a/MQTTnet.Core/Server/MqttClientSession.cs +++ b/MQTTnet.Core/Server/MqttClientSession.cs @@ -45,10 +45,9 @@ namespace MQTTnet.Core.Server { if (adapter == null) throw new ArgumentNullException(nameof(adapter)); - _willMessage = willMessage; - try { + _willMessage = willMessage; _adapter = adapter; _cancellationTokenSource = new CancellationTokenSource(); @@ -70,18 +69,23 @@ namespace MQTTnet.Core.Server public void Stop() { - if (_willMessage != null) + try { - _mqttClientSessionsManager.DispatchApplicationMessage(this, _willMessage); - } - - _cancellationTokenSource?.Cancel(false); - _cancellationTokenSource?.Dispose(); - _cancellationTokenSource = null; + _cancellationTokenSource?.Cancel(false); + _cancellationTokenSource?.Dispose(); + _cancellationTokenSource = null; - _adapter = null; + _adapter = null; - _trace.Information(nameof(MqttClientSession), "Client '{0}': Disconnected.", ClientId); + _trace.Information(nameof(MqttClientSession), "Client '{0}': Disconnected.", ClientId); + } + finally + { + if (_willMessage != null) + { + _mqttClientSessionsManager.DispatchApplicationMessage(this, _willMessage); + } + } } public void EnqueuePublishPacket(MqttPublishPacket publishPacket) diff --git a/MQTTnet.Core/Server/MqttClientSessionsManager.cs b/MQTTnet.Core/Server/MqttClientSessionsManager.cs index aedd3d7..6238f2c 100644 --- a/MQTTnet.Core/Server/MqttClientSessionsManager.cs +++ b/MQTTnet.Core/Server/MqttClientSessionsManager.cs @@ -160,6 +160,7 @@ namespace MQTTnet.Core.Server _clientSessions.Remove(connectPacket.ClientId); clientSession.Dispose(); clientSession = null; + _trace.Verbose(nameof(MqttClientSessionsManager), "Disposed existing session of client '{0}'.", connectPacket.ClientId); } else