From aaed2ed790561a2d079590643ad26b69d809a4a3 Mon Sep 17 00:00:00 2001 From: Christian Date: Thu, 3 May 2018 20:30:40 +0200 Subject: [PATCH] Process client messages in worker thread (extend options) --- .../ApplicationBuilderExtensions.cs | 10 +- .../Client/IMqttClientOptions.cs | 11 +- .../MQTTnet.NetStandard/Client/MqttClient.cs | 282 ++++++++++-------- .../Client/MqttClientOptions.cs | 15 +- .../Client/MqttClientOptionsBuilder.cs | 7 + ...eceivedApplicationMessageProcessingMode.cs | 8 + 6 files changed, 183 insertions(+), 150 deletions(-) create mode 100644 Frameworks/MQTTnet.NetStandard/Client/MqttReceivedApplicationMessageProcessingMode.cs diff --git a/Frameworks/MQTTnet.AspnetCore/ApplicationBuilderExtensions.cs b/Frameworks/MQTTnet.AspnetCore/ApplicationBuilderExtensions.cs index 5a25712..87b2038 100644 --- a/Frameworks/MQTTnet.AspnetCore/ApplicationBuilderExtensions.cs +++ b/Frameworks/MQTTnet.AspnetCore/ApplicationBuilderExtensions.cs @@ -17,12 +17,12 @@ namespace MQTTnet.AspNetCore { string subprotocol = null; - if (context.Request.Headers.TryGetValue("Sec-WebSocket-Protocol", out var requestedSubProtocolValues) - && requestedSubProtocolValues.Count > 0 - && requestedSubProtocolValues.Any(v => v.ToLower() == "mqtt") - ) + if (context.Request.Headers.TryGetValue("Sec-WebSocket-Protocol", out var requestedSubProtocolValues)) { - subprotocol = "mqtt"; + // Order the protocols to also match "mqtt", "mqttv-3.1", "mqttv-3.11" etc. + subprotocol = requestedSubProtocolValues + .OrderByDescending(p => p.Length) + .FirstOrDefault(p => p.ToLower().StartsWith("mqtt")); } var adapter = app.ApplicationServices.GetRequiredService(); diff --git a/Frameworks/MQTTnet.NetStandard/Client/IMqttClientOptions.cs b/Frameworks/MQTTnet.NetStandard/Client/IMqttClientOptions.cs index 647f688..aa873af 100644 --- a/Frameworks/MQTTnet.NetStandard/Client/IMqttClientOptions.cs +++ b/Frameworks/MQTTnet.NetStandard/Client/IMqttClientOptions.cs @@ -6,17 +6,16 @@ namespace MQTTnet.Client public interface IMqttClientOptions { string ClientId { get; } - - IMqttClientCredentials Credentials { get; } bool CleanSession { get; } - MqttApplicationMessage WillMessage { get; } + IMqttClientCredentials Credentials { get; } + MqttProtocolVersion ProtocolVersion { get; } + IMqttClientChannelOptions ChannelOptions { get; } TimeSpan CommunicationTimeout { get; } TimeSpan KeepAlivePeriod { get; } TimeSpan? KeepAliveSendInterval { get; } + MqttReceivedApplicationMessageProcessingMode ReceivedApplicationMessageProcessingMode { get; } - MqttProtocolVersion ProtocolVersion { get; } - - IMqttClientChannelOptions ChannelOptions { get; } + MqttApplicationMessage WillMessage { get; } } } \ No newline at end of file diff --git a/Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs b/Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs index 90790d5..ec86356 100644 --- a/Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs +++ b/Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs @@ -23,7 +23,6 @@ namespace MQTTnet.Client private readonly IMqttNetLogger _logger; private IMqttClientOptions _options; - private bool _isReceivingPackets; private CancellationTokenSource _cancellationTokenSource; private Task _packetReceiverTask; private Task _keepAliveMessageSenderTask; @@ -63,16 +62,16 @@ namespace MQTTnet.Client await _adapter.ConnectAsync(_options.CommunicationTimeout).ConfigureAwait(false); _logger.Verbose("Connection with server established."); - await StartReceivingPacketsAsync().ConfigureAwait(false); + StartReceivingPackets(_cancellationTokenSource.Token); - var connectResponse = await AuthenticateAsync(options.WillMessage).ConfigureAwait(false); + var connectResponse = await AuthenticateAsync(options.WillMessage, _cancellationTokenSource.Token).ConfigureAwait(false); _logger.Verbose("MQTT connection with server established."); _sendTracker.Restart(); if (_options.KeepAlivePeriod != TimeSpan.Zero) { - StartSendingKeepAliveMessages(); + StartSendingKeepAliveMessages(_cancellationTokenSource.Token); } IsConnected = true; @@ -101,7 +100,7 @@ namespace MQTTnet.Client { if (!_cancellationTokenSource.IsCancellationRequested) { - await SendAsync(new MqttDisconnectPacket()).ConfigureAwait(false); + await SendAsync(new MqttDisconnectPacket(), _cancellationTokenSource.Token).ConfigureAwait(false); } } finally @@ -122,7 +121,7 @@ namespace MQTTnet.Client TopicFilters = topicFilters.ToList() }; - var response = await SendAndReceiveAsync(subscribePacket).ConfigureAwait(false); + var response = await SendAndReceiveAsync(subscribePacket, _cancellationTokenSource.Token).ConfigureAwait(false); if (response.SubscribeReturnCodes.Count != subscribePacket.TopicFilters.Count) { @@ -144,7 +143,7 @@ namespace MQTTnet.Client TopicFilters = topicFilters.ToList() }; - await SendAndReceiveAsync(unsubscribePacket).ConfigureAwait(false); + await SendAndReceiveAsync(unsubscribePacket, _cancellationTokenSource.Token).ConfigureAwait(false); } public async Task PublishAsync(IEnumerable applicationMessages) @@ -161,7 +160,7 @@ namespace MQTTnet.Client case MqttQualityOfServiceLevel.AtMostOnce: { // No packet identifier is used for QoS 0 [3.3.2.2 Packet Identifier] - await SendAsync(qosGroup.Cast().ToArray()).ConfigureAwait(false); + await SendAsync(qosGroup, _cancellationTokenSource.Token).ConfigureAwait(false); break; } case MqttQualityOfServiceLevel.AtLeastOnce: @@ -169,7 +168,7 @@ namespace MQTTnet.Client foreach (var publishPacket in qosGroup) { publishPacket.PacketIdentifier = _packetIdentifierProvider.GetNewPacketIdentifier(); - await SendAndReceiveAsync(publishPacket).ConfigureAwait(false); + await SendAndReceiveAsync(publishPacket, _cancellationTokenSource.Token).ConfigureAwait(false); } break; @@ -180,13 +179,13 @@ namespace MQTTnet.Client { publishPacket.PacketIdentifier = _packetIdentifierProvider.GetNewPacketIdentifier(); - var pubRecPacket = await SendAndReceiveAsync(publishPacket).ConfigureAwait(false); + var pubRecPacket = await SendAndReceiveAsync(publishPacket, _cancellationTokenSource.Token).ConfigureAwait(false); var pubRelPacket = new MqttPubRelPacket { PacketIdentifier = pubRecPacket.PacketIdentifier }; - await SendAndReceiveAsync(pubRelPacket).ConfigureAwait(false); + await SendAndReceiveAsync(pubRelPacket, _cancellationTokenSource.Token).ConfigureAwait(false); } break; @@ -207,7 +206,7 @@ namespace MQTTnet.Client _adapter?.Dispose(); } - private async Task AuthenticateAsync(MqttApplicationMessage willApplicationMessage) + private async Task AuthenticateAsync(MqttApplicationMessage willApplicationMessage, CancellationToken cancellationToken) { var connectPacket = new MqttConnectPacket { @@ -219,7 +218,7 @@ namespace MQTTnet.Client WillMessage = willApplicationMessage }; - var response = await SendAndReceiveAsync(connectPacket).ConfigureAwait(false); + var response = await SendAndReceiveAsync(connectPacket, _cancellationTokenSource.Token).ConfigureAwait(false); if (response.ConnectReturnCode != MqttConnectReturnCode.ConnectionAccepted) { throw new MqttConnectingFailedException(response.ConnectReturnCode); @@ -264,21 +263,19 @@ namespace MQTTnet.Client try { - if (_packetReceiverTask != null && _packetReceiverTask != sender) - { - _packetReceiverTask.Wait(); - } + await WaitForTaskAsync(_packetReceiverTask, sender).ConfigureAwait(false); + await WaitForTaskAsync(_keepAliveMessageSenderTask, sender).ConfigureAwait(false); if (_keepAliveMessageSenderTask != null && _keepAliveMessageSenderTask != sender) { - _keepAliveMessageSenderTask.Wait(); + await _keepAliveMessageSenderTask.ConfigureAwait(false); } if (_adapter != null) { await _adapter.DisconnectAsync(_options.CommunicationTimeout).ConfigureAwait(false); } - + _logger.Verbose("Disconnected from adapter."); } catch (Exception adapterException) @@ -297,101 +294,19 @@ namespace MQTTnet.Client } } - private async Task ProcessReceivedPacketAsync(MqttBasePacket packet) - { - try - { - if (packet is MqttPublishPacket publishPacket) - { - await ProcessReceivedPublishPacketAsync(publishPacket).ConfigureAwait(false); - return; - } - - if (packet is MqttPingReqPacket) - { - await SendAsync(new MqttPingRespPacket()).ConfigureAwait(false); - return; - } - - if (packet is MqttDisconnectPacket) - { - await DisconnectAsync().ConfigureAwait(false); - return; - } - - if (packet is MqttPubRelPacket pubRelPacket) - { - await ProcessReceivedPubRelPacket(pubRelPacket).ConfigureAwait(false); - return; - } - - _packetDispatcher.Dispatch(packet); - } - catch (Exception exception) - { - _logger.Error(exception, "Unhandled exception while processing received packet."); - } - } - - private void FireApplicationMessageReceivedEvent(MqttPublishPacket publishPacket) + private Task SendAsync(MqttBasePacket packet, CancellationToken cancellationToken) { - try - { - var applicationMessage = publishPacket.ToApplicationMessage(); - ApplicationMessageReceived?.Invoke(this, new MqttApplicationMessageReceivedEventArgs(_options.ClientId, applicationMessage)); - } - catch (Exception exception) - { - _logger.Error(exception, "Unhandled exception while handling application message."); - } - } - - private Task ProcessReceivedPublishPacketAsync(MqttPublishPacket publishPacket) - { - if (_cancellationTokenSource.IsCancellationRequested) - { - return Task.FromResult(0); - } - - if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce) - { - FireApplicationMessageReceivedEvent(publishPacket); - return Task.FromResult(0); - } - - if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce) - { - FireApplicationMessageReceivedEvent(publishPacket); - return SendAsync(new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier }); - } - - if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce) - { - // QoS 2 is implement as method "B" [4.3.3 QoS 2: Exactly once delivery] - FireApplicationMessageReceivedEvent(publishPacket); - return SendAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }); - } - - throw new MqttCommunicationException("Received a not supported QoS level."); - } - - private Task ProcessReceivedPubRelPacket(MqttPubRelPacket pubRelPacket) - { - var response = new MqttPubCompPacket - { - PacketIdentifier = pubRelPacket.PacketIdentifier - }; - - return SendAsync(response); + _sendTracker.Restart(); + return _adapter.SendPacketsAsync(_options.CommunicationTimeout, cancellationToken, new[] { packet }); } - private Task SendAsync(params MqttBasePacket[] packets) + private Task SendAsync(IEnumerable packets, CancellationToken cancellationToken) { _sendTracker.Restart(); - return _adapter.SendPacketsAsync(_options.CommunicationTimeout, _cancellationTokenSource.Token, packets); + return _adapter.SendPacketsAsync(_options.CommunicationTimeout, cancellationToken, packets); } - private async Task SendAndReceiveAsync(MqttBasePacket requestPacket) where TResponsePacket : MqttBasePacket + private async Task SendAndReceiveAsync(MqttBasePacket requestPacket, CancellationToken cancellationToken) where TResponsePacket : MqttBasePacket { ushort? identifier = null; if (requestPacket is IMqttPacketWithIdentifier requestPacketWithIdentifier) @@ -400,18 +315,18 @@ namespace MQTTnet.Client } var packetAwaiter = _packetDispatcher.WaitForPacketAsync(typeof(TResponsePacket), identifier, _options.CommunicationTimeout); - await SendAsync(requestPacket).ConfigureAwait(false); + await SendAsync(requestPacket, cancellationToken).ConfigureAwait(false); return (TResponsePacket)await packetAwaiter.ConfigureAwait(false); } - private async Task SendKeepAliveMessagesAsync() + private async Task SendKeepAliveMessagesAsync(CancellationToken cancellationToken) { _logger.Verbose("Start sending keep alive packets."); try { - while (!_cancellationTokenSource.Token.IsCancellationRequested) + while (!cancellationToken.IsCancellationRequested) { var keepAliveSendInterval = TimeSpan.FromSeconds(_options.KeepAlivePeriod.TotalSeconds * 0.75); if (_options.KeepAliveSendInterval.HasValue) @@ -421,10 +336,10 @@ namespace MQTTnet.Client if (_sendTracker.Elapsed > keepAliveSendInterval) { - await SendAndReceiveAsync(new MqttPingReqPacket()).ConfigureAwait(false); + await SendAndReceiveAsync(new MqttPingReqPacket(), cancellationToken).ConfigureAwait(false); } - await Task.Delay(keepAliveSendInterval, _cancellationTokenSource.Token).ConfigureAwait(false); + await Task.Delay(keepAliveSendInterval, cancellationToken).ConfigureAwait(false); } } catch (Exception exception) @@ -440,7 +355,7 @@ namespace MQTTnet.Client { _logger.Error(exception, "Unhandled exception while sending/receiving keep alive packets."); } - + await DisconnectInternalAsync(_keepAliveMessageSenderTask, exception).ConfigureAwait(false); } finally @@ -449,24 +364,34 @@ namespace MQTTnet.Client } } - private async Task ReceivePacketsAsync() + private async Task ReceivePacketsAsync(CancellationToken cancellationToken) { _logger.Verbose("Start receiving packets."); try { - while (!_cancellationTokenSource.Token.IsCancellationRequested) + while (!cancellationToken.IsCancellationRequested) { - _isReceivingPackets = true; + var packet = await _adapter.ReceivePacketAsync(TimeSpan.Zero, cancellationToken).ConfigureAwait(false); - var packet = await _adapter.ReceivePacketAsync(TimeSpan.Zero, _cancellationTokenSource.Token).ConfigureAwait(false); - - if (_cancellationTokenSource.Token.IsCancellationRequested) + if (cancellationToken.IsCancellationRequested) { return; } - StartProcessReceivedPacket(packet); + if (packet == null) + { + continue; + } + + if (_options.ReceivedApplicationMessageProcessingMode == MqttReceivedApplicationMessageProcessingMode.SingleThread) + { + await ProcessReceivedPacketAsync(packet, cancellationToken).ConfigureAwait(false); + } + else if (_options.ReceivedApplicationMessageProcessingMode == MqttReceivedApplicationMessageProcessingMode.DedicatedThread) + { + StartProcessReceivedPacketAsync(packet, cancellationToken); + } } } catch (Exception exception) @@ -491,26 +416,125 @@ namespace MQTTnet.Client } } - private void StartProcessReceivedPacket(MqttBasePacket packet) + private async Task ProcessReceivedPacketAsync(MqttBasePacket packet, CancellationToken cancellationToken) { - Task.Run(() => ProcessReceivedPacketAsync(packet), _cancellationTokenSource.Token); + try + { + if (packet is MqttPublishPacket publishPacket) + { + await ProcessReceivedPublishPacketAsync(publishPacket, cancellationToken).ConfigureAwait(false); + return; + } + + if (packet is MqttPingReqPacket) + { + await SendAsync(new MqttPingRespPacket(), cancellationToken).ConfigureAwait(false); + return; + } + + if (packet is MqttDisconnectPacket) + { + await DisconnectAsync().ConfigureAwait(false); + return; + } + + if (packet is MqttPubRelPacket pubRelPacket) + { + await ProcessReceivedPubRelPacket(pubRelPacket, cancellationToken).ConfigureAwait(false); + return; + } + + _packetDispatcher.Dispatch(packet); + } + catch (Exception exception) + { + _logger.Error(exception, "Unhandled exception while processing received packet."); + } } - private async Task StartReceivingPacketsAsync() + private Task ProcessReceivedPublishPacketAsync(MqttPublishPacket publishPacket, CancellationToken cancellationToken) { - _isReceivingPackets = false; + if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce) + { + FireApplicationMessageReceivedEvent(publishPacket); + return Task.FromResult(0); + } - _packetReceiverTask = Task.Run(ReceivePacketsAsync, _cancellationTokenSource.Token); + if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce) + { + FireApplicationMessageReceivedEvent(publishPacket); + return SendAsync(new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier }, cancellationToken); + } - while (!_isReceivingPackets && !_cancellationTokenSource.Token.IsCancellationRequested) + if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce) { - await Task.Delay(TimeSpan.FromMilliseconds(100), _cancellationTokenSource.Token).ConfigureAwait(false); + // QoS 2 is implement as method "B" [4.3.3 QoS 2: Exactly once delivery] + FireApplicationMessageReceivedEvent(publishPacket); + return SendAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }, cancellationToken); } + + throw new MqttCommunicationException("Received a not supported QoS level."); } - private void StartSendingKeepAliveMessages() + private Task ProcessReceivedPubRelPacket(MqttPubRelPacket pubRelPacket, CancellationToken cancellationToken) { - _keepAliveMessageSenderTask = Task.Run(SendKeepAliveMessagesAsync, _cancellationTokenSource.Token); + var response = new MqttPubCompPacket + { + PacketIdentifier = pubRelPacket.PacketIdentifier + }; + + return SendAsync(response, cancellationToken); + } + + private void StartReceivingPackets(CancellationToken cancellationToken) + { + _packetReceiverTask = Task.Run(() => ReceivePacketsAsync(cancellationToken), cancellationToken); + } + + private void StartSendingKeepAliveMessages(CancellationToken cancellationToken) + { + _keepAliveMessageSenderTask = Task.Run(() => SendKeepAliveMessagesAsync(cancellationToken), cancellationToken); + } + + private void StartProcessReceivedPacketAsync(MqttBasePacket packet, CancellationToken cancellationToken) + { +#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + Task.Run(() => ProcessReceivedPacketAsync(packet, cancellationToken), cancellationToken); +#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + } + + private void FireApplicationMessageReceivedEvent(MqttPublishPacket publishPacket) + { + try + { + var applicationMessage = publishPacket.ToApplicationMessage(); + ApplicationMessageReceived?.Invoke(this, new MqttApplicationMessageReceivedEventArgs(_options.ClientId, applicationMessage)); + } + catch (Exception exception) + { + _logger.Error(exception, "Unhandled exception while handling application message."); + } + } + + private static async Task WaitForTaskAsync(Task task, Task sender) + { + if (task == sender) + { + return; + } + + if (task.IsCanceled || task.IsCompleted || task.IsFaulted) + { + return; + } + + try + { + await task.ConfigureAwait(false); + } + catch (TaskCanceledException) + { + } } } } \ No newline at end of file diff --git a/Frameworks/MQTTnet.NetStandard/Client/MqttClientOptions.cs b/Frameworks/MQTTnet.NetStandard/Client/MqttClientOptions.cs index 9640a47..ae0517a 100644 --- a/Frameworks/MQTTnet.NetStandard/Client/MqttClientOptions.cs +++ b/Frameworks/MQTTnet.NetStandard/Client/MqttClientOptions.cs @@ -5,22 +5,17 @@ namespace MQTTnet.Client { public class MqttClientOptions : IMqttClientOptions { - public MqttApplicationMessage WillMessage { get; set; } - public string ClientId { get; set; } = Guid.NewGuid().ToString("N"); - public bool CleanSession { get; set; } = true; - public IMqttClientCredentials Credentials { get; set; } = new MqttClientCredentials(); + public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311; + public IMqttClientChannelOptions ChannelOptions { get; set; } + public TimeSpan CommunicationTimeout { get; set; } = TimeSpan.FromSeconds(10); public TimeSpan KeepAlivePeriod { get; set; } = TimeSpan.FromSeconds(15); - public TimeSpan? KeepAliveSendInterval { get; set; } + public MqttReceivedApplicationMessageProcessingMode ReceivedApplicationMessageProcessingMode { get; set; } = MqttReceivedApplicationMessageProcessingMode.SingleThread; - public TimeSpan CommunicationTimeout { get; set; } = TimeSpan.FromSeconds(10); - - public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311; - - public IMqttClientChannelOptions ChannelOptions { get; set; } + public MqttApplicationMessage WillMessage { get; set; } } } diff --git a/Frameworks/MQTTnet.NetStandard/Client/MqttClientOptionsBuilder.cs b/Frameworks/MQTTnet.NetStandard/Client/MqttClientOptionsBuilder.cs index ad766c3..bc6aa9f 100644 --- a/Frameworks/MQTTnet.NetStandard/Client/MqttClientOptionsBuilder.cs +++ b/Frameworks/MQTTnet.NetStandard/Client/MqttClientOptionsBuilder.cs @@ -108,6 +108,13 @@ namespace MQTTnet.Client return this; } + public MqttClientOptionsBuilder WithReceivedApplicationMessageProcessingMode( + MqttReceivedApplicationMessageProcessingMode mode) + { + _options.ReceivedApplicationMessageProcessingMode = mode; + return this; + } + public IMqttClientOptions Build() { if (_tlsOptions != null) diff --git a/Frameworks/MQTTnet.NetStandard/Client/MqttReceivedApplicationMessageProcessingMode.cs b/Frameworks/MQTTnet.NetStandard/Client/MqttReceivedApplicationMessageProcessingMode.cs new file mode 100644 index 0000000..651abe5 --- /dev/null +++ b/Frameworks/MQTTnet.NetStandard/Client/MqttReceivedApplicationMessageProcessingMode.cs @@ -0,0 +1,8 @@ +namespace MQTTnet.Client +{ + public enum MqttReceivedApplicationMessageProcessingMode + { + SingleThread, + DedicatedThread + } +}