diff --git a/Source/MQTTnet/Client/MqttClient.cs b/Source/MQTTnet/Client/MqttClient.cs index 27b56ff..527a5cd 100644 --- a/Source/MQTTnet/Client/MqttClient.cs +++ b/Source/MQTTnet/Client/MqttClient.cs @@ -149,7 +149,7 @@ namespace MQTTnet.Client Properties = new MqttAuthPacketProperties { // This must always be equal to the value from the CONNECT packet. So we use it here to ensure that. - AuthenticationMethod = Options.AuthenticationMethod, + AuthenticationMethod = Options.AuthenticationMethod, AuthenticationData = data.AuthenticationData, ReasonString = data.ReasonString, UserProperties = data.UserProperties @@ -258,26 +258,34 @@ namespace MQTTnet.Client var clientWasConnected = IsConnected; TryInitiateDisconnect(); + IsConnected = false; try { - IsConnected = false; - if (_adapter != null) { _logger.Verbose("Disconnecting [Timeout={0}]", Options.CommunicationTimeout); await _adapter.DisconnectAsync(Options.CommunicationTimeout, CancellationToken.None).ConfigureAwait(false); } - await WaitForTaskAsync(_packetReceiverTask, sender).ConfigureAwait(false); - await WaitForTaskAsync(_keepAlivePacketsSenderTask, sender).ConfigureAwait(false); - _logger.Verbose("Disconnected from adapter."); } catch (Exception adapterException) { _logger.Warning(adapterException, "Error while disconnecting from adapter."); } + + try + { + var receiverTask = WaitForTaskAsync(_packetReceiverTask, sender); + var keepAliveTask = WaitForTaskAsync(_keepAlivePacketsSenderTask, sender); + + await Task.WhenAll(receiverTask, keepAliveTask).ConfigureAwait(false); + } + catch (Exception e) + { + _logger.Warning(e, "Error while waiting for tasks."); + } finally { Dispose(); @@ -344,11 +352,24 @@ namespace MQTTnet.Client try { await _adapter.SendPacketAsync(requestPacket, Options.CommunicationTimeout, cancellationToken).ConfigureAwait(false); + } + catch (Exception e) + { + _logger.Warning(e, "Error when sending packet of type '{0}'.", typeof(TResponsePacket).Name); + packetAwaiter.Cancel(); + } + + try + { return await packetAwaiter.WaitOneAsync(Options.CommunicationTimeout).ConfigureAwait(false); } - catch (MqttCommunicationTimedOutException) + catch (Exception exception) { - _logger.Warning(null, "Timeout while waiting for packet of type '{0}'.", typeof(TResponsePacket).Name); + if (exception is MqttCommunicationTimedOutException) + { + _logger.Warning(null, "Timeout while waiting for packet of type '{0}'.", typeof(TResponsePacket).Name); + } + throw; } } @@ -567,7 +588,7 @@ namespace MQTTnet.Client }; await SendAsync(pubRecPacket, cancellationToken).ConfigureAwait(false); - } + } } else { @@ -626,15 +647,25 @@ namespace MQTTnet.Client return true; } - private static async Task WaitForTaskAsync(Task task, Task sender) + private async Task WaitForTaskAsync(Task task, Task sender) { - if (task == sender || task == null) + if (task == null) { return; } - if (task.IsCanceled || task.IsCompleted || task.IsFaulted) + if (task == sender) { + // Return here to avoid deadlocks, but first any eventual exception in the task + // must be handled to avoid not getting an unhandled task exception + if (!task.IsFaulted) + { + return; + } + + // By accessing the Exception property the exception is considered handled and will + // not result in an unhandled task exception later by the finalizer + _logger.Warning(task.Exception, "Error while waiting for background task."); return; }