Browse Source

Add more function mappings for MQTTnet Server.

release/3.x.x
Christian Kratky 5 years ago
parent
commit
8708609498
55 changed files with 662 additions and 361 deletions
  1. +3
    -3
      Source/MQTTnet.AspnetCore/MqttConnectionHandler.cs
  2. +2
    -2
      Source/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs
  3. +1
    -6
      Source/MQTTnet.Extensions.ManagedClient/IManagedMqttClient.cs
  4. +15
    -36
      Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs
  5. +6
    -6
      Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClientExtensions.cs
  6. +7
    -4
      Source/MQTTnet.Extensions.Rpc/MqttRpcClient.cs
  7. +32
    -0
      Source/MQTTnet.Server/Mqtt/CustomMqttFactory.cs
  8. +18
    -2
      Source/MQTTnet.Server/Mqtt/MqttApplicationMessageInterceptor.cs
  9. +39
    -0
      Source/MQTTnet.Server/Mqtt/MqttClientConnectedHandler.cs
  10. +40
    -0
      Source/MQTTnet.Server/Mqtt/MqttClientDisconnectedHandler.cs
  11. +41
    -0
      Source/MQTTnet.Server/Mqtt/MqttClientSubscribedTopicHandler.cs
  12. +40
    -0
      Source/MQTTnet.Server/Mqtt/MqttClientUnsubscribedTopicHandler.cs
  13. +1
    -1
      Source/MQTTnet.Server/Mqtt/MqttConnectionValidator.cs
  14. +58
    -29
      Source/MQTTnet.Server/Mqtt/MqttServerService.cs
  15. +1
    -1
      Source/MQTTnet.Server/Mqtt/MqttSubscriptionInterceptor.cs
  16. +5
    -2
      Source/MQTTnet.Server/Program.cs
  17. +3
    -3
      Source/MQTTnet.Server/Scripting/PythonScriptHostService.cs
  18. +33
    -0
      Source/MQTTnet.Server/Scripts/00_sample.py
  19. +8
    -4
      Source/MQTTnet.Server/Startup.cs
  20. +1
    -1
      Source/MQTTnet/Adapter/IMqttServerAdapter.cs
  21. +2
    -6
      Source/MQTTnet/Client/IMqttClient.cs
  22. +3
    -16
      Source/MQTTnet/Client/MqttClient.cs
  23. +8
    -8
      Source/MQTTnet/Client/MqttClientExtensions.cs
  24. +1
    -1
      Source/MQTTnet/Client/Receiving/IMqttApplicationMessageHandler.cs
  25. +0
    -15
      Source/MQTTnet/Client/Receiving/MqttApplicationMessageHandlerContext.cs
  26. +4
    -4
      Source/MQTTnet/Client/Receiving/MqttApplicationMessageHandlerDelegate.cs
  27. +2
    -6
      Source/MQTTnet/IApplicationMessageReceiver.cs
  28. +2
    -2
      Source/MQTTnet/Implementations/MqttTcpServerAdapter.Uwp.cs
  29. +11
    -7
      Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs
  30. +2
    -2
      Source/MQTTnet/Implementations/MqttTcpServerListener.cs
  31. +2
    -9
      Source/MQTTnet/Server/IMqttServer.cs
  32. +9
    -0
      Source/MQTTnet/Server/IMqttServerClientSubscribedTopicHandler.cs
  33. +9
    -0
      Source/MQTTnet/Server/IMqttServerClientUnsubscribedTopicHandler.cs
  34. +3
    -3
      Source/MQTTnet/Server/MqttClientSessionsManager.cs
  35. +9
    -6
      Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs
  36. +30
    -30
      Source/MQTTnet/Server/MqttServer.cs
  37. +2
    -2
      Source/MQTTnet/Server/MqttServerClientConnectedHandlerDelegate.cs
  38. +2
    -2
      Source/MQTTnet/Server/MqttServerClientDisconnectedHandlerDelegate.cs
  39. +2
    -2
      Source/MQTTnet/Server/MqttServerClientSubscribedTopicEventArgs.cs
  40. +31
    -0
      Source/MQTTnet/Server/MqttServerClientSubscribedTopicHandlerDelegate.cs
  41. +2
    -2
      Source/MQTTnet/Server/MqttServerClientUnsubscribedTopicEventArgs.cs
  42. +31
    -0
      Source/MQTTnet/Server/MqttServerClientUnsubscribedTopicHandlerDelegate.cs
  43. +47
    -16
      Source/MQTTnet/Server/MqttServerEventDispatcher.cs
  44. +1
    -2
      Tests/MQTTnet.Benchmarks/MqttTcpChannelBenchmark.cs
  45. +3
    -2
      Tests/MQTTnet.Core.Tests/MQTTv5/Client_Tests.cs
  46. +2
    -2
      Tests/MQTTnet.Core.Tests/Mockups/TestMqttServerAdapter.cs
  47. +0
    -38
      Tests/MQTTnet.Core.Tests/Mockups/TestServerExtensions.cs
  48. +19
    -18
      Tests/MQTTnet.Core.Tests/MqttClientTests.cs
  49. +3
    -2
      Tests/MQTTnet.Core.Tests/RoundtripTimeTests.cs
  50. +32
    -33
      Tests/MQTTnet.Core.Tests/Server_Tests.cs
  51. +9
    -6
      Tests/MQTTnet.TestApp.NetCore/ClientTest.cs
  52. +3
    -2
      Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs
  53. +2
    -1
      Tests/MQTTnet.TestApp.NetCore/PublicBrokerTest.cs
  54. +5
    -4
      Tests/MQTTnet.TestApp.NetCore/ServerTest.cs
  55. +15
    -12
      Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs

+ 3
- 3
Source/MQTTnet.AspnetCore/MqttConnectionHandler.cs View File

@@ -10,7 +10,7 @@ namespace MQTTnet.AspNetCore
{ {
public class MqttConnectionHandler : ConnectionHandler, IMqttServerAdapter public class MqttConnectionHandler : ConnectionHandler, IMqttServerAdapter
{ {
public event EventHandler<MqttServerAdapterClientAcceptedEventArgs> ClientAccepted;
public Action<MqttServerAdapterClientAcceptedEventArgs> ClientAcceptedHandler { get; set; }


public override async Task OnConnectedAsync(ConnectionContext connection) public override async Task OnConnectedAsync(ConnectionContext connection)
{ {
@@ -24,9 +24,9 @@ namespace MQTTnet.AspNetCore
using (var adapter = new MqttConnectionContext(new MqttPacketFormatterAdapter(), connection)) using (var adapter = new MqttConnectionContext(new MqttPacketFormatterAdapter(), connection))
{ {
var args = new MqttServerAdapterClientAcceptedEventArgs(adapter); var args = new MqttServerAdapterClientAcceptedEventArgs(adapter);
ClientAccepted?.Invoke(this, args);
ClientAcceptedHandler?.Invoke(args);


await args.SessionTask;
await args.SessionTask.ConfigureAwait(false);
} }
} }




+ 2
- 2
Source/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs View File

@@ -11,7 +11,7 @@ namespace MQTTnet.AspNetCore
{ {
public class MqttWebSocketServerAdapter : IMqttServerAdapter public class MqttWebSocketServerAdapter : IMqttServerAdapter
{ {
public event EventHandler<MqttServerAdapterClientAcceptedEventArgs> ClientAccepted;
public Action<MqttServerAdapterClientAcceptedEventArgs> ClientAcceptedHandler { get; set; }


public Task StartAsync(IMqttServerOptions options) public Task StartAsync(IMqttServerOptions options)
{ {
@@ -30,7 +30,7 @@ namespace MQTTnet.AspNetCore
var clientAdapter = new MqttChannelAdapter(new MqttWebSocketChannel(webSocket, endpoint), new MqttPacketFormatterAdapter(), new MqttNetLogger().CreateChildLogger(nameof(MqttWebSocketServerAdapter))); var clientAdapter = new MqttChannelAdapter(new MqttWebSocketChannel(webSocket, endpoint), new MqttPacketFormatterAdapter(), new MqttNetLogger().CreateChildLogger(nameof(MqttWebSocketServerAdapter)));


var eventArgs = new MqttServerAdapterClientAcceptedEventArgs(clientAdapter); var eventArgs = new MqttServerAdapterClientAcceptedEventArgs(clientAdapter);
ClientAccepted?.Invoke(this, eventArgs);
ClientAcceptedHandler?.Invoke(eventArgs);


if (eventArgs.SessionTask != null) if (eventArgs.SessionTask != null)
{ {


+ 1
- 6
Source/MQTTnet.Extensions.ManagedClient/IManagedMqttClient.cs View File

@@ -14,13 +14,8 @@ namespace MQTTnet.Extensions.ManagedClient
IManagedMqttClientOptions Options { get; } IManagedMqttClientOptions Options { get; }


IMqttClientConnectedHandler ConnectedHandler { get; set; } IMqttClientConnectedHandler ConnectedHandler { get; set; }
[Obsolete("Use ConnectedHandler instead.")]
event EventHandler<MqttClientConnectedEventArgs> Connected;

IMqttClientDisconnectedHandler DisconnectedHandler { get; set; } IMqttClientDisconnectedHandler DisconnectedHandler { get; set; }
[Obsolete("Use DisconnectedHandler instead.")]
event EventHandler<MqttClientDisconnectedEventArgs> Disconnected;

event EventHandler<ApplicationMessageProcessedEventArgs> ApplicationMessageProcessed; event EventHandler<ApplicationMessageProcessedEventArgs> ApplicationMessageProcessed;
event EventHandler<ApplicationMessageSkippedEventArgs> ApplicationMessageSkipped; event EventHandler<ApplicationMessageSkippedEventArgs> ApplicationMessageSkipped;




+ 15
- 36
Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs View File

@@ -13,8 +13,6 @@ using MQTTnet.Exceptions;
using MQTTnet.Internal; using MQTTnet.Internal;
using MQTTnet.Protocol; using MQTTnet.Protocol;
using MQTTnet.Server; using MQTTnet.Server;
using MqttClientConnectedEventArgs = MQTTnet.Client.Connecting.MqttClientConnectedEventArgs;
using MqttClientDisconnectedEventArgs = MQTTnet.Client.Disconnecting.MqttClientDisconnectedEventArgs;


namespace MQTTnet.Extensions.ManagedClient namespace MQTTnet.Extensions.ManagedClient
{ {
@@ -36,14 +34,9 @@ namespace MQTTnet.Extensions.ManagedClient


public ManagedMqttClient(IMqttClient mqttClient, IMqttNetChildLogger logger) public ManagedMqttClient(IMqttClient mqttClient, IMqttNetChildLogger logger)
{ {
if (logger == null) throw new ArgumentNullException(nameof(logger));

_mqttClient = mqttClient ?? throw new ArgumentNullException(nameof(mqttClient)); _mqttClient = mqttClient ?? throw new ArgumentNullException(nameof(mqttClient));


_mqttClient.UseConnectedHandler(OnConnected);
_mqttClient.UseDisconnectedHandler(OnDisconnected);
_mqttClient.UseReceivedApplicationMessageHandler(OnApplicationMessageReceived);

if (logger == null) throw new ArgumentNullException(nameof(logger));
_logger = logger.CreateChildLogger(nameof(ManagedMqttClient)); _logger = logger.CreateChildLogger(nameof(ManagedMqttClient));
} }


@@ -52,20 +45,24 @@ namespace MQTTnet.Extensions.ManagedClient
public int PendingApplicationMessagesCount => _messageQueue.Count; public int PendingApplicationMessagesCount => _messageQueue.Count;
public IManagedMqttClientOptions Options { get; private set; } public IManagedMqttClientOptions Options { get; private set; }


public IMqttClientConnectedHandler ConnectedHandler { get; set; }
public event EventHandler<MqttClientConnectedEventArgs> Connected;
public IMqttClientDisconnectedHandler DisconnectedHandler { get; set; }
public event EventHandler<MqttClientDisconnectedEventArgs> Disconnected;
public IMqttClientConnectedHandler ConnectedHandler
{
get => _mqttClient.ConnectedHandler;
set => _mqttClient.ConnectedHandler = value;
}


public IMqttClientDisconnectedHandler DisconnectedHandler
{
get => _mqttClient.DisconnectedHandler;
set => _mqttClient.DisconnectedHandler = value;
}


public IMqttApplicationMessageHandler ReceivedApplicationMessageHandler
public IMqttApplicationMessageHandler ApplicationMessageReceivedHandler
{ {
get => _mqttClient.ReceivedApplicationMessageHandler;
set => _mqttClient.ReceivedApplicationMessageHandler = value;
get => _mqttClient.ApplicationMessageReceivedHandler;
set => _mqttClient.ApplicationMessageReceivedHandler = value;
} }


public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived;
public event EventHandler<ApplicationMessageProcessedEventArgs> ApplicationMessageProcessed; public event EventHandler<ApplicationMessageProcessedEventArgs> ApplicationMessageProcessed;
public event EventHandler<ApplicationMessageSkippedEventArgs> ApplicationMessageSkipped; public event EventHandler<ApplicationMessageSkippedEventArgs> ApplicationMessageSkipped;


@@ -420,25 +417,7 @@ namespace MQTTnet.Extensions.ManagedClient
return ReconnectionResult.NotConnected; return ReconnectionResult.NotConnected;
} }
} }

private Task OnApplicationMessageReceived(MqttApplicationMessageHandlerContext context)
{
ApplicationMessageReceived?.Invoke(this, new MqttApplicationMessageReceivedEventArgs(context.SenderClientId, context.ApplicationMessage));
return Task.FromResult(0);
}

private Task OnDisconnected(MqttClientDisconnectedEventArgs eventArgs)
{
Disconnected?.Invoke(this, eventArgs);
return DisconnectedHandler?.HandleDisconnectedAsync(eventArgs);
}

private Task OnConnected(MqttClientConnectedEventArgs eventArgs)
{
Connected?.Invoke(this, eventArgs);
return ConnectedHandler?.HandleConnectedAsync(eventArgs);
}

private void StartPublishing() private void StartPublishing()
{ {
if (_publishingCancellationToken != null) if (_publishingCancellationToken != null)


+ 6
- 6
Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClientExtensions.cs View File

@@ -60,28 +60,28 @@ namespace MQTTnet.Extensions.ManagedClient
return client; return client;
} }


public static IManagedMqttClient UseReceivedApplicationMessageHandler(this IManagedMqttClient client, Func<MqttApplicationMessageHandlerContext, Task> handler)
public static IManagedMqttClient UseApplicationMessageReceivedHandler(this IManagedMqttClient client, Func<MqttApplicationMessageReceivedEventArgs, Task> handler)
{ {
if (handler == null) if (handler == null)
{ {
client.ReceivedApplicationMessageHandler = null;
client.ApplicationMessageReceivedHandler = null;
return client; return client;
} }


client.ReceivedApplicationMessageHandler = new MqttApplicationMessageHandlerDelegate(handler);
client.ApplicationMessageReceivedHandler = new MqttApplicationMessageHandlerDelegate(handler);


return client; return client;
} }


public static IManagedMqttClient UseReceivedApplicationMessageHandler(this IManagedMqttClient client, Action<MqttApplicationMessageHandlerContext> handler)
public static IManagedMqttClient UseApplicationMessageReceivedHandler(this IManagedMqttClient client, Action<MqttApplicationMessageReceivedEventArgs> handler)
{ {
if (handler == null) if (handler == null)
{ {
client.ReceivedApplicationMessageHandler = null;
client.ApplicationMessageReceivedHandler = null;
return client; return client;
} }


client.ReceivedApplicationMessageHandler = new MqttApplicationMessageHandlerDelegate(handler);
client.ApplicationMessageReceivedHandler = new MqttApplicationMessageHandlerDelegate(handler);


return client; return client;
} }


+ 7
- 4
Source/MQTTnet.Extensions.Rpc/MqttRpcClient.cs View File

@@ -4,6 +4,7 @@ using System.Text;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using MQTTnet.Client; using MQTTnet.Client;
using MQTTnet.Client.Receiving;
using MQTTnet.Exceptions; using MQTTnet.Exceptions;
using MQTTnet.Protocol; using MQTTnet.Protocol;


@@ -18,7 +19,7 @@ namespace MQTTnet.Extensions.Rpc
{ {
_mqttClient = mqttClient ?? throw new ArgumentNullException(nameof(mqttClient)); _mqttClient = mqttClient ?? throw new ArgumentNullException(nameof(mqttClient));


_mqttClient.ApplicationMessageReceived += OnApplicationMessageReceived;
_mqttClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageHandlerDelegate(HandleReceivedApplicationMessageAsync);
} }


public Task<byte[]> ExecuteAsync(TimeSpan timeout, string methodName, string payload, MqttQualityOfServiceLevel qualityOfServiceLevel) public Task<byte[]> ExecuteAsync(TimeSpan timeout, string methodName, string payload, MqttQualityOfServiceLevel qualityOfServiceLevel)
@@ -102,19 +103,21 @@ namespace MQTTnet.Extensions.Rpc
} }
} }


private void OnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs eventArgs)
private Task HandleReceivedApplicationMessageAsync(MqttApplicationMessageReceivedEventArgs eventArgs)
{ {
if (!_waitingCalls.TryRemove(eventArgs.ApplicationMessage.Topic, out var tcs)) if (!_waitingCalls.TryRemove(eventArgs.ApplicationMessage.Topic, out var tcs))
{ {
return;
return Task.FromResult(0);
} }


if (tcs.Task.IsCompleted || tcs.Task.IsCanceled) if (tcs.Task.IsCompleted || tcs.Task.IsCanceled)
{ {
return;
return Task.FromResult(0);
} }


tcs.TrySetResult(eventArgs.ApplicationMessage.Payload); tcs.TrySetResult(eventArgs.ApplicationMessage.Payload);

return Task.FromResult(0);
} }


public void Dispose() public void Dispose()


+ 32
- 0
Source/MQTTnet.Server/Mqtt/CustomMqttFactory.cs View File

@@ -0,0 +1,32 @@
using System;
using System.Collections.Generic;
using Microsoft.Extensions.Logging;
using MQTTnet.Adapter;
using MQTTnet.Diagnostics;
using MQTTnet.Server.Logging;

namespace MQTTnet.Server.Mqtt
{
public class CustomMqttFactory
{
private readonly MqttFactory _mqttFactory;

public CustomMqttFactory(ILogger<MqttServer> logger)
{
if (logger == null) throw new ArgumentNullException(nameof(logger));

Logger = new MqttNetLoggerWrapper(logger);

_mqttFactory = new MqttFactory(Logger);
}
public IMqttNetLogger Logger { get; }

public IMqttServer CreateMqttServer(List<IMqttServerAdapter> adapters)
{
if (adapters == null) throw new ArgumentNullException(nameof(adapters));

return _mqttFactory.CreateMqttServer(adapters);
}
}
}

+ 18
- 2
Source/MQTTnet.Server/Mqtt/MqttApplicationMessageInterceptor.cs View File

@@ -10,7 +10,7 @@ namespace MQTTnet.Server.Mqtt
public class MqttApplicationMessageInterceptor : IMqttServerApplicationMessageInterceptor public class MqttApplicationMessageInterceptor : IMqttServerApplicationMessageInterceptor
{ {
private readonly PythonScriptHostService _pythonScriptHostService; private readonly PythonScriptHostService _pythonScriptHostService;
private readonly ILogger<MqttApplicationMessageInterceptor> _logger;
private readonly ILogger _logger;


public MqttApplicationMessageInterceptor(PythonScriptHostService pythonScriptHostService, ILogger<MqttApplicationMessageInterceptor> logger) public MqttApplicationMessageInterceptor(PythonScriptHostService pythonScriptHostService, ILogger<MqttApplicationMessageInterceptor> logger)
{ {
@@ -31,7 +31,7 @@ namespace MQTTnet.Server.Mqtt
{ "qos", (int)context.ApplicationMessage.QualityOfServiceLevel }, { "qos", (int)context.ApplicationMessage.QualityOfServiceLevel },
{ "retain", context.ApplicationMessage.Retain } { "retain", context.ApplicationMessage.Retain }
}; };
_pythonScriptHostService.InvokeOptionalFunction("on_intercept_application_message", pythonContext); _pythonScriptHostService.InvokeOptionalFunction("on_intercept_application_message", pythonContext);


context.AcceptPublish = (bool)pythonContext.get("accept_publish", context.AcceptPublish); context.AcceptPublish = (bool)pythonContext.get("accept_publish", context.AcceptPublish);
@@ -46,5 +46,21 @@ namespace MQTTnet.Server.Mqtt


return Task.CompletedTask; return Task.CompletedTask;
} }

// TODO: Create dump(object) method in wrapper (creates JSON and prints it).
public class PythonMqttApplicationMessageInterceptorContext
{
public bool accept_connection;

public bool accept_publish;

public string client_id;

public string topic;

public int qos;

public bool retain;
}
} }
} }

+ 39
- 0
Source/MQTTnet.Server/Mqtt/MqttClientConnectedHandler.cs View File

@@ -0,0 +1,39 @@
using System;
using System.Threading.Tasks;
using IronPython.Runtime;
using Microsoft.Extensions.Logging;
using MQTTnet.Server.Scripting;

namespace MQTTnet.Server.Mqtt
{
public class MqttClientConnectedHandler : IMqttServerClientConnectedHandler
{
private readonly PythonScriptHostService _pythonScriptHostService;
private readonly ILogger _logger;

public MqttClientConnectedHandler(PythonScriptHostService pythonScriptHostService, ILogger<MqttClientConnectedHandler> logger)
{
_pythonScriptHostService = pythonScriptHostService ?? throw new ArgumentNullException(nameof(pythonScriptHostService));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}

public Task HandleClientConnectedAsync(MqttServerClientConnectedEventArgs eventArgs)
{
try
{
var pythonEventArgs = new PythonDictionary
{
{ "client_id", eventArgs.ClientId }
};

_pythonScriptHostService.InvokeOptionalFunction("on_client_connected", pythonEventArgs);
}
catch (Exception exception)
{
_logger.LogError(exception, "Error while handling client connected event.");
}

return Task.CompletedTask;
}
}
}

+ 40
- 0
Source/MQTTnet.Server/Mqtt/MqttClientDisconnectedHandler.cs View File

@@ -0,0 +1,40 @@
using System;
using System.Threading.Tasks;
using IronPython.Runtime;
using Microsoft.Extensions.Logging;
using MQTTnet.Server.Scripting;

namespace MQTTnet.Server.Mqtt
{
public class MqttClientDisconnectedHandler : IMqttServerClientDisconnectedHandler
{
private readonly PythonScriptHostService _pythonScriptHostService;
private readonly ILogger _logger;

public MqttClientDisconnectedHandler(PythonScriptHostService pythonScriptHostService, ILogger<MqttClientDisconnectedHandler> logger)
{
_pythonScriptHostService = pythonScriptHostService ?? throw new ArgumentNullException(nameof(pythonScriptHostService));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}

public Task HandleClientDisconnectedAsync(MqttServerClientDisconnectedEventArgs eventArgs)
{
try
{
var pythonEventArgs = new PythonDictionary
{
{ "client_id", eventArgs.ClientId },
{ "type", PythonConvert.Pythonfy(eventArgs.DisconnectType) }
};

_pythonScriptHostService.InvokeOptionalFunction("on_client_disconnected", pythonEventArgs);
}
catch (Exception exception)
{
_logger.LogError(exception, "Error while handling client disconnected event.");
}

return Task.CompletedTask;
}
}
}

+ 41
- 0
Source/MQTTnet.Server/Mqtt/MqttClientSubscribedTopicHandler.cs View File

@@ -0,0 +1,41 @@
using System;
using System.Threading.Tasks;
using IronPython.Runtime;
using Microsoft.Extensions.Logging;
using MQTTnet.Server.Scripting;

namespace MQTTnet.Server.Mqtt
{
public class MqttClientSubscribedTopicHandler : IMqttServerClientSubscribedTopicHandler
{
private readonly PythonScriptHostService _pythonScriptHostService;
private readonly ILogger _logger;

public MqttClientSubscribedTopicHandler(PythonScriptHostService pythonScriptHostService, ILogger<MqttClientSubscribedTopicHandler> logger)
{
_pythonScriptHostService = pythonScriptHostService ?? throw new ArgumentNullException(nameof(pythonScriptHostService));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}

public Task HandleClientSubscribedTopicAsync(MqttServerClientSubscribedTopicEventArgs eventArgs)
{
try
{
var pythonEventArgs = new PythonDictionary
{
{ "client_id", eventArgs.ClientId },
{ "topic", eventArgs.TopicFilter.Topic },
{ "qos", (int)eventArgs.TopicFilter.QualityOfServiceLevel }
};

_pythonScriptHostService.InvokeOptionalFunction("on_client_subscribed_topic", pythonEventArgs);
}
catch (Exception exception)
{
_logger.LogError(exception, "Error while handling client subscribed topic event.");
}

return Task.CompletedTask;
}
}
}

+ 40
- 0
Source/MQTTnet.Server/Mqtt/MqttClientUnsubscribedTopicHandler.cs View File

@@ -0,0 +1,40 @@
using System;
using System.Threading.Tasks;
using IronPython.Runtime;
using Microsoft.Extensions.Logging;
using MQTTnet.Server.Scripting;

namespace MQTTnet.Server.Mqtt
{
public class MqttClientUnsubscribedTopicHandler : IMqttServerClientUnsubscribedTopicHandler
{
private readonly PythonScriptHostService _pythonScriptHostService;
private readonly ILogger _logger;

public MqttClientUnsubscribedTopicHandler(PythonScriptHostService pythonScriptHostService, ILogger<MqttClientUnsubscribedTopicHandler> logger)
{
_pythonScriptHostService = pythonScriptHostService ?? throw new ArgumentNullException(nameof(pythonScriptHostService));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}

public Task HandleClientUnsubscribedTopicAsync(MqttServerClientUnsubscribedTopicEventArgs eventArgs)
{
try
{
var pythonEventArgs = new PythonDictionary
{
{ "client_id", eventArgs.ClientId },
{ "topic", eventArgs.TopicFilter }
};

_pythonScriptHostService.InvokeOptionalFunction("on_client_unsubscribed_topic", pythonEventArgs);
}
catch (Exception exception)
{
_logger.LogError(exception, "Error while handling client unsubscribed topic event.");
}

return Task.CompletedTask;
}
}
}

+ 1
- 1
Source/MQTTnet.Server/Mqtt/MqttConnectionValidator.cs View File

@@ -10,7 +10,7 @@ namespace MQTTnet.Server.Mqtt
public class MqttConnectionValidator : IMqttServerConnectionValidator public class MqttConnectionValidator : IMqttServerConnectionValidator
{ {
private readonly PythonScriptHostService _pythonScriptHostService; private readonly PythonScriptHostService _pythonScriptHostService;
private readonly ILogger<MqttConnectionValidator> _logger;
private readonly ILogger _logger;


public MqttConnectionValidator(PythonScriptHostService pythonScriptHostService, ILogger<MqttConnectionValidator> logger) public MqttConnectionValidator(PythonScriptHostService pythonScriptHostService, ILogger<MqttConnectionValidator> logger)
{ {


+ 58
- 29
Source/MQTTnet.Server/Mqtt/MqttServerService.cs View File

@@ -8,7 +8,6 @@ using MQTTnet.Adapter;
using MQTTnet.AspNetCore; using MQTTnet.AspNetCore;
using MQTTnet.Implementations; using MQTTnet.Implementations;
using MQTTnet.Protocol; using MQTTnet.Protocol;
using MQTTnet.Server.Logging;
using MQTTnet.Server.Scripting; using MQTTnet.Server.Scripting;


namespace MQTTnet.Server.Mqtt namespace MQTTnet.Server.Mqtt
@@ -17,6 +16,10 @@ namespace MQTTnet.Server.Mqtt
{ {
private readonly ILogger<MqttServerService> _logger; private readonly ILogger<MqttServerService> _logger;


private readonly MqttClientConnectedHandler _mqttClientConnectedHandler;
private readonly MqttClientDisconnectedHandler _mqttClientDisconnectedHandler;
private readonly MqttClientSubscribedTopicHandler _mqttClientSubscribedTopicHandler;
private readonly MqttClientUnsubscribedTopicHandler _mqttClientUnsubscribedTopicHandler;
private readonly MqttConnectionValidator _mqttConnectionValidator; private readonly MqttConnectionValidator _mqttConnectionValidator;
private readonly MqttSubscriptionInterceptor _mqttSubscriptionInterceptor; private readonly MqttSubscriptionInterceptor _mqttSubscriptionInterceptor;
private readonly MqttApplicationMessageInterceptor _mqttApplicationMessageInterceptor; private readonly MqttApplicationMessageInterceptor _mqttApplicationMessageInterceptor;
@@ -25,15 +28,22 @@ namespace MQTTnet.Server.Mqtt
private readonly IMqttServer _mqttServer; private readonly IMqttServer _mqttServer;


public MqttServerService( public MqttServerService(
IMqttServerFactory mqttServerFactory,
CustomMqttFactory mqttFactory,
MqttWebSocketServerAdapter webSocketServerAdapter, MqttWebSocketServerAdapter webSocketServerAdapter,
MqttNetLoggerWrapper mqttNetLogger,
MqttClientConnectedHandler mqttClientConnectedHandler,
MqttClientDisconnectedHandler mqttClientDisconnectedHandler,
MqttClientSubscribedTopicHandler mqttClientSubscribedTopicHandler,
MqttClientUnsubscribedTopicHandler mqttClientUnsubscribedTopicHandler,
MqttConnectionValidator mqttConnectionValidator, MqttConnectionValidator mqttConnectionValidator,
MqttSubscriptionInterceptor mqttSubscriptionInterceptor, MqttSubscriptionInterceptor mqttSubscriptionInterceptor,
MqttApplicationMessageInterceptor mqttApplicationMessageInterceptor, MqttApplicationMessageInterceptor mqttApplicationMessageInterceptor,
PythonScriptHostService pythonScriptHostService, PythonScriptHostService pythonScriptHostService,
ILogger<MqttServerService> logger) ILogger<MqttServerService> logger)
{ {
_mqttClientConnectedHandler = mqttClientConnectedHandler ?? throw new ArgumentNullException(nameof(mqttClientConnectedHandler));
_mqttClientDisconnectedHandler = mqttClientDisconnectedHandler ?? throw new ArgumentNullException(nameof(mqttClientDisconnectedHandler));
_mqttClientSubscribedTopicHandler = mqttClientSubscribedTopicHandler ?? throw new ArgumentNullException(nameof(mqttClientSubscribedTopicHandler));
_mqttClientUnsubscribedTopicHandler = mqttClientUnsubscribedTopicHandler ?? throw new ArgumentNullException(nameof(mqttClientUnsubscribedTopicHandler));
_mqttConnectionValidator = mqttConnectionValidator ?? throw new ArgumentNullException(nameof(mqttConnectionValidator)); _mqttConnectionValidator = mqttConnectionValidator ?? throw new ArgumentNullException(nameof(mqttConnectionValidator));
_mqttSubscriptionInterceptor = mqttSubscriptionInterceptor ?? throw new ArgumentNullException(nameof(mqttSubscriptionInterceptor)); _mqttSubscriptionInterceptor = mqttSubscriptionInterceptor ?? throw new ArgumentNullException(nameof(mqttSubscriptionInterceptor));
_mqttApplicationMessageInterceptor = mqttApplicationMessageInterceptor ?? throw new ArgumentNullException(nameof(mqttApplicationMessageInterceptor)); _mqttApplicationMessageInterceptor = mqttApplicationMessageInterceptor ?? throw new ArgumentNullException(nameof(mqttApplicationMessageInterceptor));
@@ -42,11 +52,11 @@ namespace MQTTnet.Server.Mqtt


var adapters = new List<IMqttServerAdapter> var adapters = new List<IMqttServerAdapter>
{ {
new MqttTcpServerAdapter(new MqttNetChildLoggerWrapper(null, mqttNetLogger)),
new MqttTcpServerAdapter(mqttFactory.Logger.CreateChildLogger(nameof(MqttTcpServerAdapter))),
webSocketServerAdapter webSocketServerAdapter
}; };


_mqttServer = mqttServerFactory.CreateMqttServer(adapters);
_mqttServer = mqttFactory.CreateMqttServer(adapters);
} }


public void Configure() public void Configure()
@@ -61,6 +71,11 @@ namespace MQTTnet.Server.Mqtt
.WithSubscriptionInterceptor(_mqttSubscriptionInterceptor) .WithSubscriptionInterceptor(_mqttSubscriptionInterceptor)
.Build(); .Build();


_mqttServer.ClientConnectedHandler = _mqttClientConnectedHandler;
_mqttServer.ClientDisconnectedHandler = _mqttClientDisconnectedHandler;
_mqttServer.ClientSubscribedTopicHandler = _mqttClientSubscribedTopicHandler;
_mqttServer.ClientUnsubscribedTopicHandler = _mqttClientUnsubscribedTopicHandler;

_mqttServer.StartAsync(options).GetAwaiter().GetResult(); _mqttServer.StartAsync(options).GetAwaiter().GetResult();


_logger.LogInformation("MQTT server started."); _logger.LogInformation("MQTT server started.");
@@ -68,34 +83,48 @@ namespace MQTTnet.Server.Mqtt


private void Publish(PythonDictionary parameters) private void Publish(PythonDictionary parameters)
{ {
var applicationMessageBuilder = new MqttApplicationMessageBuilder()
.WithTopic((string)parameters.get("topic", null))
.WithRetainFlag((bool)parameters.get("retain", false))
.WithQualityOfServiceLevel((MqttQualityOfServiceLevel)(int)parameters.get("qos", 0));

var payload = parameters.get("payload", null);
var binaryPayload = new byte[0];

if (payload is string stringPayload)
{
binaryPayload = Encoding.UTF8.GetBytes(stringPayload);
}
else if (payload is ByteArray byteArray)
try
{ {
binaryPayload = byteArray.ToArray();
var applicationMessageBuilder = new MqttApplicationMessageBuilder()
.WithTopic((string)parameters.get("topic", null))
.WithRetainFlag((bool)parameters.get("retain", false))
.WithQualityOfServiceLevel((MqttQualityOfServiceLevel)(int)parameters.get("qos", 0));

var payload = parameters.get("payload", null);
byte[] binaryPayload;

if (payload == null)
{
binaryPayload = new byte[0];
}
else if (payload is string stringPayload)
{
binaryPayload = Encoding.UTF8.GetBytes(stringPayload);
}
else if (payload is ByteArray byteArray)
{
binaryPayload = byteArray.ToArray();
}
else if (payload is IEnumerable<int> intArray)
{
binaryPayload = intArray.Select(Convert.ToByte).ToArray();
}
else
{
throw new NotSupportedException("Payload type not supported.");
}

applicationMessageBuilder = applicationMessageBuilder
.WithPayload(binaryPayload);

var applicationMessage = applicationMessageBuilder.Build();

_mqttServer.PublishAsync(applicationMessage).GetAwaiter().GetResult();
} }
else if (payload is IEnumerable<int> intArray)
catch (Exception exception)
{ {
binaryPayload = intArray.Select(Convert.ToByte).ToArray();
_logger.LogError(exception, "Error while publishing application message from server.");
} }

applicationMessageBuilder = applicationMessageBuilder
.WithPayload(binaryPayload);

var applicationMessage = applicationMessageBuilder.Build();

_mqttServer.PublishAsync(applicationMessage).GetAwaiter().GetResult();
_logger.LogInformation($"Published topic '{applicationMessage.Topic}' from server.");
} }
} }
} }

+ 1
- 1
Source/MQTTnet.Server/Mqtt/MqttSubscriptionInterceptor.cs View File

@@ -9,7 +9,7 @@ namespace MQTTnet.Server.Mqtt
public class MqttSubscriptionInterceptor : IMqttServerSubscriptionInterceptor public class MqttSubscriptionInterceptor : IMqttServerSubscriptionInterceptor
{ {
private readonly PythonScriptHostService _pythonScriptHostService; private readonly PythonScriptHostService _pythonScriptHostService;
private readonly ILogger<MqttSubscriptionInterceptor> _logger;
private readonly ILogger _logger;


public MqttSubscriptionInterceptor(PythonScriptHostService pythonScriptHostService, ILogger<MqttSubscriptionInterceptor> logger) public MqttSubscriptionInterceptor(PythonScriptHostService pythonScriptHostService, ILogger<MqttSubscriptionInterceptor> logger)
{ {


+ 5
- 2
Source/MQTTnet.Server/Program.cs View File

@@ -48,8 +48,11 @@ namespace MQTTnet.Server


Console.ForegroundColor = ConsoleColor.White; Console.ForegroundColor = ConsoleColor.White;
Console.WriteLine(@" Console.WriteLine(@"
Version: 1.0.0-alpha1
License: MIT (read LICENSE file)
Version: 1.0.0-alpha1
License: MIT (read LICENSE file)
Sponsoring: https://opencollective.com/mqttnet
Support: https://github.com/chkr1011/MQTTnet/issues
Docs: https://github.com/chkr1011/MQTTnet/wiki/MQTTnetServer
"); ");


Console.BackgroundColor = ConsoleColor.White; Console.BackgroundColor = ConsoleColor.White;


+ 3
- 3
Source/MQTTnet.Server/Scripting/PythonScriptHostService.cs View File

@@ -36,12 +36,12 @@ namespace MQTTnet.Server.Scripting
} }
} }


public void RegisterProxyObject(string name, object action)
public void RegisterProxyObject(string name, object @object)
{ {
if (name == null) throw new ArgumentNullException(nameof(name)); if (name == null) throw new ArgumentNullException(nameof(name));
if (action == null) throw new ArgumentNullException(nameof(action));
if (@object == null) throw new ArgumentNullException(nameof(@object));


_proxyObjects.Add(name, action);
_proxyObjects.Add(name, @object);
} }


public void InvokeOptionalFunction(string name, object parameters) public void InvokeOptionalFunction(string name, object parameters)


+ 33
- 0
Source/MQTTnet.Server/Scripts/00_sample.py View File

@@ -48,6 +48,7 @@ def on_intercept_application_message(context):
This function is invoked for every processed application message. It also allows modifying This function is invoked for every processed application message. It also allows modifying
the message or cancel processing at all. the message or cancel processing at all.
""" """

client_id = context["client_id"] client_id = context["client_id"]


if client_id != None: if client_id != None:
@@ -72,3 +73,35 @@ def on_intercept_application_message(context):
mqtt_net_server.publish(application_message) mqtt_net_server.publish(application_message)


print("Client '{client_id}' published topic '{topic}'.".format(client_id=context["client_id"], topic=context["topic"])) print("Client '{client_id}' published topic '{topic}'.".format(client_id=context["client_id"], topic=context["topic"]))


def on_client_connected(event_args):
"""
This function is called whenever a client has passed the validation is connected.
"""

print("Client '{client_id}' is now connected.".format(client_id=event_args["client_id"]))


def on_client_disconnected(event_args):
"""
This function is called whenever a client has disconnected.
"""

print("Client '{client_id}' is now disconnected (type = {type}).".format(client_id=event_args["client_id"], type=event_args["type"]))


def on_client_subscribed_topic(event_args):
"""
This function is called whenever a client has subscribed to a topic (when allowed).
"""

print("Client '{client_id}' has subscribed to '{topic}'.".format(client_id=event_args["client_id"], topic=event_args["topic"]))


def on_client_unsubscribed_topic(event_args):
"""
This function is called whenever a client has unsubscribed from a topic.
"""

print("Client '{client_id}' has unsubscribed from '{topic}'.".format(client_id=event_args["client_id"], topic=event_args["topic"]))

+ 8
- 4
Source/MQTTnet.Server/Startup.cs View File

@@ -50,20 +50,24 @@ namespace MQTTnet.Server


public void ConfigureServices(IServiceCollection services) public void ConfigureServices(IServiceCollection services)
{ {
services.AddSingleton<MqttNetLoggerWrapper>();
services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_2);


services.AddSingleton<PythonIOStream>(); services.AddSingleton<PythonIOStream>();
services.AddSingleton<PythonScriptHostService>(); services.AddSingleton<PythonScriptHostService>();
services.AddSingleton<DataSharingService>(); services.AddSingleton<DataSharingService>();


services.AddSingleton<MqttNetLoggerWrapper>();
services.AddSingleton<MqttWebSocketServerAdapter>(); services.AddSingleton<MqttWebSocketServerAdapter>();
services.AddSingleton<IMqttServerFactory, MqttFactory>();
services.AddSingleton<CustomMqttFactory>();
services.AddSingleton<MqttServerService>(); services.AddSingleton<MqttServerService>();

services.AddSingleton<MqttClientConnectedHandler>();
services.AddSingleton<MqttClientDisconnectedHandler>();
services.AddSingleton<MqttClientSubscribedTopicHandler>();
services.AddSingleton<MqttClientUnsubscribedTopicHandler>();
services.AddSingleton<MqttConnectionValidator>(); services.AddSingleton<MqttConnectionValidator>();
services.AddSingleton<MqttSubscriptionInterceptor>(); services.AddSingleton<MqttSubscriptionInterceptor>();
services.AddSingleton<MqttApplicationMessageInterceptor>(); services.AddSingleton<MqttApplicationMessageInterceptor>();

services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_2);
} }
} }
} }

+ 1
- 1
Source/MQTTnet/Adapter/IMqttServerAdapter.cs View File

@@ -6,7 +6,7 @@ namespace MQTTnet.Adapter
{ {
public interface IMqttServerAdapter : IDisposable public interface IMqttServerAdapter : IDisposable
{ {
event EventHandler<MqttServerAdapterClientAcceptedEventArgs> ClientAccepted;
Action<MqttServerAdapterClientAcceptedEventArgs> ClientAcceptedHandler { get; set; }


Task StartAsync(IMqttServerOptions options); Task StartAsync(IMqttServerOptions options);
Task StopAsync(); Task StopAsync();


+ 2
- 6
Source/MQTTnet/Client/IMqttClient.cs View File

@@ -15,13 +15,9 @@ namespace MQTTnet.Client
IMqttClientOptions Options { get; } IMqttClientOptions Options { get; }


IMqttClientConnectedHandler ConnectedHandler { get; set; } IMqttClientConnectedHandler ConnectedHandler { get; set; }
[Obsolete("Use ConnectedHandler instead.")]
event EventHandler<MqttClientConnectedEventArgs> Connected;

IMqttClientDisconnectedHandler DisconnectedHandler { get; set; } IMqttClientDisconnectedHandler DisconnectedHandler { get; set; }
[Obsolete("Use DisconnectedHandler instead.")]
event EventHandler<MqttClientDisconnectedEventArgs> Disconnected;

Task<MqttClientAuthenticateResult> ConnectAsync(IMqttClientOptions options, CancellationToken cancellationToken); Task<MqttClientAuthenticateResult> ConnectAsync(IMqttClientOptions options, CancellationToken cancellationToken);
Task DisconnectAsync(MqttClientDisconnectOptions options, CancellationToken cancellationToken); Task DisconnectAsync(MqttClientDisconnectOptions options, CancellationToken cancellationToken);




+ 3
- 16
Source/MQTTnet/Client/MqttClient.cs View File

@@ -46,15 +46,9 @@ namespace MQTTnet.Client


public IMqttClientConnectedHandler ConnectedHandler { get; set; } public IMqttClientConnectedHandler ConnectedHandler { get; set; }


public event EventHandler<MqttClientConnectedEventArgs> Connected;

public IMqttClientDisconnectedHandler DisconnectedHandler { get; set; } public IMqttClientDisconnectedHandler DisconnectedHandler { get; set; }


public event EventHandler<MqttClientDisconnectedEventArgs> Disconnected;

public IMqttApplicationMessageHandler ReceivedApplicationMessageHandler { get; set; }

public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived;
public IMqttApplicationMessageHandler ApplicationMessageReceivedHandler { get; set; }


public bool IsConnected { get; private set; } public bool IsConnected { get; private set; }


@@ -106,9 +100,6 @@ namespace MQTTnet.Client
await connectedHandler.HandleConnectedAsync(new MqttClientConnectedEventArgs(authenticateResult)).ConfigureAwait(false); await connectedHandler.HandleConnectedAsync(new MqttClientConnectedEventArgs(authenticateResult)).ConfigureAwait(false);
} }


// TODO: Remove!
Connected?.Invoke(this, new MqttClientConnectedEventArgs(authenticateResult));

return authenticateResult; return authenticateResult;
} }
catch (Exception exception) catch (Exception exception)
@@ -276,8 +267,6 @@ namespace MQTTnet.Client
{ {
await disconnectedHandler.HandleDisconnectedAsync(new MqttClientDisconnectedEventArgs(clientWasConnected, exception)).ConfigureAwait(false); await disconnectedHandler.HandleDisconnectedAsync(new MqttClientDisconnectedEventArgs(clientWasConnected, exception)).ConfigureAwait(false);
} }

Disconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(clientWasConnected, exception));
} }
} }


@@ -587,13 +576,11 @@ namespace MQTTnet.Client
{ {
var applicationMessage = _adapter.PacketFormatterAdapter.DataConverter.CreateApplicationMessage(publishPacket); var applicationMessage = _adapter.PacketFormatterAdapter.DataConverter.CreateApplicationMessage(publishPacket);


ApplicationMessageReceived?.Invoke(this, new MqttApplicationMessageReceivedEventArgs(Options.ClientId, applicationMessage));

var handler = ReceivedApplicationMessageHandler;
var handler = ApplicationMessageReceivedHandler;
if (handler != null) if (handler != null)
{ {
return handler.HandleApplicationMessageAsync( return handler.HandleApplicationMessageAsync(
new MqttApplicationMessageHandlerContext(Options.ClientId, applicationMessage));
new MqttApplicationMessageReceivedEventArgs(Options.ClientId, applicationMessage));
} }


return Task.FromResult(0); return Task.FromResult(0);


+ 8
- 8
Source/MQTTnet/Client/MqttClientExtensions.cs View File

@@ -63,35 +63,35 @@ namespace MQTTnet.Client
return client; return client;
} }


public static IMqttClient UseReceivedApplicationMessageHandler(this IMqttClient client, Func<MqttApplicationMessageHandlerContext, Task> handler)
public static IMqttClient UseApplicationMessageReceivedHandler(this IMqttClient client, Func<MqttApplicationMessageReceivedEventArgs, Task> handler)
{ {
if (handler == null) if (handler == null)
{ {
client.ReceivedApplicationMessageHandler = null;
client.ApplicationMessageReceivedHandler = null;
return client; return client;
} }


client.ReceivedApplicationMessageHandler = new MqttApplicationMessageHandlerDelegate(handler);
client.ApplicationMessageReceivedHandler = new MqttApplicationMessageHandlerDelegate(handler);


return client; return client;
} }


public static IMqttClient UseReceivedApplicationMessageHandler(this IMqttClient client, Action<MqttApplicationMessageHandlerContext> handler)
public static IMqttClient UseApplicationMessageReceivedHandler(this IMqttClient client, Action<MqttApplicationMessageReceivedEventArgs> handler)
{ {
if (handler == null) if (handler == null)
{ {
client.ReceivedApplicationMessageHandler = null;
client.ApplicationMessageReceivedHandler = null;
return client; return client;
} }


client.ReceivedApplicationMessageHandler = new MqttApplicationMessageHandlerDelegate(handler);
client.ApplicationMessageReceivedHandler = new MqttApplicationMessageHandlerDelegate(handler);


return client; return client;
} }


public static IMqttClient UseReceivedApplicationMessageHandler(this IMqttClient client, IMqttApplicationMessageHandler handler)
public static IMqttClient UseApplicationMessageReceivedHandler(this IMqttClient client, IMqttApplicationMessageHandler handler)
{ {
client.ReceivedApplicationMessageHandler = handler;
client.ApplicationMessageReceivedHandler = handler;


return client; return client;
} }


+ 1
- 1
Source/MQTTnet/Client/Receiving/IMqttApplicationMessageHandler.cs View File

@@ -4,6 +4,6 @@ namespace MQTTnet.Client.Receiving
{ {
public interface IMqttApplicationMessageHandler public interface IMqttApplicationMessageHandler
{ {
Task HandleApplicationMessageAsync(MqttApplicationMessageHandlerContext context);
Task HandleApplicationMessageAsync(MqttApplicationMessageReceivedEventArgs eventArgs);
} }
} }

+ 0
- 15
Source/MQTTnet/Client/Receiving/MqttApplicationMessageHandlerContext.cs View File

@@ -1,15 +0,0 @@
namespace MQTTnet.Client.Receiving
{
public class MqttApplicationMessageHandlerContext
{
public MqttApplicationMessageHandlerContext(string senderClientId, MqttApplicationMessage applicationMessage)
{
SenderClientId = senderClientId;
ApplicationMessage = applicationMessage;
}

public string SenderClientId { get; }

public MqttApplicationMessage ApplicationMessage { get; }
}
}

+ 4
- 4
Source/MQTTnet/Client/Receiving/MqttApplicationMessageHandlerDelegate.cs View File

@@ -5,9 +5,9 @@ namespace MQTTnet.Client.Receiving
{ {
public class MqttApplicationMessageHandlerDelegate : IMqttApplicationMessageHandler public class MqttApplicationMessageHandlerDelegate : IMqttApplicationMessageHandler
{ {
private readonly Func<MqttApplicationMessageHandlerContext, Task> _handler;
private readonly Func<MqttApplicationMessageReceivedEventArgs, Task> _handler;


public MqttApplicationMessageHandlerDelegate(Action<MqttApplicationMessageHandlerContext> handler)
public MqttApplicationMessageHandlerDelegate(Action<MqttApplicationMessageReceivedEventArgs> handler)
{ {
if (handler == null) throw new ArgumentNullException(nameof(handler)); if (handler == null) throw new ArgumentNullException(nameof(handler));


@@ -18,12 +18,12 @@ namespace MQTTnet.Client.Receiving
}; };
} }


public MqttApplicationMessageHandlerDelegate(Func<MqttApplicationMessageHandlerContext, Task> handler)
public MqttApplicationMessageHandlerDelegate(Func<MqttApplicationMessageReceivedEventArgs, Task> handler)
{ {
_handler = handler ?? throw new ArgumentNullException(nameof(handler)); _handler = handler ?? throw new ArgumentNullException(nameof(handler));
} }


public Task HandleApplicationMessageAsync(MqttApplicationMessageHandlerContext context)
public Task HandleApplicationMessageAsync(MqttApplicationMessageReceivedEventArgs context)
{ {
return _handler(context); return _handler(context);
} }


+ 2
- 6
Source/MQTTnet/IApplicationMessageReceiver.cs View File

@@ -1,13 +1,9 @@
using System;
using MQTTnet.Client.Receiving;
using MQTTnet.Client.Receiving;


namespace MQTTnet namespace MQTTnet
{ {
public interface IApplicationMessageReceiver public interface IApplicationMessageReceiver
{ {
IMqttApplicationMessageHandler ReceivedApplicationMessageHandler { get; set; }

[Obsolete("Use _ReceivedApplicationMessageHandler_ instead.")]
event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived;
IMqttApplicationMessageHandler ApplicationMessageReceivedHandler { get; set; }
} }
} }

+ 2
- 2
Source/MQTTnet/Implementations/MqttTcpServerAdapter.Uwp.cs View File

@@ -23,7 +23,7 @@ namespace MQTTnet.Implementations
_logger = logger.CreateChildLogger(nameof(MqttTcpServerAdapter)); _logger = logger.CreateChildLogger(nameof(MqttTcpServerAdapter));
} }


public event EventHandler<MqttServerAdapterClientAcceptedEventArgs> ClientAccepted;
public Action<MqttServerAdapterClientAcceptedEventArgs> ClientAcceptedHandler { get; set; }


public async Task StartAsync(IMqttServerOptions options) public async Task StartAsync(IMqttServerOptions options)
{ {
@@ -73,7 +73,7 @@ namespace MQTTnet.Implementations
try try
{ {
var clientAdapter = new MqttChannelAdapter(new MqttTcpChannel(args.Socket, _options), new MqttPacketFormatterAdapter(), _logger); var clientAdapter = new MqttChannelAdapter(new MqttTcpChannel(args.Socket, _options), new MqttPacketFormatterAdapter(), _logger);
ClientAccepted?.Invoke(this, new MqttServerAdapterClientAcceptedEventArgs(clientAdapter));
ClientAcceptedHandler?.Invoke(new MqttServerAdapterClientAcceptedEventArgs(clientAdapter));
} }
catch (Exception exception) catch (Exception exception)
{ {


+ 11
- 7
Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs View File

@@ -26,7 +26,7 @@ namespace MQTTnet.Implementations
_logger = logger.CreateChildLogger(nameof(MqttTcpServerAdapter)); _logger = logger.CreateChildLogger(nameof(MqttTcpServerAdapter));
} }


public event EventHandler<MqttServerAdapterClientAcceptedEventArgs> ClientAccepted;
public Action<MqttServerAdapterClientAcceptedEventArgs> ClientAcceptedHandler { get; set; }


public Task StartAsync(IMqttServerOptions options) public Task StartAsync(IMqttServerOptions options)
{ {
@@ -87,9 +87,11 @@ namespace MQTTnet.Implementations
options, options,
tlsCertificate, tlsCertificate,
_cancellationTokenSource.Token, _cancellationTokenSource.Token,
_logger);
_logger)
{
ClientAcceptedHandler = OnClientAccepted
};


listenerV4.ClientAccepted += OnClientAccepted;
listenerV4.Start(); listenerV4.Start();
_listeners.Add(listenerV4); _listeners.Add(listenerV4);
} }
@@ -101,17 +103,19 @@ namespace MQTTnet.Implementations
options, options,
tlsCertificate, tlsCertificate,
_cancellationTokenSource.Token, _cancellationTokenSource.Token,
_logger);
_logger)
{
ClientAcceptedHandler = OnClientAccepted
};


listenerV6.ClientAccepted += OnClientAccepted;
listenerV6.Start(); listenerV6.Start();
_listeners.Add(listenerV6); _listeners.Add(listenerV6);
} }
} }


private void OnClientAccepted(object sender, MqttServerAdapterClientAcceptedEventArgs e)
private void OnClientAccepted(MqttServerAdapterClientAcceptedEventArgs eventArgs)
{ {
ClientAccepted?.Invoke(this, e);
ClientAcceptedHandler?.Invoke(eventArgs);
} }
} }
} }

+ 2
- 2
Source/MQTTnet/Implementations/MqttTcpServerListener.cs View File

@@ -42,7 +42,7 @@ namespace MQTTnet.Implementations
} }
} }


public event EventHandler<MqttServerAdapterClientAcceptedEventArgs> ClientAccepted;
public Action<MqttServerAdapterClientAcceptedEventArgs> ClientAcceptedHandler;


public void Start() public void Start()
{ {
@@ -88,7 +88,7 @@ namespace MQTTnet.Implementations
_addressFamily == AddressFamily.InterNetwork ? "ipv4" : "ipv6"); _addressFamily == AddressFamily.InterNetwork ? "ipv4" : "ipv6");


var clientAdapter = new MqttChannelAdapter(new MqttTcpChannel(clientSocket, sslStream), new MqttPacketFormatterAdapter(), _logger); var clientAdapter = new MqttChannelAdapter(new MqttTcpChannel(clientSocket, sslStream), new MqttPacketFormatterAdapter(), _logger);
ClientAccepted?.Invoke(this, new MqttServerAdapterClientAcceptedEventArgs(clientAdapter));
ClientAcceptedHandler?.Invoke(new MqttServerAdapterClientAcceptedEventArgs(clientAdapter));
} }
catch (ObjectDisposedException) catch (ObjectDisposedException)
{ {


+ 2
- 9
Source/MQTTnet/Server/IMqttServer.cs View File

@@ -11,16 +11,9 @@ namespace MQTTnet.Server
event EventHandler Stopped; event EventHandler Stopped;


IMqttServerClientConnectedHandler ClientConnectedHandler { get; set; } IMqttServerClientConnectedHandler ClientConnectedHandler { get; set; }
[Obsolete("Use ClientConnectedHandler instead.")]
event EventHandler<MqttServerClientConnectedEventArgs> ClientConnected;

IMqttServerClientDisconnectedHandler ClientDisconnectedHandler { get; set; } IMqttServerClientDisconnectedHandler ClientDisconnectedHandler { get; set; }
[Obsolete("Use ClientDisconnectedHandler instead.")]
event EventHandler<MqttServerClientDisconnectedEventArgs> ClientDisconnected;

event EventHandler<MqttClientSubscribedTopicEventArgs> ClientSubscribedTopic;

event EventHandler<MqttClientUnsubscribedTopicEventArgs> ClientUnsubscribedTopic;
IMqttServerClientSubscribedTopicHandler ClientSubscribedTopicHandler { get; set; }
IMqttServerClientUnsubscribedTopicHandler ClientUnsubscribedTopicHandler { get; set; }
IMqttServerOptions Options { get; } IMqttServerOptions Options { get; }




+ 9
- 0
Source/MQTTnet/Server/IMqttServerClientSubscribedTopicHandler.cs View File

@@ -0,0 +1,9 @@
using System.Threading.Tasks;

namespace MQTTnet.Server
{
public interface IMqttServerClientSubscribedTopicHandler
{
Task HandleClientSubscribedTopicAsync(MqttServerClientSubscribedTopicEventArgs eventArgs);
}
}

+ 9
- 0
Source/MQTTnet/Server/IMqttServerClientUnsubscribedTopicHandler.cs View File

@@ -0,0 +1,9 @@
using System.Threading.Tasks;

namespace MQTTnet.Server
{
public interface IMqttServerClientUnsubscribedTopicHandler
{
Task HandleClientUnsubscribedTopicAsync(MqttServerClientUnsubscribedTopicEventArgs eventArgs);
}
}

+ 3
- 3
Source/MQTTnet/Server/MqttClientSessionsManager.cs View File

@@ -196,7 +196,7 @@ namespace MQTTnet.Server
applicationMessage = interceptorContext.ApplicationMessage; applicationMessage = interceptorContext.ApplicationMessage;
} }


_eventDispatcher.OnApplicationMessageReceived(sender?.ClientId, applicationMessage);
await _eventDispatcher.HandleApplicationMessageReceivedAsync(sender?.ClientId, applicationMessage).ConfigureAwait(false);


if (applicationMessage.Retain) if (applicationMessage.Retain)
{ {
@@ -258,7 +258,7 @@ namespace MQTTnet.Server


var connection = await CreateConnectionAsync(channelAdapter, connectPacket).ConfigureAwait(false); var connection = await CreateConnectionAsync(channelAdapter, connectPacket).ConfigureAwait(false);


_eventDispatcher.OnClientConnected(clientId);
await _eventDispatcher.HandleClientConnectedAsync(clientId).ConfigureAwait(false);
disconnectType = await connection.RunAsync().ConfigureAwait(false); disconnectType = await connection.RunAsync().ConfigureAwait(false);
} }
@@ -288,7 +288,7 @@ namespace MQTTnet.Server


await TryCleanupChannelAsync(channelAdapter).ConfigureAwait(false); await TryCleanupChannelAsync(channelAdapter).ConfigureAwait(false);


_eventDispatcher.OnClientDisconnected(clientId, disconnectType);
await _eventDispatcher.HandleClientDisconnectedAsync(clientId, disconnectType).ConfigureAwait(false);
} }
} }




+ 9
- 6
Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs View File

@@ -61,7 +61,7 @@ namespace MQTTnet.Server
_subscriptions[topicFilter.Topic] = topicFilter.QualityOfServiceLevel; _subscriptions[topicFilter.Topic] = topicFilter.QualityOfServiceLevel;
} }


_eventDispatcher.OnClientSubscribedTopic(_clientId, topicFilter);
await _eventDispatcher.HandleClientSubscribedTopicAsync(_clientId, topicFilter).ConfigureAwait(false);
} }
} }


@@ -85,12 +85,12 @@ namespace MQTTnet.Server
_subscriptions[topicFilter.Topic] = topicFilter.QualityOfServiceLevel; _subscriptions[topicFilter.Topic] = topicFilter.QualityOfServiceLevel;
} }


_eventDispatcher.OnClientSubscribedTopic(_clientId, topicFilter);
await _eventDispatcher.HandleClientSubscribedTopicAsync(_clientId, topicFilter).ConfigureAwait(false);
} }
} }
} }


public Task<MqttUnsubAckPacket> UnsubscribeAsync(MqttUnsubscribePacket unsubscribePacket)
public async Task<MqttUnsubAckPacket> UnsubscribeAsync(MqttUnsubscribePacket unsubscribePacket)
{ {
if (unsubscribePacket == null) throw new ArgumentNullException(nameof(unsubscribePacket)); if (unsubscribePacket == null) throw new ArgumentNullException(nameof(unsubscribePacket));


@@ -111,12 +111,15 @@ namespace MQTTnet.Server
{ {
unsubAckPacket.ReasonCodes.Add(MqttUnsubscribeReasonCode.NoSubscriptionExisted); unsubAckPacket.ReasonCodes.Add(MqttUnsubscribeReasonCode.NoSubscriptionExisted);
} }

_eventDispatcher.OnClientUnsubscribedTopic(_clientId, topicFilter);
} }
} }


return Task.FromResult(unsubAckPacket);
foreach (var topicFilter in unsubscribePacket.TopicFilters)
{
await _eventDispatcher.HandleClientUnsubscribedTopicAsync(_clientId, topicFilter).ConfigureAwait(false);
}
return unsubAckPacket;
} }


public Task UnsubscribeAsync(IEnumerable<string> topicFilters) public Task UnsubscribeAsync(IEnumerable<string> topicFilters)


+ 30
- 30
Source/MQTTnet/Server/MqttServer.cs View File

@@ -28,40 +28,40 @@ namespace MQTTnet.Server


if (logger == null) throw new ArgumentNullException(nameof(logger)); if (logger == null) throw new ArgumentNullException(nameof(logger));
_logger = logger.CreateChildLogger(nameof(MqttServer)); _logger = logger.CreateChildLogger(nameof(MqttServer));

_eventDispatcher.ClientConnected += (s, e) => ClientConnected?.Invoke(s, e);
_eventDispatcher.ClientDisconnected += (s, e) => ClientDisconnected?.Invoke(s, e);
_eventDispatcher.ClientSubscribedTopic += (s, e) => ClientSubscribedTopic?.Invoke(s, e);
_eventDispatcher.ClientUnsubscribedTopic += (s, e) => ClientUnsubscribedTopic?.Invoke(s, e);
_eventDispatcher.ApplicationMessageReceived += async (s, e) =>
{
// TODO: Migrate EventDispatcher to proper handlers and no events anymore.
ApplicationMessageReceived?.Invoke(s, e);

var handler = ReceivedApplicationMessageHandler;
if (handler != null)
{
await handler.HandleApplicationMessageAsync(
new MqttApplicationMessageHandlerContext(e.ClientId, e.ApplicationMessage)).ConfigureAwait(false);
}
};
} }


public event EventHandler Started; public event EventHandler Started;
public event EventHandler Stopped; public event EventHandler Stopped;


public IMqttServerClientConnectedHandler ClientConnectedHandler { get; set; }
public event EventHandler<MqttServerClientConnectedEventArgs> ClientConnected;

public IMqttServerClientDisconnectedHandler ClientDisconnectedHandler { get; set; }
public event EventHandler<MqttServerClientDisconnectedEventArgs> ClientDisconnected;

public event EventHandler<MqttClientSubscribedTopicEventArgs> ClientSubscribedTopic;
public IMqttServerClientConnectedHandler ClientConnectedHandler
{
get => _eventDispatcher.ClientConnectedHandler;
set => _eventDispatcher.ClientConnectedHandler = value;
}


public event EventHandler<MqttClientUnsubscribedTopicEventArgs> ClientUnsubscribedTopic;
public IMqttServerClientDisconnectedHandler ClientDisconnectedHandler
{
get => _eventDispatcher.ClientDisconnectedHandler;
set => _eventDispatcher.ClientDisconnectedHandler = value;
}
public IMqttServerClientSubscribedTopicHandler ClientSubscribedTopicHandler
{
get => _eventDispatcher.ClientSubscribedTopicHandler;
set => _eventDispatcher.ClientSubscribedTopicHandler = value;
}


public IMqttApplicationMessageHandler ReceivedApplicationMessageHandler { get; set; }
public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived;
public IMqttServerClientUnsubscribedTopicHandler ClientUnsubscribedTopicHandler
{
get => _eventDispatcher.ClientUnsubscribedTopicHandler;
set => _eventDispatcher.ClientUnsubscribedTopicHandler = value;
}
public IMqttApplicationMessageHandler ApplicationMessageReceivedHandler
{
get => _eventDispatcher.ApplicationMessageReceivedHandler;
set => _eventDispatcher.ApplicationMessageReceivedHandler = value;
}


public IMqttServerOptions Options { get; private set; } public IMqttServerOptions Options { get; private set; }


@@ -123,7 +123,7 @@ namespace MQTTnet.Server


foreach (var adapter in _adapters) foreach (var adapter in _adapters)
{ {
adapter.ClientAccepted += OnClientAccepted;
adapter.ClientAcceptedHandler = OnClientAccepted;
await adapter.StartAsync(Options).ConfigureAwait(false); await adapter.StartAsync(Options).ConfigureAwait(false);
} }


@@ -146,7 +146,7 @@ namespace MQTTnet.Server


foreach (var adapter in _adapters) foreach (var adapter in _adapters)
{ {
adapter.ClientAccepted -= OnClientAccepted;
adapter.ClientAcceptedHandler = null;
await adapter.StopAsync().ConfigureAwait(false); await adapter.StopAsync().ConfigureAwait(false);
} }


@@ -170,7 +170,7 @@ namespace MQTTnet.Server
return _retainedMessagesManager?.ClearMessagesAsync(); return _retainedMessagesManager?.ClearMessagesAsync();
} }


private void OnClientAccepted(object sender, MqttServerAdapterClientAcceptedEventArgs eventArgs)
private void OnClientAccepted(MqttServerAdapterClientAcceptedEventArgs eventArgs)
{ {
eventArgs.SessionTask = _clientSessionsManager.HandleConnectionAsync(eventArgs.Client); eventArgs.SessionTask = _clientSessionsManager.HandleConnectionAsync(eventArgs.Client);
} }


+ 2
- 2
Source/MQTTnet/Server/MqttServerClientConnectedHandlerDelegate.cs View File

@@ -11,9 +11,9 @@ namespace MQTTnet.Server
{ {
if (handler == null) throw new ArgumentNullException(nameof(handler)); if (handler == null) throw new ArgumentNullException(nameof(handler));


_handler = context =>
_handler = eventArgs =>
{ {
handler(context);
handler(eventArgs);
return Task.FromResult(0); return Task.FromResult(0);
}; };
} }


+ 2
- 2
Source/MQTTnet/Server/MqttServerClientDisconnectedHandlerDelegate.cs View File

@@ -11,9 +11,9 @@ namespace MQTTnet.Server
{ {
if (handler == null) throw new ArgumentNullException(nameof(handler)); if (handler == null) throw new ArgumentNullException(nameof(handler));


_handler = context =>
_handler = eventArgs =>
{ {
handler(context);
handler(eventArgs);
return Task.FromResult(0); return Task.FromResult(0);
}; };
} }


Source/MQTTnet/Server/MqttClientSubscribedTopicEventArgs.cs → Source/MQTTnet/Server/MqttServerClientSubscribedTopicEventArgs.cs View File

@@ -2,9 +2,9 @@


namespace MQTTnet.Server namespace MQTTnet.Server
{ {
public class MqttClientSubscribedTopicEventArgs : EventArgs
public class MqttServerClientSubscribedTopicEventArgs : EventArgs
{ {
public MqttClientSubscribedTopicEventArgs(string clientId, TopicFilter topicFilter)
public MqttServerClientSubscribedTopicEventArgs(string clientId, TopicFilter topicFilter)
{ {
ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId)); ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId));
TopicFilter = topicFilter ?? throw new ArgumentNullException(nameof(topicFilter)); TopicFilter = topicFilter ?? throw new ArgumentNullException(nameof(topicFilter));

+ 31
- 0
Source/MQTTnet/Server/MqttServerClientSubscribedTopicHandlerDelegate.cs View File

@@ -0,0 +1,31 @@
using System;
using System.Threading.Tasks;

namespace MQTTnet.Server
{
public class MqttServerClientSubscribedHandlerDelegate : IMqttServerClientSubscribedTopicHandler
{
private readonly Func<MqttServerClientSubscribedTopicEventArgs, Task> _handler;

public MqttServerClientSubscribedHandlerDelegate(Action<MqttServerClientSubscribedTopicEventArgs> handler)
{
if (handler == null) throw new ArgumentNullException(nameof(handler));

_handler = eventArgs =>
{
handler(eventArgs);
return Task.FromResult(0);
};
}

public MqttServerClientSubscribedHandlerDelegate(Func<MqttServerClientSubscribedTopicEventArgs, Task> handler)
{
_handler = handler ?? throw new ArgumentNullException(nameof(handler));
}

public Task HandleClientSubscribedTopicAsync(MqttServerClientSubscribedTopicEventArgs eventArgs)
{
return _handler(eventArgs);
}
}
}

Source/MQTTnet/Server/MqttClientUnSubscribedTopicEventArgs.cs → Source/MQTTnet/Server/MqttServerClientUnsubscribedTopicEventArgs.cs View File

@@ -2,9 +2,9 @@


namespace MQTTnet.Server namespace MQTTnet.Server
{ {
public class MqttClientUnsubscribedTopicEventArgs : EventArgs
public class MqttServerClientUnsubscribedTopicEventArgs : EventArgs
{ {
public MqttClientUnsubscribedTopicEventArgs(string clientId, string topicFilter)
public MqttServerClientUnsubscribedTopicEventArgs(string clientId, string topicFilter)
{ {
ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId)); ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId));
TopicFilter = topicFilter ?? throw new ArgumentNullException(nameof(topicFilter)); TopicFilter = topicFilter ?? throw new ArgumentNullException(nameof(topicFilter));

+ 31
- 0
Source/MQTTnet/Server/MqttServerClientUnsubscribedTopicHandlerDelegate.cs View File

@@ -0,0 +1,31 @@
using System;
using System.Threading.Tasks;

namespace MQTTnet.Server
{
public class MqttServerClientUnsubscribedTopicHandlerDelegate : IMqttServerClientUnsubscribedTopicHandler
{
private readonly Func<MqttServerClientUnsubscribedTopicEventArgs, Task> _handler;

public MqttServerClientUnsubscribedTopicHandlerDelegate(Action<MqttServerClientUnsubscribedTopicEventArgs> handler)
{
if (handler == null) throw new ArgumentNullException(nameof(handler));

_handler = eventArgs =>
{
handler(eventArgs);
return Task.FromResult(0);
};
}

public MqttServerClientUnsubscribedTopicHandlerDelegate(Func<MqttServerClientUnsubscribedTopicEventArgs, Task> handler)
{
_handler = handler ?? throw new ArgumentNullException(nameof(handler));
}

public Task HandleClientUnsubscribedTopicAsync(MqttServerClientUnsubscribedTopicEventArgs eventArgs)
{
return _handler(eventArgs);
}
}
}

+ 47
- 16
Source/MQTTnet/Server/MqttServerEventDispatcher.cs View File

@@ -1,42 +1,73 @@
using System;
using System.Threading.Tasks;
using MQTTnet.Client.Receiving;


namespace MQTTnet.Server namespace MQTTnet.Server
{ {
public class MqttServerEventDispatcher public class MqttServerEventDispatcher
{ {
public event EventHandler<MqttClientSubscribedTopicEventArgs> ClientSubscribedTopic;
public IMqttServerClientConnectedHandler ClientConnectedHandler { get; set; }


public event EventHandler<MqttClientUnsubscribedTopicEventArgs> ClientUnsubscribedTopic;
public IMqttServerClientDisconnectedHandler ClientDisconnectedHandler { get; set; }


public event EventHandler<MqttServerClientConnectedEventArgs> ClientConnected;
public IMqttServerClientSubscribedTopicHandler ClientSubscribedTopicHandler { get; set; }


public event EventHandler<MqttServerClientDisconnectedEventArgs> ClientDisconnected;
public IMqttServerClientUnsubscribedTopicHandler ClientUnsubscribedTopicHandler { get; set; }


public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived;
public IMqttApplicationMessageHandler ApplicationMessageReceivedHandler { get; set; }


public void OnClientSubscribedTopic(string clientId, TopicFilter topicFilter)
public Task HandleClientConnectedAsync(string clientId)
{ {
ClientSubscribedTopic?.Invoke(this, new MqttClientSubscribedTopicEventArgs(clientId, topicFilter));
var handler = ClientConnectedHandler;
if (handler == null)
{
return Task.FromResult(0);
}

return handler.HandleClientConnectedAsync(new MqttServerClientConnectedEventArgs(clientId));
} }


public void OnClientUnsubscribedTopic(string clientId, string topicFilter)
public Task HandleClientDisconnectedAsync(string clientId, MqttClientDisconnectType disconnectType)
{ {
ClientUnsubscribedTopic?.Invoke(this, new MqttClientUnsubscribedTopicEventArgs(clientId, topicFilter));
var handler = ClientDisconnectedHandler;
if (handler == null)
{
return Task.FromResult(0);
}

return handler.HandleClientDisconnectedAsync(new MqttServerClientDisconnectedEventArgs(clientId, disconnectType));
} }


public void OnClientDisconnected(string clientId, MqttClientDisconnectType disconnectType)
public Task HandleClientSubscribedTopicAsync(string clientId, TopicFilter topicFilter)
{ {
ClientDisconnected?.Invoke(this, new MqttServerClientDisconnectedEventArgs(clientId, disconnectType));
var handler = ClientSubscribedTopicHandler;
if (handler == null)
{
return Task.FromResult(0);
}

return handler.HandleClientSubscribedTopicAsync(new MqttServerClientSubscribedTopicEventArgs(clientId, topicFilter));
} }


public void OnApplicationMessageReceived(string senderClientId, MqttApplicationMessage applicationMessage)
public Task HandleClientUnsubscribedTopicAsync(string clientId, string topicFilter)
{ {
ApplicationMessageReceived?.Invoke(this, new MqttApplicationMessageReceivedEventArgs(senderClientId, applicationMessage));
var handler = ClientUnsubscribedTopicHandler;
if (handler == null)
{
return Task.FromResult(0);
}

return handler.HandleClientUnsubscribedTopicAsync(new MqttServerClientUnsubscribedTopicEventArgs(clientId, topicFilter));
} }


public void OnClientConnected(string clientId)
public Task HandleApplicationMessageReceivedAsync(string senderClientId, MqttApplicationMessage applicationMessage)
{ {
ClientConnected?.Invoke(this, new MqttServerClientConnectedEventArgs(clientId));
var handler = ApplicationMessageReceivedHandler;
if (handler == null)
{
return Task.FromResult(0);
}

return handler.HandleApplicationMessageAsync(new MqttApplicationMessageReceivedEventArgs(senderClientId, applicationMessage));
} }
} }
} }

+ 1
- 2
Tests/MQTTnet.Benchmarks/MqttTcpChannelBenchmark.cs View File

@@ -1,6 +1,5 @@
using BenchmarkDotNet.Attributes; using BenchmarkDotNet.Attributes;
using MQTTnet.Channel; using MQTTnet.Channel;
using MQTTnet.Client;
using MQTTnet.Diagnostics; using MQTTnet.Diagnostics;
using MQTTnet.Implementations; using MQTTnet.Implementations;
using MQTTnet.Server; using MQTTnet.Server;
@@ -22,7 +21,7 @@ namespace MQTTnet.Benchmarks
{ {
var factory = new MqttFactory(); var factory = new MqttFactory();
var tcpServer = new MqttTcpServerAdapter(new MqttNetLogger().CreateChildLogger()); var tcpServer = new MqttTcpServerAdapter(new MqttNetLogger().CreateChildLogger());
tcpServer.ClientAccepted += (sender, args) => _serverChannel = (IMqttChannel)args.Client.GetType().GetField("_channel", System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance).GetValue(args.Client);
tcpServer.ClientAcceptedHandler += args => _serverChannel = (IMqttChannel)args.Client.GetType().GetField("_channel", System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance).GetValue(args.Client);


_mqttServer = factory.CreateMqttServer(new[] { tcpServer }, new MqttNetLogger()); _mqttServer = factory.CreateMqttServer(new[] { tcpServer }, new MqttNetLogger());




+ 3
- 2
Tests/MQTTnet.Core.Tests/MQTTv5/Client_Tests.cs View File

@@ -4,6 +4,7 @@ using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Client; using MQTTnet.Client;
using MQTTnet.Client.Options; using MQTTnet.Client.Options;
using MQTTnet.Client.Publishing; using MQTTnet.Client.Publishing;
using MQTTnet.Client.Receiving;
using MQTTnet.Client.Subscribing; using MQTTnet.Client.Subscribing;
using MQTTnet.Client.Unsubscribing; using MQTTnet.Client.Unsubscribing;
using MQTTnet.Formatter; using MQTTnet.Formatter;
@@ -213,13 +214,13 @@ namespace MQTTnet.Tests.MQTTv5
var receivedMessages = new List<MqttApplicationMessageReceivedEventArgs>(); var receivedMessages = new List<MqttApplicationMessageReceivedEventArgs>();


await client1.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("127.0.0.1").WithClientId("client1").WithProtocolVersion(MqttProtocolVersion.V500).Build()); await client1.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("127.0.0.1").WithClientId("client1").WithProtocolVersion(MqttProtocolVersion.V500).Build());
client1.ApplicationMessageReceived += (s, e) =>
client1.ApplicationMessageReceivedHandler = new MqttApplicationMessageHandlerDelegate(e =>
{ {
lock (receivedMessages) lock (receivedMessages)
{ {
receivedMessages.Add(e); receivedMessages.Add(e);
} }
};
});


await client1.SubscribeAsync("a"); await client1.SubscribeAsync("a");




+ 2
- 2
Tests/MQTTnet.Core.Tests/Mockups/TestMqttServerAdapter.cs View File

@@ -11,7 +11,7 @@ namespace MQTTnet.Tests.Mockups
{ {
public class TestMqttServerAdapter : IMqttServerAdapter public class TestMqttServerAdapter : IMqttServerAdapter
{ {
public event EventHandler<MqttServerAdapterClientAcceptedEventArgs> ClientAccepted;
public Action<MqttServerAdapterClientAcceptedEventArgs> ClientAcceptedHandler { get; set; }
public async Task<IMqttClient> ConnectTestClient(string clientId, MqttApplicationMessage willMessage = null) public async Task<IMqttClient> ConnectTestClient(string clientId, MqttApplicationMessage willMessage = null)
{ {
@@ -41,7 +41,7 @@ namespace MQTTnet.Tests.Mockups
private void FireClientAcceptedEvent(IMqttChannelAdapter adapter) private void FireClientAcceptedEvent(IMqttChannelAdapter adapter)
{ {
ClientAccepted?.Invoke(this, new MqttServerAdapterClientAcceptedEventArgs(adapter));
ClientAcceptedHandler?.Invoke(new MqttServerAdapterClientAcceptedEventArgs(adapter));
} }


public Task StartAsync(IMqttServerOptions options) public Task StartAsync(IMqttServerOptions options)


+ 0
- 38
Tests/MQTTnet.Core.Tests/Mockups/TestServerExtensions.cs View File

@@ -1,38 +0,0 @@
using System.Threading.Tasks;
using MQTTnet.Client;
using MQTTnet.Server;

namespace MQTTnet.Tests.Mockups
{
public static class TestServerExtensions
{
/// <summary>
/// publishes a message with a client and waits in the server until a message with the same topic is received
/// </summary>
/// <returns></returns>
public static async Task PublishAndWaitForAsync(this IMqttClient client, IMqttServer server, MqttApplicationMessage message)
{
var tcs = new TaskCompletionSource<object>();

void Handler(object sender, MqttApplicationMessageReceivedEventArgs args)
{
if (args.ApplicationMessage.Topic == message.Topic)
{
tcs.SetResult(true);
}
}

server.ApplicationMessageReceived += Handler;

try
{
await client.PublishAsync(message).ConfigureAwait(false);
await tcs.Task.ConfigureAwait(false);
}
finally
{
server.ApplicationMessageReceived -= Handler;
}
}
}
}

+ 19
- 18
Tests/MQTTnet.Core.Tests/MqttClientTests.cs View File

@@ -5,8 +5,9 @@ using System.Net.Sockets;
using System.Threading.Tasks; using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting; using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Client; using MQTTnet.Client;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options; using MQTTnet.Client.Options;
using MQTTnet.Client.Receiving;
using MQTTnet.Exceptions; using MQTTnet.Exceptions;
using MQTTnet.Protocol; using MQTTnet.Protocol;
using MQTTnet.Server; using MQTTnet.Server;
@@ -45,10 +46,10 @@ namespace MQTTnet.Tests
using (var client = factory.CreateMqttClient()) using (var client = factory.CreateMqttClient())
{ {
Exception ex = null; Exception ex = null;
client.Disconnected += (s, e) =>
client.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(e =>
{ {
ex = e.Exception; ex = e.Exception;
};
});


try try
{ {
@@ -81,9 +82,9 @@ namespace MQTTnet.Tests


var receivedValues = new List<int>(); var receivedValues = new List<int>();


async Task Handler1(MqttApplicationMessageHandlerContext context)
async Task Handler1(MqttApplicationMessageReceivedEventArgs eventArgs)
{ {
var value = int.Parse(context.ApplicationMessage.ConvertPayloadToString());
var value = int.Parse(eventArgs.ApplicationMessage.ConvertPayloadToString());
await Task.Delay(value); await Task.Delay(value);


lock (receivedValues) lock (receivedValues)
@@ -92,7 +93,7 @@ namespace MQTTnet.Tests
} }
} }


client1.UseReceivedApplicationMessageHandler(Handler1);
client1.UseApplicationMessageReceivedHandler(Handler1);


var client2 = await testEnvironment.ConnectClientAsync(); var client2 = await testEnvironment.ConnectClientAsync();
for (var i = MessagesCount; i > 0; i--) for (var i = MessagesCount; i > 0; i--)
@@ -119,27 +120,27 @@ namespace MQTTnet.Tests
var client1 = await testEnvironment.ConnectClientAsync(); var client1 = await testEnvironment.ConnectClientAsync();
await client1.SubscribeAsync("request/+"); await client1.SubscribeAsync("request/+");


async Task Handler1(MqttApplicationMessageHandlerContext context)
async Task Handler1(MqttApplicationMessageReceivedEventArgs eventArgs)
{ {
await client1.PublishAsync($"reply/{context.ApplicationMessage.Topic}");
await client1.PublishAsync($"reply/{eventArgs.ApplicationMessage.Topic}");
} }


client1.UseReceivedApplicationMessageHandler(Handler1);
client1.UseApplicationMessageReceivedHandler(Handler1);


var client2 = await testEnvironment.ConnectClientAsync(); var client2 = await testEnvironment.ConnectClientAsync();
await client2.SubscribeAsync("reply/#"); await client2.SubscribeAsync("reply/#");


var replies = new List<string>(); var replies = new List<string>();


void Handler2(MqttApplicationMessageHandlerContext context)
void Handler2(MqttApplicationMessageReceivedEventArgs eventArgs)
{ {
lock (replies) lock (replies)
{ {
replies.Add(context.ApplicationMessage.Topic);
replies.Add(eventArgs.ApplicationMessage.Topic);
} }
} }


client2.UseReceivedApplicationMessageHandler((Action<MqttApplicationMessageHandlerContext>)Handler2);
client2.UseApplicationMessageReceivedHandler((Action<MqttApplicationMessageReceivedEventArgs>)Handler2);


await Task.Delay(500); await Task.Delay(500);


@@ -163,7 +164,7 @@ namespace MQTTnet.Tests
var receivedMessages = new List<MqttApplicationMessage>(); var receivedMessages = new List<MqttApplicationMessage>();


var client1 = await testEnvironment.ConnectClientAsync(); var client1 = await testEnvironment.ConnectClientAsync();
client1.UseReceivedApplicationMessageHandler(c =>
client1.UseApplicationMessageReceivedHandler(c =>
{ {
lock (receivedMessages) lock (receivedMessages)
{ {
@@ -195,7 +196,7 @@ namespace MQTTnet.Tests


var client = testEnvironment.CreateClient(); var client = testEnvironment.CreateClient();


client.Connected += async (s, e) =>
client.ConnectedHandler = new MqttClientConnectedHandlerDelegate(async e =>
{ {
await client.SubscribeAsync("RCU/P1/H0001/R0003"); await client.SubscribeAsync("RCU/P1/H0001/R0003");


@@ -204,9 +205,9 @@ namespace MQTTnet.Tests
.WithTopic("RCU/P1/H0001/R0003"); .WithTopic("RCU/P1/H0001/R0003");


await client.PublishAsync(msg.Build()); await client.PublishAsync(msg.Build());
};
});


client.UseReceivedApplicationMessageHandler(c =>
client.UseApplicationMessageReceivedHandler(c =>
{ {
lock (receivedMessages) lock (receivedMessages)
{ {
@@ -241,7 +242,7 @@ namespace MQTTnet.Tests


var retries = 0; var retries = 0;


async Task Handler1(MqttApplicationMessageHandlerContext context)
async Task Handler1(MqttApplicationMessageReceivedEventArgs eventArgs)
{ {
retries++; retries++;


@@ -249,7 +250,7 @@ namespace MQTTnet.Tests
throw new Exception("Broken!"); throw new Exception("Broken!");
} }


client1.UseReceivedApplicationMessageHandler(Handler1);
client1.UseApplicationMessageReceivedHandler(Handler1);


var client2 = await testEnvironment.ConnectClientAsync(); var client2 = await testEnvironment.ConnectClientAsync();
await client2.PublishAsync("x"); await client2.PublishAsync("x");


+ 3
- 2
Tests/MQTTnet.Core.Tests/RoundtripTimeTests.cs View File

@@ -5,6 +5,7 @@ using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting; using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Client; using MQTTnet.Client;
using MQTTnet.Client.Options; using MQTTnet.Client.Options;
using MQTTnet.Client.Receiving;
using MQTTnet.Server; using MQTTnet.Server;


namespace MQTTnet.Tests namespace MQTTnet.Tests
@@ -29,10 +30,10 @@ namespace MQTTnet.Tests


TaskCompletionSource<string> response = null; TaskCompletionSource<string> response = null;


receiverClient.ApplicationMessageReceived += (sender, args) =>
receiverClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageHandlerDelegate(args =>
{ {
response?.SetResult(args.ApplicationMessage.ConvertPayloadToString()); response?.SetResult(args.ApplicationMessage.ConvertPayloadToString());
};
});


var times = new List<TimeSpan>(); var times = new List<TimeSpan>();
var stopwatch = Stopwatch.StartNew(); var stopwatch = Stopwatch.StartNew();


+ 32
- 33
Tests/MQTTnet.Core.Tests/Server_Tests.cs View File

@@ -9,7 +9,9 @@ using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Adapter; using MQTTnet.Adapter;
using MQTTnet.Client; using MQTTnet.Client;
using MQTTnet.Client.Connecting; using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options; using MQTTnet.Client.Options;
using MQTTnet.Client.Receiving;
using MQTTnet.Protocol; using MQTTnet.Protocol;
using MQTTnet.Server; using MQTTnet.Server;
using MQTTnet.Tests.Mockups; using MQTTnet.Tests.Mockups;
@@ -66,7 +68,7 @@ namespace MQTTnet.Tests
var clientOptions = new MqttClientOptionsBuilder().WithWillMessage(willMessage); var clientOptions = new MqttClientOptionsBuilder().WithWillMessage(willMessage);


var c1 = await testEnvironment.ConnectClientAsync(); var c1 = await testEnvironment.ConnectClientAsync();
c1.UseReceivedApplicationMessageHandler(c => Interlocked.Increment(ref receivedMessagesCount));
c1.ApplicationMessageReceivedHandler = new MqttApplicationMessageHandlerDelegate(c => Interlocked.Increment(ref receivedMessagesCount));
await c1.SubscribeAsync(new TopicFilterBuilder().WithTopic("#").Build()); await c1.SubscribeAsync(new TopicFilterBuilder().WithTopic("#").Build());


var c2 = await testEnvironment.ConnectClientAsync(clientOptions); var c2 = await testEnvironment.ConnectClientAsync(clientOptions);
@@ -92,7 +94,7 @@ namespace MQTTnet.Tests
var clientOptions = new MqttClientOptionsBuilder().WithWillMessage(willMessage); var clientOptions = new MqttClientOptionsBuilder().WithWillMessage(willMessage);


var c1 = await testEnvironment.ConnectClientAsync(); var c1 = await testEnvironment.ConnectClientAsync();
c1.UseReceivedApplicationMessageHandler(c => Interlocked.Increment(ref receivedMessagesCount));
c1.UseApplicationMessageReceivedHandler(c => Interlocked.Increment(ref receivedMessagesCount));
await c1.SubscribeAsync(new TopicFilterBuilder().WithTopic("#").Build()); await c1.SubscribeAsync(new TopicFilterBuilder().WithTopic("#").Build());


var c2 = await testEnvironment.ConnectClientAsync(clientOptions); var c2 = await testEnvironment.ConnectClientAsync(clientOptions);
@@ -114,7 +116,7 @@ namespace MQTTnet.Tests
var server = await testEnvironment.StartServerAsync(); var server = await testEnvironment.StartServerAsync();


var c1 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("c1")); var c1 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("c1"));
c1.UseReceivedApplicationMessageHandler(c => Interlocked.Increment(ref receivedMessagesCount));
c1.UseApplicationMessageReceivedHandler(c => Interlocked.Increment(ref receivedMessagesCount));


var c2 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("c2")); var c2 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("c2"));


@@ -125,10 +127,10 @@ namespace MQTTnet.Tests
Assert.AreEqual(0, receivedMessagesCount); Assert.AreEqual(0, receivedMessagesCount);


var subscribeEventCalled = false; var subscribeEventCalled = false;
server.ClientSubscribedTopic += (_, e) =>
server.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate(e =>
{ {
subscribeEventCalled = e.TopicFilter.Topic == "a" && e.ClientId == "c1"; subscribeEventCalled = e.TopicFilter.Topic == "a" && e.ClientId == "c1";
};
});


await c1.SubscribeAsync(new TopicFilter { Topic = "a", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce }); await c1.SubscribeAsync(new TopicFilter { Topic = "a", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce });
await Task.Delay(250); await Task.Delay(250);
@@ -139,10 +141,10 @@ namespace MQTTnet.Tests
Assert.AreEqual(1, receivedMessagesCount); Assert.AreEqual(1, receivedMessagesCount);


var unsubscribeEventCalled = false; var unsubscribeEventCalled = false;
server.ClientUnsubscribedTopic += (_, e) =>
server.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(e =>
{ {
unsubscribeEventCalled = e.TopicFilter == "a" && e.ClientId == "c1"; unsubscribeEventCalled = e.TopicFilter == "a" && e.ClientId == "c1";
};
});


await c1.UnsubscribeAsync("a"); await c1.UnsubscribeAsync("a");
await Task.Delay(250); await Task.Delay(250);
@@ -168,7 +170,7 @@ namespace MQTTnet.Tests
var receivedMessagesCount = 0; var receivedMessagesCount = 0;


var client = await testEnvironment.ConnectClientAsync(); var client = await testEnvironment.ConnectClientAsync();
client.UseReceivedApplicationMessageHandler(c => Interlocked.Increment(ref receivedMessagesCount));
client.UseApplicationMessageReceivedHandler(c => Interlocked.Increment(ref receivedMessagesCount));


var message = new MqttApplicationMessageBuilder().WithTopic("a").WithAtLeastOnceQoS().Build(); var message = new MqttApplicationMessageBuilder().WithTopic("a").WithAtLeastOnceQoS().Build();
await client.SubscribeAsync(new TopicFilter { Topic = "a", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce }); await client.SubscribeAsync(new TopicFilter { Topic = "a", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce });
@@ -194,7 +196,7 @@ namespace MQTTnet.Tests
var c1 = await testEnvironment.ConnectClientAsync(); var c1 = await testEnvironment.ConnectClientAsync();
var c2 = await testEnvironment.ConnectClientAsync(); var c2 = await testEnvironment.ConnectClientAsync();


c1.UseReceivedApplicationMessageHandler(c =>
c1.UseApplicationMessageReceivedHandler(c =>
{ {
lock (locked) lock (locked)
{ {
@@ -202,7 +204,7 @@ namespace MQTTnet.Tests
} }
}); });


c2.UseReceivedApplicationMessageHandler(c =>
c2.UseApplicationMessageReceivedHandler(c =>
{ {
lock (locked) lock (locked)
{ {
@@ -258,12 +260,12 @@ namespace MQTTnet.Tests
var client = await testEnvironment.ConnectClientAsync(); var client = await testEnvironment.ConnectClientAsync();
var receivedMessages = new List<MqttApplicationMessage>(); var receivedMessages = new List<MqttApplicationMessage>();


client.Connected += async (s, e) =>
client.ConnectedHandler = new MqttClientConnectedHandlerDelegate(async e =>
{ {
await client.PublishAsync("Connected"); await client.PublishAsync("Connected");
};
});


client.UseReceivedApplicationMessageHandler(c =>
client.UseApplicationMessageReceivedHandler(c =>
{ {
lock (receivedMessages) lock (receivedMessages)
{ {
@@ -287,15 +289,12 @@ namespace MQTTnet.Tests
using (var testEnvironment = new TestEnvironment()) using (var testEnvironment = new TestEnvironment())
{ {
var server = await testEnvironment.StartServerAsync(); var server = await testEnvironment.StartServerAsync();
server.ClientConnected += async (s, e) =>
{
await server.SubscribeAsync(e.ClientId, "topic1");
};
server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(e => server.SubscribeAsync(e.ClientId, "topic1"));


var client = await testEnvironment.ConnectClientAsync(); var client = await testEnvironment.ConnectClientAsync();
var receivedMessages = new List<MqttApplicationMessage>(); var receivedMessages = new List<MqttApplicationMessage>();


client.UseReceivedApplicationMessageHandler(c =>
client.UseApplicationMessageReceivedHandler(c =>
{ {
lock (receivedMessages) lock (receivedMessages)
{ {
@@ -325,7 +324,7 @@ namespace MQTTnet.Tests
var disconnectCalled = 0; var disconnectCalled = 0;


var c1 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder()); var c1 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder());
c1.Disconnected += (sender, args) => disconnectCalled++;
c1.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(e => disconnectCalled++);


await Task.Delay(100); await Task.Delay(100);


@@ -347,8 +346,8 @@ namespace MQTTnet.Tests
var clientConnectedCalled = 0; var clientConnectedCalled = 0;
var clientDisconnectedCalled = 0; var clientDisconnectedCalled = 0;


server.ClientConnected += (_, __) => Interlocked.Increment(ref clientConnectedCalled);
server.ClientDisconnected += (_, __) => Interlocked.Increment(ref clientDisconnectedCalled);
server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(_ => Interlocked.Increment(ref clientConnectedCalled));
server.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(_ => Interlocked.Increment(ref clientDisconnectedCalled));


var c1 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder()); var c1 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder());


@@ -462,7 +461,7 @@ namespace MQTTnet.Tests
var receivedMessages = 0; var receivedMessages = 0;


var c2 = await testEnvironment.ConnectClientAsync(); var c2 = await testEnvironment.ConnectClientAsync();
c2.UseReceivedApplicationMessageHandler(c =>
c2.UseApplicationMessageReceivedHandler(c =>
{ {
Interlocked.Increment(ref receivedMessages); Interlocked.Increment(ref receivedMessages);
}); });
@@ -500,7 +499,7 @@ namespace MQTTnet.Tests
var receivedMessagesCount = 0; var receivedMessagesCount = 0;


var c2 = await testEnvironment.ConnectClientAsync(); var c2 = await testEnvironment.ConnectClientAsync();
c2.UseReceivedApplicationMessageHandler(c => Interlocked.Increment(ref receivedMessagesCount));
c2.UseApplicationMessageReceivedHandler(c => Interlocked.Increment(ref receivedMessagesCount));
await c2.SubscribeAsync(new TopicFilterBuilder().WithTopic("retained_other").Build()); await c2.SubscribeAsync(new TopicFilterBuilder().WithTopic("retained_other").Build());


await Task.Delay(500); await Task.Delay(500);
@@ -523,7 +522,7 @@ namespace MQTTnet.Tests
var receivedMessages = new List<MqttApplicationMessage>(); var receivedMessages = new List<MqttApplicationMessage>();


var c2 = await testEnvironment.ConnectClientAsync(); var c2 = await testEnvironment.ConnectClientAsync();
c2.UseReceivedApplicationMessageHandler(c =>
c2.UseApplicationMessageReceivedHandler(c =>
{ {
lock (receivedMessages) lock (receivedMessages)
{ {
@@ -558,7 +557,7 @@ namespace MQTTnet.Tests


var c2 = await testEnvironment.ConnectClientAsync(); var c2 = await testEnvironment.ConnectClientAsync();


c2.UseReceivedApplicationMessageHandler(c => Interlocked.Increment(ref receivedMessagesCount));
c2.UseApplicationMessageReceivedHandler(c => Interlocked.Increment(ref receivedMessagesCount));


await Task.Delay(200); await Task.Delay(200);
await c2.SubscribeAsync(new TopicFilter { Topic = "retained", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce }); await c2.SubscribeAsync(new TopicFilter { Topic = "retained", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce });
@@ -604,7 +603,7 @@ namespace MQTTnet.Tests
await c2.SubscribeAsync(new TopicFilterBuilder().WithTopic("test").Build()); await c2.SubscribeAsync(new TopicFilterBuilder().WithTopic("test").Build());


var isIntercepted = false; var isIntercepted = false;
c2.UseReceivedApplicationMessageHandler(c =>
c2.UseApplicationMessageReceivedHandler(c =>
{ {
isIntercepted = string.Compare("extended", Encoding.UTF8.GetString(c.ApplicationMessage.Payload), StringComparison.Ordinal) == 0; isIntercepted = string.Compare("extended", Encoding.UTF8.GetString(c.ApplicationMessage.Payload), StringComparison.Ordinal) == 0;
}); });
@@ -645,7 +644,7 @@ namespace MQTTnet.Tests


await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder()); await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder());
var client1 = await testEnvironment.ConnectClientAsync(); var client1 = await testEnvironment.ConnectClientAsync();
client1.UseReceivedApplicationMessageHandler(c =>
client1.UseApplicationMessageReceivedHandler(c =>
{ {
receivedBody = c.ApplicationMessage.Payload; receivedBody = c.ApplicationMessage.Payload;
}); });
@@ -703,21 +702,21 @@ namespace MQTTnet.Tests


var events = new List<string>(); var events = new List<string>();


server.ClientConnected += (_, __) =>
server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(_ =>
{ {
lock (events) lock (events)
{ {
events.Add("c"); events.Add("c");
} }
};
});


server.ClientDisconnected += (_, __) =>
server.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(_ =>
{ {
lock (events) lock (events)
{ {
events.Add("d"); events.Add("d");
} }
};
});


var clientOptions = new MqttClientOptionsBuilder() var clientOptions = new MqttClientOptionsBuilder()
.WithClientId("same_id"); .WithClientId("same_id");
@@ -888,7 +887,7 @@ namespace MQTTnet.Tests


var buffer = new StringBuilder(); var buffer = new StringBuilder();
client2.UseReceivedApplicationMessageHandler(c =>
client2.UseApplicationMessageReceivedHandler(c =>
{ {
lock (buffer) lock (buffer)
{ {
@@ -952,7 +951,7 @@ namespace MQTTnet.Tests
await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder()); await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder());


var c1 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("receiver")); var c1 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("receiver"));
c1.UseReceivedApplicationMessageHandler(c => Interlocked.Increment(ref receivedMessagesCount));
c1.UseApplicationMessageReceivedHandler(c => Interlocked.Increment(ref receivedMessagesCount));
await c1.SubscribeAsync(new TopicFilterBuilder().WithTopic(topicFilter).WithQualityOfServiceLevel(filterQualityOfServiceLevel).Build()); await c1.SubscribeAsync(new TopicFilterBuilder().WithTopic(topicFilter).WithQualityOfServiceLevel(filterQualityOfServiceLevel).Build());


var c2 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("sender")); var c2 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("sender"));


+ 9
- 6
Tests/MQTTnet.TestApp.NetCore/ClientTest.cs View File

@@ -2,7 +2,10 @@
using System.Text; using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
using MQTTnet.Client; using MQTTnet.Client;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options; using MQTTnet.Client.Options;
using MQTTnet.Client.Receiving;
using MQTTnet.Protocol; using MQTTnet.Protocol;


namespace MQTTnet.TestApp.NetCore namespace MQTTnet.TestApp.NetCore
@@ -25,7 +28,7 @@ namespace MQTTnet.TestApp.NetCore
} }
}; };


client.ApplicationMessageReceived += (s, e) =>
client.ApplicationMessageReceivedHandler = new MqttApplicationMessageHandlerDelegate(e =>
{ {
Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###"); Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###");
Console.WriteLine($"+ Topic = {e.ApplicationMessage.Topic}"); Console.WriteLine($"+ Topic = {e.ApplicationMessage.Topic}");
@@ -33,18 +36,18 @@ namespace MQTTnet.TestApp.NetCore
Console.WriteLine($"+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}"); Console.WriteLine($"+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}");
Console.WriteLine($"+ Retain = {e.ApplicationMessage.Retain}"); Console.WriteLine($"+ Retain = {e.ApplicationMessage.Retain}");
Console.WriteLine(); Console.WriteLine();
};
});


client.Connected += async (s, e) =>
client.ConnectedHandler = new MqttClientConnectedHandlerDelegate(async e =>
{ {
Console.WriteLine("### CONNECTED WITH SERVER ###"); Console.WriteLine("### CONNECTED WITH SERVER ###");


await client.SubscribeAsync(new TopicFilterBuilder().WithTopic("#").Build()); await client.SubscribeAsync(new TopicFilterBuilder().WithTopic("#").Build());


Console.WriteLine("### SUBSCRIBED ###"); Console.WriteLine("### SUBSCRIBED ###");
};
});


client.Disconnected += async (s, e) =>
client.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(async e =>
{ {
Console.WriteLine("### DISCONNECTED FROM SERVER ###"); Console.WriteLine("### DISCONNECTED FROM SERVER ###");
await Task.Delay(TimeSpan.FromSeconds(5)); await Task.Delay(TimeSpan.FromSeconds(5));
@@ -57,7 +60,7 @@ namespace MQTTnet.TestApp.NetCore
{ {
Console.WriteLine("### RECONNECTING FAILED ###"); Console.WriteLine("### RECONNECTING FAILED ###");
} }
};
});


try try
{ {


+ 3
- 2
Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs View File

@@ -4,6 +4,7 @@ using System.IO;
using Newtonsoft.Json; using Newtonsoft.Json;
using System.Collections.Generic; using System.Collections.Generic;
using MQTTnet.Client.Options; using MQTTnet.Client.Options;
using MQTTnet.Client.Receiving;
using MQTTnet.Extensions.ManagedClient; using MQTTnet.Extensions.ManagedClient;
using MQTTnet.Protocol; using MQTTnet.Protocol;


@@ -34,10 +35,10 @@ namespace MQTTnet.TestApp.NetCore
try try
{ {
var managedClient = new MqttFactory().CreateManagedMqttClient(); var managedClient = new MqttFactory().CreateManagedMqttClient();
managedClient.ApplicationMessageReceived += (s, e) =>
managedClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageHandlerDelegate(e =>
{ {
Console.WriteLine(">> RECEIVED: " + e.ApplicationMessage.Topic); Console.WriteLine(">> RECEIVED: " + e.ApplicationMessage.Topic);
};
});


await managedClient.PublishAsync(builder => builder.WithTopic("Step").WithPayload("1")); await managedClient.PublishAsync(builder => builder.WithTopic("Step").WithPayload("1"));
await managedClient.PublishAsync(builder => builder.WithTopic("Step").WithPayload("2").WithAtLeastOnceQoS()); await managedClient.PublishAsync(builder => builder.WithTopic("Step").WithPayload("2").WithAtLeastOnceQoS());


+ 2
- 1
Tests/MQTTnet.TestApp.NetCore/PublicBrokerTest.cs View File

@@ -4,6 +4,7 @@ using System.IO;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using MQTTnet.Client.Options; using MQTTnet.Client.Options;
using MQTTnet.Client.Receiving;
using MQTTnet.Formatter; using MQTTnet.Formatter;
using MQTTnet.Protocol; using MQTTnet.Protocol;
using Newtonsoft.Json; using Newtonsoft.Json;
@@ -80,7 +81,7 @@ namespace MQTTnet.TestApp.NetCore
var topic = Guid.NewGuid().ToString(); var topic = Guid.NewGuid().ToString();


MqttApplicationMessage receivedMessage = null; MqttApplicationMessage receivedMessage = null;
client.ApplicationMessageReceived += (s, e) => receivedMessage = e.ApplicationMessage;
client.ApplicationMessageReceivedHandler = new MqttApplicationMessageHandlerDelegate(e => receivedMessage = e.ApplicationMessage);


await client.ConnectAsync(options); await client.ConnectAsync(options);
await client.SubscribeAsync(topic, MqttQualityOfServiceLevel.AtLeastOnce); await client.SubscribeAsync(topic, MqttQualityOfServiceLevel.AtLeastOnce);


+ 5
- 4
Tests/MQTTnet.TestApp.NetCore/ServerTest.cs View File

@@ -1,6 +1,7 @@
using System; using System;
using System.Text; using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
using MQTTnet.Client.Receiving;
using MQTTnet.Protocol; using MQTTnet.Protocol;
using MQTTnet.Server; using MQTTnet.Server;


@@ -78,12 +79,12 @@ namespace MQTTnet.TestApp.NetCore


var mqttServer = new MqttFactory().CreateMqttServer(); var mqttServer = new MqttFactory().CreateMqttServer();


mqttServer.ApplicationMessageReceived += (s, e) =>
mqttServer.ApplicationMessageReceivedHandler = new MqttApplicationMessageHandlerDelegate(e =>
{ {
MqttNetConsoleLogger.PrintToConsole( MqttNetConsoleLogger.PrintToConsole(
$"'{e.ClientId}' reported '{e.ApplicationMessage.Topic}' > '{Encoding.UTF8.GetString(e.ApplicationMessage.Payload ?? new byte[0])}'", $"'{e.ClientId}' reported '{e.ApplicationMessage.Topic}' > '{Encoding.UTF8.GetString(e.ApplicationMessage.Payload ?? new byte[0])}'",
ConsoleColor.Magenta); ConsoleColor.Magenta);
};
});


//options.ApplicationMessageInterceptor = c => //options.ApplicationMessageInterceptor = c =>
//{ //{
@@ -107,10 +108,10 @@ namespace MQTTnet.TestApp.NetCore
// } // }
//}; //};


mqttServer.ClientDisconnected += (s, e) =>
mqttServer.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(e =>
{ {
Console.Write("Client disconnected event fired."); Console.Write("Client disconnected event fired.");
};
});


await mqttServer.StartAsync(options); await mqttServer.StartAsync(options);




+ 15
- 12
Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs View File

@@ -7,7 +7,10 @@ using Windows.Security.Cryptography.Certificates;
using Windows.UI.Core; using Windows.UI.Core;
using Windows.UI.Xaml; using Windows.UI.Xaml;
using MQTTnet.Client; using MQTTnet.Client;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options; using MQTTnet.Client.Options;
using MQTTnet.Client.Receiving;
using MQTTnet.Diagnostics; using MQTTnet.Diagnostics;
using MQTTnet.Exceptions; using MQTTnet.Exceptions;
using MQTTnet.Extensions.ManagedClient; using MQTTnet.Extensions.ManagedClient;
@@ -144,9 +147,9 @@ namespace MQTTnet.TestApp.UniversalWindows
if (_mqttClient != null) if (_mqttClient != null)
{ {
await _mqttClient.DisconnectAsync(); await _mqttClient.DisconnectAsync();
_mqttClient.ApplicationMessageReceived -= OnApplicationMessageReceived;
_mqttClient.Connected -= OnConnected;
_mqttClient.Disconnected -= OnDisconnected;
_mqttClient.UseApplicationMessageReceivedHandler(HandleReceivedApplicationMessage);
_mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(x => OnConnected(x));
_mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(x => OnDisconnected(x));
} }


var factory = new MqttFactory(); var factory = new MqttFactory();
@@ -154,9 +157,9 @@ namespace MQTTnet.TestApp.UniversalWindows
if (UseManagedClient.IsChecked == true) if (UseManagedClient.IsChecked == true)
{ {
_managedMqttClient = factory.CreateManagedMqttClient(); _managedMqttClient = factory.CreateManagedMqttClient();
_managedMqttClient.ApplicationMessageReceived += OnApplicationMessageReceived;
_managedMqttClient.Connected += OnConnected;
_managedMqttClient.Disconnected += OnDisconnected;
_managedMqttClient.UseApplicationMessageReceivedHandler(HandleReceivedApplicationMessage);
_managedMqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(x => OnConnected(x));
_managedMqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(x => OnDisconnected(x));


await _managedMqttClient.StartAsync(new ManagedMqttClientOptions await _managedMqttClient.StartAsync(new ManagedMqttClientOptions
{ {
@@ -166,9 +169,9 @@ namespace MQTTnet.TestApp.UniversalWindows
else else
{ {
_mqttClient = factory.CreateMqttClient(); _mqttClient = factory.CreateMqttClient();
_mqttClient.ApplicationMessageReceived += OnApplicationMessageReceived;
_mqttClient.Connected += OnConnected;
_mqttClient.Disconnected += OnDisconnected;
_mqttClient.UseApplicationMessageReceivedHandler(HandleReceivedApplicationMessage);
_mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(x => OnConnected(x));
_mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(x => OnDisconnected(x));


await _mqttClient.ConnectAsync(options); await _mqttClient.ConnectAsync(options);
} }
@@ -179,7 +182,7 @@ namespace MQTTnet.TestApp.UniversalWindows
} }
} }


private void OnDisconnected(object sender, MqttClientDisconnectedEventArgs e)
private void OnDisconnected(MqttClientDisconnectedEventArgs e)
{ {
_traceMessages.Enqueue(new MqttNetLogMessage("", DateTime.Now, -1, _traceMessages.Enqueue(new MqttNetLogMessage("", DateTime.Now, -1,
"", MqttNetLogLevel.Info, "! DISCONNECTED EVENT FIRED", null)); "", MqttNetLogLevel.Info, "! DISCONNECTED EVENT FIRED", null));
@@ -187,7 +190,7 @@ namespace MQTTnet.TestApp.UniversalWindows
Task.Run(UpdateLogAsync); Task.Run(UpdateLogAsync);
} }


private void OnConnected(object sender, MqttClientConnectedEventArgs e)
private void OnConnected(MqttClientConnectedEventArgs e)
{ {
_traceMessages.Enqueue(new MqttNetLogMessage("", DateTime.Now, -1, _traceMessages.Enqueue(new MqttNetLogMessage("", DateTime.Now, -1,
"", MqttNetLogLevel.Info, "! CONNECTED EVENT FIRED", null)); "", MqttNetLogLevel.Info, "! CONNECTED EVENT FIRED", null));
@@ -195,7 +198,7 @@ namespace MQTTnet.TestApp.UniversalWindows
Task.Run(UpdateLogAsync); Task.Run(UpdateLogAsync);
} }


private async void OnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs eventArgs)
private async Task HandleReceivedApplicationMessage(MqttApplicationMessageReceivedEventArgs eventArgs)
{ {
var item = $"Timestamp: {DateTime.Now:O} | Topic: {eventArgs.ApplicationMessage.Topic} | Payload: {Encoding.UTF8.GetString(eventArgs.ApplicationMessage.Payload)} | QoS: {eventArgs.ApplicationMessage.QualityOfServiceLevel}"; var item = $"Timestamp: {DateTime.Now:O} | Topic: {eventArgs.ApplicationMessage.Topic} | Payload: {Encoding.UTF8.GetString(eventArgs.ApplicationMessage.Payload)} | QoS: {eventArgs.ApplicationMessage.QualityOfServiceLevel}";




Loading…
Cancel
Save