Ver código fonte

Wait until receiving of packets is started before sending packets

release/3.x.x
Christian Kratky 7 anos atrás
pai
commit
32dc96ae93
3 arquivos alterados com 51 adições e 33 exclusões
  1. +45
    -27
      MQTTnet.Core/Client/MqttClient.cs
  2. +6
    -5
      MQTTnet.Core/Client/MqttPacketDispatcher.cs
  3. +0
    -1
      Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs

+ 45
- 27
MQTTnet.Core/Client/MqttClient.cs Ver arquivo

@@ -19,6 +19,7 @@ namespace MQTTnet.Core.Client
private readonly MqttClientOptions _options;
private readonly IMqttCommunicationAdapter _adapter;

private bool _isReceivingPackets;
private int _latestPacketIdentifier;
private CancellationTokenSource _cancellationTokenSource;

@@ -50,23 +51,8 @@ namespace MQTTnet.Core.Client
await _adapter.ConnectAsync(_options.DefaultCommunicationTimeout, _options).ConfigureAwait(false);
MqttTrace.Verbose(nameof(MqttClient), "Connection with server established.");

StartReceivePackets(_cancellationTokenSource.Token);

var connectPacket = new MqttConnectPacket
{
ClientId = _options.ClientId,
Username = _options.UserName,
Password = _options.Password,
CleanSession = _options.CleanSession,
KeepAlivePeriod = (ushort)_options.KeepAlivePeriod.TotalSeconds,
WillMessage = willApplicationMessage
};

var response = await SendAndReceiveAsync<MqttConnAckPacket>(connectPacket).ConfigureAwait(false);
if (response.ConnectReturnCode != MqttConnectReturnCode.ConnectionAccepted)
{
throw new MqttConnectingFailedException(response.ConnectReturnCode);
}
await SetupIncomingPacketProcessingAsync();
await AuthenticateAsync(willApplicationMessage);

MqttTrace.Verbose(nameof(MqttClient), "MQTT connection with server established.");

@@ -184,6 +170,43 @@ namespace MQTTnet.Core.Client
}
}

private async Task AuthenticateAsync(MqttApplicationMessage willApplicationMessage)
{
var connectPacket = new MqttConnectPacket
{
ClientId = _options.ClientId,
Username = _options.UserName,
Password = _options.Password,
CleanSession = _options.CleanSession,
KeepAlivePeriod = (ushort)_options.KeepAlivePeriod.TotalSeconds,
WillMessage = willApplicationMessage
};

var response = await SendAndReceiveAsync<MqttConnAckPacket>(connectPacket).ConfigureAwait(false);
if (response.ConnectReturnCode != MqttConnectReturnCode.ConnectionAccepted)
{
throw new MqttConnectingFailedException(response.ConnectReturnCode);
}
}

private async Task SetupIncomingPacketProcessingAsync()
{
_isReceivingPackets = false;

#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
Task.Factory.StartNew(
() => ReceivePackets(_cancellationTokenSource.Token),
_cancellationTokenSource.Token,
TaskCreationOptions.LongRunning,
TaskScheduler.Default).ConfigureAwait(false);
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed

while (!_isReceivingPackets && _cancellationTokenSource != null && !_cancellationTokenSource.IsCancellationRequested)
{
await Task.Delay(TimeSpan.FromMilliseconds(100));
}
}

private void ThrowIfNotConnected()
{
if (!IsConnected) throw new MqttCommunicationException("The client is not connected.");
@@ -320,9 +343,9 @@ namespace MQTTnet.Core.Client

private async Task<TResponsePacket> SendAndReceiveAsync<TResponsePacket>(MqttBasePacket requestPacket) where TResponsePacket : MqttBasePacket
{
var wait = _packetDispatcher.WaitForPacketAsync(requestPacket, typeof(TResponsePacket), _options.DefaultCommunicationTimeout);
var packetAwaiter = _packetDispatcher.WaitForPacketAsync(requestPacket, typeof(TResponsePacket), _options.DefaultCommunicationTimeout);
await _adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, requestPacket).ConfigureAwait(false);
return (TResponsePacket)await wait.ConfigureAwait(false);
return (TResponsePacket)await packetAwaiter.ConfigureAwait(false);
}

private ushort GetNewPacketIdentifier()
@@ -379,6 +402,8 @@ namespace MQTTnet.Core.Client
{
while (!cancellationToken.IsCancellationRequested)
{
_isReceivingPackets = true;

var packet = await _adapter.ReceivePacketAsync(TimeSpan.Zero, cancellationToken).ConfigureAwait(false);
if (cancellationToken.IsCancellationRequested)
{
@@ -419,17 +444,10 @@ namespace MQTTnet.Core.Client
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
}

private void StartReceivePackets(CancellationToken cancellationToken)
{
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
Task.Run(() => ReceivePackets(cancellationToken), cancellationToken).ConfigureAwait(false); ;
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
}

private void StartSendKeepAliveMessages(CancellationToken cancellationToken)
{
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
Task.Run(() => SendKeepAliveMessagesAsync(cancellationToken), cancellationToken).ConfigureAwait(false);
Task.Factory.StartNew(() => SendKeepAliveMessagesAsync(cancellationToken), cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default).ConfigureAwait(false);
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
}
}

+ 6
- 5
MQTTnet.Core/Client/MqttPacketDispatcher.cs Ver arquivo

@@ -20,7 +20,7 @@ namespace MQTTnet.Core.Client
var packetAwaiter = AddPacketAwaiter(request, responseType);
try
{
return await packetAwaiter.Task.TimeoutAfter(timeout);
return await packetAwaiter.Task.TimeoutAfter(timeout).ConfigureAwait(false);
}
catch (MqttCommunicationTimedOutException)
{
@@ -67,10 +67,11 @@ namespace MQTTnet.Core.Client
private TaskCompletionSource<MqttBasePacket> AddPacketAwaiter(MqttBasePacket request, Type responseType)
{
var tcs = new TaskCompletionSource<MqttBasePacket>();
if (request is IMqttPacketWithIdentifier withIdent)

if (request is IMqttPacketWithIdentifier requestWithIdentifier)
{
var byId = _packetByResponseTypeAndIdentifier.GetOrAdd(responseType, key => new ConcurrentDictionary<ushort, TaskCompletionSource<MqttBasePacket>>());
byId[withIdent.PacketIdentifier] = tcs;
byId[requestWithIdentifier.PacketIdentifier] = tcs;
}
else
{
@@ -82,10 +83,10 @@ namespace MQTTnet.Core.Client

private void RemovePacketAwaiter(MqttBasePacket request, Type responseType)
{
if (request is IMqttPacketWithIdentifier withIdent)
if (request is IMqttPacketWithIdentifier requestWithIdentifier)
{
var byId = _packetByResponseTypeAndIdentifier.GetOrAdd(responseType, key => new ConcurrentDictionary<ushort, TaskCompletionSource<MqttBasePacket>>());
byId.TryRemove(withIdent.PacketIdentifier, out var _);
byId.TryRemove(requestWithIdentifier.PacketIdentifier, out var _);
}
else
{


+ 0
- 1
Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs Ver arquivo

@@ -1,6 +1,5 @@
using System;
using System.Text;
using System.Threading.Tasks;
using Windows.UI.Core;
using Windows.UI.Xaml;
using MQTTnet.Core;


Carregando…
Cancelar
Salvar