Sfoglia il codice sorgente

Merge remote-tracking branch 'origin/develop' into develop

release/3.x.x
Christian Kratky 5 anni fa
parent
commit
5fa12649bf
1 ha cambiato i file con 43 aggiunte e 12 eliminazioni
  1. +43
    -12
      Source/MQTTnet/Client/MqttClient.cs

+ 43
- 12
Source/MQTTnet/Client/MqttClient.cs Vedi File

@@ -149,7 +149,7 @@ namespace MQTTnet.Client
Properties = new MqttAuthPacketProperties
{
// This must always be equal to the value from the CONNECT packet. So we use it here to ensure that.
AuthenticationMethod = Options.AuthenticationMethod,
AuthenticationMethod = Options.AuthenticationMethod,
AuthenticationData = data.AuthenticationData,
ReasonString = data.ReasonString,
UserProperties = data.UserProperties
@@ -258,26 +258,34 @@ namespace MQTTnet.Client
var clientWasConnected = IsConnected;

TryInitiateDisconnect();
IsConnected = false;

try
{
IsConnected = false;

if (_adapter != null)
{
_logger.Verbose("Disconnecting [Timeout={0}]", Options.CommunicationTimeout);
await _adapter.DisconnectAsync(Options.CommunicationTimeout, CancellationToken.None).ConfigureAwait(false);
}

await WaitForTaskAsync(_packetReceiverTask, sender).ConfigureAwait(false);
await WaitForTaskAsync(_keepAlivePacketsSenderTask, sender).ConfigureAwait(false);

_logger.Verbose("Disconnected from adapter.");
}
catch (Exception adapterException)
{
_logger.Warning(adapterException, "Error while disconnecting from adapter.");
}

try
{
var receiverTask = WaitForTaskAsync(_packetReceiverTask, sender);
var keepAliveTask = WaitForTaskAsync(_keepAlivePacketsSenderTask, sender);

await Task.WhenAll(receiverTask, keepAliveTask).ConfigureAwait(false);
}
catch (Exception e)
{
_logger.Warning(e, "Error while waiting for tasks.");
}
finally
{
Dispose();
@@ -344,11 +352,24 @@ namespace MQTTnet.Client
try
{
await _adapter.SendPacketAsync(requestPacket, Options.CommunicationTimeout, cancellationToken).ConfigureAwait(false);
}
catch (Exception e)
{
_logger.Warning(e, "Error when sending packet of type '{0}'.", typeof(TResponsePacket).Name);
packetAwaiter.Cancel();
}

try
{
return await packetAwaiter.WaitOneAsync(Options.CommunicationTimeout).ConfigureAwait(false);
}
catch (MqttCommunicationTimedOutException)
catch (Exception exception)
{
_logger.Warning(null, "Timeout while waiting for packet of type '{0}'.", typeof(TResponsePacket).Name);
if (exception is MqttCommunicationTimedOutException)
{
_logger.Warning(null, "Timeout while waiting for packet of type '{0}'.", typeof(TResponsePacket).Name);
}

throw;
}
}
@@ -567,7 +588,7 @@ namespace MQTTnet.Client
};

await SendAsync(pubRecPacket, cancellationToken).ConfigureAwait(false);
}
}
}
else
{
@@ -626,15 +647,25 @@ namespace MQTTnet.Client
return true;
}

private static async Task WaitForTaskAsync(Task task, Task sender)
private async Task WaitForTaskAsync(Task task, Task sender)
{
if (task == sender || task == null)
if (task == null)
{
return;
}

if (task.IsCanceled || task.IsCompleted || task.IsFaulted)
if (task == sender)
{
// Return here to avoid deadlocks, but first any eventual exception in the task
// must be handled to avoid not getting an unhandled task exception
if (!task.IsFaulted)
{
return;
}

// By accessing the Exception property the exception is considered handled and will
// not result in an unhandled task exception later by the finalizer
_logger.Warning(task.Exception, "Error while waiting for background task.");
return;
}



Caricamento…
Annulla
Salva