|
|
@@ -149,7 +149,7 @@ namespace MQTTnet.Client |
|
|
|
Properties = new MqttAuthPacketProperties |
|
|
|
{ |
|
|
|
// This must always be equal to the value from the CONNECT packet. So we use it here to ensure that. |
|
|
|
AuthenticationMethod = Options.AuthenticationMethod, |
|
|
|
AuthenticationMethod = Options.AuthenticationMethod, |
|
|
|
AuthenticationData = data.AuthenticationData, |
|
|
|
ReasonString = data.ReasonString, |
|
|
|
UserProperties = data.UserProperties |
|
|
@@ -258,26 +258,34 @@ namespace MQTTnet.Client |
|
|
|
var clientWasConnected = IsConnected; |
|
|
|
|
|
|
|
TryInitiateDisconnect(); |
|
|
|
IsConnected = false; |
|
|
|
|
|
|
|
try |
|
|
|
{ |
|
|
|
IsConnected = false; |
|
|
|
|
|
|
|
if (_adapter != null) |
|
|
|
{ |
|
|
|
_logger.Verbose("Disconnecting [Timeout={0}]", Options.CommunicationTimeout); |
|
|
|
await _adapter.DisconnectAsync(Options.CommunicationTimeout, CancellationToken.None).ConfigureAwait(false); |
|
|
|
} |
|
|
|
|
|
|
|
await WaitForTaskAsync(_packetReceiverTask, sender).ConfigureAwait(false); |
|
|
|
await WaitForTaskAsync(_keepAlivePacketsSenderTask, sender).ConfigureAwait(false); |
|
|
|
|
|
|
|
_logger.Verbose("Disconnected from adapter."); |
|
|
|
} |
|
|
|
catch (Exception adapterException) |
|
|
|
{ |
|
|
|
_logger.Warning(adapterException, "Error while disconnecting from adapter."); |
|
|
|
} |
|
|
|
|
|
|
|
try |
|
|
|
{ |
|
|
|
var receiverTask = WaitForTaskAsync(_packetReceiverTask, sender); |
|
|
|
var keepAliveTask = WaitForTaskAsync(_keepAlivePacketsSenderTask, sender); |
|
|
|
|
|
|
|
await Task.WhenAll(receiverTask, keepAliveTask).ConfigureAwait(false); |
|
|
|
} |
|
|
|
catch (Exception e) |
|
|
|
{ |
|
|
|
_logger.Warning(e, "Error while waiting for tasks."); |
|
|
|
} |
|
|
|
finally |
|
|
|
{ |
|
|
|
Dispose(); |
|
|
@@ -344,11 +352,24 @@ namespace MQTTnet.Client |
|
|
|
try |
|
|
|
{ |
|
|
|
await _adapter.SendPacketAsync(requestPacket, Options.CommunicationTimeout, cancellationToken).ConfigureAwait(false); |
|
|
|
} |
|
|
|
catch (Exception e) |
|
|
|
{ |
|
|
|
_logger.Warning(e, "Error when sending packet of type '{0}'.", typeof(TResponsePacket).Name); |
|
|
|
packetAwaiter.Cancel(); |
|
|
|
} |
|
|
|
|
|
|
|
try |
|
|
|
{ |
|
|
|
return await packetAwaiter.WaitOneAsync(Options.CommunicationTimeout).ConfigureAwait(false); |
|
|
|
} |
|
|
|
catch (MqttCommunicationTimedOutException) |
|
|
|
catch (Exception exception) |
|
|
|
{ |
|
|
|
_logger.Warning(null, "Timeout while waiting for packet of type '{0}'.", typeof(TResponsePacket).Name); |
|
|
|
if (exception is MqttCommunicationTimedOutException) |
|
|
|
{ |
|
|
|
_logger.Warning(null, "Timeout while waiting for packet of type '{0}'.", typeof(TResponsePacket).Name); |
|
|
|
} |
|
|
|
|
|
|
|
throw; |
|
|
|
} |
|
|
|
} |
|
|
@@ -567,7 +588,7 @@ namespace MQTTnet.Client |
|
|
|
}; |
|
|
|
|
|
|
|
await SendAsync(pubRecPacket, cancellationToken).ConfigureAwait(false); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
else |
|
|
|
{ |
|
|
@@ -626,15 +647,25 @@ namespace MQTTnet.Client |
|
|
|
return true; |
|
|
|
} |
|
|
|
|
|
|
|
private static async Task WaitForTaskAsync(Task task, Task sender) |
|
|
|
private async Task WaitForTaskAsync(Task task, Task sender) |
|
|
|
{ |
|
|
|
if (task == sender || task == null) |
|
|
|
if (task == null) |
|
|
|
{ |
|
|
|
return; |
|
|
|
} |
|
|
|
|
|
|
|
if (task.IsCanceled || task.IsCompleted || task.IsFaulted) |
|
|
|
if (task == sender) |
|
|
|
{ |
|
|
|
// Return here to avoid deadlocks, but first any eventual exception in the task |
|
|
|
// must be handled to avoid not getting an unhandled task exception |
|
|
|
if (!task.IsFaulted) |
|
|
|
{ |
|
|
|
return; |
|
|
|
} |
|
|
|
|
|
|
|
// By accessing the Exception property the exception is considered handled and will |
|
|
|
// not result in an unhandled task exception later by the finalizer |
|
|
|
_logger.Warning(task.Exception, "Error while waiting for background task."); |
|
|
|
return; |
|
|
|
} |
|
|
|
|
|
|
|