From 447ca6cabbc9f5512d147ef38873638ff9db053d Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Sat, 5 Aug 2017 11:37:52 +0200 Subject: [PATCH] Add interfaces, optimize performance, fix QoS level 2 --- .../MQTTnet.NetFramework/MqttClientFactory.cs | 2 +- .../MQTTnet.NetFramework/MqttServerFactory.cs | 2 +- .../MQTTnet.NetStandard/MqttClientFactory.cs | 2 +- .../MQTTnet.NetStandard/MqttServerFactory.cs | 2 +- .../MqttClientFactory.cs | 2 +- .../MqttServerFactory.cs | 2 +- MQTTnet.Core/Client/IMqttClient.cs | 24 +++++++ MQTTnet.Core/Client/MqttClient.cs | 47 ++++++------- ...tifier.cs => IMqttPacketWithIdentifier.cs} | 2 +- MQTTnet.Core/Packets/MqttBasePacket.cs | 21 +----- MQTTnet.Core/Packets/MqttBasePublishPacket.cs | 2 +- MQTTnet.Core/Packets/MqttPacketExtensions.cs | 27 ++++++++ MQTTnet.Core/Packets/MqttSubAckPacket.cs | 2 +- MQTTnet.Core/Packets/MqttSubscribePacket.cs | 2 +- MQTTnet.Core/Packets/MqttUnsubAckPacket.cs | 2 +- MQTTnet.Core/Packets/MqttUnsubscribe.cs | 2 +- MQTTnet.Core/Server/IMqttServer.cs | 18 +++++ MQTTnet.Core/Server/MqttClientMessageQueue.cs | 7 +- .../Server/MqttClientPublishPacketContext.cs | 5 +- MQTTnet.Core/Server/MqttClientSession.cs | 68 ++++++++++--------- .../Server/MqttClientSessionsManager.cs | 22 ++++-- .../Server/MqttClientSubscriptionsManager.cs | 45 +++++++----- MQTTnet.Core/Server/MqttServer.cs | 10 ++- ...cs => MqttServerDefaultEndpointOptions.cs} | 2 +- MQTTnet.Core/Server/MqttServerOptions.cs | 4 +- .../MqttServerTlsEndpointOptionsExtensions.cs | 19 ------ .../MainPage.xaml.cs | 2 +- 27 files changed, 198 insertions(+), 147 deletions(-) create mode 100644 MQTTnet.Core/Client/IMqttClient.cs rename MQTTnet.Core/Packets/{IPacketWithPacketIdentifier.cs => IMqttPacketWithIdentifier.cs} (67%) create mode 100644 MQTTnet.Core/Packets/MqttPacketExtensions.cs create mode 100644 MQTTnet.Core/Server/IMqttServer.cs rename MQTTnet.Core/Server/{DefaultEndpointOptions.cs => MqttServerDefaultEndpointOptions.cs} (71%) delete mode 100644 MQTTnet.Core/Server/MqttServerTlsEndpointOptionsExtensions.cs diff --git a/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs b/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs index 7b84239..ac12f97 100644 --- a/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs +++ b/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs @@ -8,7 +8,7 @@ namespace MQTTnet { public class MqttClientFactory { - public MqttClient CreateMqttClient(MqttClientOptions options) + public IMqttClient CreateMqttClient(MqttClientOptions options) { if (options == null) throw new ArgumentNullException(nameof(options)); diff --git a/Frameworks/MQTTnet.NetFramework/MqttServerFactory.cs b/Frameworks/MQTTnet.NetFramework/MqttServerFactory.cs index eb7441c..00da03a 100644 --- a/Frameworks/MQTTnet.NetFramework/MqttServerFactory.cs +++ b/Frameworks/MQTTnet.NetFramework/MqttServerFactory.cs @@ -8,7 +8,7 @@ namespace MQTTnet { public class MqttServerFactory { - public MqttServer CreateMqttServer(MqttServerOptions options) + public IMqttServer CreateMqttServer(MqttServerOptions options) { if (options == null) throw new ArgumentNullException(nameof(options)); diff --git a/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs b/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs index ac6f611..6073583 100644 --- a/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs +++ b/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs @@ -8,7 +8,7 @@ namespace MQTTnet { public class MqttClientFactory { - public MqttClient CreateMqttClient(MqttClientOptions options) + public IMqttClient CreateMqttClient(MqttClientOptions options) { if (options == null) throw new ArgumentNullException(nameof(options)); diff --git a/Frameworks/MQTTnet.NetStandard/MqttServerFactory.cs b/Frameworks/MQTTnet.NetStandard/MqttServerFactory.cs index eb7441c..00da03a 100644 --- a/Frameworks/MQTTnet.NetStandard/MqttServerFactory.cs +++ b/Frameworks/MQTTnet.NetStandard/MqttServerFactory.cs @@ -8,7 +8,7 @@ namespace MQTTnet { public class MqttServerFactory { - public MqttServer CreateMqttServer(MqttServerOptions options) + public IMqttServer CreateMqttServer(MqttServerOptions options) { if (options == null) throw new ArgumentNullException(nameof(options)); diff --git a/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs b/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs index 7b84239..ac12f97 100644 --- a/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs +++ b/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs @@ -8,7 +8,7 @@ namespace MQTTnet { public class MqttClientFactory { - public MqttClient CreateMqttClient(MqttClientOptions options) + public IMqttClient CreateMqttClient(MqttClientOptions options) { if (options == null) throw new ArgumentNullException(nameof(options)); diff --git a/Frameworks/MQTTnet.UniversalWindows/MqttServerFactory.cs b/Frameworks/MQTTnet.UniversalWindows/MqttServerFactory.cs index eb7441c..00da03a 100644 --- a/Frameworks/MQTTnet.UniversalWindows/MqttServerFactory.cs +++ b/Frameworks/MQTTnet.UniversalWindows/MqttServerFactory.cs @@ -8,7 +8,7 @@ namespace MQTTnet { public class MqttServerFactory { - public MqttServer CreateMqttServer(MqttServerOptions options) + public IMqttServer CreateMqttServer(MqttServerOptions options) { if (options == null) throw new ArgumentNullException(nameof(options)); diff --git a/MQTTnet.Core/Client/IMqttClient.cs b/MQTTnet.Core/Client/IMqttClient.cs new file mode 100644 index 0000000..0170adf --- /dev/null +++ b/MQTTnet.Core/Client/IMqttClient.cs @@ -0,0 +1,24 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using MQTTnet.Core.Packets; + +namespace MQTTnet.Core.Client +{ + public interface IMqttClient + { + bool IsConnected { get; } + + event EventHandler ApplicationMessageReceived; + event EventHandler Connected; + event EventHandler Disconnected; + + Task ConnectAsync(MqttApplicationMessage willApplicationMessage = null); + Task DisconnectAsync(); + Task PublishAsync(MqttApplicationMessage applicationMessage); + Task> SubscribeAsync(IList topicFilters); + Task> SubscribeAsync(params TopicFilter[] topicFilters); + Task Unsubscribe(IList topicFilters); + Task Unsubscribe(params string[] topicFilters); + } +} \ No newline at end of file diff --git a/MQTTnet.Core/Client/MqttClient.cs b/MQTTnet.Core/Client/MqttClient.cs index 3cec8d8..384de1f 100644 --- a/MQTTnet.Core/Client/MqttClient.cs +++ b/MQTTnet.Core/Client/MqttClient.cs @@ -1,5 +1,4 @@ using System; -using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Threading; @@ -13,10 +12,9 @@ using MQTTnet.Core.Protocol; namespace MQTTnet.Core.Client { - public class MqttClient + public class MqttClient : IMqttClient { - private readonly ConcurrentDictionary _pendingExactlyOncePublishPackets = new ConcurrentDictionary(); - private readonly HashSet _processedPublishPackets = new HashSet(); + private readonly HashSet _unacknowledgedPublishPackets = new HashSet(); private readonly MqttPacketDispatcher _packetDispatcher = new MqttPacketDispatcher(); private readonly MqttClientOptions _options; @@ -63,7 +61,6 @@ namespace MQTTnet.Core.Client _cancellationTokenSource = new CancellationTokenSource(); _latestPacketIdentifier = 0; - _processedPublishPackets.Clear(); _packetDispatcher.Reset(); IsConnected = true; @@ -105,6 +102,7 @@ namespace MQTTnet.Core.Client { if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); if (!topicFilters.Any()) throw new MqttProtocolViolationException("At least one topic filter must be set [MQTT-3.8.3-3]."); + ThrowIfNotConnected(); var subscribePacket = new MqttSubscribePacket @@ -154,6 +152,7 @@ namespace MQTTnet.Core.Client if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce) { + // No packet identifier is used for QoS 0 [3.3.2.2 Packet Identifier] await SendAsync(publishPacket); } else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce) @@ -164,8 +163,8 @@ namespace MQTTnet.Core.Client else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce) { publishPacket.PacketIdentifier = GetNewPacketIdentifier(); - await SendAndReceiveAsync(publishPacket); - await SendAsync(publishPacket.CreateResponse()); + var pubRecPacket = await SendAndReceiveAsync(publishPacket); + await SendAndReceiveAsync(pubRecPacket.CreateResponse()); } } @@ -208,14 +207,12 @@ namespace MQTTnet.Core.Client return DisconnectAsync(); } - var publishPacket = mqttPacket as MqttPublishPacket; - if (publishPacket != null) + if (mqttPacket is MqttPublishPacket publishPacket) { return ProcessReceivedPublishPacket(publishPacket); } - var pubRelPacket = mqttPacket as MqttPubRelPacket; - if (pubRelPacket != null) + if (mqttPacket is MqttPubRelPacket pubRelPacket) { return ProcessReceivedPubRelPacket(pubRelPacket); } @@ -232,11 +229,6 @@ namespace MQTTnet.Core.Client private void FireApplicationMessageReceivedEvent(MqttPublishPacket publishPacket) { - if (publishPacket.QualityOfServiceLevel != MqttQualityOfServiceLevel.AtMostOnce) - { - _processedPublishPackets.Add(publishPacket.PacketIdentifier); - } - var applicationMessage = publishPacket.ToApplicationMessage(); try @@ -265,7 +257,13 @@ namespace MQTTnet.Core.Client if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce) { - _pendingExactlyOncePublishPackets[publishPacket.PacketIdentifier] = publishPacket; + // QoS 2 is implement as method "B" [4.3.3 QoS 2: Exactly once delivery] + lock (_unacknowledgedPublishPackets) + { + _unacknowledgedPublishPackets.Add(publishPacket.PacketIdentifier); + } + + FireApplicationMessageReceivedEvent(publishPacket); return SendAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }); } @@ -274,15 +272,12 @@ namespace MQTTnet.Core.Client private async Task ProcessReceivedPubRelPacket(MqttPubRelPacket pubRelPacket) { - MqttPublishPacket originalPublishPacket; - if (!_pendingExactlyOncePublishPackets.TryRemove(pubRelPacket.PacketIdentifier, out originalPublishPacket)) + lock (_unacknowledgedPublishPackets) { - throw new MqttCommunicationException(); + _unacknowledgedPublishPackets.Remove(pubRelPacket.PacketIdentifier); } - - await SendAsync(originalPublishPacket.CreateResponse()); - - FireApplicationMessageReceivedEvent(originalPublishPacket); + + await SendAsync(pubRelPacket.CreateResponse()); } private Task SendAsync(MqttBasePacket packet) @@ -300,8 +295,8 @@ namespace MQTTnet.Core.Client return false; } - var pi1 = requestPacket as IPacketWithIdentifier; - var pi2 = p as IPacketWithIdentifier; + var pi1 = requestPacket as IMqttPacketWithIdentifier; + var pi2 = p as IMqttPacketWithIdentifier; if (pi1 != null && pi2 != null) { diff --git a/MQTTnet.Core/Packets/IPacketWithPacketIdentifier.cs b/MQTTnet.Core/Packets/IMqttPacketWithIdentifier.cs similarity index 67% rename from MQTTnet.Core/Packets/IPacketWithPacketIdentifier.cs rename to MQTTnet.Core/Packets/IMqttPacketWithIdentifier.cs index 128f3f5..420955c 100644 --- a/MQTTnet.Core/Packets/IPacketWithPacketIdentifier.cs +++ b/MQTTnet.Core/Packets/IMqttPacketWithIdentifier.cs @@ -1,6 +1,6 @@ namespace MQTTnet.Core.Packets { - public interface IPacketWithIdentifier + public interface IMqttPacketWithIdentifier { ushort PacketIdentifier { get; set; } } diff --git a/MQTTnet.Core/Packets/MqttBasePacket.cs b/MQTTnet.Core/Packets/MqttBasePacket.cs index 2a167cb..41901e5 100644 --- a/MQTTnet.Core/Packets/MqttBasePacket.cs +++ b/MQTTnet.Core/Packets/MqttBasePacket.cs @@ -1,25 +1,6 @@ -using System; - -namespace MQTTnet.Core.Packets +namespace MQTTnet.Core.Packets { public abstract class MqttBasePacket { - public TResponsePacket CreateResponse() - { - var responsePacket = Activator.CreateInstance(); - var responsePacketWithIdentifier = responsePacket as IPacketWithIdentifier; - if (responsePacketWithIdentifier != null) - { - var requestPacketWithIdentifier = this as IPacketWithIdentifier; - if (requestPacketWithIdentifier == null) - { - throw new InvalidOperationException("Response packet has PacketIdentifier but request packet does not."); - } - - responsePacketWithIdentifier.PacketIdentifier = requestPacketWithIdentifier.PacketIdentifier; - } - - return responsePacket; - } } } diff --git a/MQTTnet.Core/Packets/MqttBasePublishPacket.cs b/MQTTnet.Core/Packets/MqttBasePublishPacket.cs index ff57003..67d55b9 100644 --- a/MQTTnet.Core/Packets/MqttBasePublishPacket.cs +++ b/MQTTnet.Core/Packets/MqttBasePublishPacket.cs @@ -1,6 +1,6 @@ namespace MQTTnet.Core.Packets { - public class MqttBasePublishPacket : MqttBasePacket, IPacketWithIdentifier + public class MqttBasePublishPacket : MqttBasePacket, IMqttPacketWithIdentifier { public ushort PacketIdentifier { get; set; } } diff --git a/MQTTnet.Core/Packets/MqttPacketExtensions.cs b/MQTTnet.Core/Packets/MqttPacketExtensions.cs new file mode 100644 index 0000000..fef1054 --- /dev/null +++ b/MQTTnet.Core/Packets/MqttPacketExtensions.cs @@ -0,0 +1,27 @@ +using System; + +namespace MQTTnet.Core.Packets +{ + public static class MqttPacketExtensions + { + public static TResponsePacket CreateResponse(this MqttBasePacket packet) + { + if (packet == null) throw new ArgumentNullException(nameof(packet)); + + var responsePacket = Activator.CreateInstance(); + + if (responsePacket is IMqttPacketWithIdentifier responsePacketWithIdentifier) + { + var requestPacketWithIdentifier = packet as IMqttPacketWithIdentifier; + if (requestPacketWithIdentifier == null) + { + throw new InvalidOperationException("Response packet has PacketIdentifier but request packet does not."); + } + + responsePacketWithIdentifier.PacketIdentifier = requestPacketWithIdentifier.PacketIdentifier; + } + + return responsePacket; + } + } +} diff --git a/MQTTnet.Core/Packets/MqttSubAckPacket.cs b/MQTTnet.Core/Packets/MqttSubAckPacket.cs index 3a7265f..f63d0ca 100644 --- a/MQTTnet.Core/Packets/MqttSubAckPacket.cs +++ b/MQTTnet.Core/Packets/MqttSubAckPacket.cs @@ -4,7 +4,7 @@ using MQTTnet.Core.Protocol; namespace MQTTnet.Core.Packets { - public sealed class MqttSubAckPacket : MqttBasePacket, IPacketWithIdentifier + public sealed class MqttSubAckPacket : MqttBasePacket, IMqttPacketWithIdentifier { public ushort PacketIdentifier { get; set; } diff --git a/MQTTnet.Core/Packets/MqttSubscribePacket.cs b/MQTTnet.Core/Packets/MqttSubscribePacket.cs index a0949e8..007bde7 100644 --- a/MQTTnet.Core/Packets/MqttSubscribePacket.cs +++ b/MQTTnet.Core/Packets/MqttSubscribePacket.cs @@ -3,7 +3,7 @@ using System.Linq; namespace MQTTnet.Core.Packets { - public sealed class MqttSubscribePacket : MqttBasePacket, IPacketWithIdentifier + public sealed class MqttSubscribePacket : MqttBasePacket, IMqttPacketWithIdentifier { public ushort PacketIdentifier { get; set; } diff --git a/MQTTnet.Core/Packets/MqttUnsubAckPacket.cs b/MQTTnet.Core/Packets/MqttUnsubAckPacket.cs index 5f33095..57a5a7d 100644 --- a/MQTTnet.Core/Packets/MqttUnsubAckPacket.cs +++ b/MQTTnet.Core/Packets/MqttUnsubAckPacket.cs @@ -1,6 +1,6 @@ namespace MQTTnet.Core.Packets { - public sealed class MqttUnsubAckPacket : MqttBasePacket, IPacketWithIdentifier + public sealed class MqttUnsubAckPacket : MqttBasePacket, IMqttPacketWithIdentifier { public ushort PacketIdentifier { get; set; } } diff --git a/MQTTnet.Core/Packets/MqttUnsubscribe.cs b/MQTTnet.Core/Packets/MqttUnsubscribe.cs index 3e3f36f..b6cfab6 100644 --- a/MQTTnet.Core/Packets/MqttUnsubscribe.cs +++ b/MQTTnet.Core/Packets/MqttUnsubscribe.cs @@ -2,7 +2,7 @@ namespace MQTTnet.Core.Packets { - public sealed class MqttUnsubscribePacket : MqttBasePacket, IPacketWithIdentifier + public sealed class MqttUnsubscribePacket : MqttBasePacket, IMqttPacketWithIdentifier { public ushort PacketIdentifier { get; set; } diff --git a/MQTTnet.Core/Server/IMqttServer.cs b/MQTTnet.Core/Server/IMqttServer.cs new file mode 100644 index 0000000..0b91c26 --- /dev/null +++ b/MQTTnet.Core/Server/IMqttServer.cs @@ -0,0 +1,18 @@ +using System; +using System.Collections.Generic; +using MQTTnet.Core.Adapter; + +namespace MQTTnet.Core.Server +{ + public interface IMqttServer + { + event EventHandler ApplicationMessageReceived; + event EventHandler ClientConnected; + + IList GetConnectedClients(); + void InjectClient(string identifier, IMqttCommunicationAdapter adapter); + void Publish(MqttApplicationMessage applicationMessage); + void Start(); + void Stop(); + } +} \ No newline at end of file diff --git a/MQTTnet.Core/Server/MqttClientMessageQueue.cs b/MQTTnet.Core/Server/MqttClientMessageQueue.cs index 926e581..07c6ea5 100644 --- a/MQTTnet.Core/Server/MqttClientMessageQueue.cs +++ b/MQTTnet.Core/Server/MqttClientMessageQueue.cs @@ -35,7 +35,7 @@ namespace MQTTnet.Core.Server _adapter = adapter ?? throw new ArgumentNullException(nameof(adapter)); _cancellationTokenSource = new CancellationTokenSource(); - Task.Run(() => SendPendingPublishPacketsAsync(_cancellationTokenSource.Token)); + Task.Run(() => SendPendingPublishPacketsAsync(_cancellationTokenSource.Token), _cancellationTokenSource.Token); } public void Stop() @@ -45,14 +45,13 @@ namespace MQTTnet.Core.Server _cancellationTokenSource = null; } - public void Enqueue(MqttClientSession senderClientSession, MqttPublishPacket publishPacket) + public void Enqueue(MqttPublishPacket publishPacket) { - if (senderClientSession == null) throw new ArgumentNullException(nameof(senderClientSession)); if (publishPacket == null) throw new ArgumentNullException(nameof(publishPacket)); lock (_pendingPublishPackets) { - _pendingPublishPackets.Add(new MqttClientPublishPacketContext(senderClientSession, publishPacket)); + _pendingPublishPackets.Add(new MqttClientPublishPacketContext(publishPacket)); _gate.Set(); } } diff --git a/MQTTnet.Core/Server/MqttClientPublishPacketContext.cs b/MQTTnet.Core/Server/MqttClientPublishPacketContext.cs index 8847c70..9855390 100644 --- a/MQTTnet.Core/Server/MqttClientPublishPacketContext.cs +++ b/MQTTnet.Core/Server/MqttClientPublishPacketContext.cs @@ -5,14 +5,11 @@ namespace MQTTnet.Core.Server { public sealed class MqttClientPublishPacketContext { - public MqttClientPublishPacketContext(MqttClientSession senderClientSession, MqttPublishPacket publishPacket) + public MqttClientPublishPacketContext(MqttPublishPacket publishPacket) { - SenderClientSession = senderClientSession ?? throw new ArgumentNullException(nameof(senderClientSession)); PublishPacket = publishPacket ?? throw new ArgumentNullException(nameof(publishPacket)); } - public MqttClientSession SenderClientSession { get; } - public MqttPublishPacket PublishPacket { get; } public int SendTries { get; set; } diff --git a/MQTTnet.Core/Server/MqttClientSession.cs b/MQTTnet.Core/Server/MqttClientSession.cs index b89e73e..a3919f9 100644 --- a/MQTTnet.Core/Server/MqttClientSession.cs +++ b/MQTTnet.Core/Server/MqttClientSession.cs @@ -1,5 +1,5 @@ using System; -using System.Collections.Concurrent; +using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using MQTTnet.Core.Adapter; @@ -13,13 +13,13 @@ namespace MQTTnet.Core.Server { public sealed class MqttClientSession : IDisposable { - private readonly ConcurrentDictionary _pendingIncomingPublications = new ConcurrentDictionary(); + private readonly HashSet _unacknowledgedPublishPackets = new HashSet(); private readonly MqttClientSubscriptionsManager _subscriptionsManager = new MqttClientSubscriptionsManager(); private readonly MqttClientMessageQueue _messageQueue; private readonly Action _publishPacketReceivedCallback; private readonly MqttServerOptions _options; - + private CancellationTokenSource _cancellationTokenSource; private IMqttCommunicationAdapter _adapter; private string _identifier; @@ -79,17 +79,16 @@ namespace MQTTnet.Core.Server } } - public void EnqueuePublishPacket(MqttClientSession senderClientSession, MqttPublishPacket publishPacket) + public void EnqueuePublishPacket(MqttPublishPacket publishPacket) { - if (senderClientSession == null) throw new ArgumentNullException(nameof(senderClientSession)); if (publishPacket == null) throw new ArgumentNullException(nameof(publishPacket)); - if (!_subscriptionsManager.IsTopicSubscribed(publishPacket)) + if (!_subscriptionsManager.IsSubscribed(publishPacket)) { return; } - _messageQueue.Enqueue(senderClientSession, publishPacket); + _messageQueue.Enqueue(publishPacket); MqttTrace.Verbose(nameof(MqttClientSession), $"Client '{_identifier}: Enqueued pending publish packet."); } @@ -101,34 +100,35 @@ namespace MQTTnet.Core.Server private Task HandleIncomingPacketAsync(MqttBasePacket packet) { - var subscribePacket = packet as MqttSubscribePacket; - if (subscribePacket != null) + if (packet is MqttSubscribePacket subscribePacket) { return _adapter.SendPacketAsync(_subscriptionsManager.Subscribe(subscribePacket), _options.DefaultCommunicationTimeout); } - var unsubscribePacket = packet as MqttUnsubscribePacket; - if (unsubscribePacket != null) + if (packet is MqttUnsubscribePacket unsubscribePacket) { return _adapter.SendPacketAsync(_subscriptionsManager.Unsubscribe(unsubscribePacket), _options.DefaultCommunicationTimeout); } - var publishPacket = packet as MqttPublishPacket; - if (publishPacket != null) + if (packet is MqttPublishPacket publishPacket) { return HandleIncomingPublishPacketAsync(publishPacket); } - var pubRelPacket = packet as MqttPubRelPacket; - if (pubRelPacket != null) + if (packet is MqttPubRelPacket pubRelPacket) { return HandleIncomingPubRelPacketAsync(pubRelPacket); } - var pubAckPacket = packet as MqttPubAckPacket; - if (pubAckPacket != null) + if (packet is MqttPubRecPacket pubRecPacket) + { + return _adapter.SendPacketAsync(pubRecPacket.CreateResponse(), _options.DefaultCommunicationTimeout); + } + + if (packet is MqttPubAckPacket || packet is MqttPubCompPacket) { - return HandleIncomingPubAckPacketAsync(pubAckPacket); + // Discard message. + return Task.FromResult((object)null); } if (packet is MqttPingReqPacket) @@ -148,12 +148,7 @@ namespace MQTTnet.Core.Server return Task.FromResult((object)null); } - private async Task HandleIncomingPubAckPacketAsync(MqttPubAckPacket pubAckPacket) - { - await Task.FromResult((object)null); - } - - private async Task HandleIncomingPublishPacketAsync(MqttPublishPacket publishPacket) + private Task HandleIncomingPublishPacketAsync(MqttPublishPacket publishPacket) { if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce) { @@ -161,26 +156,33 @@ namespace MQTTnet.Core.Server } else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce) { - await _adapter.SendPacketAsync(new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout); _publishPacketReceivedCallback(this, publishPacket); + return _adapter.SendPacketAsync(new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout); } else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce) { - _pendingIncomingPublications[publishPacket.PacketIdentifier] = publishPacket; - await _adapter.SendPacketAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout); + // QoS 2 is implement as method "B" [4.3.3 QoS 2: Exactly once delivery] + lock (_unacknowledgedPublishPackets) + { + _unacknowledgedPublishPackets.Add(publishPacket.PacketIdentifier); + } + + _publishPacketReceivedCallback(this, publishPacket); + + return _adapter.SendPacketAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout); } + + throw new MqttCommunicationException("Received not supported QoS level."); } - private async Task HandleIncomingPubRelPacketAsync(MqttPubRelPacket pubRelPacket) + private Task HandleIncomingPubRelPacketAsync(MqttPubRelPacket pubRelPacket) { - MqttPublishPacket publishPacket; - if (!_pendingIncomingPublications.TryRemove(pubRelPacket.PacketIdentifier, out publishPacket)) + lock (_unacknowledgedPublishPackets) { - return; + _unacknowledgedPublishPackets.Remove(pubRelPacket.PacketIdentifier); } - await _adapter.SendPacketAsync(new MqttPubCompPacket { PacketIdentifier = publishPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout); - _publishPacketReceivedCallback(this, publishPacket); + return _adapter.SendPacketAsync(new MqttPubCompPacket { PacketIdentifier = pubRelPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout); } } } diff --git a/MQTTnet.Core/Server/MqttClientSessionsManager.cs b/MQTTnet.Core/Server/MqttClientSessionsManager.cs index ae4d865..f78be0a 100644 --- a/MQTTnet.Core/Server/MqttClientSessionsManager.cs +++ b/MQTTnet.Core/Server/MqttClientSessionsManager.cs @@ -22,7 +22,7 @@ namespace MQTTnet.Core.Server _options = options ?? throw new ArgumentNullException(nameof(options)); } - public event EventHandler ApplicationMessageReceived; + public event EventHandler ApplicationMessageReceived; public async Task RunClientSessionAsync(MqttClientConnectedEventArgs eventArgs) { @@ -127,14 +127,24 @@ namespace MQTTnet.Core.Server } } - private void DispatchPublishPacket(MqttClientSession senderClientSession, MqttPublishPacket publishPacket) + public void DispatchPublishPacket(MqttClientSession senderClientSession, MqttPublishPacket publishPacket) { - var eventArgs = new MqttApplicationMessageReceivedEventArgs(senderClientSession.ClientId, publishPacket.ToApplicationMessage()); - ApplicationMessageReceived?.Invoke(this, eventArgs); + try + { + var eventArgs = new MqttApplicationMessageReceivedEventArgs(senderClientSession?.ClientId, publishPacket.ToApplicationMessage()); + ApplicationMessageReceived?.Invoke(this, eventArgs); + } + catch (Exception exception) + { + MqttTrace.Error(nameof(MqttClientSessionsManager), exception, "Error while processing application message"); + } - foreach (var clientSession in _clientSessions.Values.ToList()) + lock (_syncRoot) { - clientSession.EnqueuePublishPacket(senderClientSession, publishPacket); + foreach (var clientSession in _clientSessions.Values.ToList()) + { + clientSession.EnqueuePublishPacket(publishPacket); + } } } } diff --git a/MQTTnet.Core/Server/MqttClientSubscriptionsManager.cs b/MQTTnet.Core/Server/MqttClientSubscriptionsManager.cs index 8cb9563..390f8f3 100644 --- a/MQTTnet.Core/Server/MqttClientSubscriptionsManager.cs +++ b/MQTTnet.Core/Server/MqttClientSubscriptionsManager.cs @@ -1,5 +1,5 @@ using System; -using System.Collections.Concurrent; +using System.Collections.Generic; using MQTTnet.Core.Packets; using MQTTnet.Core.Protocol; @@ -7,17 +7,21 @@ namespace MQTTnet.Core.Server { public sealed class MqttClientSubscriptionsManager { - private readonly ConcurrentDictionary _subscribedTopics = new ConcurrentDictionary(); + private readonly Dictionary _subscribedTopics = new Dictionary(); public MqttSubAckPacket Subscribe(MqttSubscribePacket subscribePacket) { if (subscribePacket == null) throw new ArgumentNullException(nameof(subscribePacket)); var responsePacket = subscribePacket.CreateResponse(); - foreach (var topicFilter in subscribePacket.TopicFilters) + + lock (_subscribedTopics) { - _subscribedTopics[topicFilter.Topic] = topicFilter.QualityOfServiceLevel; - responsePacket.SubscribeReturnCodes.Add(MqttSubscribeReturnCode.SuccessMaximumQoS1); // TODO: Add support for QoS 2. + foreach (var topicFilter in subscribePacket.TopicFilters) + { + _subscribedTopics[topicFilter.Topic] = topicFilter.QualityOfServiceLevel; + responsePacket.SubscribeReturnCodes.Add(MqttSubscribeReturnCode.SuccessMaximumQoS1); // TODO: Add support for QoS 2. + } } return responsePacket; @@ -27,32 +31,37 @@ namespace MQTTnet.Core.Server { if (unsubscribePacket == null) throw new ArgumentNullException(nameof(unsubscribePacket)); - foreach (var topicFilter in unsubscribePacket.TopicFilters) + lock (_subscribedTopics) { - MqttQualityOfServiceLevel _; - _subscribedTopics.TryRemove(topicFilter, out _); + foreach (var topicFilter in unsubscribePacket.TopicFilters) + { + _subscribedTopics.Remove(topicFilter); + } } return unsubscribePacket.CreateResponse(); } - public bool IsTopicSubscribed(MqttPublishPacket publishPacket) + public bool IsSubscribed(MqttPublishPacket publishPacket) { if (publishPacket == null) throw new ArgumentNullException(nameof(publishPacket)); - foreach (var subscribedTopic in _subscribedTopics) + lock (_subscribedTopics) { - if (!MqttTopicFilterComparer.IsMatch(publishPacket.Topic, subscribedTopic.Key)) + foreach (var subscribedTopic in _subscribedTopics) { - continue; - } + if (publishPacket.QualityOfServiceLevel > subscribedTopic.Value) + { + continue; + } - if (subscribedTopic.Value < publishPacket.QualityOfServiceLevel) - { - continue; - } + if (!MqttTopicFilterComparer.IsMatch(publishPacket.Topic, subscribedTopic.Key)) + { + continue; + } - return true; + return true; + } } return false; diff --git a/MQTTnet.Core/Server/MqttServer.cs b/MQTTnet.Core/Server/MqttServer.cs index ca55d7a..98543b6 100644 --- a/MQTTnet.Core/Server/MqttServer.cs +++ b/MQTTnet.Core/Server/MqttServer.cs @@ -4,10 +4,11 @@ using System.Threading; using System.Threading.Tasks; using MQTTnet.Core.Adapter; using MQTTnet.Core.Diagnostics; +using MQTTnet.Core.Internal; namespace MQTTnet.Core.Server { - public sealed class MqttServer + public sealed class MqttServer : IMqttServer { private readonly MqttClientSessionsManager _clientSessionsManager; private readonly ICollection _adapters; @@ -33,6 +34,13 @@ namespace MQTTnet.Core.Server public event EventHandler ApplicationMessageReceived; + public void Publish(MqttApplicationMessage applicationMessage) + { + if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage)); + + _clientSessionsManager.DispatchPublishPacket(null, applicationMessage.ToPublishPacket()); + } + public void InjectClient(string identifier, IMqttCommunicationAdapter adapter) { if (adapter == null) throw new ArgumentNullException(nameof(adapter)); diff --git a/MQTTnet.Core/Server/DefaultEndpointOptions.cs b/MQTTnet.Core/Server/MqttServerDefaultEndpointOptions.cs similarity index 71% rename from MQTTnet.Core/Server/DefaultEndpointOptions.cs rename to MQTTnet.Core/Server/MqttServerDefaultEndpointOptions.cs index 203ed28..04c4d73 100644 --- a/MQTTnet.Core/Server/DefaultEndpointOptions.cs +++ b/MQTTnet.Core/Server/MqttServerDefaultEndpointOptions.cs @@ -1,6 +1,6 @@ namespace MQTTnet.Core.Server { - public sealed class DefaultEndpointOptions + public sealed class MqttServerDefaultEndpointOptions { public bool IsEnabled { get; set; } = true; diff --git a/MQTTnet.Core/Server/MqttServerOptions.cs b/MQTTnet.Core/Server/MqttServerOptions.cs index 18e284d..5edf500 100644 --- a/MQTTnet.Core/Server/MqttServerOptions.cs +++ b/MQTTnet.Core/Server/MqttServerOptions.cs @@ -6,13 +6,13 @@ namespace MQTTnet.Core.Server { public sealed class MqttServerOptions { - public DefaultEndpointOptions DefaultEndpointOptions { get; } = new DefaultEndpointOptions(); + public MqttServerDefaultEndpointOptions DefaultEndpointOptions { get; } = new MqttServerDefaultEndpointOptions(); public MqttServerTlsEndpointOptions TlsEndpointOptions { get; } = new MqttServerTlsEndpointOptions(); public int ConnectionBacklog { get; set; } = 10; - public TimeSpan DefaultCommunicationTimeout { get; set; } = TimeSpan.FromSeconds(10); + public TimeSpan DefaultCommunicationTimeout { get; set; } = TimeSpan.FromSeconds(15); public Func ConnectionValidator { get; set; } } diff --git a/MQTTnet.Core/Server/MqttServerTlsEndpointOptionsExtensions.cs b/MQTTnet.Core/Server/MqttServerTlsEndpointOptionsExtensions.cs deleted file mode 100644 index d790526..0000000 --- a/MQTTnet.Core/Server/MqttServerTlsEndpointOptionsExtensions.cs +++ /dev/null @@ -1,19 +0,0 @@ -using System; - -namespace MQTTnet.Core.Server -{ - public static class MqttServerTlsEndpointOptionsExtensions - { - public static int GetPort(this DefaultEndpointOptions options) - { - if (options == null) throw new ArgumentNullException(nameof(options)); - - if (!options.Port.HasValue) - { - return 1883; - } - - return options.Port.Value; - } - } -} diff --git a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs index 1104a33..aa40c8b 100644 --- a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs +++ b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs @@ -8,7 +8,7 @@ namespace MQTTnet.TestApp.UniversalWindows { public sealed partial class MainPage { - private MqttClient _mqttClient; + private IMqttClient _mqttClient; public MainPage() {