From 22aab44d4970ce35a9eb480511ee301b4b4ef5e5 Mon Sep 17 00:00:00 2001 From: Kees Schollaart Date: Sun, 14 Oct 2018 11:46:32 +0200 Subject: [PATCH 01/29] Referencing my MQTT Bindings for Azure Functions Hi, if you're interested, hereby a reference to my work on MQTT Bindings for Azure Functions where I used this library. Thanks for the excellent work. More background [over here](http://case.schollaart.net/2018/09/22/mqtt-and-azure-functions.html) --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index fc2acea..a33cee7 100644 --- a/README.md +++ b/README.md @@ -89,6 +89,7 @@ This library is used in the following projects: * MQTT Client Rx (Wrapper for Reactive Extensions, ) * MQTT Tester (MQTT client test app for [Android](https://play.google.com/store/apps/details?id=com.liveowl.mqtttester) and [iOS](https://itunes.apple.com/us/app/mqtt-tester/id1278621826?mt=8)) * Wirehome.Core (Open Source Home Automation system for .NET Core, ) +* Azure Functions MQTT Bindings, [CaseOnline.Azure.WebJobs.Extensions.Mqtt](https://github.com/keesschollaart81/CaseOnline.Azure.WebJobs.Extensions.Mqtt/) If you use this library and want to see your project here please let me know. From d273148bc56571e04878e3b2aa5a459ecd5fe511 Mon Sep 17 00:00:00 2001 From: Christian Date: Sun, 14 Oct 2018 12:05:08 +0200 Subject: [PATCH 02/29] Update README.md Reorder references alphabetically. --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index a33cee7..dce0008 100644 --- a/README.md +++ b/README.md @@ -85,13 +85,13 @@ This project also listed at Open Collective (https://opencollective.com/mqttnet) This library is used in the following projects: +* Azure Functions MQTT Bindings, [CaseOnline.Azure.WebJobs.Extensions.Mqtt](https://github.com/keesschollaart81/CaseOnline.Azure.WebJobs.Extensions.Mqtt/) * HA4IoT (Open Source Home Automation system for .NET, ) * MQTT Client Rx (Wrapper for Reactive Extensions, ) * MQTT Tester (MQTT client test app for [Android](https://play.google.com/store/apps/details?id=com.liveowl.mqtttester) and [iOS](https://itunes.apple.com/us/app/mqtt-tester/id1278621826?mt=8)) * Wirehome.Core (Open Source Home Automation system for .NET Core, ) -* Azure Functions MQTT Bindings, [CaseOnline.Azure.WebJobs.Extensions.Mqtt](https://github.com/keesschollaart81/CaseOnline.Azure.WebJobs.Extensions.Mqtt/) -If you use this library and want to see your project here please let me know. +If you use this library and want to see your project here please create a pull request. ## MIT License From af39e877035a03307f24676a218cfcf73b562ddc Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Sat, 17 Nov 2018 20:23:20 +0100 Subject: [PATCH 03/29] Refactor code. --- Source/MQTTnet/Implementations/MqttTcpChannel.cs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/Source/MQTTnet/Implementations/MqttTcpChannel.cs b/Source/MQTTnet/Implementations/MqttTcpChannel.cs index 3928328..58eece6 100644 --- a/Source/MQTTnet/Implementations/MqttTcpChannel.cs +++ b/Source/MQTTnet/Implementations/MqttTcpChannel.cs @@ -2,7 +2,6 @@ using System; using System.Net.Security; using System.Net.Sockets; -using System.Security.Authentication; using System.Security.Cryptography.X509Certificates; using System.Threading.Tasks; using System.IO; @@ -17,7 +16,7 @@ namespace MQTTnet.Implementations { private readonly IMqttClientOptions _clientOptions; private readonly MqttClientTcpOptions _options; - + private Socket _socket; private Stream _stream; @@ -87,8 +86,9 @@ namespace MQTTnet.Implementations public void Dispose() { - Cleanup(ref _stream, (s) => s.Dispose()); - Cleanup(ref _socket, (s) => { + Cleanup(ref _stream, s => s.Dispose()); + Cleanup(ref _socket, s => + { if (s.Connected) { s.Shutdown(SocketShutdown.Both); @@ -102,7 +102,7 @@ namespace MQTTnet.Implementations // Try the instance callback. if (_options.TlsOptions.CertificateValidationCallback != null) { - return _options.TlsOptions.CertificateValidationCallback(x509Certificate, chain, sslPolicyErrors,_clientOptions); + return _options.TlsOptions.CertificateValidationCallback(x509Certificate, chain, sslPolicyErrors, _clientOptions); } // Try static callback. From 5f18f35e02bc4cbe736fabeed5b0a0651d488638 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Sat, 17 Nov 2018 20:23:52 +0100 Subject: [PATCH 04/29] Refactor task usage in client. --- Source/MQTTnet/Client/MqttClient.cs | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/Source/MQTTnet/Client/MqttClient.cs b/Source/MQTTnet/Client/MqttClient.cs index bede2d3..f6c6725 100644 --- a/Source/MQTTnet/Client/MqttClient.cs +++ b/Source/MQTTnet/Client/MqttClient.cs @@ -139,7 +139,7 @@ namespace MQTTnet.Client return subscribePacket.TopicFilters.Select((t, i) => new MqttSubscribeResult(t, response.SubscribeReturnCodes[i])).ToList(); } - public async Task UnsubscribeAsync(IEnumerable topicFilters) + public Task UnsubscribeAsync(IEnumerable topicFilters) { if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); @@ -151,7 +151,7 @@ namespace MQTTnet.Client TopicFilters = topicFilters.ToList() }; - await SendAndReceiveAsync(unsubscribePacket, _cancellationTokenSource.Token).ConfigureAwait(false); + return SendAndReceiveAsync(unsubscribePacket, _cancellationTokenSource.Token); } public Task PublishAsync(MqttApplicationMessage applicationMessage) @@ -234,14 +234,14 @@ namespace MQTTnet.Client try { - await WaitForTaskAsync(_packetReceiverTask, sender).ConfigureAwait(false); - await WaitForTaskAsync(_keepAliveMessageSenderTask, sender).ConfigureAwait(false); - if (_adapter != null) { await _adapter.DisconnectAsync(_options.CommunicationTimeout, CancellationToken.None).ConfigureAwait(false); } + await WaitForTaskAsync(_packetReceiverTask, sender).ConfigureAwait(false); + await WaitForTaskAsync(_keepAliveMessageSenderTask, sender).ConfigureAwait(false); + _logger.Verbose("Disconnected from adapter."); } catch (Exception adapterException) @@ -320,10 +320,10 @@ namespace MQTTnet.Client private async Task SendKeepAliveMessagesAsync(CancellationToken cancellationToken) { - _logger.Verbose("Start sending keep alive packets."); - try { + _logger.Verbose("Start sending keep alive packets."); + while (!cancellationToken.IsCancellationRequested) { var keepAliveSendInterval = TimeSpan.FromSeconds(_options.KeepAlivePeriod.TotalSeconds * 0.75); @@ -374,10 +374,10 @@ namespace MQTTnet.Client private async Task ReceivePacketsAsync(CancellationToken cancellationToken) { - _logger.Verbose("Start receiving packets."); - try { + _logger.Verbose("Start receiving packets."); + while (!cancellationToken.IsCancellationRequested) { var packet = await _adapter.ReceivePacketAsync(TimeSpan.Zero, cancellationToken) @@ -500,7 +500,7 @@ namespace MQTTnet.Client () => ReceivePacketsAsync(cancellationToken), cancellationToken, TaskCreationOptions.LongRunning, - TaskScheduler.Default); + TaskScheduler.Default).Unwrap(); } private void StartSendingKeepAliveMessages(CancellationToken cancellationToken) @@ -509,7 +509,7 @@ namespace MQTTnet.Client () => SendKeepAliveMessagesAsync(cancellationToken), cancellationToken, TaskCreationOptions.LongRunning, - TaskScheduler.Default); + TaskScheduler.Default).Unwrap(); } private void FireApplicationMessageReceivedEvent(MqttPublishPacket publishPacket) From 3e7b41e712521ace2a4ef8798a6c236128cd5bac Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Sat, 17 Nov 2018 20:24:23 +0100 Subject: [PATCH 05/29] Refactor code. --- Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs b/Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs index 60ce27d..563f343 100644 --- a/Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs +++ b/Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs @@ -61,13 +61,7 @@ namespace MQTTnet.TestApp.NetCore public class RandomPassword : IMqttClientCredentials { - public string Password - { - get - { - return Guid.NewGuid().ToString(); // The random password. - } - } + public string Password => Guid.NewGuid().ToString(); public string Username => "the_static_user"; } From d7b98080f504971155fc454b9505943a6a29bedf Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Sun, 18 Nov 2018 11:20:36 +0100 Subject: [PATCH 06/29] Refactor unit test for server events. --- Tests/MQTTnet.Core.Tests/MqttServerTests.cs | 30 ++++++++++++--------- 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs index b37dfdf..4836f4d 100644 --- a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs @@ -4,6 +4,7 @@ using MQTTnet.Diagnostics; using MQTTnet.Protocol; using MQTTnet.Server; using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Text; @@ -389,6 +390,8 @@ namespace MQTTnet.Core.Tests } await c2.DisconnectAsync(); + + await s.StopAsync(); } [TestMethod] @@ -655,26 +658,23 @@ namespace MQTTnet.Core.Tests [TestMethod] public async Task MqttServer_SameClientIdConnectDisconnectEventOrder() { - var serverAdapter = new MqttTcpServerAdapter(new MqttNetLogger().CreateChildLogger()); - var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger()); + var s = new MqttFactory().CreateMqttServer(); - var connectedClient = false; - var connecteCalledBeforeConnectedClients = false; + var events = new ConcurrentBag(); s.ClientConnected += (_, __) => { - connecteCalledBeforeConnectedClients |= connectedClient; - connectedClient = true; + events.Add("c"); }; s.ClientDisconnected += (_, __) => { - connectedClient = false; + events.Add("d"); }; var clientOptions = new MqttClientOptionsBuilder() .WithTcpServer("localhost") - .WithClientId(Guid.NewGuid().ToString()) + .WithClientId("same_id") .Build(); await s.StartAsync(new MqttServerOptions()); @@ -684,20 +684,24 @@ namespace MQTTnet.Core.Tests await c1.ConnectAsync(clientOptions); - await Task.Delay(100); + await Task.Delay(250); await c2.ConnectAsync(clientOptions); - await Task.Delay(100); + await Task.Delay(250); await c1.DisconnectAsync(); await c2.DisconnectAsync(); + await Task.Delay(250); + + var flow = string.Join(string.Empty, events); + await s.StopAsync(); - await Task.Delay(100); + Assert.AreEqual("cdcd", flow); - Assert.IsFalse(connecteCalledBeforeConnectedClients, "ClientConnected was called before ClientDisconnect was called"); + //Assert.IsFalse(connecteCalledBeforeConnectedClients, "ClientConnected was called before ClientDisconnect was called"); } @@ -725,6 +729,8 @@ namespace MQTTnet.Core.Tests await server.StartAsync(new MqttServerOptions()); var client3 = new MqttFactory().CreateMqttClient(); await client3.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("localhost").Build()); + + await server.StopAsync(); } private class TestStorage : IMqttServerStorage From 210d57b2548fb70113947534d842986bb48a12c9 Mon Sep 17 00:00:00 2001 From: Paul Fake Date: Mon, 19 Nov 2018 21:09:36 -0500 Subject: [PATCH 07/29] IEquatable for speed IEquatable for speed in using EqualityComparer.Default.Equals --- .../ManagedMqttApplicationMessage.cs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttApplicationMessage.cs b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttApplicationMessage.cs index bba73b0..e8906e2 100644 --- a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttApplicationMessage.cs +++ b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttApplicationMessage.cs @@ -2,10 +2,15 @@ namespace MQTTnet.Extensions.ManagedClient { - public class ManagedMqttApplicationMessage + public class ManagedMqttApplicationMessage : IEquatable { public Guid Id { get; set; } = Guid.NewGuid(); public MqttApplicationMessage ApplicationMessage { get; set; } + + public bool Equals(ManagedMqttApplicationMessage other) + { + return Id.Equals(other.Id); + } } } From 9ff39c1fd920eac6d4b714278d68da65ac0e8d85 Mon Sep 17 00:00:00 2001 From: Paul Fake Date: Mon, 19 Nov 2018 21:11:47 -0500 Subject: [PATCH 08/29] New functions This is to support the peek-and-publish model for the managed client --- Source/MQTTnet/Internal/BlockingQueue.cs | 32 ++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/Source/MQTTnet/Internal/BlockingQueue.cs b/Source/MQTTnet/Internal/BlockingQueue.cs index d6c28f4..546b646 100644 --- a/Source/MQTTnet/Internal/BlockingQueue.cs +++ b/Source/MQTTnet/Internal/BlockingQueue.cs @@ -55,6 +55,38 @@ namespace MQTTnet.Internal _gate.WaitOne(); } } + + public TItem PeekAndWait() + { + while (true) + { + lock (_syncRoot) + { + if (_items.Count > 0) + { + return _items.First.Value; + } + + if (_items.Count == 0) + { + _gate.Reset(); + } + } + + _gate.WaitOne(); + } + } + + public void RemoveFirstIfEqual(TItem item) + { + lock (_syncRoot) + { + if (_items.Count > 0 && EqualityComparer.Default.Equals(_items.First.Value, item)) + { + _items.RemoveFirst(); + } + } + } public TItem RemoveFirst() { From b2c31331b67e4eb9d153063152e38b2d95d6bb1a Mon Sep 17 00:00:00 2001 From: Paul Fake Date: Mon, 19 Nov 2018 21:20:50 -0500 Subject: [PATCH 09/29] Peek-and-publish model We had been seeing an issue in which the queue could grow larger than the configured cap. I examined the code and saw that this could happen if _mqttClient.PublishAsync() throws an exception, in which case a message can be re-enqueued without honoring the cap. Furthermore, I saw that it was possible for the DropOldestQueuedMessage strategy to drop messages that were not actually the oldest ones, because when re-enqueueing the messages in the queue are no longer ordered by the original time they entered the queue. It made sense to us to peek at the message when publishing rather than dequeue it, so that when re-enqueueing after an exception 1) the cap is still honored and 2) the order of queued messages isn't altered. It's ok if another thread removes the message that's currently being published from the queue due to the cap, because all we have to do then is check if it's already been removed before removing it ourselves. --- .../ManagedMqttClient.cs | 34 +++++++++++++++++-- 1 file changed, 31 insertions(+), 3 deletions(-) diff --git a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs index 271e6b9..3568d1f 100644 --- a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs +++ b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs @@ -254,7 +254,15 @@ namespace MQTTnet.Extensions.ManagedClient { while (!cancellationToken.IsCancellationRequested && _mqttClient.IsConnected) { - var message = _messageQueue.Dequeue(); + //Peek at the message without dequeueing in order to prevent the + //possibility of the queue growing beyond the configured cap. + //Previously, messages could be re-enqueued if there was an + //exception, and this re-enqueueing did not honor the cap. + //Furthermore, because re-enqueueing would shuffle the order + //of the messages, the DropOldestQueuedMessage strategy would + //be unable to know which message is actually the oldest and would + //instead drop the first item in the queue. + var message = _messageQueue.PeekAndWait(); if (message == null) { continue; @@ -284,6 +292,16 @@ namespace MQTTnet.Extensions.ManagedClient try { _mqttClient.PublishAsync(message.ApplicationMessage).GetAwaiter().GetResult(); + lock (_messageQueue) //lock to avoid conflict with this.PublishAsync + { + //While publishing this message, this.PublishAsync could have booted this + //message off the queue to make room for another (when using a cap + //with the DropOldestQueuedMessage strategy). If the first item + //in the queue is equal to this message, then it's safe to remove + //it from the queue. If not, that means this.PublishAsync has already + //removed it, in which case we don't want to do anything. + _messageQueue.RemoveFirstIfEqual(message); + } _storageManager?.RemoveAsync(message).GetAwaiter().GetResult(); } catch (MqttCommunicationException exception) @@ -292,9 +310,19 @@ namespace MQTTnet.Extensions.ManagedClient _logger.Warning(exception, $"Publishing application ({message.Id}) message failed."); - if (message.ApplicationMessage.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce) + if (message.ApplicationMessage.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce) { - _messageQueue.Enqueue(message); + //If QoS 0, we don't want this message to stay on the queue. + //If QoS 1 or 2, it's possible that, when using a cap, this message + //has been booted off the queue by this.PublishAsync, in which case this + //thread will not continue to try to publish it. While this does + //contradict the expected behavior of QoS 1 and 2, that's also true + //for the usage of a message queue cap, so it's still consistent + //with prior behavior in that way. + lock (_messageQueue) //lock to avoid conflict with this.PublishAsync + { + _messageQueue.RemoveFirstIfEqual(message); + } } } catch (Exception exception) From 2c6c3ac6a4d070f6bbab48e2e9123f596bddad5a Mon Sep 17 00:00:00 2001 From: Paul Fake Date: Wed, 21 Nov 2018 20:47:12 -0500 Subject: [PATCH 10/29] Changes to this file no longer needed --- .../ManagedMqttApplicationMessage.cs | 16 ---------------- 1 file changed, 16 deletions(-) delete mode 100644 Source/MQTTnet.Extensions.ManagedClient/ManagedMqttApplicationMessage.cs diff --git a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttApplicationMessage.cs b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttApplicationMessage.cs deleted file mode 100644 index e8906e2..0000000 --- a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttApplicationMessage.cs +++ /dev/null @@ -1,16 +0,0 @@ -using System; - -namespace MQTTnet.Extensions.ManagedClient -{ - public class ManagedMqttApplicationMessage : IEquatable - { - public Guid Id { get; set; } = Guid.NewGuid(); - - public MqttApplicationMessage ApplicationMessage { get; set; } - - public bool Equals(ManagedMqttApplicationMessage other) - { - return Id.Equals(other.Id); - } - } -} From baa924121620a9d4b93adb93d876230d0cbabf07 Mon Sep 17 00:00:00 2001 From: Paul Fake Date: Wed, 21 Nov 2018 21:35:15 -0500 Subject: [PATCH 11/29] Revert "Changes to this file no longer needed" This reverts commit 2c6c3ac6a4d070f6bbab48e2e9123f596bddad5a. --- .../ManagedMqttApplicationMessage.cs | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) create mode 100644 Source/MQTTnet.Extensions.ManagedClient/ManagedMqttApplicationMessage.cs diff --git a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttApplicationMessage.cs b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttApplicationMessage.cs new file mode 100644 index 0000000..e8906e2 --- /dev/null +++ b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttApplicationMessage.cs @@ -0,0 +1,16 @@ +using System; + +namespace MQTTnet.Extensions.ManagedClient +{ + public class ManagedMqttApplicationMessage : IEquatable + { + public Guid Id { get; set; } = Guid.NewGuid(); + + public MqttApplicationMessage ApplicationMessage { get; set; } + + public bool Equals(ManagedMqttApplicationMessage other) + { + return Id.Equals(other.Id); + } + } +} From d1c3d9b453d8879970266dc19450ec0db7d0ab96 Mon Sep 17 00:00:00 2001 From: Paul Fake Date: Wed, 21 Nov 2018 21:37:01 -0500 Subject: [PATCH 12/29] Removed unnecessary code --- .../ManagedMqttApplicationMessage.cs | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttApplicationMessage.cs b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttApplicationMessage.cs index e8906e2..bba73b0 100644 --- a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttApplicationMessage.cs +++ b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttApplicationMessage.cs @@ -2,15 +2,10 @@ namespace MQTTnet.Extensions.ManagedClient { - public class ManagedMqttApplicationMessage : IEquatable + public class ManagedMqttApplicationMessage { public Guid Id { get; set; } = Guid.NewGuid(); public MqttApplicationMessage ApplicationMessage { get; set; } - - public bool Equals(ManagedMqttApplicationMessage other) - { - return Id.Equals(other.Id); - } } } From 74d59673a8173c051808ba0e24fcc83fc2ce0e23 Mon Sep 17 00:00:00 2001 From: Paul Fake Date: Wed, 21 Nov 2018 21:38:06 -0500 Subject: [PATCH 13/29] Delegate for comparisons --- Source/MQTTnet/Internal/BlockingQueue.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Source/MQTTnet/Internal/BlockingQueue.cs b/Source/MQTTnet/Internal/BlockingQueue.cs index 546b646..2812349 100644 --- a/Source/MQTTnet/Internal/BlockingQueue.cs +++ b/Source/MQTTnet/Internal/BlockingQueue.cs @@ -77,11 +77,11 @@ namespace MQTTnet.Internal } } - public void RemoveFirstIfEqual(TItem item) + public void RemoveFirstIfEqual(TItem item, Func areEqual) { lock (_syncRoot) { - if (_items.Count > 0 && EqualityComparer.Default.Equals(_items.First.Value, item)) + if (_items.Count > 0 && areEqual(_items.First.Value, item)) { _items.RemoveFirst(); } From 31de6ee186215269ae76300f6a435f295338ddba Mon Sep 17 00:00:00 2001 From: Paul Fake Date: Wed, 21 Nov 2018 21:39:42 -0500 Subject: [PATCH 14/29] Delegate comparison function --- .../ManagedMqttClient.cs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs index 3568d1f..da36023 100644 --- a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs +++ b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs @@ -300,7 +300,7 @@ namespace MQTTnet.Extensions.ManagedClient //in the queue is equal to this message, then it's safe to remove //it from the queue. If not, that means this.PublishAsync has already //removed it, in which case we don't want to do anything. - _messageQueue.RemoveFirstIfEqual(message); + _messageQueue.RemoveFirstIfEqual(message, IdsAreEqual); } _storageManager?.RemoveAsync(message).GetAwaiter().GetResult(); } @@ -321,7 +321,7 @@ namespace MQTTnet.Extensions.ManagedClient //with prior behavior in that way. lock (_messageQueue) //lock to avoid conflict with this.PublishAsync { - _messageQueue.RemoveFirstIfEqual(message); + _messageQueue.RemoveFirstIfEqual(message, IdsAreEqual); } } } @@ -335,6 +335,11 @@ namespace MQTTnet.Extensions.ManagedClient ApplicationMessageProcessed?.Invoke(this, new ApplicationMessageProcessedEventArgs(message, transmitException)); } } + + private bool IdsAreEqual(ManagedMqttApplicationMessage message1, ManagedMqttApplicationMessage message2) + { + return message1.Id.Equals(message2.Id); + } private async Task SynchronizeSubscriptionsAsync() { From 2a93fc18e4f7295b6e9dd2e6b97bc5fec167f237 Mon Sep 17 00:00:00 2001 From: Paul Fake Date: Wed, 21 Nov 2018 21:44:26 -0500 Subject: [PATCH 15/29] Made comparison function static --- Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs index da36023..7647fc7 100644 --- a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs +++ b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs @@ -336,7 +336,7 @@ namespace MQTTnet.Extensions.ManagedClient } } - private bool IdsAreEqual(ManagedMqttApplicationMessage message1, ManagedMqttApplicationMessage message2) + private static bool IdsAreEqual(ManagedMqttApplicationMessage message1, ManagedMqttApplicationMessage message2) { return message1.Id.Equals(message2.Id); } From 43105f71d826e3a42e9a4f5836c08c1cd7aa097b Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Fri, 23 Nov 2018 22:56:10 +0100 Subject: [PATCH 16/29] Refactor removal from blocking queue. --- .../ManagedMqttClient.cs | 9 ++------- Source/MQTTnet/Internal/BlockingQueue.cs | 6 ++++-- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs index 7647fc7..29251c6 100644 --- a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs +++ b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs @@ -300,7 +300,7 @@ namespace MQTTnet.Extensions.ManagedClient //in the queue is equal to this message, then it's safe to remove //it from the queue. If not, that means this.PublishAsync has already //removed it, in which case we don't want to do anything. - _messageQueue.RemoveFirstIfEqual(message, IdsAreEqual); + _messageQueue.RemoveFirst(i => i.Id.Equals(message.Id)); } _storageManager?.RemoveAsync(message).GetAwaiter().GetResult(); } @@ -321,7 +321,7 @@ namespace MQTTnet.Extensions.ManagedClient //with prior behavior in that way. lock (_messageQueue) //lock to avoid conflict with this.PublishAsync { - _messageQueue.RemoveFirstIfEqual(message, IdsAreEqual); + _messageQueue.RemoveFirst(i => i.Id.Equals(message.Id)); } } } @@ -335,11 +335,6 @@ namespace MQTTnet.Extensions.ManagedClient ApplicationMessageProcessed?.Invoke(this, new ApplicationMessageProcessedEventArgs(message, transmitException)); } } - - private static bool IdsAreEqual(ManagedMqttApplicationMessage message1, ManagedMqttApplicationMessage message2) - { - return message1.Id.Equals(message2.Id); - } private async Task SynchronizeSubscriptionsAsync() { diff --git a/Source/MQTTnet/Internal/BlockingQueue.cs b/Source/MQTTnet/Internal/BlockingQueue.cs index 2812349..485f644 100644 --- a/Source/MQTTnet/Internal/BlockingQueue.cs +++ b/Source/MQTTnet/Internal/BlockingQueue.cs @@ -77,11 +77,13 @@ namespace MQTTnet.Internal } } - public void RemoveFirstIfEqual(TItem item, Func areEqual) + public void RemoveFirst(Predicate match) { + if (match == null) throw new ArgumentNullException(nameof(match)); + lock (_syncRoot) { - if (_items.Count > 0 && areEqual(_items.First.Value, item)) + if (_items.Count > 0 && match(_items.First.Value)) { _items.RemoveFirst(); } From b3564e01a48542ada2273835273d3626b1190041 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Fri, 23 Nov 2018 22:57:34 +0100 Subject: [PATCH 17/29] Update docs. --- Build/MQTTnet.nuspec | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index e3a8d29..83e0477 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -14,9 +14,10 @@ * [Client] Fixed wrong behavior of publish method when client is disconnecting (thanks to @PaulFake). * [ManagedClient] Added max pending messages count option. * [ManagedClient] Add pending messages overflow strategy option. +* [ManagedClient] Fixed an issue which deletes the wrong message from the internal queue (thanks to @PaulFake). * [Server] Added new method which exposes all retained messages. * [Server] Removed (wrong) setter from the server options interface. -* [Server] fixed cpu spike in case a client disconnectes (issue 421). +* [Server] fixed cpu spike in case a client disconnects (issue 421). * [Server] fixed concurrent sends with Aspnetcore.Connections.Abstractions based transport. Copyright Christian Kratky 2016-2018 From 017f17a6b1ff693408af5091d11963db26ed745c Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Sat, 24 Nov 2018 20:19:16 +0100 Subject: [PATCH 18/29] Fix UnitTests. --- .../Server/MqttClientSessionsManager.cs | 11 +++++----- Tests/MQTTnet.Core.Tests/MqttServerTests.cs | 20 ++++++++++++------- 2 files changed, 18 insertions(+), 13 deletions(-) diff --git a/Source/MQTTnet/Server/MqttClientSessionsManager.cs b/Source/MQTTnet/Server/MqttClientSessionsManager.cs index 400045b..8bc800e 100644 --- a/Source/MQTTnet/Server/MqttClientSessionsManager.cs +++ b/Source/MQTTnet/Server/MqttClientSessionsManager.cs @@ -62,7 +62,7 @@ namespace MQTTnet.Server public Task StartSession(IMqttChannelAdapter clientAdapter) { - return Task.Run(() => RunSession(clientAdapter, _cancellationToken), _cancellationToken); + return Task.Run(() => RunSessionAsync(clientAdapter, _cancellationToken), _cancellationToken); } public IList GetClientStatus() @@ -205,7 +205,7 @@ namespace MQTTnet.Server } } - private async Task RunSession(IMqttChannelAdapter clientAdapter, CancellationToken cancellationToken) + private async Task RunSessionAsync(IMqttChannelAdapter clientAdapter, CancellationToken cancellationToken) { var clientId = string.Empty; @@ -241,7 +241,8 @@ namespace MQTTnet.Server } var result = PrepareClientSession(connectPacket); - var clientSession = result.Session; + + Server.OnClientConnected(clientId); await clientAdapter.SendPacketAsync( new MqttConnAckPacket @@ -251,9 +252,7 @@ namespace MQTTnet.Server }, cancellationToken).ConfigureAwait(false); - Server.OnClientConnected(clientId); - - await clientSession.RunAsync(connectPacket, clientAdapter).ConfigureAwait(false); + await result.Session.RunAsync(connectPacket, clientAdapter).ConfigureAwait(false); } catch (OperationCanceledException) { diff --git a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs index 4836f4d..2a48e99 100644 --- a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs @@ -660,16 +660,22 @@ namespace MQTTnet.Core.Tests { var s = new MqttFactory().CreateMqttServer(); - var events = new ConcurrentBag(); + var events = new List(); s.ClientConnected += (_, __) => { - events.Add("c"); + lock (events) + { + events.Add("c"); + } }; s.ClientDisconnected += (_, __) => { - events.Add("d"); + lock (events) + { + events.Add("d"); + } }; var clientOptions = new MqttClientOptionsBuilder() @@ -691,17 +697,17 @@ namespace MQTTnet.Core.Tests await Task.Delay(250); await c1.DisconnectAsync(); + + await Task.Delay(250); + await c2.DisconnectAsync(); await Task.Delay(250); - var flow = string.Join(string.Empty, events); - await s.StopAsync(); + var flow = string.Join(string.Empty, events); Assert.AreEqual("cdcd", flow); - - //Assert.IsFalse(connecteCalledBeforeConnectedClients, "ClientConnected was called before ClientDisconnect was called"); } From 83eca982e9e75a818a4a827db1994c4b81f75a62 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Sat, 24 Nov 2018 20:21:05 +0100 Subject: [PATCH 19/29] Update nugets. --- Build/MQTTnet.AspNetCore.nuspec | 2 +- Build/MQTTnet.Extensions.ManagedClient.nuspec | 2 +- Build/MQTTnet.Extensions.Rpc.nuspec | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Build/MQTTnet.AspNetCore.nuspec b/Build/MQTTnet.AspNetCore.nuspec index 20489c7..d68b441 100644 --- a/Build/MQTTnet.AspNetCore.nuspec +++ b/Build/MQTTnet.AspNetCore.nuspec @@ -14,7 +14,7 @@ Copyright Christian Kratky 2016-2018 MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin - + diff --git a/Build/MQTTnet.Extensions.ManagedClient.nuspec b/Build/MQTTnet.Extensions.ManagedClient.nuspec index 572f73d..91b6a23 100644 --- a/Build/MQTTnet.Extensions.ManagedClient.nuspec +++ b/Build/MQTTnet.Extensions.ManagedClient.nuspec @@ -14,7 +14,7 @@ Copyright Christian Kratky 2016-2018 MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin - + diff --git a/Build/MQTTnet.Extensions.Rpc.nuspec b/Build/MQTTnet.Extensions.Rpc.nuspec index 29cec2b..36f8965 100644 --- a/Build/MQTTnet.Extensions.Rpc.nuspec +++ b/Build/MQTTnet.Extensions.Rpc.nuspec @@ -14,7 +14,7 @@ Copyright Christian Kratky 2016-2018 MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin - + From ccc275902fdf98f3e8b36846b56c30b819fe00d2 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Wed, 28 Nov 2018 21:35:50 +0100 Subject: [PATCH 20/29] Fix wrong retain flag when distributing messages. --- Build/MQTTnet.AspNetCore.nuspec | 2 +- Build/MQTTnet.Extensions.ManagedClient.nuspec | 2 +- Build/MQTTnet.Extensions.Rpc.nuspec | 2 +- Build/MQTTnet.nuspec | 5 +- Source/MQTTnet/Server/MqttClientSession.cs | 9 +++- .../Server/MqttClientSessionsManager.cs | 7 ++- Tests/MQTTnet.Core.Tests/MqttClientTests.cs | 51 +++++++++++++++++-- Tests/MQTTnet.Core.Tests/MqttServerTests.cs | 14 +++-- 8 files changed, 77 insertions(+), 15 deletions(-) diff --git a/Build/MQTTnet.AspNetCore.nuspec b/Build/MQTTnet.AspNetCore.nuspec index d68b441..d59a524 100644 --- a/Build/MQTTnet.AspNetCore.nuspec +++ b/Build/MQTTnet.AspNetCore.nuspec @@ -14,7 +14,7 @@ Copyright Christian Kratky 2016-2018 MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin - + diff --git a/Build/MQTTnet.Extensions.ManagedClient.nuspec b/Build/MQTTnet.Extensions.ManagedClient.nuspec index 91b6a23..6fb244b 100644 --- a/Build/MQTTnet.Extensions.ManagedClient.nuspec +++ b/Build/MQTTnet.Extensions.ManagedClient.nuspec @@ -14,7 +14,7 @@ Copyright Christian Kratky 2016-2018 MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin - + diff --git a/Build/MQTTnet.Extensions.Rpc.nuspec b/Build/MQTTnet.Extensions.Rpc.nuspec index 36f8965..b77569e 100644 --- a/Build/MQTTnet.Extensions.Rpc.nuspec +++ b/Build/MQTTnet.Extensions.Rpc.nuspec @@ -14,7 +14,7 @@ Copyright Christian Kratky 2016-2018 MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin - + diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index 83e0477..f120bb8 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -17,8 +17,9 @@ * [ManagedClient] Fixed an issue which deletes the wrong message from the internal queue (thanks to @PaulFake). * [Server] Added new method which exposes all retained messages. * [Server] Removed (wrong) setter from the server options interface. -* [Server] fixed cpu spike in case a client disconnects (issue 421). -* [Server] fixed concurrent sends with Aspnetcore.Connections.Abstractions based transport. +* [Server] Fixed cpu spike in case a client disconnects (issue 421). +* [Server] Fixed concurrent sends with Aspnetcore.Connections.Abstractions based transport. +* [Server] Fixed wrong retain flag when distributing application messages (thanks to @trev0115). Copyright Christian Kratky 2016-2018 MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin diff --git a/Source/MQTTnet/Server/MqttClientSession.cs b/Source/MQTTnet/Server/MqttClientSession.cs index 131cd39..1204569 100644 --- a/Source/MQTTnet/Server/MqttClientSession.cs +++ b/Source/MQTTnet/Server/MqttClientSession.cs @@ -227,7 +227,7 @@ namespace MQTTnet.Server Topic = publishPacket.Topic, Payload = publishPacket.Payload, QualityOfServiceLevel = checkSubscriptionsResult.QualityOfServiceLevel, - Retain = false, + Retain = publishPacket.Retain, Dup = false }; @@ -371,7 +371,12 @@ namespace MQTTnet.Server var retainedMessages = _retainedMessagesManager.GetSubscribedMessages(topicFilters); foreach (var applicationMessage in retainedMessages) { - EnqueueApplicationMessage(null, applicationMessage.ToPublishPacket()); + var publishPacket = applicationMessage.ToPublishPacket(); + + // Set the retain flag to true according to [MQTT-3.3.1-8]. + publishPacket.Retain = true; + + EnqueueApplicationMessage(null, publishPacket); } } diff --git a/Source/MQTTnet/Server/MqttClientSessionsManager.cs b/Source/MQTTnet/Server/MqttClientSessionsManager.cs index 8bc800e..28acc79 100644 --- a/Source/MQTTnet/Server/MqttClientSessionsManager.cs +++ b/Source/MQTTnet/Server/MqttClientSessionsManager.cs @@ -185,7 +185,12 @@ namespace MQTTnet.Server foreach (var clientSession in GetSessions()) { - clientSession.EnqueueApplicationMessage(enqueuedApplicationMessage.Sender, applicationMessage.ToPublishPacket()); + var publishPacket = applicationMessage.ToPublishPacket(); + + // Set the retain flag to true according to [MQTT-3.3.1-9]. + publishPacket.Retain = false; + + clientSession.EnqueueApplicationMessage(enqueuedApplicationMessage.Sender, publishPacket); } } catch (OperationCanceledException) diff --git a/Tests/MQTTnet.Core.Tests/MqttClientTests.cs b/Tests/MQTTnet.Core.Tests/MqttClientTests.cs index 212e1ab..7d68bef 100644 --- a/Tests/MQTTnet.Core.Tests/MqttClientTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttClientTests.cs @@ -1,4 +1,6 @@ using System; +using System.Collections.Generic; +using System.Linq; using System.Net.Sockets; using System.Threading; using System.Threading.Tasks; @@ -8,6 +10,7 @@ using MQTTnet.Client; using MQTTnet.Diagnostics; using MQTTnet.Exceptions; using MQTTnet.Packets; +using MQTTnet.Server; namespace MQTTnet.Core.Tests { @@ -39,6 +42,45 @@ namespace MQTTnet.Core.Tests Assert.IsInstanceOfType(ex.InnerException, typeof(SocketException)); } + [TestMethod] + public async Task ClientPublish() + { + var server = new MqttFactory().CreateMqttServer(); + + try + { + var receivedMessages = new List(); + + await server.StartAsync(new MqttServerOptions()); + + var client1 = new MqttFactory().CreateMqttClient(); + client1.ApplicationMessageReceived += (_, e) => + { + lock (receivedMessages) + { + receivedMessages.Add(e.ApplicationMessage); + } + }; + + await client1.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("127.0.0.1").Build()); + await client1.SubscribeAsync("a"); + + var client2 = new MqttFactory().CreateMqttClient(); + await client2.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("127.0.0.1").Build()); + var message = new MqttApplicationMessageBuilder().WithTopic("a").WithRetainFlag().Build(); + await client2.PublishAsync(message); + + await Task.Delay(500); + + Assert.AreEqual(1, receivedMessages.Count); + Assert.IsFalse(receivedMessages.First().Retain); // Must be false even if set above! + } + finally + { + await server.StopAsync(); + } + } + #if DEBUG [TestMethod] public async Task ClientCleanupOnAuthentificationFails() @@ -50,11 +92,12 @@ namespace MQTTnet.Core.Tests Task.Run(async () => { var connect = await channel2.ReceivePacketAsync(TimeSpan.Zero, CancellationToken.None); - await channel2.SendPacketAsync(new MqttConnAckPacket() { ConnectReturnCode = Protocol.MqttConnectReturnCode.ConnectionRefusedNotAuthorized }, CancellationToken.None); + await channel2.SendPacketAsync(new MqttConnAckPacket + { + ConnectReturnCode = Protocol.MqttConnectReturnCode.ConnectionRefusedNotAuthorized + }, CancellationToken.None); }); - - - + var fake = new TestMqttCommunicationAdapterFactory(channel); var client = new MqttClient(fake, new MqttNetLogger()); diff --git a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs index 2a48e99..b86c7b5 100644 --- a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs @@ -430,7 +430,7 @@ namespace MQTTnet.Core.Tests var serverAdapter = new TestMqttServerAdapter(); var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger()); - var receivedMessagesCount = 0; + var receivedMessages = new List(); try { await s.StartAsync(new MqttServerOptions()); @@ -440,7 +440,14 @@ namespace MQTTnet.Core.Tests await c1.DisconnectAsync(); var c2 = await serverAdapter.ConnectTestClient("c2"); - c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; + c2.ApplicationMessageReceived += (_, e) => + { + lock (receivedMessages) + { + receivedMessages.Add(e.ApplicationMessage); + } + }; + await c2.SubscribeAsync(new TopicFilterBuilder().WithTopic("retained").Build()); await Task.Delay(500); @@ -450,7 +457,8 @@ namespace MQTTnet.Core.Tests await s.StopAsync(); } - Assert.AreEqual(1, receivedMessagesCount); + Assert.AreEqual(1, receivedMessages.Count); + Assert.IsTrue(receivedMessages.First().Retain); } [TestMethod] From bc1a197c4f6176594cc66c5238f872ca685bc460 Mon Sep 17 00:00:00 2001 From: Federico Di Gregorio Date: Fri, 30 Nov 2018 09:16:04 +0100 Subject: [PATCH 21/29] Align conditions in dependent project with conditions in MQTTnet --- .../MQTTnet.Extensions.ManagedClient.csproj | 3 ++- Source/MQTTnet.Extensions.Rpc/MQTTnet.Extensions.Rpc.csproj | 3 ++- Tests/MQTTnet.Benchmarks/MQTTnet.Benchmarks.csproj | 5 +++-- Tests/MQTTnet.TestApp.NetCore/MQTTnet.TestApp.NetCore.csproj | 3 ++- 4 files changed, 9 insertions(+), 5 deletions(-) diff --git a/Source/MQTTnet.Extensions.ManagedClient/MQTTnet.Extensions.ManagedClient.csproj b/Source/MQTTnet.Extensions.ManagedClient/MQTTnet.Extensions.ManagedClient.csproj index c8405a2..09dce3f 100644 --- a/Source/MQTTnet.Extensions.ManagedClient/MQTTnet.Extensions.ManagedClient.csproj +++ b/Source/MQTTnet.Extensions.ManagedClient/MQTTnet.Extensions.ManagedClient.csproj @@ -1,7 +1,8 @@  - netstandard1.3;netstandard2.0;net452;net461 + netstandard1.3;netstandard2.0 + $(TargetFrameworks);net452;net461 $(TargetFrameworks);uap10.0 diff --git a/Source/MQTTnet.Extensions.Rpc/MQTTnet.Extensions.Rpc.csproj b/Source/MQTTnet.Extensions.Rpc/MQTTnet.Extensions.Rpc.csproj index c8405a2..09dce3f 100644 --- a/Source/MQTTnet.Extensions.Rpc/MQTTnet.Extensions.Rpc.csproj +++ b/Source/MQTTnet.Extensions.Rpc/MQTTnet.Extensions.Rpc.csproj @@ -1,7 +1,8 @@  - netstandard1.3;netstandard2.0;net452;net461 + netstandard1.3;netstandard2.0 + $(TargetFrameworks);net452;net461 $(TargetFrameworks);uap10.0 diff --git a/Tests/MQTTnet.Benchmarks/MQTTnet.Benchmarks.csproj b/Tests/MQTTnet.Benchmarks/MQTTnet.Benchmarks.csproj index 33f0866..bfd2c39 100644 --- a/Tests/MQTTnet.Benchmarks/MQTTnet.Benchmarks.csproj +++ b/Tests/MQTTnet.Benchmarks/MQTTnet.Benchmarks.csproj @@ -3,8 +3,9 @@ Exe Full - net461 - 7.2 + net461 + netcoreapp2.1 + 7.2 diff --git a/Tests/MQTTnet.TestApp.NetCore/MQTTnet.TestApp.NetCore.csproj b/Tests/MQTTnet.TestApp.NetCore/MQTTnet.TestApp.NetCore.csproj index af5f321..0dd56d7 100644 --- a/Tests/MQTTnet.TestApp.NetCore/MQTTnet.TestApp.NetCore.csproj +++ b/Tests/MQTTnet.TestApp.NetCore/MQTTnet.TestApp.NetCore.csproj @@ -3,7 +3,8 @@ Exe Full - netcoreapp2.1;net452;net461 + netcoreapp2.1 + $(TargetFrameworks);net452;net461 From 7292af7db4c4fbc844f38976f81172dc2b44079e Mon Sep 17 00:00:00 2001 From: Federico Di Gregorio Date: Fri, 30 Nov 2018 09:26:22 +0100 Subject: [PATCH 22/29] Added solution that does not include UWP --- MQTTnet.noUWP.sln | 222 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 222 insertions(+) create mode 100644 MQTTnet.noUWP.sln diff --git a/MQTTnet.noUWP.sln b/MQTTnet.noUWP.sln new file mode 100644 index 0000000..8d9130e --- /dev/null +++ b/MQTTnet.noUWP.sln @@ -0,0 +1,222 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio 15 +VisualStudioVersion = 15.0.27004.2010 +MinimumVisualStudioVersion = 10.0.40219.1 +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.Core.Tests", "Tests\MQTTnet.Core.Tests\MQTTnet.Core.Tests.csproj", "{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tests", "Tests", "{9248C2E1-B9D6-40BF-81EC-86004D7765B4}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Source", "Source", "{32A630A7-2598-41D7-B625-204CD906F5FB}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet", "Source\MQTTnet\MQTTnet.csproj", "{3587E506-55A2-4EB3-99C7-DC01E42D25D2}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Build", "Build", "{67C28AC1-BC3A-420A-BE9C-FA2401431CF9}" + ProjectSection(SolutionItems) = preProject + Build\build.ps1 = Build\build.ps1 + Build\MQTTnet.AspNetCore.nuspec = Build\MQTTnet.AspNetCore.nuspec + Build\MQTTnet.Extensions.ManagedClient.nuspec = Build\MQTTnet.Extensions.ManagedClient.nuspec + Build\MQTTnet.Extensions.Rpc.nuspec = Build\MQTTnet.Extensions.Rpc.nuspec + Build\MQTTnet.nuspec = Build\MQTTnet.nuspec + Build\upload.ps1 = Build\upload.ps1 + EndProjectSection +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{B3F60ECB-45BA-4C66-8903-8BB89CA67998}" + ProjectSection(SolutionItems) = preProject + .bettercodehub.yml = .bettercodehub.yml + appveyor.yml = appveyor.yml + LICENSE = LICENSE + README.md = README.md + EndProjectSection +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.TestApp.NetCore", "Tests\MQTTnet.TestApp.NetCore\MQTTnet.TestApp.NetCore.csproj", "{3D283AAD-AAA8-4339-8394-52F80B6304DB}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.TestApp.AspNetCore2", "Tests\MQTTnet.TestApp.AspNetCore2\MQTTnet.TestApp.AspNetCore2.csproj", "{C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.AspNetCore", "Source\MQTTnet.AspnetCore\MQTTnet.AspNetCore.csproj", "{F10C4060-F7EE-4A83-919F-FF723E72F94A}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Extensions", "Extensions", "{12816BCC-AF9E-44A9-9AE5-C246AF2A0587}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.Extensions.Rpc", "Source\MQTTnet.Extensions.Rpc\MQTTnet.Extensions.Rpc.csproj", "{C444E9C8-95FA-430E-9126-274129DE16CD}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.Benchmarks", "Tests\MQTTnet.Benchmarks\MQTTnet.Benchmarks.csproj", "{998D04DD-7CB0-45F5-A393-E2495C16399E}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.Extensions.ManagedClient", "Source\MQTTnet.Extensions.ManagedClient\MQTTnet.Extensions.ManagedClient.csproj", "{C400533A-8EBA-4F0B-BF4D-295C3708604B}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.AspNetCore.Tests", "Tests\MQTTnet.AspNetCore.Tests\MQTTnet.AspNetCore.Tests.csproj", "{61B62223-F5D0-48E4-BBD6-2CBA9353CB5E}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Debug|ARM = Debug|ARM + Debug|x64 = Debug|x64 + Debug|x86 = Debug|x86 + Release|Any CPU = Release|Any CPU + Release|ARM = Release|ARM + Release|x64 = Release|x64 + Release|x86 = Release|x86 + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|Any CPU.Build.0 = Debug|Any CPU + {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|ARM.ActiveCfg = Debug|Any CPU + {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|ARM.Build.0 = Debug|Any CPU + {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|x64.ActiveCfg = Debug|Any CPU + {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|x64.Build.0 = Debug|Any CPU + {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|x86.ActiveCfg = Debug|Any CPU + {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|x86.Build.0 = Debug|Any CPU + {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|Any CPU.ActiveCfg = Release|Any CPU + {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|Any CPU.Build.0 = Release|Any CPU + {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|ARM.ActiveCfg = Release|Any CPU + {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|ARM.Build.0 = Release|Any CPU + {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|x64.ActiveCfg = Release|Any CPU + {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|x64.Build.0 = Release|Any CPU + {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|x86.ActiveCfg = Release|Any CPU + {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|x86.Build.0 = Release|Any CPU + {3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Debug|Any CPU.Build.0 = Debug|Any CPU + {3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Debug|ARM.ActiveCfg = Debug|Any CPU + {3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Debug|ARM.Build.0 = Debug|Any CPU + {3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Debug|x64.ActiveCfg = Debug|Any CPU + {3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Debug|x64.Build.0 = Debug|Any CPU + {3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Debug|x86.ActiveCfg = Debug|Any CPU + {3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Debug|x86.Build.0 = Debug|Any CPU + {3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Release|Any CPU.ActiveCfg = Release|Any CPU + {3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Release|Any CPU.Build.0 = Release|Any CPU + {3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Release|ARM.ActiveCfg = Release|Any CPU + {3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Release|ARM.Build.0 = Release|Any CPU + {3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Release|x64.ActiveCfg = Release|Any CPU + {3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Release|x64.Build.0 = Release|Any CPU + {3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Release|x86.ActiveCfg = Release|Any CPU + {3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Release|x86.Build.0 = Release|Any CPU + {3D283AAD-AAA8-4339-8394-52F80B6304DB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {3D283AAD-AAA8-4339-8394-52F80B6304DB}.Debug|Any CPU.Build.0 = Debug|Any CPU + {3D283AAD-AAA8-4339-8394-52F80B6304DB}.Debug|ARM.ActiveCfg = Debug|Any CPU + {3D283AAD-AAA8-4339-8394-52F80B6304DB}.Debug|ARM.Build.0 = Debug|Any CPU + {3D283AAD-AAA8-4339-8394-52F80B6304DB}.Debug|x64.ActiveCfg = Debug|Any CPU + {3D283AAD-AAA8-4339-8394-52F80B6304DB}.Debug|x64.Build.0 = Debug|Any CPU + {3D283AAD-AAA8-4339-8394-52F80B6304DB}.Debug|x86.ActiveCfg = Debug|Any CPU + {3D283AAD-AAA8-4339-8394-52F80B6304DB}.Debug|x86.Build.0 = Debug|Any CPU + {3D283AAD-AAA8-4339-8394-52F80B6304DB}.Release|Any CPU.ActiveCfg = Release|Any CPU + {3D283AAD-AAA8-4339-8394-52F80B6304DB}.Release|Any CPU.Build.0 = Release|Any CPU + {3D283AAD-AAA8-4339-8394-52F80B6304DB}.Release|ARM.ActiveCfg = Release|Any CPU + {3D283AAD-AAA8-4339-8394-52F80B6304DB}.Release|ARM.Build.0 = Release|Any CPU + {3D283AAD-AAA8-4339-8394-52F80B6304DB}.Release|x64.ActiveCfg = Release|Any CPU + {3D283AAD-AAA8-4339-8394-52F80B6304DB}.Release|x64.Build.0 = Release|Any CPU + {3D283AAD-AAA8-4339-8394-52F80B6304DB}.Release|x86.ActiveCfg = Release|Any CPU + {3D283AAD-AAA8-4339-8394-52F80B6304DB}.Release|x86.Build.0 = Release|Any CPU + {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Debug|Any CPU.Build.0 = Debug|Any CPU + {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Debug|ARM.ActiveCfg = Debug|Any CPU + {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Debug|ARM.Build.0 = Debug|Any CPU + {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Debug|x64.ActiveCfg = Debug|Any CPU + {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Debug|x64.Build.0 = Debug|Any CPU + {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Debug|x86.ActiveCfg = Debug|Any CPU + {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Debug|x86.Build.0 = Debug|Any CPU + {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Release|Any CPU.ActiveCfg = Release|Any CPU + {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Release|Any CPU.Build.0 = Release|Any CPU + {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Release|ARM.ActiveCfg = Release|Any CPU + {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Release|ARM.Build.0 = Release|Any CPU + {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Release|x64.ActiveCfg = Release|Any CPU + {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Release|x64.Build.0 = Release|Any CPU + {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Release|x86.ActiveCfg = Release|Any CPU + {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Release|x86.Build.0 = Release|Any CPU + {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Debug|Any CPU.Build.0 = Debug|Any CPU + {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Debug|ARM.ActiveCfg = Debug|Any CPU + {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Debug|ARM.Build.0 = Debug|Any CPU + {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Debug|x64.ActiveCfg = Debug|Any CPU + {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Debug|x64.Build.0 = Debug|Any CPU + {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Debug|x86.ActiveCfg = Debug|Any CPU + {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Debug|x86.Build.0 = Debug|Any CPU + {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Release|Any CPU.ActiveCfg = Release|Any CPU + {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Release|Any CPU.Build.0 = Release|Any CPU + {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Release|ARM.ActiveCfg = Release|Any CPU + {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Release|ARM.Build.0 = Release|Any CPU + {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Release|x64.ActiveCfg = Release|Any CPU + {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Release|x64.Build.0 = Release|Any CPU + {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Release|x86.ActiveCfg = Release|Any CPU + {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Release|x86.Build.0 = Release|Any CPU + {C444E9C8-95FA-430E-9126-274129DE16CD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {C444E9C8-95FA-430E-9126-274129DE16CD}.Debug|Any CPU.Build.0 = Debug|Any CPU + {C444E9C8-95FA-430E-9126-274129DE16CD}.Debug|ARM.ActiveCfg = Debug|Any CPU + {C444E9C8-95FA-430E-9126-274129DE16CD}.Debug|ARM.Build.0 = Debug|Any CPU + {C444E9C8-95FA-430E-9126-274129DE16CD}.Debug|x64.ActiveCfg = Debug|Any CPU + {C444E9C8-95FA-430E-9126-274129DE16CD}.Debug|x64.Build.0 = Debug|Any CPU + {C444E9C8-95FA-430E-9126-274129DE16CD}.Debug|x86.ActiveCfg = Debug|Any CPU + {C444E9C8-95FA-430E-9126-274129DE16CD}.Debug|x86.Build.0 = Debug|Any CPU + {C444E9C8-95FA-430E-9126-274129DE16CD}.Release|Any CPU.ActiveCfg = Release|Any CPU + {C444E9C8-95FA-430E-9126-274129DE16CD}.Release|Any CPU.Build.0 = Release|Any CPU + {C444E9C8-95FA-430E-9126-274129DE16CD}.Release|ARM.ActiveCfg = Release|Any CPU + {C444E9C8-95FA-430E-9126-274129DE16CD}.Release|ARM.Build.0 = Release|Any CPU + {C444E9C8-95FA-430E-9126-274129DE16CD}.Release|x64.ActiveCfg = Release|Any CPU + {C444E9C8-95FA-430E-9126-274129DE16CD}.Release|x64.Build.0 = Release|Any CPU + {C444E9C8-95FA-430E-9126-274129DE16CD}.Release|x86.ActiveCfg = Release|Any CPU + {C444E9C8-95FA-430E-9126-274129DE16CD}.Release|x86.Build.0 = Release|Any CPU + {998D04DD-7CB0-45F5-A393-E2495C16399E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {998D04DD-7CB0-45F5-A393-E2495C16399E}.Debug|Any CPU.Build.0 = Debug|Any CPU + {998D04DD-7CB0-45F5-A393-E2495C16399E}.Debug|ARM.ActiveCfg = Debug|Any CPU + {998D04DD-7CB0-45F5-A393-E2495C16399E}.Debug|ARM.Build.0 = Debug|Any CPU + {998D04DD-7CB0-45F5-A393-E2495C16399E}.Debug|x64.ActiveCfg = Debug|Any CPU + {998D04DD-7CB0-45F5-A393-E2495C16399E}.Debug|x64.Build.0 = Debug|Any CPU + {998D04DD-7CB0-45F5-A393-E2495C16399E}.Debug|x86.ActiveCfg = Debug|Any CPU + {998D04DD-7CB0-45F5-A393-E2495C16399E}.Debug|x86.Build.0 = Debug|Any CPU + {998D04DD-7CB0-45F5-A393-E2495C16399E}.Release|Any CPU.ActiveCfg = Release|Any CPU + {998D04DD-7CB0-45F5-A393-E2495C16399E}.Release|Any CPU.Build.0 = Release|Any CPU + {998D04DD-7CB0-45F5-A393-E2495C16399E}.Release|ARM.ActiveCfg = Release|Any CPU + {998D04DD-7CB0-45F5-A393-E2495C16399E}.Release|ARM.Build.0 = Release|Any CPU + {998D04DD-7CB0-45F5-A393-E2495C16399E}.Release|x64.ActiveCfg = Release|Any CPU + {998D04DD-7CB0-45F5-A393-E2495C16399E}.Release|x64.Build.0 = Release|Any CPU + {998D04DD-7CB0-45F5-A393-E2495C16399E}.Release|x86.ActiveCfg = Release|Any CPU + {998D04DD-7CB0-45F5-A393-E2495C16399E}.Release|x86.Build.0 = Release|Any CPU + {C400533A-8EBA-4F0B-BF4D-295C3708604B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {C400533A-8EBA-4F0B-BF4D-295C3708604B}.Debug|Any CPU.Build.0 = Debug|Any CPU + {C400533A-8EBA-4F0B-BF4D-295C3708604B}.Debug|ARM.ActiveCfg = Debug|Any CPU + {C400533A-8EBA-4F0B-BF4D-295C3708604B}.Debug|ARM.Build.0 = Debug|Any CPU + {C400533A-8EBA-4F0B-BF4D-295C3708604B}.Debug|x64.ActiveCfg = Debug|Any CPU + {C400533A-8EBA-4F0B-BF4D-295C3708604B}.Debug|x64.Build.0 = Debug|Any CPU + {C400533A-8EBA-4F0B-BF4D-295C3708604B}.Debug|x86.ActiveCfg = Debug|Any CPU + {C400533A-8EBA-4F0B-BF4D-295C3708604B}.Debug|x86.Build.0 = Debug|Any CPU + {C400533A-8EBA-4F0B-BF4D-295C3708604B}.Release|Any CPU.ActiveCfg = Release|Any CPU + {C400533A-8EBA-4F0B-BF4D-295C3708604B}.Release|Any CPU.Build.0 = Release|Any CPU + {C400533A-8EBA-4F0B-BF4D-295C3708604B}.Release|ARM.ActiveCfg = Release|Any CPU + {C400533A-8EBA-4F0B-BF4D-295C3708604B}.Release|ARM.Build.0 = Release|Any CPU + {C400533A-8EBA-4F0B-BF4D-295C3708604B}.Release|x64.ActiveCfg = Release|Any CPU + {C400533A-8EBA-4F0B-BF4D-295C3708604B}.Release|x64.Build.0 = Release|Any CPU + {C400533A-8EBA-4F0B-BF4D-295C3708604B}.Release|x86.ActiveCfg = Release|Any CPU + {C400533A-8EBA-4F0B-BF4D-295C3708604B}.Release|x86.Build.0 = Release|Any CPU + {61B62223-F5D0-48E4-BBD6-2CBA9353CB5E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {61B62223-F5D0-48E4-BBD6-2CBA9353CB5E}.Debug|Any CPU.Build.0 = Debug|Any CPU + {61B62223-F5D0-48E4-BBD6-2CBA9353CB5E}.Debug|ARM.ActiveCfg = Debug|Any CPU + {61B62223-F5D0-48E4-BBD6-2CBA9353CB5E}.Debug|ARM.Build.0 = Debug|Any CPU + {61B62223-F5D0-48E4-BBD6-2CBA9353CB5E}.Debug|x64.ActiveCfg = Debug|Any CPU + {61B62223-F5D0-48E4-BBD6-2CBA9353CB5E}.Debug|x64.Build.0 = Debug|Any CPU + {61B62223-F5D0-48E4-BBD6-2CBA9353CB5E}.Debug|x86.ActiveCfg = Debug|Any CPU + {61B62223-F5D0-48E4-BBD6-2CBA9353CB5E}.Debug|x86.Build.0 = Debug|Any CPU + {61B62223-F5D0-48E4-BBD6-2CBA9353CB5E}.Release|Any CPU.ActiveCfg = Release|Any CPU + {61B62223-F5D0-48E4-BBD6-2CBA9353CB5E}.Release|Any CPU.Build.0 = Release|Any CPU + {61B62223-F5D0-48E4-BBD6-2CBA9353CB5E}.Release|ARM.ActiveCfg = Release|Any CPU + {61B62223-F5D0-48E4-BBD6-2CBA9353CB5E}.Release|ARM.Build.0 = Release|Any CPU + {61B62223-F5D0-48E4-BBD6-2CBA9353CB5E}.Release|x64.ActiveCfg = Release|Any CPU + {61B62223-F5D0-48E4-BBD6-2CBA9353CB5E}.Release|x64.Build.0 = Release|Any CPU + {61B62223-F5D0-48E4-BBD6-2CBA9353CB5E}.Release|x86.ActiveCfg = Release|Any CPU + {61B62223-F5D0-48E4-BBD6-2CBA9353CB5E}.Release|x86.Build.0 = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection + GlobalSection(NestedProjects) = preSolution + {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4} + {3587E506-55A2-4EB3-99C7-DC01E42D25D2} = {32A630A7-2598-41D7-B625-204CD906F5FB} + {3D283AAD-AAA8-4339-8394-52F80B6304DB} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4} + {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4} + {F10C4060-F7EE-4A83-919F-FF723E72F94A} = {32A630A7-2598-41D7-B625-204CD906F5FB} + {C444E9C8-95FA-430E-9126-274129DE16CD} = {12816BCC-AF9E-44A9-9AE5-C246AF2A0587} + {998D04DD-7CB0-45F5-A393-E2495C16399E} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4} + {C400533A-8EBA-4F0B-BF4D-295C3708604B} = {12816BCC-AF9E-44A9-9AE5-C246AF2A0587} + {61B62223-F5D0-48E4-BBD6-2CBA9353CB5E} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4} + EndGlobalSection + GlobalSection(ExtensibilityGlobals) = postSolution + SolutionGuid = {07536672-5CBC-4BE3-ACE0-708A431A7894} + EndGlobalSection +EndGlobal From 0cfef0649adacb188d1598c06e3e09324ec94193 Mon Sep 17 00:00:00 2001 From: Federico Di Gregorio Date: Fri, 30 Nov 2018 09:38:10 +0100 Subject: [PATCH 23/29] Fixed appveyor configuration to use full solution --- appveyor.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/appveyor.yml b/appveyor.yml index 253e07d..43a3dd9 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -3,7 +3,7 @@ image: Visual Studio 2017 configuration: Release before_build: - cmd: >- - msbuild /t:restore + msbuild /t:restore MQTTnet.sln cd Tests/MQTTnet.TestApp.AspNetCore2/ From b091fee55472cc2f35dc07c9f863fb50d4712c16 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Sat, 1 Dec 2018 11:00:00 +0100 Subject: [PATCH 24/29] Reset the keep alive monitor if a session is being resumed from a different (or same) client. --- Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs | 6 ++++++ Source/MQTTnet/Server/MqttClientSession.cs | 9 +++++++-- Source/MQTTnet/Server/MqttClientSessionsManager.cs | 2 ++ 3 files changed, 15 insertions(+), 2 deletions(-) diff --git a/Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs b/Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs index f1a209b..5abc164 100644 --- a/Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs +++ b/Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs @@ -50,6 +50,12 @@ namespace MQTTnet.Server _isPaused = false; } + public void Reset() + { + _lastPacketReceivedTracker.Restart(); + _lastNonKeepAlivePacketReceivedTracker.Restart(); + } + public void PacketReceived(MqttBasePacket packet) { _lastPacketReceivedTracker.Restart(); diff --git a/Source/MQTTnet/Server/MqttClientSession.cs b/Source/MQTTnet/Server/MqttClientSession.cs index 1204569..95d6d07 100644 --- a/Source/MQTTnet/Server/MqttClientSession.cs +++ b/Source/MQTTnet/Server/MqttClientSession.cs @@ -56,6 +56,11 @@ namespace MQTTnet.Server public string ClientId { get; } + public void ResumeSession() + { + _keepAliveMonitor.Reset(); + } + public void FillStatus(MqttClientSessionStatus status) { status.ClientId = ClientId; @@ -87,11 +92,11 @@ namespace MQTTnet.Server _cancellationTokenSource = new CancellationTokenSource(); - //woraround for https://github.com/dotnet/corefx/issues/24430 + //workaround for https://github.com/dotnet/corefx/issues/24430 #pragma warning disable 4014 _cleanupHandle = _cancellationTokenSource.Token.Register(() => CleanupAsync()); #pragma warning restore 4014 - //endworkaround + //end workaround _wasCleanDisconnect = false; _willMessage = connectPacket.WillMessage; diff --git a/Source/MQTTnet/Server/MqttClientSessionsManager.cs b/Source/MQTTnet/Server/MqttClientSessionsManager.cs index 28acc79..b6cea7f 100644 --- a/Source/MQTTnet/Server/MqttClientSessionsManager.cs +++ b/Source/MQTTnet/Server/MqttClientSessionsManager.cs @@ -312,6 +312,8 @@ namespace MQTTnet.Server } else { + clientSession.ResumeSession(); + _logger.Verbose("Reusing existing session of client '{0}'.", connectPacket.ClientId); } } From 86c348df58e303fca5661e3654cd5a870b7a9a82 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Sun, 2 Dec 2018 21:50:16 +0100 Subject: [PATCH 25/29] Stop existing sessions before takeover from another client. --- .../Server/MqttClientKeepAliveMonitor.cs | 4 +- Source/MQTTnet/Server/MqttClientSession.cs | 71 +++++++++++-------- .../Server/MqttClientSessionsManager.cs | 27 +++---- .../Server/MqttClientSubscriptionsManager.cs | 10 +-- Source/MQTTnet/Server/MqttServer.cs | 38 +++------- .../Server/MqttServerEventDispatcher.cs | 42 +++++++++++ Tests/MQTTnet.Core.Tests/MqttServerTests.cs | 33 +++++++++ .../MqttSubscriptionsManagerTests.cs | 12 ++-- 8 files changed, 152 insertions(+), 85 deletions(-) create mode 100644 Source/MQTTnet/Server/MqttServerEventDispatcher.cs diff --git a/Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs b/Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs index 5abc164..fbe1f87 100644 --- a/Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs +++ b/Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs @@ -36,8 +36,8 @@ namespace MQTTnet.Server { return; } - - Task.Run(() => RunAsync(keepAlivePeriod, cancellationToken), cancellationToken); + + Task.Run(() => RunAsync(keepAlivePeriod, cancellationToken), cancellationToken).ConfigureAwait(false); } public void Pause() diff --git a/Source/MQTTnet/Server/MqttClientSession.cs b/Source/MQTTnet/Server/MqttClientSession.cs index 95d6d07..9e402be 100644 --- a/Source/MQTTnet/Server/MqttClientSession.cs +++ b/Source/MQTTnet/Server/MqttClientSession.cs @@ -9,6 +9,7 @@ using MQTTnet.Exceptions; using MQTTnet.Internal; using MQTTnet.Packets; using MQTTnet.Protocol; +using MQTTnet.Serializer; namespace MQTTnet.Server { @@ -17,6 +18,7 @@ namespace MQTTnet.Server private readonly MqttPacketIdentifierProvider _packetIdentifierProvider = new MqttPacketIdentifierProvider(); private readonly MqttRetainedMessagesManager _retainedMessagesManager; + private readonly MqttServerEventDispatcher _eventDispatcher; private readonly MqttClientKeepAliveMonitor _keepAliveMonitor; private readonly MqttClientPendingPacketsQueue _pendingPacketsQueue; private readonly MqttClientSubscriptionsManager _subscriptionsManager; @@ -28,45 +30,44 @@ namespace MQTTnet.Server private CancellationTokenSource _cancellationTokenSource; private MqttApplicationMessage _willMessage; private bool _wasCleanDisconnect; - private IMqttChannelAdapter _adapter; private Task _workerTask; private IDisposable _cleanupHandle; + private string _adapterEndpoint; + private MqttProtocolVersion? _adapterProtocolVersion; + public MqttClientSession( string clientId, IMqttServerOptions options, MqttClientSessionsManager sessionsManager, MqttRetainedMessagesManager retainedMessagesManager, + MqttServerEventDispatcher eventDispatcher, IMqttNetChildLogger logger) { if (logger == null) throw new ArgumentNullException(nameof(logger)); _options = options ?? throw new ArgumentNullException(nameof(options)); - _sessionsManager = sessionsManager; + _sessionsManager = sessionsManager ?? throw new ArgumentNullException(nameof(sessionsManager)); _retainedMessagesManager = retainedMessagesManager ?? throw new ArgumentNullException(nameof(retainedMessagesManager)); + _eventDispatcher = eventDispatcher ?? throw new ArgumentNullException(nameof(eventDispatcher)); ClientId = clientId; _logger = logger.CreateChildLogger(nameof(MqttClientSession)); _keepAliveMonitor = new MqttClientKeepAliveMonitor(this, _logger); - _subscriptionsManager = new MqttClientSubscriptionsManager(clientId, _options, sessionsManager.Server); + _subscriptionsManager = new MqttClientSubscriptionsManager(clientId, _options, eventDispatcher); _pendingPacketsQueue = new MqttClientPendingPacketsQueue(_options, this, _logger); } public string ClientId { get; } - public void ResumeSession() - { - _keepAliveMonitor.Reset(); - } - public void FillStatus(MqttClientSessionStatus status) { status.ClientId = ClientId; - status.IsConnected = _adapter != null; - status.Endpoint = _adapter?.Endpoint; - status.ProtocolVersion = _adapter?.PacketSerializer?.ProtocolVersion; + status.IsConnected = _cancellationTokenSource != null; + status.Endpoint = _adapterEndpoint; + status.ProtocolVersion = _adapterProtocolVersion; status.PendingApplicationMessagesCount = _pendingPacketsQueue.Count; status.LastPacketReceived = _keepAliveMonitor.LastPacketReceived; status.LastNonKeepAlivePacketReceived = _keepAliveMonitor.LastNonKeepAlivePacketReceived; @@ -85,7 +86,10 @@ namespace MQTTnet.Server try { - _adapter = adapter; + if (_cancellationTokenSource != null) + { + Stop(MqttClientDisconnectType.Clean, true); + } adapter.ReadingPacketStarted += OnAdapterReadingPacketStarted; adapter.ReadingPacketCompleted += OnAdapterReadingPacketCompleted; @@ -94,7 +98,7 @@ namespace MQTTnet.Server //workaround for https://github.com/dotnet/corefx/issues/24430 #pragma warning disable 4014 - _cleanupHandle = _cancellationTokenSource.Token.Register(() => CleanupAsync()); + _cleanupHandle = _cancellationTokenSource.Token.Register(() => TryDisposeAdapterAsync(adapter)); #pragma warning restore 4014 //end workaround @@ -104,6 +108,9 @@ namespace MQTTnet.Server _pendingPacketsQueue.Start(adapter, _cancellationTokenSource.Token); _keepAliveMonitor.Start(connectPacket.KeepAlivePeriod, _cancellationTokenSource.Token); + _adapterEndpoint = adapter.Endpoint; + _adapterProtocolVersion = adapter.PacketSerializer.ProtocolVersion; + while (!_cancellationTokenSource.IsCancellationRequested) { var packet = await adapter.ReceivePacketAsync(TimeSpan.Zero, _cancellationTokenSource.Token).ConfigureAwait(false); @@ -139,30 +146,29 @@ namespace MQTTnet.Server } finally { - await CleanupAsync().ConfigureAwait(false); + _adapterEndpoint = null; + _adapterProtocolVersion = null; + + await TryDisposeAdapterAsync(adapter).ConfigureAwait(false); _cleanupHandle?.Dispose(); _cleanupHandle = null; - _adapter = null; _cancellationTokenSource?.Cancel(false); _cancellationTokenSource?.Dispose(); _cancellationTokenSource = null; } } - private async Task CleanupAsync() + private async Task TryDisposeAdapterAsync(IMqttChannelAdapter adapter) { - var adapter = _adapter; - try + if (adapter == null) { - if (adapter == null) - { - return; - } - - _adapter = null; + return; + } + try + { adapter.ReadingPacketStarted -= OnAdapterReadingPacketStarted; adapter.ReadingPacketCompleted -= OnAdapterReadingPacketCompleted; @@ -174,7 +180,13 @@ namespace MQTTnet.Server } finally { - adapter?.Dispose(); + try + { + adapter.Dispose(); + } + catch + { + } } } @@ -193,10 +205,10 @@ namespace MQTTnet.Server return; } - _wasCleanDisconnect = type == MqttClientDisconnectType.Clean; - _cancellationTokenSource?.Cancel(false); + _wasCleanDisconnect = type == MqttClientDisconnectType.Clean; + if (_willMessage != null && !_wasCleanDisconnect) { _sessionsManager.EnqueueApplicationMessage(this, _willMessage.ToPublishPacket()); @@ -211,9 +223,8 @@ namespace MQTTnet.Server } finally { - _logger.Info("Client '{0}': Session stopped.", ClientId); - - _sessionsManager.Server.OnClientDisconnected(ClientId, _wasCleanDisconnect); + _logger.Info("Client '{0}': Disconnected (clean={1}).", ClientId, _wasCleanDisconnect); + _eventDispatcher.OnClientDisconnected(ClientId, _wasCleanDisconnect); } } diff --git a/Source/MQTTnet/Server/MqttClientSessionsManager.cs b/Source/MQTTnet/Server/MqttClientSessionsManager.cs index b6cea7f..7b7e348 100644 --- a/Source/MQTTnet/Server/MqttClientSessionsManager.cs +++ b/Source/MQTTnet/Server/MqttClientSessionsManager.cs @@ -23,25 +23,29 @@ namespace MQTTnet.Server private readonly Dictionary _sessions = new Dictionary(); private readonly CancellationToken _cancellationToken; + private readonly MqttServerEventDispatcher _eventDispatcher; private readonly MqttRetainedMessagesManager _retainedMessagesManager; private readonly IMqttServerOptions _options; private readonly IMqttNetChildLogger _logger; - public MqttClientSessionsManager(IMqttServerOptions options, MqttServer server, MqttRetainedMessagesManager retainedMessagesManager, CancellationToken cancellationToken, IMqttNetChildLogger logger) + public MqttClientSessionsManager( + IMqttServerOptions options, + MqttRetainedMessagesManager retainedMessagesManager, + CancellationToken cancellationToken, + MqttServerEventDispatcher eventDispatcher, + IMqttNetChildLogger logger) { - if (logger == null) throw new ArgumentNullException(nameof(logger)); + _cancellationToken = cancellationToken; + if (logger == null) throw new ArgumentNullException(nameof(logger)); _logger = logger.CreateChildLogger(nameof(MqttClientSessionsManager)); - _cancellationToken = cancellationToken; + _eventDispatcher = eventDispatcher ?? throw new ArgumentNullException(nameof(eventDispatcher)); _options = options ?? throw new ArgumentNullException(nameof(options)); - Server = server ?? throw new ArgumentNullException(nameof(server)); _retainedMessagesManager = retainedMessagesManager ?? throw new ArgumentNullException(nameof(retainedMessagesManager)); } - public MqttServer Server { get; } - public void Start() { Task.Factory.StartNew(() => TryProcessQueuedApplicationMessages(_cancellationToken), _cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default); @@ -176,7 +180,7 @@ namespace MQTTnet.Server applicationMessage = interceptorContext.ApplicationMessage; } - Server.OnApplicationMessageReceived(sender?.ClientId, applicationMessage); + _eventDispatcher.OnApplicationMessageReceived(sender?.ClientId, applicationMessage); if (applicationMessage.Retain) { @@ -247,8 +251,6 @@ namespace MQTTnet.Server var result = PrepareClientSession(connectPacket); - Server.OnClientConnected(clientId); - await clientAdapter.SendPacketAsync( new MqttConnAckPacket { @@ -257,6 +259,9 @@ namespace MQTTnet.Server }, cancellationToken).ConfigureAwait(false); + _logger.Info("Client '{0}': Connected.", clientId); + _eventDispatcher.OnClientConnected(clientId); + await result.Session.RunAsync(connectPacket, clientAdapter).ConfigureAwait(false); } catch (OperationCanceledException) @@ -312,8 +317,6 @@ namespace MQTTnet.Server } else { - clientSession.ResumeSession(); - _logger.Verbose("Reusing existing session of client '{0}'.", connectPacket.ClientId); } } @@ -323,7 +326,7 @@ namespace MQTTnet.Server { isExistingSession = false; - clientSession = new MqttClientSession(connectPacket.ClientId, _options, this, _retainedMessagesManager, _logger); + clientSession = new MqttClientSession(connectPacket.ClientId, _options, this, _retainedMessagesManager, _eventDispatcher, _logger); _sessions[connectPacket.ClientId] = clientSession; _logger.Verbose("Created a new session for client '{0}'.", connectPacket.ClientId); diff --git a/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs b/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs index 83ac033..38fad1f 100644 --- a/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs +++ b/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs @@ -10,14 +10,14 @@ namespace MQTTnet.Server { private readonly Dictionary _subscriptions = new Dictionary(); private readonly IMqttServerOptions _options; - private readonly MqttServer _server; + private readonly MqttServerEventDispatcher _eventDispatcher; private readonly string _clientId; - public MqttClientSubscriptionsManager(string clientId, IMqttServerOptions options, MqttServer server) + public MqttClientSubscriptionsManager(string clientId, IMqttServerOptions options, MqttServerEventDispatcher eventDispatcher) { _clientId = clientId ?? throw new ArgumentNullException(nameof(clientId)); _options = options ?? throw new ArgumentNullException(nameof(options)); - _server = server; + _eventDispatcher = eventDispatcher ?? throw new ArgumentNullException(nameof(eventDispatcher)); } public MqttClientSubscribeResult Subscribe(MqttSubscribePacket subscribePacket) @@ -58,7 +58,7 @@ namespace MQTTnet.Server _subscriptions[topicFilter.Topic] = topicFilter.QualityOfServiceLevel; } - _server.OnClientSubscribedTopic(_clientId, topicFilter); + _eventDispatcher.OnClientSubscribedTopic(_clientId, topicFilter); } } @@ -75,7 +75,7 @@ namespace MQTTnet.Server { _subscriptions.Remove(topicFilter); - _server.OnClientUnsubscribedTopic(_clientId, topicFilter); + _eventDispatcher.OnClientUnsubscribedTopic(_clientId, topicFilter); } } diff --git a/Source/MQTTnet/Server/MqttServer.cs b/Source/MQTTnet/Server/MqttServer.cs index a960ab0..737d756 100644 --- a/Source/MQTTnet/Server/MqttServer.cs +++ b/Source/MQTTnet/Server/MqttServer.cs @@ -11,6 +11,7 @@ namespace MQTTnet.Server { public class MqttServer : IMqttServer { + private readonly MqttServerEventDispatcher _eventDispatcher = new MqttServerEventDispatcher(); private readonly ICollection _adapters; private readonly IMqttNetChildLogger _logger; @@ -21,10 +22,16 @@ namespace MQTTnet.Server public MqttServer(IEnumerable adapters, IMqttNetChildLogger logger) { if (adapters == null) throw new ArgumentNullException(nameof(adapters)); + _adapters = adapters.ToList(); + if (logger == null) throw new ArgumentNullException(nameof(logger)); _logger = logger.CreateChildLogger(nameof(MqttServer)); - _adapters = adapters.ToList(); + _eventDispatcher.ClientConnected += (s, e) => ClientConnected?.Invoke(s, e); + _eventDispatcher.ClientDisconnected += (s, e) => ClientDisconnected?.Invoke(s, e); + _eventDispatcher.ClientSubscribedTopic += (s, e) => ClientSubscribedTopic?.Invoke(s, e); + _eventDispatcher.ClientUnsubscribedTopic += (s, e) => ClientUnsubscribedTopic?.Invoke(s, e); + _eventDispatcher.ApplicationMessageReceived += (s, e) => ApplicationMessageReceived?.Invoke(s, e); } public event EventHandler Started; @@ -92,7 +99,7 @@ namespace MQTTnet.Server _retainedMessagesManager = new MqttRetainedMessagesManager(Options, _logger); await _retainedMessagesManager.LoadMessagesAsync().ConfigureAwait(false); - _clientSessionsManager = new MqttClientSessionsManager(Options, this, _retainedMessagesManager, _cancellationTokenSource.Token, _logger); + _clientSessionsManager = new MqttClientSessionsManager(Options, _retainedMessagesManager, _cancellationTokenSource.Token, _eventDispatcher, _logger); _clientSessionsManager.Start(); foreach (var adapter in _adapters) @@ -144,33 +151,6 @@ namespace MQTTnet.Server return _retainedMessagesManager?.ClearMessagesAsync(); } - internal void OnClientConnected(string clientId) - { - _logger.Info("Client '{0}': Connected.", clientId); - ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientId)); - } - - internal void OnClientDisconnected(string clientId, bool wasCleanDisconnect) - { - _logger.Info("Client '{0}': Disconnected (clean={1}).", clientId, wasCleanDisconnect); - ClientDisconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(clientId, wasCleanDisconnect)); - } - - internal void OnClientSubscribedTopic(string clientId, TopicFilter topicFilter) - { - ClientSubscribedTopic?.Invoke(this, new MqttClientSubscribedTopicEventArgs(clientId, topicFilter)); - } - - internal void OnClientUnsubscribedTopic(string clientId, string topicFilter) - { - ClientUnsubscribedTopic?.Invoke(this, new MqttClientUnsubscribedTopicEventArgs(clientId, topicFilter)); - } - - internal void OnApplicationMessageReceived(string clientId, MqttApplicationMessage applicationMessage) - { - ApplicationMessageReceived?.Invoke(this, new MqttApplicationMessageReceivedEventArgs(clientId, applicationMessage)); - } - private void OnClientAccepted(object sender, MqttServerAdapterClientAcceptedEventArgs eventArgs) { eventArgs.SessionTask = _clientSessionsManager.StartSession(eventArgs.Client); diff --git a/Source/MQTTnet/Server/MqttServerEventDispatcher.cs b/Source/MQTTnet/Server/MqttServerEventDispatcher.cs new file mode 100644 index 0000000..8fd5652 --- /dev/null +++ b/Source/MQTTnet/Server/MqttServerEventDispatcher.cs @@ -0,0 +1,42 @@ +using System; + +namespace MQTTnet.Server +{ + public class MqttServerEventDispatcher + { + public event EventHandler ClientSubscribedTopic; + + public event EventHandler ClientUnsubscribedTopic; + + public event EventHandler ClientConnected; + + public event EventHandler ClientDisconnected; + + public event EventHandler ApplicationMessageReceived; + + public void OnClientSubscribedTopic(string clientId, TopicFilter topicFilter) + { + ClientSubscribedTopic?.Invoke(this, new MqttClientSubscribedTopicEventArgs(clientId, topicFilter)); + } + + public void OnClientUnsubscribedTopic(string clientId, string topicFilter) + { + ClientUnsubscribedTopic?.Invoke(this, new MqttClientUnsubscribedTopicEventArgs(clientId, topicFilter)); + } + + public void OnClientDisconnected(string clientId, bool wasCleanDisconnect) + { + ClientDisconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(clientId, wasCleanDisconnect)); + } + + public void OnApplicationMessageReceived(string senderClientId, MqttApplicationMessage applicationMessage) + { + ApplicationMessageReceived?.Invoke(this, new MqttApplicationMessageReceivedEventArgs(senderClientId, applicationMessage)); + } + + public void OnClientConnected(string clientId) + { + ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientId)); + } + } +} diff --git a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs index b86c7b5..39352ec 100644 --- a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs @@ -228,6 +228,39 @@ namespace MQTTnet.Core.Tests Assert.AreEqual(2000, receivedMessagesCount); } + + [TestMethod] + public async Task MqttServer_SessionTakeover() + { + var server = new MqttFactory().CreateMqttServer(); + try + { + await server.StartAsync(new MqttServerOptions()); + + var client1 = new MqttFactory().CreateMqttClient(); + var client2 = new MqttFactory().CreateMqttClient(); + + var options = new MqttClientOptionsBuilder() + .WithTcpServer("localhost") + .WithCleanSession(false) + .WithClientId("a").Build(); + + await client1.ConnectAsync(options); + + await Task.Delay(500); + + await client2.ConnectAsync(options); + + await Task.Delay(500); + + Assert.IsFalse(client1.IsConnected); + Assert.IsTrue(client2.IsConnected); + } + finally + { + await server.StopAsync(); + } + } private static async Task Publish(IMqttClient c1, MqttApplicationMessage message) { diff --git a/Tests/MQTTnet.Core.Tests/MqttSubscriptionsManagerTests.cs b/Tests/MQTTnet.Core.Tests/MqttSubscriptionsManagerTests.cs index 268e5fe..2bdd15f 100644 --- a/Tests/MQTTnet.Core.Tests/MqttSubscriptionsManagerTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttSubscriptionsManagerTests.cs @@ -1,6 +1,4 @@ using Microsoft.VisualStudio.TestTools.UnitTesting; -using MQTTnet.Adapter; -using MQTTnet.Diagnostics; using MQTTnet.Packets; using MQTTnet.Protocol; using MQTTnet.Server; @@ -13,7 +11,7 @@ namespace MQTTnet.Core.Tests [TestMethod] public void MqttSubscriptionsManager_SubscribeSingleSuccess() { - var sm = new MqttClientSubscriptionsManager("", new MqttServerOptions(), new MqttServer(new IMqttServerAdapter[0], new MqttNetLogger().CreateChildLogger(""))); + var sm = new MqttClientSubscriptionsManager("", new MqttServerOptions(), new MqttServerEventDispatcher()); var sp = new MqttSubscribePacket(); sp.TopicFilters.Add(new TopicFilterBuilder().WithTopic("A/B/C").Build()); @@ -28,7 +26,7 @@ namespace MQTTnet.Core.Tests [TestMethod] public void MqttSubscriptionsManager_SubscribeDifferentQoSSuccess() { - var sm = new MqttClientSubscriptionsManager("", new MqttServerOptions(), new MqttServer(new IMqttServerAdapter[0], new MqttNetLogger().CreateChildLogger(""))); + var sm = new MqttClientSubscriptionsManager("", new MqttServerOptions(), new MqttServerEventDispatcher()); var sp = new MqttSubscribePacket(); sp.TopicFilters.Add(new TopicFilter("A/B/C", MqttQualityOfServiceLevel.AtMostOnce)); @@ -43,7 +41,7 @@ namespace MQTTnet.Core.Tests [TestMethod] public void MqttSubscriptionsManager_SubscribeTwoTimesSuccess() { - var sm = new MqttClientSubscriptionsManager("", new MqttServerOptions(), new MqttServer(new IMqttServerAdapter[0], new MqttNetLogger().CreateChildLogger(""))); + var sm = new MqttClientSubscriptionsManager("", new MqttServerOptions(), new MqttServerEventDispatcher()); var sp = new MqttSubscribePacket(); sp.TopicFilters.Add(new TopicFilter("#", MqttQualityOfServiceLevel.AtMostOnce)); @@ -59,7 +57,7 @@ namespace MQTTnet.Core.Tests [TestMethod] public void MqttSubscriptionsManager_SubscribeSingleNoSuccess() { - var sm = new MqttClientSubscriptionsManager("", new MqttServerOptions(), new MqttServer(new IMqttServerAdapter[0], new MqttNetLogger().CreateChildLogger(""))); + var sm = new MqttClientSubscriptionsManager("", new MqttServerOptions(), new MqttServerEventDispatcher()); var sp = new MqttSubscribePacket(); sp.TopicFilters.Add(new TopicFilterBuilder().WithTopic("A/B/C").Build()); @@ -72,7 +70,7 @@ namespace MQTTnet.Core.Tests [TestMethod] public void MqttSubscriptionsManager_SubscribeAndUnsubscribeSingle() { - var sm = new MqttClientSubscriptionsManager("", new MqttServerOptions(), new MqttServer(new IMqttServerAdapter[0], new MqttNetLogger().CreateChildLogger(""))); + var sm = new MqttClientSubscriptionsManager("", new MqttServerOptions(), new MqttServerEventDispatcher()); var sp = new MqttSubscribePacket(); sp.TopicFilters.Add(new TopicFilterBuilder().WithTopic("A/B/C").Build()); From 619eaf284d5ecc39fc583be0f3e18805f3d6b479 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Mon, 3 Dec 2018 21:20:11 +0100 Subject: [PATCH 26/29] Add readonly property for client options. --- Build/MQTTnet.nuspec | 2 ++ .../IManagedMqttClient.cs | 1 + .../ManagedMqttClient.cs | 22 ++++++------- Source/MQTTnet/Client/IMqttClient.cs | 1 + Source/MQTTnet/Client/MqttClient.cs | 33 ++++++++++--------- 5 files changed, 32 insertions(+), 27 deletions(-) diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index f120bb8..20db47b 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -12,9 +12,11 @@ MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker). * [Core] Updated nuget packages due to security issues. * [Client] Fixed wrong behavior of publish method when client is disconnecting (thanks to @PaulFake). +* [Client] Added readonly property for accessing options. * [ManagedClient] Added max pending messages count option. * [ManagedClient] Add pending messages overflow strategy option. * [ManagedClient] Fixed an issue which deletes the wrong message from the internal queue (thanks to @PaulFake). +* [ManagedClient] Added readonly property for accessing options. * [Server] Added new method which exposes all retained messages. * [Server] Removed (wrong) setter from the server options interface. * [Server] Fixed cpu spike in case a client disconnects (issue 421). diff --git a/Source/MQTTnet.Extensions.ManagedClient/IManagedMqttClient.cs b/Source/MQTTnet.Extensions.ManagedClient/IManagedMqttClient.cs index d003584..b3835e3 100644 --- a/Source/MQTTnet.Extensions.ManagedClient/IManagedMqttClient.cs +++ b/Source/MQTTnet.Extensions.ManagedClient/IManagedMqttClient.cs @@ -10,6 +10,7 @@ namespace MQTTnet.Extensions.ManagedClient bool IsStarted { get; } bool IsConnected { get; } int PendingApplicationMessagesCount { get; } + IManagedMqttClientOptions Options { get; } event EventHandler Connected; event EventHandler Disconnected; diff --git a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs index 29251c6..866a75d 100644 --- a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs +++ b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs @@ -27,8 +27,7 @@ namespace MQTTnet.Extensions.ManagedClient private CancellationTokenSource _publishingCancellationToken; private ManagedMqttClientStorageManager _storageManager; - private IManagedMqttClientOptions _options; - + private bool _subscriptionsNotPushed; public ManagedMqttClient(IMqttClient mqttClient, IMqttNetChildLogger logger) @@ -47,6 +46,7 @@ namespace MQTTnet.Extensions.ManagedClient public bool IsConnected => _mqttClient.IsConnected; public bool IsStarted => _connectionCancellationToken != null; public int PendingApplicationMessagesCount => _messageQueue.Count; + public IManagedMqttClientOptions Options { get; private set; } public event EventHandler Connected; public event EventHandler Disconnected; @@ -70,11 +70,11 @@ namespace MQTTnet.Extensions.ManagedClient if (_connectionCancellationToken != null) throw new InvalidOperationException("The managed client is already started."); - _options = options; + Options = options; - if (_options.Storage != null) + if (Options.Storage != null) { - _storageManager = new ManagedMqttClientStorageManager(_options.Storage); + _storageManager = new ManagedMqttClientStorageManager(Options.Storage); var messages = await _storageManager.LoadQueuedMessagesAsync().ConfigureAwait(false); foreach (var message in messages) @@ -116,16 +116,16 @@ namespace MQTTnet.Extensions.ManagedClient ManagedMqttApplicationMessage removedMessage = null; lock (_messageQueue) { - if (_messageQueue.Count >= _options.MaxPendingMessages) + if (_messageQueue.Count >= Options.MaxPendingMessages) { - if (_options.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropNewMessage) + if (Options.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropNewMessage) { _logger.Verbose("Skipping publish of new application message because internal queue is full."); ApplicationMessageSkipped?.Invoke(this, new ApplicationMessageSkippedEventArgs(applicationMessage)); return; } - if (_options.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage) + if (Options.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage) { removedMessage = _messageQueue.RemoveFirst(); _logger.Verbose("Removed oldest application message from internal queue because it is full."); @@ -219,7 +219,7 @@ namespace MQTTnet.Extensions.ManagedClient if (connectionState == ReconnectionResult.NotConnected) { StopPublishing(); - await Task.Delay(_options.AutoReconnectDelay, cancellationToken).ConfigureAwait(false); + await Task.Delay(Options.AutoReconnectDelay, cancellationToken).ConfigureAwait(false); return; } @@ -232,7 +232,7 @@ namespace MQTTnet.Extensions.ManagedClient if (connectionState == ReconnectionResult.StillConnected) { - await Task.Delay(_options.ConnectionCheckInterval, cancellationToken).ConfigureAwait(false); + await Task.Delay(Options.ConnectionCheckInterval, cancellationToken).ConfigureAwait(false); } } catch (OperationCanceledException) @@ -388,7 +388,7 @@ namespace MQTTnet.Extensions.ManagedClient try { - await _mqttClient.ConnectAsync(_options.ClientOptions).ConfigureAwait(false); + await _mqttClient.ConnectAsync(Options.ClientOptions).ConfigureAwait(false); return ReconnectionResult.Reconnected; } catch (Exception exception) diff --git a/Source/MQTTnet/Client/IMqttClient.cs b/Source/MQTTnet/Client/IMqttClient.cs index 40d1a48..b60a31a 100644 --- a/Source/MQTTnet/Client/IMqttClient.cs +++ b/Source/MQTTnet/Client/IMqttClient.cs @@ -7,6 +7,7 @@ namespace MQTTnet.Client public interface IMqttClient : IApplicationMessageReceiver, IApplicationMessagePublisher, IDisposable { bool IsConnected { get; } + IMqttClientOptions Options { get; } event EventHandler Connected; event EventHandler Disconnected; diff --git a/Source/MQTTnet/Client/MqttClient.cs b/Source/MQTTnet/Client/MqttClient.cs index f6c6725..0dc664d 100644 --- a/Source/MQTTnet/Client/MqttClient.cs +++ b/Source/MQTTnet/Client/MqttClient.cs @@ -23,7 +23,6 @@ namespace MQTTnet.Client private readonly IMqttClientAdapterFactory _adapterFactory; private readonly IMqttNetChildLogger _logger; - private IMqttClientOptions _options; private CancellationTokenSource _cancellationTokenSource; internal Task _packetReceiverTask; internal Task _keepAliveMessageSenderTask; @@ -44,6 +43,7 @@ namespace MQTTnet.Client public event EventHandler ApplicationMessageReceived; public bool IsConnected { get; private set; } + public IMqttClientOptions Options { get; private set; } public async Task ConnectAsync(IMqttClientOptions options) { @@ -54,7 +54,8 @@ namespace MQTTnet.Client try { - _options = options; + Options = options; + _packetIdentifierProvider.Reset(); _packetDispatcher.Reset(); @@ -62,8 +63,8 @@ namespace MQTTnet.Client _disconnectGate = 0; _adapter = _adapterFactory.CreateClientAdapter(options, _logger); - _logger.Verbose($"Trying to connect with server ({_options.ChannelOptions})."); - await _adapter.ConnectAsync(_options.CommunicationTimeout, _cancellationTokenSource.Token).ConfigureAwait(false); + _logger.Verbose($"Trying to connect with server ({Options.ChannelOptions})."); + await _adapter.ConnectAsync(Options.CommunicationTimeout, _cancellationTokenSource.Token).ConfigureAwait(false); _logger.Verbose("Connection with server established."); StartReceivingPackets(_cancellationTokenSource.Token); @@ -73,7 +74,7 @@ namespace MQTTnet.Client _sendTracker.Restart(); - if (_options.KeepAlivePeriod != TimeSpan.Zero) + if (Options.KeepAlivePeriod != TimeSpan.Zero) { StartSendingKeepAliveMessages(_cancellationTokenSource.Token); } @@ -197,11 +198,11 @@ namespace MQTTnet.Client { var connectPacket = new MqttConnectPacket { - ClientId = _options.ClientId, - Username = _options.Credentials?.Username, - Password = _options.Credentials?.Password, - CleanSession = _options.CleanSession, - KeepAlivePeriod = (ushort)_options.KeepAlivePeriod.TotalSeconds, + ClientId = Options.ClientId, + Username = Options.Credentials?.Username, + Password = Options.Credentials?.Password, + CleanSession = Options.CleanSession, + KeepAlivePeriod = (ushort)Options.KeepAlivePeriod.TotalSeconds, WillMessage = willApplicationMessage }; @@ -236,7 +237,7 @@ namespace MQTTnet.Client { if (_adapter != null) { - await _adapter.DisconnectAsync(_options.CommunicationTimeout, CancellationToken.None).ConfigureAwait(false); + await _adapter.DisconnectAsync(Options.CommunicationTimeout, CancellationToken.None).ConfigureAwait(false); } await WaitForTaskAsync(_packetReceiverTask, sender).ConfigureAwait(false); @@ -303,7 +304,7 @@ namespace MQTTnet.Client try { await _adapter.SendPacketAsync(requestPacket, cancellationToken).ConfigureAwait(false); - var respone = await Internal.TaskExtensions.TimeoutAfterAsync(ct => packetAwaiter.Task, _options.CommunicationTimeout, cancellationToken).ConfigureAwait(false); + var respone = await Internal.TaskExtensions.TimeoutAfterAsync(ct => packetAwaiter.Task, Options.CommunicationTimeout, cancellationToken).ConfigureAwait(false); return (TResponsePacket)respone; } @@ -326,10 +327,10 @@ namespace MQTTnet.Client while (!cancellationToken.IsCancellationRequested) { - var keepAliveSendInterval = TimeSpan.FromSeconds(_options.KeepAlivePeriod.TotalSeconds * 0.75); - if (_options.KeepAliveSendInterval.HasValue) + var keepAliveSendInterval = TimeSpan.FromSeconds(Options.KeepAlivePeriod.TotalSeconds * 0.75); + if (Options.KeepAliveSendInterval.HasValue) { - keepAliveSendInterval = _options.KeepAliveSendInterval.Value; + keepAliveSendInterval = Options.KeepAliveSendInterval.Value; } var waitTime = keepAliveSendInterval - _sendTracker.Elapsed; @@ -517,7 +518,7 @@ namespace MQTTnet.Client try { var applicationMessage = publishPacket.ToApplicationMessage(); - ApplicationMessageReceived?.Invoke(this, new MqttApplicationMessageReceivedEventArgs(_options.ClientId, applicationMessage)); + ApplicationMessageReceived?.Invoke(this, new MqttApplicationMessageReceivedEventArgs(Options.ClientId, applicationMessage)); } catch (Exception exception) { From 9b51404b4d18f59fa53c5d6da2bf91769a36f19f Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Thu, 6 Dec 2018 21:35:42 +0100 Subject: [PATCH 27/29] Add Unit Tests. --- Tests/MQTTnet.Core.Tests/MqttClientTests.cs | 53 +++++++++++++++++++-- 1 file changed, 50 insertions(+), 3 deletions(-) diff --git a/Tests/MQTTnet.Core.Tests/MqttClientTests.cs b/Tests/MQTTnet.Core.Tests/MqttClientTests.cs index 7d68bef..181a314 100644 --- a/Tests/MQTTnet.Core.Tests/MqttClientTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttClientTests.cs @@ -18,7 +18,7 @@ namespace MQTTnet.Core.Tests public class MqttClientTests { [TestMethod] - public async Task ClientDisconnectException() + public async Task Client_Disconnect_Exception() { var factory = new MqttFactory(); var client = factory.CreateMqttClient(); @@ -43,7 +43,7 @@ namespace MQTTnet.Core.Tests } [TestMethod] - public async Task ClientPublish() + public async Task Client_Publish() { var server = new MqttFactory().CreateMqttServer(); @@ -81,9 +81,56 @@ namespace MQTTnet.Core.Tests } } + [TestMethod] + public async Task Publish_Special_Content() + { + var factory = new MqttFactory(); + var server = factory.CreateMqttServer(); + var serverOptions = new MqttServerOptionsBuilder().Build(); + + var receivedMessages = new List(); + + var client = factory.CreateMqttClient(); + + try + { + await server.StartAsync(serverOptions); + + client.Connected += async (s, e) => + { + await client.SubscribeAsync("RCU/P1/H0001/R0003"); + + var msg = new MqttApplicationMessageBuilder() + .WithPayload("DA|18RS00SC00XI0000RV00R100R200R300R400L100L200L300L400Y100Y200AC0102031800BELK0000BM0000|") + .WithTopic("RCU/P1/H0001/R0003"); + + await client.PublishAsync(msg.Build()); + }; + + client.ApplicationMessageReceived += (s, e) => + { + lock (receivedMessages) + { + receivedMessages.Add(e.ApplicationMessage); + } + }; + + await client.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("localhost").Build()); + + await Task.Delay(500); + + Assert.AreEqual(1, receivedMessages.Count); + Assert.AreEqual("DA|18RS00SC00XI0000RV00R100R200R300R400L100L200L300L400Y100Y200AC0102031800BELK0000BM0000|", receivedMessages.First().ConvertPayloadToString()); + } + finally + { + await server.StopAsync(); + } + } + #if DEBUG [TestMethod] - public async Task ClientCleanupOnAuthentificationFails() + public async Task Client_Cleanup_On_Authentification_Fails() { var channel = new TestMqttCommunicationAdapter(); var channel2 = new TestMqttCommunicationAdapter(); From ab24368de146e8fd51ad1905e11877708de2856d Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Sat, 8 Dec 2018 13:46:40 +0100 Subject: [PATCH 28/29] Update docs. --- Build/MQTTnet.AspNetCore.nuspec | 2 +- Build/MQTTnet.Extensions.ManagedClient.nuspec | 2 +- Build/MQTTnet.Extensions.Rpc.nuspec | 2 +- Build/MQTTnet.nuspec | 1 + 4 files changed, 4 insertions(+), 3 deletions(-) diff --git a/Build/MQTTnet.AspNetCore.nuspec b/Build/MQTTnet.AspNetCore.nuspec index d59a524..e57035e 100644 --- a/Build/MQTTnet.AspNetCore.nuspec +++ b/Build/MQTTnet.AspNetCore.nuspec @@ -14,7 +14,7 @@ Copyright Christian Kratky 2016-2018 MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin - + diff --git a/Build/MQTTnet.Extensions.ManagedClient.nuspec b/Build/MQTTnet.Extensions.ManagedClient.nuspec index 6fb244b..a34c8d6 100644 --- a/Build/MQTTnet.Extensions.ManagedClient.nuspec +++ b/Build/MQTTnet.Extensions.ManagedClient.nuspec @@ -14,7 +14,7 @@ Copyright Christian Kratky 2016-2018 MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin - + diff --git a/Build/MQTTnet.Extensions.Rpc.nuspec b/Build/MQTTnet.Extensions.Rpc.nuspec index b77569e..bef3322 100644 --- a/Build/MQTTnet.Extensions.Rpc.nuspec +++ b/Build/MQTTnet.Extensions.Rpc.nuspec @@ -14,7 +14,7 @@ Copyright Christian Kratky 2016-2018 MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin - + diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index 20db47b..5814a73 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -22,6 +22,7 @@ * [Server] Fixed cpu spike in case a client disconnects (issue 421). * [Server] Fixed concurrent sends with Aspnetcore.Connections.Abstractions based transport. * [Server] Fixed wrong retain flag when distributing application messages (thanks to @trev0115). +* [Server] Fixed issue which closes a connection when reconnecting with the same client ID (thanks to @fogzot). Copyright Christian Kratky 2016-2018 MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin From 28b2562112c7de37d5ee2551c7f22dfc26110aed Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Sun, 9 Dec 2018 17:35:46 +0100 Subject: [PATCH 29/29] Fix client session disposal in server. --- Source/MQTTnet/Server/MqttClientSession.cs | 267 ++++++++++---------- Tests/MQTTnet.Core.Tests/MqttServerTests.cs | 50 +++- 2 files changed, 185 insertions(+), 132 deletions(-) diff --git a/Source/MQTTnet/Server/MqttClientSession.cs b/Source/MQTTnet/Server/MqttClientSession.cs index 9e402be..7a1d6fe 100644 --- a/Source/MQTTnet/Server/MqttClientSession.cs +++ b/Source/MQTTnet/Server/MqttClientSession.cs @@ -79,6 +79,129 @@ namespace MQTTnet.Server return _workerTask; } + public void Stop(MqttClientDisconnectType type) + { + Stop(type, false); + } + + public void EnqueueApplicationMessage(MqttClientSession senderClientSession, MqttPublishPacket publishPacket) + { + if (publishPacket == null) throw new ArgumentNullException(nameof(publishPacket)); + + var checkSubscriptionsResult = _subscriptionsManager.CheckSubscriptions(publishPacket.Topic, publishPacket.QualityOfServiceLevel); + if (!checkSubscriptionsResult.IsSubscribed) + { + return; + } + + publishPacket = new MqttPublishPacket + { + Topic = publishPacket.Topic, + Payload = publishPacket.Payload, + QualityOfServiceLevel = checkSubscriptionsResult.QualityOfServiceLevel, + Retain = publishPacket.Retain, + Dup = false + }; + + if (publishPacket.QualityOfServiceLevel > 0) + { + publishPacket.PacketIdentifier = _packetIdentifierProvider.GetNewPacketIdentifier(); + } + + if (_options.ClientMessageQueueInterceptor != null) + { + var context = new MqttClientMessageQueueInterceptorContext( + senderClientSession?.ClientId, + ClientId, + publishPacket.ToApplicationMessage()); + + _options.ClientMessageQueueInterceptor?.Invoke(context); + + if (!context.AcceptEnqueue || context.ApplicationMessage == null) + { + return; + } + + publishPacket.Topic = context.ApplicationMessage.Topic; + publishPacket.Payload = context.ApplicationMessage.Payload; + publishPacket.QualityOfServiceLevel = context.ApplicationMessage.QualityOfServiceLevel; + } + + _pendingPacketsQueue.Enqueue(publishPacket); + } + + public Task SubscribeAsync(IList topicFilters) + { + if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); + + _subscriptionsManager.Subscribe(new MqttSubscribePacket + { + TopicFilters = topicFilters + }); + + EnqueueSubscribedRetainedMessages(topicFilters); + return Task.FromResult(0); + } + + public Task UnsubscribeAsync(IList topicFilters) + { + if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); + + _subscriptionsManager.Unsubscribe(new MqttUnsubscribePacket + { + TopicFilters = topicFilters + }); + + return Task.FromResult(0); + } + + public void ClearPendingApplicationMessages() + { + _pendingPacketsQueue.Clear(); + } + + public void Dispose() + { + _pendingPacketsQueue?.Dispose(); + + _cancellationTokenSource?.Cancel (); + _cancellationTokenSource?.Dispose(); + _cancellationTokenSource = null; + } + + private void Stop(MqttClientDisconnectType type, bool isInsideSession) + { + try + { + var cts = _cancellationTokenSource; + if (cts == null || cts.IsCancellationRequested) + { + return; + } + + _cancellationTokenSource?.Cancel(false); + + _wasCleanDisconnect = type == MqttClientDisconnectType.Clean; + + if (_willMessage != null && !_wasCleanDisconnect) + { + _sessionsManager.EnqueueApplicationMessage(this, _willMessage.ToPublishPacket()); + } + + _willMessage = null; + + if (!isInsideSession) + { + _workerTask?.GetAwaiter().GetResult(); + } + } + finally + { + _logger.Info("Client '{0}': Disconnected (clean={1}).", ClientId, _wasCleanDisconnect); + _eventDispatcher.OnClientDisconnected(ClientId, _wasCleanDisconnect); + } + } + private async Task RunInternalAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter adapter) { if (connectPacket == null) throw new ArgumentNullException(nameof(connectPacket)); @@ -98,7 +221,11 @@ namespace MQTTnet.Server //workaround for https://github.com/dotnet/corefx/issues/24430 #pragma warning disable 4014 - _cleanupHandle = _cancellationTokenSource.Token.Register(() => TryDisposeAdapterAsync(adapter)); + _cleanupHandle = _cancellationTokenSource.Token.Register(async () => + { + await TryDisconnectAdapterAsync(adapter).ConfigureAwait(false); + TryDisposeAdapter(adapter); + }); #pragma warning restore 4014 //end workaround @@ -149,7 +276,9 @@ namespace MQTTnet.Server _adapterEndpoint = null; _adapterProtocolVersion = null; - await TryDisposeAdapterAsync(adapter).ConfigureAwait(false); + // Uncomment as soon as the workaround above is no longer needed. + //await TryDisconnectAdapterAsync(adapter).ConfigureAwait(false); + //TryDisposeAdapter(adapter); _cleanupHandle?.Dispose(); _cleanupHandle = null; @@ -160,7 +289,7 @@ namespace MQTTnet.Server } } - private async Task TryDisposeAdapterAsync(IMqttChannelAdapter adapter) + private void TryDisposeAdapter(IMqttChannelAdapter adapter) { if (adapter == null) { @@ -172,145 +301,29 @@ namespace MQTTnet.Server adapter.ReadingPacketStarted -= OnAdapterReadingPacketStarted; adapter.ReadingPacketCompleted -= OnAdapterReadingPacketCompleted; - await adapter.DisconnectAsync(_options.DefaultCommunicationTimeout, CancellationToken.None).ConfigureAwait(false); + adapter.Dispose(); } catch (Exception exception) { - _logger.Error(exception, exception.Message); - } - finally - { - try - { - adapter.Dispose(); - } - catch - { - } - } - } - - public void Stop(MqttClientDisconnectType type) - { - Stop(type, false); - } - - private void Stop(MqttClientDisconnectType type, bool isInsideSession) - { - try - { - var cts = _cancellationTokenSource; - if (cts == null || cts.IsCancellationRequested) - { - return; - } - - _cancellationTokenSource?.Cancel(false); - - _wasCleanDisconnect = type == MqttClientDisconnectType.Clean; - - if (_willMessage != null && !_wasCleanDisconnect) - { - _sessionsManager.EnqueueApplicationMessage(this, _willMessage.ToPublishPacket()); - } - - _willMessage = null; - - if (!isInsideSession) - { - _workerTask?.GetAwaiter().GetResult(); - } - } - finally - { - _logger.Info("Client '{0}': Disconnected (clean={1}).", ClientId, _wasCleanDisconnect); - _eventDispatcher.OnClientDisconnected(ClientId, _wasCleanDisconnect); + _logger.Error(exception, "Error while disposing channel adapter."); } } - public void EnqueueApplicationMessage(MqttClientSession senderClientSession, MqttPublishPacket publishPacket) + private async Task TryDisconnectAdapterAsync(IMqttChannelAdapter adapter) { - if (publishPacket == null) throw new ArgumentNullException(nameof(publishPacket)); - - var checkSubscriptionsResult = _subscriptionsManager.CheckSubscriptions(publishPacket.Topic, publishPacket.QualityOfServiceLevel); - if (!checkSubscriptionsResult.IsSubscribed) + if (adapter == null) { return; } - publishPacket = new MqttPublishPacket - { - Topic = publishPacket.Topic, - Payload = publishPacket.Payload, - QualityOfServiceLevel = checkSubscriptionsResult.QualityOfServiceLevel, - Retain = publishPacket.Retain, - Dup = false - }; - - if (publishPacket.QualityOfServiceLevel > 0) + try { - publishPacket.PacketIdentifier = _packetIdentifierProvider.GetNewPacketIdentifier(); + await adapter.DisconnectAsync(_options.DefaultCommunicationTimeout, CancellationToken.None).ConfigureAwait(false); } - - if (_options.ClientMessageQueueInterceptor != null) + catch (Exception exception) { - var context = new MqttClientMessageQueueInterceptorContext( - senderClientSession?.ClientId, - ClientId, - publishPacket.ToApplicationMessage()); - - _options.ClientMessageQueueInterceptor?.Invoke(context); - - if (!context.AcceptEnqueue || context.ApplicationMessage == null) - { - return; - } - - publishPacket.Topic = context.ApplicationMessage.Topic; - publishPacket.Payload = context.ApplicationMessage.Payload; - publishPacket.QualityOfServiceLevel = context.ApplicationMessage.QualityOfServiceLevel; + _logger.Error(exception, "Error while disconnecting channel adapter."); } - - _pendingPacketsQueue.Enqueue(publishPacket); - } - - public Task SubscribeAsync(IList topicFilters) - { - if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); - - _subscriptionsManager.Subscribe(new MqttSubscribePacket - { - TopicFilters = topicFilters - }); - - EnqueueSubscribedRetainedMessages(topicFilters); - return Task.FromResult(0); - } - - public Task UnsubscribeAsync(IList topicFilters) - { - if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); - - _subscriptionsManager.Unsubscribe(new MqttUnsubscribePacket - { - TopicFilters = topicFilters - }); - - return Task.FromResult(0); - } - - public void ClearPendingApplicationMessages() - { - _pendingPacketsQueue.Clear(); - } - - public void Dispose() - { - _pendingPacketsQueue?.Dispose(); - - _cancellationTokenSource?.Cancel (); - _cancellationTokenSource?.Dispose(); - _cancellationTokenSource = null; } private void ProcessReceivedPacket(IMqttChannelAdapter adapter, MqttBasePacket packet, CancellationToken cancellationToken) diff --git a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs index 39352ec..457de92 100644 --- a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs @@ -301,11 +301,6 @@ namespace MQTTnet.Core.Tests [TestMethod] public async Task MqttServer_HandleCleanDisconnect() { - MqttNetGlobalLogger.LogMessagePublished += (_, e) => - { - System.Diagnostics.Debug.WriteLine($"[{e.TraceMessage.Timestamp:s}] {e.TraceMessage.Source} {e.TraceMessage.Message}"); - }; - var serverAdapter = new MqttTcpServerAdapter(new MqttNetLogger().CreateChildLogger()); var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger()); @@ -338,6 +333,51 @@ namespace MQTTnet.Core.Tests Assert.AreEqual(clientConnectedCalled, clientDisconnectedCalled); } + [TestMethod] + public async Task MqttServer_Client_Disconnect_Without_Errors() + { + var errors = 0; + + MqttNetGlobalLogger.LogMessagePublished += (_, e) => + { + System.Diagnostics.Debug.WriteLine($"[{e.TraceMessage.Timestamp:s}] {e.TraceMessage.Source} {e.TraceMessage.Message}"); + + if (e.TraceMessage.Level == MqttNetLogLevel.Error) + { + errors++; + } + }; + + bool clientWasConnected; + + var server = new MqttFactory().CreateMqttServer(); + try + { + var options = new MqttServerOptionsBuilder().Build(); + await server.StartAsync(options); + + var client = new MqttFactory().CreateMqttClient(); + var clientOptions = new MqttClientOptionsBuilder() + .WithTcpServer("localhost") + .Build(); + + await client.ConnectAsync(clientOptions); + + clientWasConnected = true; + + await client.DisconnectAsync(); + + await Task.Delay(500); + } + finally + { + await server.StopAsync(); + } + + Assert.IsTrue(clientWasConnected); + Assert.AreEqual(0, errors); + } + [TestMethod] public async Task MqttServer_LotsOfRetainedMessages() {