Browse Source

Added exception handling to make sure all tasks are observed to avoid UnobservedTaskException.

release/3.x.x
Jimmy Rosenskog 5 years ago
parent
commit
699558e47a
1 changed files with 28 additions and 9 deletions
  1. +28
    -9
      Source/MQTTnet/Client/MqttClient.cs

+ 28
- 9
Source/MQTTnet/Client/MqttClient.cs View File

@@ -258,28 +258,34 @@ namespace MQTTnet.Client
var clientWasConnected = IsConnected; var clientWasConnected = IsConnected;


TryInitiateDisconnect(); TryInitiateDisconnect();
IsConnected = false;


try try
{ {
IsConnected = false;

if (_adapter != null) if (_adapter != null)
{ {
_logger.Verbose("Disconnecting [Timeout={0}]", Options.CommunicationTimeout); _logger.Verbose("Disconnecting [Timeout={0}]", Options.CommunicationTimeout);
await _adapter.DisconnectAsync(Options.CommunicationTimeout, CancellationToken.None).ConfigureAwait(false); await _adapter.DisconnectAsync(Options.CommunicationTimeout, CancellationToken.None).ConfigureAwait(false);
} }


var receiverTask = WaitForTaskAsync(_packetReceiverTask, sender);
var keepAliveTask = WaitForTaskAsync(_keepAlivePacketsSenderTask, sender);

await Task.WhenAll(receiverTask, keepAliveTask).ConfigureAwait(false);

_logger.Verbose("Disconnected from adapter."); _logger.Verbose("Disconnected from adapter.");
} }
catch (Exception adapterException) catch (Exception adapterException)
{ {
_logger.Warning(adapterException, "Error while disconnecting from adapter."); _logger.Warning(adapterException, "Error while disconnecting from adapter.");
} }

try
{
var receiverTask = WaitForTaskAsync(_packetReceiverTask, sender);
var keepAliveTask = WaitForTaskAsync(_keepAlivePacketsSenderTask, sender);

await Task.WhenAll(receiverTask, keepAliveTask).ConfigureAwait(false);
}
catch (Exception e)
{
_logger.Warning(e, "Error while waiting for tasks.");
}
finally finally
{ {
Dispose(); Dispose();
@@ -346,11 +352,24 @@ namespace MQTTnet.Client
try try
{ {
await _adapter.SendPacketAsync(requestPacket, Options.CommunicationTimeout, cancellationToken).ConfigureAwait(false); await _adapter.SendPacketAsync(requestPacket, Options.CommunicationTimeout, cancellationToken).ConfigureAwait(false);
}
catch (Exception e)
{
_logger.Warning(e, "Error when sending packet of type '{0}'.", typeof(TResponsePacket).Name);
packetAwaiter.Cancel();
}

try
{
return await packetAwaiter.WaitOneAsync(Options.CommunicationTimeout).ConfigureAwait(false); return await packetAwaiter.WaitOneAsync(Options.CommunicationTimeout).ConfigureAwait(false);
} }
catch (MqttCommunicationTimedOutException)
catch (Exception exception)
{ {
_logger.Warning(null, "Timeout while waiting for packet of type '{0}'.", typeof(TResponsePacket).Name);
if (exception is MqttCommunicationTimedOutException)
{
_logger.Warning(null, "Timeout while waiting for packet of type '{0}'.", typeof(TResponsePacket).Name);
}

throw; throw;
} }
} }


Loading…
Cancel
Save