diff --git a/Source/MQTTnet/Adapter/MqttChannelAdapter.cs b/Source/MQTTnet/Adapter/MqttChannelAdapter.cs index 4e61733..13e6606 100644 --- a/Source/MQTTnet/Adapter/MqttChannelAdapter.cs +++ b/Source/MQTTnet/Adapter/MqttChannelAdapter.cs @@ -150,6 +150,9 @@ namespace MQTTnet.Adapter return packet; } + catch (OperationCanceledException) + { + } catch (Exception exception) { if (IsWrappedException(exception)) @@ -237,7 +240,8 @@ namespace MQTTnet.Adapter { if (exception is IOException && exception.InnerException is SocketException socketException) { - if (socketException.SocketErrorCode == SocketError.ConnectionAborted) + if (socketException.SocketErrorCode == SocketError.ConnectionAborted || + socketException.SocketErrorCode == SocketError.OperationAborted) { throw new OperationCanceledException(); } diff --git a/Source/MQTTnet/Client/MqttClient.cs b/Source/MQTTnet/Client/MqttClient.cs index 575cc90..0a408d0 100644 --- a/Source/MQTTnet/Client/MqttClient.cs +++ b/Source/MQTTnet/Client/MqttClient.cs @@ -54,12 +54,12 @@ namespace MQTTnet.Client try { - _cancellationTokenSource = new CancellationTokenSource(); - _disconnectReason = new TaskCompletionSource(); _options = options; _packetIdentifierProvider.Reset(); _packetDispatcher.Reset(); + _cancellationTokenSource = new CancellationTokenSource(); + _disconnectReason = new TaskCompletionSource(); _adapter = _adapterFactory.CreateClientAdapter(options, _logger); _logger.Verbose($"Trying to connect with server ({_options.ChannelOptions})."); @@ -87,10 +87,12 @@ namespace MQTTnet.Client catch (Exception exception) { _logger.Error(exception, "Error while connecting with server."); + if (_disconnectReason.TrySetException(exception)) { await DisconnectInternalAsync(null, exception).ConfigureAwait(false); } + throw; } } @@ -183,7 +185,7 @@ namespace MQTTnet.Client public void Dispose() { - _cancellationTokenSource?.Cancel (false); + _cancellationTokenSource?.Cancel(false); _cancellationTokenSource?.Dispose(); _cancellationTokenSource = null; @@ -224,9 +226,10 @@ namespace MQTTnet.Client private async Task DisconnectInternalAsync(Task sender, Exception exception) { - InitiateDisconnect(); - var clientWasConnected = IsConnected; + + InitiateDisconnect(); + IsConnected = false; try @@ -247,7 +250,7 @@ namespace MQTTnet.Client } finally { - Dispose (); + Dispose(); _cleanDisconnectInitiated = false; _logger.Info("Disconnected."); @@ -261,16 +264,16 @@ namespace MQTTnet.Client { try { - if (_cancellationTokenSource == null || _cancellationTokenSource.IsCancellationRequested) + if (_cancellationTokenSource?.IsCancellationRequested == true) { return; } - _cancellationTokenSource.Cancel(false); + _cancellationTokenSource?.Cancel(false); } - catch (Exception adapterException) + catch (Exception exception) { - _logger.Warning(adapterException, "Error while initiating disconnect."); + _logger.Warning(exception, "Error while initiating disconnect."); } } } @@ -377,8 +380,10 @@ namespace MQTTnet.Client { while (!cancellationToken.IsCancellationRequested) { - var packet = await _adapter.ReceivePacketAsync(TimeSpan.Zero, cancellationToken).ConfigureAwait(false); - if (packet != null) + var packet = await _adapter.ReceivePacketAsync(TimeSpan.Zero, cancellationToken) + .ConfigureAwait(false); + + if (packet != null && !cancellationToken.IsCancellationRequested) { await ProcessReceivedPacketAsync(packet, cancellationToken).ConfigureAwait(false); } @@ -393,7 +398,6 @@ namespace MQTTnet.Client if (exception is OperationCanceledException) { - _logger.Verbose ("MQTT OperationCanceled exception while receiving packets."); } else if (exception is MqttCommunicationException) { @@ -492,16 +496,20 @@ namespace MQTTnet.Client private void StartReceivingPackets(CancellationToken cancellationToken) { - _packetReceiverTask = Task.Run( + _packetReceiverTask = Task.Factory.StartNew( () => ReceivePacketsAsync(cancellationToken), - cancellationToken); + cancellationToken, + TaskCreationOptions.LongRunning, + TaskScheduler.Default); } private void StartSendingKeepAliveMessages(CancellationToken cancellationToken) { - _keepAliveMessageSenderTask = Task.Run( + _keepAliveMessageSenderTask = Task.Factory.StartNew( () => SendKeepAliveMessagesAsync(cancellationToken), - cancellationToken); + cancellationToken, + TaskCreationOptions.LongRunning, + TaskScheduler.Default); } private void FireApplicationMessageReceivedEvent(MqttPublishPacket publishPacket) diff --git a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs index 0c34bde..e4e5b3b 100644 --- a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs @@ -8,6 +8,7 @@ using System.Collections.Generic; using System.Text; using System.Threading; using System.Threading.Tasks; +using MQTTnet.Adapter; using MQTTnet.Implementations; namespace MQTTnet.Core.Tests @@ -552,6 +553,51 @@ namespace MQTTnet.Core.Tests Assert.IsTrue(bodyIsMatching); } + [TestMethod] + public async Task MqttServer_ConnectionDenied() + { + var server = new MqttFactory().CreateMqttServer(); + var client = new MqttFactory().CreateMqttClient(); + + try + { + var options = new MqttServerOptionsBuilder().WithConnectionValidator(context => + { + context.ReturnCode = MqttConnectReturnCode.ConnectionRefusedNotAuthorized; + }).Build(); + + await server.StartAsync(options); + + + var clientOptions = new MqttClientOptionsBuilder() + .WithTcpServer("localhost").Build(); + + try + { + await client.ConnectAsync(clientOptions); + Assert.Fail("An exception should be raised."); + } + catch (Exception exception) + { + if (exception is MqttConnectingFailedException) + { + + } + else + { + Assert.Fail("Wrong exception."); + } + } + } + finally + { + await client.DisconnectAsync(); + await server.StopAsync(); + + client.Dispose(); + } + } + [TestMethod] public async Task MqttServer_SameClientIdConnectDisconnectEventOrder() {