ソースを参照

Merge pull request #266 from VladimirAkopyan/master

Limiting the growth of PendingMessagesQueue to a configurable Value
release/3.x.x
Christian 6年前
committed by GitHub
コミット
910c123887
この署名に対応する既知のキーがデータベースに存在しません GPGキーID: 4AEE18F83AFDEB23
5個のファイルの変更25行の追加1行の削除
  1. +2
    -0
      Frameworks/MQTTnet.NetStandard/Server/IMqttServerOptions.cs
  2. +11
    -0
      Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs
  3. +4
    -1
      Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs
  4. +2
    -0
      Frameworks/MQTTnet.NetStandard/Server/MqttServerOptions.cs
  5. +6
    -0
      Frameworks/MQTTnet.NetStandard/Server/MqttServerOptionsBuilder.cs

+ 2
- 0
Frameworks/MQTTnet.NetStandard/Server/IMqttServerOptions.cs ファイルの表示

@@ -5,6 +5,8 @@ namespace MQTTnet.Server
public interface IMqttServerOptions
{
int ConnectionBacklog { get; }
int MaxPendingMessagesPerClient { get; }

TimeSpan DefaultCommunicationTimeout { get; }

Action<MqttConnectionValidatorContext> ConnectionValidator { get; }


+ 11
- 0
Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs ファイルの表示

@@ -50,6 +50,17 @@ namespace MQTTnet.Server
}
}

public async Task DropPacket()
{
MqttBasePacket packet = null;
await _queueWaitSemaphore.WaitAsync().ConfigureAwait(false);
if (!_queue.TryDequeue(out packet))
{
throw new InvalidOperationException(); // should not happen
}
_queueWaitSemaphore.Release();
}

public void Enqueue(MqttBasePacket packet)
{
if (packet == null) throw new ArgumentNullException(nameof(packet));


+ 4
- 1
Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs ファイルの表示

@@ -147,7 +147,10 @@ namespace MQTTnet.Server
{
publishPacket.PacketIdentifier = _packetIdentifierProvider.GetNewPacketIdentifier();
}

if (_options.MaxPendingMessagesPerClient <= PendingMessagesQueue.Count)
{
await PendingMessagesQueue.DropPacket();
}
PendingMessagesQueue.Enqueue(publishPacket);
}



+ 2
- 0
Frameworks/MQTTnet.NetStandard/Server/MqttServerOptions.cs ファイルの表示

@@ -9,6 +9,8 @@ namespace MQTTnet.Server
public MqttServerTlsEndpointOptions TlsEndpointOptions { get; } = new MqttServerTlsEndpointOptions();

public int ConnectionBacklog { get; set; } = 10;

public int MaxPendingMessagesPerClient { get; set; } = 250;
public TimeSpan DefaultCommunicationTimeout { get; set; } = TimeSpan.FromSeconds(15);



+ 6
- 0
Frameworks/MQTTnet.NetStandard/Server/MqttServerOptionsBuilder.cs ファイルの表示

@@ -13,6 +13,12 @@ namespace MQTTnet.Server
return this;
}

public MqttServerOptionsBuilder WithMaxPendingMessagesPerClient(int value)
{
_options.MaxPendingMessagesPerClient = value;
return this;
}

public MqttServerOptionsBuilder WithDefaultCommunicationTimeout(TimeSpan value)
{
_options.DefaultCommunicationTimeout = value;


読み込み中…
キャンセル
保存