Browse Source

Added interceptor for application messages

release/3.x.x
Christian Kratky 7 years ago
parent
commit
4386ef1e56
16 changed files with 182 additions and 136 deletions
  1. +3
    -2
      Build/MQTTnet.nuspec
  2. +7
    -6
      MQTTnet.Core/Internal/MqttApplicationMessageExtensions.cs
  3. +9
    -4
      MQTTnet.Core/MqttApplicationMessage.cs
  4. +8
    -2
      MQTTnet.Core/MqttApplicationMessageBuilder.cs
  5. +8
    -6
      MQTTnet.Core/Serializer/MqttPacketSerializer.cs
  6. +14
    -15
      MQTTnet.Core/Server/MqttClientRetainedMessagesManager.cs
  7. +11
    -8
      MQTTnet.Core/Server/MqttClientSession.cs
  8. +3
    -3
      MQTTnet.Core/Server/MqttClientSessionsManager.cs
  9. +2
    -2
      MQTTnet.Core/Server/MqttServer.cs
  10. +2
    -0
      MQTTnet.Core/Server/MqttServerOptions.cs
  11. +1
    -0
      README.md
  12. +14
    -10
      Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs
  13. +6
    -6
      Tests/MQTTnet.TestApp.NetCore/PerformanceTest.cs
  14. +7
    -67
      Tests/MQTTnet.TestApp.NetCore/Program.cs
  15. +81
    -0
      Tests/MQTTnet.TestApp.NetCore/ServerTest.cs
  16. +6
    -5
      Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs

+ 3
- 2
Build/MQTTnet.nuspec View File

@@ -14,11 +14,12 @@
* [Core] Migrated the trace to a non-static approach (Breaking Change!)
* [Core] Added a builder for application messages using a fluent API
* [Client] Added a first version of a managed client which will manage the connection, subscription etc. automatically (Thanks to @JTrotta)
* [Server] Added support for WebSockets via ASP.NET Core 2.0 (Thanks to @ChristianRiedl)
* [Client] The session state response from the server is now returned in the _ConnectAsync_ method and also part of the _Connected_ event args
* [Server] Added support for WebSockets via ASP.NET Core 2.0 (Thanks to @ChristianRiedl)
* [Server] Added support for a custom application message interceptor
</releaseNotes>
<copyright>Copyright Christian Kratky 2016-2017</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M</tags>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation</tags>
<dependencies>

<group targetFramework="netstandard1.3">


+ 7
- 6
MQTTnet.Core/Internal/MqttApplicationMessageExtensions.cs View File

@@ -6,12 +6,13 @@ namespace MQTTnet.Core.Internal
{
public static MqttApplicationMessage ToApplicationMessage(this MqttPublishPacket publishPacket)
{
return new MqttApplicationMessage(
publishPacket.Topic,
publishPacket.Payload,
publishPacket.QualityOfServiceLevel,
publishPacket.Retain
);
return new MqttApplicationMessage
{
Topic = publishPacket.Topic,
Payload = publishPacket.Payload,
QualityOfServiceLevel = publishPacket.QualityOfServiceLevel,
Retain = publishPacket.Retain
};
}

public static MqttPublishPacket ToPublishPacket(this MqttApplicationMessage applicationMessage)


+ 9
- 4
MQTTnet.Core/MqttApplicationMessage.cs View File

@@ -5,6 +5,11 @@ namespace MQTTnet.Core
{
public sealed class MqttApplicationMessage
{
public MqttApplicationMessage()
{
}

[Obsolete("Use object initializer or _MqttApplicationMessageBuilder_ instead.")]
public MqttApplicationMessage(string topic, byte[] payload, MqttQualityOfServiceLevel qualityOfServiceLevel, bool retain)
{
Topic = topic ?? throw new ArgumentNullException(nameof(topic));
@@ -13,12 +18,12 @@ namespace MQTTnet.Core
Retain = retain;
}

public string Topic { get; }
public string Topic { get; set; }

public byte[] Payload { get; }
public byte[] Payload { get; set; }

public MqttQualityOfServiceLevel QualityOfServiceLevel { get; }
public MqttQualityOfServiceLevel QualityOfServiceLevel { get; set; }

public bool Retain { get; }
public bool Retain { get; set; }
}
}

+ 8
- 2
MQTTnet.Core/MqttApplicationMessageBuilder.cs View File

@@ -13,7 +13,7 @@ namespace MQTTnet.Core
private string _topic;
private byte[] _payload;
private bool _retain;
public MqttApplicationMessageBuilder WithTopic(string topic)
{
_topic = topic;
@@ -102,7 +102,13 @@ namespace MQTTnet.Core
throw new MqttProtocolViolationException("Topic is not set.");
}

return new MqttApplicationMessage(_topic, _payload ?? new byte[0], _qualityOfServiceLevel, _retain);
return new MqttApplicationMessage
{
Topic = _topic,
Payload = _payload ?? new byte[0],
QualityOfServiceLevel = _qualityOfServiceLevel,
Retain = _retain
};
}
}
}

+ 8
- 6
MQTTnet.Core/Serializer/MqttPacketSerializer.cs View File

@@ -65,7 +65,7 @@ namespace MQTTnet.Core.Serializer
case MqttUnsubscribePacket unsubscribePacket: return Serialize(unsubscribePacket, writer);
case MqttUnsubAckPacket unsubAckPacket: return Serialize(unsubAckPacket, writer);
default: throw new MqttProtocolViolationException("Packet type invalid.");
}
}
}

private static MqttBasePacket Deserialize(MqttPacketHeader header, MqttPacketReader reader)
@@ -233,11 +233,13 @@ namespace MQTTnet.Core.Serializer

if (willFlag)
{
packet.WillMessage = new MqttApplicationMessage(
reader.ReadStringWithLengthPrefix(),
reader.ReadWithLengthPrefix(),
(MqttQualityOfServiceLevel)willQoS,
willRetain);
packet.WillMessage = new MqttApplicationMessage
{
Topic = reader.ReadStringWithLengthPrefix(),
Payload = reader.ReadWithLengthPrefix(),
QualityOfServiceLevel = (MqttQualityOfServiceLevel)willQoS,
Retain = willRetain
};
}

if (usernameFlag)


+ 14
- 15
MQTTnet.Core/Server/MqttClientRetainedMessagesManager.cs View File

@@ -3,15 +3,14 @@ using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using MQTTnet.Core.Diagnostics;
using MQTTnet.Core.Internal;
using MQTTnet.Core.Packets;

namespace MQTTnet.Core.Server
{
public sealed class MqttClientRetainedMessagesManager
{
private readonly Dictionary<string, MqttApplicationMessage> _retainedMessages = new Dictionary<string, MqttApplicationMessage>();
private readonly MqttNetTrace _trace;
private readonly Dictionary<string, MqttPublishPacket> _retainedMessages = new Dictionary<string, MqttPublishPacket>();
private readonly MqttServerOptions _options;

public MqttClientRetainedMessagesManager(MqttServerOptions options, MqttNetTrace trace)
@@ -35,7 +34,7 @@ namespace MQTTnet.Core.Server
_retainedMessages.Clear();
foreach (var retainedMessage in retainedMessages)
{
_retainedMessages[retainedMessage.Topic] = retainedMessage.ToPublishPacket();
_retainedMessages[retainedMessage.Topic] = retainedMessage;
}
}
}
@@ -45,25 +44,25 @@ namespace MQTTnet.Core.Server
}
}

public async Task HandleMessageAsync(string clientId, MqttPublishPacket publishPacket)
public async Task HandleMessageAsync(string clientId, MqttApplicationMessage applicationMessage)
{
if (publishPacket == null) throw new ArgumentNullException(nameof(publishPacket));
if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage));

List<MqttPublishPacket> allRetainedMessages;
List<MqttApplicationMessage> allRetainedMessages;
lock (_retainedMessages)
{
if (publishPacket.Payload?.Any() == false)
if (applicationMessage.Payload?.Any() == false)
{
_retainedMessages.Remove(publishPacket.Topic);
_trace.Information(nameof(MqttClientRetainedMessagesManager), "Client '{0}' cleared retained message for topic '{1}'.", clientId, publishPacket.Topic);
_retainedMessages.Remove(applicationMessage.Topic);
_trace.Information(nameof(MqttClientRetainedMessagesManager), "Client '{0}' cleared retained message for topic '{1}'.", clientId, applicationMessage.Topic);
}
else
{
_retainedMessages[publishPacket.Topic] = publishPacket;
_trace.Information(nameof(MqttClientRetainedMessagesManager), "Client '{0}' updated retained message for topic '{1}'.", clientId, publishPacket.Topic);
_retainedMessages[applicationMessage.Topic] = applicationMessage;
_trace.Information(nameof(MqttClientRetainedMessagesManager), "Client '{0}' updated retained message for topic '{1}'.", clientId, applicationMessage.Topic);
}

allRetainedMessages = new List<MqttPublishPacket>(_retainedMessages.Values);
allRetainedMessages = new List<MqttApplicationMessage>(_retainedMessages.Values);
}

try
@@ -71,7 +70,7 @@ namespace MQTTnet.Core.Server
// ReSharper disable once UseNullPropagation
if (_options.Storage != null)
{
await _options.Storage.SaveRetainedMessagesAsync(allRetainedMessages.Select(p => p.ToApplicationMessage()).ToList());
await _options.Storage.SaveRetainedMessagesAsync(allRetainedMessages);
}
}
catch (Exception exception)
@@ -80,9 +79,9 @@ namespace MQTTnet.Core.Server
}
}

public List<MqttPublishPacket> GetMessages(MqttSubscribePacket subscribePacket)
public List<MqttApplicationMessage> GetMessages(MqttSubscribePacket subscribePacket)
{
var retainedMessages = new List<MqttPublishPacket>();
var retainedMessages = new List<MqttApplicationMessage>();
lock (_retainedMessages)
{
foreach (var retainedMessage in _retainedMessages.Values)


+ 11
- 8
MQTTnet.Core/Server/MqttClientSession.cs View File

@@ -72,7 +72,7 @@ namespace MQTTnet.Core.Server
{
if (_willMessage != null)
{
_mqttClientSessionsManager.DispatchPublishPacket(this, _willMessage.ToPublishPacket());
_mqttClientSessionsManager.DispatchApplicationMessage(this, _willMessage);
}

_cancellationTokenSource?.Cancel(false);
@@ -175,27 +175,30 @@ namespace MQTTnet.Core.Server
var retainedMessages = _mqttClientSessionsManager.RetainedMessagesManager.GetMessages(subscribePacket);
foreach (var publishPacket in retainedMessages)
{
EnqueuePublishPacket(publishPacket);
EnqueuePublishPacket(publishPacket.ToPublishPacket());
}
}

private async Task HandleIncomingPublishPacketAsync(IMqttCommunicationAdapter adapter, MqttPublishPacket publishPacket)
{
if (publishPacket.Retain)
var applicationMessage = publishPacket.ToApplicationMessage();
_options.ApplicationMessageInterceptor?.Invoke(applicationMessage);

if (applicationMessage.Retain)
{
await _mqttClientSessionsManager.RetainedMessagesManager.HandleMessageAsync(ClientId, publishPacket);
await _mqttClientSessionsManager.RetainedMessagesManager.HandleMessageAsync(ClientId, applicationMessage);
}

switch (publishPacket.QualityOfServiceLevel)
switch (applicationMessage.QualityOfServiceLevel)
{
case MqttQualityOfServiceLevel.AtMostOnce:
{
_mqttClientSessionsManager.DispatchPublishPacket(this, publishPacket);
_mqttClientSessionsManager.DispatchApplicationMessage(this, applicationMessage);
return;
}
case MqttQualityOfServiceLevel.AtLeastOnce:
{
_mqttClientSessionsManager.DispatchPublishPacket(this, publishPacket);
_mqttClientSessionsManager.DispatchApplicationMessage(this, applicationMessage);

await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token,
new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier });
@@ -210,7 +213,7 @@ namespace MQTTnet.Core.Server
_unacknowledgedPublishPackets.Add(publishPacket.PacketIdentifier);
}

_mqttClientSessionsManager.DispatchPublishPacket(this, publishPacket);
_mqttClientSessionsManager.DispatchApplicationMessage(this, applicationMessage);

await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token,
new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier });


+ 3
- 3
MQTTnet.Core/Server/MqttClientSessionsManager.cs View File

@@ -117,11 +117,11 @@ namespace MQTTnet.Core.Server
}
}

public void DispatchPublishPacket(MqttClientSession senderClientSession, MqttPublishPacket publishPacket)
public void DispatchApplicationMessage(MqttClientSession senderClientSession, MqttApplicationMessage applicationMessage)
{
try
{
var eventArgs = new MqttApplicationMessageReceivedEventArgs(senderClientSession?.ClientId, publishPacket.ToApplicationMessage());
var eventArgs = new MqttApplicationMessageReceivedEventArgs(senderClientSession?.ClientId, applicationMessage);
ApplicationMessageReceived?.Invoke(this, eventArgs);
}
catch (Exception exception)
@@ -133,7 +133,7 @@ namespace MQTTnet.Core.Server
{
foreach (var clientSession in _clientSessions.Values.ToList())
{
clientSession.EnqueuePublishPacket(publishPacket);
clientSession.EnqueuePublishPacket(applicationMessage.ToPublishPacket());
}
}
}


+ 2
- 2
MQTTnet.Core/Server/MqttServer.cs View File

@@ -4,7 +4,6 @@ using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Core.Adapter;
using MQTTnet.Core.Diagnostics;
using MQTTnet.Core.Internal;

namespace MQTTnet.Core.Server
{
@@ -42,7 +41,8 @@ namespace MQTTnet.Core.Server
{
if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage));

_clientSessionsManager.DispatchPublishPacket(null, applicationMessage.ToPublishPacket());
_options.ApplicationMessageInterceptor?.Invoke(applicationMessage);
_clientSessionsManager.DispatchApplicationMessage(null, applicationMessage);
}

public async Task StartAsync()


+ 2
- 0
MQTTnet.Core/Server/MqttServerOptions.cs View File

@@ -16,6 +16,8 @@ namespace MQTTnet.Core.Server

public Func<MqttConnectPacket, MqttConnectReturnCode> ConnectionValidator { get; set; }

public Action<MqttApplicationMessage> ApplicationMessageInterceptor { get; set; }

public IMqttServerStorage Storage { get; set; }
}
}

+ 1
- 0
README.md View File

@@ -36,6 +36,7 @@ MQTTnet is a high performance .NET library for MQTT based communication. It prov
* Extensible client credential validation
* Retained messages are supported including persisting via interface methods (own implementation required)
* WebSockets supported (via ASP.NET Core 2.0)
* A custom message interceptor can be added which allows transforming or extending every received application message

# Supported frameworks
* .NET Standard 1.3+


+ 14
- 10
Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs View File

@@ -53,11 +53,13 @@ namespace MQTTnet.Core.Tests
Username = "USER",
KeepAlivePeriod = 123,
CleanSession = true,
WillMessage = new MqttApplicationMessage(
"My/last/will",
Encoding.UTF8.GetBytes("Good byte."),
MqttQualityOfServiceLevel.AtLeastOnce,
true)
WillMessage = new MqttApplicationMessage
{
Topic = "My/last/will",
Payload = Encoding.UTF8.GetBytes("Good byte."),
QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce,
Retain = true
}
};

SerializeAndCompare(p, "EDUABE1RVFQE7gB7AANYWVoADE15L2xhc3Qvd2lsbAAKR29vZCBieXRlLgAEVVNFUgAEUEFTUw==");
@@ -88,11 +90,13 @@ namespace MQTTnet.Core.Tests
Username = "USER",
KeepAlivePeriod = 123,
CleanSession = true,
WillMessage = new MqttApplicationMessage(
"My/last/will",
Encoding.UTF8.GetBytes("Good byte."),
MqttQualityOfServiceLevel.AtLeastOnce,
true)
WillMessage = new MqttApplicationMessage
{
Topic = "My/last/will",
Payload = Encoding.UTF8.GetBytes("Good byte."),
QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce,
Retain = true
}
};

DeserializeAndCompare(p, "EDUABE1RVFQE7gB7AANYWVoADE15L2xhc3Qvd2lsbAAKR29vZCBieXRlLgAEVVNFUgAEUEFTUw==");


+ 6
- 6
Tests/MQTTnet.TestApp.NetCore/PerformanceTest.cs View File

@@ -151,12 +151,12 @@ namespace MQTTnet.TestApp.NetCore

private static MqttApplicationMessage CreateMessage()
{
return new MqttApplicationMessage(
"A/B/C",
Encoding.UTF8.GetBytes("Hello World"),
MqttQualityOfServiceLevel.AtMostOnce,
false
);
return new MqttApplicationMessage
{
Topic = "A/B/C",
Payload = Encoding.UTF8.GetBytes("Hello World"),
QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce
};
}

private static Task PublishSingleMessage(IMqttClient client, MqttApplicationMessage applicationMessage, ref int count)


+ 7
- 67
Tests/MQTTnet.TestApp.NetCore/Program.cs View File

@@ -32,7 +32,7 @@ namespace MQTTnet.TestApp.NetCore
}
else if (pressedKey.KeyChar == '2')
{
Task.Run(RunServerAsync);
Task.Run(ServerTest.RunAsync);
}
else if (pressedKey.KeyChar == '3')
{
@@ -119,12 +119,12 @@ namespace MQTTnet.TestApp.NetCore
{
Console.ReadLine();

var applicationMessage = new MqttApplicationMessage(
"A/B/C",
Encoding.UTF8.GetBytes("Hello World"),
MqttQualityOfServiceLevel.AtLeastOnce,
false
);
var applicationMessage = new MqttApplicationMessage
{
Topic = "A/B/C",
Payload = Encoding.UTF8.GetBytes("Hello World"),
QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce
};

await client.PublishAsync(applicationMessage);
}
@@ -135,66 +135,6 @@ namespace MQTTnet.TestApp.NetCore
}
}

private static Task RunServerAsync()
{
MqttNetTrace.TraceMessagePublished += (s, e) =>
{
Console.WriteLine($">> [{e.TraceMessage.Timestamp:O}] [{e.TraceMessage.ThreadId}] [{e.TraceMessage.Source}] [{e.TraceMessage.Level}]: {e.TraceMessage.Message}");
if (e.TraceMessage.Exception != null)
{
Console.WriteLine(e.TraceMessage.Exception);
}
};

try
{
var options = new MqttServerOptions
{
ConnectionValidator = p =>
{
if (p.ClientId == "SpecialClient")
{
if (p.Username != "USER" || p.Password != "PASS")
{
return MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
}
}

return MqttConnectReturnCode.ConnectionAccepted;
}
};

options.Storage = new RetainedMessageHandler();


//var certificate = new X509Certificate(@"C:\certs\test\test.cer", "");
//options.TlsEndpointOptions.Certificate = certificate.Export(X509ContentType.Cert);
//options.ConnectionBacklog = 5;
//options.DefaultEndpointOptions.IsEnabled = true;
//options.TlsEndpointOptions.IsEnabled = false;

var mqttServer = new MqttServerFactory().CreateMqttServer(options);
mqttServer.ClientDisconnected += (s, e) =>
{
Console.Write("Client disconnected event fired.");
};

mqttServer.StartAsync();

Console.WriteLine("Press any key to exit.");
Console.ReadLine();

mqttServer.StopAsync();
}
catch (Exception e)
{
Console.WriteLine(e);
}

Console.ReadLine();
return Task.FromResult(0);
}

// ReSharper disable once UnusedMember.Local
private static async void WikiCode()
{


+ 81
- 0
Tests/MQTTnet.TestApp.NetCore/ServerTest.cs View File

@@ -0,0 +1,81 @@
using System;
using System.Text;
using System.Threading.Tasks;
using MQTTnet.Core.Diagnostics;
using MQTTnet.Core.Protocol;
using MQTTnet.Core.Server;

namespace MQTTnet.TestApp.NetCore
{
public static class ServerTest
{
public static Task RunAsync()
{
MqttNetTrace.TraceMessagePublished += (s, e) =>
{
Console.WriteLine($">> [{e.TraceMessage.Timestamp:O}] [{e.TraceMessage.ThreadId}] [{e.TraceMessage.Source}] [{e.TraceMessage.Level}]: {e.TraceMessage.Message}");
if (e.TraceMessage.Exception != null)
{
Console.WriteLine(e.TraceMessage.Exception);
}
};

try
{
var options = new MqttServerOptions
{
ConnectionValidator = p =>
{
if (p.ClientId == "SpecialClient")
{
if (p.Username != "USER" || p.Password != "PASS")
{
return MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
}
}

return MqttConnectReturnCode.ConnectionAccepted;
}
};

options.Storage = new RetainedMessageHandler();

options.ApplicationMessageInterceptor = message =>
{
if (MqttTopicFilterComparer.IsMatch(message.Topic, "/myTopic/WithTimestamp/#"))
{
// Replace the payload with the timestamp. But also extending a JSON
// based payload with the timestamp is a suitable use case.
message.Payload = Encoding.UTF8.GetBytes(DateTime.Now.ToString("O"));
}
};

//var certificate = new X509Certificate(@"C:\certs\test\test.cer", "");
//options.TlsEndpointOptions.Certificate = certificate.Export(X509ContentType.Cert);
//options.ConnectionBacklog = 5;
//options.DefaultEndpointOptions.IsEnabled = true;
//options.TlsEndpointOptions.IsEnabled = false;

var mqttServer = new MqttServerFactory().CreateMqttServer(options);
mqttServer.ClientDisconnected += (s, e) =>
{
Console.Write("Client disconnected event fired.");
};

mqttServer.StartAsync();

Console.WriteLine("Press any key to exit.");
Console.ReadLine();

mqttServer.StopAsync();
}
catch (Exception e)
{
Console.WriteLine(e);
}

Console.ReadLine();
return Task.FromResult(0);
}
}
}

+ 6
- 5
Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs View File

@@ -119,11 +119,12 @@ namespace MQTTnet.TestApp.UniversalWindows
payload = Convert.FromBase64String(Payload.Text);
}

var message = new MqttApplicationMessage(
Topic.Text,
payload,
qos,
Retain.IsChecked == true);
var message = new MqttApplicationMessageBuilder()
.WithTopic(Topic.Text)
.WithPayload(payload)
.WithQualityOfServiceLevel(qos)
.WithRetainFlag(Retain.IsChecked == true)
.Build();

await _mqttClient.PublishAsync(message);
}


Loading…
Cancel
Save