From 32dc96ae93dec3ab7801141682098a63c7915cc6 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Wed, 27 Sep 2017 21:08:36 +0200 Subject: [PATCH] Wait until receiving of packets is started before sending packets --- MQTTnet.Core/Client/MqttClient.cs | 72 ++++++++++++------- MQTTnet.Core/Client/MqttPacketDispatcher.cs | 11 +-- .../MainPage.xaml.cs | 1 - 3 files changed, 51 insertions(+), 33 deletions(-) diff --git a/MQTTnet.Core/Client/MqttClient.cs b/MQTTnet.Core/Client/MqttClient.cs index d300b3f..c03b1d0 100644 --- a/MQTTnet.Core/Client/MqttClient.cs +++ b/MQTTnet.Core/Client/MqttClient.cs @@ -19,6 +19,7 @@ namespace MQTTnet.Core.Client private readonly MqttClientOptions _options; private readonly IMqttCommunicationAdapter _adapter; + private bool _isReceivingPackets; private int _latestPacketIdentifier; private CancellationTokenSource _cancellationTokenSource; @@ -50,23 +51,8 @@ namespace MQTTnet.Core.Client await _adapter.ConnectAsync(_options.DefaultCommunicationTimeout, _options).ConfigureAwait(false); MqttTrace.Verbose(nameof(MqttClient), "Connection with server established."); - StartReceivePackets(_cancellationTokenSource.Token); - - var connectPacket = new MqttConnectPacket - { - ClientId = _options.ClientId, - Username = _options.UserName, - Password = _options.Password, - CleanSession = _options.CleanSession, - KeepAlivePeriod = (ushort)_options.KeepAlivePeriod.TotalSeconds, - WillMessage = willApplicationMessage - }; - - var response = await SendAndReceiveAsync(connectPacket).ConfigureAwait(false); - if (response.ConnectReturnCode != MqttConnectReturnCode.ConnectionAccepted) - { - throw new MqttConnectingFailedException(response.ConnectReturnCode); - } + await SetupIncomingPacketProcessingAsync(); + await AuthenticateAsync(willApplicationMessage); MqttTrace.Verbose(nameof(MqttClient), "MQTT connection with server established."); @@ -184,6 +170,43 @@ namespace MQTTnet.Core.Client } } + private async Task AuthenticateAsync(MqttApplicationMessage willApplicationMessage) + { + var connectPacket = new MqttConnectPacket + { + ClientId = _options.ClientId, + Username = _options.UserName, + Password = _options.Password, + CleanSession = _options.CleanSession, + KeepAlivePeriod = (ushort)_options.KeepAlivePeriod.TotalSeconds, + WillMessage = willApplicationMessage + }; + + var response = await SendAndReceiveAsync(connectPacket).ConfigureAwait(false); + if (response.ConnectReturnCode != MqttConnectReturnCode.ConnectionAccepted) + { + throw new MqttConnectingFailedException(response.ConnectReturnCode); + } + } + + private async Task SetupIncomingPacketProcessingAsync() + { + _isReceivingPackets = false; + +#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + Task.Factory.StartNew( + () => ReceivePackets(_cancellationTokenSource.Token), + _cancellationTokenSource.Token, + TaskCreationOptions.LongRunning, + TaskScheduler.Default).ConfigureAwait(false); +#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + + while (!_isReceivingPackets && _cancellationTokenSource != null && !_cancellationTokenSource.IsCancellationRequested) + { + await Task.Delay(TimeSpan.FromMilliseconds(100)); + } + } + private void ThrowIfNotConnected() { if (!IsConnected) throw new MqttCommunicationException("The client is not connected."); @@ -320,9 +343,9 @@ namespace MQTTnet.Core.Client private async Task SendAndReceiveAsync(MqttBasePacket requestPacket) where TResponsePacket : MqttBasePacket { - var wait = _packetDispatcher.WaitForPacketAsync(requestPacket, typeof(TResponsePacket), _options.DefaultCommunicationTimeout); + var packetAwaiter = _packetDispatcher.WaitForPacketAsync(requestPacket, typeof(TResponsePacket), _options.DefaultCommunicationTimeout); await _adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, requestPacket).ConfigureAwait(false); - return (TResponsePacket)await wait.ConfigureAwait(false); + return (TResponsePacket)await packetAwaiter.ConfigureAwait(false); } private ushort GetNewPacketIdentifier() @@ -379,6 +402,8 @@ namespace MQTTnet.Core.Client { while (!cancellationToken.IsCancellationRequested) { + _isReceivingPackets = true; + var packet = await _adapter.ReceivePacketAsync(TimeSpan.Zero, cancellationToken).ConfigureAwait(false); if (cancellationToken.IsCancellationRequested) { @@ -419,17 +444,10 @@ namespace MQTTnet.Core.Client #pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed } - private void StartReceivePackets(CancellationToken cancellationToken) - { -#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - Task.Run(() => ReceivePackets(cancellationToken), cancellationToken).ConfigureAwait(false); ; -#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - } - private void StartSendKeepAliveMessages(CancellationToken cancellationToken) { #pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - Task.Run(() => SendKeepAliveMessagesAsync(cancellationToken), cancellationToken).ConfigureAwait(false); + Task.Factory.StartNew(() => SendKeepAliveMessagesAsync(cancellationToken), cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default).ConfigureAwait(false); #pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed } } diff --git a/MQTTnet.Core/Client/MqttPacketDispatcher.cs b/MQTTnet.Core/Client/MqttPacketDispatcher.cs index e494509..8f632a2 100644 --- a/MQTTnet.Core/Client/MqttPacketDispatcher.cs +++ b/MQTTnet.Core/Client/MqttPacketDispatcher.cs @@ -20,7 +20,7 @@ namespace MQTTnet.Core.Client var packetAwaiter = AddPacketAwaiter(request, responseType); try { - return await packetAwaiter.Task.TimeoutAfter(timeout); + return await packetAwaiter.Task.TimeoutAfter(timeout).ConfigureAwait(false); } catch (MqttCommunicationTimedOutException) { @@ -67,10 +67,11 @@ namespace MQTTnet.Core.Client private TaskCompletionSource AddPacketAwaiter(MqttBasePacket request, Type responseType) { var tcs = new TaskCompletionSource(); - if (request is IMqttPacketWithIdentifier withIdent) + + if (request is IMqttPacketWithIdentifier requestWithIdentifier) { var byId = _packetByResponseTypeAndIdentifier.GetOrAdd(responseType, key => new ConcurrentDictionary>()); - byId[withIdent.PacketIdentifier] = tcs; + byId[requestWithIdentifier.PacketIdentifier] = tcs; } else { @@ -82,10 +83,10 @@ namespace MQTTnet.Core.Client private void RemovePacketAwaiter(MqttBasePacket request, Type responseType) { - if (request is IMqttPacketWithIdentifier withIdent) + if (request is IMqttPacketWithIdentifier requestWithIdentifier) { var byId = _packetByResponseTypeAndIdentifier.GetOrAdd(responseType, key => new ConcurrentDictionary>()); - byId.TryRemove(withIdent.PacketIdentifier, out var _); + byId.TryRemove(requestWithIdentifier.PacketIdentifier, out var _); } else { diff --git a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs index e05e847..4c1ce4f 100644 --- a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs +++ b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs @@ -1,6 +1,5 @@ using System; using System.Text; -using System.Threading.Tasks; using Windows.UI.Core; using Windows.UI.Xaml; using MQTTnet.Core;