Browse Source

Fix issue with connection management.

release/3.x.x
Christian Kratky 7 years ago
parent
commit
c90bdbfb93
7 changed files with 159 additions and 92 deletions
  1. +1
    -0
      Build/MQTTnet.nuspec
  2. +10
    -4
      Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs
  3. +10
    -4
      Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs
  4. +9
    -3
      Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs
  5. +33
    -30
      MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs
  6. +90
    -50
      MQTTnet.Core/Client/MqttClient.cs
  7. +6
    -1
      MQTTnet.Core/Exceptions/MqttCommunicationException.cs

+ 1
- 0
Build/MQTTnet.nuspec View File

@@ -14,6 +14,7 @@
* [Server] Providing the used protocol version of connected clients * [Server] Providing the used protocol version of connected clients
* [Client] Added support for protocol version 3.1.0 * [Client] Added support for protocol version 3.1.0
* [Core] Several minor performance improvements * [Core] Several minor performance improvements
* [Core] Fixed an issue with connection management (Thanks to wuzhenda; Zuendelmeister)
</releaseNotes> </releaseNotes>
<copyright>Copyright Christian Kratky 2016-2017</copyright> <copyright>Copyright Christian Kratky 2016-2017</copyright>
<tags>MQTT MQTTClient MQTTServer MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Queue Hardware Arduino</tags> <tags>MQTT MQTTClient MQTTServer MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Queue Hardware Arduino</tags>


+ 10
- 4
Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs View File

@@ -12,12 +12,11 @@ namespace MQTTnet.Implementations
{ {
public sealed class MqttTcpChannel : IMqttCommunicationChannel, IDisposable public sealed class MqttTcpChannel : IMqttCommunicationChannel, IDisposable
{ {
private readonly Socket _socket;
private Socket _socket;
private SslStream _sslStream; private SslStream _sslStream;


public MqttTcpChannel() public MqttTcpChannel()
{ {
_socket = new Socket(SocketType.Stream, ProtocolType.Tcp);
} }


public MqttTcpChannel(Socket socket, SslStream sslStream) public MqttTcpChannel(Socket socket, SslStream sslStream)
@@ -31,6 +30,11 @@ namespace MQTTnet.Implementations
if (options == null) throw new ArgumentNullException(nameof(options)); if (options == null) throw new ArgumentNullException(nameof(options));
try try
{ {
if (_socket == null)
{
_socket = new Socket(SocketType.Stream, ProtocolType.Tcp);
}

await Task.Factory.FromAsync(_socket.BeginConnect, _socket.EndConnect, options.Server, options.GetPort(), null); await Task.Factory.FromAsync(_socket.BeginConnect, _socket.EndConnect, options.Server, options.GetPort(), null);


if (options.TlsOptions.UseTls) if (options.TlsOptions.UseTls)
@@ -49,8 +53,7 @@ namespace MQTTnet.Implementations
{ {
try try
{ {
_sslStream.Dispose();
_socket.Dispose();
Dispose();
return Task.FromResult(0); return Task.FromResult(0);
} }
catch (SocketException exception) catch (SocketException exception)
@@ -108,6 +111,9 @@ namespace MQTTnet.Implementations
{ {
_socket?.Dispose(); _socket?.Dispose();
_sslStream?.Dispose(); _sslStream?.Dispose();

_socket = null;
_sslStream = null;
} }


private static X509CertificateCollection LoadCertificates(MqttClientOptions options) private static X509CertificateCollection LoadCertificates(MqttClientOptions options)


+ 10
- 4
Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs View File

@@ -12,12 +12,11 @@ namespace MQTTnet.Implementations
{ {
public sealed class MqttTcpChannel : IMqttCommunicationChannel, IDisposable public sealed class MqttTcpChannel : IMqttCommunicationChannel, IDisposable
{ {
private readonly Socket _socket;
private Socket _socket;
private SslStream _sslStream; private SslStream _sslStream;


public MqttTcpChannel() public MqttTcpChannel()
{ {
_socket = new Socket(SocketType.Stream, ProtocolType.Tcp);
} }


public MqttTcpChannel(Socket socket, SslStream sslStream) public MqttTcpChannel(Socket socket, SslStream sslStream)
@@ -31,6 +30,11 @@ namespace MQTTnet.Implementations
if (options == null) throw new ArgumentNullException(nameof(options)); if (options == null) throw new ArgumentNullException(nameof(options));
try try
{ {
if (_socket == null)
{
_socket = new Socket(SocketType.Stream, ProtocolType.Tcp);
}

await _socket.ConnectAsync(options.Server, options.GetPort()); await _socket.ConnectAsync(options.Server, options.GetPort());
if (options.TlsOptions.UseTls) if (options.TlsOptions.UseTls)
@@ -49,8 +53,7 @@ namespace MQTTnet.Implementations
{ {
try try
{ {
_sslStream.Dispose();
_socket.Dispose();
Dispose();
return Task.FromResult(0); return Task.FromResult(0);
} }
catch (SocketException exception) catch (SocketException exception)
@@ -101,6 +104,9 @@ namespace MQTTnet.Implementations
{ {
_socket?.Dispose(); _socket?.Dispose();
_sslStream?.Dispose(); _sslStream?.Dispose();

_socket = null;
_sslStream = null;
} }


private static X509CertificateCollection LoadCertificates(MqttClientOptions options) private static X509CertificateCollection LoadCertificates(MqttClientOptions options)


+ 9
- 3
Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs View File

@@ -15,11 +15,10 @@ namespace MQTTnet.Implementations
{ {
public sealed class MqttTcpChannel : IMqttCommunicationChannel, IDisposable public sealed class MqttTcpChannel : IMqttCommunicationChannel, IDisposable
{ {
private readonly StreamSocket _socket;
private StreamSocket _socket;


public MqttTcpChannel() public MqttTcpChannel()
{ {
_socket = new StreamSocket();
} }


public MqttTcpChannel(StreamSocket socket) public MqttTcpChannel(StreamSocket socket)
@@ -32,6 +31,11 @@ namespace MQTTnet.Implementations
if (options == null) throw new ArgumentNullException(nameof(options)); if (options == null) throw new ArgumentNullException(nameof(options));
try try
{ {
if (_socket == null)
{
_socket = new StreamSocket();
}

if (!options.TlsOptions.UseTls) if (!options.TlsOptions.UseTls)
{ {
await _socket.ConnectAsync(new HostName(options.Server), options.GetPort().ToString()); await _socket.ConnectAsync(new HostName(options.Server), options.GetPort().ToString());
@@ -59,7 +63,7 @@ namespace MQTTnet.Implementations
{ {
try try
{ {
_socket.Dispose();
Dispose();
return Task.FromResult(0); return Task.FromResult(0);
} }
catch (SocketException exception) catch (SocketException exception)
@@ -100,6 +104,8 @@ namespace MQTTnet.Implementations
public void Dispose() public void Dispose()
{ {
_socket?.Dispose(); _socket?.Dispose();

_socket = null;
} }


private static Certificate LoadCertificate(MqttClientOptions options) private static Certificate LoadCertificate(MqttClientOptions options)


+ 33
- 30
MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs View File

@@ -23,11 +23,7 @@ namespace MQTTnet.Core.Adapter


public async Task ConnectAsync(MqttClientOptions options, TimeSpan timeout) public async Task ConnectAsync(MqttClientOptions options, TimeSpan timeout)
{ {
var task = _channel.ConnectAsync(options);
if (await Task.WhenAny(Task.Delay(timeout), task) != task)
{
throw new MqttCommunicationTimedOutException();
}
await ExecuteWithTimeoutAsync(_channel.ConnectAsync(options), timeout);
} }


public async Task DisconnectAsync() public async Task DisconnectAsync()
@@ -39,21 +35,7 @@ namespace MQTTnet.Core.Adapter
{ {
MqttTrace.Information(nameof(MqttChannelCommunicationAdapter), $"TX >>> {packet} [Timeout={timeout}]"); MqttTrace.Information(nameof(MqttChannelCommunicationAdapter), $"TX >>> {packet} [Timeout={timeout}]");


bool hasTimeout;
try
{
var task = PacketSerializer.SerializeAsync(packet, _channel);
hasTimeout = await Task.WhenAny(Task.Delay(timeout), task) != task;
}
catch (Exception exception)
{
throw new MqttCommunicationException(exception);
}

if (hasTimeout)
{
throw new MqttCommunicationTimedOutException();
}
await ExecuteWithTimeoutAsync(PacketSerializer.SerializeAsync(packet, _channel), timeout);
} }


public async Task<MqttBasePacket> ReceivePacketAsync(TimeSpan timeout) public async Task<MqttBasePacket> ReceivePacketAsync(TimeSpan timeout)
@@ -61,16 +43,7 @@ namespace MQTTnet.Core.Adapter
MqttBasePacket packet; MqttBasePacket packet;
if (timeout > TimeSpan.Zero) if (timeout > TimeSpan.Zero)
{ {
var workerTask = PacketSerializer.DeserializeAsync(_channel);
var timeoutTask = Task.Delay(timeout);
var hasTimeout = Task.WhenAny(timeoutTask, workerTask) == timeoutTask;

if (hasTimeout)
{
throw new MqttCommunicationTimedOutException();
}

packet = workerTask.Result;
packet = await ExecuteWithTimeoutAsync(PacketSerializer.DeserializeAsync(_channel), timeout);
} }
else else
{ {
@@ -85,5 +58,35 @@ namespace MQTTnet.Core.Adapter
MqttTrace.Information(nameof(MqttChannelCommunicationAdapter), $"RX <<< {packet}"); MqttTrace.Information(nameof(MqttChannelCommunicationAdapter), $"RX <<< {packet}");
return packet; return packet;
} }

private static async Task<TResult> ExecuteWithTimeoutAsync<TResult>(Task<TResult> task, TimeSpan timeout)
{
var timeoutTask = Task.Delay(timeout);
if (await Task.WhenAny(timeoutTask, task) == timeoutTask)
{
throw new MqttCommunicationTimedOutException();
}

if (task.IsFaulted)
{
throw new MqttCommunicationException(task.Exception);
}

return task.Result;
}

private static async Task ExecuteWithTimeoutAsync(Task task, TimeSpan timeout)
{
var timeoutTask = Task.Delay(timeout);
if (await Task.WhenAny(timeoutTask, task) == timeoutTask)
{
throw new MqttCommunicationTimedOutException();
}

if (task.IsFaulted)
{
throw new MqttCommunicationException(task.Exception);
}
}
} }
} }

+ 90
- 50
MQTTnet.Core/Client/MqttClient.cs View File

@@ -20,6 +20,7 @@ namespace MQTTnet.Core.Client
private readonly MqttClientOptions _options; private readonly MqttClientOptions _options;
private readonly IMqttCommunicationAdapter _adapter; private readonly IMqttCommunicationAdapter _adapter;


private bool _disconnectedEventSuspended;
private int _latestPacketIdentifier; private int _latestPacketIdentifier;
private CancellationTokenSource _cancellationTokenSource; private CancellationTokenSource _cancellationTokenSource;


@@ -48,49 +49,64 @@ namespace MQTTnet.Core.Client
throw new MqttProtocolViolationException("It is not allowed to connect with a server after the connection is established."); throw new MqttProtocolViolationException("It is not allowed to connect with a server after the connection is established.");
} }


var connectPacket = new MqttConnectPacket
try
{ {
ClientId = _options.ClientId,
Username = _options.UserName,
Password = _options.Password,
CleanSession = _options.CleanSession,
KeepAlivePeriod = (ushort)_options.KeepAlivePeriod.TotalSeconds,
WillMessage = willApplicationMessage
};
_disconnectedEventSuspended = false;


await _adapter.ConnectAsync(_options, _options.DefaultCommunicationTimeout);
MqttTrace.Verbose(nameof(MqttClient), "Connection with server established.");
await _adapter.ConnectAsync(_options, _options.DefaultCommunicationTimeout);


_cancellationTokenSource = new CancellationTokenSource();
_latestPacketIdentifier = 0;
_packetDispatcher.Reset();
IsConnected = true;
MqttTrace.Verbose(nameof(MqttClient), "Connection with server established.");


#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
Task.Run(() => ReceivePackets(_cancellationTokenSource.Token), _cancellationTokenSource.Token);
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
var connectPacket = new MqttConnectPacket
{
ClientId = _options.ClientId,
Username = _options.UserName,
Password = _options.Password,
CleanSession = _options.CleanSession,
KeepAlivePeriod = (ushort)_options.KeepAlivePeriod.TotalSeconds,
WillMessage = willApplicationMessage
};

_cancellationTokenSource = new CancellationTokenSource();
_latestPacketIdentifier = 0;
_packetDispatcher.Reset();

StartReceivePackets();

var response = await SendAndReceiveAsync<MqttConnAckPacket>(connectPacket);
if (response.ConnectReturnCode != MqttConnectReturnCode.ConnectionAccepted)
{
await DisconnectInternalAsync();
throw new MqttConnectingFailedException(response.ConnectReturnCode);
}


var response = await SendAndReceiveAsync<MqttConnAckPacket>(connectPacket);
if (response.ConnectReturnCode != MqttConnectReturnCode.ConnectionAccepted)
{
await DisconnectAsync();
throw new MqttConnectingFailedException(response.ConnectReturnCode);
}
if (_options.KeepAlivePeriod != TimeSpan.Zero)
{
StartSendKeepAliveMessages();
}
MqttTrace.Verbose(nameof(MqttClient), "MQTT connection with server established.");


if (_options.KeepAlivePeriod != TimeSpan.Zero)
IsConnected = true;
Connected?.Invoke(this, EventArgs.Empty);
}
catch (Exception)
{ {
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
Task.Run(() => SendKeepAliveMessagesAsync(_cancellationTokenSource.Token), _cancellationTokenSource.Token);
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
await DisconnectInternalAsync();
throw;
} }

Connected?.Invoke(this, EventArgs.Empty);
} }


public async Task DisconnectAsync() public async Task DisconnectAsync()
{ {
await SendAsync(new MqttDisconnectPacket());
await DisconnectInternalAsync();
try
{
await SendAsync(new MqttDisconnectPacket());
}
finally
{
await DisconnectInternalAsync();
}
} }


public Task<IList<MqttSubscribeResult>> SubscribeAsync(params TopicFilter[] topicFilters) public Task<IList<MqttSubscribeResult>> SubscribeAsync(params TopicFilter[] topicFilters)
@@ -181,8 +197,9 @@ namespace MQTTnet.Core.Client
{ {
await _adapter.DisconnectAsync(); await _adapter.DisconnectAsync();
} }
catch
catch (Exception exception)
{ {
MqttTrace.Warning(nameof(MqttClient), exception, "Error while disconnecting.");
} }
finally finally
{ {
@@ -191,7 +208,12 @@ namespace MQTTnet.Core.Client
_cancellationTokenSource = null; _cancellationTokenSource = null;


IsConnected = false; IsConnected = false;
Disconnected?.Invoke(this, EventArgs.Empty);

if (!_disconnectedEventSuspended)
{
_disconnectedEventSuspended = true;
Disconnected?.Invoke(this, EventArgs.Empty);
}
} }
} }


@@ -239,7 +261,7 @@ namespace MQTTnet.Core.Client
} }
catch (Exception exception) catch (Exception exception)
{ {
MqttTrace.Error(nameof(MqttClient), exception, "Unhandled exception while handling application message.");
MqttTrace.Error(nameof(MqttClient), exception, "Unhandled exception while handling application message.");
} }
} }


@@ -278,7 +300,7 @@ namespace MQTTnet.Core.Client
{ {
_unacknowledgedPublishPackets.Remove(pubRelPacket.PacketIdentifier); _unacknowledgedPublishPackets.Remove(pubRelPacket.PacketIdentifier);
} }
await SendAsync(pubRelPacket.CreateResponse<MqttPubCompPacket>()); await SendAsync(pubRelPacket.CreateResponse<MqttPubCompPacket>());
} }


@@ -300,15 +322,12 @@ namespace MQTTnet.Core.Client
var pi1 = requestPacket as IMqttPacketWithIdentifier; var pi1 = requestPacket as IMqttPacketWithIdentifier;
var pi2 = p as IMqttPacketWithIdentifier; var pi2 = p as IMqttPacketWithIdentifier;


if (pi1 != null && pi2 != null)
if (pi1 == null || pi2 == null)
{ {
if (pi1.PacketIdentifier != pi2.PacketIdentifier)
{
return false;
}
return true;
} }


return true;
return pi1.PacketIdentifier == pi2.PacketIdentifier;
} }


await _adapter.SendPacketAsync(requestPacket, _options.DefaultCommunicationTimeout); await _adapter.SendPacketAsync(requestPacket, _options.DefaultCommunicationTimeout);
@@ -335,15 +354,16 @@ namespace MQTTnet.Core.Client
catch (MqttCommunicationException exception) catch (MqttCommunicationException exception)
{ {
MqttTrace.Warning(nameof(MqttClient), exception, "MQTT communication error while receiving packets."); MqttTrace.Warning(nameof(MqttClient), exception, "MQTT communication error while receiving packets.");
await DisconnectInternalAsync();
} }
catch (Exception exception) catch (Exception exception)
{ {
MqttTrace.Warning(nameof(MqttClient), exception, "Error while sending/receiving keep alive packets."); MqttTrace.Warning(nameof(MqttClient), exception, "Error while sending/receiving keep alive packets.");
await DisconnectInternalAsync();
} }
finally finally
{ {
MqttTrace.Information(nameof(MqttClient), "Stopped sending keep alive packets."); MqttTrace.Information(nameof(MqttClient), "Stopped sending keep alive packets.");
await DisconnectInternalAsync();
} }
} }


@@ -354,27 +374,47 @@ namespace MQTTnet.Core.Client
{ {
while (!cancellationToken.IsCancellationRequested) while (!cancellationToken.IsCancellationRequested)
{ {
var mqttPacket = await _adapter.ReceivePacketAsync(TimeSpan.Zero);
MqttTrace.Information(nameof(MqttClient), $"Received <<< {mqttPacket}");
var packet = await _adapter.ReceivePacketAsync(TimeSpan.Zero);
MqttTrace.Information(nameof(MqttClient), $"Received <<< {packet}");


#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
Task.Run(() => ProcessReceivedPacketAsync(mqttPacket), cancellationToken);
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
StartProcessReceivedPacket(packet, cancellationToken);
} }
} }
catch (MqttCommunicationException exception) catch (MqttCommunicationException exception)
{ {
MqttTrace.Warning(nameof(MqttClient), exception, "MQTT communication error while receiving packets.");
MqttTrace.Warning(nameof(MqttClient), exception, "MQTT communication exception while receiving packets.");
await DisconnectInternalAsync();
} }
catch (Exception exception) catch (Exception exception)
{ {
MqttTrace.Error(nameof(MqttClient), exception, "Error while receiving packets.");
MqttTrace.Error(nameof(MqttClient), exception, "Unhandled exception while receiving packets.");
await DisconnectInternalAsync();
} }
finally finally
{ {
MqttTrace.Information(nameof(MqttClient), "Stopped receiving packets."); MqttTrace.Information(nameof(MqttClient), "Stopped receiving packets.");
await DisconnectInternalAsync();
} }
} }

private void StartProcessReceivedPacket(MqttBasePacket packet, CancellationToken cancellationToken)
{
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
Task.Run(() => ProcessReceivedPacketAsync(packet), cancellationToken);
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
}

private void StartReceivePackets()
{
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
Task.Run(() => ReceivePackets(_cancellationTokenSource.Token), _cancellationTokenSource.Token);
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
}

private void StartSendKeepAliveMessages()
{
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
Task.Run(() => SendKeepAliveMessagesAsync(_cancellationTokenSource.Token), _cancellationTokenSource.Token);
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
}
} }
} }

+ 6
- 1
MQTTnet.Core/Exceptions/MqttCommunicationException.cs View File

@@ -4,7 +4,7 @@ namespace MQTTnet.Core.Exceptions
{ {
public class MqttCommunicationException : Exception public class MqttCommunicationException : Exception
{ {
public MqttCommunicationException()
protected MqttCommunicationException()
{ {
} }


@@ -17,5 +17,10 @@ namespace MQTTnet.Core.Exceptions
: base(message) : base(message)
{ {
} }

public MqttCommunicationException(string message, Exception innerException)
: base(message, innerException)
{
}
} }
} }

Loading…
Cancel
Save