Browse Source

Fix a deadlock when an exception is fired while connecting.

release/3.x.x
Christian Kratky 6 years ago
parent
commit
0c2ab9b231
3 changed files with 76 additions and 18 deletions
  1. +5
    -1
      Source/MQTTnet/Adapter/MqttChannelAdapter.cs
  2. +25
    -17
      Source/MQTTnet/Client/MqttClient.cs
  3. +46
    -0
      Tests/MQTTnet.Core.Tests/MqttServerTests.cs

+ 5
- 1
Source/MQTTnet/Adapter/MqttChannelAdapter.cs View File

@@ -150,6 +150,9 @@ namespace MQTTnet.Adapter


return packet; return packet;
} }
catch (OperationCanceledException)
{
}
catch (Exception exception) catch (Exception exception)
{ {
if (IsWrappedException(exception)) if (IsWrappedException(exception))
@@ -237,7 +240,8 @@ namespace MQTTnet.Adapter
{ {
if (exception is IOException && exception.InnerException is SocketException socketException) if (exception is IOException && exception.InnerException is SocketException socketException)
{ {
if (socketException.SocketErrorCode == SocketError.ConnectionAborted)
if (socketException.SocketErrorCode == SocketError.ConnectionAborted ||
socketException.SocketErrorCode == SocketError.OperationAborted)
{ {
throw new OperationCanceledException(); throw new OperationCanceledException();
} }


+ 25
- 17
Source/MQTTnet/Client/MqttClient.cs View File

@@ -54,12 +54,12 @@ namespace MQTTnet.Client


try try
{ {
_cancellationTokenSource = new CancellationTokenSource();
_disconnectReason = new TaskCompletionSource<bool>();
_options = options; _options = options;
_packetIdentifierProvider.Reset(); _packetIdentifierProvider.Reset();
_packetDispatcher.Reset(); _packetDispatcher.Reset();


_cancellationTokenSource = new CancellationTokenSource();
_disconnectReason = new TaskCompletionSource<bool>();
_adapter = _adapterFactory.CreateClientAdapter(options, _logger); _adapter = _adapterFactory.CreateClientAdapter(options, _logger);


_logger.Verbose($"Trying to connect with server ({_options.ChannelOptions})."); _logger.Verbose($"Trying to connect with server ({_options.ChannelOptions}).");
@@ -87,10 +87,12 @@ namespace MQTTnet.Client
catch (Exception exception) catch (Exception exception)
{ {
_logger.Error(exception, "Error while connecting with server."); _logger.Error(exception, "Error while connecting with server.");

if (_disconnectReason.TrySetException(exception)) if (_disconnectReason.TrySetException(exception))
{ {
await DisconnectInternalAsync(null, exception).ConfigureAwait(false); await DisconnectInternalAsync(null, exception).ConfigureAwait(false);
} }

throw; throw;
} }
} }
@@ -183,7 +185,7 @@ namespace MQTTnet.Client


public void Dispose() public void Dispose()
{ {
_cancellationTokenSource?.Cancel (false);
_cancellationTokenSource?.Cancel(false);
_cancellationTokenSource?.Dispose(); _cancellationTokenSource?.Dispose();
_cancellationTokenSource = null; _cancellationTokenSource = null;


@@ -224,9 +226,10 @@ namespace MQTTnet.Client


private async Task DisconnectInternalAsync(Task sender, Exception exception) private async Task DisconnectInternalAsync(Task sender, Exception exception)
{ {
InitiateDisconnect();

var clientWasConnected = IsConnected; var clientWasConnected = IsConnected;

InitiateDisconnect();
IsConnected = false; IsConnected = false;


try try
@@ -247,7 +250,7 @@ namespace MQTTnet.Client
} }
finally finally
{ {
Dispose ();
Dispose();
_cleanDisconnectInitiated = false; _cleanDisconnectInitiated = false;


_logger.Info("Disconnected."); _logger.Info("Disconnected.");
@@ -261,16 +264,16 @@ namespace MQTTnet.Client
{ {
try try
{ {
if (_cancellationTokenSource == null || _cancellationTokenSource.IsCancellationRequested)
if (_cancellationTokenSource?.IsCancellationRequested == true)
{ {
return; return;
} }


_cancellationTokenSource.Cancel(false);
_cancellationTokenSource?.Cancel(false);
} }
catch (Exception adapterException)
catch (Exception exception)
{ {
_logger.Warning(adapterException, "Error while initiating disconnect.");
_logger.Warning(exception, "Error while initiating disconnect.");
} }
} }
} }
@@ -377,8 +380,10 @@ namespace MQTTnet.Client
{ {
while (!cancellationToken.IsCancellationRequested) while (!cancellationToken.IsCancellationRequested)
{ {
var packet = await _adapter.ReceivePacketAsync(TimeSpan.Zero, cancellationToken).ConfigureAwait(false);
if (packet != null)
var packet = await _adapter.ReceivePacketAsync(TimeSpan.Zero, cancellationToken)
.ConfigureAwait(false);

if (packet != null && !cancellationToken.IsCancellationRequested)
{ {
await ProcessReceivedPacketAsync(packet, cancellationToken).ConfigureAwait(false); await ProcessReceivedPacketAsync(packet, cancellationToken).ConfigureAwait(false);
} }
@@ -393,7 +398,6 @@ namespace MQTTnet.Client


if (exception is OperationCanceledException) if (exception is OperationCanceledException)
{ {
_logger.Verbose ("MQTT OperationCanceled exception while receiving packets.");
} }
else if (exception is MqttCommunicationException) else if (exception is MqttCommunicationException)
{ {
@@ -492,16 +496,20 @@ namespace MQTTnet.Client


private void StartReceivingPackets(CancellationToken cancellationToken) private void StartReceivingPackets(CancellationToken cancellationToken)
{ {
_packetReceiverTask = Task.Run(
_packetReceiverTask = Task.Factory.StartNew(
() => ReceivePacketsAsync(cancellationToken), () => ReceivePacketsAsync(cancellationToken),
cancellationToken);
cancellationToken,
TaskCreationOptions.LongRunning,
TaskScheduler.Default);
} }


private void StartSendingKeepAliveMessages(CancellationToken cancellationToken) private void StartSendingKeepAliveMessages(CancellationToken cancellationToken)
{ {
_keepAliveMessageSenderTask = Task.Run(
_keepAliveMessageSenderTask = Task.Factory.StartNew(
() => SendKeepAliveMessagesAsync(cancellationToken), () => SendKeepAliveMessagesAsync(cancellationToken),
cancellationToken);
cancellationToken,
TaskCreationOptions.LongRunning,
TaskScheduler.Default);
} }


private void FireApplicationMessageReceivedEvent(MqttPublishPacket publishPacket) private void FireApplicationMessageReceivedEvent(MqttPublishPacket publishPacket)


+ 46
- 0
Tests/MQTTnet.Core.Tests/MqttServerTests.cs View File

@@ -8,6 +8,7 @@ using System.Collections.Generic;
using System.Text; using System.Text;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using MQTTnet.Adapter;
using MQTTnet.Implementations; using MQTTnet.Implementations;


namespace MQTTnet.Core.Tests namespace MQTTnet.Core.Tests
@@ -552,6 +553,51 @@ namespace MQTTnet.Core.Tests
Assert.IsTrue(bodyIsMatching); Assert.IsTrue(bodyIsMatching);
} }


[TestMethod]
public async Task MqttServer_ConnectionDenied()
{
var server = new MqttFactory().CreateMqttServer();
var client = new MqttFactory().CreateMqttClient();

try
{
var options = new MqttServerOptionsBuilder().WithConnectionValidator(context =>
{
context.ReturnCode = MqttConnectReturnCode.ConnectionRefusedNotAuthorized;
}).Build();

await server.StartAsync(options);

var clientOptions = new MqttClientOptionsBuilder()
.WithTcpServer("localhost").Build();

try
{
await client.ConnectAsync(clientOptions);
Assert.Fail("An exception should be raised.");
}
catch (Exception exception)
{
if (exception is MqttConnectingFailedException)
{

}
else
{
Assert.Fail("Wrong exception.");
}
}
}
finally
{
await client.DisconnectAsync();
await server.StopAsync();
client.Dispose();
}
}

[TestMethod] [TestMethod]
public async Task MqttServer_SameClientIdConnectDisconnectEventOrder() public async Task MqttServer_SameClientIdConnectDisconnectEventOrder()
{ {


Loading…
Cancel
Save