@@ -216,7 +216,7 @@ namespace MQTTnet.Formatter.V3 | |||
return new MqttClientPublishResult | |||
{ | |||
PacketIdentifier = pubRecPacket.PacketIdentifier, | |||
PacketIdentifier = pubCompPacket.PacketIdentifier, | |||
ReasonCode = MqttClientPublishReasonCode.Success | |||
}; | |||
} | |||
@@ -61,8 +61,9 @@ namespace MQTTnet.Server | |||
using (await _messagesLock.WaitAsync().ConfigureAwait(false)) | |||
{ | |||
var saveIsRequired = false; | |||
var hasPayload = applicationMessage.Payload?.Any() == true; | |||
if (applicationMessage.Payload?.Any() != true) | |||
if (!hasPayload) | |||
{ | |||
saveIsRequired = _messages.Remove(applicationMessage.Topic); | |||
_logger.Verbose("Client '{0}' cleared retained message for topic '{1}'.", clientId, applicationMessage.Topic); | |||
@@ -11,7 +11,6 @@ using MQTTnet.Client.Disconnecting; | |||
using MQTTnet.Client.Options; | |||
using MQTTnet.Client.Subscribing; | |||
using MQTTnet.Exceptions; | |||
using MQTTnet.Formatter; | |||
using MQTTnet.Protocol; | |||
using MQTTnet.Server; | |||
using MQTTnet.Tests.Mockups; | |||
@@ -21,6 +20,34 @@ namespace MQTTnet.Tests | |||
[TestClass] | |||
public class Client_Tests | |||
{ | |||
[TestMethod] | |||
public async Task PacketIdentifier_In_Publish_Result() | |||
{ | |||
using (var testEnvironment = new TestEnvironment()) | |||
{ | |||
await testEnvironment.StartServerAsync(); | |||
var client = await testEnvironment.ConnectClientAsync(); | |||
var result = await client.PublishAsync("a", "a", MqttQualityOfServiceLevel.AtMostOnce); | |||
Assert.AreEqual(null, result.PacketIdentifier); | |||
result = await client.PublishAsync("b", "b", MqttQualityOfServiceLevel.AtMostOnce); | |||
Assert.AreEqual(null, result.PacketIdentifier); | |||
result = await client.PublishAsync("a", "a", MqttQualityOfServiceLevel.AtLeastOnce); | |||
Assert.AreEqual((ushort)1, result.PacketIdentifier); | |||
result = await client.PublishAsync("b", "b", MqttQualityOfServiceLevel.AtLeastOnce); | |||
Assert.AreEqual((ushort)2, result.PacketIdentifier); | |||
result = await client.PublishAsync("a", "a", MqttQualityOfServiceLevel.ExactlyOnce); | |||
Assert.AreEqual((ushort)3, result.PacketIdentifier); | |||
result = await client.PublishAsync("b", "b", MqttQualityOfServiceLevel.ExactlyOnce); | |||
Assert.AreEqual((ushort)4, result.PacketIdentifier); | |||
} | |||
} | |||
[TestMethod] | |||
public async Task Invalid_Connect_Throws_Exception() | |||
{ | |||
@@ -15,11 +15,11 @@ namespace MQTTnet.Tests | |||
public async Task Dispose_Channel_While_Used() | |||
{ | |||
var ct = new CancellationTokenSource(); | |||
var serverSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); | |||
try | |||
{ | |||
var serverSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); | |||
serverSocket.Bind(new IPEndPoint(IPAddress.Any, 50000)); | |||
serverSocket.Bind(new IPEndPoint(IPAddress.Any, 50001)); | |||
serverSocket.Listen(0); | |||
#pragma warning disable 4014 | |||
@@ -35,7 +35,7 @@ namespace MQTTnet.Tests | |||
}, ct.Token); | |||
var clientSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); | |||
await clientSocket.ConnectAsync(IPAddress.Loopback, 50000); | |||
await clientSocket.ConnectAsync(IPAddress.Loopback, 50001); | |||
await Task.Delay(100, ct.Token); | |||
@@ -68,6 +68,7 @@ namespace MQTTnet.Tests | |||
finally | |||
{ | |||
ct.Cancel(false); | |||
serverSocket.Dispose(); | |||
} | |||
} | |||
} | |||
@@ -540,7 +540,7 @@ namespace MQTTnet.Tests | |||
} | |||
[TestMethod] | |||
public async Task Clear_Retained_Message() | |||
public async Task Clear_Retained_Message_With_Empty_Payload() | |||
{ | |||
using (var testEnvironment = new TestEnvironment()) | |||
{ | |||
@@ -566,7 +566,35 @@ namespace MQTTnet.Tests | |||
Assert.AreEqual(0, receivedMessagesCount); | |||
} | |||
} | |||
[TestMethod] | |||
public async Task Clear_Retained_Message_With_Null_Payload() | |||
{ | |||
using (var testEnvironment = new TestEnvironment()) | |||
{ | |||
var receivedMessagesCount = 0; | |||
await testEnvironment.StartServerAsync(); | |||
var c1 = await testEnvironment.ConnectClientAsync(); | |||
await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build()); | |||
await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload((byte[])null).WithRetainFlag().Build()); | |||
await c1.DisconnectAsync(); | |||
var c2 = await testEnvironment.ConnectClientAsync(); | |||
c2.UseApplicationMessageReceivedHandler(c => Interlocked.Increment(ref receivedMessagesCount)); | |||
await Task.Delay(200); | |||
await c2.SubscribeAsync(new TopicFilter { Topic = "retained", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce }); | |||
await Task.Delay(500); | |||
Assert.AreEqual(0, receivedMessagesCount); | |||
} | |||
} | |||
[TestMethod] | |||
public async Task Intercept_Application_Message() | |||
{ | |||