From fa34908d82a56df572d1a7d372927bb7f8d58b22 Mon Sep 17 00:00:00 2001 From: Marius Ruginosu Date: Wed, 18 Jul 2018 10:51:45 +0300 Subject: [PATCH 01/10] Add proxy settings for WebSocket connections for .NET 451 and 461 --- .../Client/MqttClientOptionsBuilder.cs | 23 ++++++++++++++++ .../Client/MqttClientWebSocketOptions.cs | 4 +++ .../Client/MqttClientWebSocketProxyOptions.cs | 21 +++++++++++++++ .../Implementations/MqttWebSocketChannel.cs | 26 ++++++++++++++++++- 4 files changed, 73 insertions(+), 1 deletion(-) create mode 100644 Source/MQTTnet/Client/MqttClientWebSocketProxyOptions.cs diff --git a/Source/MQTTnet/Client/MqttClientOptionsBuilder.cs b/Source/MQTTnet/Client/MqttClientOptionsBuilder.cs index 985cda7..4948ad3 100644 --- a/Source/MQTTnet/Client/MqttClientOptionsBuilder.cs +++ b/Source/MQTTnet/Client/MqttClientOptionsBuilder.cs @@ -76,6 +76,29 @@ namespace MQTTnet.Client return this; } +#if NET452 || NET461 + + public MqttClientOptionsBuilder WithProxy(string address, string username = null, string password = null, string domain = null, bool bypassOnLocal = false, string[] bypassList = null) + { + if (_webSocketOptions == null) + { + throw new InvalidOperationException("A WebSocket channel must be set if MqttClientWebSocketProxy is configured."); + } + + _webSocketOptions.MqttClientWebSocketProxy = new MqttClientWebSocketProxyOptions + { + Address = address, + Username = username, + Password = password, + Domain = domain, + BypassOnLocal = bypassOnLocal, + BypassList = bypassList + }; + + return this; + } +#endif + public MqttClientOptionsBuilder WithWebSocketServer(string uri) { _webSocketOptions = new MqttClientWebSocketOptions diff --git a/Source/MQTTnet/Client/MqttClientWebSocketOptions.cs b/Source/MQTTnet/Client/MqttClientWebSocketOptions.cs index a4dd0d5..2cce431 100644 --- a/Source/MQTTnet/Client/MqttClientWebSocketOptions.cs +++ b/Source/MQTTnet/Client/MqttClientWebSocketOptions.cs @@ -13,6 +13,10 @@ namespace MQTTnet.Client public CookieContainer CookieContainer { get; set; } +#if NET452 || NET461 + public MqttClientWebSocketProxyOptions MqttClientWebSocketProxy { get; set; } = new MqttClientWebSocketProxyOptions(); +#endif + public MqttClientTlsOptions TlsOptions { get; set; } = new MqttClientTlsOptions(); public override string ToString() diff --git a/Source/MQTTnet/Client/MqttClientWebSocketProxyOptions.cs b/Source/MQTTnet/Client/MqttClientWebSocketProxyOptions.cs new file mode 100644 index 0000000..81f48da --- /dev/null +++ b/Source/MQTTnet/Client/MqttClientWebSocketProxyOptions.cs @@ -0,0 +1,21 @@ +#if NET452 || NET461 + +namespace MQTTnet.Client +{ + public class MqttClientWebSocketProxyOptions + { + public string Address { get; set; } + + public string Username { get; set; } + + public string Password { get; set; } + + public string Domain { get; set; } + + public bool BypassOnLocal { get; set; } = true; + + public string[] BypassList { get; set; } + } +} + +#endif \ No newline at end of file diff --git a/Source/MQTTnet/Implementations/MqttWebSocketChannel.cs b/Source/MQTTnet/Implementations/MqttWebSocketChannel.cs index 3243cd7..cf8c183 100644 --- a/Source/MQTTnet/Implementations/MqttWebSocketChannel.cs +++ b/Source/MQTTnet/Implementations/MqttWebSocketChannel.cs @@ -6,6 +6,10 @@ using System.Threading.Tasks; using MQTTnet.Channel; using MQTTnet.Client; +#if NET452 || NET461 + using System.Net; +#endif + namespace MQTTnet.Implementations { public class MqttWebSocketChannel : IMqttChannel @@ -14,7 +18,7 @@ namespace MQTTnet.Implementations private readonly MqttClientWebSocketOptions _options; private WebSocket _webSocket; - + public MqttWebSocketChannel(MqttClientWebSocketOptions options) { _options = options ?? throw new ArgumentNullException(nameof(options)); @@ -45,6 +49,26 @@ namespace MQTTnet.Implementations var clientWebSocket = new ClientWebSocket(); +#if NET452 || NET461 + // use proxy if we have an address + if (string.IsNullOrEmpty(_options.MqttClientWebSocketProxy.Address) == false) + { + var proxyUri = new Uri(_options.MqttClientWebSocketProxy.Address); + + // use proxy credentials if we have them + if (string.IsNullOrEmpty(_options.MqttClientWebSocketProxy.Username) == false && string.IsNullOrEmpty(_options.MqttClientWebSocketProxy.Password) == false) + { + var credentials = new NetworkCredential(_options.MqttClientWebSocketProxy.Username, _options.MqttClientWebSocketProxy.Password, _options.MqttClientWebSocketProxy.Domain); + clientWebSocket.Options.Proxy = new WebProxy(proxyUri, _options.MqttClientWebSocketProxy.BypassOnLocal, _options.MqttClientWebSocketProxy.BypassList, credentials); + } + else + { + // use proxy without credentials + clientWebSocket.Options.Proxy = new WebProxy(proxyUri, _options.MqttClientWebSocketProxy.BypassOnLocal, _options.MqttClientWebSocketProxy.BypassList); + } + } +#endif + if (_options.RequestHeaders != null) { foreach (var requestHeader in _options.RequestHeaders) From 65a8b091b9f840bb0bb5edbfe00a518d718887aa Mon Sep 17 00:00:00 2001 From: Marius Ruginosu Date: Wed, 18 Jul 2018 12:19:09 +0300 Subject: [PATCH 02/10] Add option to specify the SSLProtocol and CertificateValidationCallback in TLS options, SSLProtocol default is TLS1.2 --- Source/MQTTnet/Client/MqttClientOptionsBuilder.cs | 11 +++++++++-- Source/MQTTnet/Client/MqttClientTlsOptions.cs | 3 +++ Source/MQTTnet/Implementations/MqttTcpChannel.cs | 2 +- 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/Source/MQTTnet/Client/MqttClientOptionsBuilder.cs b/Source/MQTTnet/Client/MqttClientOptionsBuilder.cs index 4948ad3..77c61bd 100644 --- a/Source/MQTTnet/Client/MqttClientOptionsBuilder.cs +++ b/Source/MQTTnet/Client/MqttClientOptionsBuilder.cs @@ -1,5 +1,8 @@ using System; using System.Linq; +using System.Net.Security; +using System.Security.Authentication; +using System.Security.Cryptography.X509Certificates; using MQTTnet.Serializer; namespace MQTTnet.Client @@ -113,7 +116,9 @@ namespace MQTTnet.Client bool allowUntrustedCertificates = false, bool ignoreCertificateChainErrors = false, bool ignoreCertificateRevocationErrors = false, - params byte[][] certificates) + byte[][] certificates = null, + SslProtocols sslProtocol = SslProtocols.Tls12, + Func certificateValidationCallback = null) { _tlsOptions = new MqttClientTlsOptions { @@ -121,7 +126,9 @@ namespace MQTTnet.Client AllowUntrustedCertificates = allowUntrustedCertificates, IgnoreCertificateChainErrors = ignoreCertificateChainErrors, IgnoreCertificateRevocationErrors = ignoreCertificateRevocationErrors, - Certificates = certificates?.ToList() + Certificates = certificates?.ToList(), + SslProtocol = sslProtocol, + CertificateValidationCallback = certificateValidationCallback }; return this; diff --git a/Source/MQTTnet/Client/MqttClientTlsOptions.cs b/Source/MQTTnet/Client/MqttClientTlsOptions.cs index 2536b23..73d65cb 100644 --- a/Source/MQTTnet/Client/MqttClientTlsOptions.cs +++ b/Source/MQTTnet/Client/MqttClientTlsOptions.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using System.Net.Security; using System.Security.Cryptography.X509Certificates; +using System.Security.Authentication; namespace MQTTnet.Client { @@ -17,6 +18,8 @@ namespace MQTTnet.Client public List Certificates { get; set; } + public SslProtocols SslProtocol { get; set; } = SslProtocols.Tls12; + public Func CertificateValidationCallback { get; set; } } } diff --git a/Source/MQTTnet/Implementations/MqttTcpChannel.cs b/Source/MQTTnet/Implementations/MqttTcpChannel.cs index c48a06f..d7c55a7 100644 --- a/Source/MQTTnet/Implementations/MqttTcpChannel.cs +++ b/Source/MQTTnet/Implementations/MqttTcpChannel.cs @@ -63,7 +63,7 @@ namespace MQTTnet.Implementations if (_options.TlsOptions.UseTls) { sslStream = new SslStream(new NetworkStream(_socket, true), false, InternalUserCertificateValidationCallback); - await sslStream.AuthenticateAsClientAsync(_options.Server, LoadCertificates(), SslProtocols.Tls12, _options.TlsOptions.IgnoreCertificateRevocationErrors).ConfigureAwait(false); + await sslStream.AuthenticateAsClientAsync(_options.Server, LoadCertificates(), _options.TlsOptions.SslProtocol, _options.TlsOptions.IgnoreCertificateRevocationErrors).ConfigureAwait(false); } CreateStream(sslStream); From 6e995230a78d1e8efe1432dc1373fc816e149317 Mon Sep 17 00:00:00 2001 From: Marius Ruginosu Date: Wed, 18 Jul 2018 16:50:29 +0300 Subject: [PATCH 03/10] Created overload methods for WithTls default values --- .../Client/MqttClientOptionsBuilder.cs | 52 +++++++++++++++---- 1 file changed, 42 insertions(+), 10 deletions(-) diff --git a/Source/MQTTnet/Client/MqttClientOptionsBuilder.cs b/Source/MQTTnet/Client/MqttClientOptionsBuilder.cs index 77c61bd..ed8ae03 100644 --- a/Source/MQTTnet/Client/MqttClientOptionsBuilder.cs +++ b/Source/MQTTnet/Client/MqttClientOptionsBuilder.cs @@ -112,8 +112,49 @@ namespace MQTTnet.Client return this; } + + public MqttClientOptionsBuilder WithTls() + { + return WithTls(null); + } + + public MqttClientOptionsBuilder WithTls(Func certificateValidationCallback) + { + return WithTls(SslProtocols.None, certificateValidationCallback); + } + + public MqttClientOptionsBuilder WithTls(SslProtocols sslProtocol, + Func certificateValidationCallback = null) + { + return WithTls(new byte[][] { }, sslProtocol, certificateValidationCallback); + } + + public MqttClientOptionsBuilder WithTls(byte[][] certificates, + SslProtocols sslProtocol = SslProtocols.Tls12, + Func certificateValidationCallback = null) + { + return WithTls(false, certificates, sslProtocol, certificateValidationCallback); + } + + public MqttClientOptionsBuilder WithTls(bool ignoreCertificateRevocationErrors, + byte[][] certificates = null, + SslProtocols sslProtocol = SslProtocols.Tls12, + Func certificateValidationCallback = null) + { + return WithTls(false, ignoreCertificateRevocationErrors, certificates, sslProtocol, certificateValidationCallback); + } + + public MqttClientOptionsBuilder WithTls(bool ignoreCertificateChainErrors, + bool ignoreCertificateRevocationErrors = false, + byte[][] certificates = null, + SslProtocols sslProtocol = SslProtocols.Tls12, + Func certificateValidationCallback = null) + { + return WithTls(false, ignoreCertificateChainErrors, ignoreCertificateRevocationErrors, certificates, sslProtocol, certificateValidationCallback); + } + public MqttClientOptionsBuilder WithTls( - bool allowUntrustedCertificates = false, + bool allowUntrustedCertificates, bool ignoreCertificateChainErrors = false, bool ignoreCertificateRevocationErrors = false, byte[][] certificates = null, @@ -134,15 +175,6 @@ namespace MQTTnet.Client return this; } - public MqttClientOptionsBuilder WithTls() - { - _tlsOptions = new MqttClientTlsOptions - { - UseTls = true - }; - - return this; - } public IMqttClientOptions Build() { From 55e8927ab7d292ee1a6c6dbff4804795ce1419c1 Mon Sep 17 00:00:00 2001 From: Jens-Christian Skibakk Date: Fri, 20 Jul 2018 14:13:21 +0200 Subject: [PATCH 04/10] Fix pause/resume of ClientKeepAliveMonitor --- Source/MQTTnet/Server/MqttClientSession.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Source/MQTTnet/Server/MqttClientSession.cs b/Source/MQTTnet/Server/MqttClientSession.cs index 429fdb4..7879ea6 100644 --- a/Source/MQTTnet/Server/MqttClientSession.cs +++ b/Source/MQTTnet/Server/MqttClientSession.cs @@ -405,12 +405,12 @@ namespace MQTTnet.Server private void OnAdapterReadingPacketCompleted(object sender, EventArgs e) { - _keepAliveMonitor?.Pause(); + _keepAliveMonitor?.Resume(); } private void OnAdapterReadingPacketStarted(object sender, EventArgs e) { - _keepAliveMonitor?.Resume(); + _keepAliveMonitor?.Pause(); } } } From 2b10200101c8f8a985722fe152496259e687e384 Mon Sep 17 00:00:00 2001 From: Jens-Christian Skibakk Date: Sun, 22 Jul 2018 20:09:37 +0200 Subject: [PATCH 05/10] Small fix for inconsistent client name in log --- Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs b/Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs index 8afb7ac..f1a209b 100644 --- a/Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs +++ b/Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs @@ -14,9 +14,9 @@ namespace MQTTnet.Server private readonly IMqttClientSession _clientSession; private readonly IMqttNetChildLogger _logger; - + private bool _isPaused; - + public MqttClientKeepAliveMonitor(IMqttClientSession clientSession, IMqttNetChildLogger logger) { if (logger == null) throw new ArgumentNullException(nameof(logger)); @@ -76,7 +76,7 @@ namespace MQTTnet.Server { _logger.Warning(null, "Client '{0}': Did not receive any packet or keep alive signal.", _clientSession.ClientId); _clientSession.Stop(MqttClientDisconnectType.NotClean); - + return; } @@ -96,7 +96,7 @@ namespace MQTTnet.Server } finally { - _logger.Verbose("Client {0}: Stopped checking keep alive timeout.", _clientSession.ClientId); + _logger.Verbose("Client '{0}': Stopped checking keep alive timeout.", _clientSession.ClientId); } } } From 603639126e2c95a763eb63b0b1f655ecee66a8d8 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Tue, 24 Jul 2018 13:49:23 +0200 Subject: [PATCH 06/10] Fix issue in processing of server messages. --- .../Server/MqttClientSessionsManager.cs | 73 +++++++++++-------- 1 file changed, 44 insertions(+), 29 deletions(-) diff --git a/Source/MQTTnet/Server/MqttClientSessionsManager.cs b/Source/MQTTnet/Server/MqttClientSessionsManager.cs index 34080a0..48732a6 100644 --- a/Source/MQTTnet/Server/MqttClientSessionsManager.cs +++ b/Source/MQTTnet/Server/MqttClientSessionsManager.cs @@ -44,7 +44,7 @@ namespace MQTTnet.Server public void Start() { - Task.Factory.StartNew(() => ProcessQueuedApplicationMessages(_cancellationToken), _cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default); + Task.Factory.StartNew(() => TryProcessQueuedApplicationMessages(_cancellationToken), _cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default); } public void Stop() @@ -133,52 +133,67 @@ namespace MQTTnet.Server _messageQueue?.Dispose(); } - private void ProcessQueuedApplicationMessages(CancellationToken cancellationToken) + private void TryProcessQueuedApplicationMessages(CancellationToken cancellationToken) { while (!cancellationToken.IsCancellationRequested) { try { - var enqueuedApplicationMessage = _messageQueue.Take(cancellationToken); - var sender = enqueuedApplicationMessage.Sender; - var applicationMessage = enqueuedApplicationMessage.PublishPacket.ToApplicationMessage(); - - var interceptorContext = InterceptApplicationMessage(sender, applicationMessage); - if (interceptorContext != null) - { - if (interceptorContext.CloseConnection) - { - enqueuedApplicationMessage.Sender.Stop(MqttClientDisconnectType.NotClean); - } - - if (interceptorContext.ApplicationMessage == null || !interceptorContext.AcceptPublish) - { - return; - } - - applicationMessage = interceptorContext.ApplicationMessage; - } + TryProcessNextQueuedApplicationMessage(cancellationToken); + } + catch (OperationCanceledException) + { + } + catch (Exception exception) + { + _logger.Error(exception, "Unhandled exception while processing queued application messages."); + } + } + } - Server.OnApplicationMessageReceived(sender?.ClientId, applicationMessage); + private void TryProcessNextQueuedApplicationMessage(CancellationToken cancellationToken) + { + try + { + var enqueuedApplicationMessage = _messageQueue.Take(cancellationToken); + var sender = enqueuedApplicationMessage.Sender; + var applicationMessage = enqueuedApplicationMessage.PublishPacket.ToApplicationMessage(); - if (applicationMessage.Retain) + var interceptorContext = InterceptApplicationMessage(sender, applicationMessage); + if (interceptorContext != null) + { + if (interceptorContext.CloseConnection) { - _retainedMessagesManager.HandleMessageAsync(sender?.ClientId, applicationMessage).GetAwaiter().GetResult(); + enqueuedApplicationMessage.Sender.Stop(MqttClientDisconnectType.NotClean); } - foreach (var clientSession in GetSessions()) + if (interceptorContext.ApplicationMessage == null || !interceptorContext.AcceptPublish) { - clientSession.EnqueueApplicationMessage(enqueuedApplicationMessage.Sender, applicationMessage.ToPublishPacket()); + return; } + + applicationMessage = interceptorContext.ApplicationMessage; } - catch (OperationCanceledException) + + Server.OnApplicationMessageReceived(sender?.ClientId, applicationMessage); + + if (applicationMessage.Retain) { + _retainedMessagesManager.HandleMessageAsync(sender?.ClientId, applicationMessage).GetAwaiter().GetResult(); } - catch (Exception exception) + + foreach (var clientSession in GetSessions()) { - _logger.Error(exception, "Unhandled exception while processing queued application message."); + clientSession.EnqueueApplicationMessage(enqueuedApplicationMessage.Sender, applicationMessage.ToPublishPacket()); } } + catch (OperationCanceledException) + { + } + catch (Exception exception) + { + _logger.Error(exception, "Unhandled exception while processing next queued application message."); + } } private List GetSessions() From 6b9015a9287a85182958339ae02cd18582f0be54 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Wed, 25 Jul 2018 20:18:27 +0200 Subject: [PATCH 07/10] Fix order of ClientConnected and ClientDisconnected events. --- Source/MQTTnet/Server/IMqttClientSession.cs | 2 +- Source/MQTTnet/Server/MqttClientSession.cs | 6 +-- .../Server/MqttClientSessionsManager.cs | 8 ++-- Tests/MQTTnet.Core.Tests/MqttClientTests.cs | 7 ++- .../MqttKeepAliveMonitorTests.cs | 4 +- Tests/MQTTnet.Core.Tests/MqttServerTests.cs | 48 +++++++++++++++++++ 6 files changed, 62 insertions(+), 13 deletions(-) diff --git a/Source/MQTTnet/Server/IMqttClientSession.cs b/Source/MQTTnet/Server/IMqttClientSession.cs index a94ad18..9fdb0eb 100644 --- a/Source/MQTTnet/Server/IMqttClientSession.cs +++ b/Source/MQTTnet/Server/IMqttClientSession.cs @@ -14,7 +14,7 @@ namespace MQTTnet.Server void EnqueueApplicationMessage(MqttClientSession senderClientSession, MqttPublishPacket publishPacket); void ClearPendingApplicationMessages(); - Task RunAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter adapter); + Task RunAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter adapter); void Stop(MqttClientDisconnectType disconnectType); Task SubscribeAsync(IList topicFilters); diff --git a/Source/MQTTnet/Server/MqttClientSession.cs b/Source/MQTTnet/Server/MqttClientSession.cs index 7879ea6..fa90482 100644 --- a/Source/MQTTnet/Server/MqttClientSession.cs +++ b/Source/MQTTnet/Server/MqttClientSession.cs @@ -65,7 +65,7 @@ namespace MQTTnet.Server status.LastNonKeepAlivePacketReceived = _keepAliveMonitor.LastNonKeepAlivePacketReceived; } - public async Task RunAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter adapter) + public async Task RunAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter adapter) { if (connectPacket == null) throw new ArgumentNullException(nameof(connectPacket)); if (adapter == null) throw new ArgumentNullException(nameof(adapter)); @@ -129,8 +129,6 @@ namespace MQTTnet.Server _cancellationTokenSource?.Dispose(); _cancellationTokenSource = null; } - - return _wasCleanDisconnect; } public void Stop(MqttClientDisconnectType type) @@ -157,6 +155,8 @@ namespace MQTTnet.Server finally { _logger.Info("Client '{0}': Session stopped.", ClientId); + + _sessionsManager.Server.OnClientDisconnected(ClientId, _wasCleanDisconnect); } } diff --git a/Source/MQTTnet/Server/MqttClientSessionsManager.cs b/Source/MQTTnet/Server/MqttClientSessionsManager.cs index 48732a6..c139151 100644 --- a/Source/MQTTnet/Server/MqttClientSessionsManager.cs +++ b/Source/MQTTnet/Server/MqttClientSessionsManager.cs @@ -125,6 +125,7 @@ namespace MQTTnet.Server { _sessions.Remove(clientId); } + _logger.Verbose("Session for client '{0}' deleted.", clientId); } @@ -207,8 +208,7 @@ namespace MQTTnet.Server private async Task RunSession(IMqttChannelAdapter clientAdapter, CancellationToken cancellationToken) { var clientId = string.Empty; - var wasCleanDisconnect = false; - + try { var firstPacket = await clientAdapter.ReceivePacketAsync(_options.DefaultCommunicationTimeout, cancellationToken).ConfigureAwait(false); @@ -253,7 +253,7 @@ namespace MQTTnet.Server Server.OnClientConnected(clientId); - wasCleanDisconnect = await clientSession.RunAsync(connectPacket, clientAdapter).ConfigureAwait(false); + await clientSession.RunAsync(connectPacket, clientAdapter).ConfigureAwait(false); } catch (OperationCanceledException) { @@ -278,8 +278,6 @@ namespace MQTTnet.Server { DeleteSession(clientId); } - - Server.OnClientDisconnected(clientId, wasCleanDisconnect); } } diff --git a/Tests/MQTTnet.Core.Tests/MqttClientTests.cs b/Tests/MQTTnet.Core.Tests/MqttClientTests.cs index 1334a80..f629f8f 100644 --- a/Tests/MQTTnet.Core.Tests/MqttClientTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttClientTests.cs @@ -1,15 +1,18 @@ -using System.Net.Sockets; +using System; +using System.Net.Sockets; using System.Threading.Tasks; using Microsoft.VisualStudio.TestTools.UnitTesting; using MQTTnet.Client; +using MQTTnet.Diagnostics; using MQTTnet.Exceptions; +using MQTTnet.Implementations; +using MQTTnet.Server; namespace MQTTnet.Core.Tests { [TestClass] public class MqttClientTests { - [TestMethod] public async Task ClientDisconnectException() { diff --git a/Tests/MQTTnet.Core.Tests/MqttKeepAliveMonitorTests.cs b/Tests/MQTTnet.Core.Tests/MqttKeepAliveMonitorTests.cs index 267b7bc..b18904a 100644 --- a/Tests/MQTTnet.Core.Tests/MqttKeepAliveMonitorTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttKeepAliveMonitorTests.cs @@ -60,7 +60,7 @@ namespace MQTTnet.Core.Tests { public string ClientId { get; } - public int StopCalledCount { get; set; } + public int StopCalledCount { get; private set; } public void FillStatus(MqttClientSessionStatus status) { @@ -77,7 +77,7 @@ namespace MQTTnet.Core.Tests throw new NotSupportedException(); } - public Task RunAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter adapter) + public Task RunAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter adapter) { throw new NotSupportedException(); } diff --git a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs index 95bb67d..58b27ba 100644 --- a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs @@ -484,6 +484,54 @@ namespace MQTTnet.Core.Tests Assert.IsTrue(bodyIsMatching); } + [TestMethod] + public async Task MqttServer_SameClientIdConnectDisconnectEventOrder() + { + var serverAdapter = new MqttTcpServerAdapter(new MqttNetLogger().CreateChildLogger()); + var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger()); + + var connectedClient = false; + var connecteCalledBeforeConnectedClients = false; + + s.ClientConnected += (_, __) => + { + connecteCalledBeforeConnectedClients |= connectedClient; + connectedClient = true; + }; + + s.ClientDisconnected += (_, __) => + { + connectedClient = false; + }; + + var clientOptions = new MqttClientOptionsBuilder() + .WithTcpServer("localhost") + .WithClientId(Guid.NewGuid().ToString()) + .Build(); + + await s.StartAsync(new MqttServerOptions()); + + var c1 = new MqttFactory().CreateMqttClient(); + var c2 = new MqttFactory().CreateMqttClient(); + + await c1.ConnectAsync(clientOptions); + + await Task.Delay(100); + + await c2.ConnectAsync(clientOptions); + + await Task.Delay(100); + + await c1.DisconnectAsync(); + await c2.DisconnectAsync(); + + await s.StopAsync(); + + await Task.Delay(100); + + Assert.IsFalse(connecteCalledBeforeConnectedClients, "ClientConnected was called before ClientDisconnect was called"); + } + private class TestStorage : IMqttServerStorage { public IList Messages = new List(); From 60bc4ebcee895c4cbf4970a8aa6a0e3c37be3c81 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Thu, 26 Jul 2018 10:35:52 +0200 Subject: [PATCH 08/10] Refactor proxy usage and support. Refactor TLS parameter usage. --- .../Client/MqttClientOptionsBuilder.cs | 91 +++++++------------ .../MqttClientOptionsBuilderTlsParameters.cs | 29 ++++++ .../Client/MqttClientWebSocketOptions.cs | 4 +- .../Client/MqttClientWebSocketProxyOptions.cs | 8 +- .../Implementations/MqttWebSocketChannel.cs | 50 +++++----- 5 files changed, 95 insertions(+), 87 deletions(-) create mode 100644 Source/MQTTnet/Client/MqttClientOptionsBuilderTlsParameters.cs diff --git a/Source/MQTTnet/Client/MqttClientOptionsBuilder.cs b/Source/MQTTnet/Client/MqttClientOptionsBuilder.cs index ed8ae03..c03e53a 100644 --- a/Source/MQTTnet/Client/MqttClientOptionsBuilder.cs +++ b/Source/MQTTnet/Client/MqttClientOptionsBuilder.cs @@ -1,8 +1,5 @@ using System; using System.Linq; -using System.Net.Security; -using System.Security.Authentication; -using System.Security.Cryptography.X509Certificates; using MQTTnet.Serializer; namespace MQTTnet.Client @@ -10,10 +7,10 @@ namespace MQTTnet.Client public class MqttClientOptionsBuilder { private readonly MqttClientOptions _options = new MqttClientOptions(); + private MqttClientTcpOptions _tcpOptions; private MqttClientWebSocketOptions _webSocketOptions; - - private MqttClientTlsOptions _tlsOptions; + private MqttClientOptionsBuilderTlsParameters _tlsParameters; public MqttClientOptionsBuilder WithProtocolVersion(MqttProtocolVersion value) { @@ -88,7 +85,7 @@ namespace MQTTnet.Client throw new InvalidOperationException("A WebSocket channel must be set if MqttClientWebSocketProxy is configured."); } - _webSocketOptions.MqttClientWebSocketProxy = new MqttClientWebSocketProxyOptions + _webSocketOptions.ProxyOptions = new MqttClientWebSocketProxyOptions { Address = address, Username = username, @@ -112,86 +109,66 @@ namespace MQTTnet.Client return this; } - - public MqttClientOptionsBuilder WithTls() - { - return WithTls(null); - } - - public MqttClientOptionsBuilder WithTls(Func certificateValidationCallback) - { - return WithTls(SslProtocols.None, certificateValidationCallback); - } - - public MqttClientOptionsBuilder WithTls(SslProtocols sslProtocol, - Func certificateValidationCallback = null) - { - return WithTls(new byte[][] { }, sslProtocol, certificateValidationCallback); - } - - public MqttClientOptionsBuilder WithTls(byte[][] certificates, - SslProtocols sslProtocol = SslProtocols.Tls12, - Func certificateValidationCallback = null) - { - return WithTls(false, certificates, sslProtocol, certificateValidationCallback); - } - - public MqttClientOptionsBuilder WithTls(bool ignoreCertificateRevocationErrors, - byte[][] certificates = null, - SslProtocols sslProtocol = SslProtocols.Tls12, - Func certificateValidationCallback = null) + public MqttClientOptionsBuilder WithTls(MqttClientOptionsBuilderTlsParameters parameters) { - return WithTls(false, ignoreCertificateRevocationErrors, certificates, sslProtocol, certificateValidationCallback); + _tlsParameters = parameters ?? throw new ArgumentNullException(nameof(parameters)); + return this; } - public MqttClientOptionsBuilder WithTls(bool ignoreCertificateChainErrors, - bool ignoreCertificateRevocationErrors = false, - byte[][] certificates = null, - SslProtocols sslProtocol = SslProtocols.Tls12, - Func certificateValidationCallback = null) + public MqttClientOptionsBuilder WithTls() { - return WithTls(false, ignoreCertificateChainErrors, ignoreCertificateRevocationErrors, certificates, sslProtocol, certificateValidationCallback); + return WithTls(new MqttClientOptionsBuilderTlsParameters { UseTls = true }); } + [Obsolete("Use method _WithTlps_ which accepts the _MqttClientOptionsBuilderTlsParameters_.")] public MqttClientOptionsBuilder WithTls( - bool allowUntrustedCertificates, + bool allowUntrustedCertificates = false, bool ignoreCertificateChainErrors = false, bool ignoreCertificateRevocationErrors = false, - byte[][] certificates = null, - SslProtocols sslProtocol = SslProtocols.Tls12, - Func certificateValidationCallback = null) + params byte[][] certificates) { - _tlsOptions = new MqttClientTlsOptions + _tlsParameters = new MqttClientOptionsBuilderTlsParameters { UseTls = true, AllowUntrustedCertificates = allowUntrustedCertificates, IgnoreCertificateChainErrors = ignoreCertificateChainErrors, IgnoreCertificateRevocationErrors = ignoreCertificateRevocationErrors, - Certificates = certificates?.ToList(), - SslProtocol = sslProtocol, - CertificateValidationCallback = certificateValidationCallback + Certificates = certificates?.ToList() }; return this; } - public IMqttClientOptions Build() { - if (_tlsOptions != null) + if (_tlsParameters != null) { if (_tcpOptions == null && _webSocketOptions == null) { throw new InvalidOperationException("A channel (TCP or WebSocket) must be set if TLS is configured."); } - if (_tcpOptions != null) - { - _tcpOptions.TlsOptions = _tlsOptions; - } - else if (_webSocketOptions != null) + if (_tlsParameters?.UseTls == true) { - _webSocketOptions.TlsOptions = _tlsOptions; + var tlsOptions = new MqttClientTlsOptions + { + UseTls = true, + SslProtocol = _tlsParameters.SslProtocol, + AllowUntrustedCertificates = _tlsParameters.AllowUntrustedCertificates, + Certificates = _tlsParameters.Certificates?.Select(c => c.ToArray()).ToList(), + CertificateValidationCallback = _tlsParameters.CertificateValidationCallback, + IgnoreCertificateChainErrors = _tlsParameters.IgnoreCertificateChainErrors, + IgnoreCertificateRevocationErrors = _tlsParameters.IgnoreCertificateRevocationErrors + }; + + if (_tcpOptions != null) + { + _tcpOptions.TlsOptions = tlsOptions; + } + else if (_webSocketOptions != null) + { + _webSocketOptions.TlsOptions = tlsOptions; + } } } diff --git a/Source/MQTTnet/Client/MqttClientOptionsBuilderTlsParameters.cs b/Source/MQTTnet/Client/MqttClientOptionsBuilderTlsParameters.cs new file mode 100644 index 0000000..5764c79 --- /dev/null +++ b/Source/MQTTnet/Client/MqttClientOptionsBuilderTlsParameters.cs @@ -0,0 +1,29 @@ +using System; +using System.Collections.Generic; +using System.Net.Security; +using System.Security.Authentication; +using System.Security.Cryptography.X509Certificates; + +namespace MQTTnet.Client +{ + public class MqttClientOptionsBuilderTlsParameters + { + public bool UseTls { get; set; } + + public Func CertificateValidationCallback + { + get; + set; + } + + public SslProtocols SslProtocol { get; set; } = SslProtocols.Tls12; + + public IEnumerable> Certificates { get; set; } + + public bool AllowUntrustedCertificates { get; set; } + + public bool IgnoreCertificateChainErrors { get; set; } + + public bool IgnoreCertificateRevocationErrors { get; set; } + } +} diff --git a/Source/MQTTnet/Client/MqttClientWebSocketOptions.cs b/Source/MQTTnet/Client/MqttClientWebSocketOptions.cs index 2cce431..fe8f1d2 100644 --- a/Source/MQTTnet/Client/MqttClientWebSocketOptions.cs +++ b/Source/MQTTnet/Client/MqttClientWebSocketOptions.cs @@ -13,9 +13,7 @@ namespace MQTTnet.Client public CookieContainer CookieContainer { get; set; } -#if NET452 || NET461 - public MqttClientWebSocketProxyOptions MqttClientWebSocketProxy { get; set; } = new MqttClientWebSocketProxyOptions(); -#endif + public MqttClientWebSocketProxyOptions ProxyOptions { get; set; } public MqttClientTlsOptions TlsOptions { get; set; } = new MqttClientTlsOptions(); diff --git a/Source/MQTTnet/Client/MqttClientWebSocketProxyOptions.cs b/Source/MQTTnet/Client/MqttClientWebSocketProxyOptions.cs index 81f48da..0ac9df1 100644 --- a/Source/MQTTnet/Client/MqttClientWebSocketProxyOptions.cs +++ b/Source/MQTTnet/Client/MqttClientWebSocketProxyOptions.cs @@ -1,6 +1,4 @@ -#if NET452 || NET461 - -namespace MQTTnet.Client +namespace MQTTnet.Client { public class MqttClientWebSocketProxyOptions { @@ -16,6 +14,4 @@ namespace MQTTnet.Client public string[] BypassList { get; set; } } -} - -#endif \ No newline at end of file +} \ No newline at end of file diff --git a/Source/MQTTnet/Implementations/MqttWebSocketChannel.cs b/Source/MQTTnet/Implementations/MqttWebSocketChannel.cs index cf8c183..53173d2 100644 --- a/Source/MQTTnet/Implementations/MqttWebSocketChannel.cs +++ b/Source/MQTTnet/Implementations/MqttWebSocketChannel.cs @@ -1,4 +1,5 @@ using System; +using System.Net; using System.Net.WebSockets; using System.Security.Cryptography.X509Certificates; using System.Threading; @@ -6,10 +7,6 @@ using System.Threading.Tasks; using MQTTnet.Channel; using MQTTnet.Client; -#if NET452 || NET461 - using System.Net; -#endif - namespace MQTTnet.Implementations { public class MqttWebSocketChannel : IMqttChannel @@ -49,25 +46,10 @@ namespace MQTTnet.Implementations var clientWebSocket = new ClientWebSocket(); -#if NET452 || NET461 - // use proxy if we have an address - if (string.IsNullOrEmpty(_options.MqttClientWebSocketProxy.Address) == false) + if (_options.ProxyOptions != null) { - var proxyUri = new Uri(_options.MqttClientWebSocketProxy.Address); - - // use proxy credentials if we have them - if (string.IsNullOrEmpty(_options.MqttClientWebSocketProxy.Username) == false && string.IsNullOrEmpty(_options.MqttClientWebSocketProxy.Password) == false) - { - var credentials = new NetworkCredential(_options.MqttClientWebSocketProxy.Username, _options.MqttClientWebSocketProxy.Password, _options.MqttClientWebSocketProxy.Domain); - clientWebSocket.Options.Proxy = new WebProxy(proxyUri, _options.MqttClientWebSocketProxy.BypassOnLocal, _options.MqttClientWebSocketProxy.BypassList, credentials); - } - else - { - // use proxy without credentials - clientWebSocket.Options.Proxy = new WebProxy(proxyUri, _options.MqttClientWebSocketProxy.BypassOnLocal, _options.MqttClientWebSocketProxy.BypassList); - } + clientWebSocket.Options.Proxy = CreateProxy(); } -#endif if (_options.RequestHeaders != null) { @@ -156,5 +138,31 @@ namespace MQTTnet.Implementations _webSocket = null; } } + + private IWebProxy CreateProxy() + { + if (string.IsNullOrEmpty(_options.ProxyOptions?.Address)) + { + return null; + } + +#if WINDOWS_UWP + throw new NotSupportedException("Proxies are not supported in UWP."); +#elif NETSTANDARD1_3 + throw new NotSupportedException("Proxies are not supported in netstandard 1.3."); +#else + var proxyUri = new Uri(_options.ProxyOptions.Address); + + if (!string.IsNullOrEmpty(_options.ProxyOptions.Username) && !string.IsNullOrEmpty(_options.ProxyOptions.Password)) + { + var credentials = + new NetworkCredential(_options.ProxyOptions.Username, _options.ProxyOptions.Password, _options.ProxyOptions.Domain); + + return new WebProxy(proxyUri, _options.ProxyOptions.BypassOnLocal, _options.ProxyOptions.BypassList, credentials); + } + + return new WebProxy(proxyUri, _options.ProxyOptions.BypassOnLocal, _options.ProxyOptions.BypassList); +#endif + } } } \ No newline at end of file From 0839fa575741e3fee1e96e7601ccdac90f41c140 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Thu, 26 Jul 2018 10:43:40 +0200 Subject: [PATCH 09/10] Update docs. --- Build/MQTTnet.nuspec | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index 34e563b..10da9e5 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -11,11 +11,11 @@ false MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker). * [Core] Performance optimizations. -* [Core] Fixed a bug which prevents receiving large packets (UWP only) -* [Client] The ManagedClient options now allow configuring the interval for connection checks. -* [Server] Added the Endpoint of the Adapter (remote IP and port) to the connection validation callback. -* [Server] The ipv4 and ipv6 endpoint can be disabled now by setting the bound IP address to _None_. -* [Server] Fix a bug in the keep alive monitor which caused high CPU load (thanks to @GarageGadget). +* [Client] Added support for proxies when using web socket connections (thanks to PitySOFT). +* [Client] Refactored TLS parameter usage and added more parameters (thanks to PitySOFT). +* [Server] Fixed an issue in client keep alive checks (thanks to @jenscski). +* [Server] Changed the order of _ClientConnected_ and _ClientDisconnected_ events so that _ClientConnected_ is fired at first (thanks to @jenscski). +* [Server] Fixed an iussue which lets the server stop processing messages when using the application message interceptor (thanks to @alamsor). Copyright Christian Kratky 2016-2018 MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin @@ -56,6 +56,9 @@ + + + \ No newline at end of file From f70c79aaf9ab55a64dbeb4f0476a8c556b3e0c39 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Thu, 26 Jul 2018 11:34:24 +0200 Subject: [PATCH 10/10] Refactor options. --- README.md | 3 +- .../Client/MqttClientOptionsBuilder.cs | 33 ++++++++++--------- .../Client/MqttClientWebSocketProxyOptions.cs | 2 +- 3 files changed, 21 insertions(+), 17 deletions(-) diff --git a/README.md b/README.md index 7181d72..1a34eee 100644 --- a/README.md +++ b/README.md @@ -83,9 +83,10 @@ This project also listed at Open Collective (https://opencollective.com/mqttnet) This library is used in the following projects: +* HA4IoT (Open Source Home Automation system for .NET, ) * MQTT Client Rx (Wrapper for Reactive Extensions, ) * MQTT Tester (MQTT client test app for [Android](https://play.google.com/store/apps/details?id=com.liveowl.mqtttester) and [iOS](https://itunes.apple.com/us/app/mqtt-tester/id1278621826?mt=8)) -* HA4IoT (Open Source Home Automation system for .NET, ) +* Wirehome.Core (Open Source Home Automation system for .NET Core, ) If you use this library and want to see your project here please let me know. diff --git a/Source/MQTTnet/Client/MqttClientOptionsBuilder.cs b/Source/MQTTnet/Client/MqttClientOptionsBuilder.cs index c03e53a..85e2b4d 100644 --- a/Source/MQTTnet/Client/MqttClientOptionsBuilder.cs +++ b/Source/MQTTnet/Client/MqttClientOptionsBuilder.cs @@ -11,6 +11,7 @@ namespace MQTTnet.Client private MqttClientTcpOptions _tcpOptions; private MqttClientWebSocketOptions _webSocketOptions; private MqttClientOptionsBuilderTlsParameters _tlsParameters; + private MqttClientWebSocketProxyOptions _proxyOptions; public MqttClientOptionsBuilder WithProtocolVersion(MqttProtocolVersion value) { @@ -76,16 +77,9 @@ namespace MQTTnet.Client return this; } -#if NET452 || NET461 - public MqttClientOptionsBuilder WithProxy(string address, string username = null, string password = null, string domain = null, bool bypassOnLocal = false, string[] bypassList = null) { - if (_webSocketOptions == null) - { - throw new InvalidOperationException("A WebSocket channel must be set if MqttClientWebSocketProxy is configured."); - } - - _webSocketOptions.ProxyOptions = new MqttClientWebSocketProxyOptions + _proxyOptions = new MqttClientWebSocketProxyOptions { Address = address, Username = username, @@ -97,7 +91,6 @@ namespace MQTTnet.Client return this; } -#endif public MqttClientOptionsBuilder WithWebSocketServer(string uri) { @@ -120,7 +113,7 @@ namespace MQTTnet.Client return WithTls(new MqttClientOptionsBuilderTlsParameters { UseTls = true }); } - [Obsolete("Use method _WithTlps_ which accepts the _MqttClientOptionsBuilderTlsParameters_.")] + [Obsolete("Use method _WithTls_ which accepts the _MqttClientOptionsBuilderTlsParameters_.")] public MqttClientOptionsBuilder WithTls( bool allowUntrustedCertificates = false, bool ignoreCertificateChainErrors = false, @@ -141,13 +134,13 @@ namespace MQTTnet.Client public IMqttClientOptions Build() { - if (_tlsParameters != null) + if (_tcpOptions == null && _webSocketOptions == null) { - if (_tcpOptions == null && _webSocketOptions == null) - { - throw new InvalidOperationException("A channel (TCP or WebSocket) must be set if TLS is configured."); - } + throw new InvalidOperationException("A channel must be set."); + } + if (_tlsParameters != null) + { if (_tlsParameters?.UseTls == true) { var tlsOptions = new MqttClientTlsOptions @@ -172,6 +165,16 @@ namespace MQTTnet.Client } } + if (_proxyOptions != null) + { + if (_webSocketOptions == null) + { + throw new InvalidOperationException("Proxies are only supported for WebSocket connections."); + } + + _webSocketOptions.ProxyOptions = _proxyOptions; + } + _options.ChannelOptions = (IMqttClientChannelOptions)_tcpOptions ?? _webSocketOptions; return _options; diff --git a/Source/MQTTnet/Client/MqttClientWebSocketProxyOptions.cs b/Source/MQTTnet/Client/MqttClientWebSocketProxyOptions.cs index 0ac9df1..13b1366 100644 --- a/Source/MQTTnet/Client/MqttClientWebSocketProxyOptions.cs +++ b/Source/MQTTnet/Client/MqttClientWebSocketProxyOptions.cs @@ -10,7 +10,7 @@ public string Domain { get; set; } - public bool BypassOnLocal { get; set; } = true; + public bool BypassOnLocal { get; set; } public string[] BypassList { get; set; } }