Browse Source

ref #230 Limiting the growth of PendingMessagesQueue to a configurable value. it will only store X lated messages, 250 by default.

https://github.com/chkr1011/MQTTnet/issues/230
release/3.x.x
VladimirAkopyan 6 years ago
parent
commit
7b987635f7
6 changed files with 43 additions and 1 deletions
  1. +2
    -0
      Frameworks/MQTTnet.NetStandard/Server/IMqttServerOptions.cs
  2. +11
    -0
      Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs
  3. +4
    -1
      Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs
  4. +2
    -0
      Frameworks/MQTTnet.NetStandard/Server/MqttServerOptions.cs
  5. +6
    -0
      Frameworks/MQTTnet.NetStandard/Server/MqttServerOptionsBuilder.cs
  6. +18
    -0
      MQTTnet.sln

+ 2
- 0
Frameworks/MQTTnet.NetStandard/Server/IMqttServerOptions.cs View File

@@ -5,6 +5,8 @@ namespace MQTTnet.Server
public interface IMqttServerOptions
{
int ConnectionBacklog { get; }
int MaxPendingMessagesPerClient { get; }

TimeSpan DefaultCommunicationTimeout { get; }

Action<MqttConnectionValidatorContext> ConnectionValidator { get; }


+ 11
- 0
Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs View File

@@ -50,6 +50,17 @@ namespace MQTTnet.Server
}
}

public async Task DropPacket()
{
MqttBasePacket packet = null;
await _queueWaitSemaphore.WaitAsync().ConfigureAwait(false);
if (!_queue.TryDequeue(out packet))
{
throw new InvalidOperationException(); // should not happen
}
_queueWaitSemaphore.Release();
}

public void Enqueue(MqttBasePacket packet)
{
if (packet == null) throw new ArgumentNullException(nameof(packet));


+ 4
- 1
Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs View File

@@ -142,7 +142,10 @@ namespace MQTTnet.Server
{
publishPacket.PacketIdentifier = _packetIdentifierProvider.GetNewPacketIdentifier();
}

if (_options.MaxPendingMessagesPerClient <= PendingMessagesQueue.Count)
{
await PendingMessagesQueue.DropPacket();
}
PendingMessagesQueue.Enqueue(publishPacket);
}



+ 2
- 0
Frameworks/MQTTnet.NetStandard/Server/MqttServerOptions.cs View File

@@ -9,6 +9,8 @@ namespace MQTTnet.Server
public MqttServerTlsEndpointOptions TlsEndpointOptions { get; } = new MqttServerTlsEndpointOptions();

public int ConnectionBacklog { get; set; } = 10;

public int MaxPendingMessagesPerClient { get; set; } = 250;
public TimeSpan DefaultCommunicationTimeout { get; set; } = TimeSpan.FromSeconds(15);



+ 6
- 0
Frameworks/MQTTnet.NetStandard/Server/MqttServerOptionsBuilder.cs View File

@@ -13,6 +13,12 @@ namespace MQTTnet.Server
return this;
}

public MqttServerOptionsBuilder WithMaxPendingMessagesPerClient(int value)
{
_options.MaxPendingMessagesPerClient = value;
return this;
}

public MqttServerOptionsBuilder WithDefaultCommunicationTimeout(TimeSpan value)
{
_options.DefaultCommunicationTimeout = value;


+ 18
- 0
MQTTnet.sln View File

@@ -38,6 +38,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Extensions", "Extensions",
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.Extensions.Rpc", "Extensions\MQTTnet.Extensions.Rpc\MQTTnet.Extensions.Rpc.csproj", "{C444E9C8-95FA-430E-9126-274129DE16CD}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTserver", "MQTTserver\MQTTserver.csproj", "{5FCCD9CE-9E7E-40C1-9B99-3328FED9EED7}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -168,6 +170,22 @@ Global
{C444E9C8-95FA-430E-9126-274129DE16CD}.Release|x64.Build.0 = Release|Any CPU
{C444E9C8-95FA-430E-9126-274129DE16CD}.Release|x86.ActiveCfg = Release|Any CPU
{C444E9C8-95FA-430E-9126-274129DE16CD}.Release|x86.Build.0 = Release|Any CPU
{5FCCD9CE-9E7E-40C1-9B99-3328FED9EED7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{5FCCD9CE-9E7E-40C1-9B99-3328FED9EED7}.Debug|Any CPU.Build.0 = Debug|Any CPU
{5FCCD9CE-9E7E-40C1-9B99-3328FED9EED7}.Debug|ARM.ActiveCfg = Debug|Any CPU
{5FCCD9CE-9E7E-40C1-9B99-3328FED9EED7}.Debug|ARM.Build.0 = Debug|Any CPU
{5FCCD9CE-9E7E-40C1-9B99-3328FED9EED7}.Debug|x64.ActiveCfg = Debug|Any CPU
{5FCCD9CE-9E7E-40C1-9B99-3328FED9EED7}.Debug|x64.Build.0 = Debug|Any CPU
{5FCCD9CE-9E7E-40C1-9B99-3328FED9EED7}.Debug|x86.ActiveCfg = Debug|Any CPU
{5FCCD9CE-9E7E-40C1-9B99-3328FED9EED7}.Debug|x86.Build.0 = Debug|Any CPU
{5FCCD9CE-9E7E-40C1-9B99-3328FED9EED7}.Release|Any CPU.ActiveCfg = Release|Any CPU
{5FCCD9CE-9E7E-40C1-9B99-3328FED9EED7}.Release|Any CPU.Build.0 = Release|Any CPU
{5FCCD9CE-9E7E-40C1-9B99-3328FED9EED7}.Release|ARM.ActiveCfg = Release|Any CPU
{5FCCD9CE-9E7E-40C1-9B99-3328FED9EED7}.Release|ARM.Build.0 = Release|Any CPU
{5FCCD9CE-9E7E-40C1-9B99-3328FED9EED7}.Release|x64.ActiveCfg = Release|Any CPU
{5FCCD9CE-9E7E-40C1-9B99-3328FED9EED7}.Release|x64.Build.0 = Release|Any CPU
{5FCCD9CE-9E7E-40C1-9B99-3328FED9EED7}.Release|x86.ActiveCfg = Release|Any CPU
{5FCCD9CE-9E7E-40C1-9B99-3328FED9EED7}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE


Loading…
Cancel
Save