From 0c2ab9b2319ce72aeb33dea8430708a6ab6d67c8 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Thu, 4 Oct 2018 20:53:02 +0200 Subject: [PATCH 1/4] Fix a deadlock when an exception is fired while connecting. --- Source/MQTTnet/Adapter/MqttChannelAdapter.cs | 6 ++- Source/MQTTnet/Client/MqttClient.cs | 42 ++++++++++-------- Tests/MQTTnet.Core.Tests/MqttServerTests.cs | 46 ++++++++++++++++++++ 3 files changed, 76 insertions(+), 18 deletions(-) diff --git a/Source/MQTTnet/Adapter/MqttChannelAdapter.cs b/Source/MQTTnet/Adapter/MqttChannelAdapter.cs index 4e61733..13e6606 100644 --- a/Source/MQTTnet/Adapter/MqttChannelAdapter.cs +++ b/Source/MQTTnet/Adapter/MqttChannelAdapter.cs @@ -150,6 +150,9 @@ namespace MQTTnet.Adapter return packet; } + catch (OperationCanceledException) + { + } catch (Exception exception) { if (IsWrappedException(exception)) @@ -237,7 +240,8 @@ namespace MQTTnet.Adapter { if (exception is IOException && exception.InnerException is SocketException socketException) { - if (socketException.SocketErrorCode == SocketError.ConnectionAborted) + if (socketException.SocketErrorCode == SocketError.ConnectionAborted || + socketException.SocketErrorCode == SocketError.OperationAborted) { throw new OperationCanceledException(); } diff --git a/Source/MQTTnet/Client/MqttClient.cs b/Source/MQTTnet/Client/MqttClient.cs index 575cc90..0a408d0 100644 --- a/Source/MQTTnet/Client/MqttClient.cs +++ b/Source/MQTTnet/Client/MqttClient.cs @@ -54,12 +54,12 @@ namespace MQTTnet.Client try { - _cancellationTokenSource = new CancellationTokenSource(); - _disconnectReason = new TaskCompletionSource(); _options = options; _packetIdentifierProvider.Reset(); _packetDispatcher.Reset(); + _cancellationTokenSource = new CancellationTokenSource(); + _disconnectReason = new TaskCompletionSource(); _adapter = _adapterFactory.CreateClientAdapter(options, _logger); _logger.Verbose($"Trying to connect with server ({_options.ChannelOptions})."); @@ -87,10 +87,12 @@ namespace MQTTnet.Client catch (Exception exception) { _logger.Error(exception, "Error while connecting with server."); + if (_disconnectReason.TrySetException(exception)) { await DisconnectInternalAsync(null, exception).ConfigureAwait(false); } + throw; } } @@ -183,7 +185,7 @@ namespace MQTTnet.Client public void Dispose() { - _cancellationTokenSource?.Cancel (false); + _cancellationTokenSource?.Cancel(false); _cancellationTokenSource?.Dispose(); _cancellationTokenSource = null; @@ -224,9 +226,10 @@ namespace MQTTnet.Client private async Task DisconnectInternalAsync(Task sender, Exception exception) { - InitiateDisconnect(); - var clientWasConnected = IsConnected; + + InitiateDisconnect(); + IsConnected = false; try @@ -247,7 +250,7 @@ namespace MQTTnet.Client } finally { - Dispose (); + Dispose(); _cleanDisconnectInitiated = false; _logger.Info("Disconnected."); @@ -261,16 +264,16 @@ namespace MQTTnet.Client { try { - if (_cancellationTokenSource == null || _cancellationTokenSource.IsCancellationRequested) + if (_cancellationTokenSource?.IsCancellationRequested == true) { return; } - _cancellationTokenSource.Cancel(false); + _cancellationTokenSource?.Cancel(false); } - catch (Exception adapterException) + catch (Exception exception) { - _logger.Warning(adapterException, "Error while initiating disconnect."); + _logger.Warning(exception, "Error while initiating disconnect."); } } } @@ -377,8 +380,10 @@ namespace MQTTnet.Client { while (!cancellationToken.IsCancellationRequested) { - var packet = await _adapter.ReceivePacketAsync(TimeSpan.Zero, cancellationToken).ConfigureAwait(false); - if (packet != null) + var packet = await _adapter.ReceivePacketAsync(TimeSpan.Zero, cancellationToken) + .ConfigureAwait(false); + + if (packet != null && !cancellationToken.IsCancellationRequested) { await ProcessReceivedPacketAsync(packet, cancellationToken).ConfigureAwait(false); } @@ -393,7 +398,6 @@ namespace MQTTnet.Client if (exception is OperationCanceledException) { - _logger.Verbose ("MQTT OperationCanceled exception while receiving packets."); } else if (exception is MqttCommunicationException) { @@ -492,16 +496,20 @@ namespace MQTTnet.Client private void StartReceivingPackets(CancellationToken cancellationToken) { - _packetReceiverTask = Task.Run( + _packetReceiverTask = Task.Factory.StartNew( () => ReceivePacketsAsync(cancellationToken), - cancellationToken); + cancellationToken, + TaskCreationOptions.LongRunning, + TaskScheduler.Default); } private void StartSendingKeepAliveMessages(CancellationToken cancellationToken) { - _keepAliveMessageSenderTask = Task.Run( + _keepAliveMessageSenderTask = Task.Factory.StartNew( () => SendKeepAliveMessagesAsync(cancellationToken), - cancellationToken); + cancellationToken, + TaskCreationOptions.LongRunning, + TaskScheduler.Default); } private void FireApplicationMessageReceivedEvent(MqttPublishPacket publishPacket) diff --git a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs index 0c34bde..e4e5b3b 100644 --- a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs @@ -8,6 +8,7 @@ using System.Collections.Generic; using System.Text; using System.Threading; using System.Threading.Tasks; +using MQTTnet.Adapter; using MQTTnet.Implementations; namespace MQTTnet.Core.Tests @@ -552,6 +553,51 @@ namespace MQTTnet.Core.Tests Assert.IsTrue(bodyIsMatching); } + [TestMethod] + public async Task MqttServer_ConnectionDenied() + { + var server = new MqttFactory().CreateMqttServer(); + var client = new MqttFactory().CreateMqttClient(); + + try + { + var options = new MqttServerOptionsBuilder().WithConnectionValidator(context => + { + context.ReturnCode = MqttConnectReturnCode.ConnectionRefusedNotAuthorized; + }).Build(); + + await server.StartAsync(options); + + + var clientOptions = new MqttClientOptionsBuilder() + .WithTcpServer("localhost").Build(); + + try + { + await client.ConnectAsync(clientOptions); + Assert.Fail("An exception should be raised."); + } + catch (Exception exception) + { + if (exception is MqttConnectingFailedException) + { + + } + else + { + Assert.Fail("Wrong exception."); + } + } + } + finally + { + await client.DisconnectAsync(); + await server.StopAsync(); + + client.Dispose(); + } + } + [TestMethod] public async Task MqttServer_SameClientIdConnectDisconnectEventOrder() { From 6bee6a046946993a0f458d6e8b9c8626f327deff Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Thu, 4 Oct 2018 20:58:15 +0200 Subject: [PATCH 2/4] Update docs. --- Build/MQTTnet.nuspec | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index 2dbe731..fbcd355 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -10,10 +10,7 @@ https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png false MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker). - * [Core] Added all factory methods to the factory interface. -* [Core] Fixed an issue with cancellation token handling (thanks to @acrabb). -* [Server] Added a new overload for configuring the ASP.net integration (thanks to @JanEggers). -* [Server] Added a method for clearing all retained messages. + * [Client] Fixed a deadlock when an exception is fired while connecting (thanks to @malibVB). Copyright Christian Kratky 2016-2018 MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin From fb4f89b4120e79216df3c281ee676ec083a3c53b Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Thu, 4 Oct 2018 22:30:03 +0200 Subject: [PATCH 3/4] Replace TaskCompletionSource in Client with Interlocked gate. --- Source/MQTTnet/Client/MqttClient.cs | 17 +++++++++++------ Tests/MQTTnet.Core.Tests/MqttServerTests.cs | 13 ++++++------- 2 files changed, 17 insertions(+), 13 deletions(-) diff --git a/Source/MQTTnet/Client/MqttClient.cs b/Source/MQTTnet/Client/MqttClient.cs index 0a408d0..05330b5 100644 --- a/Source/MQTTnet/Client/MqttClient.cs +++ b/Source/MQTTnet/Client/MqttClient.cs @@ -29,7 +29,7 @@ namespace MQTTnet.Client internal Task _keepAliveMessageSenderTask; private IMqttChannelAdapter _adapter; private bool _cleanDisconnectInitiated; - private TaskCompletionSource _disconnectReason; + private int _disconnectGate; public MqttClient(IMqttClientAdapterFactory channelFactory, IMqttNetLogger logger) { @@ -59,7 +59,7 @@ namespace MQTTnet.Client _packetDispatcher.Reset(); _cancellationTokenSource = new CancellationTokenSource(); - _disconnectReason = new TaskCompletionSource(); + _disconnectGate = 0; _adapter = _adapterFactory.CreateClientAdapter(options, _logger); _logger.Verbose($"Trying to connect with server ({_options.ChannelOptions})."); @@ -88,7 +88,7 @@ namespace MQTTnet.Client { _logger.Error(exception, "Error while connecting with server."); - if (_disconnectReason.TrySetException(exception)) + if (!DisconnectIsPending()) { await DisconnectInternalAsync(null, exception).ConfigureAwait(false); } @@ -110,7 +110,7 @@ namespace MQTTnet.Client } finally { - if (_disconnectReason.TrySetCanceled()) + if (!DisconnectIsPending()) { await DisconnectInternalAsync(null, null).ConfigureAwait(false); } @@ -361,7 +361,7 @@ namespace MQTTnet.Client _logger.Error(exception, "Unhandled exception while sending/receiving keep alive packets."); } - if (_disconnectReason.TrySetException(exception)) + if (!DisconnectIsPending()) { await DisconnectInternalAsync(_keepAliveMessageSenderTask, exception).ConfigureAwait(false); } @@ -410,7 +410,7 @@ namespace MQTTnet.Client _packetDispatcher.Dispatch(exception); - if (_disconnectReason.TrySetException(exception)) + if (!DisconnectIsPending()) { await DisconnectInternalAsync(_packetReceiverTask, exception).ConfigureAwait(false); } @@ -545,5 +545,10 @@ namespace MQTTnet.Client { } } + + private bool DisconnectIsPending() + { + return Interlocked.CompareExchange(ref _disconnectGate, 1, 0) != 0; + } } } \ No newline at end of file diff --git a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs index e4e5b3b..0404ccc 100644 --- a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs @@ -669,25 +669,24 @@ namespace MQTTnet.Core.Tests MqttQualityOfServiceLevel filterQualityOfServiceLevel, int expectedReceivedMessagesCount) { - var serverAdapter = new TestMqttServerAdapter(); - var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger()); + var s = new MqttFactory().CreateMqttServer(); var receivedMessagesCount = 0; try { await s.StartAsync(new MqttServerOptions()); - var c1 = await serverAdapter.ConnectTestClient("c1"); - var c2 = await serverAdapter.ConnectTestClient("c2"); - + var c1 = new MqttFactory().CreateMqttClient(); c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; - + await c1.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("localhost").Build()); await c1.SubscribeAsync(new TopicFilterBuilder().WithTopic(topicFilter).WithQualityOfServiceLevel(filterQualityOfServiceLevel).Build()); + + var c2 = new MqttFactory().CreateMqttClient(); + await c2.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("localhost").Build()); await c2.PublishAsync(builder => builder.WithTopic(topic).WithPayload(new byte[0]).WithQualityOfServiceLevel(qualityOfServiceLevel)); await Task.Delay(500); await c1.UnsubscribeAsync(topicFilter); - await Task.Delay(500); } finally From c8b3066883aafd2e3189e0daf7754aec3951dec2 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Sat, 6 Oct 2018 13:12:15 +0200 Subject: [PATCH 4/4] Update docs. --- Build/MQTTnet.AspNetCore.nuspec | 2 +- Build/MQTTnet.Extensions.ManagedClient.nuspec | 2 +- Build/MQTTnet.Extensions.Rpc.nuspec | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Build/MQTTnet.AspNetCore.nuspec b/Build/MQTTnet.AspNetCore.nuspec index 5be2f94..cb1718f 100644 --- a/Build/MQTTnet.AspNetCore.nuspec +++ b/Build/MQTTnet.AspNetCore.nuspec @@ -15,7 +15,7 @@ Copyright Christian Kratky 2016-2018 MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin - + diff --git a/Build/MQTTnet.Extensions.ManagedClient.nuspec b/Build/MQTTnet.Extensions.ManagedClient.nuspec index 99a58c4..e5d1184 100644 --- a/Build/MQTTnet.Extensions.ManagedClient.nuspec +++ b/Build/MQTTnet.Extensions.ManagedClient.nuspec @@ -15,7 +15,7 @@ Copyright Christian Kratky 2016-2018 MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin - + diff --git a/Build/MQTTnet.Extensions.Rpc.nuspec b/Build/MQTTnet.Extensions.Rpc.nuspec index 736f92d..4075d65 100644 --- a/Build/MQTTnet.Extensions.Rpc.nuspec +++ b/Build/MQTTnet.Extensions.Rpc.nuspec @@ -15,7 +15,7 @@ Copyright Christian Kratky 2016-2018 MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin - +