diff --git a/Source/MQTTnet/Client/MqttClient.cs b/Source/MQTTnet/Client/MqttClient.cs index 27b56ff..ea78843 100644 --- a/Source/MQTTnet/Client/MqttClient.cs +++ b/Source/MQTTnet/Client/MqttClient.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Diagnostics; using System.Threading; using System.Threading.Tasks; @@ -25,6 +25,7 @@ namespace MQTTnet.Client private readonly MqttPacketIdentifierProvider _packetIdentifierProvider = new MqttPacketIdentifierProvider(); private readonly MqttPacketDispatcher _packetDispatcher = new MqttPacketDispatcher(); private readonly Stopwatch _sendTracker = new Stopwatch(); + private readonly Stopwatch _receiveTracker = new Stopwatch(); private readonly object _disconnectLock = new object(); private readonly IMqttClientAdapterFactory _adapterFactory; @@ -88,6 +89,7 @@ namespace MQTTnet.Client authenticateResult = await AuthenticateAsync(adapter, options.WillMessage, cancellationToken).ConfigureAwait(false); _sendTracker.Restart(); + _receiveTracker.Restart(); if (Options.KeepAlivePeriod != TimeSpan.Zero) { @@ -344,7 +346,9 @@ namespace MQTTnet.Client try { await _adapter.SendPacketAsync(requestPacket, Options.CommunicationTimeout, cancellationToken).ConfigureAwait(false); - return await packetAwaiter.WaitOneAsync(Options.CommunicationTimeout).ConfigureAwait(false); + var response = await packetAwaiter.WaitOneAsync(Options.CommunicationTimeout).ConfigureAwait(false); + _receiveTracker.Restart(); + return response; } catch (MqttCommunicationTimedOutException) { @@ -369,14 +373,14 @@ namespace MQTTnet.Client keepAliveSendInterval = Options.KeepAliveSendInterval.Value; } - var waitTime = keepAliveSendInterval - _sendTracker.Elapsed; - if (waitTime <= TimeSpan.Zero) + var waitTimeSend = keepAliveSendInterval - _sendTracker.Elapsed; + var waitTimeReceive = keepAliveSendInterval - _receiveTracker.Elapsed; + if (waitTimeSend <= TimeSpan.Zero || waitTimeReceive <= TimeSpan.Zero) { await SendAndReceiveAsync(new MqttPingReqPacket(), cancellationToken).ConfigureAwait(false); - waitTime = keepAliveSendInterval; } - await Task.Delay(waitTime, cancellationToken).ConfigureAwait(false); + await Task.Delay(keepAliveSendInterval, cancellationToken).ConfigureAwait(false); } } catch (Exception exception) @@ -473,6 +477,8 @@ namespace MQTTnet.Client { try { + _receiveTracker.Restart(); + if (packet is MqttPublishPacket publishPacket) { await TryProcessReceivedPublishPacketAsync(publishPacket, cancellationToken).ConfigureAwait(false); @@ -652,4 +658,4 @@ namespace MQTTnet.Client return Interlocked.CompareExchange(ref _disconnectGate, 1, 0) != 0; } } -} \ No newline at end of file +}