Browse Source

Fix client dead lock and protocol version issues.

release/3.x.x
Christian 6 years ago
parent
commit
ec41efd860
22 changed files with 191 additions and 127 deletions
  1. +3
    -3
      Build/MQTTnet.AspNetCore.nuspec
  2. +5
    -9
      Build/MQTTnet.nuspec
  3. +4
    -4
      Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs
  4. +1
    -1
      Frameworks/MQTTnet.NetStandard/Client/IMqttClientAdapterFactory.cs
  5. +1
    -1
      Frameworks/MQTTnet.NetStandard/Client/IMqttClientOptions.cs
  6. +46
    -46
      Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs
  7. +1
    -1
      Frameworks/MQTTnet.NetStandard/Diagnostics/IMqttNetLogger.cs
  8. +1
    -1
      Frameworks/MQTTnet.NetStandard/Diagnostics/MqttNetLogger.cs
  9. +15
    -5
      Frameworks/MQTTnet.NetStandard/Implementations/MqttClientAdapterFactory.cs
  10. +1
    -1
      Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs
  11. +1
    -1
      Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs
  12. +4
    -0
      Frameworks/MQTTnet.NetStandard/Server/MqttApplicationMessageInterceptorContext.cs
  13. +4
    -1
      Frameworks/MQTTnet.NetStandard/Server/MqttClientDisconnectedEventArgs.cs
  14. +1
    -1
      Frameworks/MQTTnet.NetStandard/Server/MqttClientKeepAliveMonitor.cs
  15. +2
    -2
      Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs
  16. +17
    -6
      Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs
  17. +24
    -16
      Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs
  18. +1
    -1
      Frameworks/MQTTnet.NetStandard/Server/MqttRetainedMessagesManager.cs
  19. +4
    -4
      Frameworks/MQTTnet.NetStandard/Server/MqttServer.cs
  20. +1
    -1
      Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapterFactory.cs
  21. +27
    -22
      Tests/MQTTnet.TestApp.NetCore/ServerTest.cs
  22. +27
    -0
      Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs

+ 3
- 3
Build/MQTTnet.AspNetCore.nuspec View File

@@ -2,7 +2,7 @@
<package > <package >
<metadata> <metadata>
<id>MQTTnet.AspNetCore</id> <id>MQTTnet.AspNetCore</id>
<version>2.7.3</version>
<version>2.7.4</version>
<authors>Christian Kratky</authors> <authors>Christian Kratky</authors>
<owners>Christian Kratky</owners> <owners>Christian Kratky</owners>
<licenseUrl>https://github.com/chkr1011/MQTTnet/blob/master/LICENSE</licenseUrl> <licenseUrl>https://github.com/chkr1011/MQTTnet/blob/master/LICENSE</licenseUrl>
@@ -10,13 +10,13 @@
<iconUrl>https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png</iconUrl> <iconUrl>https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png</iconUrl>
<requireLicenseAcceptance>false</requireLicenseAcceptance> <requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>This is a support library to integrate MQTTnet into AspNetCore.</description> <description>This is a support library to integrate MQTTnet into AspNetCore.</description>
<releaseNotes>* Updated to MQTTnet 2.7.3.
<releaseNotes>* Updated to MQTTnet 2.7.4.
</releaseNotes> </releaseNotes>
<copyright>Copyright Christian Kratky 2016-2018</copyright> <copyright>Copyright Christian Kratky 2016-2018</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags> <tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags>
<dependencies> <dependencies>
<group targetFramework="netstandard2.0"> <group targetFramework="netstandard2.0">
<dependency id="MQTTnet" version="2.7.3" />
<dependency id="MQTTnet" version="2.7.4" />
</group> </group>
</dependencies> </dependencies>
</metadata> </metadata>


+ 5
- 9
Build/MQTTnet.nuspec View File

@@ -10,9 +10,13 @@
<iconUrl>https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png</iconUrl> <iconUrl>https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png</iconUrl>
<requireLicenseAcceptance>false</requireLicenseAcceptance> <requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker).</description> <description>MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker).</description>
<releaseNotes>
<releaseNotes> * [Client] Fixed a deadlock while the client disconnects.
* [Client] Fixed broken support for protocol version 3.1.0.
* [Server] The _MqttTcpServerAdapter_ is now added to the ASP.NET services. * [Server] The _MqttTcpServerAdapter_ is now added to the ASP.NET services.
* [Server] _MqttServerAdapter_ is renamed to _MqttTcpServerAdapter_ (BREAKING CHANGE!). * [Server] _MqttServerAdapter_ is renamed to _MqttTcpServerAdapter_ (BREAKING CHANGE!).
* [Server] The server no longer sends the will message of a client if the disconnect was clean (via _Disconnect_ packet).
* [Server] The application message interceptor now allows closing the connection.
* [Server] Added a new flag for the _ClientDisconnected_ event which contains a value indicating whether the disconnect was clean (via _Disconnect_ packet).
</releaseNotes> </releaseNotes>
<copyright>Copyright Christian Kratky 2016-2018</copyright> <copyright>Copyright Christian Kratky 2016-2018</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags> <tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags>
@@ -42,12 +46,6 @@
<group targetFramework="net461"> <group targetFramework="net461">
</group> </group>
<group targetFramework="net462">
</group>
<group targetFramework="net471">
</group>

</dependencies> </dependencies>
</metadata> </metadata>


@@ -65,8 +63,6 @@
<!-- .NET Framework --> <!-- .NET Framework -->
<file src="..\Frameworks\MQTTnet.Netstandard\bin\Release\net452\MQTTnet.*" target="lib\net452\"/> <file src="..\Frameworks\MQTTnet.Netstandard\bin\Release\net452\MQTTnet.*" target="lib\net452\"/>
<file src="..\Frameworks\MQTTnet.Netstandard\bin\Release\net461\MQTTnet.*" target="lib\net461\"/> <file src="..\Frameworks\MQTTnet.Netstandard\bin\Release\net461\MQTTnet.*" target="lib\net461\"/>
<file src="..\Frameworks\MQTTnet.Netstandard\bin\Release\net462\MQTTnet.*" target="lib\net462\"/>
<file src="..\Frameworks\MQTTnet.Netstandard\bin\Release\net471\MQTTnet.*" target="lib\net471\"/>
</files> </files>
</package> </package>

+ 4
- 4
Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs View File

@@ -36,7 +36,7 @@ namespace MQTTnet.Adapter
public Task ConnectAsync(TimeSpan timeout) public Task ConnectAsync(TimeSpan timeout)
{ {
ThrowIfDisposed(); ThrowIfDisposed();
_logger.Trace<MqttChannelAdapter>("Connecting [Timeout={0}]", timeout);
_logger.Verbose<MqttChannelAdapter>("Connecting [Timeout={0}]", timeout);


return ExecuteAndWrapExceptionAsync(() => _channel.ConnectAsync().TimeoutAfter(timeout)); return ExecuteAndWrapExceptionAsync(() => _channel.ConnectAsync().TimeoutAfter(timeout));
} }
@@ -44,7 +44,7 @@ namespace MQTTnet.Adapter
public Task DisconnectAsync(TimeSpan timeout) public Task DisconnectAsync(TimeSpan timeout)
{ {
ThrowIfDisposed(); ThrowIfDisposed();
_logger.Trace<MqttChannelAdapter>("Disconnecting [Timeout={0}]", timeout);
_logger.Verbose<MqttChannelAdapter>("Disconnecting [Timeout={0}]", timeout);


return ExecuteAndWrapExceptionAsync(() => _channel.DisconnectAsync().TimeoutAfter(timeout)); return ExecuteAndWrapExceptionAsync(() => _channel.DisconnectAsync().TimeoutAfter(timeout));
} }
@@ -70,7 +70,7 @@ namespace MQTTnet.Adapter
continue; continue;
} }


_logger.Trace<MqttChannelAdapter>("TX >>> {0} [Timeout={1}]", packet, timeout);
_logger.Verbose<MqttChannelAdapter>("TX >>> {0} [Timeout={1}]", packet, timeout);


var chunks = PacketSerializer.Serialize(packet); var chunks = PacketSerializer.Serialize(packet);
foreach (var chunk in chunks) foreach (var chunk in chunks)
@@ -135,7 +135,7 @@ namespace MQTTnet.Adapter
throw new MqttProtocolViolationException("Received malformed packet."); throw new MqttProtocolViolationException("Received malformed packet.");
} }


_logger.Trace<MqttChannelAdapter>("RX <<< {0}", packet);
_logger.Verbose<MqttChannelAdapter>("RX <<< {0}", packet);
} }
finally finally
{ {


+ 1
- 1
Frameworks/MQTTnet.NetStandard/Client/IMqttClientAdapterFactory.cs View File

@@ -5,6 +5,6 @@ namespace MQTTnet.Client
{ {
public interface IMqttClientAdapterFactory public interface IMqttClientAdapterFactory
{ {
IMqttChannelAdapter CreateClientAdapter(IMqttClientChannelOptions options, IMqttNetLogger logger);
IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetLogger logger);
} }
} }

+ 1
- 1
Frameworks/MQTTnet.NetStandard/Client/IMqttClientOptions.cs View File

@@ -13,7 +13,7 @@ namespace MQTTnet.Client
TimeSpan CommunicationTimeout { get; } TimeSpan CommunicationTimeout { get; }
TimeSpan KeepAlivePeriod { get; } TimeSpan KeepAlivePeriod { get; }
TimeSpan? KeepAliveSendInterval { get; set; }
TimeSpan? KeepAliveSendInterval { get; }


MqttProtocolVersion ProtocolVersion { get; } MqttProtocolVersion ProtocolVersion { get; }




+ 46
- 46
Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs View File

@@ -57,16 +57,16 @@ namespace MQTTnet.Client
_packetIdentifierProvider.Reset(); _packetIdentifierProvider.Reset();
_packetDispatcher.Reset(); _packetDispatcher.Reset();


_adapter = _adapterFactory.CreateClientAdapter(options.ChannelOptions, _logger);
_adapter = _adapterFactory.CreateClientAdapter(options, _logger);


_logger.Trace<MqttClient>("Trying to connect with server.");
_logger.Verbose<MqttClient>("Trying to connect with server.");
await _adapter.ConnectAsync(_options.CommunicationTimeout).ConfigureAwait(false); await _adapter.ConnectAsync(_options.CommunicationTimeout).ConfigureAwait(false);
_logger.Trace<MqttClient>("Connection with server established.");
_logger.Verbose<MqttClient>("Connection with server established.");


await StartReceivingPacketsAsync().ConfigureAwait(false); await StartReceivingPacketsAsync().ConfigureAwait(false);


var connectResponse = await AuthenticateAsync(options.WillMessage).ConfigureAwait(false); var connectResponse = await AuthenticateAsync(options.WillMessage).ConfigureAwait(false);
_logger.Trace<MqttClient>("MQTT connection with server established.");
_logger.Verbose<MqttClient>("MQTT connection with server established.");


_sendTracker.Restart(); _sendTracker.Restart();


@@ -77,12 +77,14 @@ namespace MQTTnet.Client


IsConnected = true; IsConnected = true;
Connected?.Invoke(this, new MqttClientConnectedEventArgs(connectResponse.IsSessionPresent)); Connected?.Invoke(this, new MqttClientConnectedEventArgs(connectResponse.IsSessionPresent));

_logger.Info<MqttClient>("Connected.");
return new MqttClientConnectResult(connectResponse.IsSessionPresent); return new MqttClientConnectResult(connectResponse.IsSessionPresent);
} }
catch (Exception exception) catch (Exception exception)
{ {
_logger.Error<MqttClient>(exception, "Error while connecting with server."); _logger.Error<MqttClient>(exception, "Error while connecting with server.");
await DisconnectInternalAsync(exception).ConfigureAwait(false);
await DisconnectInternalAsync(null, exception).ConfigureAwait(false);


throw; throw;
} }
@@ -104,7 +106,7 @@ namespace MQTTnet.Client
} }
finally finally
{ {
await DisconnectInternalAsync(null).ConfigureAwait(false);
await DisconnectInternalAsync(null, null).ConfigureAwait(false);
} }
} }


@@ -159,7 +161,7 @@ namespace MQTTnet.Client
case MqttQualityOfServiceLevel.AtMostOnce: case MqttQualityOfServiceLevel.AtMostOnce:
{ {
// No packet identifier is used for QoS 0 [3.3.2.2 Packet Identifier] // No packet identifier is used for QoS 0 [3.3.2.2 Packet Identifier]
await SendAsync((MqttPublishPacket[])qosGroup.ToArray()).ConfigureAwait(false);
await SendAsync(qosGroup.Cast<MqttBasePacket>().ToArray()).ConfigureAwait(false);
break; break;
} }
case MqttQualityOfServiceLevel.AtLeastOnce: case MqttQualityOfServiceLevel.AtLeastOnce:
@@ -236,33 +238,48 @@ namespace MQTTnet.Client
if (IsConnected) throw new MqttProtocolViolationException(message); if (IsConnected) throw new MqttProtocolViolationException(message);
} }


private async Task DisconnectInternalAsync(Exception exception)
private async Task DisconnectInternalAsync(Task sender, Exception exception)
{ {
await _disconnectLock.WaitAsync(); await _disconnectLock.WaitAsync();
var clientWasConnected = IsConnected;
try try
{ {
IsConnected = false;

if (_cancellationTokenSource == null || _cancellationTokenSource.IsCancellationRequested) if (_cancellationTokenSource == null || _cancellationTokenSource.IsCancellationRequested)
{ {
return; return;
} }


_cancellationTokenSource.Cancel(false); _cancellationTokenSource.Cancel(false);
}
catch (Exception adapterException)
{
_logger.Warning<MqttClient>(adapterException, "Error while disconnecting from adapter.");
}
finally
{
_disconnectLock.Release();
}

var clientWasConnected = IsConnected;
IsConnected = false;


if (_packetReceiverTask != null)
try
{
if (_packetReceiverTask != null && _packetReceiverTask != sender)
{ {
Task.WaitAll(_packetReceiverTask);
_packetReceiverTask.Wait();
} }


if (_keepAliveMessageSenderTask != null)
if (_keepAliveMessageSenderTask != null && _keepAliveMessageSenderTask != sender)
{ {
Task.WaitAll(_keepAliveMessageSenderTask);
_keepAliveMessageSenderTask.Wait();
} }


await _adapter.DisconnectAsync(_options.CommunicationTimeout).ConfigureAwait(false);
_logger.Trace<MqttClient>("Disconnected from adapter.");
if (_adapter != null)
{
await _adapter.DisconnectAsync(_options.CommunicationTimeout).ConfigureAwait(false);
}
_logger.Verbose<MqttClient>("Disconnected from adapter.");
} }
catch (Exception adapterException) catch (Exception adapterException)
{ {
@@ -272,12 +289,9 @@ namespace MQTTnet.Client
{ {
_adapter?.Dispose(); _adapter?.Dispose();
_adapter = null; _adapter = null;

_cancellationTokenSource?.Dispose(); _cancellationTokenSource?.Dispose();
_cancellationTokenSource = null; _cancellationTokenSource = null;


_disconnectLock.Release();
_logger.Info<MqttClient>("Disconnected."); _logger.Info<MqttClient>("Disconnected.");
Disconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(clientWasConnected, exception)); Disconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(clientWasConnected, exception));
} }
@@ -287,8 +301,6 @@ namespace MQTTnet.Client
{ {
try try
{ {
_logger.Info<MqttClient>("Received <<< {0}", packet);

if (packet is MqttPublishPacket publishPacket) if (packet is MqttPublishPacket publishPacket)
{ {
await ProcessReceivedPublishPacketAsync(publishPacket).ConfigureAwait(false); await ProcessReceivedPublishPacketAsync(publishPacket).ConfigureAwait(false);
@@ -395,7 +407,7 @@ namespace MQTTnet.Client


private async Task SendKeepAliveMessagesAsync() private async Task SendKeepAliveMessagesAsync()
{ {
_logger.Info<MqttClient>("Start sending keep alive packets.");
_logger.Verbose<MqttClient>("Start sending keep alive packets.");


try try
{ {
@@ -415,37 +427,31 @@ namespace MQTTnet.Client
await Task.Delay(keepAliveSendInterval, _cancellationTokenSource.Token).ConfigureAwait(false); await Task.Delay(keepAliveSendInterval, _cancellationTokenSource.Token).ConfigureAwait(false);
} }
} }
catch (OperationCanceledException)
{
}
catch (Exception exception) catch (Exception exception)
{ {
if (_cancellationTokenSource.Token.IsCancellationRequested)
if (exception is OperationCanceledException)
{ {
return;
} }

if (exception is MqttCommunicationException)
else if (exception is MqttCommunicationException)
{ {
_logger.Warning<MqttClient>(exception, "MQTT communication exception while sending/receiving keep alive packets."); _logger.Warning<MqttClient>(exception, "MQTT communication exception while sending/receiving keep alive packets.");
} }
else else
{ {
_logger.Warning<MqttClient>(exception, "Unhandled exception while sending/receiving keep alive packets.");

_logger.Error<MqttClient>(exception, "Unhandled exception while sending/receiving keep alive packets.");
} }
await DisconnectInternalAsync(exception).ConfigureAwait(false);
await DisconnectInternalAsync(_keepAliveMessageSenderTask, exception).ConfigureAwait(false);
} }
finally finally
{ {
_logger.Info<MqttClient>("Stopped sending keep alive packets.");
_logger.Verbose<MqttClient>("Stopped sending keep alive packets.");
} }
} }


private async Task ReceivePacketsAsync() private async Task ReceivePacketsAsync()
{ {
_logger.Info<MqttClient>("Start receiving packets.");
_logger.Verbose<MqttClient>("Start receiving packets.");


try try
{ {
@@ -463,31 +469,25 @@ namespace MQTTnet.Client
StartProcessReceivedPacket(packet); StartProcessReceivedPacket(packet);
} }
} }
catch (OperationCanceledException)
{
}
catch (Exception exception) catch (Exception exception)
{ {
if (_cancellationTokenSource.IsCancellationRequested)
if (exception is OperationCanceledException)
{ {
return;
} }

if (exception is MqttCommunicationException)
else if (exception is MqttCommunicationException)
{ {
_logger.Warning<MqttClient>(exception, "MQTT communication exception while receiving packets."); _logger.Warning<MqttClient>(exception, "MQTT communication exception while receiving packets.");
} }
else else
{ {
_logger.Error<MqttClient>(exception, "Unhandled exception while receiving packets."); _logger.Error<MqttClient>(exception, "Unhandled exception while receiving packets.");

} }


await DisconnectInternalAsync(exception).ConfigureAwait(false);
await DisconnectInternalAsync(_packetReceiverTask, exception).ConfigureAwait(false);
} }
finally finally
{ {
_logger.Info<MqttClient>("Stopped receiving packets.");
_logger.Verbose<MqttClient>("Stopped receiving packets.");
} }
} }




+ 1
- 1
Frameworks/MQTTnet.NetStandard/Diagnostics/IMqttNetLogger.cs View File

@@ -6,7 +6,7 @@ namespace MQTTnet.Diagnostics
{ {
event EventHandler<MqttNetLogMessagePublishedEventArgs> LogMessagePublished; event EventHandler<MqttNetLogMessagePublishedEventArgs> LogMessagePublished;


void Trace<TSource>(string message, params object[] parameters);
void Verbose<TSource>(string message, params object[] parameters);


void Info<TSource>(string message, params object[] parameters); void Info<TSource>(string message, params object[] parameters);




+ 1
- 1
Frameworks/MQTTnet.NetStandard/Diagnostics/MqttNetLogger.cs View File

@@ -13,7 +13,7 @@ namespace MQTTnet.Diagnostics


public event EventHandler<MqttNetLogMessagePublishedEventArgs> LogMessagePublished; public event EventHandler<MqttNetLogMessagePublishedEventArgs> LogMessagePublished;


public void Trace<TSource>(string message, params object[] parameters)
public void Verbose<TSource>(string message, params object[] parameters)
{ {
Publish<TSource>(MqttNetLogLevel.Verbose, null, message, parameters); Publish<TSource>(MqttNetLogLevel.Verbose, null, message, parameters);
} }


+ 15
- 5
Frameworks/MQTTnet.NetStandard/Implementations/MqttClientAdapterFactory.cs View File

@@ -8,18 +8,28 @@ namespace MQTTnet.Implementations
{ {
public class MqttClientAdapterFactory : IMqttClientAdapterFactory public class MqttClientAdapterFactory : IMqttClientAdapterFactory
{ {
public IMqttChannelAdapter CreateClientAdapter(IMqttClientChannelOptions options, IMqttNetLogger logger)
public IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetLogger logger)
{ {
if (options == null) throw new ArgumentNullException(nameof(options)); if (options == null) throw new ArgumentNullException(nameof(options));


switch (options)
var serializer = new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion };

switch (options.ChannelOptions)
{ {
case MqttClientTcpOptions tcpOptions: case MqttClientTcpOptions tcpOptions:
return new MqttChannelAdapter(new MqttTcpChannel(tcpOptions), new MqttPacketSerializer(), logger);
{
return new MqttChannelAdapter(new MqttTcpChannel(tcpOptions), serializer, logger);
}

case MqttClientWebSocketOptions webSocketOptions: case MqttClientWebSocketOptions webSocketOptions:
return new MqttChannelAdapter(new MqttWebSocketChannel(webSocketOptions), new MqttPacketSerializer(), logger);
{
return new MqttChannelAdapter(new MqttWebSocketChannel(webSocketOptions), serializer, logger);
}

default: default:
throw new NotSupportedException();
{
throw new NotSupportedException();
}
} }
} }
} }


+ 1
- 1
Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs View File

@@ -9,7 +9,7 @@ using MQTTnet.Client;


namespace MQTTnet.Implementations namespace MQTTnet.Implementations
{ {
public sealed class MqttWebSocketChannel : IMqttChannel, IDisposable
public sealed class MqttWebSocketChannel : IMqttChannel
{ {
// ReSharper disable once MemberCanBePrivate.Global // ReSharper disable once MemberCanBePrivate.Global
// ReSharper disable once AutoPropertyCanBeMadeGetOnly.Global // ReSharper disable once AutoPropertyCanBeMadeGetOnly.Global


+ 1
- 1
Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs View File

@@ -244,7 +244,7 @@ namespace MQTTnet.ManagedClient
} }
finally finally
{ {
_logger.Trace<ManagedMqttClient>("Stopped publishing messages.");
_logger.Verbose<ManagedMqttClient>("Stopped publishing messages.");
} }
} }




+ 4
- 0
Frameworks/MQTTnet.NetStandard/Server/MqttApplicationMessageInterceptorContext.cs View File

@@ -11,5 +11,9 @@
public string ClientId { get; } public string ClientId { get; }


public MqttApplicationMessage ApplicationMessage { get; set; } public MqttApplicationMessage ApplicationMessage { get; set; }

public bool AcceptPublish { get; set; } = true;

public bool CloseConnection { get; set; }
} }
} }

+ 4
- 1
Frameworks/MQTTnet.NetStandard/Server/MqttClientDisconnectedEventArgs.cs View File

@@ -4,11 +4,14 @@ namespace MQTTnet.Server
{ {
public class MqttClientDisconnectedEventArgs : EventArgs public class MqttClientDisconnectedEventArgs : EventArgs
{ {
public MqttClientDisconnectedEventArgs(ConnectedMqttClient client)
public MqttClientDisconnectedEventArgs(ConnectedMqttClient client, bool wasCleanDisconnect)
{ {
Client = client ?? throw new ArgumentNullException(nameof(client)); Client = client ?? throw new ArgumentNullException(nameof(client));
WasCleanDisconnect = wasCleanDisconnect;
} }
public ConnectedMqttClient Client { get; } public ConnectedMqttClient Client { get; }

public bool WasCleanDisconnect { get; }
} }
} }

+ 1
- 1
Frameworks/MQTTnet.NetStandard/Server/MqttClientKeepAliveMonitor.cs View File

@@ -81,7 +81,7 @@ namespace MQTTnet.Server
} }
finally finally
{ {
_logger.Trace<MqttClientSession>("Client {0}: Stopped checking keep alive timeout.", _clientId);
_logger.Verbose<MqttClientSession>("Client {0}: Stopped checking keep alive timeout.", _clientId);
} }
} }




+ 2
- 2
Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs View File

@@ -57,7 +57,7 @@ namespace MQTTnet.Server
_queue.Enqueue(packet); _queue.Enqueue(packet);
_queueWaitSemaphore.Release(); _queueWaitSemaphore.Release();


_logger.Trace<MqttClientPendingMessagesQueue>("Enqueued packet (ClientId: {0}).", _clientSession.ClientId);
_logger.Verbose<MqttClientPendingMessagesQueue>("Enqueued packet (ClientId: {0}).", _clientSession.ClientId);
} }


private async Task SendQueuedPacketsAsync(IMqttChannelAdapter adapter, CancellationToken cancellationToken) private async Task SendQueuedPacketsAsync(IMqttChannelAdapter adapter, CancellationToken cancellationToken)
@@ -96,7 +96,7 @@ namespace MQTTnet.Server


await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new[] { packet }).ConfigureAwait(false); await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new[] { packet }).ConfigureAwait(false);


_logger.Trace<MqttClientPendingMessagesQueue>("Enqueued packet sent (ClientId: {0}).", _clientSession.ClientId);
_logger.Verbose<MqttClientPendingMessagesQueue>("Enqueued packet sent (ClientId: {0}).", _clientSession.ClientId);
} }
catch (Exception exception) catch (Exception exception)
{ {


+ 17
- 6
Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs View File

@@ -23,6 +23,7 @@ namespace MQTTnet.Server
private IMqttChannelAdapter _adapter; private IMqttChannelAdapter _adapter;
private CancellationTokenSource _cancellationTokenSource; private CancellationTokenSource _cancellationTokenSource;
private MqttApplicationMessage _willMessage; private MqttApplicationMessage _willMessage;
private bool _wasCleanDisconnect;


public MqttClientSession( public MqttClientSession(
string clientId, string clientId,
@@ -55,7 +56,7 @@ namespace MQTTnet.Server


public bool IsConnected => _adapter != null; public bool IsConnected => _adapter != null;


public async Task RunAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter adapter)
public async Task<bool> RunAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter adapter)
{ {
if (connectPacket == null) throw new ArgumentNullException(nameof(connectPacket)); if (connectPacket == null) throw new ArgumentNullException(nameof(connectPacket));
if (adapter == null) throw new ArgumentNullException(nameof(adapter)); if (adapter == null) throw new ArgumentNullException(nameof(adapter));
@@ -64,6 +65,7 @@ namespace MQTTnet.Server
{ {
var cancellationTokenSource = new CancellationTokenSource(); var cancellationTokenSource = new CancellationTokenSource();


_wasCleanDisconnect = false;
_willMessage = connectPacket.WillMessage; _willMessage = connectPacket.WillMessage;
_adapter = adapter; _adapter = adapter;
_cancellationTokenSource = cancellationTokenSource; _cancellationTokenSource = cancellationTokenSource;
@@ -84,9 +86,11 @@ namespace MQTTnet.Server
{ {
_logger.Error<MqttClientSession>(exception, "Client '{0}': Unhandled exception while processing client packets.", ClientId); _logger.Error<MqttClientSession>(exception, "Client '{0}': Unhandled exception while processing client packets.", ClientId);
} }

return _wasCleanDisconnect;
} }


public async Task StopAsync()
public async Task StopAsync(bool wasCleanDisconnect = false)
{ {
try try
{ {
@@ -95,6 +99,8 @@ namespace MQTTnet.Server
return; return;
} }


_wasCleanDisconnect = wasCleanDisconnect;

_cancellationTokenSource?.Cancel(false); _cancellationTokenSource?.Cancel(false);


PendingMessagesQueue.WaitForCompletion(); PendingMessagesQueue.WaitForCompletion();
@@ -110,9 +116,10 @@ namespace MQTTnet.Server
finally finally
{ {
var willMessage = _willMessage; var willMessage = _willMessage;
if (willMessage != null)
_willMessage = null; // clear willmessage so it is send just once

if (willMessage != null && !wasCleanDisconnect)
{ {
_willMessage = null; // clear willmessage so it is send just once
await ApplicationMessageReceivedCallback(this, willMessage).ConfigureAwait(false); await ApplicationMessageReceivedCallback(this, willMessage).ConfigureAwait(false);
} }
} }
@@ -246,7 +253,12 @@ namespace MQTTnet.Server
return HandleIncomingUnsubscribePacketAsync(adapter, unsubscribePacket, cancellationToken); return HandleIncomingUnsubscribePacketAsync(adapter, unsubscribePacket, cancellationToken);
} }


if (packet is MqttDisconnectPacket || packet is MqttConnectPacket)
if (packet is MqttDisconnectPacket)
{
return StopAsync(true);
}

if (packet is MqttConnectPacket)
{ {
return StopAsync(); return StopAsync();
} }
@@ -262,7 +274,6 @@ namespace MQTTnet.Server


if (subscribeResult.CloseConnection) if (subscribeResult.CloseConnection)
{ {
await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new[] { new MqttDisconnectPacket() }).ConfigureAwait(false);
await StopAsync().ConfigureAwait(false); await StopAsync().ConfigureAwait(false);
} }




+ 24
- 16
Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs View File

@@ -29,7 +29,7 @@ namespace MQTTnet.Server
} }


public Action<ConnectedMqttClient> ClientConnectedCallback { get; set; } public Action<ConnectedMqttClient> ClientConnectedCallback { get; set; }
public Action<ConnectedMqttClient> ClientDisconnectedCallback { get; set; }
public Action<ConnectedMqttClient, bool> ClientDisconnectedCallback { get; set; }
public Action<string, TopicFilter> ClientSubscribedTopicCallback { get; set; } public Action<string, TopicFilter> ClientSubscribedTopicCallback { get; set; }
public Action<string, string> ClientUnsubscribedTopicCallback { get; set; } public Action<string, string> ClientUnsubscribedTopicCallback { get; set; }
public Action<string, MqttApplicationMessage> ApplicationMessageReceivedCallback { get; set; } public Action<string, MqttApplicationMessage> ApplicationMessageReceivedCallback { get; set; }
@@ -37,7 +37,9 @@ namespace MQTTnet.Server
public async Task RunSessionAsync(IMqttChannelAdapter clientAdapter, CancellationToken cancellationToken) public async Task RunSessionAsync(IMqttChannelAdapter clientAdapter, CancellationToken cancellationToken)
{ {
var clientId = string.Empty; var clientId = string.Empty;
var wasCleanDisconnect = false;
MqttClientSession clientSession = null; MqttClientSession clientSession = null;
try try
{ {
if (!(await clientAdapter.ReceivePacketAsync(_options.DefaultCommunicationTimeout, cancellationToken) if (!(await clientAdapter.ReceivePacketAsync(_options.DefaultCommunicationTimeout, cancellationToken)
@@ -84,7 +86,7 @@ namespace MQTTnet.Server
ProtocolVersion = clientAdapter.PacketSerializer.ProtocolVersion ProtocolVersion = clientAdapter.PacketSerializer.ProtocolVersion
}); });


await clientSession.RunAsync(connectPacket, clientAdapter).ConfigureAwait(false);
wasCleanDisconnect = await clientSession.RunAsync(connectPacket, clientAdapter).ConfigureAwait(false);
} }
catch (OperationCanceledException) catch (OperationCanceledException)
{ {
@@ -110,7 +112,8 @@ namespace MQTTnet.Server
ClientId = clientId, ClientId = clientId,
ProtocolVersion = clientAdapter.PacketSerializer.ProtocolVersion, ProtocolVersion = clientAdapter.PacketSerializer.ProtocolVersion,
PendingApplicationMessages = clientSession?.PendingMessagesQueue.Count ?? 0 PendingApplicationMessages = clientSession?.PendingMessagesQueue.Count ?? 0
});
},
wasCleanDisconnect);
} }
} }


@@ -156,8 +159,13 @@ namespace MQTTnet.Server
{ {
try try
{ {
applicationMessage = InterceptApplicationMessage(senderClientSession, applicationMessage);
if (applicationMessage == null)
var interceptorContext = InterceptApplicationMessage(senderClientSession, applicationMessage);
if (interceptorContext.CloseConnection)
{
await senderClientSession.StopAsync().ConfigureAwait(false);
}

if (interceptorContext.ApplicationMessage == null || !interceptorContext.AcceptPublish)
{ {
return; return;
} }
@@ -230,20 +238,20 @@ namespace MQTTnet.Server
} }
} }


private MqttApplicationMessage InterceptApplicationMessage(MqttClientSession senderClientSession, MqttApplicationMessage applicationMessage)
private MqttApplicationMessageInterceptorContext InterceptApplicationMessage(MqttClientSession senderClientSession, MqttApplicationMessage applicationMessage)
{ {
var interceptor = _options.ApplicationMessageInterceptor;
if (interceptor == null)
{
return applicationMessage;
}

var interceptorContext = new MqttApplicationMessageInterceptorContext( var interceptorContext = new MqttApplicationMessageInterceptorContext(
senderClientSession?.ClientId, senderClientSession?.ClientId,
applicationMessage); applicationMessage);


var interceptor = _options.ApplicationMessageInterceptor;
if (interceptor == null)
{
return interceptorContext;
}
interceptor(interceptorContext); interceptor(interceptorContext);
return interceptorContext.ApplicationMessage;
return interceptorContext;
} }


private MqttConnectReturnCode ValidateConnection(MqttConnectPacket connectPacket) private MqttConnectReturnCode ValidateConnection(MqttConnectPacket connectPacket)
@@ -279,11 +287,11 @@ namespace MQTTnet.Server
clientSession.Dispose(); clientSession.Dispose();
clientSession = null; clientSession = null;


_logger.Trace<MqttClientSessionsManager>("Stopped existing session of client '{0}'.", connectPacket.ClientId);
_logger.Verbose<MqttClientSessionsManager>("Stopped existing session of client '{0}'.", connectPacket.ClientId);
} }
else else
{ {
_logger.Trace<MqttClientSessionsManager>("Reusing existing session of client '{0}'.", connectPacket.ClientId);
_logger.Verbose<MqttClientSessionsManager>("Reusing existing session of client '{0}'.", connectPacket.ClientId);
} }
} }


@@ -302,7 +310,7 @@ namespace MQTTnet.Server


_sessions[connectPacket.ClientId] = clientSession; _sessions[connectPacket.ClientId] = clientSession;


_logger.Trace<MqttClientSessionsManager>("Created a new session for client '{0}'.", connectPacket.ClientId);
_logger.Verbose<MqttClientSessionsManager>("Created a new session for client '{0}'.", connectPacket.ClientId);
} }


return new GetOrCreateClientSessionResult { IsExistingSession = isExistingSession, Session = clientSession }; return new GetOrCreateClientSessionResult { IsExistingSession = isExistingSession, Session = clientSession };


+ 1
- 1
Frameworks/MQTTnet.NetStandard/Server/MqttRetainedMessagesManager.cs View File

@@ -132,7 +132,7 @@ namespace MQTTnet.Server


if (!saveIsRequired) if (!saveIsRequired)
{ {
_logger.Trace<MqttRetainedMessagesManager>("Skipped saving retained messages because no changes were detected.");
_logger.Verbose<MqttRetainedMessagesManager>("Skipped saving retained messages because no changes were detected.");
} }


if (saveIsRequired && _options.Storage != null) if (saveIsRequired && _options.Storage != null)


+ 4
- 4
Frameworks/MQTTnet.NetStandard/Server/MqttServer.cs View File

@@ -138,10 +138,10 @@ namespace MQTTnet.Server
ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(client)); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(client));
} }


private void OnClientDisconnected(ConnectedMqttClient client)
private void OnClientDisconnected(ConnectedMqttClient client, bool wasCleanDisconnect)
{ {
_logger.Info<MqttServer>("Client '{0}': Disconnected.", client.ClientId);
ClientDisconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(client));
_logger.Info<MqttServer>("Client '{0}': Disconnected (clean={1}).", client.ClientId, wasCleanDisconnect);
ClientDisconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(client, wasCleanDisconnect));
} }


private void OnClientSubscribedTopic(string clientId, TopicFilter topicFilter) private void OnClientSubscribedTopic(string clientId, TopicFilter topicFilter)
@@ -162,7 +162,7 @@ namespace MQTTnet.Server
private void OnClientAccepted(object sender, MqttServerAdapterClientAcceptedEventArgs eventArgs) private void OnClientAccepted(object sender, MqttServerAdapterClientAcceptedEventArgs eventArgs)
{ {
eventArgs.SessionTask = Task.Run( eventArgs.SessionTask = Task.Run(
async () => await _clientSessionsManager.RunSessionAsync(eventArgs.Client, _cancellationTokenSource.Token).ConfigureAwait(false),
() => _clientSessionsManager.RunSessionAsync(eventArgs.Client, _cancellationTokenSource.Token),
_cancellationTokenSource.Token); _cancellationTokenSource.Token);
} }
} }


+ 1
- 1
Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapterFactory.cs View File

@@ -13,7 +13,7 @@ namespace MQTTnet.Core.Tests
_adapter = adapter; _adapter = adapter;
} }
public IMqttChannelAdapter CreateClientAdapter(IMqttClientChannelOptions options, IMqttNetLogger logger)
public IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetLogger logger)
{ {
return _adapter; return _adapter;
} }


+ 27
- 22
Tests/MQTTnet.TestApp.NetCore/ServerTest.cs View File

@@ -3,7 +3,6 @@ using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
using MQTTnet.Protocol; using MQTTnet.Protocol;
using MQTTnet.Server; using MQTTnet.Server;
using Newtonsoft.Json.Linq;


namespace MQTTnet.TestApp.NetCore namespace MQTTnet.TestApp.NetCore
{ {
@@ -38,6 +37,12 @@ namespace MQTTnet.TestApp.NetCore
// based payload with the timestamp is a suitable use case. // based payload with the timestamp is a suitable use case.
context.ApplicationMessage.Payload = Encoding.UTF8.GetBytes(DateTime.Now.ToString("O")); context.ApplicationMessage.Payload = Encoding.UTF8.GetBytes(DateTime.Now.ToString("O"));
} }

if (context.ApplicationMessage.Topic == "not_allowed_topic")
{
context.AcceptPublish = false;
context.CloseConnection = true;
}
}, },
SubscriptionInterceptor = context => SubscriptionInterceptor = context =>
{ {
@@ -72,27 +77,27 @@ namespace MQTTnet.TestApp.NetCore
ConsoleColor.Magenta); ConsoleColor.Magenta);
}; };


options.ApplicationMessageInterceptor = c =>
{
if (c.ApplicationMessage.Payload == null || c.ApplicationMessage.Payload.Length == 0)
{
return;
}
try
{
var content = JObject.Parse(Encoding.UTF8.GetString(c.ApplicationMessage.Payload));
var timestampProperty = content.Property("timestamp");
if (timestampProperty != null && timestampProperty.Value.Type == JTokenType.Null)
{
timestampProperty.Value = DateTime.Now.ToString("O");
c.ApplicationMessage.Payload = Encoding.UTF8.GetBytes(content.ToString());
}
}
catch (Exception)
{
}
};
//options.ApplicationMessageInterceptor = c =>
//{
// if (c.ApplicationMessage.Payload == null || c.ApplicationMessage.Payload.Length == 0)
// {
// return;
// }
// try
// {
// var content = JObject.Parse(Encoding.UTF8.GetString(c.ApplicationMessage.Payload));
// var timestampProperty = content.Property("timestamp");
// if (timestampProperty != null && timestampProperty.Value.Type == JTokenType.Null)
// {
// timestampProperty.Value = DateTime.Now.ToString("O");
// c.ApplicationMessage.Payload = Encoding.UTF8.GetBytes(content.ToString());
// }
// }
// catch (Exception)
// {
// }
//};


mqttServer.ClientDisconnected += (s, e) => mqttServer.ClientDisconnected += (s, e) =>
{ {


+ 27
- 0
Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs View File

@@ -13,6 +13,8 @@ using MQTTnet.Implementations;
using MQTTnet.ManagedClient; using MQTTnet.ManagedClient;
using MQTTnet.Protocol; using MQTTnet.Protocol;
using MQTTnet.Server; using MQTTnet.Server;
using MqttClientConnectedEventArgs = MQTTnet.Client.MqttClientConnectedEventArgs;
using MqttClientDisconnectedEventArgs = MQTTnet.Client.MqttClientDisconnectedEventArgs;


namespace MQTTnet.TestApp.UniversalWindows namespace MQTTnet.TestApp.UniversalWindows
{ {
@@ -33,6 +35,11 @@ namespace MQTTnet.TestApp.UniversalWindows
private async void OnTraceMessagePublished(object sender, MqttNetLogMessagePublishedEventArgs e) private async void OnTraceMessagePublished(object sender, MqttNetLogMessagePublishedEventArgs e)
{ {
_traceMessages.Enqueue(e.TraceMessage); _traceMessages.Enqueue(e.TraceMessage);
await UpdateLogAsync();
}

private async Task UpdateLogAsync()
{
while (_traceMessages.Count > 100) while (_traceMessages.Count > 100)
{ {
_traceMessages.TryDequeue(out _); _traceMessages.TryDequeue(out _);
@@ -113,11 +120,15 @@ namespace MQTTnet.TestApp.UniversalWindows
{ {
await _mqttClient.DisconnectAsync(); await _mqttClient.DisconnectAsync();
_mqttClient.ApplicationMessageReceived -= OnApplicationMessageReceived; _mqttClient.ApplicationMessageReceived -= OnApplicationMessageReceived;
_mqttClient.Connected -= OnConnected;
_mqttClient.Disconnected -= OnDisconnected;
} }


var factory = new MqttFactory(); var factory = new MqttFactory();
_mqttClient = factory.CreateMqttClient(); _mqttClient = factory.CreateMqttClient();
_mqttClient.ApplicationMessageReceived += OnApplicationMessageReceived; _mqttClient.ApplicationMessageReceived += OnApplicationMessageReceived;
_mqttClient.Connected += OnConnected;
_mqttClient.Disconnected += OnDisconnected;


await _mqttClient.ConnectAsync(options); await _mqttClient.ConnectAsync(options);
} }
@@ -127,6 +138,22 @@ namespace MQTTnet.TestApp.UniversalWindows
} }
} }


private void OnDisconnected(object sender, MqttClientDisconnectedEventArgs e)
{
_traceMessages.Enqueue(new MqttNetLogMessage("", DateTime.Now, -1,
"", MqttNetLogLevel.Info, "! DISCONNECTED EVENT FIRED", null));

Task.Run(UpdateLogAsync);
}

private void OnConnected(object sender, MqttClientConnectedEventArgs e)
{
_traceMessages.Enqueue(new MqttNetLogMessage("", DateTime.Now, -1,
"", MqttNetLogLevel.Info, "! CONNECTED EVENT FIRED", null));

Task.Run(UpdateLogAsync);
}

private async void OnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs eventArgs) private async void OnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs eventArgs)
{ {
var item = $"Timestamp: {DateTime.Now:O} | Topic: {eventArgs.ApplicationMessage.Topic} | Payload: {Encoding.UTF8.GetString(eventArgs.ApplicationMessage.Payload)} | QoS: {eventArgs.ApplicationMessage.QualityOfServiceLevel}"; var item = $"Timestamp: {DateTime.Now:O} | Topic: {eventArgs.ApplicationMessage.Topic} | Payload: {Encoding.UTF8.GetString(eventArgs.ApplicationMessage.Payload)} | QoS: {eventArgs.ApplicationMessage.QualityOfServiceLevel}";


Loading…
Cancel
Save