Browse Source

Added receiveTracker to detect whether messages are received from broker.

release/3.x.x
Christoph 5 years ago
committed by GitHub
parent
commit
313f90a526
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 13 additions and 7 deletions
  1. +13
    -7
      Source/MQTTnet/Client/MqttClient.cs

+ 13
- 7
Source/MQTTnet/Client/MqttClient.cs View File

@@ -1,4 +1,4 @@
using System;
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
@@ -25,6 +25,7 @@ namespace MQTTnet.Client
private readonly MqttPacketIdentifierProvider _packetIdentifierProvider = new MqttPacketIdentifierProvider();
private readonly MqttPacketDispatcher _packetDispatcher = new MqttPacketDispatcher();
private readonly Stopwatch _sendTracker = new Stopwatch();
private readonly Stopwatch _receiveTracker = new Stopwatch();
private readonly object _disconnectLock = new object();

private readonly IMqttClientAdapterFactory _adapterFactory;
@@ -88,6 +89,7 @@ namespace MQTTnet.Client
authenticateResult = await AuthenticateAsync(adapter, options.WillMessage, cancellationToken).ConfigureAwait(false);

_sendTracker.Restart();
_receiveTracker.Restart();

if (Options.KeepAlivePeriod != TimeSpan.Zero)
{
@@ -344,7 +346,9 @@ namespace MQTTnet.Client
try
{
await _adapter.SendPacketAsync(requestPacket, Options.CommunicationTimeout, cancellationToken).ConfigureAwait(false);
return await packetAwaiter.WaitOneAsync(Options.CommunicationTimeout).ConfigureAwait(false);
var response = await packetAwaiter.WaitOneAsync(Options.CommunicationTimeout).ConfigureAwait(false);
_receiveTracker.Restart();
return response;
}
catch (MqttCommunicationTimedOutException)
{
@@ -369,14 +373,14 @@ namespace MQTTnet.Client
keepAliveSendInterval = Options.KeepAliveSendInterval.Value;
}

var waitTime = keepAliveSendInterval - _sendTracker.Elapsed;
if (waitTime <= TimeSpan.Zero)
var waitTimeSend = keepAliveSendInterval - _sendTracker.Elapsed;
var waitTimeReceive = keepAliveSendInterval - _receiveTracker.Elapsed;
if (waitTimeSend <= TimeSpan.Zero || waitTimeReceive <= TimeSpan.Zero)
{
await SendAndReceiveAsync<MqttPingRespPacket>(new MqttPingReqPacket(), cancellationToken).ConfigureAwait(false);
waitTime = keepAliveSendInterval;
}

await Task.Delay(waitTime, cancellationToken).ConfigureAwait(false);
await Task.Delay(keepAliveSendInterval, cancellationToken).ConfigureAwait(false);
}
}
catch (Exception exception)
@@ -473,6 +477,8 @@ namespace MQTTnet.Client
{
try
{
_receiveTracker.Restart();

if (packet is MqttPublishPacket publishPacket)
{
await TryProcessReceivedPublishPacketAsync(publishPacket, cancellationToken).ConfigureAwait(false);
@@ -652,4 +658,4 @@ namespace MQTTnet.Client
return Interlocked.CompareExchange(ref _disconnectGate, 1, 0) != 0;
}
}
}
}

Loading…
Cancel
Save