Browse Source

Add support for topic alias.

release/3.x.x
Christian Kratky 4 years ago
parent
commit
e1bf9de347
9 changed files with 142 additions and 33 deletions
  1. +3
    -3
      MQTTnet.sln
  2. +1
    -1
      Source/MQTTnet/Client/MqttClient.cs
  3. +2
    -0
      Source/MQTTnet/Formatter/IMqttDataConverter.cs
  4. +9
    -0
      Source/MQTTnet/Formatter/V3/MqttV310DataConverter.cs
  5. +9
    -0
      Source/MQTTnet/Formatter/V5/MqttV500DataConverter.cs
  6. +14
    -1
      Source/MQTTnet/Protocol/MqttTopicValidator.cs
  7. +33
    -28
      Source/MQTTnet/Server/MqttClientConnection.cs
  8. +14
    -0
      Tests/MQTTnet.Core.Tests/Mockups/TestEnvironment.cs
  9. +57
    -0
      Tests/MQTTnet.Core.Tests/Server_TopicAlias_Tests.cs

+ 3
- 3
MQTTnet.sln View File

@@ -57,11 +57,11 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Apps", "Apps", "{A56E3128-1
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Blazor", "Blazor", "{61B165A0-5AA8-4E04-A53D-A22A84AA6EB7}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.Test.BlazorApp.Server", "Tests\MQTTnet.Test.BlazorApp\Server\MQTTnet.Test.BlazorApp.Server.csproj", "{A9662AF3-3520-4BF8-9DFF-C55C0C33D08F}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.Test.BlazorApp.Server", "Tests\MQTTnet.Test.BlazorApp\Server\MQTTnet.Test.BlazorApp.Server.csproj", "{A9662AF3-3520-4BF8-9DFF-C55C0C33D08F}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.Test.BlazorApp.Client", "Tests\MQTTnet.Test.BlazorApp\Client\MQTTnet.Test.BlazorApp.Client.csproj", "{D260D63D-7902-4C55-9665-84C5CACBBB24}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.Test.BlazorApp.Client", "Tests\MQTTnet.Test.BlazorApp\Client\MQTTnet.Test.BlazorApp.Client.csproj", "{D260D63D-7902-4C55-9665-84C5CACBBB24}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.Test.BlazorApp.Shared", "Tests\MQTTnet.Test.BlazorApp\Shared\MQTTnet.Test.BlazorApp.Shared.csproj", "{DDB069BA-6E1A-48C7-B374-5D903B553CAD}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.Test.BlazorApp.Shared", "Tests\MQTTnet.Test.BlazorApp\Shared\MQTTnet.Test.BlazorApp.Shared.csproj", "{DDB069BA-6E1A-48C7-B374-5D903B553CAD}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution


+ 1
- 1
Source/MQTTnet/Client/MqttClient.cs View File

@@ -219,7 +219,7 @@ namespace MQTTnet.Client
{
if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage));

MqttTopicValidator.ThrowIfInvalid(applicationMessage.Topic);
MqttTopicValidator.ThrowIfInvalid(applicationMessage);

ThrowIfDisposed();
ThrowIfNotConnected();


+ 2
- 0
Source/MQTTnet/Formatter/IMqttDataConverter.cs View File

@@ -18,6 +18,8 @@ namespace MQTTnet.Formatter

MqttPubAckPacket CreatePubAckPacket(MqttPublishPacket publishPacket);

MqttBasePacket CreatePubRecPacket(MqttPublishPacket publishPacket);

MqttApplicationMessage CreateApplicationMessage(MqttPublishPacket publishPacket);

MqttClientAuthenticateResult CreateClientConnectResult(MqttConnAckPacket connAckPacket);


+ 9
- 0
Source/MQTTnet/Formatter/V3/MqttV310DataConverter.cs View File

@@ -40,6 +40,15 @@ namespace MQTTnet.Formatter.V3
};
}

public MqttBasePacket CreatePubRecPacket(MqttPublishPacket publishPacket)
{
return new MqttPubRecPacket
{
PacketIdentifier = publishPacket.PacketIdentifier,
ReasonCode = MqttPubRecReasonCode.Success
};
}

public MqttApplicationMessage CreateApplicationMessage(MqttPublishPacket publishPacket)
{
if (publishPacket == null) throw new ArgumentNullException(nameof(publishPacket));


+ 9
- 0
Source/MQTTnet/Formatter/V5/MqttV500DataConverter.cs View File

@@ -58,6 +58,15 @@ namespace MQTTnet.Formatter.V5
};
}

public MqttBasePacket CreatePubRecPacket(MqttPublishPacket publishPacket)
{
return new MqttPubRecPacket
{
PacketIdentifier = publishPacket.PacketIdentifier,
ReasonCode = MqttPubRecReasonCode.Success
};
}

public MqttApplicationMessage CreateApplicationMessage(MqttPublishPacket publishPacket)
{
return new MqttApplicationMessage


+ 14
- 1
Source/MQTTnet/Protocol/MqttTopicValidator.cs View File

@@ -1,9 +1,22 @@
using MQTTnet.Exceptions;
using System;
using MQTTnet.Exceptions;

namespace MQTTnet.Protocol
{
public static class MqttTopicValidator
{
public static void ThrowIfInvalid(MqttApplicationMessage applicationMessage)
{
if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage));

if (applicationMessage.TopicAlias > 0)
{
return;
}

ThrowIfInvalid(applicationMessage.Topic);
}

public static void ThrowIfInvalid(string topic)
{
if (string.IsNullOrEmpty(topic))


+ 33
- 28
Source/MQTTnet/Server/MqttClientConnection.cs View File

@@ -18,6 +18,7 @@ namespace MQTTnet.Server
{
public sealed class MqttClientConnection : IDisposable
{
readonly Dictionary<ushort, string> _topicAlias = new Dictionary<ushort, string>();
readonly MqttPacketIdentifierProvider _packetIdentifierProvider = new MqttPacketIdentifierProvider();
readonly MqttPacketDispatcher _packetDispatcher = new MqttPacketDispatcher();
readonly CancellationTokenSource _cancellationToken = new CancellationTokenSource();
@@ -316,19 +317,26 @@ namespace MQTTnet.Server
{
Interlocked.Increment(ref _sentApplicationMessagesCount);

HandleTopicAlias(publishPacket);

var applicationMessage = _dataConverter.CreateApplicationMessage(publishPacket);
_sessionsManager.DispatchApplicationMessage(applicationMessage, this);

switch (publishPacket.QualityOfServiceLevel)
{
case MqttQualityOfServiceLevel.AtMostOnce:
{
return HandleIncomingPublishPacketWithQoS0Async(publishPacket);
return PlatformAbstractionLayer.CompletedTask;
}
case MqttQualityOfServiceLevel.AtLeastOnce:
{
return HandleIncomingPublishPacketWithQoS1Async(publishPacket, cancellationToken);
var pubAckPacket = _dataConverter.CreatePubAckPacket(publishPacket);
return SendAsync(pubAckPacket, cancellationToken);
}
case MqttQualityOfServiceLevel.ExactlyOnce:
{
return HandleIncomingPublishPacketWithQoS2Async(publishPacket, cancellationToken);
var pubRecPacket = _dataConverter.CreatePubRecPacket(publishPacket);
return SendAsync(pubRecPacket, cancellationToken);
}
default:
{
@@ -337,36 +345,33 @@ namespace MQTTnet.Server
}
}

Task HandleIncomingPublishPacketWithQoS0Async(MqttPublishPacket publishPacket)
{
var applicationMessage = _dataConverter.CreateApplicationMessage(publishPacket);

_sessionsManager.DispatchApplicationMessage(applicationMessage, this);

return PlatformAbstractionLayer.CompletedTask;
}

Task HandleIncomingPublishPacketWithQoS1Async(MqttPublishPacket publishPacket, CancellationToken cancellationToken)
void HandleTopicAlias(MqttPublishPacket publishPacket)
{
var applicationMessage = _dataConverter.CreateApplicationMessage(publishPacket);
_sessionsManager.DispatchApplicationMessage(applicationMessage, this);

var pubAckPacket = _dataConverter.CreatePubAckPacket(publishPacket);
return SendAsync(pubAckPacket, cancellationToken);
}
if (publishPacket.Properties?.TopicAlias == null)
{
return;
}

Task HandleIncomingPublishPacketWithQoS2Async(MqttPublishPacket publishPacket, CancellationToken cancellationToken)
{
var applicationMessage = _dataConverter.CreateApplicationMessage(publishPacket);
_sessionsManager.DispatchApplicationMessage(applicationMessage, this);
var topicAlias = publishPacket.Properties.TopicAlias.Value;

var pubRecPacket = new MqttPubRecPacket
lock (_topicAlias)
{
PacketIdentifier = publishPacket.PacketIdentifier,
ReasonCode = MqttPubRecReasonCode.Success
};
if (!string.IsNullOrEmpty(publishPacket.Topic))
{
_topicAlias[topicAlias] = publishPacket.Topic;
}
else
{
if (_topicAlias.TryGetValue(topicAlias, out var topic))
{
publishPacket.Topic = topic;
}
else
{

return SendAsync(pubRecPacket, cancellationToken);
}
}
}
}

async Task SendPendingPacketsAsync(CancellationToken cancellationToken)


+ 14
- 0
Tests/MQTTnet.Core.Tests/Mockups/TestEnvironment.cs View File

@@ -124,6 +124,20 @@ namespace MQTTnet.Tests.Mockups
return client;
}

public async Task<IMqttClient> ConnectClientAsync(Action<MqttClientOptionsBuilder> optionsBuilder)
{
if (optionsBuilder == null) throw new ArgumentNullException(nameof(optionsBuilder));

var options = new MqttClientOptionsBuilder();
options = options.WithTcpServer("localhost", ServerPort);
optionsBuilder.Invoke(options);

var client = CreateClient();
await client.ConnectAsync(options.Build()).ConfigureAwait(false);

return client;
}

public async Task<IMqttClient> ConnectClientAsync(MqttClientOptionsBuilder options)
{
if (options == null) throw new ArgumentNullException(nameof(options));


+ 57
- 0
Tests/MQTTnet.Core.Tests/Server_TopicAlias_Tests.cs View File

@@ -0,0 +1,57 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Client;
using MQTTnet.Formatter;
using MQTTnet.Tests.Mockups;

namespace MQTTnet.Tests
{
[TestClass]
public class Server_TopicAlias_Tests
{
[TestMethod]
public async Task Publish_After_Client_Connects()
{
using (var testEnvironment = new TestEnvironment())
{
await testEnvironment.StartServerAsync();

var receivedTopics = new List<string>();

var c1 = await testEnvironment.ConnectClientAsync(options => options.WithProtocolVersion(MqttProtocolVersion.V500));
c1.UseApplicationMessageReceivedHandler(e =>
{
lock (receivedTopics)
{
receivedTopics.Add(e.ApplicationMessage.Topic);
}
});

await c1.SubscribeAsync("#");

var c2 = await testEnvironment.ConnectClientAsync(options => options.WithProtocolVersion(MqttProtocolVersion.V500));

var message = new MqttApplicationMessage
{
Topic = "this_is_the_topic",
TopicAlias = 22
};

await c2.PublishAsync(message);

message.Topic = null;

await c2.PublishAsync(message);
await c2.PublishAsync(message);

await Task.Delay(500);

Assert.AreEqual(3, receivedTopics.Count);
CollectionAssert.AllItemsAreNotNull(receivedTopics);
Assert.IsTrue(receivedTopics.All(t => t.Equals("this_is_the_topic")));
}
}
}
}

Loading…
Cancel
Save