diff --git a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttServerAdapter.cs b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttServerAdapter.cs index 373cdac..fc38fdf 100644 --- a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttServerAdapter.cs +++ b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttServerAdapter.cs @@ -30,6 +30,7 @@ namespace MQTTnet.Implementations if (options.DefaultEndpointOptions.IsEnabled) { _defaultEndpointSocket = new StreamSocketListener(); + _defaultEndpointSocket.Control.NoDelay = true; await _defaultEndpointSocket.BindServiceNameAsync(options.GetDefaultEndpointPort().ToString(), SocketProtectionLevel.PlainSocket); _defaultEndpointSocket.ConnectionReceived += AcceptDefaultEndpointConnectionsAsync; } @@ -57,6 +58,8 @@ namespace MQTTnet.Implementations { try { + args.Socket.Control.NoDelay = true; + var clientAdapter = _mqttCommunicationAdapterFactory.CreateServerMqttCommunicationAdapter(new MqttTcpChannel(args.Socket)); ClientAccepted?.Invoke(this, new MqttServerAdapterClientAcceptedEventArgs(clientAdapter)); } diff --git a/MQTTnet.Core/Server/MqttClientSession.cs b/MQTTnet.Core/Server/MqttClientSession.cs index e1f8457..903b330 100644 --- a/MQTTnet.Core/Server/MqttClientSession.cs +++ b/MQTTnet.Core/Server/MqttClientSession.cs @@ -142,51 +142,65 @@ namespace MQTTnet.Core.Server } } - private async Task ProcessReceivedPacketAsync(IMqttCommunicationAdapter adapter, MqttBasePacket packet, CancellationToken cancellationToken) + private Task ProcessReceivedPacketAsync(IMqttCommunicationAdapter adapter, MqttBasePacket packet, CancellationToken cancellationToken) { - if (packet is MqttSubscribePacket subscribePacket) + if (packet is MqttPingReqPacket) { - var subscribeResult = _subscriptionsManager.Subscribe(subscribePacket); - await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, subscribeResult.ResponsePacket); - EnqueueRetainedMessages(subscribePacket); - - if (subscribeResult.CloseConnection) - { - await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new MqttDisconnectPacket()); - Stop(); - } + return adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new MqttPingRespPacket()); } - else if (packet is MqttUnsubscribePacket unsubscribePacket) + + if (packet is MqttPublishPacket publishPacket) { - await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, _subscriptionsManager.Unsubscribe(unsubscribePacket)); + return HandleIncomingPublishPacketAsync(adapter, publishPacket, cancellationToken); } - else if (packet is MqttPublishPacket publishPacket) + + if (packet is MqttPubRelPacket pubRelPacket) { - await HandleIncomingPublishPacketAsync(adapter, publishPacket, cancellationToken); + return HandleIncomingPubRelPacketAsync(adapter, pubRelPacket, cancellationToken); } - else if (packet is MqttPubRelPacket pubRelPacket) + + if (packet is MqttPubRecPacket pubRecPacket) { - await HandleIncomingPubRelPacketAsync(adapter, pubRelPacket, cancellationToken); + return adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, pubRecPacket.CreateResponse()); } - else if (packet is MqttPubRecPacket pubRecPacket) + + if (packet is MqttPubAckPacket || packet is MqttPubCompPacket) { - await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, pubRecPacket.CreateResponse()); + // Discard message. + return Task.FromResult(0); } - else if (packet is MqttPubAckPacket || packet is MqttPubCompPacket) + + if (packet is MqttSubscribePacket subscribePacket) { - // Discard message. + return HandleIncomingSubscribePacketAsync(adapter, subscribePacket, cancellationToken); } - else if (packet is MqttPingReqPacket) + + if (packet is MqttUnsubscribePacket unsubscribePacket) { - await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new MqttPingRespPacket()); + return adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, _subscriptionsManager.Unsubscribe(unsubscribePacket)); } - else if (packet is MqttDisconnectPacket || packet is MqttConnectPacket) + + if (packet is MqttDisconnectPacket || packet is MqttConnectPacket) { Stop(); + return Task.FromResult(0); } - else + + _logger.LogWarning("Client '{0}': Received not supported packet ({1}). Closing connection.", ClientId, packet); + Stop(); + + return Task.FromResult(0); + } + + private async Task HandleIncomingSubscribePacketAsync(IMqttCommunicationAdapter adapter, MqttSubscribePacket subscribePacket, CancellationToken cancellationToken) + { + var subscribeResult = _subscriptionsManager.Subscribe(subscribePacket, ClientId); + await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, subscribeResult.ResponsePacket); + EnqueueRetainedMessages(subscribePacket); + + if (subscribeResult.CloseConnection) { - _logger.LogWarning("Client '{0}': Received not supported packet ({1}). Closing connection.", ClientId, packet); + await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new MqttDisconnectPacket()); Stop(); } } diff --git a/MQTTnet.Core/Server/MqttClientSessionsManager.cs b/MQTTnet.Core/Server/MqttClientSessionsManager.cs index 0e92867..62cc1d8 100644 --- a/MQTTnet.Core/Server/MqttClientSessionsManager.cs +++ b/MQTTnet.Core/Server/MqttClientSessionsManager.cs @@ -39,12 +39,12 @@ namespace MQTTnet.Core.Server public MqttClientRetainedMessagesManager RetainedMessagesManager { get; } - public async Task RunClientSessionAsync(IMqttCommunicationAdapter clientAdapter) + public async Task RunClientSessionAsync(IMqttCommunicationAdapter clientAdapter, CancellationToken cancellationToken) { var clientId = string.Empty; try { - if (!(await clientAdapter.ReceivePacketAsync(_options.DefaultCommunicationTimeout, CancellationToken.None).ConfigureAwait(false) is MqttConnectPacket connectPacket)) + if (!(await clientAdapter.ReceivePacketAsync(_options.DefaultCommunicationTimeout, cancellationToken).ConfigureAwait(false) is MqttConnectPacket connectPacket)) { throw new MqttProtocolViolationException("The first packet from a client must be a 'CONNECT' packet [MQTT-3.1.0-1]."); } @@ -57,7 +57,7 @@ namespace MQTTnet.Core.Server var connectReturnCode = ValidateConnection(connectPacket); if (connectReturnCode != MqttConnectReturnCode.ConnectionAccepted) { - await clientAdapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, CancellationToken.None, new MqttConnAckPacket + await clientAdapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new MqttConnAckPacket { ConnectReturnCode = connectReturnCode }).ConfigureAwait(false); @@ -67,7 +67,7 @@ namespace MQTTnet.Core.Server var clientSession = GetOrCreateClientSession(connectPacket); - await clientAdapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, CancellationToken.None, new MqttConnAckPacket + await clientAdapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new MqttConnAckPacket { ConnectReturnCode = connectReturnCode, IsSessionPresent = clientSession.IsExistingSession diff --git a/MQTTnet.Core/Server/MqttClientSubscriptionsManager.cs b/MQTTnet.Core/Server/MqttClientSubscriptionsManager.cs index 7808276..1b0d6c4 100644 --- a/MQTTnet.Core/Server/MqttClientSubscriptionsManager.cs +++ b/MQTTnet.Core/Server/MqttClientSubscriptionsManager.cs @@ -16,7 +16,7 @@ namespace MQTTnet.Core.Server _options = options?.Value ?? throw new ArgumentNullException(nameof(options)); } - public MqttClientSubscribeResult Subscribe(MqttSubscribePacket subscribePacket) + public MqttClientSubscribeResult Subscribe(MqttSubscribePacket subscribePacket, string clientId) { if (subscribePacket == null) throw new ArgumentNullException(nameof(subscribePacket)); @@ -27,7 +27,7 @@ namespace MQTTnet.Core.Server { foreach (var topicFilter in subscribePacket.TopicFilters) { - var interceptorContext = new MqttSubscriptionInterceptorContext("", topicFilter); + var interceptorContext = new MqttSubscriptionInterceptorContext(clientId, topicFilter); _options.SubscriptionsInterceptor?.Invoke(interceptorContext); responsePacket.SubscribeReturnCodes.Add(interceptorContext.AcceptSubscription ? MqttSubscribeReturnCode.SuccessMaximumQoS1 : MqttSubscribeReturnCode.Failure); diff --git a/MQTTnet.Core/Server/MqttServer.cs b/MQTTnet.Core/Server/MqttServer.cs index 8f3800c..f1ed791 100644 --- a/MQTTnet.Core/Server/MqttServer.cs +++ b/MQTTnet.Core/Server/MqttServer.cs @@ -109,7 +109,7 @@ namespace MQTTnet.Core.Server private void OnClientAccepted(object sender, MqttServerAdapterClientAcceptedEventArgs eventArgs) { - eventArgs.SessionTask = Task.Run(async () => await _clientSessionsManager.RunClientSessionAsync(eventArgs.Client), _cancellationTokenSource.Token); + eventArgs.SessionTask = Task.Run(async () => await _clientSessionsManager.RunClientSessionAsync(eventArgs.Client, _cancellationTokenSource.Token), _cancellationTokenSource.Token); } private void OnClientConnected(object sender, MqttClientConnectedEventArgs eventArgs)