diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec
index b329846..b1aa605 100644
--- a/Build/MQTTnet.nuspec
+++ b/Build/MQTTnet.nuspec
@@ -16,7 +16,8 @@
* [AspNetCore] Adjusted some namespaces (BREAKING CHANGE!)
* [Server] Adjusted some namespaces (BREAKING CHANGE!)
* [Server] Added state checks (throw if not started etc.) for most server APIs.
-* [Server] Exposed real X509Certificate2 (instead byte array) to TLS options (thanks to @borigas).
+* [Server] Exposed real X509Certificate2 (instead byte array) to TLS options (Thanks to @borigas).
+* [Core] Added server interceptor for undelivered messages (Thanks to @cshark-inator).
Copyright Christian Kratky 2016-2020
MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin Blazor
diff --git a/Source/MQTTnet/MqttApplicationMessage.cs b/Source/MQTTnet/MqttApplicationMessage.cs
index 6522d42..688bb65 100644
--- a/Source/MQTTnet/MqttApplicationMessage.cs
+++ b/Source/MQTTnet/MqttApplicationMessage.cs
@@ -29,5 +29,7 @@ namespace MQTTnet
public byte[] CorrelationData { get; set; }
public List SubscriptionIdentifiers { get; set; }
+
+ public uint DeliveryCount { get; set; }
}
}
diff --git a/Source/MQTTnet/Server/IMqttServerOptions.cs b/Source/MQTTnet/Server/IMqttServerOptions.cs
index 7df6f54..61900ed 100644
--- a/Source/MQTTnet/Server/IMqttServerOptions.cs
+++ b/Source/MQTTnet/Server/IMqttServerOptions.cs
@@ -25,5 +25,7 @@ namespace MQTTnet.Server
IMqttServerStorage Storage { get; }
IMqttRetainedMessagesManager RetainedMessagesManager { get; }
+
+ IMqttServerApplicationMessageInterceptor UndeliveredMessageInterceptor { get; set; }
}
}
\ No newline at end of file
diff --git a/Source/MQTTnet/Server/MqttClientSession.cs b/Source/MQTTnet/Server/MqttClientSession.cs
index c7d2e50..1e8a2f6 100644
--- a/Source/MQTTnet/Server/MqttClientSession.cs
+++ b/Source/MQTTnet/Server/MqttClientSession.cs
@@ -56,6 +56,8 @@ namespace MQTTnet.Server
_logger.Verbose("Queued application message with topic '{0}' (ClientId: {1}).", applicationMessage.Topic, ClientId);
+ applicationMessage.DeliveryCount++;
+
ApplicationMessagesQueue.Enqueue(applicationMessage, senderClientId, checkSubscriptionsResult.QualityOfServiceLevel, isRetainedApplicationMessage);
}
diff --git a/Source/MQTTnet/Server/MqttClientSessionsManager.cs b/Source/MQTTnet/Server/MqttClientSessionsManager.cs
index 7850780..8be714d 100644
--- a/Source/MQTTnet/Server/MqttClientSessionsManager.cs
+++ b/Source/MQTTnet/Server/MqttClientSessionsManager.cs
@@ -224,6 +224,8 @@ namespace MQTTnet.Server
await _retainedMessagesManager.HandleMessageAsync(sender?.ClientId, applicationMessage).ConfigureAwait(false);
}
+ applicationMessage.DeliveryCount = 0;
+
foreach (var clientSession in _sessions.Values)
{
clientSession.EnqueueApplicationMessage(
@@ -231,6 +233,16 @@ namespace MQTTnet.Server
sender?.ClientId,
false);
}
+
+ if (applicationMessage.DeliveryCount == 0)
+ {
+ if (_options.UndeliveredMessageInterceptor == null)
+ {
+ throw new OperationCanceledException(nameof(_options.UndeliveredMessageInterceptor));
+ }
+
+ await _options.UndeliveredMessageInterceptor.InterceptApplicationMessagePublishAsync(new MqttApplicationMessageInterceptorContext(sender?.ClientId, sender?.Session?.Items, applicationMessage));
+ }
}
catch (OperationCanceledException)
{
diff --git a/Source/MQTTnet/Server/MqttServerOptions.cs b/Source/MQTTnet/Server/MqttServerOptions.cs
index 9773e72..b1c595e 100644
--- a/Source/MQTTnet/Server/MqttServerOptions.cs
+++ b/Source/MQTTnet/Server/MqttServerOptions.cs
@@ -31,5 +31,7 @@ namespace MQTTnet.Server
public IMqttServerStorage Storage { get; set; }
public IMqttRetainedMessagesManager RetainedMessagesManager { get; set; } = new MqttRetainedMessagesManager();
+
+ public IMqttServerApplicationMessageInterceptor UndeliveredMessageInterceptor { get; set; }
}
}
diff --git a/Source/MQTTnet/Server/MqttServerOptionsBuilder.cs b/Source/MQTTnet/Server/MqttServerOptionsBuilder.cs
index b52215f..4af5672 100644
--- a/Source/MQTTnet/Server/MqttServerOptionsBuilder.cs
+++ b/Source/MQTTnet/Server/MqttServerOptionsBuilder.cs
@@ -212,5 +212,11 @@ namespace MQTTnet.Server
{
return _options;
}
+
+ public MqttServerOptionsBuilder WithUndeliveredMessageInterceptor(Action value)
+ {
+ _options.UndeliveredMessageInterceptor = new MqttServerApplicationMessageInterceptorDelegate(value);
+ return this;
+ }
}
}
diff --git a/Tests/MQTTnet.Core.Tests/Server_Tests.cs b/Tests/MQTTnet.Core.Tests/Server_Tests.cs
index 18c106d..af1b715 100644
--- a/Tests/MQTTnet.Core.Tests/Server_Tests.cs
+++ b/Tests/MQTTnet.Core.Tests/Server_Tests.cs
@@ -1384,5 +1384,33 @@ namespace MQTTnet.Tests
Assert.AreEqual(expectedReceivedMessagesCount, receivedMessagesCount);
}
}
+
+ [TestMethod]
+ public async Task Intercept_Undelivered()
+ {
+ using (var testEnvironment = new TestEnvironment())
+ {
+ var undeliverd = string.Empty;
+ var svr = await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithUndeliveredMessageInterceptor(
+ context =>
+ {
+ undeliverd = context.ApplicationMessage.Topic;
+ }));
+
+ var topicAReceived = false;
+ var topicBReceived = false;
+
+ var client = await testEnvironment.ConnectClientAsync();
+
+ await client.SubscribeAsync("b");
+
+ await client.PublishAsync("a", null, MqttQualityOfServiceLevel.ExactlyOnce);
+
+ await Task.Delay(500);
+
+ Assert.AreEqual(undeliverd, "a");
+
+ }
+ }
}
}