|
|
@@ -28,6 +28,7 @@ namespace MQTTnet.Client |
|
|
|
private Task _packetReceiverTask; |
|
|
|
private Task _keepAliveMessageSenderTask; |
|
|
|
private IMqttChannelAdapter _adapter; |
|
|
|
private bool _cleanDisconnectInitiated; |
|
|
|
|
|
|
|
public MqttClient(IMqttClientAdapterFactory channelFactory, IMqttNetLogger logger) |
|
|
|
{ |
|
|
@@ -59,7 +60,7 @@ namespace MQTTnet.Client |
|
|
|
|
|
|
|
_adapter = _adapterFactory.CreateClientAdapter(options, _logger); |
|
|
|
|
|
|
|
_logger.Verbose("Trying to connect with server."); |
|
|
|
_logger.Verbose($"Trying to connect with server ({_options.ChannelOptions})."); |
|
|
|
await _adapter.ConnectAsync(_options.CommunicationTimeout, _cancellationTokenSource.Token).ConfigureAwait(false); |
|
|
|
_logger.Verbose("Connection with server established."); |
|
|
|
|
|
|
@@ -94,6 +95,8 @@ namespace MQTTnet.Client |
|
|
|
{ |
|
|
|
try |
|
|
|
{ |
|
|
|
_cleanDisconnectInitiated = true; |
|
|
|
|
|
|
|
if (IsConnected && !_cancellationTokenSource.IsCancellationRequested) |
|
|
|
{ |
|
|
|
await SendAsync(new MqttDisconnectPacket(), _cancellationTokenSource.Token).ConfigureAwait(false); |
|
|
@@ -235,24 +238,7 @@ namespace MQTTnet.Client |
|
|
|
|
|
|
|
private async Task DisconnectInternalAsync(Task sender, Exception exception) |
|
|
|
{ |
|
|
|
await _disconnectLock.WaitAsync().ConfigureAwait(false); |
|
|
|
try |
|
|
|
{ |
|
|
|
if (_cancellationTokenSource == null || _cancellationTokenSource.IsCancellationRequested) |
|
|
|
{ |
|
|
|
return; |
|
|
|
} |
|
|
|
|
|
|
|
_cancellationTokenSource.Cancel(false); |
|
|
|
} |
|
|
|
catch (Exception adapterException) |
|
|
|
{ |
|
|
|
_logger.Warning(adapterException, "Error while disconnecting from adapter."); |
|
|
|
} |
|
|
|
finally |
|
|
|
{ |
|
|
|
_disconnectLock.Release(); |
|
|
|
} |
|
|
|
await InitiateDisconnectAsync().ConfigureAwait(false); |
|
|
|
|
|
|
|
var clientWasConnected = IsConnected; |
|
|
|
IsConnected = false; |
|
|
@@ -279,12 +265,35 @@ namespace MQTTnet.Client |
|
|
|
_adapter = null; |
|
|
|
_cancellationTokenSource?.Dispose(); |
|
|
|
_cancellationTokenSource = null; |
|
|
|
_cleanDisconnectInitiated = false; |
|
|
|
|
|
|
|
_logger.Info("Disconnected."); |
|
|
|
Disconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(clientWasConnected, exception)); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private async Task InitiateDisconnectAsync() |
|
|
|
{ |
|
|
|
await _disconnectLock.WaitAsync().ConfigureAwait(false); |
|
|
|
try |
|
|
|
{ |
|
|
|
if (_cancellationTokenSource == null || _cancellationTokenSource.IsCancellationRequested) |
|
|
|
{ |
|
|
|
return; |
|
|
|
} |
|
|
|
|
|
|
|
_cancellationTokenSource.Cancel(false); |
|
|
|
} |
|
|
|
catch (Exception adapterException) |
|
|
|
{ |
|
|
|
_logger.Warning(adapterException, "Error while initiating disconnect."); |
|
|
|
} |
|
|
|
finally |
|
|
|
{ |
|
|
|
_disconnectLock.Release(); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private Task SendAsync(MqttBasePacket packet, CancellationToken cancellationToken) |
|
|
|
{ |
|
|
|
return SendAsync(new[] { packet }, cancellationToken); |
|
|
@@ -361,6 +370,11 @@ namespace MQTTnet.Client |
|
|
|
} |
|
|
|
catch (Exception exception) |
|
|
|
{ |
|
|
|
if (_cleanDisconnectInitiated) |
|
|
|
{ |
|
|
|
return; |
|
|
|
} |
|
|
|
|
|
|
|
if (exception is OperationCanceledException) |
|
|
|
{ |
|
|
|
} |
|
|
@@ -390,11 +404,19 @@ namespace MQTTnet.Client |
|
|
|
while (!cancellationToken.IsCancellationRequested) |
|
|
|
{ |
|
|
|
var packet = await _adapter.ReceivePacketAsync(TimeSpan.Zero, cancellationToken).ConfigureAwait(false); |
|
|
|
await ProcessReceivedPacketAsync(packet, cancellationToken).ConfigureAwait(false); |
|
|
|
if (packet != null) |
|
|
|
{ |
|
|
|
await ProcessReceivedPacketAsync(packet, cancellationToken).ConfigureAwait(false); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
catch (Exception exception) |
|
|
|
{ |
|
|
|
if (_cleanDisconnectInitiated) |
|
|
|
{ |
|
|
|
return; |
|
|
|
} |
|
|
|
|
|
|
|
if (exception is OperationCanceledException) |
|
|
|
{ |
|
|
|
} |
|
|
@@ -407,8 +429,8 @@ namespace MQTTnet.Client |
|
|
|
_logger.Error(exception, "Unhandled exception while receiving packets."); |
|
|
|
} |
|
|
|
|
|
|
|
await DisconnectInternalAsync(_packetReceiverTask, exception).ConfigureAwait(false); |
|
|
|
_packetDispatcher.Dispatch(exception); |
|
|
|
await DisconnectInternalAsync(_packetReceiverTask, exception).ConfigureAwait(false); |
|
|
|
} |
|
|
|
finally |
|
|
|
{ |
|
|
@@ -458,7 +480,7 @@ namespace MQTTnet.Client |
|
|
|
|
|
|
|
if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce) |
|
|
|
{ |
|
|
|
// QoS 2 is implement as method "B" [4.3.3 QoS 2: Exactly once delivery] |
|
|
|
// QoS 2 is implement as method "B" (4.3.3 QoS 2: Exactly once delivery) |
|
|
|
FireApplicationMessageReceivedEvent(publishPacket); |
|
|
|
return SendAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }, cancellationToken); |
|
|
|
} |
|
|
|