diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index fc9e1a6..e55f08f 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -24,6 +24,7 @@ * [Server] Keep alive checking is now suspended while large packages are being received (and thus the client is connected). Keep alive checking continues after a large packet is received completely. * [Server] Rewritten the _ConnectedClients_ API and added new features for disconnecting and Endpoint information (IP etc.). * [Server] Added settings for disabling persistent sessions and defining a max pending messages queue size per session. +* [Server] Added a new interceptor which is invoked before a new message is added to the client queue. Copyright Christian Kratky 2016-2018 MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin diff --git a/Frameworks/MQTTnet.NetStandard/Server/IMqttServerOptions.cs b/Frameworks/MQTTnet.NetStandard/Server/IMqttServerOptions.cs index 54e037d..b753587 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/IMqttServerOptions.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/IMqttServerOptions.cs @@ -16,6 +16,7 @@ namespace MQTTnet.Server Action ConnectionValidator { get; } Action SubscriptionInterceptor { get; } Action ApplicationMessageInterceptor { get; } + Action ClientMessageQueueInterceptor { get; set; } MqttServerDefaultEndpointOptions DefaultEndpointOptions { get; } MqttServerTlsEndpointOptions TlsEndpointOptions { get; } diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttClientMessageQueueInterceptorContext.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttClientMessageQueueInterceptorContext.cs new file mode 100644 index 0000000..ca0e2a0 --- /dev/null +++ b/Frameworks/MQTTnet.NetStandard/Server/MqttClientMessageQueueInterceptorContext.cs @@ -0,0 +1,20 @@ +namespace MQTTnet.Server +{ + public class MqttClientMessageQueueInterceptorContext + { + public MqttClientMessageQueueInterceptorContext(string senderClientId, string receiverClientId, MqttApplicationMessage applicationMessage) + { + SenderClientId = senderClientId; + ReceiverClientId = receiverClientId; + ApplicationMessage = applicationMessage; + } + + public string SenderClientId { get; } + + public string ReceiverClientId { get; } + + public MqttApplicationMessage ApplicationMessage { get; set; } + + public bool AcceptEnqueue { get; set; } = true; + } +} diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs index 97f616c..15d4af9 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs @@ -19,7 +19,6 @@ namespace MQTTnet.Server private readonly IMqttNetChildLogger _logger; private ConcurrentQueue _queue = new ConcurrentQueue(); - private Task _workerTask; public MqttClientPendingMessagesQueue(IMqttServerOptions options, MqttClientSession clientSession, IMqttNetChildLogger logger) { @@ -41,7 +40,7 @@ namespace MQTTnet.Server return; } - _workerTask = Task.Run(() => SendQueuedPacketsAsync(adapter, cancellationToken), cancellationToken); + Task.Run(() => SendQueuedPacketsAsync(adapter, cancellationToken), cancellationToken); } public void Enqueue(MqttBasePacket packet) diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs index 0f11553..4fd7266 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs @@ -153,24 +153,39 @@ namespace MQTTnet.Server } } - public void EnqueueApplicationMessage(MqttApplicationMessage applicationMessage) + public void EnqueueApplicationMessage(MqttClientSession senderClientSession, MqttApplicationMessage applicationMessage) { if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage)); - var result = _subscriptionsManager.CheckSubscriptions(applicationMessage); - if (!result.IsSubscribed) + var checkSubscriptionsResult = _subscriptionsManager.CheckSubscriptions(applicationMessage); + if (!checkSubscriptionsResult.IsSubscribed) { return; } var publishPacket = applicationMessage.ToPublishPacket(); - publishPacket.QualityOfServiceLevel = result.QualityOfServiceLevel; + publishPacket.QualityOfServiceLevel = checkSubscriptionsResult.QualityOfServiceLevel; if (publishPacket.QualityOfServiceLevel > 0) { publishPacket.PacketIdentifier = _packetIdentifierProvider.GetNewPacketIdentifier(); } + if (_options.ClientMessageQueueInterceptor != null) + { + var context = new MqttClientMessageQueueInterceptorContext( + senderClientSession?.ClientId, + ClientId, + publishPacket.ToApplicationMessage()); + + _options.ClientMessageQueueInterceptor?.Invoke(context); + + if (!context.AcceptEnqueue || context.ApplicationMessage == null) + { + return; + } + } + _pendingMessagesQueue.Enqueue(publishPacket); } @@ -276,7 +291,7 @@ namespace MQTTnet.Server var retainedMessages = _retainedMessagesManager.GetSubscribedMessages(topicFilters); foreach (var applicationMessage in retainedMessages) { - EnqueueApplicationMessage(applicationMessage); + EnqueueApplicationMessage(null, applicationMessage); } } diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs index 7918aa8..692e9c6 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs @@ -257,7 +257,7 @@ namespace MQTTnet.Server foreach (var clientSession in _sessions.Values) { - clientSession.EnqueueApplicationMessage(applicationMessage); + clientSession.EnqueueApplicationMessage(senderClientSession, applicationMessage); } } catch (Exception exception) diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttServerOptions.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttServerOptions.cs index e848555..4d06b94 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/MqttServerOptions.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/MqttServerOptions.cs @@ -21,6 +21,8 @@ namespace MQTTnet.Server public Action ApplicationMessageInterceptor { get; set; } + public Action ClientMessageQueueInterceptor { get; set; } + public Action SubscriptionInterceptor { get; set; } public IMqttServerStorage Storage { get; set; }