|
|
@@ -39,10 +39,15 @@ namespace MQTTnet.Client |
|
|
|
IMqttChannelAdapter _adapter; |
|
|
|
bool _cleanDisconnectInitiated; |
|
|
|
volatile int _connectState; |
|
|
|
const int CONNECT_STATED_DISCONNECTED = 0; |
|
|
|
const int CONNECT_STATED_DISCONNECTING = 1; |
|
|
|
const int CONNECT_STATED_CONNECTED = 2; |
|
|
|
const int CONNECT_STATED_CONNECTING = 3; |
|
|
|
|
|
|
|
enum ConnectState |
|
|
|
{ |
|
|
|
Disconnected = 0, |
|
|
|
Disconnecting, |
|
|
|
Connected, |
|
|
|
Connecting |
|
|
|
} |
|
|
|
|
|
|
|
MqttClientDisconnectReason _disconnectReason; |
|
|
|
|
|
|
|
DateTime _lastPacketSentTimestamp; |
|
|
@@ -61,7 +66,7 @@ namespace MQTTnet.Client |
|
|
|
|
|
|
|
public IMqttApplicationMessageReceivedHandler ApplicationMessageReceivedHandler { get; set; } |
|
|
|
|
|
|
|
public bool IsConnected => _connectState == CONNECT_STATED_CONNECTED; |
|
|
|
public bool IsConnected => (ConnectState)_connectState == ConnectState.Connected; |
|
|
|
|
|
|
|
public IMqttClientOptions Options { get; private set; } |
|
|
|
|
|
|
@@ -74,7 +79,7 @@ namespace MQTTnet.Client |
|
|
|
|
|
|
|
ThrowIfDisposed(); |
|
|
|
|
|
|
|
if (Interlocked.CompareExchange(ref _connectState, CONNECT_STATED_CONNECTING, CONNECT_STATED_DISCONNECTED) != CONNECT_STATED_DISCONNECTED) |
|
|
|
if (CompareExchangeConnectState(ConnectState.Connecting, ConnectState.Disconnected) != ConnectState.Disconnected) |
|
|
|
throw new InvalidOperationException("Not allowed to connect while connect/disconnect is pending."); |
|
|
|
|
|
|
|
MqttClientAuthenticateResult authenticateResult = null; |
|
|
@@ -113,7 +118,7 @@ namespace MQTTnet.Client |
|
|
|
_keepAlivePacketsSenderTask = Task.Run(() => TrySendKeepAliveMessagesAsync(backgroundCancellationToken), backgroundCancellationToken); |
|
|
|
} |
|
|
|
|
|
|
|
Interlocked.CompareExchange(ref _connectState, CONNECT_STATED_CONNECTED, CONNECT_STATED_CONNECTING); |
|
|
|
CompareExchangeConnectState(ConnectState.Connected, ConnectState.Connecting); |
|
|
|
|
|
|
|
_logger.Info("Connected."); |
|
|
|
|
|
|
@@ -365,7 +370,7 @@ namespace MQTTnet.Client |
|
|
|
{ |
|
|
|
Cleanup(); |
|
|
|
_cleanDisconnectInitiated = false; |
|
|
|
Interlocked.CompareExchange(ref _connectState, CONNECT_STATED_DISCONNECTED, CONNECT_STATED_DISCONNECTING); |
|
|
|
CompareExchangeConnectState(ConnectState.Disconnected, ConnectState.Disconnecting); |
|
|
|
|
|
|
|
_logger.Info("Disconnected."); |
|
|
|
|
|
|
@@ -800,20 +805,20 @@ namespace MQTTnet.Client |
|
|
|
|
|
|
|
bool DisconnectIsPendingOrFinished() |
|
|
|
{ |
|
|
|
var connectState = _connectState; |
|
|
|
var connectState = (ConnectState)_connectState; |
|
|
|
do |
|
|
|
{ |
|
|
|
switch (connectState) |
|
|
|
{ |
|
|
|
case CONNECT_STATED_DISCONNECTING: |
|
|
|
case CONNECT_STATED_DISCONNECTED: |
|
|
|
case ConnectState.Disconnected: |
|
|
|
case ConnectState.Disconnecting: |
|
|
|
return true; |
|
|
|
case CONNECT_STATED_CONNECTING: |
|
|
|
case CONNECT_STATED_CONNECTED: |
|
|
|
case ConnectState.Connected: |
|
|
|
case ConnectState.Connecting: |
|
|
|
// This will compare the _connectState to old value and set it to "CONNECT_STATED_DISCONNECTING" afterwards. |
|
|
|
// So the first caller will get a "false" and all subsequent ones will get "true". |
|
|
|
var newState = Interlocked.CompareExchange(ref _connectState, CONNECT_STATED_DISCONNECTING, connectState); |
|
|
|
if (newState != connectState) |
|
|
|
var newState = CompareExchangeConnectState(ConnectState.Disconnecting, connectState); |
|
|
|
if (newState == connectState) |
|
|
|
{ |
|
|
|
return false; |
|
|
|
} |
|
|
@@ -822,5 +827,10 @@ namespace MQTTnet.Client |
|
|
|
} |
|
|
|
} while (true); |
|
|
|
} |
|
|
|
|
|
|
|
ConnectState CompareExchangeConnectState(ConnectState value, ConnectState comparand) |
|
|
|
{ |
|
|
|
return (ConnectState)Interlocked.CompareExchange(ref _connectState, (int)value, (int)comparand); |
|
|
|
} |
|
|
|
} |
|
|
|
} |