From 75dfe68a948eda55a89065efa77aadc6b263b88a Mon Sep 17 00:00:00 2001 From: SeppPenner Date: Tue, 9 Jun 2020 20:54:42 +0200 Subject: [PATCH] Added interceptor for undelivered messages. --- Build/MQTTnet.nuspec | 3 +- Source/MQTTnet/MqttApplicationMessage.cs | 2 ++ Source/MQTTnet/Server/IMqttServerOptions.cs | 2 ++ Source/MQTTnet/Server/MqttClientSession.cs | 2 ++ .../Server/MqttClientSessionsManager.cs | 12 ++++++++ Source/MQTTnet/Server/MqttServerOptions.cs | 2 ++ .../Server/MqttServerOptionsBuilder.cs | 6 ++++ Tests/MQTTnet.Core.Tests/Server_Tests.cs | 28 +++++++++++++++++++ 8 files changed, 56 insertions(+), 1 deletion(-) diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index b329846..b1aa605 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -16,7 +16,8 @@ * [AspNetCore] Adjusted some namespaces (BREAKING CHANGE!) * [Server] Adjusted some namespaces (BREAKING CHANGE!) * [Server] Added state checks (throw if not started etc.) for most server APIs. -* [Server] Exposed real X509Certificate2 (instead byte array) to TLS options (thanks to @borigas). +* [Server] Exposed real X509Certificate2 (instead byte array) to TLS options (Thanks to @borigas). +* [Core] Added server interceptor for undelivered messages (Thanks to @cshark-inator). Copyright Christian Kratky 2016-2020 MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin Blazor diff --git a/Source/MQTTnet/MqttApplicationMessage.cs b/Source/MQTTnet/MqttApplicationMessage.cs index 6522d42..688bb65 100644 --- a/Source/MQTTnet/MqttApplicationMessage.cs +++ b/Source/MQTTnet/MqttApplicationMessage.cs @@ -29,5 +29,7 @@ namespace MQTTnet public byte[] CorrelationData { get; set; } public List SubscriptionIdentifiers { get; set; } + + public uint DeliveryCount { get; set; } } } diff --git a/Source/MQTTnet/Server/IMqttServerOptions.cs b/Source/MQTTnet/Server/IMqttServerOptions.cs index 7df6f54..61900ed 100644 --- a/Source/MQTTnet/Server/IMqttServerOptions.cs +++ b/Source/MQTTnet/Server/IMqttServerOptions.cs @@ -25,5 +25,7 @@ namespace MQTTnet.Server IMqttServerStorage Storage { get; } IMqttRetainedMessagesManager RetainedMessagesManager { get; } + + IMqttServerApplicationMessageInterceptor UndeliveredMessageInterceptor { get; set; } } } \ No newline at end of file diff --git a/Source/MQTTnet/Server/MqttClientSession.cs b/Source/MQTTnet/Server/MqttClientSession.cs index c7d2e50..1e8a2f6 100644 --- a/Source/MQTTnet/Server/MqttClientSession.cs +++ b/Source/MQTTnet/Server/MqttClientSession.cs @@ -56,6 +56,8 @@ namespace MQTTnet.Server _logger.Verbose("Queued application message with topic '{0}' (ClientId: {1}).", applicationMessage.Topic, ClientId); + applicationMessage.DeliveryCount++; + ApplicationMessagesQueue.Enqueue(applicationMessage, senderClientId, checkSubscriptionsResult.QualityOfServiceLevel, isRetainedApplicationMessage); } diff --git a/Source/MQTTnet/Server/MqttClientSessionsManager.cs b/Source/MQTTnet/Server/MqttClientSessionsManager.cs index 7850780..8be714d 100644 --- a/Source/MQTTnet/Server/MqttClientSessionsManager.cs +++ b/Source/MQTTnet/Server/MqttClientSessionsManager.cs @@ -224,6 +224,8 @@ namespace MQTTnet.Server await _retainedMessagesManager.HandleMessageAsync(sender?.ClientId, applicationMessage).ConfigureAwait(false); } + applicationMessage.DeliveryCount = 0; + foreach (var clientSession in _sessions.Values) { clientSession.EnqueueApplicationMessage( @@ -231,6 +233,16 @@ namespace MQTTnet.Server sender?.ClientId, false); } + + if (applicationMessage.DeliveryCount == 0) + { + if (_options.UndeliveredMessageInterceptor == null) + { + throw new OperationCanceledException(nameof(_options.UndeliveredMessageInterceptor)); + } + + await _options.UndeliveredMessageInterceptor.InterceptApplicationMessagePublishAsync(new MqttApplicationMessageInterceptorContext(sender?.ClientId, sender?.Session?.Items, applicationMessage)); + } } catch (OperationCanceledException) { diff --git a/Source/MQTTnet/Server/MqttServerOptions.cs b/Source/MQTTnet/Server/MqttServerOptions.cs index 9773e72..b1c595e 100644 --- a/Source/MQTTnet/Server/MqttServerOptions.cs +++ b/Source/MQTTnet/Server/MqttServerOptions.cs @@ -31,5 +31,7 @@ namespace MQTTnet.Server public IMqttServerStorage Storage { get; set; } public IMqttRetainedMessagesManager RetainedMessagesManager { get; set; } = new MqttRetainedMessagesManager(); + + public IMqttServerApplicationMessageInterceptor UndeliveredMessageInterceptor { get; set; } } } diff --git a/Source/MQTTnet/Server/MqttServerOptionsBuilder.cs b/Source/MQTTnet/Server/MqttServerOptionsBuilder.cs index b52215f..4af5672 100644 --- a/Source/MQTTnet/Server/MqttServerOptionsBuilder.cs +++ b/Source/MQTTnet/Server/MqttServerOptionsBuilder.cs @@ -212,5 +212,11 @@ namespace MQTTnet.Server { return _options; } + + public MqttServerOptionsBuilder WithUndeliveredMessageInterceptor(Action value) + { + _options.UndeliveredMessageInterceptor = new MqttServerApplicationMessageInterceptorDelegate(value); + return this; + } } } diff --git a/Tests/MQTTnet.Core.Tests/Server_Tests.cs b/Tests/MQTTnet.Core.Tests/Server_Tests.cs index 18c106d..af1b715 100644 --- a/Tests/MQTTnet.Core.Tests/Server_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/Server_Tests.cs @@ -1384,5 +1384,33 @@ namespace MQTTnet.Tests Assert.AreEqual(expectedReceivedMessagesCount, receivedMessagesCount); } } + + [TestMethod] + public async Task Intercept_Undelivered() + { + using (var testEnvironment = new TestEnvironment()) + { + var undeliverd = string.Empty; + var svr = await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithUndeliveredMessageInterceptor( + context => + { + undeliverd = context.ApplicationMessage.Topic; + })); + + var topicAReceived = false; + var topicBReceived = false; + + var client = await testEnvironment.ConnectClientAsync(); + + await client.SubscribeAsync("b"); + + await client.PublishAsync("a", null, MqttQualityOfServiceLevel.ExactlyOnce); + + await Task.Delay(500); + + Assert.AreEqual(undeliverd, "a"); + + } + } } }