Browse Source

Merge branch 'develop' into ServerShutdown

release/3.x.x
Christian 6 years ago
committed by GitHub
parent
commit
ac36c9c406
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 269 additions and 78 deletions
  1. +8
    -5
      Build/MQTTnet.nuspec
  2. +2
    -1
      README.md
  3. +63
    -21
      Source/MQTTnet/Client/MqttClientOptionsBuilder.cs
  4. +29
    -0
      Source/MQTTnet/Client/MqttClientOptionsBuilderTlsParameters.cs
  5. +3
    -0
      Source/MQTTnet/Client/MqttClientTlsOptions.cs
  6. +2
    -0
      Source/MQTTnet/Client/MqttClientWebSocketOptions.cs
  7. +17
    -0
      Source/MQTTnet/Client/MqttClientWebSocketProxyOptions.cs
  8. +1
    -1
      Source/MQTTnet/Implementations/MqttTcpChannel.cs
  9. +33
    -1
      Source/MQTTnet/Implementations/MqttWebSocketChannel.cs
  10. +1
    -1
      Source/MQTTnet/Server/IMqttClientSession.cs
  11. +4
    -4
      Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs
  12. +6
    -6
      Source/MQTTnet/Server/MqttClientSession.cs
  13. +47
    -34
      Source/MQTTnet/Server/MqttClientSessionsManager.cs
  14. +3
    -2
      Tests/MQTTnet.Core.Tests/MqttClientTests.cs
  15. +2
    -2
      Tests/MQTTnet.Core.Tests/MqttKeepAliveMonitorTests.cs
  16. +48
    -0
      Tests/MQTTnet.Core.Tests/MqttServerTests.cs

+ 8
- 5
Build/MQTTnet.nuspec View File

@@ -11,11 +11,11 @@
<requireLicenseAcceptance>false</requireLicenseAcceptance> <requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker).</description> <description>MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker).</description>
<releaseNotes>* [Core] Performance optimizations. <releaseNotes>* [Core] Performance optimizations.
* [Core] Fixed a bug which prevents receiving large packets (UWP only)
* [Client] The ManagedClient options now allow configuring the interval for connection checks.
* [Server] Added the Endpoint of the Adapter (remote IP and port) to the connection validation callback.
* [Server] The ipv4 and ipv6 endpoint can be disabled now by setting the bound IP address to _None_.
* [Server] Fix a bug in the keep alive monitor which caused high CPU load (thanks to @GarageGadget).
* [Client] Added support for proxies when using web socket connections (thanks to PitySOFT).
* [Client] Refactored TLS parameter usage and added more parameters (thanks to PitySOFT).
* [Server] Fixed an issue in client keep alive checks (thanks to @jenscski).
* [Server] Changed the order of _ClientConnected_ and _ClientDisconnected_ events so that _ClientConnected_ is fired at first (thanks to @jenscski).
* [Server] Fixed an iussue which lets the server stop processing messages when using the application message interceptor (thanks to @alamsor).
</releaseNotes> </releaseNotes>
<copyright>Copyright Christian Kratky 2016-2018</copyright> <copyright>Copyright Christian Kratky 2016-2018</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags> <tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags>
@@ -56,6 +56,9 @@
<file src="..\Source\MQTTnet\bin\Release\net452\MQTTnet.*" target="lib\net452\"/> <file src="..\Source\MQTTnet\bin\Release\net452\MQTTnet.*" target="lib\net452\"/>


<file src="..\Source\MQTTnet\bin\Release\netstandard2.0\MQTTnet.*" target="lib\net461\"/> <file src="..\Source\MQTTnet\bin\Release\netstandard2.0\MQTTnet.*" target="lib\net461\"/>
<file src="..\Source\MQTTnet\bin\Release\netstandard2.0\MQTTnet.*" target="lib\net462\"/>
<file src="..\Source\MQTTnet\bin\Release\netstandard2.0\MQTTnet.*" target="lib\net470\"/>
<file src="..\Source\MQTTnet\bin\Release\netstandard2.0\MQTTnet.*" target="lib\net471\"/>
<file src="..\Source\MQTTnet\bin\Release\netstandard2.0\MQTTnet.*" target="lib\net472\"/> <file src="..\Source\MQTTnet\bin\Release\netstandard2.0\MQTTnet.*" target="lib\net472\"/>
</files> </files>
</package> </package>

+ 2
- 1
README.md View File

@@ -83,9 +83,10 @@ This project also listed at Open Collective (https://opencollective.com/mqttnet)


This library is used in the following projects: This library is used in the following projects:


* HA4IoT (Open Source Home Automation system for .NET, <https://github.com/chkr1011/HA4IoT>)
* MQTT Client Rx (Wrapper for Reactive Extensions, <https://github.com/1iveowl/MQTTClient.rx>) * MQTT Client Rx (Wrapper for Reactive Extensions, <https://github.com/1iveowl/MQTTClient.rx>)
* MQTT Tester (MQTT client test app for [Android](https://play.google.com/store/apps/details?id=com.liveowl.mqtttester) and [iOS](https://itunes.apple.com/us/app/mqtt-tester/id1278621826?mt=8)) * MQTT Tester (MQTT client test app for [Android](https://play.google.com/store/apps/details?id=com.liveowl.mqtttester) and [iOS](https://itunes.apple.com/us/app/mqtt-tester/id1278621826?mt=8))
* HA4IoT (Open Source Home Automation system for .NET, <https://github.com/chkr1011/HA4IoT>)
* Wirehome.Core (Open Source Home Automation system for .NET Core, <https://github.com/chkr1011/Wirehome.Core>)


If you use this library and want to see your project here please let me know. If you use this library and want to see your project here please let me know.




+ 63
- 21
Source/MQTTnet/Client/MqttClientOptionsBuilder.cs View File

@@ -7,10 +7,11 @@ namespace MQTTnet.Client
public class MqttClientOptionsBuilder public class MqttClientOptionsBuilder
{ {
private readonly MqttClientOptions _options = new MqttClientOptions(); private readonly MqttClientOptions _options = new MqttClientOptions();

private MqttClientTcpOptions _tcpOptions; private MqttClientTcpOptions _tcpOptions;
private MqttClientWebSocketOptions _webSocketOptions; private MqttClientWebSocketOptions _webSocketOptions;
private MqttClientTlsOptions _tlsOptions;
private MqttClientOptionsBuilderTlsParameters _tlsParameters;
private MqttClientWebSocketProxyOptions _proxyOptions;


public MqttClientOptionsBuilder WithProtocolVersion(MqttProtocolVersion value) public MqttClientOptionsBuilder WithProtocolVersion(MqttProtocolVersion value)
{ {
@@ -76,6 +77,21 @@ namespace MQTTnet.Client
return this; return this;
} }


public MqttClientOptionsBuilder WithProxy(string address, string username = null, string password = null, string domain = null, bool bypassOnLocal = false, string[] bypassList = null)
{
_proxyOptions = new MqttClientWebSocketProxyOptions
{
Address = address,
Username = username,
Password = password,
Domain = domain,
BypassOnLocal = bypassOnLocal,
BypassList = bypassList
};

return this;
}

public MqttClientOptionsBuilder WithWebSocketServer(string uri) public MqttClientOptionsBuilder WithWebSocketServer(string uri)
{ {
_webSocketOptions = new MqttClientWebSocketOptions _webSocketOptions = new MqttClientWebSocketOptions
@@ -86,13 +102,25 @@ namespace MQTTnet.Client
return this; return this;
} }


public MqttClientOptionsBuilder WithTls(MqttClientOptionsBuilderTlsParameters parameters)
{
_tlsParameters = parameters ?? throw new ArgumentNullException(nameof(parameters));
return this;
}

public MqttClientOptionsBuilder WithTls()
{
return WithTls(new MqttClientOptionsBuilderTlsParameters { UseTls = true });
}

[Obsolete("Use method _WithTls_ which accepts the _MqttClientOptionsBuilderTlsParameters_.")]
public MqttClientOptionsBuilder WithTls( public MqttClientOptionsBuilder WithTls(
bool allowUntrustedCertificates = false, bool allowUntrustedCertificates = false,
bool ignoreCertificateChainErrors = false, bool ignoreCertificateChainErrors = false,
bool ignoreCertificateRevocationErrors = false, bool ignoreCertificateRevocationErrors = false,
params byte[][] certificates) params byte[][] certificates)
{ {
_tlsOptions = new MqttClientTlsOptions
_tlsParameters = new MqttClientOptionsBuilderTlsParameters
{ {
UseTls = true, UseTls = true,
AllowUntrustedCertificates = allowUntrustedCertificates, AllowUntrustedCertificates = allowUntrustedCertificates,
@@ -104,33 +132,47 @@ namespace MQTTnet.Client
return this; return this;
} }


public MqttClientOptionsBuilder WithTls()
public IMqttClientOptions Build()
{ {
_tlsOptions = new MqttClientTlsOptions
if (_tcpOptions == null && _webSocketOptions == null)
{ {
UseTls = true
};

return this;
}
throw new InvalidOperationException("A channel must be set.");
}


public IMqttClientOptions Build()
{
if (_tlsOptions != null)
if (_tlsParameters != null)
{ {
if (_tcpOptions == null && _webSocketOptions == null)
if (_tlsParameters?.UseTls == true)
{ {
throw new InvalidOperationException("A channel (TCP or WebSocket) must be set if TLS is configured.");
var tlsOptions = new MqttClientTlsOptions
{
UseTls = true,
SslProtocol = _tlsParameters.SslProtocol,
AllowUntrustedCertificates = _tlsParameters.AllowUntrustedCertificates,
Certificates = _tlsParameters.Certificates?.Select(c => c.ToArray()).ToList(),
CertificateValidationCallback = _tlsParameters.CertificateValidationCallback,
IgnoreCertificateChainErrors = _tlsParameters.IgnoreCertificateChainErrors,
IgnoreCertificateRevocationErrors = _tlsParameters.IgnoreCertificateRevocationErrors
};

if (_tcpOptions != null)
{
_tcpOptions.TlsOptions = tlsOptions;
}
else if (_webSocketOptions != null)
{
_webSocketOptions.TlsOptions = tlsOptions;
}
} }
}


if (_tcpOptions != null)
{
_tcpOptions.TlsOptions = _tlsOptions;
}
else if (_webSocketOptions != null)
if (_proxyOptions != null)
{
if (_webSocketOptions == null)
{ {
_webSocketOptions.TlsOptions = _tlsOptions;
throw new InvalidOperationException("Proxies are only supported for WebSocket connections.");
} }

_webSocketOptions.ProxyOptions = _proxyOptions;
} }


_options.ChannelOptions = (IMqttClientChannelOptions)_tcpOptions ?? _webSocketOptions; _options.ChannelOptions = (IMqttClientChannelOptions)_tcpOptions ?? _webSocketOptions;


+ 29
- 0
Source/MQTTnet/Client/MqttClientOptionsBuilderTlsParameters.cs View File

@@ -0,0 +1,29 @@
using System;
using System.Collections.Generic;
using System.Net.Security;
using System.Security.Authentication;
using System.Security.Cryptography.X509Certificates;

namespace MQTTnet.Client
{
public class MqttClientOptionsBuilderTlsParameters
{
public bool UseTls { get; set; }

public Func<X509Certificate, X509Chain, SslPolicyErrors, IMqttClientOptions, bool> CertificateValidationCallback
{
get;
set;
}

public SslProtocols SslProtocol { get; set; } = SslProtocols.Tls12;

public IEnumerable<IEnumerable<byte>> Certificates { get; set; }

public bool AllowUntrustedCertificates { get; set; }

public bool IgnoreCertificateChainErrors { get; set; }

public bool IgnoreCertificateRevocationErrors { get; set; }
}
}

+ 3
- 0
Source/MQTTnet/Client/MqttClientTlsOptions.cs View File

@@ -2,6 +2,7 @@
using System.Collections.Generic; using System.Collections.Generic;
using System.Net.Security; using System.Net.Security;
using System.Security.Cryptography.X509Certificates; using System.Security.Cryptography.X509Certificates;
using System.Security.Authentication;


namespace MQTTnet.Client namespace MQTTnet.Client
{ {
@@ -17,6 +18,8 @@ namespace MQTTnet.Client


public List<byte[]> Certificates { get; set; } public List<byte[]> Certificates { get; set; }


public SslProtocols SslProtocol { get; set; } = SslProtocols.Tls12;

public Func<X509Certificate, X509Chain, SslPolicyErrors, IMqttClientOptions, bool> CertificateValidationCallback { get; set; } public Func<X509Certificate, X509Chain, SslPolicyErrors, IMqttClientOptions, bool> CertificateValidationCallback { get; set; }
} }
} }

+ 2
- 0
Source/MQTTnet/Client/MqttClientWebSocketOptions.cs View File

@@ -13,6 +13,8 @@ namespace MQTTnet.Client


public CookieContainer CookieContainer { get; set; } public CookieContainer CookieContainer { get; set; }


public MqttClientWebSocketProxyOptions ProxyOptions { get; set; }

public MqttClientTlsOptions TlsOptions { get; set; } = new MqttClientTlsOptions(); public MqttClientTlsOptions TlsOptions { get; set; } = new MqttClientTlsOptions();


public override string ToString() public override string ToString()


+ 17
- 0
Source/MQTTnet/Client/MqttClientWebSocketProxyOptions.cs View File

@@ -0,0 +1,17 @@
namespace MQTTnet.Client
{
public class MqttClientWebSocketProxyOptions
{
public string Address { get; set; }

public string Username { get; set; }

public string Password { get; set; }

public string Domain { get; set; }

public bool BypassOnLocal { get; set; }

public string[] BypassList { get; set; }
}
}

+ 1
- 1
Source/MQTTnet/Implementations/MqttTcpChannel.cs View File

@@ -63,7 +63,7 @@ namespace MQTTnet.Implementations
if (_options.TlsOptions.UseTls) if (_options.TlsOptions.UseTls)
{ {
sslStream = new SslStream(new NetworkStream(_socket, true), false, InternalUserCertificateValidationCallback); sslStream = new SslStream(new NetworkStream(_socket, true), false, InternalUserCertificateValidationCallback);
await sslStream.AuthenticateAsClientAsync(_options.Server, LoadCertificates(), SslProtocols.Tls12, _options.TlsOptions.IgnoreCertificateRevocationErrors).ConfigureAwait(false);
await sslStream.AuthenticateAsClientAsync(_options.Server, LoadCertificates(), _options.TlsOptions.SslProtocol, _options.TlsOptions.IgnoreCertificateRevocationErrors).ConfigureAwait(false);
} }


CreateStream(sslStream); CreateStream(sslStream);


+ 33
- 1
Source/MQTTnet/Implementations/MqttWebSocketChannel.cs View File

@@ -1,4 +1,5 @@
using System; using System;
using System.Net;
using System.Net.WebSockets; using System.Net.WebSockets;
using System.Security.Cryptography.X509Certificates; using System.Security.Cryptography.X509Certificates;
using System.Threading; using System.Threading;
@@ -14,7 +15,7 @@ namespace MQTTnet.Implementations
private readonly MqttClientWebSocketOptions _options; private readonly MqttClientWebSocketOptions _options;


private WebSocket _webSocket; private WebSocket _webSocket;
public MqttWebSocketChannel(MqttClientWebSocketOptions options) public MqttWebSocketChannel(MqttClientWebSocketOptions options)
{ {
_options = options ?? throw new ArgumentNullException(nameof(options)); _options = options ?? throw new ArgumentNullException(nameof(options));
@@ -45,6 +46,11 @@ namespace MQTTnet.Implementations


var clientWebSocket = new ClientWebSocket(); var clientWebSocket = new ClientWebSocket();


if (_options.ProxyOptions != null)
{
clientWebSocket.Options.Proxy = CreateProxy();
}

if (_options.RequestHeaders != null) if (_options.RequestHeaders != null)
{ {
foreach (var requestHeader in _options.RequestHeaders) foreach (var requestHeader in _options.RequestHeaders)
@@ -132,5 +138,31 @@ namespace MQTTnet.Implementations
_webSocket = null; _webSocket = null;
} }
} }

private IWebProxy CreateProxy()
{
if (string.IsNullOrEmpty(_options.ProxyOptions?.Address))
{
return null;
}

#if WINDOWS_UWP
throw new NotSupportedException("Proxies are not supported in UWP.");
#elif NETSTANDARD1_3
throw new NotSupportedException("Proxies are not supported in netstandard 1.3.");
#else
var proxyUri = new Uri(_options.ProxyOptions.Address);

if (!string.IsNullOrEmpty(_options.ProxyOptions.Username) && !string.IsNullOrEmpty(_options.ProxyOptions.Password))
{
var credentials =
new NetworkCredential(_options.ProxyOptions.Username, _options.ProxyOptions.Password, _options.ProxyOptions.Domain);

return new WebProxy(proxyUri, _options.ProxyOptions.BypassOnLocal, _options.ProxyOptions.BypassList, credentials);
}

return new WebProxy(proxyUri, _options.ProxyOptions.BypassOnLocal, _options.ProxyOptions.BypassList);
#endif
}
} }
} }

+ 1
- 1
Source/MQTTnet/Server/IMqttClientSession.cs View File

@@ -14,7 +14,7 @@ namespace MQTTnet.Server
void EnqueueApplicationMessage(MqttClientSession senderClientSession, MqttPublishPacket publishPacket); void EnqueueApplicationMessage(MqttClientSession senderClientSession, MqttPublishPacket publishPacket);
void ClearPendingApplicationMessages(); void ClearPendingApplicationMessages();
Task<bool> RunAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter adapter);
Task RunAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter adapter);
void Stop(MqttClientDisconnectType disconnectType); void Stop(MqttClientDisconnectType disconnectType);


Task SubscribeAsync(IList<TopicFilter> topicFilters); Task SubscribeAsync(IList<TopicFilter> topicFilters);


+ 4
- 4
Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs View File

@@ -14,9 +14,9 @@ namespace MQTTnet.Server


private readonly IMqttClientSession _clientSession; private readonly IMqttClientSession _clientSession;
private readonly IMqttNetChildLogger _logger; private readonly IMqttNetChildLogger _logger;
private bool _isPaused; private bool _isPaused;
public MqttClientKeepAliveMonitor(IMqttClientSession clientSession, IMqttNetChildLogger logger) public MqttClientKeepAliveMonitor(IMqttClientSession clientSession, IMqttNetChildLogger logger)
{ {
if (logger == null) throw new ArgumentNullException(nameof(logger)); if (logger == null) throw new ArgumentNullException(nameof(logger));
@@ -76,7 +76,7 @@ namespace MQTTnet.Server
{ {
_logger.Warning(null, "Client '{0}': Did not receive any packet or keep alive signal.", _clientSession.ClientId); _logger.Warning(null, "Client '{0}': Did not receive any packet or keep alive signal.", _clientSession.ClientId);
_clientSession.Stop(MqttClientDisconnectType.NotClean); _clientSession.Stop(MqttClientDisconnectType.NotClean);
return; return;
} }


@@ -96,7 +96,7 @@ namespace MQTTnet.Server
} }
finally finally
{ {
_logger.Verbose("Client {0}: Stopped checking keep alive timeout.", _clientSession.ClientId);
_logger.Verbose("Client '{0}': Stopped checking keep alive timeout.", _clientSession.ClientId);
} }
} }
} }


+ 6
- 6
Source/MQTTnet/Server/MqttClientSession.cs View File

@@ -67,13 +67,13 @@ namespace MQTTnet.Server
status.LastNonKeepAlivePacketReceived = _keepAliveMonitor.LastNonKeepAlivePacketReceived; status.LastNonKeepAlivePacketReceived = _keepAliveMonitor.LastNonKeepAlivePacketReceived;
} }


public Task<bool> RunAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter adapter)
public Task RunAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter adapter)
{ {
_workerTask = RunInternalAsync(connectPacket, adapter); _workerTask = RunInternalAsync(connectPacket, adapter);
return _workerTask; return _workerTask;
} }


private async Task<bool> RunInternalAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter adapter)
private async Task RunInternalAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter adapter)
{ {
if (connectPacket == null) throw new ArgumentNullException(nameof(connectPacket)); if (connectPacket == null) throw new ArgumentNullException(nameof(connectPacket));
if (adapter == null) throw new ArgumentNullException(nameof(adapter)); if (adapter == null) throw new ArgumentNullException(nameof(adapter));
@@ -138,8 +138,6 @@ namespace MQTTnet.Server
_cancellationTokenSource?.Dispose(); _cancellationTokenSource?.Dispose();
_cancellationTokenSource = null; _cancellationTokenSource = null;
} }

return _wasCleanDisconnect;
} }


private async Task Cleanup() private async Task Cleanup()
@@ -199,6 +197,8 @@ namespace MQTTnet.Server
finally finally
{ {
_logger.Info("Client '{0}': Session stopped.", ClientId); _logger.Info("Client '{0}': Session stopped.", ClientId);

_sessionsManager.Server.OnClientDisconnected(ClientId, _wasCleanDisconnect);
} }
} }


@@ -447,12 +447,12 @@ namespace MQTTnet.Server


private void OnAdapterReadingPacketCompleted(object sender, EventArgs e) private void OnAdapterReadingPacketCompleted(object sender, EventArgs e)
{ {
_keepAliveMonitor?.Pause();
_keepAliveMonitor?.Resume();
} }


private void OnAdapterReadingPacketStarted(object sender, EventArgs e) private void OnAdapterReadingPacketStarted(object sender, EventArgs e)
{ {
_keepAliveMonitor?.Resume();
_keepAliveMonitor?.Pause();
} }
} }
} }

+ 47
- 34
Source/MQTTnet/Server/MqttClientSessionsManager.cs View File

@@ -44,7 +44,7 @@ namespace MQTTnet.Server


public void Start() public void Start()
{ {
Task.Factory.StartNew(() => ProcessQueuedApplicationMessages(_cancellationToken), _cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default);
Task.Factory.StartNew(() => TryProcessQueuedApplicationMessages(_cancellationToken), _cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default);
} }


public void Stop() public void Stop()
@@ -125,6 +125,7 @@ namespace MQTTnet.Server
{ {
_sessions.Remove(clientId); _sessions.Remove(clientId);
} }

_logger.Verbose("Session for client '{0}' deleted.", clientId); _logger.Verbose("Session for client '{0}' deleted.", clientId);
} }


@@ -133,52 +134,67 @@ namespace MQTTnet.Server
_messageQueue?.Dispose(); _messageQueue?.Dispose();
} }


private void ProcessQueuedApplicationMessages(CancellationToken cancellationToken)
private void TryProcessQueuedApplicationMessages(CancellationToken cancellationToken)
{ {
while (!cancellationToken.IsCancellationRequested) while (!cancellationToken.IsCancellationRequested)
{ {
try try
{ {
var enqueuedApplicationMessage = _messageQueue.Take(cancellationToken);
var sender = enqueuedApplicationMessage.Sender;
var applicationMessage = enqueuedApplicationMessage.PublishPacket.ToApplicationMessage();

var interceptorContext = InterceptApplicationMessage(sender, applicationMessage);
if (interceptorContext != null)
{
if (interceptorContext.CloseConnection)
{
enqueuedApplicationMessage.Sender.Stop(MqttClientDisconnectType.NotClean);
}

if (interceptorContext.ApplicationMessage == null || !interceptorContext.AcceptPublish)
{
return;
}

applicationMessage = interceptorContext.ApplicationMessage;
}
TryProcessNextQueuedApplicationMessage(cancellationToken);
}
catch (OperationCanceledException)
{
}
catch (Exception exception)
{
_logger.Error(exception, "Unhandled exception while processing queued application messages.");
}
}
}


Server.OnApplicationMessageReceived(sender?.ClientId, applicationMessage);
private void TryProcessNextQueuedApplicationMessage(CancellationToken cancellationToken)
{
try
{
var enqueuedApplicationMessage = _messageQueue.Take(cancellationToken);
var sender = enqueuedApplicationMessage.Sender;
var applicationMessage = enqueuedApplicationMessage.PublishPacket.ToApplicationMessage();


if (applicationMessage.Retain)
var interceptorContext = InterceptApplicationMessage(sender, applicationMessage);
if (interceptorContext != null)
{
if (interceptorContext.CloseConnection)
{ {
_retainedMessagesManager.HandleMessageAsync(sender?.ClientId, applicationMessage).GetAwaiter().GetResult();
enqueuedApplicationMessage.Sender.Stop(MqttClientDisconnectType.NotClean);
} }


foreach (var clientSession in GetSessions())
if (interceptorContext.ApplicationMessage == null || !interceptorContext.AcceptPublish)
{ {
clientSession.EnqueueApplicationMessage(enqueuedApplicationMessage.Sender, applicationMessage.ToPublishPacket());
return;
} }

applicationMessage = interceptorContext.ApplicationMessage;
} }
catch (OperationCanceledException)

Server.OnApplicationMessageReceived(sender?.ClientId, applicationMessage);

if (applicationMessage.Retain)
{ {
_retainedMessagesManager.HandleMessageAsync(sender?.ClientId, applicationMessage).GetAwaiter().GetResult();
} }
catch (Exception exception)

foreach (var clientSession in GetSessions())
{ {
_logger.Error(exception, "Unhandled exception while processing queued application message.");
clientSession.EnqueueApplicationMessage(enqueuedApplicationMessage.Sender, applicationMessage.ToPublishPacket());
} }
} }
catch (OperationCanceledException)
{
}
catch (Exception exception)
{
_logger.Error(exception, "Unhandled exception while processing next queued application message.");
}
} }


private List<MqttClientSession> GetSessions() private List<MqttClientSession> GetSessions()
@@ -192,8 +208,7 @@ namespace MQTTnet.Server
private async Task RunSession(IMqttChannelAdapter clientAdapter, CancellationToken cancellationToken) private async Task RunSession(IMqttChannelAdapter clientAdapter, CancellationToken cancellationToken)
{ {
var clientId = string.Empty; var clientId = string.Empty;
var wasCleanDisconnect = false;

try try
{ {
var firstPacket = await clientAdapter.ReceivePacketAsync(_options.DefaultCommunicationTimeout, cancellationToken).ConfigureAwait(false); var firstPacket = await clientAdapter.ReceivePacketAsync(_options.DefaultCommunicationTimeout, cancellationToken).ConfigureAwait(false);
@@ -238,7 +253,7 @@ namespace MQTTnet.Server


Server.OnClientConnected(clientId); Server.OnClientConnected(clientId);


wasCleanDisconnect = await clientSession.RunAsync(connectPacket, clientAdapter).ConfigureAwait(false);
await clientSession.RunAsync(connectPacket, clientAdapter).ConfigureAwait(false);
} }
catch (OperationCanceledException) catch (OperationCanceledException)
{ {
@@ -253,8 +268,6 @@ namespace MQTTnet.Server
{ {
DeleteSession(clientId); DeleteSession(clientId);
} }

Server.OnClientDisconnected(clientId, wasCleanDisconnect);
} }
} }




+ 3
- 2
Tests/MQTTnet.Core.Tests/MqttClientTests.cs View File

@@ -1,4 +1,4 @@
using System;
using System;
using System.Net.Sockets; using System.Net.Sockets;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
@@ -8,13 +8,14 @@ using MQTTnet.Client;
using MQTTnet.Diagnostics; using MQTTnet.Diagnostics;
using MQTTnet.Exceptions; using MQTTnet.Exceptions;
using MQTTnet.Packets; using MQTTnet.Packets;
using MQTTnet.Implementations;
using MQTTnet.Server;


namespace MQTTnet.Core.Tests namespace MQTTnet.Core.Tests
{ {
[TestClass] [TestClass]
public class MqttClientTests public class MqttClientTests
{ {

[TestMethod] [TestMethod]
public async Task ClientDisconnectException() public async Task ClientDisconnectException()
{ {


+ 2
- 2
Tests/MQTTnet.Core.Tests/MqttKeepAliveMonitorTests.cs View File

@@ -60,7 +60,7 @@ namespace MQTTnet.Core.Tests
{ {
public string ClientId { get; } public string ClientId { get; }


public int StopCalledCount { get; set; }
public int StopCalledCount { get; private set; }


public void FillStatus(MqttClientSessionStatus status) public void FillStatus(MqttClientSessionStatus status)
{ {
@@ -77,7 +77,7 @@ namespace MQTTnet.Core.Tests
throw new NotSupportedException(); throw new NotSupportedException();
} }


public Task<bool> RunAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter adapter)
public Task RunAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter adapter)
{ {
throw new NotSupportedException(); throw new NotSupportedException();
} }


+ 48
- 0
Tests/MQTTnet.Core.Tests/MqttServerTests.cs View File

@@ -552,6 +552,54 @@ namespace MQTTnet.Core.Tests
Assert.IsTrue(bodyIsMatching); Assert.IsTrue(bodyIsMatching);
} }


[TestMethod]
public async Task MqttServer_SameClientIdConnectDisconnectEventOrder()
{
var serverAdapter = new MqttTcpServerAdapter(new MqttNetLogger().CreateChildLogger());
var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger());

var connectedClient = false;
var connecteCalledBeforeConnectedClients = false;

s.ClientConnected += (_, __) =>
{
connecteCalledBeforeConnectedClients |= connectedClient;
connectedClient = true;
};

s.ClientDisconnected += (_, __) =>
{
connectedClient = false;
};

var clientOptions = new MqttClientOptionsBuilder()
.WithTcpServer("localhost")
.WithClientId(Guid.NewGuid().ToString())
.Build();

await s.StartAsync(new MqttServerOptions());

var c1 = new MqttFactory().CreateMqttClient();
var c2 = new MqttFactory().CreateMqttClient();

await c1.ConnectAsync(clientOptions);

await Task.Delay(100);

await c2.ConnectAsync(clientOptions);

await Task.Delay(100);

await c1.DisconnectAsync();
await c2.DisconnectAsync();

await s.StopAsync();

await Task.Delay(100);

Assert.IsFalse(connecteCalledBeforeConnectedClients, "ClientConnected was called before ClientDisconnect was called");
}

private class TestStorage : IMqttServerStorage private class TestStorage : IMqttServerStorage
{ {
public IList<MqttApplicationMessage> Messages = new List<MqttApplicationMessage>(); public IList<MqttApplicationMessage> Messages = new List<MqttApplicationMessage>();


Loading…
Cancel
Save