@@ -14,13 +14,13 @@ namespace MQTTnet.Client
{
public class MqttClient : IMqttClient
{
private readonly MqttPacketIdentifierProvider _packetIdentifierProvider = new MqttPacketIdentifierProvider();
private readonly IMqttClientAdapterFactory _adapterFactory;
private readonly MqttPacketDispatcher _packetDispatcher;
private readonly IMqttNetLogger _logger;
private IMqttClientOptions _options;
private bool _isReceivingPackets;
private int _latestPacketIdentifier;
private CancellationTokenSource _cancellationTokenSource;
private IMqttChannelAdapter _adapter;
@@ -49,7 +49,7 @@ namespace MQTTnet.Client
{
_options = options;
_cancellationTokenSource = new CancellationTokenSource();
_latestPacketIdentifier = 0 ;
_packetIdentifierProvider.Reset() ;
_packetDispatcher.Reset();
_adapter = _adapterFactory.CreateClientAdapter(options.ChannelOptions, _logger);
@@ -106,7 +106,7 @@ namespace MQTTnet.Client
var subscribePacket = new MqttSubscribePacket
{
PacketIdentifier = GetNewPacketIdentifier(),
PacketIdentifier = _packetIdentifierProvider. GetNewPacketIdentifier(),
TopicFilters = topicFilters.ToList()
};
@@ -128,7 +128,7 @@ namespace MQTTnet.Client
var unsubscribePacket = new MqttUnsubscribePacket
{
PacketIdentifier = GetNewPacketIdentifier(),
PacketIdentifier = _packetIdentifierProvider. GetNewPacketIdentifier(),
TopicFilters = topicFilters.ToList()
};
@@ -156,7 +156,7 @@ namespace MQTTnet.Client
{
foreach (var publishPacket in qosGroup)
{
publishPacket.PacketIdentifier = GetNewPacketIdentifier();
publishPacket.PacketIdentifier = _packetIdentifierProvider. GetNewPacketIdentifier();
await SendAndReceiveAsync<MqttPubAckPacket>(publishPacket).ConfigureAwait(false);
}
@@ -166,7 +166,7 @@ namespace MQTTnet.Client
{
foreach (var publishPacket in qosGroup)
{
publishPacket.PacketIdentifier = GetNewPacketIdentifier();
publishPacket.PacketIdentifier = _packetIdentifierProvider. GetNewPacketIdentifier();
var pubRecPacket = await SendAndReceiveAsync<MqttPubRecPacket>(publishPacket).ConfigureAwait(false);
await SendAndReceiveAsync<MqttPubCompPacket>(pubRecPacket.CreateResponse<MqttPubRelPacket>()).ConfigureAwait(false);
}
@@ -340,11 +340,6 @@ namespace MQTTnet.Client
return (TResponsePacket)await packetAwaiter.ConfigureAwait(false);
}
private ushort GetNewPacketIdentifier()
{
return (ushort)Interlocked.Increment(ref _latestPacketIdentifier);
}
private async Task SendKeepAliveMessagesAsync(CancellationToken cancellationToken)
{
_logger.Info<MqttClient>("Start sending keep alive packets.");