Browse Source

Added option for how many subscriptions to send in a single call to t… (#1209)

* Added option for how many subscriptions to send in a single call to the server.  aws iotcore limits this to 8

* Split subscription messages up to 8 at a time when reconnecting
release/3.x.x
scottbrogden-iheartmedia 3 years ago
committed by GitHub
parent
commit
d01d859227
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 66 additions and 13 deletions
  1. +3
    -1
      Source/MQTTnet.Extensions.ManagedClient/IManagedMqttClientOptions.cs
  2. +52
    -9
      Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs
  3. +3
    -1
      Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClientOptions.cs
  4. +8
    -2
      Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClientOptionsBuilder.cs

+ 3
- 1
Source/MQTTnet.Extensions.ManagedClient/IManagedMqttClientOptions.cs View File

@@ -1,4 +1,4 @@
using System;
using System;
using MQTTnet.Client.Options;
using MQTTnet.Server;

@@ -17,5 +17,7 @@ namespace MQTTnet.Extensions.ManagedClient
int MaxPendingMessages { get; }

MqttPendingMessagesOverflowStrategy PendingMessagesOverflowStrategy { get; }

int? MaxSubcribeUnsubscribeMessagesAtOnce { get; }
}
}

+ 52
- 9
Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs View File

@@ -1,4 +1,4 @@
using MQTTnet.Client;
using MQTTnet.Client;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Publishing;
@@ -504,23 +504,54 @@ namespace MQTTnet.Extensions.ManagedClient
_reconnectSubscriptions[subscription.Topic] = subscription.QualityOfServiceLevel;
}

try
List<MqttTopicFilter> subs = new List<MqttTopicFilter>();
foreach (var sub in subscriptions)
{
if (unsubscriptions.Any())
subs.Add(sub);
//aws only allows 8 in a single message
if (subs.Count == Options.MaxSubcribeUnsubscribeMessagesAtOnce)
{
await InternalClient.UnsubscribeAsync(unsubscriptions.ToArray()).ConfigureAwait(false);
await SendSubscribeUnsubscribe(subs, null).ConfigureAwait(false);
subs.Clear();
}
}

await SendSubscribeUnsubscribe(subs, null);

if (subscriptions.Any())
List<string> unSubs = new List<string>();
foreach (var unSub in unsubscriptions)
{
unSubs.Add(unSub);
//aws only allows 8 in a single message
if (unSubs.Count == Options.MaxSubcribeUnsubscribeMessagesAtOnce)
{
await InternalClient.SubscribeAsync(subscriptions.ToArray()).ConfigureAwait(false);
await SendSubscribeUnsubscribe(null, unSubs).ConfigureAwait(false);
unSubs.Clear();
}
}
catch (Exception exception)

await SendSubscribeUnsubscribe(null, unSubs);
}
}

async Task SendSubscribeUnsubscribe(List<MqttTopicFilter> subscriptions, List<string> unsubscriptions)
{
try
{
if (unsubscriptions != null && unsubscriptions.Any())
{
await InternalClient.UnsubscribeAsync(unsubscriptions.ToArray()).ConfigureAwait(false);
}

if (subscriptions != null && subscriptions.Any())
{
await HandleSubscriptionExceptionAsync(exception).ConfigureAwait(false);
await InternalClient.SubscribeAsync(subscriptions.ToArray()).ConfigureAwait(false);
}
}
catch (Exception exception)
{
await HandleSubscriptionExceptionAsync(exception).ConfigureAwait(false);
}
}

async Task PublishReconnectSubscriptionsAsync()
@@ -532,7 +563,19 @@ namespace MQTTnet.Extensions.ManagedClient
if (_reconnectSubscriptions.Any())
{
var subscriptions = _reconnectSubscriptions.Select(i => new MqttTopicFilter { Topic = i.Key, QualityOfServiceLevel = i.Value });
await InternalClient.SubscribeAsync(subscriptions.ToArray()).ConfigureAwait(false);
List<MqttTopicFilter> subs = new List<MqttTopicFilter>();
foreach (var sub in subscriptions)
{
subs.Add(sub);
//aws only allows 8 in a single message
if (subs.Count == Options.MaxSubcribeUnsubscribeMessagesAtOnce)
{
await SendSubscribeUnsubscribe(subs, null).ConfigureAwait(false);
subs.Clear();
}
}

await SendSubscribeUnsubscribe(subs, null).ConfigureAwait(false);
}
}
catch (Exception exception)


+ 3
- 1
Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClientOptions.cs View File

@@ -1,4 +1,4 @@
using System;
using System;
using MQTTnet.Client.Options;
using MQTTnet.Server;

@@ -17,5 +17,7 @@ namespace MQTTnet.Extensions.ManagedClient
public int MaxPendingMessages { get; set; } = int.MaxValue;

public MqttPendingMessagesOverflowStrategy PendingMessagesOverflowStrategy { get; set; } = MqttPendingMessagesOverflowStrategy.DropNewMessage;

public int? MaxSubcribeUnsubscribeMessagesAtOnce { get; set; } = null;
}
}

+ 8
- 2
Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClientOptionsBuilder.cs View File

@@ -1,4 +1,4 @@
using System;
using System;
using MQTTnet.Client.Options;
using MQTTnet.Server;

@@ -68,7 +68,13 @@ namespace MQTTnet.Extensions.ManagedClient
options(_clientOptionsBuilder);
return this;
}

public ManagedMqttClientOptionsBuilder WithMaxSubcribeUnsubscribeMessagesAtOnce(int? value)
{
_options.MaxSubcribeUnsubscribeMessagesAtOnce = value;
return this;
}

public ManagedMqttClientOptions Build()
{
if (_clientOptionsBuilder != null)


Loading…
Cancel
Save