diff --git a/Source/MQTTnet/Server/MqttClientConnection.cs b/Source/MQTTnet/Server/MqttClientConnection.cs index 6c95f25..bf51269 100644 --- a/Source/MQTTnet/Server/MqttClientConnection.cs +++ b/Source/MQTTnet/Server/MqttClientConnection.cs @@ -397,17 +397,13 @@ namespace MQTTnet.Server // Set the retain flag to true according to [MQTT-3.3.1-8] and [MQTT-3.3.1-9]. publishPacket.Retain = queuedApplicationMessage.IsRetainedMessage; - if (publishPacket.QualityOfServiceLevel > 0) - { - publishPacket.PacketIdentifier = _packetIdentifierProvider.GetNextPacketIdentifier(); - } - if (_serverOptions.ClientMessageQueueInterceptor != null) { var context = new MqttClientMessageQueueInterceptorContext( queuedApplicationMessage.SenderClientId, ClientId, - queuedApplicationMessage.ApplicationMessage); + queuedApplicationMessage.ApplicationMessage, + queuedApplicationMessage.SubscriptionQualityOfServiceLevel); if (_serverOptions.ClientMessageQueueInterceptor != null) { @@ -421,7 +417,12 @@ namespace MQTTnet.Server publishPacket.Topic = context.ApplicationMessage.Topic; publishPacket.Payload = context.ApplicationMessage.Payload; - publishPacket.QualityOfServiceLevel = context.ApplicationMessage.QualityOfServiceLevel; + publishPacket.QualityOfServiceLevel = context.SubscriptionQualityOfServiceLevel; + } + + if (publishPacket.QualityOfServiceLevel > 0) + { + publishPacket.PacketIdentifier = _packetIdentifierProvider.GetNextPacketIdentifier(); } if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce) diff --git a/Tests/MQTTnet.Core.Tests/Server_Tests.cs b/Tests/MQTTnet.Core.Tests/Server_Tests.cs index 9ea8627..16db377 100644 --- a/Tests/MQTTnet.Core.Tests/Server_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/Server_Tests.cs @@ -1452,5 +1452,31 @@ namespace MQTTnet.Tests } } + [TestMethod] + public async Task Intercept_ClientMessageQueue_Different_QoS_Of_Subscription_And_Message() + { + const string topic = "a"; + + using (var testEnvironment = new TestEnvironment(TestContext)) + { + await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder() + .WithClientMessageQueueInterceptor(c => { })); // Interceptor does nothing but has to be present. + + bool receivedMessage = false; + var client = await testEnvironment.ConnectClientAsync(); + client.UseApplicationMessageReceivedHandler(c => + { + receivedMessage = true; + }); + + await client.SubscribeAsync(topic, MqttQualityOfServiceLevel.AtLeastOnce); + + await client.PublishAsync(new MqttApplicationMessage{ Topic = topic, QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce }); + + await Task.Delay(500); + + Assert.IsTrue(receivedMessage); + } + } } }