From e14ea9a240b695452512e1cf2ab5791a7640365c Mon Sep 17 00:00:00 2001 From: Christian Date: Fri, 11 May 2018 20:49:31 +0200 Subject: [PATCH] Add overflow strategy for pending messages queue. --- Build/MQTTnet.nuspec | 4 +- .../MQTTnet.NetStandard/Client/MqttClient.cs | 51 +++++++++---------- .../Diagnostics/IMqttNetLogger.cs | 8 ++- .../Diagnostics/MqttNetLogger.cs | 36 +++++++++---- .../Server/IMqttServerOptions.cs | 1 + .../Server/MqttClientKeepAliveMonitor.cs | 6 +-- .../Server/MqttClientPendingMessagesQueue.cs | 49 ++++++++++-------- .../Server/MqttClientSession.cs | 7 +-- .../MqttPendingMessagesOverflowStrategy.cs | 8 +++ .../Server/MqttServerOptions.cs | 4 +- 10 files changed, 103 insertions(+), 71 deletions(-) create mode 100644 Frameworks/MQTTnet.NetStandard/Server/MqttPendingMessagesOverflowStrategy.cs diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index 2be9d25..5f965a3 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -10,9 +10,11 @@ https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png false MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker). - * [Client] Received messages are now processed in the worker thread by default. Added a new setting for switching back to dedicated threads. + * [Core] Performance optimizations. +* [Client] Received messages are now processed in the worker thread by default. Added a new setting for switching back to dedicated threads. * [Server] Added support for other WebSocket sub protocol formats like mqttv-3.1.1 (thanks to @israellot). * [Server] The takeover of an existing client sessions is now treated as a _clean_ disconnect of the previous client. +* [Server] The pending messages queue per client is now limited to 250 messages. Overflow strategy and count can be changed via options (thanks to @VladimirAkopyan) Copyright Christian Kratky 2016-2018 MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin diff --git a/Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs b/Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs index edf9b44..4f28a90 100644 --- a/Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs +++ b/Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs @@ -57,14 +57,14 @@ namespace MQTTnet.Client _adapter = _adapterFactory.CreateClientAdapter(options, _logger); - _logger.Verbose("Trying to connect with server."); + _logger.Verbose(this, "Trying to connect with server."); await _adapter.ConnectAsync(_options.CommunicationTimeout, _cancellationTokenSource.Token).ConfigureAwait(false); - _logger.Verbose("Connection with server established."); + _logger.Verbose(this, "Connection with server established."); StartReceivingPackets(_cancellationTokenSource.Token); var connectResponse = await AuthenticateAsync(options.WillMessage, _cancellationTokenSource.Token).ConfigureAwait(false); - _logger.Verbose("MQTT connection with server established."); + _logger.Verbose(this, "MQTT connection with server established."); _sendTracker.Restart(); @@ -76,12 +76,12 @@ namespace MQTTnet.Client IsConnected = true; Connected?.Invoke(this, new MqttClientConnectedEventArgs(connectResponse.IsSessionPresent)); - _logger.Info("Connected."); + _logger.Info(this, "Connected."); return new MqttClientConnectResult(connectResponse.IsSessionPresent); } catch (Exception exception) { - _logger.Error(exception, "Error while connecting with server."); + _logger.Error(this, exception, "Error while connecting with server."); await DisconnectInternalAsync(null, exception).ConfigureAwait(false); throw; @@ -233,7 +233,7 @@ namespace MQTTnet.Client private async Task DisconnectInternalAsync(Task sender, Exception exception) { - await _disconnectLock.WaitAsync(); + await _disconnectLock.WaitAsync().ConfigureAwait(false); try { if (_cancellationTokenSource == null || _cancellationTokenSource.IsCancellationRequested) @@ -245,7 +245,7 @@ namespace MQTTnet.Client } catch (Exception adapterException) { - _logger.Warning(adapterException, "Error while disconnecting from adapter."); + _logger.Warning(this, adapterException, "Error while disconnecting from adapter."); } finally { @@ -260,21 +260,16 @@ namespace MQTTnet.Client await WaitForTaskAsync(_packetReceiverTask, sender).ConfigureAwait(false); await WaitForTaskAsync(_keepAliveMessageSenderTask, sender).ConfigureAwait(false); - if (_keepAliveMessageSenderTask != null && _keepAliveMessageSenderTask != sender) - { - await _keepAliveMessageSenderTask.ConfigureAwait(false); - } - if (_adapter != null) { await _adapter.DisconnectAsync(_options.CommunicationTimeout, CancellationToken.None).ConfigureAwait(false); } - _logger.Verbose("Disconnected from adapter."); + _logger.Verbose(this, "Disconnected from adapter."); } catch (Exception adapterException) { - _logger.Warning(adapterException, "Error while disconnecting from adapter."); + _logger.Warning(this, adapterException, "Error while disconnecting from adapter."); } finally { @@ -283,7 +278,7 @@ namespace MQTTnet.Client _cancellationTokenSource?.Dispose(); _cancellationTokenSource = null; - _logger.Info("Disconnected."); + _logger.Info(this, "Disconnected."); Disconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(clientWasConnected, exception)); } } @@ -329,7 +324,7 @@ namespace MQTTnet.Client } catch (MqttCommunicationTimedOutException) { - _logger.Warning($"Timeout while waiting for packet of type '{typeof(TResponsePacket).Namespace}'."); + _logger.Warning(this, null, "Timeout while waiting for packet of type '{0}'.", typeof(TResponsePacket).Namespace); throw; } finally @@ -340,7 +335,7 @@ namespace MQTTnet.Client private async Task SendKeepAliveMessagesAsync(CancellationToken cancellationToken) { - _logger.Verbose("Start sending keep alive packets."); + _logger.Verbose(this, "Start sending keep alive packets."); try { @@ -367,24 +362,24 @@ namespace MQTTnet.Client } else if (exception is MqttCommunicationException) { - _logger.Warning(exception, "MQTT communication exception while sending/receiving keep alive packets."); + _logger.Warning(this, exception, "MQTT communication exception while sending/receiving keep alive packets."); } else { - _logger.Error(exception, "Unhandled exception while sending/receiving keep alive packets."); + _logger.Error(this, exception, "Unhandled exception while sending/receiving keep alive packets."); } await DisconnectInternalAsync(_keepAliveMessageSenderTask, exception).ConfigureAwait(false); } finally { - _logger.Verbose("Stopped sending keep alive packets."); + _logger.Verbose(this, "Stopped sending keep alive packets."); } } private async Task ReceivePacketsAsync(CancellationToken cancellationToken) { - _logger.Verbose("Start receiving packets."); + _logger.Verbose(this, "Start receiving packets."); try { @@ -419,11 +414,11 @@ namespace MQTTnet.Client } else if (exception is MqttCommunicationException) { - _logger.Warning(exception, "MQTT communication exception while receiving packets."); + _logger.Warning(this, exception, "MQTT communication exception while receiving packets."); } else { - _logger.Error(exception, "Unhandled exception while receiving packets."); + _logger.Error(this, exception, "Unhandled exception while receiving packets."); } await DisconnectInternalAsync(_packetReceiverTask, exception).ConfigureAwait(false); @@ -431,7 +426,7 @@ namespace MQTTnet.Client } finally { - _logger.Verbose("Stopped receiving packets."); + _logger.Verbose(this, "Stopped receiving packets."); } } @@ -467,7 +462,7 @@ namespace MQTTnet.Client } catch (Exception exception) { - _logger.Error(exception, "Unhandled exception while processing received packet."); + _logger.Error(this, exception, "Unhandled exception while processing received packet."); } } @@ -508,7 +503,7 @@ namespace MQTTnet.Client private void StartReceivingPackets(CancellationToken cancellationToken) { _packetReceiverTask = Task.Factory.StartNew( - () => ReceivePacketsAsync(cancellationToken), + () => ReceivePacketsAsync(cancellationToken), cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Current); @@ -518,7 +513,7 @@ namespace MQTTnet.Client { _keepAliveMessageSenderTask = Task.Factory.StartNew( () => SendKeepAliveMessagesAsync(cancellationToken), - cancellationToken, + cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Current); } @@ -539,7 +534,7 @@ namespace MQTTnet.Client } catch (Exception exception) { - _logger.Error(exception, "Unhandled exception while handling application message."); + _logger.Error(this, exception, "Unhandled exception while handling application message."); } } diff --git a/Frameworks/MQTTnet.NetStandard/Diagnostics/IMqttNetLogger.cs b/Frameworks/MQTTnet.NetStandard/Diagnostics/IMqttNetLogger.cs index 6ed16de..b7d85fe 100644 --- a/Frameworks/MQTTnet.NetStandard/Diagnostics/IMqttNetLogger.cs +++ b/Frameworks/MQTTnet.NetStandard/Diagnostics/IMqttNetLogger.cs @@ -8,14 +8,18 @@ namespace MQTTnet.Diagnostics void Verbose(string message, params object[] parameters); + void Verbose(object source, string message, params object[] parameters); + void Info(string message, params object[] parameters); + void Info(object source, string message, params object[] parameters); + void Warning(Exception exception, string message, params object[] parameters); - void Warning(string message, params object[] parameters); + void Warning(object source, Exception exception, string message, params object[] parameters); void Error(Exception exception, string message, params object[] parameters); - void Error(string message, params object[] parameters); + void Error(object source, Exception exception, string message, params object[] parameters); } } diff --git a/Frameworks/MQTTnet.NetStandard/Diagnostics/MqttNetLogger.cs b/Frameworks/MQTTnet.NetStandard/Diagnostics/MqttNetLogger.cs index 53598b2..9421921 100644 --- a/Frameworks/MQTTnet.NetStandard/Diagnostics/MqttNetLogger.cs +++ b/Frameworks/MQTTnet.NetStandard/Diagnostics/MqttNetLogger.cs @@ -15,35 +15,45 @@ namespace MQTTnet.Diagnostics public void Verbose(string message, params object[] parameters) { - Publish(MqttNetLogLevel.Verbose, null, message, parameters); + Publish(MqttNetLogLevel.Verbose, typeof(TSource), message, parameters, null); + } + + public void Verbose(object source, string message, params object[] parameters) + { + Publish(MqttNetLogLevel.Verbose, source, message, parameters, null); } public void Info(string message, params object[] parameters) { - Publish(MqttNetLogLevel.Info, null, message, parameters); + Publish(MqttNetLogLevel.Info, typeof(TSource), message, parameters, null); + } + + public void Info(object source, string message, params object[] parameters) + { + Publish(MqttNetLogLevel.Info, source, message, parameters, null); } public void Warning(Exception exception, string message, params object[] parameters) { - Publish(MqttNetLogLevel.Warning, exception, message, parameters); + Publish(MqttNetLogLevel.Warning, typeof(TSource), message, parameters, null); } - public void Warning(string message, params object[] parameters) + public void Warning(object source, Exception exception, string message, params object[] parameters) { - Warning(null, message, parameters); + Publish(MqttNetLogLevel.Warning, source, message, parameters, null); } public void Error(Exception exception, string message, params object[] parameters) { - Publish(MqttNetLogLevel.Error, exception, message, parameters); + Publish(MqttNetLogLevel.Error, typeof(TSource), message, parameters, null); } - public void Error(string message, params object[] parameters) + public void Error(object source, Exception exception, string message, params object[] parameters) { - Warning(null, message, parameters); + Publish(MqttNetLogLevel.Error, source, message, parameters, null); } - private void Publish(MqttNetLogLevel logLevel, Exception exception, string message, object[] parameters) + private void Publish(MqttNetLogLevel logLevel, object source, string message, object[] parameters, Exception exception) { var hasLocalListeners = LogMessagePublished != null; var hasGlobalListeners = MqttNetGlobalLogger.HasListeners; @@ -58,7 +68,13 @@ namespace MQTTnet.Diagnostics message = string.Format(message, parameters); } - var traceMessage = new MqttNetLogMessage(_logId, DateTime.Now, Environment.CurrentManagedThreadId, typeof(TSource).Name, logLevel, message, exception); + string sourceName = null; + if (source != null) + { + sourceName = source.GetType().Name; + } + + var traceMessage = new MqttNetLogMessage(_logId, DateTime.Now, Environment.CurrentManagedThreadId, sourceName, logLevel, message, exception); if (hasGlobalListeners) { diff --git a/Frameworks/MQTTnet.NetStandard/Server/IMqttServerOptions.cs b/Frameworks/MQTTnet.NetStandard/Server/IMqttServerOptions.cs index 9ae7715..3637319 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/IMqttServerOptions.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/IMqttServerOptions.cs @@ -6,6 +6,7 @@ namespace MQTTnet.Server { int ConnectionBacklog { get; } int MaxPendingMessagesPerClient { get; } + MqttPendingMessagesOverflowStrategy PendingMessagesOverflowStrategy { get; } TimeSpan DefaultCommunicationTimeout { get; } diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttClientKeepAliveMonitor.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttClientKeepAliveMonitor.cs index 8dc96e4..ac95807 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/MqttClientKeepAliveMonitor.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/MqttClientKeepAliveMonitor.cs @@ -59,7 +59,7 @@ namespace MQTTnet.Server // Values described here: [MQTT-3.1.2-24]. if (_lastPacketReceivedTracker.Elapsed.TotalSeconds > keepAlivePeriod * 1.5D) { - _logger.Warning("Client '{0}': Did not receive any packet or keep alive signal.", _clientId); + _logger.Warning(this, null, "Client '{0}': Did not receive any packet or keep alive signal.", _clientId); _timeoutCallback?.Invoke(); @@ -74,11 +74,11 @@ namespace MQTTnet.Server } catch (Exception exception) { - _logger.Error(exception, "Client '{0}': Unhandled exception while checking keep alive timeouts.", _clientId); + _logger.Error(this, exception, "Client '{0}': Unhandled exception while checking keep alive timeouts.", _clientId); } finally { - _logger.Verbose("Client {0}: Stopped checking keep alive timeout.", _clientId); + _logger.Verbose(this, "Client {0}: Stopped checking keep alive timeout.", _clientId); } } diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs index fd89691..a6e2204 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs @@ -50,25 +50,27 @@ namespace MQTTnet.Server } } - public async Task DropPacket() - { - MqttBasePacket packet = null; - await _queueWaitSemaphore.WaitAsync().ConfigureAwait(false); - if (!_queue.TryDequeue(out packet)) - { - throw new InvalidOperationException(); // should not happen - } - _queueWaitSemaphore.Release(); - } - public void Enqueue(MqttBasePacket packet) { if (packet == null) throw new ArgumentNullException(nameof(packet)); + if (_queue.Count >= _options.MaxPendingMessagesPerClient) + { + if (_options.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropNewMessage) + { + return; + } + + if (_options.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage) + { + _queue.TryDequeue(out _); + } + } + _queue.Enqueue(packet); _queueAutoResetEvent.Set(); - _logger.Verbose("Enqueued packet (ClientId: {0}).", _clientSession.ClientId); + _logger.Verbose(this, "Enqueued packet (ClientId: {0}).", _clientSession.ClientId); } private async Task SendQueuedPacketsAsync(IMqttChannelAdapter adapter, CancellationToken cancellationToken) @@ -77,7 +79,7 @@ namespace MQTTnet.Server { while (!cancellationToken.IsCancellationRequested) { - await SendNextQueuedPacketAsync(adapter, cancellationToken).ConfigureAwait(false); + await TrySendNextQueuedPacketAsync(adapter, cancellationToken).ConfigureAwait(false); } } catch (OperationCanceledException) @@ -85,19 +87,23 @@ namespace MQTTnet.Server } catch (Exception exception) { - _logger.Error(exception, "Unhandled exception while sending enqueued packet (ClientId: {0}).", _clientSession.ClientId); + _logger.Error(this, exception, "Unhandled exception while sending enqueued packet (ClientId: {0}).", _clientSession.ClientId); } } - private async Task SendNextQueuedPacketAsync(IMqttChannelAdapter adapter, CancellationToken cancellationToken) + private async Task TrySendNextQueuedPacketAsync(IMqttChannelAdapter adapter, CancellationToken cancellationToken) { MqttBasePacket packet = null; try { - await _queueAutoResetEvent.WaitOneAsync(cancellationToken).ConfigureAwait(false); + if (_queue.IsEmpty) + { + await _queueAutoResetEvent.WaitOneAsync(cancellationToken).ConfigureAwait(false); + } + if (!_queue.TryDequeue(out packet)) { - throw new InvalidOperationException(); // should not happen + return; } if (cancellationToken.IsCancellationRequested) @@ -107,24 +113,25 @@ namespace MQTTnet.Server await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, new[] { packet }, cancellationToken).ConfigureAwait(false); - _logger.Verbose("Enqueued packet sent (ClientId: {0}).", _clientSession.ClientId); + _logger.Verbose("Enqueued packet sent (ClientId: {0}).", + _clientSession.ClientId); } catch (Exception exception) { if (exception is MqttCommunicationTimedOutException) { - _logger.Warning(exception, "Sending publish packet failed due to timeout (ClientId: {0}).", _clientSession.ClientId); + _logger.Warning(this, exception, "Sending publish packet failed: Timeout (ClientId: {0}).", _clientSession.ClientId); } else if (exception is MqttCommunicationException) { - _logger.Warning(exception, "Sending publish packet failed due to communication exception (ClientId: {0}).", _clientSession.ClientId); + _logger.Warning(this, exception, "Sending publish packet failed: Communication exception (ClientId: {0}).", _clientSession.ClientId); } else if (exception is OperationCanceledException) { } else { - _logger.Error(exception, "Sending publish packet failed (ClientId: {0}).", _clientSession.ClientId); + _logger.Error(this, exception, "Sending publish packet failed (ClientId: {0}).", _clientSession.ClientId); } if (packet is MqttPublishPacket publishPacket) diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs index 7d50bad..f4f8c99 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs @@ -147,10 +147,7 @@ namespace MQTTnet.Server { publishPacket.PacketIdentifier = _packetIdentifierProvider.GetNewPacketIdentifier(); } - if (_options.MaxPendingMessagesPerClient <= PendingMessagesQueue.Count) - { - await PendingMessagesQueue.DropPacket(); - } + PendingMessagesQueue.Enqueue(publishPacket); } @@ -276,7 +273,7 @@ namespace MQTTnet.Server return Task.FromResult(0); } - _logger.Warning("Client '{0}': Received not supported packet ({1}). Closing connection.", ClientId, packet); + _logger.Warning(this, null, "Client '{0}': Received not supported packet ({1}). Closing connection.", ClientId, packet); Stop(MqttClientDisconnectType.NotClean); return Task.FromResult(0); diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttPendingMessagesOverflowStrategy.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttPendingMessagesOverflowStrategy.cs new file mode 100644 index 0000000..1601487 --- /dev/null +++ b/Frameworks/MQTTnet.NetStandard/Server/MqttPendingMessagesOverflowStrategy.cs @@ -0,0 +1,8 @@ +namespace MQTTnet.Server +{ + public enum MqttPendingMessagesOverflowStrategy + { + DropOldestQueuedMessage, + DropNewMessage + } +} diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttServerOptions.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttServerOptions.cs index 4315fcf..e6117fd 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/MqttServerOptions.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/MqttServerOptions.cs @@ -11,7 +11,9 @@ namespace MQTTnet.Server public int ConnectionBacklog { get; set; } = 10; public int MaxPendingMessagesPerClient { get; set; } = 250; - + + public MqttPendingMessagesOverflowStrategy PendingMessagesOverflowStrategy { get; set; } = MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage; + public TimeSpan DefaultCommunicationTimeout { get; set; } = TimeSpan.FromSeconds(15); public Action ConnectionValidator { get; set; }