Browse Source

Fix broken tests

release/3.x.x
SilverFox 3 years ago
parent
commit
9b10eadb24
1 changed files with 53 additions and 46 deletions
  1. +53
    -46
      Source/MQTTnet/Client/MqttClient.cs

+ 53
- 46
Source/MQTTnet/Client/MqttClient.cs View File

@@ -38,8 +38,11 @@ namespace MQTTnet.Client

IMqttChannelAdapter _adapter;
bool _cleanDisconnectInitiated;
long _isDisconnectPending;
bool _isConnected;
volatile int _connectState;
const int CONNECT_STATED_DISCONNECTED = 0;
const int CONNECT_STATED_DISCONNECTING = 1;
const int CONNECT_STATED_CONNECTED = 2;
const int CONNECT_STATED_CONNECTING = 3;
MqttClientDisconnectReason _disconnectReason;

DateTime _lastPacketSentTimestamp;
@@ -58,7 +61,7 @@ namespace MQTTnet.Client

public IMqttApplicationMessageReceivedHandler ApplicationMessageReceivedHandler { get; set; }

public bool IsConnected => _isConnected && Interlocked.Read(ref _isDisconnectPending) == 0;
public bool IsConnected => _connectState == CONNECT_STATED_CONNECTED;

public IMqttClientOptions Options { get; private set; }

@@ -71,8 +74,8 @@ namespace MQTTnet.Client

ThrowIfDisposed();

if (Volatile.Read(ref _isDisconnectPending) != 0)
throw new InvalidOperationException("Not allowed to connect while disconnect is pending.");
if (Interlocked.CompareExchange(ref _connectState, CONNECT_STATED_CONNECTING, CONNECT_STATED_DISCONNECTED) != CONNECT_STATED_DISCONNECTED)
throw new InvalidOperationException("Not allowed to connect while connect/disconnect is pending.");

MqttClientAuthenticateResult authenticateResult = null;

@@ -110,7 +113,7 @@ namespace MQTTnet.Client
_keepAlivePacketsSenderTask = Task.Run(() => TrySendKeepAliveMessagesAsync(backgroundCancellationToken), backgroundCancellationToken);
}

_isConnected = true;
Interlocked.CompareExchange(ref _connectState, CONNECT_STATED_CONNECTED, CONNECT_STATED_CONNECTING);

_logger.Info("Connected.");

@@ -128,10 +131,7 @@ namespace MQTTnet.Client

_logger.Error(exception, "Error while connecting with server.");

if (!DisconnectIsPending())
{
await DisconnectInternalAsync(null, exception, authenticateResult).ConfigureAwait(false);
}
await DisconnectInternalAsync(null, exception, authenticateResult).ConfigureAwait(false);

throw;
}
@@ -143,7 +143,8 @@ namespace MQTTnet.Client

ThrowIfDisposed();

if (DisconnectIsPending())
var clientWasConnected = IsConnected;
if (DisconnectIsPendingOrFinished())
{
return;
}
@@ -153,7 +154,7 @@ namespace MQTTnet.Client
_disconnectReason = MqttClientDisconnectReason.NormalDisconnection;
_cleanDisconnectInitiated = true;

if (_isConnected)
if (clientWasConnected)
{
var disconnectPacket = _adapter.PacketFormatterAdapter.DataConverter.CreateDisconnectPacket(options);
await SendAsync(disconnectPacket, cancellationToken).ConfigureAwait(false);
@@ -161,7 +162,7 @@ namespace MQTTnet.Client
}
finally
{
await DisconnectInternalAsync(null, null, null).ConfigureAwait(false);
await DisconnectCoreAsync(null, null, null, clientWasConnected).ConfigureAwait(false);
}
}

@@ -308,7 +309,7 @@ namespace MQTTnet.Client

void ThrowIfNotConnected()
{
if (!IsConnected || Interlocked.Read(ref _isDisconnectPending) == 1)
if (!IsConnected)
{
throw new MqttCommunicationException("The client is not connected.");
}
@@ -319,12 +320,19 @@ namespace MQTTnet.Client
if (IsConnected) throw new MqttProtocolViolationException(message);
}

async Task DisconnectInternalAsync(Task sender, Exception exception, MqttClientAuthenticateResult authenticateResult)
Task DisconnectInternalAsync(Task sender, Exception exception, MqttClientAuthenticateResult authenticateResult)
{
var clientWasConnected = _isConnected;
var clientWasConnected = IsConnected;
if (!DisconnectIsPendingOrFinished())
{
return DisconnectCoreAsync(sender, exception, authenticateResult, clientWasConnected);
}
return PlatformAbstractionLayer.CompletedTask;
}

async Task DisconnectCoreAsync(Task sender, Exception exception, MqttClientAuthenticateResult authenticateResult, bool clientWasConnected)
{
TryInitiateDisconnect();
_isConnected = false;

try
{
@@ -348,8 +356,6 @@ namespace MQTTnet.Client
var keepAliveTask = WaitForTaskAsync(_keepAlivePacketsSenderTask, sender);

await Task.WhenAll(receiverTask, publishPacketReceiverTask, keepAliveTask).ConfigureAwait(false);

_publishPacketReceiverQueue?.Dispose();
}
catch (Exception e)
{
@@ -359,7 +365,7 @@ namespace MQTTnet.Client
{
Cleanup();
_cleanDisconnectInitiated = false;
Volatile.Write(ref _isDisconnectPending, 0);
Interlocked.CompareExchange(ref _connectState, CONNECT_STATED_DISCONNECTED, CONNECT_STATED_DISCONNECTING);

_logger.Info("Disconnected.");

@@ -481,10 +487,7 @@ namespace MQTTnet.Client
_logger.Error(exception, "Error exception while sending/receiving keep alive packets.");
}

if (!DisconnectIsPending())
{
await DisconnectInternalAsync(_keepAlivePacketsSenderTask, exception, null).ConfigureAwait(false);
}
await DisconnectInternalAsync(_keepAlivePacketsSenderTask, exception, null).ConfigureAwait(false);
}
finally
{
@@ -509,10 +512,7 @@ namespace MQTTnet.Client

if (packet == null)
{
if (!DisconnectIsPending())
{
await DisconnectInternalAsync(_packetReceiverTask, null, null).ConfigureAwait(false);
}
await DisconnectInternalAsync(_packetReceiverTask, null, null).ConfigureAwait(false);

return;
}
@@ -541,10 +541,7 @@ namespace MQTTnet.Client

_packetDispatcher.FailAll(exception);

if (!DisconnectIsPending())
{
await DisconnectInternalAsync(_packetReceiverTask, exception, null).ConfigureAwait(false);
}
await DisconnectInternalAsync(_packetReceiverTask, exception, null).ConfigureAwait(false);
}
finally
{
@@ -613,10 +610,7 @@ namespace MQTTnet.Client

_packetDispatcher.FailAll(exception);

if (!DisconnectIsPending())
{
await DisconnectInternalAsync(_packetReceiverTask, exception, null).ConfigureAwait(false);
}
await DisconnectInternalAsync(_packetReceiverTask, exception, null).ConfigureAwait(false);
}
}

@@ -718,12 +712,7 @@ namespace MQTTnet.Client
// Also dispatch disconnect to waiting threads to generate a proper exception.
_packetDispatcher.FailAll(new MqttUnexpectedDisconnectReceivedException(disconnectPacket));

if (!DisconnectIsPending())
{
return DisconnectInternalAsync(_packetReceiverTask, null, null);
}

return PlatformAbstractionLayer.CompletedTask;
return DisconnectInternalAsync(_packetReceiverTask, null, null);
}

Task ProcessReceivedAuthPacket(MqttAuthPacket authPacket)
@@ -809,11 +798,29 @@ namespace MQTTnet.Client
}
}

bool DisconnectIsPending()
bool DisconnectIsPendingOrFinished()
{
// This will read the _isDisconnectPending and set it to "1" afterwards regardless of the value.
// So the first caller will get a "false" and all subsequent ones will get "true".
return Interlocked.CompareExchange(ref _isDisconnectPending, 1, 0) != 0;
var connectState = _connectState;
do
{
switch (connectState)
{
case CONNECT_STATED_DISCONNECTING:
case CONNECT_STATED_DISCONNECTED:
return true;
case CONNECT_STATED_CONNECTING:
case CONNECT_STATED_CONNECTED:
// This will compare the _connectState to old value and set it to "CONNECT_STATED_DISCONNECTING" afterwards.
// So the first caller will get a "false" and all subsequent ones will get "true".
var newState = Interlocked.CompareExchange(ref _connectState, CONNECT_STATED_DISCONNECTING, connectState);
if (newState != connectState)
{
return false;
}
connectState = newState;
break;
}
} while (true);
}
}
}

Loading…
Cancel
Save