diff --git a/Frameworks/MQTTnet.NetStandard/Server/IMqttServerOptions.cs b/Frameworks/MQTTnet.NetStandard/Server/IMqttServerOptions.cs index cc58eeb..9ae7715 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/IMqttServerOptions.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/IMqttServerOptions.cs @@ -5,6 +5,8 @@ namespace MQTTnet.Server public interface IMqttServerOptions { int ConnectionBacklog { get; } + int MaxPendingMessagesPerClient { get; } + TimeSpan DefaultCommunicationTimeout { get; } Action ConnectionValidator { get; } diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs index 1d00528..fd89691 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs @@ -50,6 +50,17 @@ namespace MQTTnet.Server } } + public async Task DropPacket() + { + MqttBasePacket packet = null; + await _queueWaitSemaphore.WaitAsync().ConfigureAwait(false); + if (!_queue.TryDequeue(out packet)) + { + throw new InvalidOperationException(); // should not happen + } + _queueWaitSemaphore.Release(); + } + public void Enqueue(MqttBasePacket packet) { if (packet == null) throw new ArgumentNullException(nameof(packet)); diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs index 183ab8e..7d50bad 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs @@ -147,7 +147,10 @@ namespace MQTTnet.Server { publishPacket.PacketIdentifier = _packetIdentifierProvider.GetNewPacketIdentifier(); } - + if (_options.MaxPendingMessagesPerClient <= PendingMessagesQueue.Count) + { + await PendingMessagesQueue.DropPacket(); + } PendingMessagesQueue.Enqueue(publishPacket); } diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttServerOptions.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttServerOptions.cs index 2b0a9a0..4315fcf 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/MqttServerOptions.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/MqttServerOptions.cs @@ -9,6 +9,8 @@ namespace MQTTnet.Server public MqttServerTlsEndpointOptions TlsEndpointOptions { get; } = new MqttServerTlsEndpointOptions(); public int ConnectionBacklog { get; set; } = 10; + + public int MaxPendingMessagesPerClient { get; set; } = 250; public TimeSpan DefaultCommunicationTimeout { get; set; } = TimeSpan.FromSeconds(15); diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttServerOptionsBuilder.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttServerOptionsBuilder.cs index 18b2de2..2c86512 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/MqttServerOptionsBuilder.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/MqttServerOptionsBuilder.cs @@ -13,6 +13,12 @@ namespace MQTTnet.Server return this; } + public MqttServerOptionsBuilder WithMaxPendingMessagesPerClient(int value) + { + _options.MaxPendingMessagesPerClient = value; + return this; + } + public MqttServerOptionsBuilder WithDefaultCommunicationTimeout(TimeSpan value) { _options.DefaultCommunicationTimeout = value;