@@ -1,5 +1,6 @@
using System;
using System;
using System.Collections.Generic;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Linq;
using System.Threading;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks;
@@ -15,6 +16,7 @@ namespace MQTTnet.Client
public class MqttClient : IMqttClient
public class MqttClient : IMqttClient
{
{
private readonly MqttPacketIdentifierProvider _packetIdentifierProvider = new MqttPacketIdentifierProvider();
private readonly MqttPacketIdentifierProvider _packetIdentifierProvider = new MqttPacketIdentifierProvider();
private readonly Stopwatch _sendTracker = new Stopwatch();
private readonly IMqttClientAdapterFactory _adapterFactory;
private readonly IMqttClientAdapterFactory _adapterFactory;
private readonly MqttPacketDispatcher _packetDispatcher;
private readonly MqttPacketDispatcher _packetDispatcher;
private readonly IMqttNetLogger _logger;
private readonly IMqttNetLogger _logger;
@@ -59,10 +61,12 @@ namespace MQTTnet.Client
_logger.Trace<MqttClient>("Connection with server established.");
_logger.Trace<MqttClient>("Connection with server established.");
await StartReceivingPacketsAsync(_cancellationTokenSource.Token).ConfigureAwait(false);
await StartReceivingPacketsAsync(_cancellationTokenSource.Token).ConfigureAwait(false);
var connectResponse = await AuthenticateAsync(options.WillMessage).ConfigureAwait(false);
var connectResponse = await AuthenticateAsync(options.WillMessage).ConfigureAwait(false);
_logger.Trace<MqttClient>("MQTT connection with server established.");
_logger.Trace<MqttClient>("MQTT connection with server established.");
_sendTracker.Restart();
if (_options.KeepAlivePeriod != TimeSpan.Zero)
if (_options.KeepAlivePeriod != TimeSpan.Zero)
{
{
StartSendingKeepAliveMessages(_cancellationTokenSource.Token);
StartSendingKeepAliveMessages(_cancellationTokenSource.Token);
@@ -149,7 +153,7 @@ namespace MQTTnet.Client
case MqttQualityOfServiceLevel.AtMostOnce:
case MqttQualityOfServiceLevel.AtMostOnce:
{
{
// No packet identifier is used for QoS 0 [3.3.2.2 Packet Identifier]
// No packet identifier is used for QoS 0 [3.3.2.2 Packet Identifier]
await _adapter.SendPacketsAsync(_options.CommunicationTimeout, _cancellationTokenSource.Token, qosGroup ).ConfigureAwait(false);
await SendAsync(qosGroup.ToArray() ).ConfigureAwait(false);
break;
break;
}
}
case MqttQualityOfServiceLevel.AtLeastOnce:
case MqttQualityOfServiceLevel.AtLeastOnce:
@@ -167,8 +171,14 @@ namespace MQTTnet.Client
foreach (var publishPacket in qosGroup)
foreach (var publishPacket in qosGroup)
{
{
publishPacket.PacketIdentifier = _packetIdentifierProvider.GetNewPacketIdentifier();
publishPacket.PacketIdentifier = _packetIdentifierProvider.GetNewPacketIdentifier();
var pubRecPacket = await SendAndReceiveAsync<MqttPubRecPacket>(publishPacket).ConfigureAwait(false);
var pubRecPacket = await SendAndReceiveAsync<MqttPubRecPacket>(publishPacket).ConfigureAwait(false);
await SendAndReceiveAsync<MqttPubCompPacket>(pubRecPacket.CreateResponse<MqttPubRelPacket>()).ConfigureAwait(false);
var pubRelPacket = new MqttPubRelPacket
{
PacketIdentifier = pubRecPacket.PacketIdentifier
};
await SendAndReceiveAsync<MqttPubCompPacket>(pubRelPacket).ConfigureAwait(false);
}
}
break;
break;
@@ -325,24 +335,31 @@ namespace MQTTnet.Client
private Task ProcessReceivedPubRelPacket(MqttPubRelPacket pubRelPacket)
private Task ProcessReceivedPubRelPacket(MqttPubRelPacket pubRelPacket)
{
{
return SendAsync(pubRelPacket.CreateResponse<MqttPubCompPacket>());
var response = new MqttPubCompPacket
{
PacketIdentifier = pubRelPacket.PacketIdentifier
};
return SendAsync(response);
}
}
private Task SendAsync(MqttBasePacket packet)
private Task SendAsync(params MqttBasePacket[] packets )
{
{
return _adapter.SendPacketsAsync(_options.CommunicationTimeout, _cancellationTokenSource.Token, packet);
_sendTracker.Restart();
return _adapter.SendPacketsAsync(_options.CommunicationTimeout, _cancellationTokenSource.Token, packets);
}
}
private async Task<TResponsePacket> SendAndReceiveAsync<TResponsePacket>(MqttBasePacket requestPacket) where TResponsePacket : MqttBasePacket
private async Task<TResponsePacket> SendAndReceiveAsync<TResponsePacket>(MqttBasePacket requestPacket) where TResponsePacket : MqttBasePacket
{
{
ushort identifier = 0 ;
ushort? identifier = null ;
if (requestPacket is IMqttPacketWithIdentifier requestPacketWithIdentifier)
if (requestPacket is IMqttPacketWithIdentifier requestPacketWithIdentifier)
{
{
identifier = requestPacketWithIdentifier.PacketIdentifier;
identifier = requestPacketWithIdentifier.PacketIdentifier;
}
}
var packetAwaiter = _packetDispatcher.WaitForPacketAsync(typeof(TResponsePacket), identifier, _options.CommunicationTimeout);
var packetAwaiter = _packetDispatcher.WaitForPacketAsync(typeof(TResponsePacket), identifier, _options.CommunicationTimeout);
await _adapter.SendPacketsAsync(_options.CommunicationTimeout, _cancellationTokenSource.Token, requestPacket).ConfigureAwait(false);
await SendAsync(requestPacket).ConfigureAwait(false);
return (TResponsePacket)await packetAwaiter.ConfigureAwait(false);
return (TResponsePacket)await packetAwaiter.ConfigureAwait(false);
}
}
@@ -354,8 +371,22 @@ namespace MQTTnet.Client
{
{
while (!cancellationToken.IsCancellationRequested)
while (!cancellationToken.IsCancellationRequested)
{
{
await SendAndReceiveAsync<MqttPingRespPacket>(new MqttPingReqPacket()).ConfigureAwait(false);
await Task.Delay(_options.KeepAlivePeriod, cancellationToken).ConfigureAwait(false);
TimeSpan keepAliveSendInterval;
if (_options.KeepAliveSendInterval.HasValue)
{
keepAliveSendInterval = _options.KeepAliveSendInterval.Value;
}
else
{
keepAliveSendInterval = TimeSpan.FromSeconds(_options.KeepAlivePeriod.TotalSeconds * 0.75);
}
if (_sendTracker.Elapsed > keepAliveSendInterval)
{
await SendAndReceiveAsync<MqttPingRespPacket>(new MqttPingReqPacket()).ConfigureAwait(false);
}
await Task.Delay(keepAliveSendInterval, cancellationToken).ConfigureAwait(false);
}
}
}
}
catch (OperationCanceledException)
catch (OperationCanceledException)