From 98d7bc980c54c03d2cc6f39805e63e9508c22892 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Thu, 13 Jul 2017 20:28:49 +0200 Subject: [PATCH] Add an event to receive all application messages at server level. --- ...MqttApplicationMessageReceivedEventArgs.cs | 2 +- MQTTnet.Core/Client/MqttClient.cs | 8 +-- .../MqttApplicationMessageExtensions.cs | 10 +++ ...MqttApplicationMessageReceivedEventArgs.cs | 17 +++++ MQTTnet.Core/Server/MqttClientSession.cs | 7 ++- .../Server/MqttClientSessionsManager.cs | 8 ++- MQTTnet.Core/Server/MqttServer.cs | 3 + MQTTnet.nuspec | 63 +++++++++++++++++++ MQTTnet.sln | 4 +- Build/build.ps1 => build.ps1 | 2 +- 10 files changed, 110 insertions(+), 14 deletions(-) rename MQTTnet.Core/{ => Client}/MqttApplicationMessageReceivedEventArgs.cs (93%) create mode 100644 MQTTnet.Core/Server/MqttApplicationMessageReceivedEventArgs.cs create mode 100644 MQTTnet.nuspec rename Build/build.ps1 => build.ps1 (86%) diff --git a/MQTTnet.Core/MqttApplicationMessageReceivedEventArgs.cs b/MQTTnet.Core/Client/MqttApplicationMessageReceivedEventArgs.cs similarity index 93% rename from MQTTnet.Core/MqttApplicationMessageReceivedEventArgs.cs rename to MQTTnet.Core/Client/MqttApplicationMessageReceivedEventArgs.cs index c624bea..9e12028 100644 --- a/MQTTnet.Core/MqttApplicationMessageReceivedEventArgs.cs +++ b/MQTTnet.Core/Client/MqttApplicationMessageReceivedEventArgs.cs @@ -1,6 +1,6 @@ using System; -namespace MQTTnet.Core +namespace MQTTnet.Core.Client { public sealed class MqttApplicationMessageReceivedEventArgs : EventArgs { diff --git a/MQTTnet.Core/Client/MqttClient.cs b/MQTTnet.Core/Client/MqttClient.cs index cc3ebee..ebf2cfd 100644 --- a/MQTTnet.Core/Client/MqttClient.cs +++ b/MQTTnet.Core/Client/MqttClient.cs @@ -234,13 +234,7 @@ namespace MQTTnet.Core.Client _processedPublishPackets.Add(publishPacket.PacketIdentifier); } - var applicationMessage = new MqttApplicationMessage( - publishPacket.Topic, - publishPacket.Payload, - publishPacket.QualityOfServiceLevel, - publishPacket.Retain - ); - + var applicationMessage = publishPacket.ToApplicationMessage(); ApplicationMessageReceived?.Invoke(this, new MqttApplicationMessageReceivedEventArgs(applicationMessage)); } diff --git a/MQTTnet.Core/Internal/MqttApplicationMessageExtensions.cs b/MQTTnet.Core/Internal/MqttApplicationMessageExtensions.cs index 71cdb7f..7d3c665 100644 --- a/MQTTnet.Core/Internal/MqttApplicationMessageExtensions.cs +++ b/MQTTnet.Core/Internal/MqttApplicationMessageExtensions.cs @@ -4,6 +4,16 @@ namespace MQTTnet.Core.Internal { internal static class MqttApplicationMessageExtensions { + public static MqttApplicationMessage ToApplicationMessage(this MqttPublishPacket publishPacket) + { + return new MqttApplicationMessage( + publishPacket.Topic, + publishPacket.Payload, + publishPacket.QualityOfServiceLevel, + publishPacket.Retain + ); + } + public static MqttPublishPacket ToPublishPacket(this MqttApplicationMessage applicationMessage) { if (applicationMessage == null) diff --git a/MQTTnet.Core/Server/MqttApplicationMessageReceivedEventArgs.cs b/MQTTnet.Core/Server/MqttApplicationMessageReceivedEventArgs.cs new file mode 100644 index 0000000..4f2cc80 --- /dev/null +++ b/MQTTnet.Core/Server/MqttApplicationMessageReceivedEventArgs.cs @@ -0,0 +1,17 @@ +using System; + +namespace MQTTnet.Core.Server +{ + public sealed class MqttApplicationMessageReceivedEventArgs : EventArgs + { + public MqttApplicationMessageReceivedEventArgs(string clientId, MqttApplicationMessage applicationMessage) + { + ClientId = clientId; + ApplicationMessage = applicationMessage ?? throw new ArgumentNullException(nameof(applicationMessage)); + } + + public string ClientId { get; } + + public MqttApplicationMessage ApplicationMessage { get; } + } +} diff --git a/MQTTnet.Core/Server/MqttClientSession.cs b/MQTTnet.Core/Server/MqttClientSession.cs index 19b4cfd..b89e73e 100644 --- a/MQTTnet.Core/Server/MqttClientSession.cs +++ b/MQTTnet.Core/Server/MqttClientSession.cs @@ -19,20 +19,23 @@ namespace MQTTnet.Core.Server private readonly MqttClientMessageQueue _messageQueue; private readonly Action _publishPacketReceivedCallback; private readonly MqttServerOptions _options; - + private CancellationTokenSource _cancellationTokenSource; private IMqttCommunicationAdapter _adapter; private string _identifier; private MqttApplicationMessage _willApplicationMessage; - public MqttClientSession(MqttServerOptions options, Action publishPacketReceivedCallback) + public MqttClientSession(string clientId, MqttServerOptions options, Action publishPacketReceivedCallback) { + ClientId = clientId; _options = options ?? throw new ArgumentNullException(nameof(options)); _publishPacketReceivedCallback = publishPacketReceivedCallback ?? throw new ArgumentNullException(nameof(publishPacketReceivedCallback)); _messageQueue = new MqttClientMessageQueue(options); } + public string ClientId { get; } + public bool IsConnected => _adapter != null; public async Task RunAsync(string identifier, MqttApplicationMessage willApplicationMessage, IMqttCommunicationAdapter adapter) diff --git a/MQTTnet.Core/Server/MqttClientSessionsManager.cs b/MQTTnet.Core/Server/MqttClientSessionsManager.cs index 2a9a0b2..ae4d865 100644 --- a/MQTTnet.Core/Server/MqttClientSessionsManager.cs +++ b/MQTTnet.Core/Server/MqttClientSessionsManager.cs @@ -5,6 +5,7 @@ using System.Threading.Tasks; using MQTTnet.Core.Adapter; using MQTTnet.Core.Diagnostics; using MQTTnet.Core.Exceptions; +using MQTTnet.Core.Internal; using MQTTnet.Core.Packets; using MQTTnet.Core.Protocol; @@ -21,6 +22,8 @@ namespace MQTTnet.Core.Server _options = options ?? throw new ArgumentNullException(nameof(options)); } + public event EventHandler ApplicationMessageReceived; + public async Task RunClientSessionAsync(MqttClientConnectedEventArgs eventArgs) { try @@ -114,7 +117,7 @@ namespace MQTTnet.Core.Server { isExistingSession = false; - clientSession = new MqttClientSession(_options, DispatchPublishPacket); + clientSession = new MqttClientSession(connectPacket.ClientId, _options, DispatchPublishPacket); _clientSessions[connectPacket.ClientId] = clientSession; MqttTrace.Verbose(nameof(MqttClientSessionsManager), $"Created a new session for client '{connectPacket.ClientId}'."); @@ -126,6 +129,9 @@ namespace MQTTnet.Core.Server private void DispatchPublishPacket(MqttClientSession senderClientSession, MqttPublishPacket publishPacket) { + var eventArgs = new MqttApplicationMessageReceivedEventArgs(senderClientSession.ClientId, publishPacket.ToApplicationMessage()); + ApplicationMessageReceived?.Invoke(this, eventArgs); + foreach (var clientSession in _clientSessions.Values.ToList()) { clientSession.EnqueuePublishPacket(senderClientSession, publishPacket); diff --git a/MQTTnet.Core/Server/MqttServer.cs b/MQTTnet.Core/Server/MqttServer.cs index e1cda96..f7f7932 100644 --- a/MQTTnet.Core/Server/MqttServer.cs +++ b/MQTTnet.Core/Server/MqttServer.cs @@ -21,6 +21,7 @@ namespace MQTTnet.Core.Server _adapters = adapters ?? throw new ArgumentNullException(nameof(adapters)); _clientSessionsManager = new MqttClientSessionsManager(options); + _clientSessionsManager.ApplicationMessageReceived += (s, e) => ApplicationMessageReceived?.Invoke(s, e); } public IList GetConnectedClients() @@ -30,6 +31,8 @@ namespace MQTTnet.Core.Server public event EventHandler ClientConnected; + public event EventHandler ApplicationMessageReceived; + public void InjectClient(string identifier, IMqttCommunicationAdapter adapter) { if (adapter == null) throw new ArgumentNullException(nameof(adapter)); diff --git a/MQTTnet.nuspec b/MQTTnet.nuspec new file mode 100644 index 0000000..28f8ee3 --- /dev/null +++ b/MQTTnet.nuspec @@ -0,0 +1,63 @@ + + + + MQTTnet + 2.1.4.0 + Christian Kratky + Christian Kratky + https://github.com/chkr1011/MQTTnet/blob/master/LICENSE + https://github.com/chkr1011/MQTTnet + https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png + false + MQTTnet is a .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker). + * [.NET Standard] Added support for .NET Standard 1.3 (Thanks to 1iveowl) +* [Server] Added an event to receive every application message + Copyright Christian Kratky 2016-2017 + MQTT MQTTClient MQTTServer MQTTBroker Broker + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/MQTTnet.sln b/MQTTnet.sln index f6a4ad0..17e73d0 100644 --- a/MQTTnet.sln +++ b/MQTTnet.sln @@ -17,8 +17,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.Core", "MQTTnet.Cor EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{002203AF-2565-4C0D-95ED-027FDEFE0C35}" ProjectSection(SolutionItems) = preProject - Build\build.ps1 = Build\build.ps1 - Build\MQTTnet.nuspec = Build\MQTTnet.nuspec + build.ps1 = build.ps1 + MQTTnet.nuspec = MQTTnet.nuspec EndProjectSection EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.TestApp.NetFramework", "Tests\MQTTnet.TestApp.NetFramework\MQTTnet.TestApp.NetFramework.csproj", "{D9D74F33-6943-49B2-B765-7BD589082098}" diff --git a/Build/build.ps1 b/build.ps1 similarity index 86% rename from Build/build.ps1 rename to build.ps1 index 322a2c8..e30c896 100644 --- a/Build/build.ps1 +++ b/build.ps1 @@ -11,4 +11,4 @@ $msbuild = "C:\Program Files (x86)\Microsoft Visual Studio\2017\Enterprise\MSBui Remove-Item .\NuGet -Force -Recurse New-Item -ItemType Directory -Force -Path .\NuGet -NuGet.exe pack MQTTnet.nuspec -Verbosity detailed -Symbols -OutputDir "NuGet" -Version $version \ No newline at end of file +.\NuGet.exe pack MQTTnet.nuspec -Verbosity detailed -Symbols -OutputDir "NuGet" -Version $version \ No newline at end of file