From 21048b51ae673d04e91997bd71cf9d56d9697e10 Mon Sep 17 00:00:00 2001 From: JanEggers Date: Sun, 1 Jul 2018 19:12:18 +0200 Subject: [PATCH 1/9] prove there is something wrong --- Tests/MQTTnet.Core.Tests/MqttServerTests.cs | 26 +++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs index 95bb67d..14f1076 100644 --- a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs @@ -233,6 +233,32 @@ namespace MQTTnet.Core.Tests await c1.PublishAsync(message); } } + + [TestMethod] + public async Task MqttServer_ShutdownDisconnectsClientsGracefully() + { + var serverAdapter = new MqttTcpServerAdapter(new MqttNetLogger().CreateChildLogger()); + var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger()); + + var clientOptions = new MqttClientOptionsBuilder() + .WithTcpServer("localhost") + .Build(); + + bool disconnectCalled = false; + + await s.StartAsync(new MqttServerOptions()); + + var c1 = new MqttFactory().CreateMqttClient(); + c1.Disconnected += (sender, args) => disconnectCalled = true; + + await c1.ConnectAsync(clientOptions); + + await s.StopAsync(); + + await Task.Delay(500); + + Assert.IsTrue(disconnectCalled); + } [TestMethod] public async Task MqttServer_RetainedMessagesFlow() From b55f5ff2988ad5977da90f8d08b64280ba37e174 Mon Sep 17 00:00:00 2001 From: JanEggers Date: Sun, 1 Jul 2018 20:27:42 +0200 Subject: [PATCH 2/9] event is triggered but something is blocking --- Source/MQTTnet/Client/MqttClient.cs | 7 ++++-- .../MQTTnet/Implementations/MqttTcpChannel.cs | 23 ++++++++++++------- Source/MQTTnet/Server/MqttClientSession.cs | 21 ++++++++++++++++- .../Server/MqttClientSessionsManager.cs | 10 -------- Source/MQTTnet/Server/MqttServer.cs | 4 ++-- Tests/MQTTnet.Core.Tests/MqttServerTests.cs | 4 +++- 6 files changed, 45 insertions(+), 24 deletions(-) diff --git a/Source/MQTTnet/Client/MqttClient.cs b/Source/MQTTnet/Client/MqttClient.cs index ccedb5a..9311259 100644 --- a/Source/MQTTnet/Client/MqttClient.cs +++ b/Source/MQTTnet/Client/MqttClient.cs @@ -222,8 +222,11 @@ namespace MQTTnet.Client try { - await WaitForTaskAsync(_packetReceiverTask, sender).ConfigureAwait(false); - await WaitForTaskAsync(_keepAliveMessageSenderTask, sender).ConfigureAwait(false); + if (!(exception is MqttCommunicationClosedGracefullyException)) + { + await WaitForTaskAsync(_packetReceiverTask, sender).ConfigureAwait(false); + await WaitForTaskAsync(_keepAliveMessageSenderTask, sender).ConfigureAwait(false); + } if (_adapter != null) { diff --git a/Source/MQTTnet/Implementations/MqttTcpChannel.cs b/Source/MQTTnet/Implementations/MqttTcpChannel.cs index c48a06f..0745139 100644 --- a/Source/MQTTnet/Implementations/MqttTcpChannel.cs +++ b/Source/MQTTnet/Implementations/MqttTcpChannel.cs @@ -87,8 +87,14 @@ namespace MQTTnet.Implementations public void Dispose() { - TryDispose(_stream, () => _stream = null); - TryDispose(_socket, () => _socket = null); + Cleanup(ref _stream, (s) => s.Dispose()); + Cleanup(ref _socket, (s) => { + if (s.Connected) + { + s.Shutdown(SocketShutdown.Both); + } + s.Dispose(); + }); } private bool InternalUserCertificateValidationCallback(object sender, X509Certificate x509Certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors) @@ -157,11 +163,16 @@ namespace MQTTnet.Implementations } } - private static void TryDispose(IDisposable disposable, Action afterDispose) + private static void Cleanup(ref T item, Action handler) where T : class { + var temp = item; + item = null; try { - disposable?.Dispose(); + if (temp != null) + { + handler(temp); + } } catch (ObjectDisposedException) { @@ -169,10 +180,6 @@ namespace MQTTnet.Implementations catch (NullReferenceException) { } - finally - { - afterDispose(); - } } } } diff --git a/Source/MQTTnet/Server/MqttClientSession.cs b/Source/MQTTnet/Server/MqttClientSession.cs index 429fdb4..05d552c 100644 --- a/Source/MQTTnet/Server/MqttClientSession.cs +++ b/Source/MQTTnet/Server/MqttClientSession.cs @@ -29,6 +29,7 @@ namespace MQTTnet.Server private MqttApplicationMessage _willMessage; private bool _wasCleanDisconnect; private IMqttChannelAdapter _adapter; + private Task _run; public MqttClientSession( string clientId, @@ -65,7 +66,13 @@ namespace MQTTnet.Server status.LastNonKeepAlivePacketReceived = _keepAliveMonitor.LastNonKeepAlivePacketReceived; } - public async Task RunAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter adapter) + public Task RunAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter adapter) + { + _run = RunInternalAsync(connectPacket, adapter); + return _run; + } + + private async Task RunInternalAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter adapter) { if (connectPacket == null) throw new ArgumentNullException(nameof(connectPacket)); if (adapter == null) throw new ArgumentNullException(nameof(adapter)); @@ -122,6 +129,16 @@ namespace MQTTnet.Server { _adapter.ReadingPacketStarted -= OnAdapterReadingPacketStarted; _adapter.ReadingPacketCompleted -= OnAdapterReadingPacketCompleted; + + try + { + await _adapter.DisconnectAsync(_options.DefaultCommunicationTimeout, CancellationToken.None).ConfigureAwait(false); + _adapter.Dispose(); + } + catch (Exception exception) + { + _logger.Error(exception, exception.Message); + } } _adapter = null; @@ -153,6 +170,8 @@ namespace MQTTnet.Server } _willMessage = null; + + _run?.GetAwaiter().GetResult(); } finally { diff --git a/Source/MQTTnet/Server/MqttClientSessionsManager.cs b/Source/MQTTnet/Server/MqttClientSessionsManager.cs index 34080a0..6b62e44 100644 --- a/Source/MQTTnet/Server/MqttClientSessionsManager.cs +++ b/Source/MQTTnet/Server/MqttClientSessionsManager.cs @@ -249,16 +249,6 @@ namespace MQTTnet.Server } finally { - try - { - await clientAdapter.DisconnectAsync(_options.DefaultCommunicationTimeout, CancellationToken.None).ConfigureAwait(false); - clientAdapter.Dispose(); - } - catch (Exception exception) - { - _logger.Error(exception, exception.Message); - } - if (!_options.EnablePersistentSessions) { DeleteSession(clientId); diff --git a/Source/MQTTnet/Server/MqttServer.cs b/Source/MQTTnet/Server/MqttServer.cs index dd732f1..6d53291 100644 --- a/Source/MQTTnet/Server/MqttServer.cs +++ b/Source/MQTTnet/Server/MqttServer.cs @@ -106,14 +106,14 @@ namespace MQTTnet.Server _cancellationTokenSource.Cancel(false); + _clientSessionsManager.Stop(); + foreach (var adapter in _adapters) { adapter.ClientAccepted -= OnClientAccepted; await adapter.StopAsync().ConfigureAwait(false); } - _clientSessionsManager.Stop(); - _logger.Info("Stopped."); Stopped?.Invoke(this, EventArgs.Empty); } diff --git a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs index 14f1076..24626ab 100644 --- a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs @@ -253,9 +253,11 @@ namespace MQTTnet.Core.Tests await c1.ConnectAsync(clientOptions); + await Task.Delay(500); + await s.StopAsync(); - await Task.Delay(500); + await Task.Delay(1000); Assert.IsTrue(disconnectCalled); } From 53bd2a0960666ec6d3a28aefddbf3b729f2634f9 Mon Sep 17 00:00:00 2001 From: JanEggers Date: Sun, 1 Jul 2018 21:24:48 +0200 Subject: [PATCH 3/9] fixed blocking code in networkstrem and reduced sleep in tests --- Source/MQTTnet/Server/MqttClientSession.cs | 32 ++++++++++++++------- Tests/MQTTnet.Core.Tests/MqttServerTests.cs | 4 +-- 2 files changed, 24 insertions(+), 12 deletions(-) diff --git a/Source/MQTTnet/Server/MqttClientSession.cs b/Source/MQTTnet/Server/MqttClientSession.cs index 05d552c..204bdea 100644 --- a/Source/MQTTnet/Server/MqttClientSession.cs +++ b/Source/MQTTnet/Server/MqttClientSession.cs @@ -30,6 +30,7 @@ namespace MQTTnet.Server private bool _wasCleanDisconnect; private IMqttChannelAdapter _adapter; private Task _run; + private IDisposable _cleanupHandle; public MqttClientSession( string clientId, @@ -84,6 +85,10 @@ namespace MQTTnet.Server adapter.ReadingPacketCompleted += OnAdapterReadingPacketCompleted; _cancellationTokenSource = new CancellationTokenSource(); + + //woraround for https://github.com/dotnet/corefx/issues/24430 + _cleanupHandle = _cancellationTokenSource.Token.Register(() => Cleanup()); + //endworkaround _wasCleanDisconnect = false; _willMessage = connectPacket.WillMessage; @@ -129,20 +134,14 @@ namespace MQTTnet.Server { _adapter.ReadingPacketStarted -= OnAdapterReadingPacketStarted; _adapter.ReadingPacketCompleted -= OnAdapterReadingPacketCompleted; - - try - { - await _adapter.DisconnectAsync(_options.DefaultCommunicationTimeout, CancellationToken.None).ConfigureAwait(false); - _adapter.Dispose(); - } - catch (Exception exception) - { - _logger.Error(exception, exception.Message); - } + await Cleanup(); } _adapter = null; + _cleanupHandle?.Dispose(); + _cleanupHandle = null; + _cancellationTokenSource?.Dispose(); _cancellationTokenSource = null; } @@ -150,6 +149,19 @@ namespace MQTTnet.Server return _wasCleanDisconnect; } + private async Task Cleanup() + { + try + { + await _adapter.DisconnectAsync(_options.DefaultCommunicationTimeout, CancellationToken.None).ConfigureAwait(false); + _adapter.Dispose(); + } + catch (Exception exception) + { + _logger.Error(exception, exception.Message); + } + } + public void Stop(MqttClientDisconnectType type) { try diff --git a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs index 24626ab..39259d0 100644 --- a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs @@ -253,11 +253,11 @@ namespace MQTTnet.Core.Tests await c1.ConnectAsync(clientOptions); - await Task.Delay(500); + await Task.Delay(100); await s.StopAsync(); - await Task.Delay(1000); + await Task.Delay(100); Assert.IsTrue(disconnectCalled); } From ae80de26ec046ca92fc7ea6151f2ed30793c18bb Mon Sep 17 00:00:00 2001 From: JanEggers Date: Thu, 5 Jul 2018 19:41:58 +0200 Subject: [PATCH 4/9] addressed feedback --- Source/MQTTnet/Server/MqttClientSession.cs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/Source/MQTTnet/Server/MqttClientSession.cs b/Source/MQTTnet/Server/MqttClientSession.cs index 204bdea..8ef401c 100644 --- a/Source/MQTTnet/Server/MqttClientSession.cs +++ b/Source/MQTTnet/Server/MqttClientSession.cs @@ -29,7 +29,7 @@ namespace MQTTnet.Server private MqttApplicationMessage _willMessage; private bool _wasCleanDisconnect; private IMqttChannelAdapter _adapter; - private Task _run; + private Task _workerTask; private IDisposable _cleanupHandle; public MqttClientSession( @@ -69,8 +69,8 @@ namespace MQTTnet.Server public Task RunAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter adapter) { - _run = RunInternalAsync(connectPacket, adapter); - return _run; + _workerTask = RunInternalAsync(connectPacket, adapter); + return _workerTask; } private async Task RunInternalAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter adapter) @@ -134,7 +134,7 @@ namespace MQTTnet.Server { _adapter.ReadingPacketStarted -= OnAdapterReadingPacketStarted; _adapter.ReadingPacketCompleted -= OnAdapterReadingPacketCompleted; - await Cleanup(); + await Cleanup().ConfigureAwait(false); } _adapter = null; @@ -183,7 +183,7 @@ namespace MQTTnet.Server _willMessage = null; - _run?.GetAwaiter().GetResult(); + _workerTask?.GetAwaiter().GetResult(); } finally { From 0dea9e6a2bed220a8d462529810c271c5b5285e7 Mon Sep 17 00:00:00 2001 From: JanEggers Date: Sat, 7 Jul 2018 09:11:52 +0200 Subject: [PATCH 5/9] changed waiting condition --- Source/MQTTnet/Client/MqttClient.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Source/MQTTnet/Client/MqttClient.cs b/Source/MQTTnet/Client/MqttClient.cs index 9311259..7bc21de 100644 --- a/Source/MQTTnet/Client/MqttClient.cs +++ b/Source/MQTTnet/Client/MqttClient.cs @@ -222,7 +222,7 @@ namespace MQTTnet.Client try { - if (!(exception is MqttCommunicationClosedGracefullyException)) + if (clientWasConnected) { await WaitForTaskAsync(_packetReceiverTask, sender).ConfigureAwait(false); await WaitForTaskAsync(_keepAliveMessageSenderTask, sender).ConfigureAwait(false); From e6cfef5295482002671e34fdeec48913c09c0c18 Mon Sep 17 00:00:00 2001 From: JanEggers Date: Wed, 18 Jul 2018 21:13:02 +0200 Subject: [PATCH 6/9] fixed disconnect to be triggered just once --- Source/MQTTnet/Client/MqttClient.cs | 24 ++++++++++++++++----- Tests/MQTTnet.Core.Tests/MqttClientTests.cs | 13 ++++++----- Tests/MQTTnet.Core.Tests/MqttServerTests.cs | 6 +++--- 3 files changed, 30 insertions(+), 13 deletions(-) diff --git a/Source/MQTTnet/Client/MqttClient.cs b/Source/MQTTnet/Client/MqttClient.cs index 7bc21de..86264e4 100644 --- a/Source/MQTTnet/Client/MqttClient.cs +++ b/Source/MQTTnet/Client/MqttClient.cs @@ -29,6 +29,7 @@ namespace MQTTnet.Client private Task _keepAliveMessageSenderTask; private IMqttChannelAdapter _adapter; private bool _cleanDisconnectInitiated; + private TaskCompletionSource _disconnectReason; public MqttClient(IMqttClientAdapterFactory channelFactory, IMqttNetLogger logger) { @@ -54,6 +55,7 @@ namespace MQTTnet.Client try { _cancellationTokenSource = new CancellationTokenSource(); + _disconnectReason = new TaskCompletionSource(); _options = options; _packetIdentifierProvider.Reset(); _packetDispatcher.Reset(); @@ -85,8 +87,10 @@ namespace MQTTnet.Client catch (Exception exception) { _logger.Error(exception, "Error while connecting with server."); - await DisconnectInternalAsync(null, exception).ConfigureAwait(false); - + if (_disconnectReason.TrySetException(exception)) + { + await DisconnectInternalAsync(null, exception).ConfigureAwait(false); + } throw; } } @@ -104,7 +108,10 @@ namespace MQTTnet.Client } finally { - await DisconnectInternalAsync(null, null).ConfigureAwait(false); + if (_disconnectReason.TrySetCanceled()) + { + await DisconnectInternalAsync(null, null).ConfigureAwait(false); + } } } @@ -355,7 +362,10 @@ namespace MQTTnet.Client _logger.Error(exception, "Unhandled exception while sending/receiving keep alive packets."); } - await DisconnectInternalAsync(_keepAliveMessageSenderTask, exception).ConfigureAwait(false); + if (_disconnectReason.TrySetException(exception)) + { + await DisconnectInternalAsync(_keepAliveMessageSenderTask, exception).ConfigureAwait(false); + } } finally { @@ -398,7 +408,11 @@ namespace MQTTnet.Client } _packetDispatcher.Dispatch(exception); - await DisconnectInternalAsync(_packetReceiverTask, exception).ConfigureAwait(false); + + if (_disconnectReason.TrySetException(exception)) + { + await DisconnectInternalAsync(_packetReceiverTask, exception).ConfigureAwait(false); + } } finally { diff --git a/Tests/MQTTnet.Core.Tests/MqttClientTests.cs b/Tests/MQTTnet.Core.Tests/MqttClientTests.cs index 1334a80..455cf7d 100644 --- a/Tests/MQTTnet.Core.Tests/MqttClientTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttClientTests.cs @@ -1,4 +1,5 @@ -using System.Net.Sockets; +using System; +using System.Net.Sockets; using System.Threading.Tasks; using Microsoft.VisualStudio.TestTools.UnitTesting; using MQTTnet.Client; @@ -16,10 +17,10 @@ namespace MQTTnet.Core.Tests var factory = new MqttFactory(); var client = factory.CreateMqttClient(); - var exceptionIsCorrect = false; + Exception ex = null; client.Disconnected += (s, e) => { - exceptionIsCorrect = e.Exception is MqttCommunicationException c && c.InnerException is SocketException; + ex = e.Exception; }; try @@ -29,8 +30,10 @@ namespace MQTTnet.Core.Tests catch { } - - Assert.IsTrue(exceptionIsCorrect); + + Assert.IsNotNull(ex); + Assert.IsInstanceOfType(ex, typeof(MqttCommunicationException)); + Assert.IsInstanceOfType(ex.InnerException, typeof(SocketException)); } } } diff --git a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs index 39259d0..117e6ef 100644 --- a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs @@ -244,12 +244,12 @@ namespace MQTTnet.Core.Tests .WithTcpServer("localhost") .Build(); - bool disconnectCalled = false; + var disconnectCalled = 0; await s.StartAsync(new MqttServerOptions()); var c1 = new MqttFactory().CreateMqttClient(); - c1.Disconnected += (sender, args) => disconnectCalled = true; + c1.Disconnected += (sender, args) => disconnectCalled++; await c1.ConnectAsync(clientOptions); @@ -259,7 +259,7 @@ namespace MQTTnet.Core.Tests await Task.Delay(100); - Assert.IsTrue(disconnectCalled); + Assert.AreEqual(1, disconnectCalled); } [TestMethod] From f7ea2d29e1867602f85c110cb3e3a784de3fc0d6 Mon Sep 17 00:00:00 2001 From: JanEggers Date: Thu, 19 Jul 2018 19:14:36 +0200 Subject: [PATCH 7/9] prove its broken --- Source/MQTTnet/Client/MqttClient.cs | 4 +-- Source/MQTTnet/Properties/AssemblyInfo.cs | 3 ++ Tests/MQTTnet.Core.Tests/MqttClientTests.cs | 36 +++++++++++++++++++++ 3 files changed, 41 insertions(+), 2 deletions(-) create mode 100644 Source/MQTTnet/Properties/AssemblyInfo.cs diff --git a/Source/MQTTnet/Client/MqttClient.cs b/Source/MQTTnet/Client/MqttClient.cs index 86264e4..4891b1f 100644 --- a/Source/MQTTnet/Client/MqttClient.cs +++ b/Source/MQTTnet/Client/MqttClient.cs @@ -25,8 +25,8 @@ namespace MQTTnet.Client private IMqttClientOptions _options; private CancellationTokenSource _cancellationTokenSource; - private Task _packetReceiverTask; - private Task _keepAliveMessageSenderTask; + internal Task _packetReceiverTask; + internal Task _keepAliveMessageSenderTask; private IMqttChannelAdapter _adapter; private bool _cleanDisconnectInitiated; private TaskCompletionSource _disconnectReason; diff --git a/Source/MQTTnet/Properties/AssemblyInfo.cs b/Source/MQTTnet/Properties/AssemblyInfo.cs new file mode 100644 index 0000000..8259cc1 --- /dev/null +++ b/Source/MQTTnet/Properties/AssemblyInfo.cs @@ -0,0 +1,3 @@ +using System.Runtime.CompilerServices; + +[assembly:InternalsVisibleTo("MQTTnet.Core.Tests")] diff --git a/Tests/MQTTnet.Core.Tests/MqttClientTests.cs b/Tests/MQTTnet.Core.Tests/MqttClientTests.cs index 455cf7d..2f1c5e9 100644 --- a/Tests/MQTTnet.Core.Tests/MqttClientTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttClientTests.cs @@ -1,9 +1,13 @@ using System; using System.Net.Sockets; +using System.Threading; using System.Threading.Tasks; using Microsoft.VisualStudio.TestTools.UnitTesting; +using MQTTnet.Adapter; using MQTTnet.Client; +using MQTTnet.Diagnostics; using MQTTnet.Exceptions; +using MQTTnet.Packets; namespace MQTTnet.Core.Tests { @@ -35,5 +39,37 @@ namespace MQTTnet.Core.Tests Assert.IsInstanceOfType(ex, typeof(MqttCommunicationException)); Assert.IsInstanceOfType(ex.InnerException, typeof(SocketException)); } + + [TestMethod] + public async Task ClientCleanupOnAuthentificationFails() + { + var channel = new TestMqttCommunicationAdapter(); + var channel2 = new TestMqttCommunicationAdapter(); + channel.Partner = channel2; + channel2.Partner = channel; + + Task.Run(async () => { + var connect = await channel2.ReceivePacketAsync(TimeSpan.Zero, CancellationToken.None); + await channel2.SendPacketAsync(new MqttConnAckPacket() { ConnectReturnCode = Protocol.MqttConnectReturnCode.ConnectionRefusedNotAuthorized }, CancellationToken.None); + }); + + + + var fake = new TestMqttCommunicationAdapterFactory(channel); + + var client = new MqttClient(fake, new MqttNetLogger()); + + try + { + await client.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("any-server").Build()); + } + catch (Exception ex) + { + Assert.IsInstanceOfType(ex, typeof(MqttConnectingFailedException)); + } + + Assert.IsTrue(client._packetReceiverTask == null || client._packetReceiverTask.IsCompleted, "receive loop not completed"); + Assert.IsTrue(client._keepAliveMessageSenderTask == null || client._keepAliveMessageSenderTask.IsCompleted, "keepalive loop not completed"); + } } } From d30fe612646043909d55ca5f181fbb474a6364a2 Mon Sep 17 00:00:00 2001 From: JanEggers Date: Thu, 19 Jul 2018 19:15:48 +0200 Subject: [PATCH 8/9] fixed missing waits --- Source/MQTTnet/Client/MqttClient.cs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/Source/MQTTnet/Client/MqttClient.cs b/Source/MQTTnet/Client/MqttClient.cs index 4891b1f..0fa074c 100644 --- a/Source/MQTTnet/Client/MqttClient.cs +++ b/Source/MQTTnet/Client/MqttClient.cs @@ -229,11 +229,8 @@ namespace MQTTnet.Client try { - if (clientWasConnected) - { - await WaitForTaskAsync(_packetReceiverTask, sender).ConfigureAwait(false); - await WaitForTaskAsync(_keepAliveMessageSenderTask, sender).ConfigureAwait(false); - } + await WaitForTaskAsync(_packetReceiverTask, sender).ConfigureAwait(false); + await WaitForTaskAsync(_keepAliveMessageSenderTask, sender).ConfigureAwait(false); if (_adapter != null) { From 043cfdb3f9e23c6cb29b31064bd44afef7d46e05 Mon Sep 17 00:00:00 2001 From: JanEggers Date: Mon, 23 Jul 2018 17:58:29 +0200 Subject: [PATCH 9/9] fixed deadlock and object disposed exception --- Source/MQTTnet/Server/MqttClientSession.cs | 43 +++++++++++++-------- Tests/MQTTnet.Core.Tests/MqttServerTests.cs | 40 +++++++++++++++++++ 2 files changed, 67 insertions(+), 16 deletions(-) diff --git a/Source/MQTTnet/Server/MqttClientSession.cs b/Source/MQTTnet/Server/MqttClientSession.cs index 8ef401c..8ddb845 100644 --- a/Source/MQTTnet/Server/MqttClientSession.cs +++ b/Source/MQTTnet/Server/MqttClientSession.cs @@ -126,18 +126,11 @@ namespace MQTTnet.Server _logger.Error(exception, "Client '{0}': Unhandled exception while receiving client packets.", ClientId); } - Stop(MqttClientDisconnectType.NotClean); + Stop(MqttClientDisconnectType.NotClean, true); } finally { - if (_adapter != null) - { - _adapter.ReadingPacketStarted -= OnAdapterReadingPacketStarted; - _adapter.ReadingPacketCompleted -= OnAdapterReadingPacketCompleted; - await Cleanup().ConfigureAwait(false); - } - - _adapter = null; + await Cleanup().ConfigureAwait(false); _cleanupHandle?.Dispose(); _cleanupHandle = null; @@ -153,8 +146,18 @@ namespace MQTTnet.Server { try { - await _adapter.DisconnectAsync(_options.DefaultCommunicationTimeout, CancellationToken.None).ConfigureAwait(false); - _adapter.Dispose(); + var adapter = _adapter; + if (adapter == null) + { + return; + } + + _adapter = null; + + adapter.ReadingPacketStarted -= OnAdapterReadingPacketStarted; + adapter.ReadingPacketCompleted -= OnAdapterReadingPacketCompleted; + await adapter.DisconnectAsync(_options.DefaultCommunicationTimeout, CancellationToken.None).ConfigureAwait(false); + adapter.Dispose(); } catch (Exception exception) { @@ -163,6 +166,11 @@ namespace MQTTnet.Server } public void Stop(MqttClientDisconnectType type) + { + Stop(type, false); + } + + private void Stop(MqttClientDisconnectType type, bool isInsideSession) { try { @@ -183,7 +191,10 @@ namespace MQTTnet.Server _willMessage = null; - _workerTask?.GetAwaiter().GetResult(); + if (!isInsideSession) + { + _workerTask?.GetAwaiter().GetResult(); + } } finally { @@ -329,18 +340,18 @@ namespace MQTTnet.Server if (packet is MqttDisconnectPacket) { - Stop(MqttClientDisconnectType.Clean); + Stop(MqttClientDisconnectType.Clean, true); return; } if (packet is MqttConnectPacket) { - Stop(MqttClientDisconnectType.NotClean); + Stop(MqttClientDisconnectType.NotClean, true); return; } _logger.Warning(null, "Client '{0}': Received not supported packet ({1}). Closing connection.", ClientId, packet); - Stop(MqttClientDisconnectType.NotClean); + Stop(MqttClientDisconnectType.NotClean, true); } private void EnqueueSubscribedRetainedMessages(ICollection topicFilters) @@ -359,7 +370,7 @@ namespace MQTTnet.Server if (subscribeResult.CloseConnection) { - Stop(MqttClientDisconnectType.NotClean); + Stop(MqttClientDisconnectType.NotClean, true); return; } diff --git a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs index 117e6ef..0388866 100644 --- a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs @@ -262,6 +262,46 @@ namespace MQTTnet.Core.Tests Assert.AreEqual(1, disconnectCalled); } + [TestMethod] + public async Task MqttServer_HandleCleanDisconnect() + { + MqttNetGlobalLogger.LogMessagePublished += (_, e) => + { + System.Diagnostics.Debug.WriteLine($"[{e.TraceMessage.Timestamp:s}] {e.TraceMessage.Source} {e.TraceMessage.Message}"); + }; + + var serverAdapter = new MqttTcpServerAdapter(new MqttNetLogger().CreateChildLogger()); + var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger()); + + var clientConnectedCalled = 0; + var clientDisconnectedCalled = 0; + + s.ClientConnected += (_, __) => clientConnectedCalled++; + s.ClientDisconnected += (_, __) => clientDisconnectedCalled++; + + var clientOptions = new MqttClientOptionsBuilder() + .WithTcpServer("localhost") + .Build(); + + await s.StartAsync(new MqttServerOptions()); + + var c1 = new MqttFactory().CreateMqttClient(); + + await c1.ConnectAsync(clientOptions); + + await Task.Delay(100); + + await c1.DisconnectAsync(); + + await Task.Delay(100); + + await s.StopAsync(); + + await Task.Delay(100); + + Assert.AreEqual(clientConnectedCalled, clientDisconnectedCalled); + } + [TestMethod] public async Task MqttServer_RetainedMessagesFlow() {