From 019f428f0a41c83648e5e1e13d9eb52426206d2d Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Thu, 4 Jul 2019 20:27:48 +0200 Subject: [PATCH 01/30] MQTTnet.Server: Add HTTP API for publishing of messages. --- .../Controllers/ClientsController.cs | 2 +- .../Controllers/MessagesController.cs | 40 +++++++++++++++++++ .../RetainedApplicationMessagesController.cs | 2 +- .../Controllers/SessionsController.cs | 2 +- 4 files changed, 43 insertions(+), 3 deletions(-) create mode 100644 Source/MQTTnet.Server/Controllers/MessagesController.cs diff --git a/Source/MQTTnet.Server/Controllers/ClientsController.cs b/Source/MQTTnet.Server/Controllers/ClientsController.cs index 6898375..bd9795a 100644 --- a/Source/MQTTnet.Server/Controllers/ClientsController.cs +++ b/Source/MQTTnet.Server/Controllers/ClientsController.cs @@ -13,7 +13,7 @@ namespace MQTTnet.Server.Controllers { [Authorize] [ApiController] - public class ClientsController : ControllerBase + public class ClientsController : Controller { private readonly MqttServerService _mqttServerService; diff --git a/Source/MQTTnet.Server/Controllers/MessagesController.cs b/Source/MQTTnet.Server/Controllers/MessagesController.cs new file mode 100644 index 0000000..89eb385 --- /dev/null +++ b/Source/MQTTnet.Server/Controllers/MessagesController.cs @@ -0,0 +1,40 @@ +using System; +using System.Threading.Tasks; +using Microsoft.AspNetCore.Authorization; +using Microsoft.AspNetCore.Mvc; +using MQTTnet.Server.Mqtt; + +namespace MQTTnet.Server.Controllers +{ + [Authorize] + [ApiController] + public class MessagesController : Controller + { + private readonly MqttServerService _mqttServerService; + + public MessagesController(MqttServerService mqttServerService) + { + _mqttServerService = mqttServerService ?? throw new ArgumentNullException(nameof(mqttServerService)); + } + + [Route("api/v1/messages")] + [HttpPost] + public async Task PostMessage(MqttApplicationMessage message) + { + await _mqttServerService.PublishAsync(message); + return Ok(); + } + + [Route("api/v1/messages/{*topic}")] + [HttpPost] + public Task PostMessage(string topic, string payload) + { + var message = new MqttApplicationMessageBuilder() + .WithTopic(topic) + .WithPayload(payload) + .Build(); + + return PostMessage(message); + } + } +} diff --git a/Source/MQTTnet.Server/Controllers/RetainedApplicationMessagesController.cs b/Source/MQTTnet.Server/Controllers/RetainedApplicationMessagesController.cs index 9c9f273..030d141 100644 --- a/Source/MQTTnet.Server/Controllers/RetainedApplicationMessagesController.cs +++ b/Source/MQTTnet.Server/Controllers/RetainedApplicationMessagesController.cs @@ -12,7 +12,7 @@ namespace MQTTnet.Server.Controllers { [Authorize] [ApiController] - public class RetainedApplicationMessagesController : ControllerBase + public class RetainedApplicationMessagesController : Controller { private readonly MqttServerService _mqttServerService; diff --git a/Source/MQTTnet.Server/Controllers/SessionsController.cs b/Source/MQTTnet.Server/Controllers/SessionsController.cs index 463c004..5fd0638 100644 --- a/Source/MQTTnet.Server/Controllers/SessionsController.cs +++ b/Source/MQTTnet.Server/Controllers/SessionsController.cs @@ -13,7 +13,7 @@ namespace MQTTnet.Server.Controllers { [Authorize] [ApiController] - public class SessionsController : ControllerBase + public class SessionsController : Controller { private readonly MqttServerService _mqttServerService; From 8563230f9ce556a4ac4370fdc3bc53866c0eb6f2 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Thu, 4 Jul 2019 20:38:01 +0200 Subject: [PATCH 02/30] Update docs. --- Build/MQTTnet.nuspec | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index c9d4016..b10019f 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -11,7 +11,7 @@ false MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker) and supports v3.1.0, v3.1.1 and v5.0.0 of the MQTT protocol. -* [Server] Moved new socket options to TCP options to avoid incompatibility with Linux hosts. +* [MQTTnet.Server] Added REST API for publishing basic messages. Copyright Christian Kratky 2016-2019 MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin From 3eb5e82d10afb6966fa6ae74630434de6c4637a2 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Thu, 4 Jul 2019 23:06:37 +0200 Subject: [PATCH 03/30] Expose connect packet in application message interceptor and subscription interceptor. --- Build/MQTTnet.nuspec | 2 + .../Mqtt/MqttApplicationMessageInterceptor.cs | 12 +++-- ...qttApplicationMessageInterceptorContext.cs | 9 ++-- .../Server/MqttBaseInterceptorContext.cs | 54 +++++++++++++++++++ Source/MQTTnet/Server/MqttClientConnection.cs | 17 +++--- Source/MQTTnet/Server/MqttClientSession.cs | 12 +++-- .../Server/MqttClientSessionsManager.cs | 38 ++++++++----- .../Server/MqttClientSubscriptionsManager.cs | 37 +++++++------ .../Server/MqttConnectionValidatorContext.cs | 42 ++------------- .../MqttSubscriptionInterceptorContext.cs | 8 +-- .../MqttSubscriptionsManager_Tests.cs | 51 ++++++++++++------ 11 files changed, 179 insertions(+), 103 deletions(-) create mode 100644 Source/MQTTnet/Server/MqttBaseInterceptorContext.cs diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index b10019f..a360e21 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -11,6 +11,8 @@ false MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker) and supports v3.1.0, v3.1.1 and v5.0.0 of the MQTT protocol. +* [Server] Added items dictionary to client session in order to share data across interceptors as along as the session exists. +* [Server] Exposed CONNECT packet properties in Application Message and Subscription interceptor. * [MQTTnet.Server] Added REST API for publishing basic messages. Copyright Christian Kratky 2016-2019 diff --git a/Source/MQTTnet.Server/Mqtt/MqttApplicationMessageInterceptor.cs b/Source/MQTTnet.Server/Mqtt/MqttApplicationMessageInterceptor.cs index c5f3afd..8d378af 100644 --- a/Source/MQTTnet.Server/Mqtt/MqttApplicationMessageInterceptor.cs +++ b/Source/MQTTnet.Server/Mqtt/MqttApplicationMessageInterceptor.cs @@ -24,12 +24,18 @@ namespace MQTTnet.Server.Mqtt { var pythonContext = new PythonDictionary { + { "client_id", context.ClientId }, + { "retain", context.ApplicationMessage.Retain }, + { "username", context.Username }, + { "password", context.Password }, + { "raw_password", new Bytes(context.RawPassword ?? new byte[0]) }, + { "clean_session", context.CleanSession}, + { "authentication_method", context.AuthenticationMethod}, + { "authentication_data", new Bytes(context.AuthenticationData ?? new byte[0]) }, { "accept_publish", context.AcceptPublish }, { "close_connection", context.CloseConnection }, - { "client_id", context.ClientId }, { "topic", context.ApplicationMessage.Topic }, - { "qos", (int)context.ApplicationMessage.QualityOfServiceLevel }, - { "retain", context.ApplicationMessage.Retain } + { "qos", (int)context.ApplicationMessage.QualityOfServiceLevel } }; _pythonScriptHostService.InvokeOptionalFunction("on_intercept_application_message", pythonContext); diff --git a/Source/MQTTnet/Server/MqttApplicationMessageInterceptorContext.cs b/Source/MQTTnet/Server/MqttApplicationMessageInterceptorContext.cs index 5612601..c33580e 100644 --- a/Source/MQTTnet/Server/MqttApplicationMessageInterceptorContext.cs +++ b/Source/MQTTnet/Server/MqttApplicationMessageInterceptorContext.cs @@ -1,8 +1,11 @@ -namespace MQTTnet.Server +using System.Collections.Generic; +using MQTTnet.Packets; + +namespace MQTTnet.Server { - public class MqttApplicationMessageInterceptorContext + public class MqttApplicationMessageInterceptorContext : MqttBaseInterceptorContext { - public MqttApplicationMessageInterceptorContext(string clientId, MqttApplicationMessage applicationMessage) + public MqttApplicationMessageInterceptorContext(string clientId, IDictionary sessionItems, MqttConnectPacket connectPacket, MqttApplicationMessage applicationMessage) : base(connectPacket, sessionItems) { ClientId = clientId; ApplicationMessage = applicationMessage; diff --git a/Source/MQTTnet/Server/MqttBaseInterceptorContext.cs b/Source/MQTTnet/Server/MqttBaseInterceptorContext.cs new file mode 100644 index 0000000..6909d5e --- /dev/null +++ b/Source/MQTTnet/Server/MqttBaseInterceptorContext.cs @@ -0,0 +1,54 @@ +using System.Collections.Generic; +using System.Text; +using MQTTnet.Packets; + +namespace MQTTnet.Server +{ + public class MqttBaseInterceptorContext + { + private readonly MqttConnectPacket _connectPacket; + + protected MqttBaseInterceptorContext(MqttConnectPacket connectPacket, IDictionary sessionItems) + { + _connectPacket = connectPacket; + SessionItems = sessionItems; + } + + public string Username => _connectPacket?.Username; + + public byte[] RawPassword => _connectPacket?.Password; + + public string Password => Encoding.UTF8.GetString(RawPassword ?? new byte[0]); + + public MqttApplicationMessage WillMessage => _connectPacket?.WillMessage; + + public bool? CleanSession => _connectPacket?.CleanSession; + + public ushort? KeepAlivePeriod => _connectPacket?.KeepAlivePeriod; + + public List UserProperties => _connectPacket?.Properties?.UserProperties; + + public byte[] AuthenticationData => _connectPacket?.Properties?.AuthenticationData; + + public string AuthenticationMethod => _connectPacket?.Properties?.AuthenticationMethod; + + public uint? MaximumPacketSize => _connectPacket?.Properties?.MaximumPacketSize; + + public ushort? ReceiveMaximum => _connectPacket?.Properties?.ReceiveMaximum; + + public ushort? TopicAliasMaximum => _connectPacket?.Properties?.TopicAliasMaximum; + + public bool? RequestProblemInformation => _connectPacket?.Properties?.RequestProblemInformation; + + public bool? RequestResponseInformation => _connectPacket?.Properties?.RequestResponseInformation; + + public uint? SessionExpiryInterval => _connectPacket?.Properties?.SessionExpiryInterval; + + public uint? WillDelayInterval => _connectPacket?.Properties?.WillDelayInterval; + + /// + /// Gets or sets a key/value collection that can be used to share data within the scope of this session. + /// + public IDictionary SessionItems { get; } + } +} \ No newline at end of file diff --git a/Source/MQTTnet/Server/MqttClientConnection.cs b/Source/MQTTnet/Server/MqttClientConnection.cs index ed378bb..a34f28b 100644 --- a/Source/MQTTnet/Server/MqttClientConnection.cs +++ b/Source/MQTTnet/Server/MqttClientConnection.cs @@ -31,7 +31,6 @@ namespace MQTTnet.Server private readonly IMqttChannelAdapter _channelAdapter; private readonly IMqttDataConverter _dataConverter; private readonly string _endpoint; - private readonly MqttConnectPacket _connectPacket; private readonly DateTime _connectedTimestamp; private Task _packageReceiverTask; @@ -60,22 +59,24 @@ namespace MQTTnet.Server _channelAdapter = channelAdapter ?? throw new ArgumentNullException(nameof(channelAdapter)); _dataConverter = _channelAdapter.PacketFormatterAdapter.DataConverter; _endpoint = _channelAdapter.Endpoint; - _connectPacket = connectPacket ?? throw new ArgumentNullException(nameof(connectPacket)); + ConnectPacket = connectPacket ?? throw new ArgumentNullException(nameof(connectPacket)); if (logger == null) throw new ArgumentNullException(nameof(logger)); _logger = logger.CreateChildLogger(nameof(MqttClientConnection)); - _keepAliveMonitor = new MqttClientKeepAliveMonitor(_connectPacket.ClientId, StopAsync, _logger); + _keepAliveMonitor = new MqttClientKeepAliveMonitor(ConnectPacket.ClientId, StopAsync, _logger); _connectedTimestamp = DateTime.UtcNow; _lastPacketReceivedTimestamp = _connectedTimestamp; _lastNonKeepAlivePacketReceivedTimestamp = _lastPacketReceivedTimestamp; } - public string ClientId => _connectPacket.ClientId; + public MqttConnectPacket ConnectPacket { get; } - public MqttClientSession Session { get; } + public string ClientId => ConnectPacket.ClientId; + public MqttClientSession Session { get; } + public async Task StopAsync() { StopInternal(); @@ -133,12 +134,12 @@ namespace MQTTnet.Server _channelAdapter.ReadingPacketStartedCallback = OnAdapterReadingPacketStarted; _channelAdapter.ReadingPacketCompletedCallback = OnAdapterReadingPacketCompleted; - Session.WillMessage = _connectPacket.WillMessage; + Session.WillMessage = ConnectPacket.WillMessage; Task.Run(() => SendPendingPacketsAsync(_cancellationToken.Token), _cancellationToken.Token).Forget(_logger); // TODO: Change to single thread in SessionManager. Or use SessionManager and stats from KeepAliveMonitor. - _keepAliveMonitor.Start(_connectPacket.KeepAlivePeriod, _cancellationToken.Token); + _keepAliveMonitor.Start(ConnectPacket.KeepAlivePeriod, _cancellationToken.Token); await SendAsync( new MqttConnAckPacket @@ -271,7 +272,7 @@ namespace MQTTnet.Server private async Task HandleIncomingSubscribePacketAsync(MqttSubscribePacket subscribePacket) { // TODO: Let the channel adapter create the packet. - var subscribeResult = await Session.SubscriptionsManager.SubscribeAsync(subscribePacket).ConfigureAwait(false); + var subscribeResult = await Session.SubscriptionsManager.SubscribeAsync(subscribePacket, ConnectPacket).ConfigureAwait(false); await SendAsync(subscribeResult.ResponsePacket).ConfigureAwait(false); diff --git a/Source/MQTTnet/Server/MqttClientSession.cs b/Source/MQTTnet/Server/MqttClientSession.cs index 804a223..73263cb 100644 --- a/Source/MQTTnet/Server/MqttClientSession.cs +++ b/Source/MQTTnet/Server/MqttClientSession.cs @@ -12,11 +12,12 @@ namespace MQTTnet.Server private readonly DateTime _createdTimestamp = DateTime.UtcNow; - public MqttClientSession(string clientId, MqttServerEventDispatcher eventDispatcher, IMqttServerOptions serverOptions, IMqttNetChildLogger logger) + public MqttClientSession(string clientId, IDictionary items, MqttServerEventDispatcher eventDispatcher, IMqttServerOptions serverOptions, IMqttNetChildLogger logger) { ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId)); + Items = items ?? throw new ArgumentNullException(nameof(items)); - SubscriptionsManager = new MqttClientSubscriptionsManager(clientId, eventDispatcher, serverOptions); + SubscriptionsManager = new MqttClientSubscriptionsManager(this, eventDispatcher, serverOptions); ApplicationMessagesQueue = new MqttClientSessionApplicationMessagesQueue(serverOptions); if (logger == null) throw new ArgumentNullException(nameof(logger)); @@ -33,6 +34,11 @@ namespace MQTTnet.Server public MqttClientSessionApplicationMessagesQueue ApplicationMessagesQueue { get; } + /// + /// Gets or sets a key/value collection that can be used to share data within the scope of this session. + /// + public IDictionary Items { get; } + public void EnqueueApplicationMessage(MqttApplicationMessage applicationMessage, string senderClientId, bool isRetainedApplicationMessage) { var checkSubscriptionsResult = SubscriptionsManager.CheckSubscriptions(applicationMessage.Topic, applicationMessage.QualityOfServiceLevel); @@ -48,7 +54,7 @@ namespace MQTTnet.Server public async Task SubscribeAsync(ICollection topicFilters, MqttRetainedMessagesManager retainedMessagesManager) { - await SubscriptionsManager.SubscribeAsync(topicFilters).ConfigureAwait(false); + await SubscriptionsManager.SubscribeAsync(topicFilters, null).ConfigureAwait(false); var matchingRetainedMessages = await retainedMessagesManager.GetSubscribedMessagesAsync(topicFilters).ConfigureAwait(false); foreach (var matchingRetainedMessage in matchingRetainedMessages) diff --git a/Source/MQTTnet/Server/MqttClientSessionsManager.cs b/Source/MQTTnet/Server/MqttClientSessionsManager.cs index c2d5637..1463ed4 100644 --- a/Source/MQTTnet/Server/MqttClientSessionsManager.cs +++ b/Source/MQTTnet/Server/MqttClientSessionsManager.cs @@ -20,7 +20,8 @@ namespace MQTTnet.Server private readonly SemaphoreSlim _createConnectionGate = new SemaphoreSlim(1, 1); private readonly ConcurrentDictionary _connections = new ConcurrentDictionary(); private readonly ConcurrentDictionary _sessions = new ConcurrentDictionary(); - + private readonly IDictionary _serverSessionItems = new ConcurrentDictionary(); + private readonly CancellationToken _cancellationToken; private readonly MqttServerEventDispatcher _eventDispatcher; @@ -241,19 +242,19 @@ namespace MQTTnet.Server clientId = connectPacket.ClientId; - var validatorContext = await ValidateConnectionAsync(connectPacket, channelAdapter).ConfigureAwait(false); + var connectionValidatorContext = await ValidateConnectionAsync(connectPacket, channelAdapter).ConfigureAwait(false); - if (validatorContext.ReasonCode != MqttConnectReasonCode.Success) + if (connectionValidatorContext.ReasonCode != MqttConnectReasonCode.Success) { // Send failure response here without preparing a session. The result for a successful connect // will be sent from the session itself. - var connAckPacket = channelAdapter.PacketFormatterAdapter.DataConverter.CreateConnAckPacket(validatorContext); + var connAckPacket = channelAdapter.PacketFormatterAdapter.DataConverter.CreateConnAckPacket(connectionValidatorContext); await channelAdapter.SendPacketAsync(connAckPacket, _options.DefaultCommunicationTimeout, cancellationToken).ConfigureAwait(false); return; } - var connection = await CreateConnectionAsync(channelAdapter, connectPacket).ConfigureAwait(false); + var connection = await CreateConnectionAsync(connectPacket, connectionValidatorContext, channelAdapter).ConfigureAwait(false); await _eventDispatcher.HandleClientConnectedAsync(clientId).ConfigureAwait(false); @@ -302,8 +303,7 @@ namespace MQTTnet.Server await connectionValidator.ValidateConnectionAsync(context).ConfigureAwait(false); // Check the client ID and set a random one if supported. - if (string.IsNullOrEmpty(connectPacket.ClientId) && - channelAdapter.PacketFormatterAdapter.ProtocolVersion == MqttProtocolVersion.V500) + if (string.IsNullOrEmpty(connectPacket.ClientId) && channelAdapter.PacketFormatterAdapter.ProtocolVersion == MqttProtocolVersion.V500) { connectPacket.ClientId = context.AssignedClientIdentifier; } @@ -316,7 +316,7 @@ namespace MQTTnet.Server return context; } - private async Task CreateConnectionAsync(IMqttChannelAdapter channelAdapter, MqttConnectPacket connectPacket) + private async Task CreateConnectionAsync(MqttConnectPacket connectPacket, MqttConnectionValidatorContext connectionValidatorContext, IMqttChannelAdapter channelAdapter) { await _createConnectionGate.WaitAsync(_cancellationToken).ConfigureAwait(false); try @@ -345,7 +345,7 @@ namespace MQTTnet.Server if (session == null) { - session = new MqttClientSession(connectPacket.ClientId, _eventDispatcher, _options, _logger); + session = new MqttClientSession(connectPacket.ClientId, connectionValidatorContext.SessionItems, _eventDispatcher, _options, _logger); _logger.Verbose("Created a new session for client '{0}'.", connectPacket.ClientId); } @@ -362,7 +362,7 @@ namespace MQTTnet.Server } } - private async Task InterceptApplicationMessageAsync(MqttClientConnection sender, MqttApplicationMessage applicationMessage) + private async Task InterceptApplicationMessageAsync(MqttClientConnection senderConnection, MqttApplicationMessage applicationMessage) { var interceptor = _options.ApplicationMessageInterceptor; if (interceptor == null) @@ -370,13 +370,25 @@ namespace MQTTnet.Server return null; } - var senderClientId = sender?.ClientId; - if (sender == null) + string senderClientId; + IDictionary sessionItems; + MqttConnectPacket connectPacket; + + var messageIsFromServer = senderConnection == null; + if (messageIsFromServer) { senderClientId = _options.ClientId; + sessionItems = _serverSessionItems; + connectPacket = null; + } + else + { + senderClientId = senderConnection.ClientId; + sessionItems = senderConnection.Session.Items; + connectPacket = senderConnection.ConnectPacket; } - var interceptorContext = new MqttApplicationMessageInterceptorContext(senderClientId, applicationMessage); + var interceptorContext = new MqttApplicationMessageInterceptorContext(senderClientId, sessionItems, connectPacket, applicationMessage); await interceptor.InterceptApplicationMessagePublishAsync(interceptorContext).ConfigureAwait(false); return interceptorContext; } diff --git a/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs b/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs index e2024a6..04d7495 100644 --- a/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs +++ b/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs @@ -10,20 +10,23 @@ namespace MQTTnet.Server public class MqttClientSubscriptionsManager { private readonly Dictionary _subscriptions = new Dictionary(); - private readonly IMqttServerOptions _options; + private readonly MqttClientSession _clientSession; + private readonly IMqttServerOptions _serverOptions; private readonly MqttServerEventDispatcher _eventDispatcher; - private readonly string _clientId; - public MqttClientSubscriptionsManager(string clientId, MqttServerEventDispatcher eventDispatcher, IMqttServerOptions options) + public MqttClientSubscriptionsManager(MqttClientSession clientSession, MqttServerEventDispatcher eventDispatcher, IMqttServerOptions serverOptions) { - _clientId = clientId ?? throw new ArgumentNullException(nameof(clientId)); - _options = options ?? throw new ArgumentNullException(nameof(options)); + _clientSession = clientSession ?? throw new ArgumentNullException(nameof(clientSession)); + + // TODO: Consider removing the server options here and build a new class "ISubscriptionInterceptor" and just pass it. The instance is generated in the root server class upon start. + _serverOptions = serverOptions ?? throw new ArgumentNullException(nameof(serverOptions)); _eventDispatcher = eventDispatcher ?? throw new ArgumentNullException(nameof(eventDispatcher)); } - public async Task SubscribeAsync(MqttSubscribePacket subscribePacket) + public async Task SubscribeAsync(MqttSubscribePacket subscribePacket, MqttConnectPacket connectPacket) { if (subscribePacket == null) throw new ArgumentNullException(nameof(subscribePacket)); + if (connectPacket == null) throw new ArgumentNullException(nameof(connectPacket)); var result = new MqttClientSubscribeResult { @@ -37,7 +40,7 @@ namespace MQTTnet.Server foreach (var originalTopicFilter in subscribePacket.TopicFilters) { - var interceptorContext = await InterceptSubscribeAsync(originalTopicFilter).ConfigureAwait(false); + var interceptorContext = await InterceptSubscribeAsync(originalTopicFilter, connectPacket).ConfigureAwait(false); var finalTopicFilter = interceptorContext.TopicFilter; @@ -64,18 +67,20 @@ namespace MQTTnet.Server _subscriptions[finalTopicFilter.Topic] = finalTopicFilter.QualityOfServiceLevel; } - await _eventDispatcher.HandleClientSubscribedTopicAsync(_clientId, finalTopicFilter).ConfigureAwait(false); + await _eventDispatcher.HandleClientSubscribedTopicAsync(_clientSession.ClientId, finalTopicFilter).ConfigureAwait(false); } } return result; } - public async Task SubscribeAsync(IEnumerable topicFilters) + public async Task SubscribeAsync(IEnumerable topicFilters, MqttConnectPacket connectPacket) { + if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); + foreach (var topicFilter in topicFilters) { - var interceptorContext = await InterceptSubscribeAsync(topicFilter).ConfigureAwait(false); + var interceptorContext = await InterceptSubscribeAsync(topicFilter, connectPacket).ConfigureAwait(false); if (!interceptorContext.AcceptSubscription) { continue; @@ -88,7 +93,7 @@ namespace MQTTnet.Server _subscriptions[topicFilter.Topic] = topicFilter.QualityOfServiceLevel; } - await _eventDispatcher.HandleClientSubscribedTopicAsync(_clientId, topicFilter).ConfigureAwait(false); + await _eventDispatcher.HandleClientSubscribedTopicAsync(_clientSession.ClientId, topicFilter).ConfigureAwait(false); } } } @@ -119,7 +124,7 @@ namespace MQTTnet.Server foreach (var topicFilter in unsubscribePacket.TopicFilters) { - await _eventDispatcher.HandleClientUnsubscribedTopicAsync(_clientId, topicFilter).ConfigureAwait(false); + await _eventDispatcher.HandleClientUnsubscribedTopicAsync(_clientSession.ClientId, topicFilter).ConfigureAwait(false); } return unsubAckPacket; @@ -190,12 +195,12 @@ namespace MQTTnet.Server } } - private async Task InterceptSubscribeAsync(TopicFilter topicFilter) + private async Task InterceptSubscribeAsync(TopicFilter topicFilter, MqttConnectPacket connectPacket) { - var context = new MqttSubscriptionInterceptorContext(_clientId, topicFilter); - if (_options.SubscriptionInterceptor != null) + var context = new MqttSubscriptionInterceptorContext(_clientSession.ClientId, topicFilter, connectPacket, _clientSession.Items); + if (_serverOptions.SubscriptionInterceptor != null) { - await _options.SubscriptionInterceptor.InterceptSubscriptionAsync(context).ConfigureAwait(false); + await _serverOptions.SubscriptionInterceptor.InterceptSubscriptionAsync(context).ConfigureAwait(false); } return context; diff --git a/Source/MQTTnet/Server/MqttConnectionValidatorContext.cs b/Source/MQTTnet/Server/MqttConnectionValidatorContext.cs index 45dba13..2ab3383 100644 --- a/Source/MQTTnet/Server/MqttConnectionValidatorContext.cs +++ b/Source/MQTTnet/Server/MqttConnectionValidatorContext.cs @@ -1,7 +1,7 @@ using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Security.Cryptography.X509Certificates; -using System.Text; using MQTTnet.Adapter; using MQTTnet.Formatter; using MQTTnet.Packets; @@ -9,51 +9,19 @@ using MQTTnet.Protocol; namespace MQTTnet.Server { - public class MqttConnectionValidatorContext + public class MqttConnectionValidatorContext : MqttBaseInterceptorContext { private readonly MqttConnectPacket _connectPacket; private readonly IMqttChannelAdapter _clientAdapter; - public MqttConnectionValidatorContext(MqttConnectPacket connectPacket, IMqttChannelAdapter clientAdapter) + public MqttConnectionValidatorContext(MqttConnectPacket connectPacket, IMqttChannelAdapter clientAdapter) : base(connectPacket, new ConcurrentDictionary()) { - _connectPacket = connectPacket ?? throw new ArgumentNullException(nameof(connectPacket)); + _connectPacket = connectPacket; _clientAdapter = clientAdapter ?? throw new ArgumentNullException(nameof(clientAdapter)); } public string ClientId => _connectPacket.ClientId; - public string Username => _connectPacket.Username; - - public byte[] RawPassword => _connectPacket.Password; - - public string Password => Encoding.UTF8.GetString(RawPassword ?? new byte[0]); - - public MqttApplicationMessage WillMessage => _connectPacket.WillMessage; - - public bool CleanSession => _connectPacket.CleanSession; - - public ushort KeepAlivePeriod => _connectPacket.KeepAlivePeriod; - - public List UserProperties => _connectPacket.Properties?.UserProperties; - - public byte[] AuthenticationData => _connectPacket.Properties?.AuthenticationData; - - public string AuthenticationMethod => _connectPacket.Properties?.AuthenticationMethod; - - public uint? MaximumPacketSize => _connectPacket.Properties?.MaximumPacketSize; - - public ushort? ReceiveMaximum => _connectPacket.Properties?.ReceiveMaximum; - - public ushort? TopicAliasMaximum => _connectPacket.Properties?.TopicAliasMaximum; - - public bool? RequestProblemInformation => _connectPacket.Properties?.RequestProblemInformation; - - public bool? RequestResponseInformation => _connectPacket.Properties?.RequestResponseInformation; - - public uint? SessionExpiryInterval => _connectPacket.Properties?.SessionExpiryInterval; - - public uint? WillDelayInterval => _connectPacket.Properties?.WillDelayInterval; - public string Endpoint => _clientAdapter.Endpoint; public bool IsSecureConnection => _clientAdapter.IsSecureConnection; @@ -61,7 +29,7 @@ namespace MQTTnet.Server public X509Certificate2 ClientCertificate => _clientAdapter.ClientCertificate; public MqttProtocolVersion ProtocolVersion => _clientAdapter.PacketFormatterAdapter.ProtocolVersion; - + /// /// This is used for MQTTv3 only. /// diff --git a/Source/MQTTnet/Server/MqttSubscriptionInterceptorContext.cs b/Source/MQTTnet/Server/MqttSubscriptionInterceptorContext.cs index ca98c95..74799b2 100644 --- a/Source/MQTTnet/Server/MqttSubscriptionInterceptorContext.cs +++ b/Source/MQTTnet/Server/MqttSubscriptionInterceptorContext.cs @@ -1,10 +1,12 @@ using System; +using System.Collections.Generic; +using MQTTnet.Packets; namespace MQTTnet.Server { - public class MqttSubscriptionInterceptorContext + public class MqttSubscriptionInterceptorContext : MqttBaseInterceptorContext { - public MqttSubscriptionInterceptorContext(string clientId, TopicFilter topicFilter) + public MqttSubscriptionInterceptorContext(string clientId, TopicFilter topicFilter, MqttConnectPacket connectPacket, IDictionary sessionItems) : base(connectPacket, sessionItems) { ClientId = clientId; TopicFilter = topicFilter ?? throw new ArgumentNullException(nameof(topicFilter)); @@ -13,7 +15,7 @@ namespace MQTTnet.Server public string ClientId { get; } public TopicFilter TopicFilter { get; set; } - + public bool AcceptSubscription { get; set; } = true; public bool CloseConnection { get; set; } diff --git a/Tests/MQTTnet.Core.Tests/MqttSubscriptionsManager_Tests.cs b/Tests/MQTTnet.Core.Tests/MqttSubscriptionsManager_Tests.cs index 1c4ec84..6f0d542 100644 --- a/Tests/MQTTnet.Core.Tests/MqttSubscriptionsManager_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttSubscriptionsManager_Tests.cs @@ -1,4 +1,6 @@ -using Microsoft.VisualStudio.TestTools.UnitTesting; +using System.Collections.Concurrent; +using System.Threading.Tasks; +using Microsoft.VisualStudio.TestTools.UnitTesting; using MQTTnet.Packets; using MQTTnet.Protocol; using MQTTnet.Server; @@ -10,14 +12,17 @@ namespace MQTTnet.Tests public class MqttSubscriptionsManager_Tests { [TestMethod] - public void MqttSubscriptionsManager_SubscribeSingleSuccess() + public async Task MqttSubscriptionsManager_SubscribeSingleSuccess() { - var sm = new MqttClientSubscriptionsManager("", new MqttServerEventDispatcher(new TestLogger()), new MqttServerOptions()); + var s = new MqttClientSession("", new ConcurrentDictionary(), + new MqttServerEventDispatcher(new TestLogger()), new MqttServerOptions(), new TestLogger()); + + var sm = new MqttClientSubscriptionsManager(s, new MqttServerEventDispatcher(new TestLogger()), new MqttServerOptions()); var sp = new MqttSubscribePacket(); sp.TopicFilters.Add(new TopicFilterBuilder().WithTopic("A/B/C").Build()); - sm.SubscribeAsync(sp).GetAwaiter().GetResult(); + await sm.SubscribeAsync(sp, new MqttConnectPacket()); var result = sm.CheckSubscriptions("A/B/C", MqttQualityOfServiceLevel.AtMostOnce); Assert.IsTrue(result.IsSubscribed); @@ -25,14 +30,17 @@ namespace MQTTnet.Tests } [TestMethod] - public void MqttSubscriptionsManager_SubscribeDifferentQoSSuccess() + public async Task MqttSubscriptionsManager_SubscribeDifferentQoSSuccess() { - var sm = new MqttClientSubscriptionsManager("", new MqttServerEventDispatcher(new TestLogger()), new MqttServerOptions()); + var s = new MqttClientSession("", new ConcurrentDictionary(), + new MqttServerEventDispatcher(new TestLogger()), new MqttServerOptions(), new TestLogger()); + + var sm = new MqttClientSubscriptionsManager(s, new MqttServerEventDispatcher(new TestLogger()), new MqttServerOptions()); var sp = new MqttSubscribePacket(); sp.TopicFilters.Add(new TopicFilter { Topic = "A/B/C", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce }); - sm.SubscribeAsync(sp).GetAwaiter().GetResult(); + await sm.SubscribeAsync(sp, new MqttConnectPacket()); var result = sm.CheckSubscriptions("A/B/C", MqttQualityOfServiceLevel.ExactlyOnce); Assert.IsTrue(result.IsSubscribed); @@ -40,15 +48,18 @@ namespace MQTTnet.Tests } [TestMethod] - public void MqttSubscriptionsManager_SubscribeTwoTimesSuccess() + public async Task MqttSubscriptionsManager_SubscribeTwoTimesSuccess() { - var sm = new MqttClientSubscriptionsManager("", new MqttServerEventDispatcher(new TestLogger()), new MqttServerOptions()); + var s = new MqttClientSession("", new ConcurrentDictionary(), + new MqttServerEventDispatcher(new TestLogger()), new MqttServerOptions(), new TestLogger()); + + var sm = new MqttClientSubscriptionsManager(s, new MqttServerEventDispatcher(new TestLogger()), new MqttServerOptions()); var sp = new MqttSubscribePacket(); sp.TopicFilters.Add(new TopicFilter { Topic = "#", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce }); sp.TopicFilters.Add(new TopicFilter { Topic = "A/B/C", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce }); - sm.SubscribeAsync(sp).GetAwaiter().GetResult(); + await sm.SubscribeAsync(sp, new MqttConnectPacket()); var result = sm.CheckSubscriptions("A/B/C", MqttQualityOfServiceLevel.ExactlyOnce); Assert.IsTrue(result.IsSubscribed); @@ -56,33 +67,39 @@ namespace MQTTnet.Tests } [TestMethod] - public void MqttSubscriptionsManager_SubscribeSingleNoSuccess() + public async Task MqttSubscriptionsManager_SubscribeSingleNoSuccess() { - var sm = new MqttClientSubscriptionsManager("", new MqttServerEventDispatcher(new TestLogger()), new MqttServerOptions()); + var s = new MqttClientSession("", new ConcurrentDictionary(), + new MqttServerEventDispatcher(new TestLogger()), new MqttServerOptions(), new TestLogger()); + + var sm = new MqttClientSubscriptionsManager(s, new MqttServerEventDispatcher(new TestLogger()), new MqttServerOptions()); var sp = new MqttSubscribePacket(); sp.TopicFilters.Add(new TopicFilterBuilder().WithTopic("A/B/C").Build()); - sm.SubscribeAsync(sp).GetAwaiter().GetResult(); + await sm.SubscribeAsync(sp, new MqttConnectPacket()); Assert.IsFalse(sm.CheckSubscriptions("A/B/X", MqttQualityOfServiceLevel.AtMostOnce).IsSubscribed); } [TestMethod] - public void MqttSubscriptionsManager_SubscribeAndUnsubscribeSingle() + public async Task MqttSubscriptionsManager_SubscribeAndUnsubscribeSingle() { - var sm = new MqttClientSubscriptionsManager("", new MqttServerEventDispatcher(new TestLogger()), new MqttServerOptions()); + var s = new MqttClientSession("", new ConcurrentDictionary(), + new MqttServerEventDispatcher(new TestLogger()), new MqttServerOptions(), new TestLogger()); + + var sm = new MqttClientSubscriptionsManager(s, new MqttServerEventDispatcher(new TestLogger()), new MqttServerOptions()); var sp = new MqttSubscribePacket(); sp.TopicFilters.Add(new TopicFilterBuilder().WithTopic("A/B/C").Build()); - sm.SubscribeAsync(sp).GetAwaiter().GetResult(); + await sm.SubscribeAsync(sp, new MqttConnectPacket()); Assert.IsTrue(sm.CheckSubscriptions("A/B/C", MqttQualityOfServiceLevel.AtMostOnce).IsSubscribed); var up = new MqttUnsubscribePacket(); up.TopicFilters.Add("A/B/C"); - sm.UnsubscribeAsync(up); + await sm.UnsubscribeAsync(up); Assert.IsFalse(sm.CheckSubscriptions("A/B/C", MqttQualityOfServiceLevel.AtMostOnce).IsSubscribed); } From 6c8db47e25de4f5728534669679017bcb019d069 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Fri, 5 Jul 2019 18:28:18 +0200 Subject: [PATCH 04/30] Remove dedicated values from contexts and only provide session items. --- .../Mqtt/MqttApplicationMessageInterceptor.cs | 9 +-- .../Mqtt/MqttServerConnectionValidator.cs | 7 +++ .../Mqtt/MqttSubscriptionInterceptor.cs | 10 +-- ...qttApplicationMessageInterceptorContext.cs | 11 +++- .../Server/MqttBaseInterceptorContext.cs | 54 ---------------- Source/MQTTnet/Server/MqttClientSession.cs | 2 +- .../Server/MqttClientSessionsManager.cs | 7 +-- .../Server/MqttClientSubscriptionsManager.cs | 10 +-- .../Server/MqttConnectionValidatorContext.cs | 46 ++++++++++++-- .../MqttSubscriptionInterceptorContext.cs | 16 +++-- Tests/MQTTnet.Core.Tests/Session_Tests.cs | 61 +++++++++++++++++++ 11 files changed, 145 insertions(+), 88 deletions(-) delete mode 100644 Source/MQTTnet/Server/MqttBaseInterceptorContext.cs create mode 100644 Tests/MQTTnet.Core.Tests/Session_Tests.cs diff --git a/Source/MQTTnet.Server/Mqtt/MqttApplicationMessageInterceptor.cs b/Source/MQTTnet.Server/Mqtt/MqttApplicationMessageInterceptor.cs index 8d378af..00eb0e7 100644 --- a/Source/MQTTnet.Server/Mqtt/MqttApplicationMessageInterceptor.cs +++ b/Source/MQTTnet.Server/Mqtt/MqttApplicationMessageInterceptor.cs @@ -22,16 +22,13 @@ namespace MQTTnet.Server.Mqtt { try { + var sessionItems = (PythonDictionary)context.SessionItems[MqttServerConnectionValidator.WrappedSessionItemsKey]; + var pythonContext = new PythonDictionary { { "client_id", context.ClientId }, + { "session_items", sessionItems }, { "retain", context.ApplicationMessage.Retain }, - { "username", context.Username }, - { "password", context.Password }, - { "raw_password", new Bytes(context.RawPassword ?? new byte[0]) }, - { "clean_session", context.CleanSession}, - { "authentication_method", context.AuthenticationMethod}, - { "authentication_data", new Bytes(context.AuthenticationData ?? new byte[0]) }, { "accept_publish", context.AcceptPublish }, { "close_connection", context.CloseConnection }, { "topic", context.ApplicationMessage.Topic }, diff --git a/Source/MQTTnet.Server/Mqtt/MqttServerConnectionValidator.cs b/Source/MQTTnet.Server/Mqtt/MqttServerConnectionValidator.cs index 3b1a2fc..d002842 100644 --- a/Source/MQTTnet.Server/Mqtt/MqttServerConnectionValidator.cs +++ b/Source/MQTTnet.Server/Mqtt/MqttServerConnectionValidator.cs @@ -9,6 +9,8 @@ namespace MQTTnet.Server.Mqtt { public class MqttServerConnectionValidator : IMqttServerConnectionValidator { + public const string WrappedSessionItemsKey = "WRAPPED_ITEMS"; + private readonly PythonScriptHostService _pythonScriptHostService; private readonly ILogger _logger; @@ -22,6 +24,8 @@ namespace MQTTnet.Server.Mqtt { try { + var sessionItems = new PythonDictionary(); + var pythonContext = new PythonDictionary { { "endpoint", context.Endpoint }, @@ -33,6 +37,7 @@ namespace MQTTnet.Server.Mqtt { "clean_session", context.CleanSession}, { "authentication_method", context.AuthenticationMethod}, { "authentication_data", new Bytes(context.AuthenticationData ?? new byte[0]) }, + { "session_items", sessionItems }, { "result", PythonConvert.Pythonfy(context.ReasonCode) } }; @@ -40,6 +45,8 @@ namespace MQTTnet.Server.Mqtt _pythonScriptHostService.InvokeOptionalFunction("on_validate_client_connection", pythonContext); context.ReasonCode = PythonConvert.ParseEnum((string)pythonContext["result"]); + + context.SessionItems[WrappedSessionItemsKey] = sessionItems; } catch (Exception exception) { diff --git a/Source/MQTTnet.Server/Mqtt/MqttSubscriptionInterceptor.cs b/Source/MQTTnet.Server/Mqtt/MqttSubscriptionInterceptor.cs index 2d37f74..ba99e9f 100644 --- a/Source/MQTTnet.Server/Mqtt/MqttSubscriptionInterceptor.cs +++ b/Source/MQTTnet.Server/Mqtt/MqttSubscriptionInterceptor.cs @@ -21,14 +21,16 @@ namespace MQTTnet.Server.Mqtt { try { + var sessionItems = (PythonDictionary)context.SessionItems[MqttServerConnectionValidator.WrappedSessionItemsKey]; + var pythonContext = new PythonDictionary { - { "accept_subscription", context.AcceptSubscription }, - { "close_connection", context.CloseConnection }, - { "client_id", context.ClientId }, + { "session_items", sessionItems }, { "topic", context.TopicFilter.Topic }, - { "qos", (int)context.TopicFilter.QualityOfServiceLevel } + { "qos", (int)context.TopicFilter.QualityOfServiceLevel }, + { "accept_subscription", context.AcceptSubscription }, + { "close_connection", context.CloseConnection } }; _pythonScriptHostService.InvokeOptionalFunction("on_intercept_subscription", pythonContext); diff --git a/Source/MQTTnet/Server/MqttApplicationMessageInterceptorContext.cs b/Source/MQTTnet/Server/MqttApplicationMessageInterceptorContext.cs index c33580e..11efa57 100644 --- a/Source/MQTTnet/Server/MqttApplicationMessageInterceptorContext.cs +++ b/Source/MQTTnet/Server/MqttApplicationMessageInterceptorContext.cs @@ -1,20 +1,25 @@ using System.Collections.Generic; -using MQTTnet.Packets; namespace MQTTnet.Server { - public class MqttApplicationMessageInterceptorContext : MqttBaseInterceptorContext + public class MqttApplicationMessageInterceptorContext { - public MqttApplicationMessageInterceptorContext(string clientId, IDictionary sessionItems, MqttConnectPacket connectPacket, MqttApplicationMessage applicationMessage) : base(connectPacket, sessionItems) + public MqttApplicationMessageInterceptorContext(string clientId, IDictionary sessionItems, MqttApplicationMessage applicationMessage) { ClientId = clientId; ApplicationMessage = applicationMessage; + SessionItems = sessionItems; } public string ClientId { get; } public MqttApplicationMessage ApplicationMessage { get; set; } + /// + /// Gets or sets a key/value collection that can be used to share data within the scope of this session. + /// + public IDictionary SessionItems { get; } + public bool AcceptPublish { get; set; } = true; public bool CloseConnection { get; set; } diff --git a/Source/MQTTnet/Server/MqttBaseInterceptorContext.cs b/Source/MQTTnet/Server/MqttBaseInterceptorContext.cs deleted file mode 100644 index 6909d5e..0000000 --- a/Source/MQTTnet/Server/MqttBaseInterceptorContext.cs +++ /dev/null @@ -1,54 +0,0 @@ -using System.Collections.Generic; -using System.Text; -using MQTTnet.Packets; - -namespace MQTTnet.Server -{ - public class MqttBaseInterceptorContext - { - private readonly MqttConnectPacket _connectPacket; - - protected MqttBaseInterceptorContext(MqttConnectPacket connectPacket, IDictionary sessionItems) - { - _connectPacket = connectPacket; - SessionItems = sessionItems; - } - - public string Username => _connectPacket?.Username; - - public byte[] RawPassword => _connectPacket?.Password; - - public string Password => Encoding.UTF8.GetString(RawPassword ?? new byte[0]); - - public MqttApplicationMessage WillMessage => _connectPacket?.WillMessage; - - public bool? CleanSession => _connectPacket?.CleanSession; - - public ushort? KeepAlivePeriod => _connectPacket?.KeepAlivePeriod; - - public List UserProperties => _connectPacket?.Properties?.UserProperties; - - public byte[] AuthenticationData => _connectPacket?.Properties?.AuthenticationData; - - public string AuthenticationMethod => _connectPacket?.Properties?.AuthenticationMethod; - - public uint? MaximumPacketSize => _connectPacket?.Properties?.MaximumPacketSize; - - public ushort? ReceiveMaximum => _connectPacket?.Properties?.ReceiveMaximum; - - public ushort? TopicAliasMaximum => _connectPacket?.Properties?.TopicAliasMaximum; - - public bool? RequestProblemInformation => _connectPacket?.Properties?.RequestProblemInformation; - - public bool? RequestResponseInformation => _connectPacket?.Properties?.RequestResponseInformation; - - public uint? SessionExpiryInterval => _connectPacket?.Properties?.SessionExpiryInterval; - - public uint? WillDelayInterval => _connectPacket?.Properties?.WillDelayInterval; - - /// - /// Gets or sets a key/value collection that can be used to share data within the scope of this session. - /// - public IDictionary SessionItems { get; } - } -} \ No newline at end of file diff --git a/Source/MQTTnet/Server/MqttClientSession.cs b/Source/MQTTnet/Server/MqttClientSession.cs index 73263cb..d165001 100644 --- a/Source/MQTTnet/Server/MqttClientSession.cs +++ b/Source/MQTTnet/Server/MqttClientSession.cs @@ -54,7 +54,7 @@ namespace MQTTnet.Server public async Task SubscribeAsync(ICollection topicFilters, MqttRetainedMessagesManager retainedMessagesManager) { - await SubscriptionsManager.SubscribeAsync(topicFilters, null).ConfigureAwait(false); + await SubscriptionsManager.SubscribeAsync(topicFilters).ConfigureAwait(false); var matchingRetainedMessages = await retainedMessagesManager.GetSubscribedMessagesAsync(topicFilters).ConfigureAwait(false); foreach (var matchingRetainedMessage in matchingRetainedMessages) diff --git a/Source/MQTTnet/Server/MqttClientSessionsManager.cs b/Source/MQTTnet/Server/MqttClientSessionsManager.cs index 1463ed4..db70e95 100644 --- a/Source/MQTTnet/Server/MqttClientSessionsManager.cs +++ b/Source/MQTTnet/Server/MqttClientSessionsManager.cs @@ -290,7 +290,7 @@ namespace MQTTnet.Server private async Task ValidateConnectionAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter channelAdapter) { - var context = new MqttConnectionValidatorContext(connectPacket, channelAdapter); + var context = new MqttConnectionValidatorContext(connectPacket, channelAdapter, new ConcurrentDictionary()); var connectionValidator = _options.ConnectionValidator; @@ -372,23 +372,20 @@ namespace MQTTnet.Server string senderClientId; IDictionary sessionItems; - MqttConnectPacket connectPacket; var messageIsFromServer = senderConnection == null; if (messageIsFromServer) { senderClientId = _options.ClientId; sessionItems = _serverSessionItems; - connectPacket = null; } else { senderClientId = senderConnection.ClientId; sessionItems = senderConnection.Session.Items; - connectPacket = senderConnection.ConnectPacket; } - var interceptorContext = new MqttApplicationMessageInterceptorContext(senderClientId, sessionItems, connectPacket, applicationMessage); + var interceptorContext = new MqttApplicationMessageInterceptorContext(senderClientId, sessionItems, applicationMessage); await interceptor.InterceptApplicationMessagePublishAsync(interceptorContext).ConfigureAwait(false); return interceptorContext; } diff --git a/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs b/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs index 04d7495..59eafe5 100644 --- a/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs +++ b/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs @@ -40,7 +40,7 @@ namespace MQTTnet.Server foreach (var originalTopicFilter in subscribePacket.TopicFilters) { - var interceptorContext = await InterceptSubscribeAsync(originalTopicFilter, connectPacket).ConfigureAwait(false); + var interceptorContext = await InterceptSubscribeAsync(originalTopicFilter).ConfigureAwait(false); var finalTopicFilter = interceptorContext.TopicFilter; @@ -74,13 +74,13 @@ namespace MQTTnet.Server return result; } - public async Task SubscribeAsync(IEnumerable topicFilters, MqttConnectPacket connectPacket) + public async Task SubscribeAsync(IEnumerable topicFilters) { if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); foreach (var topicFilter in topicFilters) { - var interceptorContext = await InterceptSubscribeAsync(topicFilter, connectPacket).ConfigureAwait(false); + var interceptorContext = await InterceptSubscribeAsync(topicFilter).ConfigureAwait(false); if (!interceptorContext.AcceptSubscription) { continue; @@ -195,9 +195,9 @@ namespace MQTTnet.Server } } - private async Task InterceptSubscribeAsync(TopicFilter topicFilter, MqttConnectPacket connectPacket) + private async Task InterceptSubscribeAsync(TopicFilter topicFilter) { - var context = new MqttSubscriptionInterceptorContext(_clientSession.ClientId, topicFilter, connectPacket, _clientSession.Items); + var context = new MqttSubscriptionInterceptorContext(_clientSession.ClientId, topicFilter, _clientSession.Items); if (_serverOptions.SubscriptionInterceptor != null) { await _serverOptions.SubscriptionInterceptor.InterceptSubscriptionAsync(context).ConfigureAwait(false); diff --git a/Source/MQTTnet/Server/MqttConnectionValidatorContext.cs b/Source/MQTTnet/Server/MqttConnectionValidatorContext.cs index 2ab3383..9a5b8b9 100644 --- a/Source/MQTTnet/Server/MqttConnectionValidatorContext.cs +++ b/Source/MQTTnet/Server/MqttConnectionValidatorContext.cs @@ -1,7 +1,7 @@ using System; -using System.Collections.Concurrent; using System.Collections.Generic; using System.Security.Cryptography.X509Certificates; +using System.Text; using MQTTnet.Adapter; using MQTTnet.Formatter; using MQTTnet.Packets; @@ -9,15 +9,16 @@ using MQTTnet.Protocol; namespace MQTTnet.Server { - public class MqttConnectionValidatorContext : MqttBaseInterceptorContext + public class MqttConnectionValidatorContext { private readonly MqttConnectPacket _connectPacket; private readonly IMqttChannelAdapter _clientAdapter; - public MqttConnectionValidatorContext(MqttConnectPacket connectPacket, IMqttChannelAdapter clientAdapter) : base(connectPacket, new ConcurrentDictionary()) + public MqttConnectionValidatorContext(MqttConnectPacket connectPacket, IMqttChannelAdapter clientAdapter, IDictionary sessionItems) { _connectPacket = connectPacket; _clientAdapter = clientAdapter ?? throw new ArgumentNullException(nameof(clientAdapter)); + SessionItems = sessionItems; } public string ClientId => _connectPacket.ClientId; @@ -29,7 +30,44 @@ namespace MQTTnet.Server public X509Certificate2 ClientCertificate => _clientAdapter.ClientCertificate; public MqttProtocolVersion ProtocolVersion => _clientAdapter.PacketFormatterAdapter.ProtocolVersion; - + + public string Username => _connectPacket?.Username; + + public byte[] RawPassword => _connectPacket?.Password; + + public string Password => Encoding.UTF8.GetString(RawPassword ?? new byte[0]); + + public MqttApplicationMessage WillMessage => _connectPacket?.WillMessage; + + public bool? CleanSession => _connectPacket?.CleanSession; + + public ushort? KeepAlivePeriod => _connectPacket?.KeepAlivePeriod; + + public List UserProperties => _connectPacket?.Properties?.UserProperties; + + public byte[] AuthenticationData => _connectPacket?.Properties?.AuthenticationData; + + public string AuthenticationMethod => _connectPacket?.Properties?.AuthenticationMethod; + + public uint? MaximumPacketSize => _connectPacket?.Properties?.MaximumPacketSize; + + public ushort? ReceiveMaximum => _connectPacket?.Properties?.ReceiveMaximum; + + public ushort? TopicAliasMaximum => _connectPacket?.Properties?.TopicAliasMaximum; + + public bool? RequestProblemInformation => _connectPacket?.Properties?.RequestProblemInformation; + + public bool? RequestResponseInformation => _connectPacket?.Properties?.RequestResponseInformation; + + public uint? SessionExpiryInterval => _connectPacket?.Properties?.SessionExpiryInterval; + + public uint? WillDelayInterval => _connectPacket?.Properties?.WillDelayInterval; + + /// + /// Gets or sets a key/value collection that can be used to share data within the scope of this session. + /// + public IDictionary SessionItems { get; } + /// /// This is used for MQTTv3 only. /// diff --git a/Source/MQTTnet/Server/MqttSubscriptionInterceptorContext.cs b/Source/MQTTnet/Server/MqttSubscriptionInterceptorContext.cs index 74799b2..7e3963b 100644 --- a/Source/MQTTnet/Server/MqttSubscriptionInterceptorContext.cs +++ b/Source/MQTTnet/Server/MqttSubscriptionInterceptorContext.cs @@ -1,21 +1,25 @@ -using System; -using System.Collections.Generic; -using MQTTnet.Packets; +using System.Collections.Generic; namespace MQTTnet.Server { - public class MqttSubscriptionInterceptorContext : MqttBaseInterceptorContext + public class MqttSubscriptionInterceptorContext { - public MqttSubscriptionInterceptorContext(string clientId, TopicFilter topicFilter, MqttConnectPacket connectPacket, IDictionary sessionItems) : base(connectPacket, sessionItems) + public MqttSubscriptionInterceptorContext(string clientId, TopicFilter topicFilter, IDictionary sessionItems) { ClientId = clientId; - TopicFilter = topicFilter ?? throw new ArgumentNullException(nameof(topicFilter)); + TopicFilter = topicFilter; + SessionItems = sessionItems; } public string ClientId { get; } public TopicFilter TopicFilter { get; set; } + /// + /// Gets or sets a key/value collection that can be used to share data within the scope of this session. + /// + public IDictionary SessionItems { get; } + public bool AcceptSubscription { get; set; } = true; public bool CloseConnection { get; set; } diff --git a/Tests/MQTTnet.Core.Tests/Session_Tests.cs b/Tests/MQTTnet.Core.Tests/Session_Tests.cs new file mode 100644 index 0000000..d06bd4e --- /dev/null +++ b/Tests/MQTTnet.Core.Tests/Session_Tests.cs @@ -0,0 +1,61 @@ +using System.Text; +using System.Threading.Tasks; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using MQTTnet.Client; +using MQTTnet.Client.Subscribing; +using MQTTnet.Server; +using MQTTnet.Tests.Mockups; + +namespace MQTTnet.Tests +{ + [TestClass] + public class Session_Tests + { + [TestMethod] + public async Task Set_Session_Item() + { + using (var testEnvironment = new TestEnvironment()) + { + var serverOptions = new MqttServerOptionsBuilder() + .WithConnectionValidator(delegate (MqttConnectionValidatorContext context) + { + // Don't validate anything. Just set some session items. + context.SessionItems["can_subscribe_x"] = true; + context.SessionItems["default_payload"] = "Hello World"; + }) + .WithSubscriptionInterceptor(delegate (MqttSubscriptionInterceptorContext context) + { + if (context.TopicFilter.Topic == "x") + { + context.AcceptSubscription = context.SessionItems["can_subscribe_x"] as bool? == true; + } + }) + .WithApplicationMessageInterceptor(delegate (MqttApplicationMessageInterceptorContext context) + { + context.ApplicationMessage.Payload = Encoding.UTF8.GetBytes(context.SessionItems["default_payload"] as string); + }); + + await testEnvironment.StartServerAsync(serverOptions); + + string receivedPayload = null; + + var client = await testEnvironment.ConnectClientAsync(); + client.UseApplicationMessageReceivedHandler(delegate(MqttApplicationMessageReceivedEventArgs args) + { + receivedPayload = args.ApplicationMessage.ConvertPayloadToString(); + }); + + var subscribeResult = await client.SubscribeAsync("x"); + + Assert.AreEqual(MqttClientSubscribeResultCode.GrantedQoS0, subscribeResult.Items[0].ResultCode); + + var client2 = await testEnvironment.ConnectClientAsync(); + await client2.PublishAsync("x"); + + await Task.Delay(1000); + + Assert.AreEqual("Hello World", receivedPayload); + } + } + } +} From 6608172719fc8189686e39675d22511d8d473989 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Sun, 7 Jul 2019 21:26:07 +0200 Subject: [PATCH 05/30] MQTTnet.Server: Extend messages API. --- .../Controllers/MessagesController.cs | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/Source/MQTTnet.Server/Controllers/MessagesController.cs b/Source/MQTTnet.Server/Controllers/MessagesController.cs index 89eb385..6bd00e7 100644 --- a/Source/MQTTnet.Server/Controllers/MessagesController.cs +++ b/Source/MQTTnet.Server/Controllers/MessagesController.cs @@ -1,7 +1,9 @@ using System; +using System.IO; using System.Threading.Tasks; using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Mvc; +using MQTTnet.Protocol; using MQTTnet.Server.Mqtt; namespace MQTTnet.Server.Controllers @@ -27,14 +29,23 @@ namespace MQTTnet.Server.Controllers [Route("api/v1/messages/{*topic}")] [HttpPost] - public Task PostMessage(string topic, string payload) + public async Task PostMessage(string topic, int qosLevel = 0) { + byte[] payload; + + using (var memoryStream = new MemoryStream()) + { + await HttpContext.Request.Body.CopyToAsync(memoryStream); + payload = memoryStream.ToArray(); + } + var message = new MqttApplicationMessageBuilder() .WithTopic(topic) .WithPayload(payload) + .WithQualityOfServiceLevel((MqttQualityOfServiceLevel)qosLevel) .Build(); - return PostMessage(message); + return await PostMessage(message); } } } From 4f66614cdd0a8dc5fd6428482c289e21d154354f Mon Sep 17 00:00:00 2001 From: Jan Eggers Date: Tue, 9 Jul 2019 14:13:20 +0200 Subject: [PATCH 06/30] fixed #691 --- .../MqttConnectionContext.cs | 9 ++--- .../Mockups/DuplexPipeMockup.cs | 12 +++++-- .../Mockups/LimitedMemoryPool.cs | 18 ++++++++++ .../Mockups/MemoryOwner.cs | 33 +++++++++++++++++++ .../MqttConnectionContextTest.cs | 16 +++++++++ 5 files changed, 82 insertions(+), 6 deletions(-) create mode 100644 Tests/MQTTnet.AspNetCore.Tests/Mockups/LimitedMemoryPool.cs create mode 100644 Tests/MQTTnet.AspNetCore.Tests/Mockups/MemoryOwner.cs diff --git a/Source/MQTTnet.AspnetCore/MqttConnectionContext.cs b/Source/MQTTnet.AspnetCore/MqttConnectionContext.cs index f50d85d..2aa9842 100644 --- a/Source/MQTTnet.AspnetCore/MqttConnectionContext.cs +++ b/Source/MQTTnet.AspnetCore/MqttConnectionContext.cs @@ -162,11 +162,12 @@ namespace MQTTnet.AspNetCore var buffer = formatter.Encode(packet); var msg = buffer.AsMemory(); var output = _output; - msg.CopyTo(output.GetMemory(msg.Length)); - BytesSent += msg.Length; + var result = await output.WriteAsync(msg, cancellationToken).ConfigureAwait(false); + if (result.IsCompleted) + { + BytesSent += msg.Length; + } PacketFormatterAdapter.FreeBuffer(); - output.Advance(msg.Length); - await output.FlushAsync().ConfigureAwait(false); } finally { diff --git a/Tests/MQTTnet.AspNetCore.Tests/Mockups/DuplexPipeMockup.cs b/Tests/MQTTnet.AspNetCore.Tests/Mockups/DuplexPipeMockup.cs index 1774f18..306749b 100644 --- a/Tests/MQTTnet.AspNetCore.Tests/Mockups/DuplexPipeMockup.cs +++ b/Tests/MQTTnet.AspNetCore.Tests/Mockups/DuplexPipeMockup.cs @@ -4,13 +4,21 @@ namespace MQTTnet.AspNetCore.Tests.Mockups { public class DuplexPipeMockup : IDuplexPipe { + public DuplexPipeMockup() + { + var pool = new LimitedMemoryPool(); + var pipeOptions = new PipeOptions(pool); + Receive = new Pipe(pipeOptions); + Send = new Pipe(pipeOptions); + } + PipeReader IDuplexPipe.Input => Receive.Reader; PipeWriter IDuplexPipe.Output => Send.Writer; - public Pipe Receive { get; set; } = new Pipe(); + public Pipe Receive { get; set; } - public Pipe Send { get; set; } = new Pipe(); + public Pipe Send { get; set; } } } diff --git a/Tests/MQTTnet.AspNetCore.Tests/Mockups/LimitedMemoryPool.cs b/Tests/MQTTnet.AspNetCore.Tests/Mockups/LimitedMemoryPool.cs new file mode 100644 index 0000000..ac5c23c --- /dev/null +++ b/Tests/MQTTnet.AspNetCore.Tests/Mockups/LimitedMemoryPool.cs @@ -0,0 +1,18 @@ +using System.Buffers; + +namespace MQTTnet.AspNetCore.Tests.Mockups +{ + public class LimitedMemoryPool : MemoryPool + { + protected override void Dispose(bool disposing) + { + } + + public override IMemoryOwner Rent(int minBufferSize = -1) + { + return new MemoryOwner(minBufferSize); + } + + public override int MaxBufferSize { get; } + } +} \ No newline at end of file diff --git a/Tests/MQTTnet.AspNetCore.Tests/Mockups/MemoryOwner.cs b/Tests/MQTTnet.AspNetCore.Tests/Mockups/MemoryOwner.cs new file mode 100644 index 0000000..1b7b02f --- /dev/null +++ b/Tests/MQTTnet.AspNetCore.Tests/Mockups/MemoryOwner.cs @@ -0,0 +1,33 @@ +using System; +using System.Buffers; + +namespace MQTTnet.AspNetCore.Tests.Mockups +{ + public class MemoryOwner : IMemoryOwner + { + private readonly byte[] _raw; + + public MemoryOwner(int size) + { + if (size <= 0) + { + size = 1024; + } + + if (size > 4096) + { + size = 4096; + } + + _raw = ArrayPool.Shared.Rent(size); + Memory = _raw; + } + + public void Dispose() + { + ArrayPool.Shared.Return(_raw); + } + + public Memory Memory { get; } + } +} \ No newline at end of file diff --git a/Tests/MQTTnet.AspNetCore.Tests/MqttConnectionContextTest.cs b/Tests/MQTTnet.AspNetCore.Tests/MqttConnectionContextTest.cs index 8fb74ae..f916779 100644 --- a/Tests/MQTTnet.AspNetCore.Tests/MqttConnectionContextTest.cs +++ b/Tests/MQTTnet.AspNetCore.Tests/MqttConnectionContextTest.cs @@ -47,5 +47,21 @@ namespace MQTTnet.AspNetCore.Tests await Task.WhenAll(tasks).ConfigureAwait(false); } + + + [TestMethod] + public async Task TestLargePacket() + { + var serializer = new MqttPacketFormatterAdapter(MqttProtocolVersion.V311); + var pipe = new DuplexPipeMockup(); + var connection = new DefaultConnectionContext(); + connection.Transport = pipe; + var ctx = new MqttConnectionContext(serializer, connection); + + await ctx.SendPacketAsync(new MqttPublishPacket() { Payload = new byte[20_000] }, TimeSpan.Zero, CancellationToken.None).ConfigureAwait(false); + + var readResult = await pipe.Send.Reader.ReadAsync(); + Assert.IsTrue(readResult.Buffer.Length > 20000); + } } } From f223a26748c25538674419c06f46ab0ac0d8ef2a Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Tue, 9 Jul 2019 22:42:44 +0200 Subject: [PATCH 07/30] Update docs. --- Build/MQTTnet.nuspec | 1 + 1 file changed, 1 insertion(+) diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index a360e21..c93416e 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -13,6 +13,7 @@ * [Server] Added items dictionary to client session in order to share data across interceptors as along as the session exists. * [Server] Exposed CONNECT packet properties in Application Message and Subscription interceptor. +* [Server] Fixed: Sending Large packets with AspnetCore based connection throws System.ArgumentException. * [MQTTnet.Server] Added REST API for publishing basic messages. Copyright Christian Kratky 2016-2019 From 4eb2b77138cf9308c3b1da98a741f663de71da03 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Thu, 11 Jul 2019 21:11:11 +0200 Subject: [PATCH 08/30] Fix wrong NoDelay usage in server implementation. --- Build/MQTTnet.nuspec | 1 + Source/MQTTnet/Implementations/MqttTcpServerListener.cs | 4 +++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index c93416e..2ffae0a 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -14,6 +14,7 @@ * [Server] Added items dictionary to client session in order to share data across interceptors as along as the session exists. * [Server] Exposed CONNECT packet properties in Application Message and Subscription interceptor. * [Server] Fixed: Sending Large packets with AspnetCore based connection throws System.ArgumentException. +* [Server] Fixed wrong usage of socket option _NoDelay_. * [MQTTnet.Server] Added REST API for publishing basic messages. Copyright Christian Kratky 2016-2019 diff --git a/Source/MQTTnet/Implementations/MqttTcpServerListener.cs b/Source/MQTTnet/Implementations/MqttTcpServerListener.cs index 62eea00..84f6482 100644 --- a/Source/MQTTnet/Implementations/MqttTcpServerListener.cs +++ b/Source/MQTTnet/Implementations/MqttTcpServerListener.cs @@ -61,6 +61,8 @@ namespace MQTTnet.Implementations _socket = new Socket(_addressFamily, SocketType.Stream, ProtocolType.Tcp); + // Usage of socket options is described here: https://docs.microsoft.com/en-us/dotnet/api/system.net.sockets.socket.setsocketoption?view=netcore-2.2 + if (_options.ReuseAddress) { _socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true); @@ -68,7 +70,7 @@ namespace MQTTnet.Implementations if (_options.NoDelay) { - _socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.NoDelay, true); + _socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.NoDelay, true); } _socket.Bind(_localEndPoint); From 87b4ce5185334cad6aefd2aaf0a1a75955c898bb Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Thu, 11 Jul 2019 21:28:07 +0200 Subject: [PATCH 09/30] Extend build and upload script to publish symbols as well. --- .gitignore | 2 ++ Build/MQTTnet.nuspec | 1 + Build/build.ps1 | 12 ++++++------ Build/upload.ps1 | 2 +- 4 files changed, 10 insertions(+), 7 deletions(-) diff --git a/.gitignore b/.gitignore index 93a9b60..bc1ad4a 100644 --- a/.gitignore +++ b/.gitignore @@ -292,3 +292,5 @@ __pycache__/ *.map /Tests/MQTTnet.TestApp.NetCore/RetainedMessages.json + +Build/NuGet/ diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index 2ffae0a..f47029a 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -11,6 +11,7 @@ false MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker) and supports v3.1.0, v3.1.1 and v5.0.0 of the MQTT protocol. +* [Core] Nuget packages with symbols are now also published to improve debugging. * [Server] Added items dictionary to client session in order to share data across interceptors as along as the session exists. * [Server] Exposed CONNECT packet properties in Application Message and Subscription interceptor. * [Server] Fixed: Sending Large packets with AspnetCore based connection throws System.ArgumentException. diff --git a/Build/build.ps1 b/Build/build.ps1 index f5f9975..33f2767 100644 --- a/Build/build.ps1 +++ b/Build/build.ps1 @@ -59,12 +59,12 @@ Copy-Item MQTTnet.Extensions.WebSocket4Net.nuspec -Destination MQTTnet.Extension (Get-Content MQTTnet.Extensions.WebSocket4Net.nuspec) -replace '\$nugetVersion', $nugetVersion | Set-Content MQTTnet.Extensions.WebSocket4Net.nuspec New-Item -ItemType Directory -Force -Path .\NuGet -.\nuget.exe pack MQTTnet.nuspec -Verbosity detailed -Symbols -OutputDir "NuGet" -Version $nugetVersion -.\nuget.exe pack MQTTnet.NETStandard.nuspec -Verbosity detailed -Symbols -OutputDir "NuGet" -Version $nugetVersion -.\nuget.exe pack MQTTnet.AspNetCore.nuspec -Verbosity detailed -Symbols -OutputDir "NuGet" -Version $nugetVersion -.\nuget.exe pack MQTTnet.Extensions.Rpc.nuspec -Verbosity detailed -Symbols -OutputDir "NuGet" -Version $nugetVersion -.\nuget.exe pack MQTTnet.Extensions.ManagedClient.nuspec -Verbosity detailed -Symbols -OutputDir "NuGet" -Version $nugetVersion -.\nuget.exe pack MQTTnet.Extensions.WebSocket4Net.nuspec -Verbosity detailed -Symbols -OutputDir "NuGet" -Version $nugetVersion +.\nuget.exe pack MQTTnet.nuspec -Verbosity detailed -Symbols -SymbolPackageFormat snupkg -OutputDir "NuGet" -Version $nugetVersion +.\nuget.exe pack MQTTnet.NETStandard.nuspec -Verbosity detailed -Symbols -SymbolPackageFormat snupkg -OutputDir "NuGet" -Version $nugetVersion +.\nuget.exe pack MQTTnet.AspNetCore.nuspec -Verbosity detailed -Symbols -SymbolPackageFormat snupkg -OutputDir "NuGet" -Version $nugetVersion +.\nuget.exe pack MQTTnet.Extensions.Rpc.nuspec -Verbosity detailed -Symbols -SymbolPackageFormat snupkg -OutputDir "NuGet" -Version $nugetVersion +.\nuget.exe pack MQTTnet.Extensions.ManagedClient.nuspec -Verbosity detailed -Symbols -SymbolPackageFormat snupkg -OutputDir "NuGet" -Version $nugetVersion +.\nuget.exe pack MQTTnet.Extensions.WebSocket4Net.nuspec -Verbosity detailed -Symbols -SymbolPackageFormat snupkg -OutputDir "NuGet" -Version $nugetVersion Move-Item MQTTnet.AspNetCore.nuspec.old -Destination MQTTnet.AspNetCore.nuspec -Force Move-Item MQTTnet.Extensions.Rpc.nuspec.old -Destination MQTTnet.Extensions.Rpc.nuspec -Force diff --git a/Build/upload.ps1 b/Build/upload.ps1 index adb7c6d..794fe87 100644 --- a/Build/upload.ps1 +++ b/Build/upload.ps1 @@ -7,7 +7,7 @@ foreach ($file in $files) { Write-Host "Uploading: " $file - .\nuget.exe push $file.Fullname $apiKey -NoSymbols -Source https://api.nuget.org/v3/index.json + .\nuget.exe push $file.Fullname $apiKey -Source https://api.nuget.org/v3/index.json } Remove-Item "nuget.exe" -Force -Recurse -ErrorAction SilentlyContinue \ No newline at end of file From ba8ce15ab40565d7ac187ce8b6385fb6b3ba0be1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rud=C3=A1=20Cunha?= Date: Thu, 11 Jul 2019 16:28:09 -0300 Subject: [PATCH 10/30] Add remote certificate validation callback on server --- Source/MQTTnet/Implementations/MqttTcpServerListener.cs | 2 +- Source/MQTTnet/Server/MqttServerTlsTcpEndpointOptions.cs | 7 +++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/Source/MQTTnet/Implementations/MqttTcpServerListener.cs b/Source/MQTTnet/Implementations/MqttTcpServerListener.cs index 62eea00..fe0e792 100644 --- a/Source/MQTTnet/Implementations/MqttTcpServerListener.cs +++ b/Source/MQTTnet/Implementations/MqttTcpServerListener.cs @@ -160,7 +160,7 @@ namespace MQTTnet.Implementations if (_tlsCertificate != null) { - var sslStream = new SslStream(stream, false); + var sslStream = new SslStream(stream, false, _tlsOptions.RemoteCertificateValidationCallback); await sslStream.AuthenticateAsServerAsync( _tlsCertificate, diff --git a/Source/MQTTnet/Server/MqttServerTlsTcpEndpointOptions.cs b/Source/MQTTnet/Server/MqttServerTlsTcpEndpointOptions.cs index 212b052..3e787ed 100644 --- a/Source/MQTTnet/Server/MqttServerTlsTcpEndpointOptions.cs +++ b/Source/MQTTnet/Server/MqttServerTlsTcpEndpointOptions.cs @@ -1,4 +1,5 @@ -using System.Security.Authentication; +using System.Net.Security; +using System.Security.Authentication; namespace MQTTnet.Server { @@ -14,7 +15,9 @@ namespace MQTTnet.Server public bool ClientCertificateRequired { get; set; } public bool CheckCertificateRevocation { get; set; } - + + public RemoteCertificateValidationCallback RemoteCertificateValidationCallback { get; set; } + public SslProtocols SslProtocol { get; set; } = SslProtocols.Tls12; } } From e7ba3f76dcdd4f3d59bc2dd3c507ffa434a36903 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Thu, 11 Jul 2019 23:03:56 +0200 Subject: [PATCH 11/30] Add options builder methods for remote certificate validation callback. --- Build/MQTTnet.nuspec | 1 + Source/MQTTnet/Server/MqttServerOptionsBuilder.cs | 11 ++++++++++- .../MQTTnet/Server/MqttServerTlsTcpEndpointOptions.cs | 3 ++- 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index f47029a..1c496d0 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -16,6 +16,7 @@ * [Server] Exposed CONNECT packet properties in Application Message and Subscription interceptor. * [Server] Fixed: Sending Large packets with AspnetCore based connection throws System.ArgumentException. * [Server] Fixed wrong usage of socket option _NoDelay_. +* [Server] Added remote certificate validation callback (thanks to @rudacs). * [MQTTnet.Server] Added REST API for publishing basic messages. Copyright Christian Kratky 2016-2019 diff --git a/Source/MQTTnet/Server/MqttServerOptionsBuilder.cs b/Source/MQTTnet/Server/MqttServerOptionsBuilder.cs index 357cb82..1fcd981 100644 --- a/Source/MQTTnet/Server/MqttServerOptionsBuilder.cs +++ b/Source/MQTTnet/Server/MqttServerOptionsBuilder.cs @@ -1,5 +1,6 @@ using System; using System.Net; +using System.Net.Security; using System.Security.Authentication; namespace MQTTnet.Server @@ -98,7 +99,15 @@ namespace MQTTnet.Server _options.TlsEndpointOptions.IsEnabled = false; return this; } - + +#if !WINDOWS_UWP + public MqttServerOptionsBuilder WithRemoteCertificateValidationCallback(RemoteCertificateValidationCallback value) + { + _options.TlsEndpointOptions.RemoteCertificateValidationCallback = value; + return this; + } +#endif + public MqttServerOptionsBuilder WithStorage(IMqttServerStorage value) { _options.Storage = value; diff --git a/Source/MQTTnet/Server/MqttServerTlsTcpEndpointOptions.cs b/Source/MQTTnet/Server/MqttServerTlsTcpEndpointOptions.cs index 3e787ed..282bef9 100644 --- a/Source/MQTTnet/Server/MqttServerTlsTcpEndpointOptions.cs +++ b/Source/MQTTnet/Server/MqttServerTlsTcpEndpointOptions.cs @@ -16,8 +16,9 @@ namespace MQTTnet.Server public bool CheckCertificateRevocation { get; set; } +#if !WINDOWS_UWP public RemoteCertificateValidationCallback RemoteCertificateValidationCallback { get; set; } - +#endif public SslProtocols SslProtocol { get; set; } = SslProtocols.Tls12; } } From ff22c34bb2342de52c25dc794bdcb6dada987c64 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Fri, 12 Jul 2019 20:36:46 +0200 Subject: [PATCH 12/30] Add first classes for persisted session support. --- Source/MQTTnet/Server/IMqttServerOptions.cs | 4 +++- .../Server/IMqttServerPersistedSession.cs | 22 +++++++++++++++++++ .../IMqttServerPersistedSessionsStorage.cs | 11 ++++++++++ Source/MQTTnet/Server/MqttClientConnection.cs | 2 +- .../Server/MqttClientSubscriptionsManager.cs | 8 +++---- 5 files changed, 41 insertions(+), 6 deletions(-) create mode 100644 Source/MQTTnet/Server/IMqttServerPersistedSession.cs create mode 100644 Source/MQTTnet/Server/IMqttServerPersistedSessionsStorage.cs diff --git a/Source/MQTTnet/Server/IMqttServerOptions.cs b/Source/MQTTnet/Server/IMqttServerOptions.cs index 3a24289..7c5fde4 100644 --- a/Source/MQTTnet/Server/IMqttServerOptions.cs +++ b/Source/MQTTnet/Server/IMqttServerOptions.cs @@ -21,6 +21,8 @@ namespace MQTTnet.Server MqttServerTcpEndpointOptions DefaultEndpointOptions { get; } MqttServerTlsTcpEndpointOptions TlsEndpointOptions { get; } - IMqttServerStorage Storage { get; } + IMqttServerStorage Storage { get; } + + } } \ No newline at end of file diff --git a/Source/MQTTnet/Server/IMqttServerPersistedSession.cs b/Source/MQTTnet/Server/IMqttServerPersistedSession.cs new file mode 100644 index 0000000..18f7165 --- /dev/null +++ b/Source/MQTTnet/Server/IMqttServerPersistedSession.cs @@ -0,0 +1,22 @@ +using System; +using System.Collections.Generic; + +namespace MQTTnet.Server +{ + public interface IMqttServerPersistedSession + { + string ClientId { get; } + + IDictionary Items { get; } + + IList Subscriptions { get; } + + MqttApplicationMessage WillMessage { get; } + + uint? WillDelayInterval { get; } + + DateTime? SessionExpiryTimestamp { get; } + + IList PendingApplicationMessages { get; } + } +} diff --git a/Source/MQTTnet/Server/IMqttServerPersistedSessionsStorage.cs b/Source/MQTTnet/Server/IMqttServerPersistedSessionsStorage.cs new file mode 100644 index 0000000..fa2a4cb --- /dev/null +++ b/Source/MQTTnet/Server/IMqttServerPersistedSessionsStorage.cs @@ -0,0 +1,11 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; + +namespace MQTTnet.Server +{ + public interface IMqttServerPersistedSessionsStorage + { + Task> LoadPersistedSessionsAsync(); + } +} diff --git a/Source/MQTTnet/Server/MqttClientConnection.cs b/Source/MQTTnet/Server/MqttClientConnection.cs index a34f28b..e71d1a8 100644 --- a/Source/MQTTnet/Server/MqttClientConnection.cs +++ b/Source/MQTTnet/Server/MqttClientConnection.cs @@ -229,7 +229,7 @@ namespace MQTTnet.Server } else { - _logger.Error(exception, "Client '{0}': Unhandled exception while receiving client packets.", ClientId); + _logger.Error(exception, "Client '{0}': Error while receiving client packets.", ClientId); } StopInternal(); diff --git a/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs b/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs index 59eafe5..c84a018 100644 --- a/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs +++ b/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs @@ -9,7 +9,7 @@ namespace MQTTnet.Server { public class MqttClientSubscriptionsManager { - private readonly Dictionary _subscriptions = new Dictionary(); + private readonly Dictionary _subscriptions = new Dictionary(); private readonly MqttClientSession _clientSession; private readonly IMqttServerOptions _serverOptions; private readonly MqttServerEventDispatcher _eventDispatcher; @@ -64,7 +64,7 @@ namespace MQTTnet.Server { lock (_subscriptions) { - _subscriptions[finalTopicFilter.Topic] = finalTopicFilter.QualityOfServiceLevel; + _subscriptions[finalTopicFilter.Topic] = finalTopicFilter; } await _eventDispatcher.HandleClientSubscribedTopicAsync(_clientSession.ClientId, finalTopicFilter).ConfigureAwait(false); @@ -90,7 +90,7 @@ namespace MQTTnet.Server { lock (_subscriptions) { - _subscriptions[topicFilter.Topic] = topicFilter.QualityOfServiceLevel; + _subscriptions[topicFilter.Topic] = topicFilter; } await _eventDispatcher.HandleClientSubscribedTopicAsync(_clientSession.ClientId, topicFilter).ConfigureAwait(false); @@ -158,7 +158,7 @@ namespace MQTTnet.Server continue; } - qosLevels.Add(subscription.Value); + qosLevels.Add(subscription.Value.QualityOfServiceLevel); } } From 128484c55a14111ead2f3fe216d62cabc6a0cb6c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rud=C3=A1=20Cunha?= Date: Mon, 15 Jul 2019 10:46:45 -0300 Subject: [PATCH 13/30] X509Certificate convert to X509Certificate2 in ClientCertificate --- Source/MQTTnet/Implementations/MqttTcpServerListener.cs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/Source/MQTTnet/Implementations/MqttTcpServerListener.cs b/Source/MQTTnet/Implementations/MqttTcpServerListener.cs index 62eea00..7ce1abe 100644 --- a/Source/MQTTnet/Implementations/MqttTcpServerListener.cs +++ b/Source/MQTTnet/Implementations/MqttTcpServerListener.cs @@ -171,6 +171,11 @@ namespace MQTTnet.Implementations stream = sslStream; clientCertificate = sslStream.RemoteCertificate as X509Certificate2; + + if (clientCertificate == null && sslStream.RemoteCertificate != null) + { + clientCertificate = new X509Certificate2(sslStream.RemoteCertificate.Export(X509ContentType.Cert)); + } } var clientHandler = ClientHandler; From dbb227406bbaec7c363c234ce57c76561e132af5 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Wed, 17 Jul 2019 21:58:37 +0200 Subject: [PATCH 14/30] Added UnitTests. --- Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs b/Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs index 155b55e..c1245ce 100644 --- a/Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs @@ -158,6 +158,25 @@ namespace MQTTnet.Tests } } + [TestMethod] + public async Task Fire_Disconnected_Event_On_Server_Shutdown() + { + using (var testEnvironment = new TestEnvironment()) + { + var server = await testEnvironment.StartServerAsync(); + var client = await testEnvironment.ConnectClientAsync(); + + var handlerFired = false; + client.UseDisconnectedHandler(e => handlerFired = true); + + await server.StopAsync(); + + await Task.Delay(4000); + + Assert.IsTrue(handlerFired); + } + } + [TestMethod] public async Task Disconnect_Event_Contains_Exception() { From e1590fc0c8314e4117f001769b14dfc9a990c7eb Mon Sep 17 00:00:00 2001 From: Paul Fake Date: Wed, 17 Jul 2019 16:55:56 -0400 Subject: [PATCH 15/30] Fixed storage queue race condition Sometimes, TryPublishQueuedMessageAsync would try to remove a message from the storage queue before PublishAsync added it to the storage queue, resulting in a message being stuck in the storage queue forever. Switched the message queue lock to an async lock and synchronized the storage queue updates with the message queue updates. --- .../ManagedMqttClient.cs | 50 ++++++++++--------- 1 file changed, 26 insertions(+), 24 deletions(-) diff --git a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs index 74d148d..a6ee48b 100644 --- a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs +++ b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs @@ -24,6 +24,8 @@ namespace MQTTnet.Extensions.ManagedClient private readonly IMqttClient _mqttClient; private readonly IMqttNetChildLogger _logger; + + private readonly AsyncLock _messageQueueLock = new AsyncLock(); private CancellationTokenSource _connectionCancellationToken; private CancellationTokenSource _publishingCancellationToken; @@ -147,7 +149,7 @@ namespace MQTTnet.Extensions.ManagedClient try { - lock (_messageQueue) + using (await _messageQueueLock.WaitAsync(CancellationToken.None).ConfigureAwait(false)) { if (_messageQueue.Count >= Options.MaxPendingMessages) { @@ -167,6 +169,16 @@ namespace MQTTnet.Extensions.ManagedClient } _messageQueue.Enqueue(applicationMessage); + + if (_storageManager != null) + { + if (removedMessage != null) + { + await _storageManager.RemoveAsync(removedMessage).ConfigureAwait(false); + } + + await _storageManager.AddAsync(applicationMessage).ConfigureAwait(false); + } } } finally @@ -181,16 +193,6 @@ namespace MQTTnet.Extensions.ManagedClient } } - - if (_storageManager != null) - { - if (removedMessage != null) - { - await _storageManager.RemoveAsync(removedMessage).ConfigureAwait(false); - } - - await _storageManager.AddAsync(applicationMessage).ConfigureAwait(false); - } } public Task SubscribeAsync(IEnumerable topicFilters) @@ -377,7 +379,7 @@ namespace MQTTnet.Extensions.ManagedClient { await _mqttClient.PublishAsync(message.ApplicationMessage).ConfigureAwait(false); - lock (_messageQueue) //lock to avoid conflict with this.PublishAsync + using (await _messageQueueLock.WaitAsync(CancellationToken.None).ConfigureAwait(false)) //lock to avoid conflict with this.PublishAsync { // While publishing this message, this.PublishAsync could have booted this // message off the queue to make room for another (when using a cap @@ -386,11 +388,11 @@ namespace MQTTnet.Extensions.ManagedClient // it from the queue. If not, that means this.PublishAsync has already // removed it, in which case we don't want to do anything. _messageQueue.RemoveFirst(i => i.Id.Equals(message.Id)); - } - - if (_storageManager != null) - { - await _storageManager.RemoveAsync(message).ConfigureAwait(false); + + if (_storageManager != null) + { + await _storageManager.RemoveAsync(message).ConfigureAwait(false); + } } } catch (MqttCommunicationException exception) @@ -408,14 +410,14 @@ namespace MQTTnet.Extensions.ManagedClient //contradict the expected behavior of QoS 1 and 2, that's also true //for the usage of a message queue cap, so it's still consistent //with prior behavior in that way. - lock (_messageQueue) //lock to avoid conflict with this.PublishAsync + using (await _messageQueueLock.WaitAsync(CancellationToken.None).ConfigureAwait(false)) //lock to avoid conflict with this.PublishAsync { _messageQueue.RemoveFirst(i => i.Id.Equals(message.Id)); - } - - if (_storageManager != null) - { - await _storageManager.RemoveAsync(message).ConfigureAwait(false); + + if (_storageManager != null) + { + await _storageManager.RemoveAsync(message).ConfigureAwait(false); + } } } } @@ -533,4 +535,4 @@ namespace MQTTnet.Extensions.ManagedClient _connectionCancellationToken = null; } } -} \ No newline at end of file +} From ed06ea6bc95843a9924dc30db1e038e4e36f5035 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Fri, 19 Jul 2019 19:32:33 +0200 Subject: [PATCH 16/30] Add version information to MQTTnet.Server. --- .../Controllers/ServerController.cs | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) create mode 100644 Source/MQTTnet.Server/Controllers/ServerController.cs diff --git a/Source/MQTTnet.Server/Controllers/ServerController.cs b/Source/MQTTnet.Server/Controllers/ServerController.cs new file mode 100644 index 0000000..cf53bba --- /dev/null +++ b/Source/MQTTnet.Server/Controllers/ServerController.cs @@ -0,0 +1,18 @@ +using Microsoft.AspNetCore.Authorization; +using Microsoft.AspNetCore.Mvc; +using System.Reflection; + +namespace MQTTnet.Server.Controllers +{ + [Authorize] + [ApiController] + public class ServerController : Controller + { + [Route("api/v1/server/version")] + [HttpGet] + public ActionResult GetVersion() + { + return Assembly.GetExecutingAssembly().GetCustomAttribute().InformationalVersion; + } + } +} From 8eb23f0f1d28ff0e3e223bfa29058222925b5024 Mon Sep 17 00:00:00 2001 From: Paul Fake Date: Tue, 23 Jul 2019 15:45:50 -0400 Subject: [PATCH 17/30] Storage queue drain test It's a unit test to ensure the storage queue is drained. --- .../ManagedMqttClient_Tests.cs | 89 ++++++++++++++++++- 1 file changed, 88 insertions(+), 1 deletion(-) diff --git a/Tests/MQTTnet.Core.Tests/ManagedMqttClient_Tests.cs b/Tests/MQTTnet.Core.Tests/ManagedMqttClient_Tests.cs index ca665a1..f0812f3 100644 --- a/Tests/MQTTnet.Core.Tests/ManagedMqttClient_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/ManagedMqttClient_Tests.cs @@ -108,5 +108,92 @@ namespace MQTTnet.Tests Assert.AreEqual(0, (await server.GetClientStatusAsync()).Count); } } + + [TestMethod] + public async Task Storage_Queue_Drains() + { + using (var testEnvironment = new TestEnvironment()) + { + testEnvironment.IgnoreClientLogErrors = true; + testEnvironment.IgnoreServerLogErrors = true; + + var factory = new MqttFactory(); + + var server = await testEnvironment.StartServerAsync(); + + var managedClient = new ManagedMqttClient(testEnvironment.CreateClient(), new MqttNetLogger().CreateChildLogger()); + var clientOptions = new MqttClientOptionsBuilder() + .WithTcpServer("localhost", testEnvironment.ServerPort); + var storage = new ManagedMqttClientTestStorage(); + + TaskCompletionSource connected = new TaskCompletionSource(); + managedClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(e => + { + managedClient.ConnectedHandler = null; + connected.SetResult(true); + }); + + await managedClient.StartAsync(new ManagedMqttClientOptionsBuilder() + .WithClientOptions(clientOptions) + .WithStorage(storage) + .WithAutoReconnectDelay(System.TimeSpan.FromSeconds(5)) + .Build()); + + await connected.Task; + + await testEnvironment.Server.StopAsync(); + + await managedClient.PublishAsync(new MqttApplicationMessage { Topic = "1" }); + + //Message should have been added to the storage queue in PublishAsync, + //and we are awaiting PublishAsync, so the message should already be + //in storage at this point (i.e. no waiting). + Assert.AreEqual(1, storage.GetMessageCount()); + + connected = new TaskCompletionSource(); + managedClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(e => + { + managedClient.ConnectedHandler = null; + connected.SetResult(true); + }); + + await testEnvironment.Server.StartAsync(new MqttServerOptionsBuilder() + .WithDefaultEndpointPort(testEnvironment.ServerPort).Build()); + + await connected.Task; + + //Wait 500ms here so the client has time to publish the queued message + await Task.Delay(500); + + Assert.AreEqual(0, storage.GetMessageCount()); + + await managedClient.StopAsync(); + } + } + } + + public class ManagedMqttClientTestStorage : IManagedMqttClientStorage + { + private IList _messages = null; + + public Task> LoadQueuedMessagesAsync() + { + if (_messages == null) + { + _messages = new List(); + } + return Task.FromResult(_messages); + } + + public Task SaveQueuedMessagesAsync(IList messages) + { + _messages = messages; + return Task.FromResult(0); + } + + public int GetMessageCount() + { + return _messages.Count; + } } -} \ No newline at end of file +} From 1b5e0197e42ba4b8f8299bd12eb4ef40545753e6 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Tue, 23 Jul 2019 21:56:14 +0200 Subject: [PATCH 18/30] Update docs. --- Build/MQTTnet.nuspec | 1 + 1 file changed, 1 insertion(+) diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index 1c496d0..c0c9adc 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -12,6 +12,7 @@ MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker) and supports v3.1.0, v3.1.1 and v5.0.0 of the MQTT protocol. * [Core] Nuget packages with symbols are now also published to improve debugging. +* [ManagedClient] Fix a race condition in the message storage (thanks to @PaulFake). * [Server] Added items dictionary to client session in order to share data across interceptors as along as the session exists. * [Server] Exposed CONNECT packet properties in Application Message and Subscription interceptor. * [Server] Fixed: Sending Large packets with AspnetCore based connection throws System.ArgumentException. From 3d404f9755176dc6c9d2f4536c44c4fea494fac9 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Tue, 23 Jul 2019 22:08:51 +0200 Subject: [PATCH 19/30] Fixed build. --- Tests/MQTTnet.Core.Tests/ManagedMqttClient_Tests.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Tests/MQTTnet.Core.Tests/ManagedMqttClient_Tests.cs b/Tests/MQTTnet.Core.Tests/ManagedMqttClient_Tests.cs index f0812f3..0aeea6d 100644 --- a/Tests/MQTTnet.Core.Tests/ManagedMqttClient_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/ManagedMqttClient_Tests.cs @@ -1,4 +1,5 @@ -using System.Threading; +using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; using Microsoft.VisualStudio.TestTools.UnitTesting; using MQTTnet.Client; From 5c40577bec8e42832ef09cae0741ef458f4c6043 Mon Sep 17 00:00:00 2001 From: Paul Fake Date: Tue, 23 Jul 2019 16:20:36 -0400 Subject: [PATCH 20/30] Fixed AsyncLock::WaitAsync cancellation bug There isn't any code in MQTTnet that actually uses a cancellation token in WaitAsync, so this is more of a preventative thing than a bug fix. The original code just checks if the task was completed, not whether it was cancelled. If the process is cancelled immediately before the call to WaitAsync, it'll return as normal (https://referencesource.microsoft.com/#mscorlib/system/threading/SemaphoreSlim.cs,612) rather than throw a cancellation exception. This change will ensure we only return the releaser if the wait actually ran to completion rather than exited early due to cancellation. I've tested this, and it properly throws a cancellation exception later on. --- Source/MQTTnet/Internal/AsyncLock.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Source/MQTTnet/Internal/AsyncLock.cs b/Source/MQTTnet/Internal/AsyncLock.cs index 9b7eefd..17d7404 100644 --- a/Source/MQTTnet/Internal/AsyncLock.cs +++ b/Source/MQTTnet/Internal/AsyncLock.cs @@ -23,7 +23,7 @@ namespace MQTTnet.Internal public Task WaitAsync(CancellationToken cancellationToken) { var task = _semaphore.WaitAsync(cancellationToken); - if (task.IsCompleted) + if (task.Status == TaskStatus.RanToCompletion) { return _releaser; } From 9cc96f1888dd776b265e6aefc90699e7e27e1657 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Tue, 30 Jul 2019 20:01:45 +0200 Subject: [PATCH 21/30] Refactor log messages. --- Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs index a6ee48b..f53a8dd 100644 --- a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs +++ b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs @@ -364,7 +364,7 @@ namespace MQTTnet.Extensions.ManagedClient } catch (Exception exception) { - _logger.Error(exception, "Unhandled exception while publishing queued application messages."); + _logger.Error(exception, "Error while publishing queued application messages."); } finally { @@ -424,7 +424,7 @@ namespace MQTTnet.Extensions.ManagedClient catch (Exception exception) { transmitException = exception; - _logger.Error(exception, $"Unhandled exception while publishing application message ({message.Id})."); + _logger.Error(exception, $"Error while publishing application message ({message.Id})."); } finally { From 60761512224ef5f50eeb5c8c2a9da220639b2531 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Tue, 30 Jul 2019 20:01:57 +0200 Subject: [PATCH 22/30] Update docs. --- Build/MQTTnet.nuspec | 1 + 1 file changed, 1 insertion(+) diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index c0c9adc..8fc2673 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -12,6 +12,7 @@ MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker) and supports v3.1.0, v3.1.1 and v5.0.0 of the MQTT protocol. * [Core] Nuget packages with symbols are now also published to improve debugging. +* [Core] Improve task handling (thanks to @mwinterb) * [ManagedClient] Fix a race condition in the message storage (thanks to @PaulFake). * [Server] Added items dictionary to client session in order to share data across interceptors as along as the session exists. * [Server] Exposed CONNECT packet properties in Application Message and Subscription interceptor. From 975eb60e90eb346a38993c01f139e098a3c7a655 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Tue, 30 Jul 2019 20:02:46 +0200 Subject: [PATCH 23/30] Fix memory leak when SSL is not working properly. --- Source/MQTTnet/Implementations/MqttTcpChannel.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Source/MQTTnet/Implementations/MqttTcpChannel.cs b/Source/MQTTnet/Implementations/MqttTcpChannel.cs index 63adf55..12fd2bb 100644 --- a/Source/MQTTnet/Implementations/MqttTcpChannel.cs +++ b/Source/MQTTnet/Implementations/MqttTcpChannel.cs @@ -84,9 +84,9 @@ namespace MQTTnet.Implementations if (_options.TlsOptions.UseTls) { var sslStream = new SslStream(networkStream, false, InternalUserCertificateValidationCallback); - await sslStream.AuthenticateAsClientAsync(_options.Server, LoadCertificates(), _options.TlsOptions.SslProtocol, _options.TlsOptions.IgnoreCertificateRevocationErrors).ConfigureAwait(false); - _stream = sslStream; + + await sslStream.AuthenticateAsClientAsync(_options.Server, LoadCertificates(), _options.TlsOptions.SslProtocol, _options.TlsOptions.IgnoreCertificateRevocationErrors).ConfigureAwait(false); } else { From ab8c7df0b95ae6ab6045bc90934dbfa931487e8c Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Tue, 30 Jul 2019 20:35:22 +0200 Subject: [PATCH 24/30] Add unit Tests. --- Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs | 77 ++++++++++++++++++++ 1 file changed, 77 insertions(+) diff --git a/Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs b/Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs index c1245ce..51d1753 100644 --- a/Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs @@ -109,6 +109,83 @@ namespace MQTTnet.Tests } } + [TestMethod] + public async Task Reconnect_While_Server_Offline() + { + using (var testEnvironment = new TestEnvironment()) + { + testEnvironment.IgnoreClientLogErrors = true; + + var server = await testEnvironment.StartServerAsync(); + var client = await testEnvironment.ConnectClientAsync(); + + await Task.Delay(500); + Assert.IsTrue(client.IsConnected); + + await server.StopAsync(); + await Task.Delay(500); + Assert.IsFalse(client.IsConnected); + + for (var i = 0; i < 5; i++) + { + try + { + await client.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("127.0.0.1", testEnvironment.ServerPort).Build()); + Assert.Fail("Must fail!"); + } + catch + { + } + } + + await server.StartAsync(new MqttServerOptionsBuilder().WithDefaultEndpointPort(testEnvironment.ServerPort).Build()); + await Task.Delay(500); + + await client.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("127.0.0.1", testEnvironment.ServerPort).Build()); + Assert.IsTrue(client.IsConnected); + } + } + + [TestMethod] + public async Task Reconnect_From_Disconnected_Event() + { + using (var testEnvironment = new TestEnvironment()) + { + testEnvironment.IgnoreClientLogErrors = true; + + var client = testEnvironment.CreateClient(); + + var tries = 0; + var maxTries = 3; + + client.UseDisconnectedHandler(async e => + { + if (tries >= maxTries) + { + return; + } + + Interlocked.Increment(ref tries); + + await Task.Delay(100); + await client.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("127.0.0.1", testEnvironment.ServerPort).Build()); + }); + + try + { + await client.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("127.0.0.1", testEnvironment.ServerPort).Build()); + Assert.Fail("Must fail!"); + } + catch + { + } + + SpinWait.SpinUntil(() => tries >= maxTries, 10000); + + Assert.AreEqual(maxTries, tries); + } + } + [TestMethod] public async Task PacketIdentifier_In_Publish_Result() { From 8bfbbd2b297788d8e3351854ed9289d34f01e063 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Tue, 30 Jul 2019 20:35:33 +0200 Subject: [PATCH 25/30] Refactoring. --- Source/MQTTnet/Client/MqttClient.cs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Source/MQTTnet/Client/MqttClient.cs b/Source/MQTTnet/Client/MqttClient.cs index e29e9d0..27b56ff 100644 --- a/Source/MQTTnet/Client/MqttClient.cs +++ b/Source/MQTTnet/Client/MqttClient.cs @@ -257,12 +257,12 @@ namespace MQTTnet.Client { var clientWasConnected = IsConnected; - InitiateDisconnect(); - - IsConnected = false; + TryInitiateDisconnect(); try { + IsConnected = false; + if (_adapter != null) { _logger.Verbose("Disconnecting [Timeout={0}]", Options.CommunicationTimeout); @@ -295,7 +295,7 @@ namespace MQTTnet.Client } } - private void InitiateDisconnect() + private void TryInitiateDisconnect() { lock (_disconnectLock) { From a03bdc7616d61eb9217ff391c60b6ecd741b3a7f Mon Sep 17 00:00:00 2001 From: Craig Lutgen Date: Wed, 31 Jul 2019 13:04:03 -0500 Subject: [PATCH 26/30] Expose server certificate password and client certificate options to MqttServerOptionsBuilder --- .../Client/Options/MqttClientOptionsBuilder.cs | 7 +++++++ .../MQTTnet/Implementations/MqttTcpServerAdapter.cs | 2 +- Source/MQTTnet/Server/IMqttServerCredentials.cs | 6 ++++++ Source/MQTTnet/Server/MqttServerOptionsBuilder.cs | 13 +++++++++++-- .../Server/MqttServerTlsTcpEndpointOptions.cs | 2 ++ 5 files changed, 27 insertions(+), 3 deletions(-) create mode 100644 Source/MQTTnet/Server/IMqttServerCredentials.cs diff --git a/Source/MQTTnet/Client/Options/MqttClientOptionsBuilder.cs b/Source/MQTTnet/Client/Options/MqttClientOptionsBuilder.cs index a7aefd1..65a1ec9 100644 --- a/Source/MQTTnet/Client/Options/MqttClientOptionsBuilder.cs +++ b/Source/MQTTnet/Client/Options/MqttClientOptionsBuilder.cs @@ -139,6 +139,13 @@ namespace MQTTnet.Client.Options return this; } + public MqttClientOptionsBuilder WithCredentials(IMqttClientCredentials credentials) + { + _options.Credentials = credentials; + + return this; + } + public MqttClientOptionsBuilder WithExtendedAuthenticationExchangeHandler(IMqttExtendedAuthenticationExchangeHandler handler) { _options.ExtendedAuthenticationExchangeHandler = handler; diff --git a/Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs b/Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs index 0e28ad0..8ef8c51 100644 --- a/Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs +++ b/Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs @@ -48,7 +48,7 @@ namespace MQTTnet.Implementations throw new ArgumentException("TLS certificate is not set."); } - var tlsCertificate = new X509Certificate2(options.TlsEndpointOptions.Certificate); + var tlsCertificate = new X509Certificate2(options.TlsEndpointOptions.Certificate, options.TlsEndpointOptions.Password.Password); if (!tlsCertificate.HasPrivateKey) { throw new InvalidOperationException("The certificate for TLS encryption must contain the private key."); diff --git a/Source/MQTTnet/Server/IMqttServerCredentials.cs b/Source/MQTTnet/Server/IMqttServerCredentials.cs new file mode 100644 index 0000000..5e75be9 --- /dev/null +++ b/Source/MQTTnet/Server/IMqttServerCredentials.cs @@ -0,0 +1,6 @@ +using System; + +public interface IMqttServerCredentials +{ + String Password { get; } +} diff --git a/Source/MQTTnet/Server/MqttServerOptionsBuilder.cs b/Source/MQTTnet/Server/MqttServerOptionsBuilder.cs index 1fcd981..de6b1a0 100644 --- a/Source/MQTTnet/Server/MqttServerOptionsBuilder.cs +++ b/Source/MQTTnet/Server/MqttServerOptionsBuilder.cs @@ -82,9 +82,10 @@ namespace MQTTnet.Server return this; } - public MqttServerOptionsBuilder WithEncryptionCertificate(byte[] value) + public MqttServerOptionsBuilder WithEncryptionCertificate(byte[] value, IMqttServerCredentials password = null) { _options.TlsEndpointOptions.Certificate = value; + _options.TlsEndpointOptions.Password = password; return this; } @@ -94,6 +95,14 @@ namespace MQTTnet.Server return this; } + public MqttServerOptionsBuilder WithClientCertificate(RemoteCertificateValidationCallback validationCallback = null, bool checkCertificateRevocation = false) + { + _options.TlsEndpointOptions.ClientCertificateRequired = true; + _options.TlsEndpointOptions.CheckCertificateRevocation = checkCertificateRevocation; + _options.TlsEndpointOptions.CertificateValidationCallback = validationCallback; + return this; + } + public MqttServerOptionsBuilder WithoutEncryptedEndpoint() { _options.TlsEndpointOptions.IsEnabled = false; @@ -107,7 +116,7 @@ namespace MQTTnet.Server return this; } #endif - + public MqttServerOptionsBuilder WithStorage(IMqttServerStorage value) { _options.Storage = value; diff --git a/Source/MQTTnet/Server/MqttServerTlsTcpEndpointOptions.cs b/Source/MQTTnet/Server/MqttServerTlsTcpEndpointOptions.cs index 282bef9..8d65230 100644 --- a/Source/MQTTnet/Server/MqttServerTlsTcpEndpointOptions.cs +++ b/Source/MQTTnet/Server/MqttServerTlsTcpEndpointOptions.cs @@ -12,6 +12,8 @@ namespace MQTTnet.Server public byte[] Certificate { get; set; } + public IMqttServerCredentials Password { get; set; } + public bool ClientCertificateRequired { get; set; } public bool CheckCertificateRevocation { get; set; } From 9d63500f95a2ee835a9095bafd02fe91f8e48920 Mon Sep 17 00:00:00 2001 From: Craig Lutgen Date: Wed, 31 Jul 2019 13:27:33 -0500 Subject: [PATCH 27/30] Fix build issue with UWP --- Source/MQTTnet/Server/MqttServerOptionsBuilder.cs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/Source/MQTTnet/Server/MqttServerOptionsBuilder.cs b/Source/MQTTnet/Server/MqttServerOptionsBuilder.cs index de6b1a0..8404271 100644 --- a/Source/MQTTnet/Server/MqttServerOptionsBuilder.cs +++ b/Source/MQTTnet/Server/MqttServerOptionsBuilder.cs @@ -95,13 +95,15 @@ namespace MQTTnet.Server return this; } +#if !WINDOWS_UWP public MqttServerOptionsBuilder WithClientCertificate(RemoteCertificateValidationCallback validationCallback = null, bool checkCertificateRevocation = false) { _options.TlsEndpointOptions.ClientCertificateRequired = true; _options.TlsEndpointOptions.CheckCertificateRevocation = checkCertificateRevocation; - _options.TlsEndpointOptions.CertificateValidationCallback = validationCallback; + _options.TlsEndpointOptions.RemoteCertificateValidationCallback = validationCallback; return this; } +#endif public MqttServerOptionsBuilder WithoutEncryptedEndpoint() { From 59d2a8e551f09e9eb57597798a40e024f3a7147c Mon Sep 17 00:00:00 2001 From: Craig Lutgen Date: Thu, 8 Aug 2019 13:35:14 -0500 Subject: [PATCH 28/30] Changed server crentials property name from password to certificateCredentials --- Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs | 2 +- Source/MQTTnet/Server/MqttServerOptionsBuilder.cs | 4 ++-- Source/MQTTnet/Server/MqttServerTlsTcpEndpointOptions.cs | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs b/Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs index 8ef8c51..e3dcab8 100644 --- a/Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs +++ b/Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs @@ -48,7 +48,7 @@ namespace MQTTnet.Implementations throw new ArgumentException("TLS certificate is not set."); } - var tlsCertificate = new X509Certificate2(options.TlsEndpointOptions.Certificate, options.TlsEndpointOptions.Password.Password); + var tlsCertificate = new X509Certificate2(options.TlsEndpointOptions.Certificate, options.TlsEndpointOptions.CertificateCredentials.Password); if (!tlsCertificate.HasPrivateKey) { throw new InvalidOperationException("The certificate for TLS encryption must contain the private key."); diff --git a/Source/MQTTnet/Server/MqttServerOptionsBuilder.cs b/Source/MQTTnet/Server/MqttServerOptionsBuilder.cs index 8404271..5991e7d 100644 --- a/Source/MQTTnet/Server/MqttServerOptionsBuilder.cs +++ b/Source/MQTTnet/Server/MqttServerOptionsBuilder.cs @@ -82,10 +82,10 @@ namespace MQTTnet.Server return this; } - public MqttServerOptionsBuilder WithEncryptionCertificate(byte[] value, IMqttServerCredentials password = null) + public MqttServerOptionsBuilder WithEncryptionCertificate(byte[] value, IMqttServerCredentials credentials = null) { _options.TlsEndpointOptions.Certificate = value; - _options.TlsEndpointOptions.Password = password; + _options.TlsEndpointOptions.CertificateCredentials = credentials; return this; } diff --git a/Source/MQTTnet/Server/MqttServerTlsTcpEndpointOptions.cs b/Source/MQTTnet/Server/MqttServerTlsTcpEndpointOptions.cs index 8d65230..e92d987 100644 --- a/Source/MQTTnet/Server/MqttServerTlsTcpEndpointOptions.cs +++ b/Source/MQTTnet/Server/MqttServerTlsTcpEndpointOptions.cs @@ -12,7 +12,7 @@ namespace MQTTnet.Server public byte[] Certificate { get; set; } - public IMqttServerCredentials Password { get; set; } + public IMqttServerCredentials CertificateCredentials { get; set; } public bool ClientCertificateRequired { get; set; } From 524f69582954fb4efbd2005119acd4d5705982ab Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Fri, 9 Aug 2019 22:21:42 +0200 Subject: [PATCH 29/30] Update docs. --- Build/MQTTnet.nuspec | 1 + Source/MQTTnet/MQTTnet.csproj | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index 8fc2673..6aafdc6 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -19,6 +19,7 @@ * [Server] Fixed: Sending Large packets with AspnetCore based connection throws System.ArgumentException. * [Server] Fixed wrong usage of socket option _NoDelay_. * [Server] Added remote certificate validation callback (thanks to @rudacs). +* [Server] Add support for certificate passwords (thanks to @cslutgen). * [MQTTnet.Server] Added REST API for publishing basic messages. Copyright Christian Kratky 2016-2019 diff --git a/Source/MQTTnet/MQTTnet.csproj b/Source/MQTTnet/MQTTnet.csproj index 5e0251d..61ca517 100644 --- a/Source/MQTTnet/MQTTnet.csproj +++ b/Source/MQTTnet/MQTTnet.csproj @@ -41,13 +41,13 @@ RELEASE;NETSTANDARD1_3 - + - + From 2300e68fff459314dcf4db3ef80a1b66d5c0086f Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Sun, 11 Aug 2019 10:41:36 +0200 Subject: [PATCH 30/30] Add UnitTests for RPC. --- Tests/MQTTnet.Core.Tests/RPC_Tests.cs | 67 +++++++++++++++++++++------ 1 file changed, 52 insertions(+), 15 deletions(-) diff --git a/Tests/MQTTnet.Core.Tests/RPC_Tests.cs b/Tests/MQTTnet.Core.Tests/RPC_Tests.cs index b0babfa..9f03172 100644 --- a/Tests/MQTTnet.Core.Tests/RPC_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/RPC_Tests.cs @@ -8,6 +8,8 @@ using MQTTnet.Client.Receiving; using MQTTnet.Exceptions; using MQTTnet.Extensions.Rpc; using MQTTnet.Protocol; +using MQTTnet.Client.Options; +using MQTTnet.Formatter; namespace MQTTnet.Tests { @@ -15,26 +17,39 @@ namespace MQTTnet.Tests public class RPC_Tests { [TestMethod] - public async Task Execute_Success() + public Task Execute_Success_With_QoS_0() { - using (var testEnvironment = new TestEnvironment()) - { - await testEnvironment.StartServerAsync(); - var responseSender = await testEnvironment.ConnectClientAsync(); - await responseSender.SubscribeAsync("MQTTnet.RPC/+/ping"); + return Execute_Success(MqttQualityOfServiceLevel.AtMostOnce, MqttProtocolVersion.V311); + } - responseSender.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(async e => - { - await responseSender.PublishAsync(e.ApplicationMessage.Topic + "/response", "pong"); - }); + [TestMethod] + public Task Execute_Success_With_QoS_1() + { + return Execute_Success(MqttQualityOfServiceLevel.AtLeastOnce, MqttProtocolVersion.V311); + } - var requestSender = await testEnvironment.ConnectClientAsync(); + [TestMethod] + public Task Execute_Success_With_QoS_2() + { + return Execute_Success(MqttQualityOfServiceLevel.ExactlyOnce, MqttProtocolVersion.V311); + } - var rpcClient = new MqttRpcClient(requestSender); - var response = await rpcClient.ExecuteAsync(TimeSpan.FromSeconds(5), "ping", "", MqttQualityOfServiceLevel.AtMostOnce); + [TestMethod] + public Task Execute_Success_With_QoS_0_MQTT_V5() + { + return Execute_Success(MqttQualityOfServiceLevel.AtMostOnce, MqttProtocolVersion.V500); + } - Assert.AreEqual("pong", Encoding.UTF8.GetString(response)); - } + [TestMethod] + public Task Execute_Success_With_QoS_1_MQTT_V5() + { + return Execute_Success(MqttQualityOfServiceLevel.AtLeastOnce, MqttProtocolVersion.V500); + } + + [TestMethod] + public Task Execute_Success_With_QoS_2_MQTT_V5() + { + return Execute_Success(MqttQualityOfServiceLevel.ExactlyOnce, MqttProtocolVersion.V500); } [TestMethod] @@ -51,5 +66,27 @@ namespace MQTTnet.Tests await rpcClient.ExecuteAsync(TimeSpan.FromSeconds(2), "ping", "", MqttQualityOfServiceLevel.AtMostOnce); } } + + private async Task Execute_Success(MqttQualityOfServiceLevel qosLevel, MqttProtocolVersion protocolVersion) + { + using (var testEnvironment = new TestEnvironment()) + { + await testEnvironment.StartServerAsync(); + var responseSender = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder().WithProtocolVersion(protocolVersion)); + await responseSender.SubscribeAsync("MQTTnet.RPC/+/ping"); + + responseSender.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(async e => + { + await responseSender.PublishAsync(e.ApplicationMessage.Topic + "/response", "pong"); + }); + + var requestSender = await testEnvironment.ConnectClientAsync(); + + var rpcClient = new MqttRpcClient(requestSender); + var response = await rpcClient.ExecuteAsync(TimeSpan.FromSeconds(5), "ping", "", qosLevel); + + Assert.AreEqual("pong", Encoding.UTF8.GetString(response)); + } + } } }