From 87086094981bdaaea389792a062360b943e69287 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Wed, 6 Mar 2019 22:42:00 +0100 Subject: [PATCH] Add more function mappings for MQTTnet Server. --- .../MqttConnectionHandler.cs | 6 +- .../MqttWebSocketServerAdapter.cs | 4 +- .../IManagedMqttClient.cs | 7 +- .../ManagedMqttClient.cs | 51 ++++------- .../ManagedMqttClientExtensions.cs | 12 +-- .../MQTTnet.Extensions.Rpc/MqttRpcClient.cs | 11 ++- .../MQTTnet.Server/Mqtt/CustomMqttFactory.cs | 32 +++++++ .../Mqtt/MqttApplicationMessageInterceptor.cs | 20 ++++- .../Mqtt/MqttClientConnectedHandler.cs | 39 +++++++++ .../Mqtt/MqttClientDisconnectedHandler.cs | 40 +++++++++ .../Mqtt/MqttClientSubscribedTopicHandler.cs | 41 +++++++++ .../MqttClientUnsubscribedTopicHandler.cs | 40 +++++++++ .../Mqtt/MqttConnectionValidator.cs | 2 +- .../MQTTnet.Server/Mqtt/MqttServerService.cs | 87 ++++++++++++------- .../Mqtt/MqttSubscriptionInterceptor.cs | 2 +- Source/MQTTnet.Server/Program.cs | 7 +- .../Scripting/PythonScriptHostService.cs | 6 +- Source/MQTTnet.Server/Scripts/00_sample.py | 33 +++++++ Source/MQTTnet.Server/Startup.cs | 12 ++- Source/MQTTnet/Adapter/IMqttServerAdapter.cs | 2 +- Source/MQTTnet/Client/IMqttClient.cs | 8 +- Source/MQTTnet/Client/MqttClient.cs | 19 +--- Source/MQTTnet/Client/MqttClientExtensions.cs | 16 ++-- .../IMqttApplicationMessageHandler.cs | 2 +- .../MqttApplicationMessageHandlerContext.cs | 15 ---- .../MqttApplicationMessageHandlerDelegate.cs | 8 +- Source/MQTTnet/IApplicationMessageReceiver.cs | 8 +- .../MqttTcpServerAdapter.Uwp.cs | 4 +- .../Implementations/MqttTcpServerAdapter.cs | 18 ++-- .../Implementations/MqttTcpServerListener.cs | 4 +- Source/MQTTnet/Server/IMqttServer.cs | 11 +-- ...IMqttServerClientSubscribedTopicHandler.cs | 9 ++ ...qttServerClientUnsubscribedTopicHandler.cs | 9 ++ .../Server/MqttClientSessionsManager.cs | 6 +- .../Server/MqttClientSubscriptionsManager.cs | 15 ++-- Source/MQTTnet/Server/MqttServer.cs | 60 ++++++------- ...qttServerClientConnectedHandlerDelegate.cs | 4 +- ...ServerClientDisconnectedHandlerDelegate.cs | 4 +- ...ttServerClientSubscribedTopicEventArgs.cs} | 4 +- ...verClientSubscribedTopicHandlerDelegate.cs | 31 +++++++ ...ServerClientUnsubscribedTopicEventArgs.cs} | 4 +- ...rClientUnsubscribedTopicHandlerDelegate.cs | 31 +++++++ .../Server/MqttServerEventDispatcher.cs | 63 ++++++++++---- .../MqttTcpChannelBenchmark.cs | 3 +- .../MQTTnet.Core.Tests/MQTTv5/Client_Tests.cs | 5 +- .../Mockups/TestMqttServerAdapter.cs | 4 +- .../Mockups/TestServerExtensions.cs | 38 -------- Tests/MQTTnet.Core.Tests/MqttClientTests.cs | 37 ++++---- .../MQTTnet.Core.Tests/RoundtripTimeTests.cs | 5 +- Tests/MQTTnet.Core.Tests/Server_Tests.cs | 65 +++++++------- Tests/MQTTnet.TestApp.NetCore/ClientTest.cs | 15 ++-- .../ManagedClientTest.cs | 5 +- .../PublicBrokerTest.cs | 3 +- Tests/MQTTnet.TestApp.NetCore/ServerTest.cs | 9 +- .../MainPage.xaml.cs | 27 +++--- 55 files changed, 662 insertions(+), 361 deletions(-) create mode 100644 Source/MQTTnet.Server/Mqtt/CustomMqttFactory.cs create mode 100644 Source/MQTTnet.Server/Mqtt/MqttClientConnectedHandler.cs create mode 100644 Source/MQTTnet.Server/Mqtt/MqttClientDisconnectedHandler.cs create mode 100644 Source/MQTTnet.Server/Mqtt/MqttClientSubscribedTopicHandler.cs create mode 100644 Source/MQTTnet.Server/Mqtt/MqttClientUnsubscribedTopicHandler.cs delete mode 100644 Source/MQTTnet/Client/Receiving/MqttApplicationMessageHandlerContext.cs create mode 100644 Source/MQTTnet/Server/IMqttServerClientSubscribedTopicHandler.cs create mode 100644 Source/MQTTnet/Server/IMqttServerClientUnsubscribedTopicHandler.cs rename Source/MQTTnet/Server/{MqttClientSubscribedTopicEventArgs.cs => MqttServerClientSubscribedTopicEventArgs.cs} (67%) create mode 100644 Source/MQTTnet/Server/MqttServerClientSubscribedTopicHandlerDelegate.cs rename Source/MQTTnet/Server/{MqttClientUnSubscribedTopicEventArgs.cs => MqttServerClientUnsubscribedTopicEventArgs.cs} (67%) create mode 100644 Source/MQTTnet/Server/MqttServerClientUnsubscribedTopicHandlerDelegate.cs delete mode 100644 Tests/MQTTnet.Core.Tests/Mockups/TestServerExtensions.cs diff --git a/Source/MQTTnet.AspnetCore/MqttConnectionHandler.cs b/Source/MQTTnet.AspnetCore/MqttConnectionHandler.cs index 0c09a7a..ec2528f 100644 --- a/Source/MQTTnet.AspnetCore/MqttConnectionHandler.cs +++ b/Source/MQTTnet.AspnetCore/MqttConnectionHandler.cs @@ -10,7 +10,7 @@ namespace MQTTnet.AspNetCore { public class MqttConnectionHandler : ConnectionHandler, IMqttServerAdapter { - public event EventHandler ClientAccepted; + public Action ClientAcceptedHandler { get; set; } public override async Task OnConnectedAsync(ConnectionContext connection) { @@ -24,9 +24,9 @@ namespace MQTTnet.AspNetCore using (var adapter = new MqttConnectionContext(new MqttPacketFormatterAdapter(), connection)) { var args = new MqttServerAdapterClientAcceptedEventArgs(adapter); - ClientAccepted?.Invoke(this, args); + ClientAcceptedHandler?.Invoke(args); - await args.SessionTask; + await args.SessionTask.ConfigureAwait(false); } } diff --git a/Source/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs b/Source/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs index 41039b5..ac77f16 100644 --- a/Source/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs +++ b/Source/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs @@ -11,7 +11,7 @@ namespace MQTTnet.AspNetCore { public class MqttWebSocketServerAdapter : IMqttServerAdapter { - public event EventHandler ClientAccepted; + public Action ClientAcceptedHandler { get; set; } public Task StartAsync(IMqttServerOptions options) { @@ -30,7 +30,7 @@ namespace MQTTnet.AspNetCore var clientAdapter = new MqttChannelAdapter(new MqttWebSocketChannel(webSocket, endpoint), new MqttPacketFormatterAdapter(), new MqttNetLogger().CreateChildLogger(nameof(MqttWebSocketServerAdapter))); var eventArgs = new MqttServerAdapterClientAcceptedEventArgs(clientAdapter); - ClientAccepted?.Invoke(this, eventArgs); + ClientAcceptedHandler?.Invoke(eventArgs); if (eventArgs.SessionTask != null) { diff --git a/Source/MQTTnet.Extensions.ManagedClient/IManagedMqttClient.cs b/Source/MQTTnet.Extensions.ManagedClient/IManagedMqttClient.cs index 644104f..6de908f 100644 --- a/Source/MQTTnet.Extensions.ManagedClient/IManagedMqttClient.cs +++ b/Source/MQTTnet.Extensions.ManagedClient/IManagedMqttClient.cs @@ -14,13 +14,8 @@ namespace MQTTnet.Extensions.ManagedClient IManagedMqttClientOptions Options { get; } IMqttClientConnectedHandler ConnectedHandler { get; set; } - [Obsolete("Use ConnectedHandler instead.")] - event EventHandler Connected; - IMqttClientDisconnectedHandler DisconnectedHandler { get; set; } - [Obsolete("Use DisconnectedHandler instead.")] - event EventHandler Disconnected; - + event EventHandler ApplicationMessageProcessed; event EventHandler ApplicationMessageSkipped; diff --git a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs index 54750a1..51e8acc 100644 --- a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs +++ b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs @@ -13,8 +13,6 @@ using MQTTnet.Exceptions; using MQTTnet.Internal; using MQTTnet.Protocol; using MQTTnet.Server; -using MqttClientConnectedEventArgs = MQTTnet.Client.Connecting.MqttClientConnectedEventArgs; -using MqttClientDisconnectedEventArgs = MQTTnet.Client.Disconnecting.MqttClientDisconnectedEventArgs; namespace MQTTnet.Extensions.ManagedClient { @@ -36,14 +34,9 @@ namespace MQTTnet.Extensions.ManagedClient public ManagedMqttClient(IMqttClient mqttClient, IMqttNetChildLogger logger) { - if (logger == null) throw new ArgumentNullException(nameof(logger)); - _mqttClient = mqttClient ?? throw new ArgumentNullException(nameof(mqttClient)); - _mqttClient.UseConnectedHandler(OnConnected); - _mqttClient.UseDisconnectedHandler(OnDisconnected); - _mqttClient.UseReceivedApplicationMessageHandler(OnApplicationMessageReceived); - + if (logger == null) throw new ArgumentNullException(nameof(logger)); _logger = logger.CreateChildLogger(nameof(ManagedMqttClient)); } @@ -52,20 +45,24 @@ namespace MQTTnet.Extensions.ManagedClient public int PendingApplicationMessagesCount => _messageQueue.Count; public IManagedMqttClientOptions Options { get; private set; } - public IMqttClientConnectedHandler ConnectedHandler { get; set; } - public event EventHandler Connected; - - public IMqttClientDisconnectedHandler DisconnectedHandler { get; set; } - public event EventHandler Disconnected; + public IMqttClientConnectedHandler ConnectedHandler + { + get => _mqttClient.ConnectedHandler; + set => _mqttClient.ConnectedHandler = value; + } + public IMqttClientDisconnectedHandler DisconnectedHandler + { + get => _mqttClient.DisconnectedHandler; + set => _mqttClient.DisconnectedHandler = value; + } - public IMqttApplicationMessageHandler ReceivedApplicationMessageHandler + public IMqttApplicationMessageHandler ApplicationMessageReceivedHandler { - get => _mqttClient.ReceivedApplicationMessageHandler; - set => _mqttClient.ReceivedApplicationMessageHandler = value; + get => _mqttClient.ApplicationMessageReceivedHandler; + set => _mqttClient.ApplicationMessageReceivedHandler = value; } - public event EventHandler ApplicationMessageReceived; public event EventHandler ApplicationMessageProcessed; public event EventHandler ApplicationMessageSkipped; @@ -420,25 +417,7 @@ namespace MQTTnet.Extensions.ManagedClient return ReconnectionResult.NotConnected; } } - - private Task OnApplicationMessageReceived(MqttApplicationMessageHandlerContext context) - { - ApplicationMessageReceived?.Invoke(this, new MqttApplicationMessageReceivedEventArgs(context.SenderClientId, context.ApplicationMessage)); - return Task.FromResult(0); - } - - private Task OnDisconnected(MqttClientDisconnectedEventArgs eventArgs) - { - Disconnected?.Invoke(this, eventArgs); - return DisconnectedHandler?.HandleDisconnectedAsync(eventArgs); - } - - private Task OnConnected(MqttClientConnectedEventArgs eventArgs) - { - Connected?.Invoke(this, eventArgs); - return ConnectedHandler?.HandleConnectedAsync(eventArgs); - } - + private void StartPublishing() { if (_publishingCancellationToken != null) diff --git a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClientExtensions.cs b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClientExtensions.cs index 811368f..518042c 100644 --- a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClientExtensions.cs +++ b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClientExtensions.cs @@ -60,28 +60,28 @@ namespace MQTTnet.Extensions.ManagedClient return client; } - public static IManagedMqttClient UseReceivedApplicationMessageHandler(this IManagedMqttClient client, Func handler) + public static IManagedMqttClient UseApplicationMessageReceivedHandler(this IManagedMqttClient client, Func handler) { if (handler == null) { - client.ReceivedApplicationMessageHandler = null; + client.ApplicationMessageReceivedHandler = null; return client; } - client.ReceivedApplicationMessageHandler = new MqttApplicationMessageHandlerDelegate(handler); + client.ApplicationMessageReceivedHandler = new MqttApplicationMessageHandlerDelegate(handler); return client; } - public static IManagedMqttClient UseReceivedApplicationMessageHandler(this IManagedMqttClient client, Action handler) + public static IManagedMqttClient UseApplicationMessageReceivedHandler(this IManagedMqttClient client, Action handler) { if (handler == null) { - client.ReceivedApplicationMessageHandler = null; + client.ApplicationMessageReceivedHandler = null; return client; } - client.ReceivedApplicationMessageHandler = new MqttApplicationMessageHandlerDelegate(handler); + client.ApplicationMessageReceivedHandler = new MqttApplicationMessageHandlerDelegate(handler); return client; } diff --git a/Source/MQTTnet.Extensions.Rpc/MqttRpcClient.cs b/Source/MQTTnet.Extensions.Rpc/MqttRpcClient.cs index ebf66ec..69d3ac8 100644 --- a/Source/MQTTnet.Extensions.Rpc/MqttRpcClient.cs +++ b/Source/MQTTnet.Extensions.Rpc/MqttRpcClient.cs @@ -4,6 +4,7 @@ using System.Text; using System.Threading; using System.Threading.Tasks; using MQTTnet.Client; +using MQTTnet.Client.Receiving; using MQTTnet.Exceptions; using MQTTnet.Protocol; @@ -18,7 +19,7 @@ namespace MQTTnet.Extensions.Rpc { _mqttClient = mqttClient ?? throw new ArgumentNullException(nameof(mqttClient)); - _mqttClient.ApplicationMessageReceived += OnApplicationMessageReceived; + _mqttClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageHandlerDelegate(HandleReceivedApplicationMessageAsync); } public Task ExecuteAsync(TimeSpan timeout, string methodName, string payload, MqttQualityOfServiceLevel qualityOfServiceLevel) @@ -102,19 +103,21 @@ namespace MQTTnet.Extensions.Rpc } } - private void OnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs eventArgs) + private Task HandleReceivedApplicationMessageAsync(MqttApplicationMessageReceivedEventArgs eventArgs) { if (!_waitingCalls.TryRemove(eventArgs.ApplicationMessage.Topic, out var tcs)) { - return; + return Task.FromResult(0); } if (tcs.Task.IsCompleted || tcs.Task.IsCanceled) { - return; + return Task.FromResult(0); } tcs.TrySetResult(eventArgs.ApplicationMessage.Payload); + + return Task.FromResult(0); } public void Dispose() diff --git a/Source/MQTTnet.Server/Mqtt/CustomMqttFactory.cs b/Source/MQTTnet.Server/Mqtt/CustomMqttFactory.cs new file mode 100644 index 0000000..156871c --- /dev/null +++ b/Source/MQTTnet.Server/Mqtt/CustomMqttFactory.cs @@ -0,0 +1,32 @@ +using System; +using System.Collections.Generic; +using Microsoft.Extensions.Logging; +using MQTTnet.Adapter; +using MQTTnet.Diagnostics; +using MQTTnet.Server.Logging; + +namespace MQTTnet.Server.Mqtt +{ + public class CustomMqttFactory + { + private readonly MqttFactory _mqttFactory; + + public CustomMqttFactory(ILogger logger) + { + if (logger == null) throw new ArgumentNullException(nameof(logger)); + + Logger = new MqttNetLoggerWrapper(logger); + + _mqttFactory = new MqttFactory(Logger); + } + + public IMqttNetLogger Logger { get; } + + public IMqttServer CreateMqttServer(List adapters) + { + if (adapters == null) throw new ArgumentNullException(nameof(adapters)); + + return _mqttFactory.CreateMqttServer(adapters); + } + } +} diff --git a/Source/MQTTnet.Server/Mqtt/MqttApplicationMessageInterceptor.cs b/Source/MQTTnet.Server/Mqtt/MqttApplicationMessageInterceptor.cs index 4af2a55..ab3d589 100644 --- a/Source/MQTTnet.Server/Mqtt/MqttApplicationMessageInterceptor.cs +++ b/Source/MQTTnet.Server/Mqtt/MqttApplicationMessageInterceptor.cs @@ -10,7 +10,7 @@ namespace MQTTnet.Server.Mqtt public class MqttApplicationMessageInterceptor : IMqttServerApplicationMessageInterceptor { private readonly PythonScriptHostService _pythonScriptHostService; - private readonly ILogger _logger; + private readonly ILogger _logger; public MqttApplicationMessageInterceptor(PythonScriptHostService pythonScriptHostService, ILogger logger) { @@ -31,7 +31,7 @@ namespace MQTTnet.Server.Mqtt { "qos", (int)context.ApplicationMessage.QualityOfServiceLevel }, { "retain", context.ApplicationMessage.Retain } }; - + _pythonScriptHostService.InvokeOptionalFunction("on_intercept_application_message", pythonContext); context.AcceptPublish = (bool)pythonContext.get("accept_publish", context.AcceptPublish); @@ -46,5 +46,21 @@ namespace MQTTnet.Server.Mqtt return Task.CompletedTask; } + + // TODO: Create dump(object) method in wrapper (creates JSON and prints it). + public class PythonMqttApplicationMessageInterceptorContext + { + public bool accept_connection; + + public bool accept_publish; + + public string client_id; + + public string topic; + + public int qos; + + public bool retain; + } } } diff --git a/Source/MQTTnet.Server/Mqtt/MqttClientConnectedHandler.cs b/Source/MQTTnet.Server/Mqtt/MqttClientConnectedHandler.cs new file mode 100644 index 0000000..8d3879e --- /dev/null +++ b/Source/MQTTnet.Server/Mqtt/MqttClientConnectedHandler.cs @@ -0,0 +1,39 @@ +using System; +using System.Threading.Tasks; +using IronPython.Runtime; +using Microsoft.Extensions.Logging; +using MQTTnet.Server.Scripting; + +namespace MQTTnet.Server.Mqtt +{ + public class MqttClientConnectedHandler : IMqttServerClientConnectedHandler + { + private readonly PythonScriptHostService _pythonScriptHostService; + private readonly ILogger _logger; + + public MqttClientConnectedHandler(PythonScriptHostService pythonScriptHostService, ILogger logger) + { + _pythonScriptHostService = pythonScriptHostService ?? throw new ArgumentNullException(nameof(pythonScriptHostService)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + public Task HandleClientConnectedAsync(MqttServerClientConnectedEventArgs eventArgs) + { + try + { + var pythonEventArgs = new PythonDictionary + { + { "client_id", eventArgs.ClientId } + }; + + _pythonScriptHostService.InvokeOptionalFunction("on_client_connected", pythonEventArgs); + } + catch (Exception exception) + { + _logger.LogError(exception, "Error while handling client connected event."); + } + + return Task.CompletedTask; + } + } +} diff --git a/Source/MQTTnet.Server/Mqtt/MqttClientDisconnectedHandler.cs b/Source/MQTTnet.Server/Mqtt/MqttClientDisconnectedHandler.cs new file mode 100644 index 0000000..f7b364e --- /dev/null +++ b/Source/MQTTnet.Server/Mqtt/MqttClientDisconnectedHandler.cs @@ -0,0 +1,40 @@ +using System; +using System.Threading.Tasks; +using IronPython.Runtime; +using Microsoft.Extensions.Logging; +using MQTTnet.Server.Scripting; + +namespace MQTTnet.Server.Mqtt +{ + public class MqttClientDisconnectedHandler : IMqttServerClientDisconnectedHandler + { + private readonly PythonScriptHostService _pythonScriptHostService; + private readonly ILogger _logger; + + public MqttClientDisconnectedHandler(PythonScriptHostService pythonScriptHostService, ILogger logger) + { + _pythonScriptHostService = pythonScriptHostService ?? throw new ArgumentNullException(nameof(pythonScriptHostService)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + public Task HandleClientDisconnectedAsync(MqttServerClientDisconnectedEventArgs eventArgs) + { + try + { + var pythonEventArgs = new PythonDictionary + { + { "client_id", eventArgs.ClientId }, + { "type", PythonConvert.Pythonfy(eventArgs.DisconnectType) } + }; + + _pythonScriptHostService.InvokeOptionalFunction("on_client_disconnected", pythonEventArgs); + } + catch (Exception exception) + { + _logger.LogError(exception, "Error while handling client disconnected event."); + } + + return Task.CompletedTask; + } + } +} diff --git a/Source/MQTTnet.Server/Mqtt/MqttClientSubscribedTopicHandler.cs b/Source/MQTTnet.Server/Mqtt/MqttClientSubscribedTopicHandler.cs new file mode 100644 index 0000000..6d90b44 --- /dev/null +++ b/Source/MQTTnet.Server/Mqtt/MqttClientSubscribedTopicHandler.cs @@ -0,0 +1,41 @@ +using System; +using System.Threading.Tasks; +using IronPython.Runtime; +using Microsoft.Extensions.Logging; +using MQTTnet.Server.Scripting; + +namespace MQTTnet.Server.Mqtt +{ + public class MqttClientSubscribedTopicHandler : IMqttServerClientSubscribedTopicHandler + { + private readonly PythonScriptHostService _pythonScriptHostService; + private readonly ILogger _logger; + + public MqttClientSubscribedTopicHandler(PythonScriptHostService pythonScriptHostService, ILogger logger) + { + _pythonScriptHostService = pythonScriptHostService ?? throw new ArgumentNullException(nameof(pythonScriptHostService)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + public Task HandleClientSubscribedTopicAsync(MqttServerClientSubscribedTopicEventArgs eventArgs) + { + try + { + var pythonEventArgs = new PythonDictionary + { + { "client_id", eventArgs.ClientId }, + { "topic", eventArgs.TopicFilter.Topic }, + { "qos", (int)eventArgs.TopicFilter.QualityOfServiceLevel } + }; + + _pythonScriptHostService.InvokeOptionalFunction("on_client_subscribed_topic", pythonEventArgs); + } + catch (Exception exception) + { + _logger.LogError(exception, "Error while handling client subscribed topic event."); + } + + return Task.CompletedTask; + } + } +} diff --git a/Source/MQTTnet.Server/Mqtt/MqttClientUnsubscribedTopicHandler.cs b/Source/MQTTnet.Server/Mqtt/MqttClientUnsubscribedTopicHandler.cs new file mode 100644 index 0000000..ac42f9e --- /dev/null +++ b/Source/MQTTnet.Server/Mqtt/MqttClientUnsubscribedTopicHandler.cs @@ -0,0 +1,40 @@ +using System; +using System.Threading.Tasks; +using IronPython.Runtime; +using Microsoft.Extensions.Logging; +using MQTTnet.Server.Scripting; + +namespace MQTTnet.Server.Mqtt +{ + public class MqttClientUnsubscribedTopicHandler : IMqttServerClientUnsubscribedTopicHandler + { + private readonly PythonScriptHostService _pythonScriptHostService; + private readonly ILogger _logger; + + public MqttClientUnsubscribedTopicHandler(PythonScriptHostService pythonScriptHostService, ILogger logger) + { + _pythonScriptHostService = pythonScriptHostService ?? throw new ArgumentNullException(nameof(pythonScriptHostService)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + public Task HandleClientUnsubscribedTopicAsync(MqttServerClientUnsubscribedTopicEventArgs eventArgs) + { + try + { + var pythonEventArgs = new PythonDictionary + { + { "client_id", eventArgs.ClientId }, + { "topic", eventArgs.TopicFilter } + }; + + _pythonScriptHostService.InvokeOptionalFunction("on_client_unsubscribed_topic", pythonEventArgs); + } + catch (Exception exception) + { + _logger.LogError(exception, "Error while handling client unsubscribed topic event."); + } + + return Task.CompletedTask; + } + } +} diff --git a/Source/MQTTnet.Server/Mqtt/MqttConnectionValidator.cs b/Source/MQTTnet.Server/Mqtt/MqttConnectionValidator.cs index 0471b47..d085230 100644 --- a/Source/MQTTnet.Server/Mqtt/MqttConnectionValidator.cs +++ b/Source/MQTTnet.Server/Mqtt/MqttConnectionValidator.cs @@ -10,7 +10,7 @@ namespace MQTTnet.Server.Mqtt public class MqttConnectionValidator : IMqttServerConnectionValidator { private readonly PythonScriptHostService _pythonScriptHostService; - private readonly ILogger _logger; + private readonly ILogger _logger; public MqttConnectionValidator(PythonScriptHostService pythonScriptHostService, ILogger logger) { diff --git a/Source/MQTTnet.Server/Mqtt/MqttServerService.cs b/Source/MQTTnet.Server/Mqtt/MqttServerService.cs index 6588ab5..39651ed 100644 --- a/Source/MQTTnet.Server/Mqtt/MqttServerService.cs +++ b/Source/MQTTnet.Server/Mqtt/MqttServerService.cs @@ -8,7 +8,6 @@ using MQTTnet.Adapter; using MQTTnet.AspNetCore; using MQTTnet.Implementations; using MQTTnet.Protocol; -using MQTTnet.Server.Logging; using MQTTnet.Server.Scripting; namespace MQTTnet.Server.Mqtt @@ -17,6 +16,10 @@ namespace MQTTnet.Server.Mqtt { private readonly ILogger _logger; + private readonly MqttClientConnectedHandler _mqttClientConnectedHandler; + private readonly MqttClientDisconnectedHandler _mqttClientDisconnectedHandler; + private readonly MqttClientSubscribedTopicHandler _mqttClientSubscribedTopicHandler; + private readonly MqttClientUnsubscribedTopicHandler _mqttClientUnsubscribedTopicHandler; private readonly MqttConnectionValidator _mqttConnectionValidator; private readonly MqttSubscriptionInterceptor _mqttSubscriptionInterceptor; private readonly MqttApplicationMessageInterceptor _mqttApplicationMessageInterceptor; @@ -25,15 +28,22 @@ namespace MQTTnet.Server.Mqtt private readonly IMqttServer _mqttServer; public MqttServerService( - IMqttServerFactory mqttServerFactory, + CustomMqttFactory mqttFactory, MqttWebSocketServerAdapter webSocketServerAdapter, - MqttNetLoggerWrapper mqttNetLogger, + MqttClientConnectedHandler mqttClientConnectedHandler, + MqttClientDisconnectedHandler mqttClientDisconnectedHandler, + MqttClientSubscribedTopicHandler mqttClientSubscribedTopicHandler, + MqttClientUnsubscribedTopicHandler mqttClientUnsubscribedTopicHandler, MqttConnectionValidator mqttConnectionValidator, MqttSubscriptionInterceptor mqttSubscriptionInterceptor, MqttApplicationMessageInterceptor mqttApplicationMessageInterceptor, PythonScriptHostService pythonScriptHostService, ILogger logger) { + _mqttClientConnectedHandler = mqttClientConnectedHandler ?? throw new ArgumentNullException(nameof(mqttClientConnectedHandler)); + _mqttClientDisconnectedHandler = mqttClientDisconnectedHandler ?? throw new ArgumentNullException(nameof(mqttClientDisconnectedHandler)); + _mqttClientSubscribedTopicHandler = mqttClientSubscribedTopicHandler ?? throw new ArgumentNullException(nameof(mqttClientSubscribedTopicHandler)); + _mqttClientUnsubscribedTopicHandler = mqttClientUnsubscribedTopicHandler ?? throw new ArgumentNullException(nameof(mqttClientUnsubscribedTopicHandler)); _mqttConnectionValidator = mqttConnectionValidator ?? throw new ArgumentNullException(nameof(mqttConnectionValidator)); _mqttSubscriptionInterceptor = mqttSubscriptionInterceptor ?? throw new ArgumentNullException(nameof(mqttSubscriptionInterceptor)); _mqttApplicationMessageInterceptor = mqttApplicationMessageInterceptor ?? throw new ArgumentNullException(nameof(mqttApplicationMessageInterceptor)); @@ -42,11 +52,11 @@ namespace MQTTnet.Server.Mqtt var adapters = new List { - new MqttTcpServerAdapter(new MqttNetChildLoggerWrapper(null, mqttNetLogger)), + new MqttTcpServerAdapter(mqttFactory.Logger.CreateChildLogger(nameof(MqttTcpServerAdapter))), webSocketServerAdapter }; - _mqttServer = mqttServerFactory.CreateMqttServer(adapters); + _mqttServer = mqttFactory.CreateMqttServer(adapters); } public void Configure() @@ -61,6 +71,11 @@ namespace MQTTnet.Server.Mqtt .WithSubscriptionInterceptor(_mqttSubscriptionInterceptor) .Build(); + _mqttServer.ClientConnectedHandler = _mqttClientConnectedHandler; + _mqttServer.ClientDisconnectedHandler = _mqttClientDisconnectedHandler; + _mqttServer.ClientSubscribedTopicHandler = _mqttClientSubscribedTopicHandler; + _mqttServer.ClientUnsubscribedTopicHandler = _mqttClientUnsubscribedTopicHandler; + _mqttServer.StartAsync(options).GetAwaiter().GetResult(); _logger.LogInformation("MQTT server started."); @@ -68,34 +83,48 @@ namespace MQTTnet.Server.Mqtt private void Publish(PythonDictionary parameters) { - var applicationMessageBuilder = new MqttApplicationMessageBuilder() - .WithTopic((string)parameters.get("topic", null)) - .WithRetainFlag((bool)parameters.get("retain", false)) - .WithQualityOfServiceLevel((MqttQualityOfServiceLevel)(int)parameters.get("qos", 0)); - - var payload = parameters.get("payload", null); - var binaryPayload = new byte[0]; - - if (payload is string stringPayload) - { - binaryPayload = Encoding.UTF8.GetBytes(stringPayload); - } - else if (payload is ByteArray byteArray) + try { - binaryPayload = byteArray.ToArray(); + var applicationMessageBuilder = new MqttApplicationMessageBuilder() + .WithTopic((string)parameters.get("topic", null)) + .WithRetainFlag((bool)parameters.get("retain", false)) + .WithQualityOfServiceLevel((MqttQualityOfServiceLevel)(int)parameters.get("qos", 0)); + + var payload = parameters.get("payload", null); + byte[] binaryPayload; + + if (payload == null) + { + binaryPayload = new byte[0]; + } + else if (payload is string stringPayload) + { + binaryPayload = Encoding.UTF8.GetBytes(stringPayload); + } + else if (payload is ByteArray byteArray) + { + binaryPayload = byteArray.ToArray(); + } + else if (payload is IEnumerable intArray) + { + binaryPayload = intArray.Select(Convert.ToByte).ToArray(); + } + else + { + throw new NotSupportedException("Payload type not supported."); + } + + applicationMessageBuilder = applicationMessageBuilder + .WithPayload(binaryPayload); + + var applicationMessage = applicationMessageBuilder.Build(); + + _mqttServer.PublishAsync(applicationMessage).GetAwaiter().GetResult(); } - else if (payload is IEnumerable intArray) + catch (Exception exception) { - binaryPayload = intArray.Select(Convert.ToByte).ToArray(); + _logger.LogError(exception, "Error while publishing application message from server."); } - - applicationMessageBuilder = applicationMessageBuilder - .WithPayload(binaryPayload); - - var applicationMessage = applicationMessageBuilder.Build(); - - _mqttServer.PublishAsync(applicationMessage).GetAwaiter().GetResult(); - _logger.LogInformation($"Published topic '{applicationMessage.Topic}' from server."); } } } diff --git a/Source/MQTTnet.Server/Mqtt/MqttSubscriptionInterceptor.cs b/Source/MQTTnet.Server/Mqtt/MqttSubscriptionInterceptor.cs index 115e0ab..2d37f74 100644 --- a/Source/MQTTnet.Server/Mqtt/MqttSubscriptionInterceptor.cs +++ b/Source/MQTTnet.Server/Mqtt/MqttSubscriptionInterceptor.cs @@ -9,7 +9,7 @@ namespace MQTTnet.Server.Mqtt public class MqttSubscriptionInterceptor : IMqttServerSubscriptionInterceptor { private readonly PythonScriptHostService _pythonScriptHostService; - private readonly ILogger _logger; + private readonly ILogger _logger; public MqttSubscriptionInterceptor(PythonScriptHostService pythonScriptHostService, ILogger logger) { diff --git a/Source/MQTTnet.Server/Program.cs b/Source/MQTTnet.Server/Program.cs index a9b3be6..5d50030 100644 --- a/Source/MQTTnet.Server/Program.cs +++ b/Source/MQTTnet.Server/Program.cs @@ -48,8 +48,11 @@ namespace MQTTnet.Server Console.ForegroundColor = ConsoleColor.White; Console.WriteLine(@" - Version: 1.0.0-alpha1 - License: MIT (read LICENSE file) + Version: 1.0.0-alpha1 + License: MIT (read LICENSE file) + Sponsoring: https://opencollective.com/mqttnet + Support: https://github.com/chkr1011/MQTTnet/issues + Docs: https://github.com/chkr1011/MQTTnet/wiki/MQTTnetServer "); Console.BackgroundColor = ConsoleColor.White; diff --git a/Source/MQTTnet.Server/Scripting/PythonScriptHostService.cs b/Source/MQTTnet.Server/Scripting/PythonScriptHostService.cs index 4e8aff9..1acc58c 100644 --- a/Source/MQTTnet.Server/Scripting/PythonScriptHostService.cs +++ b/Source/MQTTnet.Server/Scripting/PythonScriptHostService.cs @@ -36,12 +36,12 @@ namespace MQTTnet.Server.Scripting } } - public void RegisterProxyObject(string name, object action) + public void RegisterProxyObject(string name, object @object) { if (name == null) throw new ArgumentNullException(nameof(name)); - if (action == null) throw new ArgumentNullException(nameof(action)); + if (@object == null) throw new ArgumentNullException(nameof(@object)); - _proxyObjects.Add(name, action); + _proxyObjects.Add(name, @object); } public void InvokeOptionalFunction(string name, object parameters) diff --git a/Source/MQTTnet.Server/Scripts/00_sample.py b/Source/MQTTnet.Server/Scripts/00_sample.py index 51763d8..845a216 100644 --- a/Source/MQTTnet.Server/Scripts/00_sample.py +++ b/Source/MQTTnet.Server/Scripts/00_sample.py @@ -48,6 +48,7 @@ def on_intercept_application_message(context): This function is invoked for every processed application message. It also allows modifying the message or cancel processing at all. """ + client_id = context["client_id"] if client_id != None: @@ -72,3 +73,35 @@ def on_intercept_application_message(context): mqtt_net_server.publish(application_message) print("Client '{client_id}' published topic '{topic}'.".format(client_id=context["client_id"], topic=context["topic"])) + + +def on_client_connected(event_args): + """ + This function is called whenever a client has passed the validation is connected. + """ + + print("Client '{client_id}' is now connected.".format(client_id=event_args["client_id"])) + + +def on_client_disconnected(event_args): + """ + This function is called whenever a client has disconnected. + """ + + print("Client '{client_id}' is now disconnected (type = {type}).".format(client_id=event_args["client_id"], type=event_args["type"])) + + +def on_client_subscribed_topic(event_args): + """ + This function is called whenever a client has subscribed to a topic (when allowed). + """ + + print("Client '{client_id}' has subscribed to '{topic}'.".format(client_id=event_args["client_id"], topic=event_args["topic"])) + + +def on_client_unsubscribed_topic(event_args): + """ + This function is called whenever a client has unsubscribed from a topic. + """ + + print("Client '{client_id}' has unsubscribed from '{topic}'.".format(client_id=event_args["client_id"], topic=event_args["topic"])) \ No newline at end of file diff --git a/Source/MQTTnet.Server/Startup.cs b/Source/MQTTnet.Server/Startup.cs index ad7f40e..14763d2 100644 --- a/Source/MQTTnet.Server/Startup.cs +++ b/Source/MQTTnet.Server/Startup.cs @@ -50,20 +50,24 @@ namespace MQTTnet.Server public void ConfigureServices(IServiceCollection services) { - services.AddSingleton(); + services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_2); services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); + services.AddSingleton(); services.AddSingleton(); - services.AddSingleton(); + services.AddSingleton(); services.AddSingleton(); + + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); - - services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_2); } } } \ No newline at end of file diff --git a/Source/MQTTnet/Adapter/IMqttServerAdapter.cs b/Source/MQTTnet/Adapter/IMqttServerAdapter.cs index eff9eab..2e0bfd1 100644 --- a/Source/MQTTnet/Adapter/IMqttServerAdapter.cs +++ b/Source/MQTTnet/Adapter/IMqttServerAdapter.cs @@ -6,7 +6,7 @@ namespace MQTTnet.Adapter { public interface IMqttServerAdapter : IDisposable { - event EventHandler ClientAccepted; + Action ClientAcceptedHandler { get; set; } Task StartAsync(IMqttServerOptions options); Task StopAsync(); diff --git a/Source/MQTTnet/Client/IMqttClient.cs b/Source/MQTTnet/Client/IMqttClient.cs index 4e25e9c..70259cf 100644 --- a/Source/MQTTnet/Client/IMqttClient.cs +++ b/Source/MQTTnet/Client/IMqttClient.cs @@ -15,13 +15,9 @@ namespace MQTTnet.Client IMqttClientOptions Options { get; } IMqttClientConnectedHandler ConnectedHandler { get; set; } - [Obsolete("Use ConnectedHandler instead.")] - event EventHandler Connected; - + IMqttClientDisconnectedHandler DisconnectedHandler { get; set; } - [Obsolete("Use DisconnectedHandler instead.")] - event EventHandler Disconnected; - + Task ConnectAsync(IMqttClientOptions options, CancellationToken cancellationToken); Task DisconnectAsync(MqttClientDisconnectOptions options, CancellationToken cancellationToken); diff --git a/Source/MQTTnet/Client/MqttClient.cs b/Source/MQTTnet/Client/MqttClient.cs index d431ba5..75d6836 100644 --- a/Source/MQTTnet/Client/MqttClient.cs +++ b/Source/MQTTnet/Client/MqttClient.cs @@ -46,15 +46,9 @@ namespace MQTTnet.Client public IMqttClientConnectedHandler ConnectedHandler { get; set; } - public event EventHandler Connected; - public IMqttClientDisconnectedHandler DisconnectedHandler { get; set; } - public event EventHandler Disconnected; - - public IMqttApplicationMessageHandler ReceivedApplicationMessageHandler { get; set; } - - public event EventHandler ApplicationMessageReceived; + public IMqttApplicationMessageHandler ApplicationMessageReceivedHandler { get; set; } public bool IsConnected { get; private set; } @@ -106,9 +100,6 @@ namespace MQTTnet.Client await connectedHandler.HandleConnectedAsync(new MqttClientConnectedEventArgs(authenticateResult)).ConfigureAwait(false); } - // TODO: Remove! - Connected?.Invoke(this, new MqttClientConnectedEventArgs(authenticateResult)); - return authenticateResult; } catch (Exception exception) @@ -276,8 +267,6 @@ namespace MQTTnet.Client { await disconnectedHandler.HandleDisconnectedAsync(new MqttClientDisconnectedEventArgs(clientWasConnected, exception)).ConfigureAwait(false); } - - Disconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(clientWasConnected, exception)); } } @@ -587,13 +576,11 @@ namespace MQTTnet.Client { var applicationMessage = _adapter.PacketFormatterAdapter.DataConverter.CreateApplicationMessage(publishPacket); - ApplicationMessageReceived?.Invoke(this, new MqttApplicationMessageReceivedEventArgs(Options.ClientId, applicationMessage)); - - var handler = ReceivedApplicationMessageHandler; + var handler = ApplicationMessageReceivedHandler; if (handler != null) { return handler.HandleApplicationMessageAsync( - new MqttApplicationMessageHandlerContext(Options.ClientId, applicationMessage)); + new MqttApplicationMessageReceivedEventArgs(Options.ClientId, applicationMessage)); } return Task.FromResult(0); diff --git a/Source/MQTTnet/Client/MqttClientExtensions.cs b/Source/MQTTnet/Client/MqttClientExtensions.cs index eacdbf4..9814f1c 100644 --- a/Source/MQTTnet/Client/MqttClientExtensions.cs +++ b/Source/MQTTnet/Client/MqttClientExtensions.cs @@ -63,35 +63,35 @@ namespace MQTTnet.Client return client; } - public static IMqttClient UseReceivedApplicationMessageHandler(this IMqttClient client, Func handler) + public static IMqttClient UseApplicationMessageReceivedHandler(this IMqttClient client, Func handler) { if (handler == null) { - client.ReceivedApplicationMessageHandler = null; + client.ApplicationMessageReceivedHandler = null; return client; } - client.ReceivedApplicationMessageHandler = new MqttApplicationMessageHandlerDelegate(handler); + client.ApplicationMessageReceivedHandler = new MqttApplicationMessageHandlerDelegate(handler); return client; } - public static IMqttClient UseReceivedApplicationMessageHandler(this IMqttClient client, Action handler) + public static IMqttClient UseApplicationMessageReceivedHandler(this IMqttClient client, Action handler) { if (handler == null) { - client.ReceivedApplicationMessageHandler = null; + client.ApplicationMessageReceivedHandler = null; return client; } - client.ReceivedApplicationMessageHandler = new MqttApplicationMessageHandlerDelegate(handler); + client.ApplicationMessageReceivedHandler = new MqttApplicationMessageHandlerDelegate(handler); return client; } - public static IMqttClient UseReceivedApplicationMessageHandler(this IMqttClient client, IMqttApplicationMessageHandler handler) + public static IMqttClient UseApplicationMessageReceivedHandler(this IMqttClient client, IMqttApplicationMessageHandler handler) { - client.ReceivedApplicationMessageHandler = handler; + client.ApplicationMessageReceivedHandler = handler; return client; } diff --git a/Source/MQTTnet/Client/Receiving/IMqttApplicationMessageHandler.cs b/Source/MQTTnet/Client/Receiving/IMqttApplicationMessageHandler.cs index fdc5bf6..5490bb0 100644 --- a/Source/MQTTnet/Client/Receiving/IMqttApplicationMessageHandler.cs +++ b/Source/MQTTnet/Client/Receiving/IMqttApplicationMessageHandler.cs @@ -4,6 +4,6 @@ namespace MQTTnet.Client.Receiving { public interface IMqttApplicationMessageHandler { - Task HandleApplicationMessageAsync(MqttApplicationMessageHandlerContext context); + Task HandleApplicationMessageAsync(MqttApplicationMessageReceivedEventArgs eventArgs); } } diff --git a/Source/MQTTnet/Client/Receiving/MqttApplicationMessageHandlerContext.cs b/Source/MQTTnet/Client/Receiving/MqttApplicationMessageHandlerContext.cs deleted file mode 100644 index 87fb959..0000000 --- a/Source/MQTTnet/Client/Receiving/MqttApplicationMessageHandlerContext.cs +++ /dev/null @@ -1,15 +0,0 @@ -namespace MQTTnet.Client.Receiving -{ - public class MqttApplicationMessageHandlerContext - { - public MqttApplicationMessageHandlerContext(string senderClientId, MqttApplicationMessage applicationMessage) - { - SenderClientId = senderClientId; - ApplicationMessage = applicationMessage; - } - - public string SenderClientId { get; } - - public MqttApplicationMessage ApplicationMessage { get; } - } -} diff --git a/Source/MQTTnet/Client/Receiving/MqttApplicationMessageHandlerDelegate.cs b/Source/MQTTnet/Client/Receiving/MqttApplicationMessageHandlerDelegate.cs index cde8944..399640d 100644 --- a/Source/MQTTnet/Client/Receiving/MqttApplicationMessageHandlerDelegate.cs +++ b/Source/MQTTnet/Client/Receiving/MqttApplicationMessageHandlerDelegate.cs @@ -5,9 +5,9 @@ namespace MQTTnet.Client.Receiving { public class MqttApplicationMessageHandlerDelegate : IMqttApplicationMessageHandler { - private readonly Func _handler; + private readonly Func _handler; - public MqttApplicationMessageHandlerDelegate(Action handler) + public MqttApplicationMessageHandlerDelegate(Action handler) { if (handler == null) throw new ArgumentNullException(nameof(handler)); @@ -18,12 +18,12 @@ namespace MQTTnet.Client.Receiving }; } - public MqttApplicationMessageHandlerDelegate(Func handler) + public MqttApplicationMessageHandlerDelegate(Func handler) { _handler = handler ?? throw new ArgumentNullException(nameof(handler)); } - public Task HandleApplicationMessageAsync(MqttApplicationMessageHandlerContext context) + public Task HandleApplicationMessageAsync(MqttApplicationMessageReceivedEventArgs context) { return _handler(context); } diff --git a/Source/MQTTnet/IApplicationMessageReceiver.cs b/Source/MQTTnet/IApplicationMessageReceiver.cs index 806bb75..f34222e 100644 --- a/Source/MQTTnet/IApplicationMessageReceiver.cs +++ b/Source/MQTTnet/IApplicationMessageReceiver.cs @@ -1,13 +1,9 @@ -using System; -using MQTTnet.Client.Receiving; +using MQTTnet.Client.Receiving; namespace MQTTnet { public interface IApplicationMessageReceiver { - IMqttApplicationMessageHandler ReceivedApplicationMessageHandler { get; set; } - - [Obsolete("Use _ReceivedApplicationMessageHandler_ instead.")] - event EventHandler ApplicationMessageReceived; + IMqttApplicationMessageHandler ApplicationMessageReceivedHandler { get; set; } } } diff --git a/Source/MQTTnet/Implementations/MqttTcpServerAdapter.Uwp.cs b/Source/MQTTnet/Implementations/MqttTcpServerAdapter.Uwp.cs index dd93b14..9fbf27b 100644 --- a/Source/MQTTnet/Implementations/MqttTcpServerAdapter.Uwp.cs +++ b/Source/MQTTnet/Implementations/MqttTcpServerAdapter.Uwp.cs @@ -23,7 +23,7 @@ namespace MQTTnet.Implementations _logger = logger.CreateChildLogger(nameof(MqttTcpServerAdapter)); } - public event EventHandler ClientAccepted; + public Action ClientAcceptedHandler { get; set; } public async Task StartAsync(IMqttServerOptions options) { @@ -73,7 +73,7 @@ namespace MQTTnet.Implementations try { var clientAdapter = new MqttChannelAdapter(new MqttTcpChannel(args.Socket, _options), new MqttPacketFormatterAdapter(), _logger); - ClientAccepted?.Invoke(this, new MqttServerAdapterClientAcceptedEventArgs(clientAdapter)); + ClientAcceptedHandler?.Invoke(new MqttServerAdapterClientAcceptedEventArgs(clientAdapter)); } catch (Exception exception) { diff --git a/Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs b/Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs index cfbf725..92ccb81 100644 --- a/Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs +++ b/Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs @@ -26,7 +26,7 @@ namespace MQTTnet.Implementations _logger = logger.CreateChildLogger(nameof(MqttTcpServerAdapter)); } - public event EventHandler ClientAccepted; + public Action ClientAcceptedHandler { get; set; } public Task StartAsync(IMqttServerOptions options) { @@ -87,9 +87,11 @@ namespace MQTTnet.Implementations options, tlsCertificate, _cancellationTokenSource.Token, - _logger); + _logger) + { + ClientAcceptedHandler = OnClientAccepted + }; - listenerV4.ClientAccepted += OnClientAccepted; listenerV4.Start(); _listeners.Add(listenerV4); } @@ -101,17 +103,19 @@ namespace MQTTnet.Implementations options, tlsCertificate, _cancellationTokenSource.Token, - _logger); + _logger) + { + ClientAcceptedHandler = OnClientAccepted + }; - listenerV6.ClientAccepted += OnClientAccepted; listenerV6.Start(); _listeners.Add(listenerV6); } } - private void OnClientAccepted(object sender, MqttServerAdapterClientAcceptedEventArgs e) + private void OnClientAccepted(MqttServerAdapterClientAcceptedEventArgs eventArgs) { - ClientAccepted?.Invoke(this, e); + ClientAcceptedHandler?.Invoke(eventArgs); } } } diff --git a/Source/MQTTnet/Implementations/MqttTcpServerListener.cs b/Source/MQTTnet/Implementations/MqttTcpServerListener.cs index c550ebb..f1207b2 100644 --- a/Source/MQTTnet/Implementations/MqttTcpServerListener.cs +++ b/Source/MQTTnet/Implementations/MqttTcpServerListener.cs @@ -42,7 +42,7 @@ namespace MQTTnet.Implementations } } - public event EventHandler ClientAccepted; + public Action ClientAcceptedHandler; public void Start() { @@ -88,7 +88,7 @@ namespace MQTTnet.Implementations _addressFamily == AddressFamily.InterNetwork ? "ipv4" : "ipv6"); var clientAdapter = new MqttChannelAdapter(new MqttTcpChannel(clientSocket, sslStream), new MqttPacketFormatterAdapter(), _logger); - ClientAccepted?.Invoke(this, new MqttServerAdapterClientAcceptedEventArgs(clientAdapter)); + ClientAcceptedHandler?.Invoke(new MqttServerAdapterClientAcceptedEventArgs(clientAdapter)); } catch (ObjectDisposedException) { diff --git a/Source/MQTTnet/Server/IMqttServer.cs b/Source/MQTTnet/Server/IMqttServer.cs index 0286720..1186368 100644 --- a/Source/MQTTnet/Server/IMqttServer.cs +++ b/Source/MQTTnet/Server/IMqttServer.cs @@ -11,16 +11,9 @@ namespace MQTTnet.Server event EventHandler Stopped; IMqttServerClientConnectedHandler ClientConnectedHandler { get; set; } - [Obsolete("Use ClientConnectedHandler instead.")] - event EventHandler ClientConnected; - IMqttServerClientDisconnectedHandler ClientDisconnectedHandler { get; set; } - [Obsolete("Use ClientDisconnectedHandler instead.")] - event EventHandler ClientDisconnected; - - event EventHandler ClientSubscribedTopic; - - event EventHandler ClientUnsubscribedTopic; + IMqttServerClientSubscribedTopicHandler ClientSubscribedTopicHandler { get; set; } + IMqttServerClientUnsubscribedTopicHandler ClientUnsubscribedTopicHandler { get; set; } IMqttServerOptions Options { get; } diff --git a/Source/MQTTnet/Server/IMqttServerClientSubscribedTopicHandler.cs b/Source/MQTTnet/Server/IMqttServerClientSubscribedTopicHandler.cs new file mode 100644 index 0000000..6f82bb3 --- /dev/null +++ b/Source/MQTTnet/Server/IMqttServerClientSubscribedTopicHandler.cs @@ -0,0 +1,9 @@ +using System.Threading.Tasks; + +namespace MQTTnet.Server +{ + public interface IMqttServerClientSubscribedTopicHandler + { + Task HandleClientSubscribedTopicAsync(MqttServerClientSubscribedTopicEventArgs eventArgs); + } +} diff --git a/Source/MQTTnet/Server/IMqttServerClientUnsubscribedTopicHandler.cs b/Source/MQTTnet/Server/IMqttServerClientUnsubscribedTopicHandler.cs new file mode 100644 index 0000000..077a80f --- /dev/null +++ b/Source/MQTTnet/Server/IMqttServerClientUnsubscribedTopicHandler.cs @@ -0,0 +1,9 @@ +using System.Threading.Tasks; + +namespace MQTTnet.Server +{ + public interface IMqttServerClientUnsubscribedTopicHandler + { + Task HandleClientUnsubscribedTopicAsync(MqttServerClientUnsubscribedTopicEventArgs eventArgs); + } +} diff --git a/Source/MQTTnet/Server/MqttClientSessionsManager.cs b/Source/MQTTnet/Server/MqttClientSessionsManager.cs index e0ce7e4..423638a 100644 --- a/Source/MQTTnet/Server/MqttClientSessionsManager.cs +++ b/Source/MQTTnet/Server/MqttClientSessionsManager.cs @@ -196,7 +196,7 @@ namespace MQTTnet.Server applicationMessage = interceptorContext.ApplicationMessage; } - _eventDispatcher.OnApplicationMessageReceived(sender?.ClientId, applicationMessage); + await _eventDispatcher.HandleApplicationMessageReceivedAsync(sender?.ClientId, applicationMessage).ConfigureAwait(false); if (applicationMessage.Retain) { @@ -258,7 +258,7 @@ namespace MQTTnet.Server var connection = await CreateConnectionAsync(channelAdapter, connectPacket).ConfigureAwait(false); - _eventDispatcher.OnClientConnected(clientId); + await _eventDispatcher.HandleClientConnectedAsync(clientId).ConfigureAwait(false); disconnectType = await connection.RunAsync().ConfigureAwait(false); } @@ -288,7 +288,7 @@ namespace MQTTnet.Server await TryCleanupChannelAsync(channelAdapter).ConfigureAwait(false); - _eventDispatcher.OnClientDisconnected(clientId, disconnectType); + await _eventDispatcher.HandleClientDisconnectedAsync(clientId, disconnectType).ConfigureAwait(false); } } diff --git a/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs b/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs index d4c2eea..904474f 100644 --- a/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs +++ b/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs @@ -61,7 +61,7 @@ namespace MQTTnet.Server _subscriptions[topicFilter.Topic] = topicFilter.QualityOfServiceLevel; } - _eventDispatcher.OnClientSubscribedTopic(_clientId, topicFilter); + await _eventDispatcher.HandleClientSubscribedTopicAsync(_clientId, topicFilter).ConfigureAwait(false); } } @@ -85,12 +85,12 @@ namespace MQTTnet.Server _subscriptions[topicFilter.Topic] = topicFilter.QualityOfServiceLevel; } - _eventDispatcher.OnClientSubscribedTopic(_clientId, topicFilter); + await _eventDispatcher.HandleClientSubscribedTopicAsync(_clientId, topicFilter).ConfigureAwait(false); } } } - public Task UnsubscribeAsync(MqttUnsubscribePacket unsubscribePacket) + public async Task UnsubscribeAsync(MqttUnsubscribePacket unsubscribePacket) { if (unsubscribePacket == null) throw new ArgumentNullException(nameof(unsubscribePacket)); @@ -111,12 +111,15 @@ namespace MQTTnet.Server { unsubAckPacket.ReasonCodes.Add(MqttUnsubscribeReasonCode.NoSubscriptionExisted); } - - _eventDispatcher.OnClientUnsubscribedTopic(_clientId, topicFilter); } } - return Task.FromResult(unsubAckPacket); + foreach (var topicFilter in unsubscribePacket.TopicFilters) + { + await _eventDispatcher.HandleClientUnsubscribedTopicAsync(_clientId, topicFilter).ConfigureAwait(false); + } + + return unsubAckPacket; } public Task UnsubscribeAsync(IEnumerable topicFilters) diff --git a/Source/MQTTnet/Server/MqttServer.cs b/Source/MQTTnet/Server/MqttServer.cs index a0a9d44..0707dec 100644 --- a/Source/MQTTnet/Server/MqttServer.cs +++ b/Source/MQTTnet/Server/MqttServer.cs @@ -28,40 +28,40 @@ namespace MQTTnet.Server if (logger == null) throw new ArgumentNullException(nameof(logger)); _logger = logger.CreateChildLogger(nameof(MqttServer)); - - _eventDispatcher.ClientConnected += (s, e) => ClientConnected?.Invoke(s, e); - _eventDispatcher.ClientDisconnected += (s, e) => ClientDisconnected?.Invoke(s, e); - _eventDispatcher.ClientSubscribedTopic += (s, e) => ClientSubscribedTopic?.Invoke(s, e); - _eventDispatcher.ClientUnsubscribedTopic += (s, e) => ClientUnsubscribedTopic?.Invoke(s, e); - _eventDispatcher.ApplicationMessageReceived += async (s, e) => - { - // TODO: Migrate EventDispatcher to proper handlers and no events anymore. - ApplicationMessageReceived?.Invoke(s, e); - - var handler = ReceivedApplicationMessageHandler; - if (handler != null) - { - await handler.HandleApplicationMessageAsync( - new MqttApplicationMessageHandlerContext(e.ClientId, e.ApplicationMessage)).ConfigureAwait(false); - } - }; } public event EventHandler Started; public event EventHandler Stopped; - public IMqttServerClientConnectedHandler ClientConnectedHandler { get; set; } - public event EventHandler ClientConnected; - - public IMqttServerClientDisconnectedHandler ClientDisconnectedHandler { get; set; } - public event EventHandler ClientDisconnected; - - public event EventHandler ClientSubscribedTopic; + public IMqttServerClientConnectedHandler ClientConnectedHandler + { + get => _eventDispatcher.ClientConnectedHandler; + set => _eventDispatcher.ClientConnectedHandler = value; + } - public event EventHandler ClientUnsubscribedTopic; + public IMqttServerClientDisconnectedHandler ClientDisconnectedHandler + { + get => _eventDispatcher.ClientDisconnectedHandler; + set => _eventDispatcher.ClientDisconnectedHandler = value; + } + + public IMqttServerClientSubscribedTopicHandler ClientSubscribedTopicHandler + { + get => _eventDispatcher.ClientSubscribedTopicHandler; + set => _eventDispatcher.ClientSubscribedTopicHandler = value; + } - public IMqttApplicationMessageHandler ReceivedApplicationMessageHandler { get; set; } - public event EventHandler ApplicationMessageReceived; + public IMqttServerClientUnsubscribedTopicHandler ClientUnsubscribedTopicHandler + { + get => _eventDispatcher.ClientUnsubscribedTopicHandler; + set => _eventDispatcher.ClientUnsubscribedTopicHandler = value; + } + + public IMqttApplicationMessageHandler ApplicationMessageReceivedHandler + { + get => _eventDispatcher.ApplicationMessageReceivedHandler; + set => _eventDispatcher.ApplicationMessageReceivedHandler = value; + } public IMqttServerOptions Options { get; private set; } @@ -123,7 +123,7 @@ namespace MQTTnet.Server foreach (var adapter in _adapters) { - adapter.ClientAccepted += OnClientAccepted; + adapter.ClientAcceptedHandler = OnClientAccepted; await adapter.StartAsync(Options).ConfigureAwait(false); } @@ -146,7 +146,7 @@ namespace MQTTnet.Server foreach (var adapter in _adapters) { - adapter.ClientAccepted -= OnClientAccepted; + adapter.ClientAcceptedHandler = null; await adapter.StopAsync().ConfigureAwait(false); } @@ -170,7 +170,7 @@ namespace MQTTnet.Server return _retainedMessagesManager?.ClearMessagesAsync(); } - private void OnClientAccepted(object sender, MqttServerAdapterClientAcceptedEventArgs eventArgs) + private void OnClientAccepted(MqttServerAdapterClientAcceptedEventArgs eventArgs) { eventArgs.SessionTask = _clientSessionsManager.HandleConnectionAsync(eventArgs.Client); } diff --git a/Source/MQTTnet/Server/MqttServerClientConnectedHandlerDelegate.cs b/Source/MQTTnet/Server/MqttServerClientConnectedHandlerDelegate.cs index 5847744..2883d66 100644 --- a/Source/MQTTnet/Server/MqttServerClientConnectedHandlerDelegate.cs +++ b/Source/MQTTnet/Server/MqttServerClientConnectedHandlerDelegate.cs @@ -11,9 +11,9 @@ namespace MQTTnet.Server { if (handler == null) throw new ArgumentNullException(nameof(handler)); - _handler = context => + _handler = eventArgs => { - handler(context); + handler(eventArgs); return Task.FromResult(0); }; } diff --git a/Source/MQTTnet/Server/MqttServerClientDisconnectedHandlerDelegate.cs b/Source/MQTTnet/Server/MqttServerClientDisconnectedHandlerDelegate.cs index 45c46d9..b1d495a 100644 --- a/Source/MQTTnet/Server/MqttServerClientDisconnectedHandlerDelegate.cs +++ b/Source/MQTTnet/Server/MqttServerClientDisconnectedHandlerDelegate.cs @@ -11,9 +11,9 @@ namespace MQTTnet.Server { if (handler == null) throw new ArgumentNullException(nameof(handler)); - _handler = context => + _handler = eventArgs => { - handler(context); + handler(eventArgs); return Task.FromResult(0); }; } diff --git a/Source/MQTTnet/Server/MqttClientSubscribedTopicEventArgs.cs b/Source/MQTTnet/Server/MqttServerClientSubscribedTopicEventArgs.cs similarity index 67% rename from Source/MQTTnet/Server/MqttClientSubscribedTopicEventArgs.cs rename to Source/MQTTnet/Server/MqttServerClientSubscribedTopicEventArgs.cs index 11f5c06..1454ac9 100644 --- a/Source/MQTTnet/Server/MqttClientSubscribedTopicEventArgs.cs +++ b/Source/MQTTnet/Server/MqttServerClientSubscribedTopicEventArgs.cs @@ -2,9 +2,9 @@ namespace MQTTnet.Server { - public class MqttClientSubscribedTopicEventArgs : EventArgs + public class MqttServerClientSubscribedTopicEventArgs : EventArgs { - public MqttClientSubscribedTopicEventArgs(string clientId, TopicFilter topicFilter) + public MqttServerClientSubscribedTopicEventArgs(string clientId, TopicFilter topicFilter) { ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId)); TopicFilter = topicFilter ?? throw new ArgumentNullException(nameof(topicFilter)); diff --git a/Source/MQTTnet/Server/MqttServerClientSubscribedTopicHandlerDelegate.cs b/Source/MQTTnet/Server/MqttServerClientSubscribedTopicHandlerDelegate.cs new file mode 100644 index 0000000..048b0a8 --- /dev/null +++ b/Source/MQTTnet/Server/MqttServerClientSubscribedTopicHandlerDelegate.cs @@ -0,0 +1,31 @@ +using System; +using System.Threading.Tasks; + +namespace MQTTnet.Server +{ + public class MqttServerClientSubscribedHandlerDelegate : IMqttServerClientSubscribedTopicHandler + { + private readonly Func _handler; + + public MqttServerClientSubscribedHandlerDelegate(Action handler) + { + if (handler == null) throw new ArgumentNullException(nameof(handler)); + + _handler = eventArgs => + { + handler(eventArgs); + return Task.FromResult(0); + }; + } + + public MqttServerClientSubscribedHandlerDelegate(Func handler) + { + _handler = handler ?? throw new ArgumentNullException(nameof(handler)); + } + + public Task HandleClientSubscribedTopicAsync(MqttServerClientSubscribedTopicEventArgs eventArgs) + { + return _handler(eventArgs); + } + } +} diff --git a/Source/MQTTnet/Server/MqttClientUnSubscribedTopicEventArgs.cs b/Source/MQTTnet/Server/MqttServerClientUnsubscribedTopicEventArgs.cs similarity index 67% rename from Source/MQTTnet/Server/MqttClientUnSubscribedTopicEventArgs.cs rename to Source/MQTTnet/Server/MqttServerClientUnsubscribedTopicEventArgs.cs index d76347e..bb78bb2 100644 --- a/Source/MQTTnet/Server/MqttClientUnSubscribedTopicEventArgs.cs +++ b/Source/MQTTnet/Server/MqttServerClientUnsubscribedTopicEventArgs.cs @@ -2,9 +2,9 @@ namespace MQTTnet.Server { - public class MqttClientUnsubscribedTopicEventArgs : EventArgs + public class MqttServerClientUnsubscribedTopicEventArgs : EventArgs { - public MqttClientUnsubscribedTopicEventArgs(string clientId, string topicFilter) + public MqttServerClientUnsubscribedTopicEventArgs(string clientId, string topicFilter) { ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId)); TopicFilter = topicFilter ?? throw new ArgumentNullException(nameof(topicFilter)); diff --git a/Source/MQTTnet/Server/MqttServerClientUnsubscribedTopicHandlerDelegate.cs b/Source/MQTTnet/Server/MqttServerClientUnsubscribedTopicHandlerDelegate.cs new file mode 100644 index 0000000..4416287 --- /dev/null +++ b/Source/MQTTnet/Server/MqttServerClientUnsubscribedTopicHandlerDelegate.cs @@ -0,0 +1,31 @@ +using System; +using System.Threading.Tasks; + +namespace MQTTnet.Server +{ + public class MqttServerClientUnsubscribedTopicHandlerDelegate : IMqttServerClientUnsubscribedTopicHandler + { + private readonly Func _handler; + + public MqttServerClientUnsubscribedTopicHandlerDelegate(Action handler) + { + if (handler == null) throw new ArgumentNullException(nameof(handler)); + + _handler = eventArgs => + { + handler(eventArgs); + return Task.FromResult(0); + }; + } + + public MqttServerClientUnsubscribedTopicHandlerDelegate(Func handler) + { + _handler = handler ?? throw new ArgumentNullException(nameof(handler)); + } + + public Task HandleClientUnsubscribedTopicAsync(MqttServerClientUnsubscribedTopicEventArgs eventArgs) + { + return _handler(eventArgs); + } + } +} diff --git a/Source/MQTTnet/Server/MqttServerEventDispatcher.cs b/Source/MQTTnet/Server/MqttServerEventDispatcher.cs index 4daa994..cea18e1 100644 --- a/Source/MQTTnet/Server/MqttServerEventDispatcher.cs +++ b/Source/MQTTnet/Server/MqttServerEventDispatcher.cs @@ -1,42 +1,73 @@ -using System; +using System.Threading.Tasks; +using MQTTnet.Client.Receiving; namespace MQTTnet.Server { public class MqttServerEventDispatcher { - public event EventHandler ClientSubscribedTopic; + public IMqttServerClientConnectedHandler ClientConnectedHandler { get; set; } - public event EventHandler ClientUnsubscribedTopic; + public IMqttServerClientDisconnectedHandler ClientDisconnectedHandler { get; set; } - public event EventHandler ClientConnected; + public IMqttServerClientSubscribedTopicHandler ClientSubscribedTopicHandler { get; set; } - public event EventHandler ClientDisconnected; + public IMqttServerClientUnsubscribedTopicHandler ClientUnsubscribedTopicHandler { get; set; } - public event EventHandler ApplicationMessageReceived; + public IMqttApplicationMessageHandler ApplicationMessageReceivedHandler { get; set; } - public void OnClientSubscribedTopic(string clientId, TopicFilter topicFilter) + public Task HandleClientConnectedAsync(string clientId) { - ClientSubscribedTopic?.Invoke(this, new MqttClientSubscribedTopicEventArgs(clientId, topicFilter)); + var handler = ClientConnectedHandler; + if (handler == null) + { + return Task.FromResult(0); + } + + return handler.HandleClientConnectedAsync(new MqttServerClientConnectedEventArgs(clientId)); } - public void OnClientUnsubscribedTopic(string clientId, string topicFilter) + public Task HandleClientDisconnectedAsync(string clientId, MqttClientDisconnectType disconnectType) { - ClientUnsubscribedTopic?.Invoke(this, new MqttClientUnsubscribedTopicEventArgs(clientId, topicFilter)); + var handler = ClientDisconnectedHandler; + if (handler == null) + { + return Task.FromResult(0); + } + + return handler.HandleClientDisconnectedAsync(new MqttServerClientDisconnectedEventArgs(clientId, disconnectType)); } - public void OnClientDisconnected(string clientId, MqttClientDisconnectType disconnectType) + public Task HandleClientSubscribedTopicAsync(string clientId, TopicFilter topicFilter) { - ClientDisconnected?.Invoke(this, new MqttServerClientDisconnectedEventArgs(clientId, disconnectType)); + var handler = ClientSubscribedTopicHandler; + if (handler == null) + { + return Task.FromResult(0); + } + + return handler.HandleClientSubscribedTopicAsync(new MqttServerClientSubscribedTopicEventArgs(clientId, topicFilter)); } - public void OnApplicationMessageReceived(string senderClientId, MqttApplicationMessage applicationMessage) + public Task HandleClientUnsubscribedTopicAsync(string clientId, string topicFilter) { - ApplicationMessageReceived?.Invoke(this, new MqttApplicationMessageReceivedEventArgs(senderClientId, applicationMessage)); + var handler = ClientUnsubscribedTopicHandler; + if (handler == null) + { + return Task.FromResult(0); + } + + return handler.HandleClientUnsubscribedTopicAsync(new MqttServerClientUnsubscribedTopicEventArgs(clientId, topicFilter)); } - public void OnClientConnected(string clientId) + public Task HandleApplicationMessageReceivedAsync(string senderClientId, MqttApplicationMessage applicationMessage) { - ClientConnected?.Invoke(this, new MqttServerClientConnectedEventArgs(clientId)); + var handler = ApplicationMessageReceivedHandler; + if (handler == null) + { + return Task.FromResult(0); + } + + return handler.HandleApplicationMessageAsync(new MqttApplicationMessageReceivedEventArgs(senderClientId, applicationMessage)); } } } diff --git a/Tests/MQTTnet.Benchmarks/MqttTcpChannelBenchmark.cs b/Tests/MQTTnet.Benchmarks/MqttTcpChannelBenchmark.cs index 1878c34..c18a731 100644 --- a/Tests/MQTTnet.Benchmarks/MqttTcpChannelBenchmark.cs +++ b/Tests/MQTTnet.Benchmarks/MqttTcpChannelBenchmark.cs @@ -1,6 +1,5 @@ using BenchmarkDotNet.Attributes; using MQTTnet.Channel; -using MQTTnet.Client; using MQTTnet.Diagnostics; using MQTTnet.Implementations; using MQTTnet.Server; @@ -22,7 +21,7 @@ namespace MQTTnet.Benchmarks { var factory = new MqttFactory(); var tcpServer = new MqttTcpServerAdapter(new MqttNetLogger().CreateChildLogger()); - tcpServer.ClientAccepted += (sender, args) => _serverChannel = (IMqttChannel)args.Client.GetType().GetField("_channel", System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance).GetValue(args.Client); + tcpServer.ClientAcceptedHandler += args => _serverChannel = (IMqttChannel)args.Client.GetType().GetField("_channel", System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance).GetValue(args.Client); _mqttServer = factory.CreateMqttServer(new[] { tcpServer }, new MqttNetLogger()); diff --git a/Tests/MQTTnet.Core.Tests/MQTTv5/Client_Tests.cs b/Tests/MQTTnet.Core.Tests/MQTTv5/Client_Tests.cs index 6c13dbb..08f0fc9 100644 --- a/Tests/MQTTnet.Core.Tests/MQTTv5/Client_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/MQTTv5/Client_Tests.cs @@ -4,6 +4,7 @@ using Microsoft.VisualStudio.TestTools.UnitTesting; using MQTTnet.Client; using MQTTnet.Client.Options; using MQTTnet.Client.Publishing; +using MQTTnet.Client.Receiving; using MQTTnet.Client.Subscribing; using MQTTnet.Client.Unsubscribing; using MQTTnet.Formatter; @@ -213,13 +214,13 @@ namespace MQTTnet.Tests.MQTTv5 var receivedMessages = new List(); await client1.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("127.0.0.1").WithClientId("client1").WithProtocolVersion(MqttProtocolVersion.V500).Build()); - client1.ApplicationMessageReceived += (s, e) => + client1.ApplicationMessageReceivedHandler = new MqttApplicationMessageHandlerDelegate(e => { lock (receivedMessages) { receivedMessages.Add(e); } - }; + }); await client1.SubscribeAsync("a"); diff --git a/Tests/MQTTnet.Core.Tests/Mockups/TestMqttServerAdapter.cs b/Tests/MQTTnet.Core.Tests/Mockups/TestMqttServerAdapter.cs index 014a906..b2970fb 100644 --- a/Tests/MQTTnet.Core.Tests/Mockups/TestMqttServerAdapter.cs +++ b/Tests/MQTTnet.Core.Tests/Mockups/TestMqttServerAdapter.cs @@ -11,7 +11,7 @@ namespace MQTTnet.Tests.Mockups { public class TestMqttServerAdapter : IMqttServerAdapter { - public event EventHandler ClientAccepted; + public Action ClientAcceptedHandler { get; set; } public async Task ConnectTestClient(string clientId, MqttApplicationMessage willMessage = null) { @@ -41,7 +41,7 @@ namespace MQTTnet.Tests.Mockups private void FireClientAcceptedEvent(IMqttChannelAdapter adapter) { - ClientAccepted?.Invoke(this, new MqttServerAdapterClientAcceptedEventArgs(adapter)); + ClientAcceptedHandler?.Invoke(new MqttServerAdapterClientAcceptedEventArgs(adapter)); } public Task StartAsync(IMqttServerOptions options) diff --git a/Tests/MQTTnet.Core.Tests/Mockups/TestServerExtensions.cs b/Tests/MQTTnet.Core.Tests/Mockups/TestServerExtensions.cs deleted file mode 100644 index 1cbd1b7..0000000 --- a/Tests/MQTTnet.Core.Tests/Mockups/TestServerExtensions.cs +++ /dev/null @@ -1,38 +0,0 @@ -using System.Threading.Tasks; -using MQTTnet.Client; -using MQTTnet.Server; - -namespace MQTTnet.Tests.Mockups -{ - public static class TestServerExtensions - { - /// - /// publishes a message with a client and waits in the server until a message with the same topic is received - /// - /// - public static async Task PublishAndWaitForAsync(this IMqttClient client, IMqttServer server, MqttApplicationMessage message) - { - var tcs = new TaskCompletionSource(); - - void Handler(object sender, MqttApplicationMessageReceivedEventArgs args) - { - if (args.ApplicationMessage.Topic == message.Topic) - { - tcs.SetResult(true); - } - } - - server.ApplicationMessageReceived += Handler; - - try - { - await client.PublishAsync(message).ConfigureAwait(false); - await tcs.Task.ConfigureAwait(false); - } - finally - { - server.ApplicationMessageReceived -= Handler; - } - } - } -} diff --git a/Tests/MQTTnet.Core.Tests/MqttClientTests.cs b/Tests/MQTTnet.Core.Tests/MqttClientTests.cs index d7af329..6098adb 100644 --- a/Tests/MQTTnet.Core.Tests/MqttClientTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttClientTests.cs @@ -5,8 +5,9 @@ using System.Net.Sockets; using System.Threading.Tasks; using Microsoft.VisualStudio.TestTools.UnitTesting; using MQTTnet.Client; +using MQTTnet.Client.Connecting; +using MQTTnet.Client.Disconnecting; using MQTTnet.Client.Options; -using MQTTnet.Client.Receiving; using MQTTnet.Exceptions; using MQTTnet.Protocol; using MQTTnet.Server; @@ -45,10 +46,10 @@ namespace MQTTnet.Tests using (var client = factory.CreateMqttClient()) { Exception ex = null; - client.Disconnected += (s, e) => + client.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(e => { ex = e.Exception; - }; + }); try { @@ -81,9 +82,9 @@ namespace MQTTnet.Tests var receivedValues = new List(); - async Task Handler1(MqttApplicationMessageHandlerContext context) + async Task Handler1(MqttApplicationMessageReceivedEventArgs eventArgs) { - var value = int.Parse(context.ApplicationMessage.ConvertPayloadToString()); + var value = int.Parse(eventArgs.ApplicationMessage.ConvertPayloadToString()); await Task.Delay(value); lock (receivedValues) @@ -92,7 +93,7 @@ namespace MQTTnet.Tests } } - client1.UseReceivedApplicationMessageHandler(Handler1); + client1.UseApplicationMessageReceivedHandler(Handler1); var client2 = await testEnvironment.ConnectClientAsync(); for (var i = MessagesCount; i > 0; i--) @@ -119,27 +120,27 @@ namespace MQTTnet.Tests var client1 = await testEnvironment.ConnectClientAsync(); await client1.SubscribeAsync("request/+"); - async Task Handler1(MqttApplicationMessageHandlerContext context) + async Task Handler1(MqttApplicationMessageReceivedEventArgs eventArgs) { - await client1.PublishAsync($"reply/{context.ApplicationMessage.Topic}"); + await client1.PublishAsync($"reply/{eventArgs.ApplicationMessage.Topic}"); } - client1.UseReceivedApplicationMessageHandler(Handler1); + client1.UseApplicationMessageReceivedHandler(Handler1); var client2 = await testEnvironment.ConnectClientAsync(); await client2.SubscribeAsync("reply/#"); var replies = new List(); - void Handler2(MqttApplicationMessageHandlerContext context) + void Handler2(MqttApplicationMessageReceivedEventArgs eventArgs) { lock (replies) { - replies.Add(context.ApplicationMessage.Topic); + replies.Add(eventArgs.ApplicationMessage.Topic); } } - client2.UseReceivedApplicationMessageHandler((Action)Handler2); + client2.UseApplicationMessageReceivedHandler((Action)Handler2); await Task.Delay(500); @@ -163,7 +164,7 @@ namespace MQTTnet.Tests var receivedMessages = new List(); var client1 = await testEnvironment.ConnectClientAsync(); - client1.UseReceivedApplicationMessageHandler(c => + client1.UseApplicationMessageReceivedHandler(c => { lock (receivedMessages) { @@ -195,7 +196,7 @@ namespace MQTTnet.Tests var client = testEnvironment.CreateClient(); - client.Connected += async (s, e) => + client.ConnectedHandler = new MqttClientConnectedHandlerDelegate(async e => { await client.SubscribeAsync("RCU/P1/H0001/R0003"); @@ -204,9 +205,9 @@ namespace MQTTnet.Tests .WithTopic("RCU/P1/H0001/R0003"); await client.PublishAsync(msg.Build()); - }; + }); - client.UseReceivedApplicationMessageHandler(c => + client.UseApplicationMessageReceivedHandler(c => { lock (receivedMessages) { @@ -241,7 +242,7 @@ namespace MQTTnet.Tests var retries = 0; - async Task Handler1(MqttApplicationMessageHandlerContext context) + async Task Handler1(MqttApplicationMessageReceivedEventArgs eventArgs) { retries++; @@ -249,7 +250,7 @@ namespace MQTTnet.Tests throw new Exception("Broken!"); } - client1.UseReceivedApplicationMessageHandler(Handler1); + client1.UseApplicationMessageReceivedHandler(Handler1); var client2 = await testEnvironment.ConnectClientAsync(); await client2.PublishAsync("x"); diff --git a/Tests/MQTTnet.Core.Tests/RoundtripTimeTests.cs b/Tests/MQTTnet.Core.Tests/RoundtripTimeTests.cs index c8de9e4..3d88a44 100644 --- a/Tests/MQTTnet.Core.Tests/RoundtripTimeTests.cs +++ b/Tests/MQTTnet.Core.Tests/RoundtripTimeTests.cs @@ -5,6 +5,7 @@ using System.Threading.Tasks; using Microsoft.VisualStudio.TestTools.UnitTesting; using MQTTnet.Client; using MQTTnet.Client.Options; +using MQTTnet.Client.Receiving; using MQTTnet.Server; namespace MQTTnet.Tests @@ -29,10 +30,10 @@ namespace MQTTnet.Tests TaskCompletionSource response = null; - receiverClient.ApplicationMessageReceived += (sender, args) => + receiverClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageHandlerDelegate(args => { response?.SetResult(args.ApplicationMessage.ConvertPayloadToString()); - }; + }); var times = new List(); var stopwatch = Stopwatch.StartNew(); diff --git a/Tests/MQTTnet.Core.Tests/Server_Tests.cs b/Tests/MQTTnet.Core.Tests/Server_Tests.cs index b9efc7a..93ad66b 100644 --- a/Tests/MQTTnet.Core.Tests/Server_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/Server_Tests.cs @@ -9,7 +9,9 @@ using Microsoft.VisualStudio.TestTools.UnitTesting; using MQTTnet.Adapter; using MQTTnet.Client; using MQTTnet.Client.Connecting; +using MQTTnet.Client.Disconnecting; using MQTTnet.Client.Options; +using MQTTnet.Client.Receiving; using MQTTnet.Protocol; using MQTTnet.Server; using MQTTnet.Tests.Mockups; @@ -66,7 +68,7 @@ namespace MQTTnet.Tests var clientOptions = new MqttClientOptionsBuilder().WithWillMessage(willMessage); var c1 = await testEnvironment.ConnectClientAsync(); - c1.UseReceivedApplicationMessageHandler(c => Interlocked.Increment(ref receivedMessagesCount)); + c1.ApplicationMessageReceivedHandler = new MqttApplicationMessageHandlerDelegate(c => Interlocked.Increment(ref receivedMessagesCount)); await c1.SubscribeAsync(new TopicFilterBuilder().WithTopic("#").Build()); var c2 = await testEnvironment.ConnectClientAsync(clientOptions); @@ -92,7 +94,7 @@ namespace MQTTnet.Tests var clientOptions = new MqttClientOptionsBuilder().WithWillMessage(willMessage); var c1 = await testEnvironment.ConnectClientAsync(); - c1.UseReceivedApplicationMessageHandler(c => Interlocked.Increment(ref receivedMessagesCount)); + c1.UseApplicationMessageReceivedHandler(c => Interlocked.Increment(ref receivedMessagesCount)); await c1.SubscribeAsync(new TopicFilterBuilder().WithTopic("#").Build()); var c2 = await testEnvironment.ConnectClientAsync(clientOptions); @@ -114,7 +116,7 @@ namespace MQTTnet.Tests var server = await testEnvironment.StartServerAsync(); var c1 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("c1")); - c1.UseReceivedApplicationMessageHandler(c => Interlocked.Increment(ref receivedMessagesCount)); + c1.UseApplicationMessageReceivedHandler(c => Interlocked.Increment(ref receivedMessagesCount)); var c2 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("c2")); @@ -125,10 +127,10 @@ namespace MQTTnet.Tests Assert.AreEqual(0, receivedMessagesCount); var subscribeEventCalled = false; - server.ClientSubscribedTopic += (_, e) => + server.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate(e => { subscribeEventCalled = e.TopicFilter.Topic == "a" && e.ClientId == "c1"; - }; + }); await c1.SubscribeAsync(new TopicFilter { Topic = "a", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce }); await Task.Delay(250); @@ -139,10 +141,10 @@ namespace MQTTnet.Tests Assert.AreEqual(1, receivedMessagesCount); var unsubscribeEventCalled = false; - server.ClientUnsubscribedTopic += (_, e) => + server.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(e => { unsubscribeEventCalled = e.TopicFilter == "a" && e.ClientId == "c1"; - }; + }); await c1.UnsubscribeAsync("a"); await Task.Delay(250); @@ -168,7 +170,7 @@ namespace MQTTnet.Tests var receivedMessagesCount = 0; var client = await testEnvironment.ConnectClientAsync(); - client.UseReceivedApplicationMessageHandler(c => Interlocked.Increment(ref receivedMessagesCount)); + client.UseApplicationMessageReceivedHandler(c => Interlocked.Increment(ref receivedMessagesCount)); var message = new MqttApplicationMessageBuilder().WithTopic("a").WithAtLeastOnceQoS().Build(); await client.SubscribeAsync(new TopicFilter { Topic = "a", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce }); @@ -194,7 +196,7 @@ namespace MQTTnet.Tests var c1 = await testEnvironment.ConnectClientAsync(); var c2 = await testEnvironment.ConnectClientAsync(); - c1.UseReceivedApplicationMessageHandler(c => + c1.UseApplicationMessageReceivedHandler(c => { lock (locked) { @@ -202,7 +204,7 @@ namespace MQTTnet.Tests } }); - c2.UseReceivedApplicationMessageHandler(c => + c2.UseApplicationMessageReceivedHandler(c => { lock (locked) { @@ -258,12 +260,12 @@ namespace MQTTnet.Tests var client = await testEnvironment.ConnectClientAsync(); var receivedMessages = new List(); - client.Connected += async (s, e) => + client.ConnectedHandler = new MqttClientConnectedHandlerDelegate(async e => { await client.PublishAsync("Connected"); - }; + }); - client.UseReceivedApplicationMessageHandler(c => + client.UseApplicationMessageReceivedHandler(c => { lock (receivedMessages) { @@ -287,15 +289,12 @@ namespace MQTTnet.Tests using (var testEnvironment = new TestEnvironment()) { var server = await testEnvironment.StartServerAsync(); - server.ClientConnected += async (s, e) => - { - await server.SubscribeAsync(e.ClientId, "topic1"); - }; + server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(e => server.SubscribeAsync(e.ClientId, "topic1")); var client = await testEnvironment.ConnectClientAsync(); var receivedMessages = new List(); - client.UseReceivedApplicationMessageHandler(c => + client.UseApplicationMessageReceivedHandler(c => { lock (receivedMessages) { @@ -325,7 +324,7 @@ namespace MQTTnet.Tests var disconnectCalled = 0; var c1 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder()); - c1.Disconnected += (sender, args) => disconnectCalled++; + c1.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(e => disconnectCalled++); await Task.Delay(100); @@ -347,8 +346,8 @@ namespace MQTTnet.Tests var clientConnectedCalled = 0; var clientDisconnectedCalled = 0; - server.ClientConnected += (_, __) => Interlocked.Increment(ref clientConnectedCalled); - server.ClientDisconnected += (_, __) => Interlocked.Increment(ref clientDisconnectedCalled); + server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(_ => Interlocked.Increment(ref clientConnectedCalled)); + server.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(_ => Interlocked.Increment(ref clientDisconnectedCalled)); var c1 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder()); @@ -462,7 +461,7 @@ namespace MQTTnet.Tests var receivedMessages = 0; var c2 = await testEnvironment.ConnectClientAsync(); - c2.UseReceivedApplicationMessageHandler(c => + c2.UseApplicationMessageReceivedHandler(c => { Interlocked.Increment(ref receivedMessages); }); @@ -500,7 +499,7 @@ namespace MQTTnet.Tests var receivedMessagesCount = 0; var c2 = await testEnvironment.ConnectClientAsync(); - c2.UseReceivedApplicationMessageHandler(c => Interlocked.Increment(ref receivedMessagesCount)); + c2.UseApplicationMessageReceivedHandler(c => Interlocked.Increment(ref receivedMessagesCount)); await c2.SubscribeAsync(new TopicFilterBuilder().WithTopic("retained_other").Build()); await Task.Delay(500); @@ -523,7 +522,7 @@ namespace MQTTnet.Tests var receivedMessages = new List(); var c2 = await testEnvironment.ConnectClientAsync(); - c2.UseReceivedApplicationMessageHandler(c => + c2.UseApplicationMessageReceivedHandler(c => { lock (receivedMessages) { @@ -558,7 +557,7 @@ namespace MQTTnet.Tests var c2 = await testEnvironment.ConnectClientAsync(); - c2.UseReceivedApplicationMessageHandler(c => Interlocked.Increment(ref receivedMessagesCount)); + c2.UseApplicationMessageReceivedHandler(c => Interlocked.Increment(ref receivedMessagesCount)); await Task.Delay(200); await c2.SubscribeAsync(new TopicFilter { Topic = "retained", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce }); @@ -604,7 +603,7 @@ namespace MQTTnet.Tests await c2.SubscribeAsync(new TopicFilterBuilder().WithTopic("test").Build()); var isIntercepted = false; - c2.UseReceivedApplicationMessageHandler(c => + c2.UseApplicationMessageReceivedHandler(c => { isIntercepted = string.Compare("extended", Encoding.UTF8.GetString(c.ApplicationMessage.Payload), StringComparison.Ordinal) == 0; }); @@ -645,7 +644,7 @@ namespace MQTTnet.Tests await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder()); var client1 = await testEnvironment.ConnectClientAsync(); - client1.UseReceivedApplicationMessageHandler(c => + client1.UseApplicationMessageReceivedHandler(c => { receivedBody = c.ApplicationMessage.Payload; }); @@ -703,21 +702,21 @@ namespace MQTTnet.Tests var events = new List(); - server.ClientConnected += (_, __) => + server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(_ => { lock (events) { events.Add("c"); } - }; + }); - server.ClientDisconnected += (_, __) => + server.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(_ => { lock (events) { events.Add("d"); } - }; + }); var clientOptions = new MqttClientOptionsBuilder() .WithClientId("same_id"); @@ -888,7 +887,7 @@ namespace MQTTnet.Tests var buffer = new StringBuilder(); - client2.UseReceivedApplicationMessageHandler(c => + client2.UseApplicationMessageReceivedHandler(c => { lock (buffer) { @@ -952,7 +951,7 @@ namespace MQTTnet.Tests await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder()); var c1 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("receiver")); - c1.UseReceivedApplicationMessageHandler(c => Interlocked.Increment(ref receivedMessagesCount)); + c1.UseApplicationMessageReceivedHandler(c => Interlocked.Increment(ref receivedMessagesCount)); await c1.SubscribeAsync(new TopicFilterBuilder().WithTopic(topicFilter).WithQualityOfServiceLevel(filterQualityOfServiceLevel).Build()); var c2 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("sender")); diff --git a/Tests/MQTTnet.TestApp.NetCore/ClientTest.cs b/Tests/MQTTnet.TestApp.NetCore/ClientTest.cs index 8343d9e..f85a42c 100644 --- a/Tests/MQTTnet.TestApp.NetCore/ClientTest.cs +++ b/Tests/MQTTnet.TestApp.NetCore/ClientTest.cs @@ -2,7 +2,10 @@ using System.Text; using System.Threading.Tasks; using MQTTnet.Client; +using MQTTnet.Client.Connecting; +using MQTTnet.Client.Disconnecting; using MQTTnet.Client.Options; +using MQTTnet.Client.Receiving; using MQTTnet.Protocol; namespace MQTTnet.TestApp.NetCore @@ -25,7 +28,7 @@ namespace MQTTnet.TestApp.NetCore } }; - client.ApplicationMessageReceived += (s, e) => + client.ApplicationMessageReceivedHandler = new MqttApplicationMessageHandlerDelegate(e => { Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###"); Console.WriteLine($"+ Topic = {e.ApplicationMessage.Topic}"); @@ -33,18 +36,18 @@ namespace MQTTnet.TestApp.NetCore Console.WriteLine($"+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}"); Console.WriteLine($"+ Retain = {e.ApplicationMessage.Retain}"); Console.WriteLine(); - }; + }); - client.Connected += async (s, e) => + client.ConnectedHandler = new MqttClientConnectedHandlerDelegate(async e => { Console.WriteLine("### CONNECTED WITH SERVER ###"); await client.SubscribeAsync(new TopicFilterBuilder().WithTopic("#").Build()); Console.WriteLine("### SUBSCRIBED ###"); - }; + }); - client.Disconnected += async (s, e) => + client.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(async e => { Console.WriteLine("### DISCONNECTED FROM SERVER ###"); await Task.Delay(TimeSpan.FromSeconds(5)); @@ -57,7 +60,7 @@ namespace MQTTnet.TestApp.NetCore { Console.WriteLine("### RECONNECTING FAILED ###"); } - }; + }); try { diff --git a/Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs b/Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs index d7cea49..7498e9e 100644 --- a/Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs +++ b/Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs @@ -4,6 +4,7 @@ using System.IO; using Newtonsoft.Json; using System.Collections.Generic; using MQTTnet.Client.Options; +using MQTTnet.Client.Receiving; using MQTTnet.Extensions.ManagedClient; using MQTTnet.Protocol; @@ -34,10 +35,10 @@ namespace MQTTnet.TestApp.NetCore try { var managedClient = new MqttFactory().CreateManagedMqttClient(); - managedClient.ApplicationMessageReceived += (s, e) => + managedClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageHandlerDelegate(e => { Console.WriteLine(">> RECEIVED: " + e.ApplicationMessage.Topic); - }; + }); await managedClient.PublishAsync(builder => builder.WithTopic("Step").WithPayload("1")); await managedClient.PublishAsync(builder => builder.WithTopic("Step").WithPayload("2").WithAtLeastOnceQoS()); diff --git a/Tests/MQTTnet.TestApp.NetCore/PublicBrokerTest.cs b/Tests/MQTTnet.TestApp.NetCore/PublicBrokerTest.cs index ac6d16c..00f80c6 100644 --- a/Tests/MQTTnet.TestApp.NetCore/PublicBrokerTest.cs +++ b/Tests/MQTTnet.TestApp.NetCore/PublicBrokerTest.cs @@ -4,6 +4,7 @@ using System.IO; using System.Threading; using System.Threading.Tasks; using MQTTnet.Client.Options; +using MQTTnet.Client.Receiving; using MQTTnet.Formatter; using MQTTnet.Protocol; using Newtonsoft.Json; @@ -80,7 +81,7 @@ namespace MQTTnet.TestApp.NetCore var topic = Guid.NewGuid().ToString(); MqttApplicationMessage receivedMessage = null; - client.ApplicationMessageReceived += (s, e) => receivedMessage = e.ApplicationMessage; + client.ApplicationMessageReceivedHandler = new MqttApplicationMessageHandlerDelegate(e => receivedMessage = e.ApplicationMessage); await client.ConnectAsync(options); await client.SubscribeAsync(topic, MqttQualityOfServiceLevel.AtLeastOnce); diff --git a/Tests/MQTTnet.TestApp.NetCore/ServerTest.cs b/Tests/MQTTnet.TestApp.NetCore/ServerTest.cs index b00e3ac..73aea2d 100644 --- a/Tests/MQTTnet.TestApp.NetCore/ServerTest.cs +++ b/Tests/MQTTnet.TestApp.NetCore/ServerTest.cs @@ -1,6 +1,7 @@ using System; using System.Text; using System.Threading.Tasks; +using MQTTnet.Client.Receiving; using MQTTnet.Protocol; using MQTTnet.Server; @@ -78,12 +79,12 @@ namespace MQTTnet.TestApp.NetCore var mqttServer = new MqttFactory().CreateMqttServer(); - mqttServer.ApplicationMessageReceived += (s, e) => + mqttServer.ApplicationMessageReceivedHandler = new MqttApplicationMessageHandlerDelegate(e => { MqttNetConsoleLogger.PrintToConsole( $"'{e.ClientId}' reported '{e.ApplicationMessage.Topic}' > '{Encoding.UTF8.GetString(e.ApplicationMessage.Payload ?? new byte[0])}'", ConsoleColor.Magenta); - }; + }); //options.ApplicationMessageInterceptor = c => //{ @@ -107,10 +108,10 @@ namespace MQTTnet.TestApp.NetCore // } //}; - mqttServer.ClientDisconnected += (s, e) => + mqttServer.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(e => { Console.Write("Client disconnected event fired."); - }; + }); await mqttServer.StartAsync(options); diff --git a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs index eaf2087..d79fe06 100644 --- a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs +++ b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs @@ -7,7 +7,10 @@ using Windows.Security.Cryptography.Certificates; using Windows.UI.Core; using Windows.UI.Xaml; using MQTTnet.Client; +using MQTTnet.Client.Connecting; +using MQTTnet.Client.Disconnecting; using MQTTnet.Client.Options; +using MQTTnet.Client.Receiving; using MQTTnet.Diagnostics; using MQTTnet.Exceptions; using MQTTnet.Extensions.ManagedClient; @@ -144,9 +147,9 @@ namespace MQTTnet.TestApp.UniversalWindows if (_mqttClient != null) { await _mqttClient.DisconnectAsync(); - _mqttClient.ApplicationMessageReceived -= OnApplicationMessageReceived; - _mqttClient.Connected -= OnConnected; - _mqttClient.Disconnected -= OnDisconnected; + _mqttClient.UseApplicationMessageReceivedHandler(HandleReceivedApplicationMessage); + _mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(x => OnConnected(x)); + _mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(x => OnDisconnected(x)); } var factory = new MqttFactory(); @@ -154,9 +157,9 @@ namespace MQTTnet.TestApp.UniversalWindows if (UseManagedClient.IsChecked == true) { _managedMqttClient = factory.CreateManagedMqttClient(); - _managedMqttClient.ApplicationMessageReceived += OnApplicationMessageReceived; - _managedMqttClient.Connected += OnConnected; - _managedMqttClient.Disconnected += OnDisconnected; + _managedMqttClient.UseApplicationMessageReceivedHandler(HandleReceivedApplicationMessage); + _managedMqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(x => OnConnected(x)); + _managedMqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(x => OnDisconnected(x)); await _managedMqttClient.StartAsync(new ManagedMqttClientOptions { @@ -166,9 +169,9 @@ namespace MQTTnet.TestApp.UniversalWindows else { _mqttClient = factory.CreateMqttClient(); - _mqttClient.ApplicationMessageReceived += OnApplicationMessageReceived; - _mqttClient.Connected += OnConnected; - _mqttClient.Disconnected += OnDisconnected; + _mqttClient.UseApplicationMessageReceivedHandler(HandleReceivedApplicationMessage); + _mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(x => OnConnected(x)); + _mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(x => OnDisconnected(x)); await _mqttClient.ConnectAsync(options); } @@ -179,7 +182,7 @@ namespace MQTTnet.TestApp.UniversalWindows } } - private void OnDisconnected(object sender, MqttClientDisconnectedEventArgs e) + private void OnDisconnected(MqttClientDisconnectedEventArgs e) { _traceMessages.Enqueue(new MqttNetLogMessage("", DateTime.Now, -1, "", MqttNetLogLevel.Info, "! DISCONNECTED EVENT FIRED", null)); @@ -187,7 +190,7 @@ namespace MQTTnet.TestApp.UniversalWindows Task.Run(UpdateLogAsync); } - private void OnConnected(object sender, MqttClientConnectedEventArgs e) + private void OnConnected(MqttClientConnectedEventArgs e) { _traceMessages.Enqueue(new MqttNetLogMessage("", DateTime.Now, -1, "", MqttNetLogLevel.Info, "! CONNECTED EVENT FIRED", null)); @@ -195,7 +198,7 @@ namespace MQTTnet.TestApp.UniversalWindows Task.Run(UpdateLogAsync); } - private async void OnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs eventArgs) + private async Task HandleReceivedApplicationMessage(MqttApplicationMessageReceivedEventArgs eventArgs) { var item = $"Timestamp: {DateTime.Now:O} | Topic: {eventArgs.ApplicationMessage.Topic} | Payload: {Encoding.UTF8.GetString(eventArgs.ApplicationMessage.Payload)} | QoS: {eventArgs.ApplicationMessage.QualityOfServiceLevel}";