From f92cfcd293acb836fc150645fd2b15ee43e961d8 Mon Sep 17 00:00:00 2001 From: Christian Date: Sun, 13 Dec 2020 11:36:31 +0100 Subject: [PATCH] Add new multi threaded application message handler for server. --- .../Server/CheckSubscriptionsResult.cs | 2 + ...qttApplicationMessageInterceptorContext.cs | 15 ++++-- .../Server/MqttClientSessionsManager.cs | 26 +++++++---- .../Server/MqttClientSubscriptionsManager.cs | 5 +- ...erApplicationMessageInterceptorDelegate.cs | 4 +- ...edApplicationMessageInterceptorDelegate.cs | 46 +++++++++++++++++++ .../Server/MqttServerOptionsBuilder.cs | 32 ++++++++++--- 7 files changed, 105 insertions(+), 25 deletions(-) create mode 100644 Source/MQTTnet/Server/MqttServerMultiThreadedApplicationMessageInterceptorDelegate.cs diff --git a/Source/MQTTnet/Server/CheckSubscriptionsResult.cs b/Source/MQTTnet/Server/CheckSubscriptionsResult.cs index 1d55380..b6e7dbd 100644 --- a/Source/MQTTnet/Server/CheckSubscriptionsResult.cs +++ b/Source/MQTTnet/Server/CheckSubscriptionsResult.cs @@ -4,6 +4,8 @@ namespace MQTTnet.Server { public struct CheckSubscriptionsResult { + public static CheckSubscriptionsResult NotSubscribed = new CheckSubscriptionsResult(); + public bool IsSubscribed { get; set; } public MqttQualityOfServiceLevel QualityOfServiceLevel { get; set; } diff --git a/Source/MQTTnet/Server/MqttApplicationMessageInterceptorContext.cs b/Source/MQTTnet/Server/MqttApplicationMessageInterceptorContext.cs index 11efa57..cde9a85 100644 --- a/Source/MQTTnet/Server/MqttApplicationMessageInterceptorContext.cs +++ b/Source/MQTTnet/Server/MqttApplicationMessageInterceptorContext.cs @@ -1,16 +1,23 @@ -using System.Collections.Generic; +using System; +using System.Collections.Generic; +using MQTTnet.Diagnostics; namespace MQTTnet.Server { - public class MqttApplicationMessageInterceptorContext + public sealed class MqttApplicationMessageInterceptorContext { - public MqttApplicationMessageInterceptorContext(string clientId, IDictionary sessionItems, MqttApplicationMessage applicationMessage) + public MqttApplicationMessageInterceptorContext(string clientId, IDictionary sessionItems, IMqttNetScopedLogger logger) { ClientId = clientId; - ApplicationMessage = applicationMessage; SessionItems = sessionItems; + Logger = logger ?? throw new ArgumentNullException(nameof(logger)); } + /// + /// Gets the currently used logger. + /// + public IMqttNetScopedLogger Logger { get; } + public string ClientId { get; } public MqttApplicationMessage ApplicationMessage { get; set; } diff --git a/Source/MQTTnet/Server/MqttClientSessionsManager.cs b/Source/MQTTnet/Server/MqttClientSessionsManager.cs index 18b529e..dfaefed 100644 --- a/Source/MQTTnet/Server/MqttClientSessionsManager.cs +++ b/Source/MQTTnet/Server/MqttClientSessionsManager.cs @@ -322,19 +322,21 @@ namespace MQTTnet.Server } var deliveryCount = 0; - + List sessions; lock (_sessions) { - foreach (var clientSession in _sessions.Values) + sessions = _sessions.Values.ToList(); + } + + foreach (var clientSession in sessions) + { + var isSubscribed = clientSession.EnqueueApplicationMessage(applicationMessage, senderClientId, false); + if (isSubscribed) { - var isSubscribed = clientSession.EnqueueApplicationMessage(applicationMessage, senderClientId, false); - if (isSubscribed) - { - deliveryCount++; - } + deliveryCount++; } } - + if (deliveryCount == 0) { var undeliveredMessageInterceptor = _options.UndeliveredMessageInterceptor; @@ -445,7 +447,13 @@ namespace MQTTnet.Server sessionItems = clientConnection.Session.Items; } - var interceptorContext = new MqttApplicationMessageInterceptorContext(senderClientId, sessionItems, applicationMessage); + var interceptorContext = new MqttApplicationMessageInterceptorContext(senderClientId, sessionItems, _logger) + { + AcceptPublish = true, + ApplicationMessage = applicationMessage, + CloseConnection = false + }; + await interceptor.InterceptApplicationMessagePublishAsync(interceptorContext).ConfigureAwait(false); return interceptorContext; } diff --git a/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs b/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs index 89d21f5..15066cc 100644 --- a/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs +++ b/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs @@ -193,10 +193,7 @@ namespace MQTTnet.Server if (qosLevels.Count == 0) { - return new CheckSubscriptionsResult - { - IsSubscribed = false - }; + return CheckSubscriptionsResult.NotSubscribed; } return CreateSubscriptionResult(qosLevel, qosLevels); diff --git a/Source/MQTTnet/Server/MqttServerApplicationMessageInterceptorDelegate.cs b/Source/MQTTnet/Server/MqttServerApplicationMessageInterceptorDelegate.cs index 82ba0f3..160ed48 100644 --- a/Source/MQTTnet/Server/MqttServerApplicationMessageInterceptorDelegate.cs +++ b/Source/MQTTnet/Server/MqttServerApplicationMessageInterceptorDelegate.cs @@ -3,9 +3,9 @@ using System.Threading.Tasks; namespace MQTTnet.Server { - public class MqttServerApplicationMessageInterceptorDelegate : IMqttServerApplicationMessageInterceptor + public sealed class MqttServerApplicationMessageInterceptorDelegate : IMqttServerApplicationMessageInterceptor { - private readonly Func _callback; + readonly Func _callback; public MqttServerApplicationMessageInterceptorDelegate(Action callback) { diff --git a/Source/MQTTnet/Server/MqttServerMultiThreadedApplicationMessageInterceptorDelegate.cs b/Source/MQTTnet/Server/MqttServerMultiThreadedApplicationMessageInterceptorDelegate.cs new file mode 100644 index 0000000..b99ca18 --- /dev/null +++ b/Source/MQTTnet/Server/MqttServerMultiThreadedApplicationMessageInterceptorDelegate.cs @@ -0,0 +1,46 @@ +using System; +using System.Threading.Tasks; +using MQTTnet.Diagnostics; +using MQTTnet.Implementations; +using MQTTnet.Internal; + +namespace MQTTnet.Server +{ + public sealed class MqttServerMultiThreadedApplicationMessageInterceptorDelegate : IMqttServerApplicationMessageInterceptor + { + readonly Func _callback; + + public MqttServerMultiThreadedApplicationMessageInterceptorDelegate(Action callback) + { + if (callback == null) throw new ArgumentNullException(nameof(callback)); + + _callback = context => + { + callback(context); + return Task.FromResult(0); + }; + } + + public MqttServerMultiThreadedApplicationMessageInterceptorDelegate(Func callback) + { + _callback = callback ?? throw new ArgumentNullException(nameof(callback)); + } + + public Task InterceptApplicationMessagePublishAsync(MqttApplicationMessageInterceptorContext context) + { + Task.Run(async () => + { + try + { + await _callback.Invoke(context).ConfigureAwait(false); + } + catch (Exception exception) + { + context.Logger.Error(exception, "Error while intercepting application message."); + } + }).RunInBackground(); + + return PlatformAbstractionLayer.CompletedTask; + } + } +} \ No newline at end of file diff --git a/Source/MQTTnet/Server/MqttServerOptionsBuilder.cs b/Source/MQTTnet/Server/MqttServerOptionsBuilder.cs index 66779c6..a5563b8 100644 --- a/Source/MQTTnet/Server/MqttServerOptionsBuilder.cs +++ b/Source/MQTTnet/Server/MqttServerOptionsBuilder.cs @@ -3,11 +3,13 @@ using System.Net; using System.Net.Security; using System.Security.Authentication; using MQTTnet.Certificates; +using System.Threading.Tasks; #if !WINDOWS_UWP using System.Security.Cryptography.X509Certificates; #endif +// ReSharper disable UnusedMember.Global namespace MQTTnet.Server { public class MqttServerOptionsBuilder @@ -187,6 +189,24 @@ namespace MQTTnet.Server return this; } + public MqttServerOptionsBuilder WithApplicationMessageInterceptor(Func value) + { + _options.ApplicationMessageInterceptor = new MqttServerApplicationMessageInterceptorDelegate(value); + return this; + } + + public MqttServerOptionsBuilder WithMultiThreadedApplicationMessageInterceptor(Action value) + { + _options.ApplicationMessageInterceptor = new MqttServerMultiThreadedApplicationMessageInterceptorDelegate(value); + return this; + } + + public MqttServerOptionsBuilder WithMultiThreadedApplicationMessageInterceptor(Func value) + { + _options.ApplicationMessageInterceptor = new MqttServerMultiThreadedApplicationMessageInterceptorDelegate(value); + return this; + } + public MqttServerOptionsBuilder WithClientMessageQueueInterceptor(IMqttServerClientMessageQueueInterceptor value) { _options.ClientMessageQueueInterceptor = value; @@ -217,6 +237,12 @@ namespace MQTTnet.Server return this; } + public MqttServerOptionsBuilder WithUndeliveredMessageInterceptor(Action value) + { + _options.UndeliveredMessageInterceptor = new MqttServerApplicationMessageInterceptorDelegate(value); + return this; + } + public MqttServerOptionsBuilder WithDefaultEndpointReuseAddress() { _options.DefaultEndpointOptions.ReuseAddress = true; @@ -248,11 +274,5 @@ namespace MQTTnet.Server { return _options; } - - public MqttServerOptionsBuilder WithUndeliveredMessageInterceptor(Action value) - { - _options.UndeliveredMessageInterceptor = new MqttServerApplicationMessageInterceptorDelegate(value); - return this; - } } }