Browse Source

Refactor sub/unsub management in managed client.

release/3.x.x
Christian 3 years ago
parent
commit
68f1043ef5
9 changed files with 61 additions and 43 deletions
  1. +1
    -1
      Build/MQTTnet.AspNetCore.nuspec
  2. +1
    -1
      Build/MQTTnet.Extensions.ManagedClient.nuspec
  3. +1
    -1
      Build/MQTTnet.Extensions.Rpc.nuspec
  4. +1
    -1
      Build/MQTTnet.Extensions.WebSocket4Net.nuspec
  5. +3
    -2
      Build/MQTTnet.nuspec
  6. +5
    -1
      Source/MQTTnet.Extensions.ManagedClient/IManagedMqttClientOptions.cs
  7. +45
    -32
      Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs
  8. +2
    -2
      Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClientOptions.cs
  9. +2
    -2
      Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClientOptionsBuilder.cs

+ 1
- 1
Build/MQTTnet.AspNetCore.nuspec View File

@@ -12,7 +12,7 @@
<requireLicenseAcceptance>true</requireLicenseAcceptance>
<description>This is a support library to integrate MQTTnet into AspNetCore.</description>
<releaseNotes>For release notes please go to MQTTnet release notes (https://www.nuget.org/packages/MQTTnet/).</releaseNotes>
<copyright>Copyright Christian Kratky 2016-2020</copyright>
<copyright>Copyright Christian Kratky 2016-2021</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin Blazor</tags>
<dependencies>
<group targetFramework="netstandard2.0">


+ 1
- 1
Build/MQTTnet.Extensions.ManagedClient.nuspec View File

@@ -12,7 +12,7 @@
<requireLicenseAcceptance>true</requireLicenseAcceptance>
<description>This is an extension library which provides a managed MQTT client with additional features using MQTTnet.</description>
<releaseNotes>For release notes please go to MQTTnet release notes (https://www.nuget.org/packages/MQTTnet/).</releaseNotes>
<copyright>Copyright Christian Kratky 2016-2020</copyright>
<copyright>Copyright Christian Kratky 2016-2021</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin Blazor</tags>
<dependencies>
<dependency id="MQTTnet" version="$nugetVersion" />


+ 1
- 1
Build/MQTTnet.Extensions.Rpc.nuspec View File

@@ -12,7 +12,7 @@
<requireLicenseAcceptance>true</requireLicenseAcceptance>
<description>This is an extension library which allows executing synchronous device calls including a response using MQTTnet.</description>
<releaseNotes>For release notes please go to MQTTnet release notes (https://www.nuget.org/packages/MQTTnet/).</releaseNotes>
<copyright>Copyright Christian Kratky 2016-2020</copyright>
<copyright>Copyright Christian Kratky 2016-2021</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin Blazor</tags>
<dependencies>
<dependency id="MQTTnet" version="$nugetVersion" />


+ 1
- 1
Build/MQTTnet.Extensions.WebSocket4Net.nuspec View File

@@ -12,7 +12,7 @@
<requireLicenseAcceptance>true</requireLicenseAcceptance>
<description>This is an extension library which allows using _WebSocket4Net_ as transport for MQTTnet clients.</description>
<releaseNotes>For release notes please go to MQTTnet release notes (https://www.nuget.org/packages/MQTTnet/).</releaseNotes>
<copyright>Copyright Christian Kratky 2016-2020</copyright>
<copyright>Copyright Christian Kratky 2016-2021</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin Blazor</tags>
<dependencies>
<dependency id="MQTTnet" version="$nugetVersion" />


+ 3
- 2
Build/MQTTnet.nuspec View File

@@ -12,7 +12,8 @@
<requireLicenseAcceptance>true</requireLicenseAcceptance>
<description>MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker) and supports v3.1.0, v3.1.1 and v5.0.0 of the MQTT protocol.</description>
<releaseNotes>
* [ManagedClient] Extended ReconnectAsync (thanks to @nvsnkv, #1202).
* [ManagedClient] Extended ReconnectAsync (thanks to @nvsnkv, #1202).
* [ManagedClient] Improved Amazon AWS support (thanks to @scottbrogden-iheartmedia, #1209).
* [Server] Fixed a memory/performance leak when using QoS Level 1.
* [Server] Exposed connection timestamp in client status.
* [Server] Refactored connection management code.
@@ -21,7 +22,7 @@
* [MQTTnet, MQTTnet.Extensions.ManagedClient] Fixed bug that allowed invalid subscriptions (Thanks to @marcelwinh).
Git commit: $gitCommit
</releaseNotes>
<copyright>Copyright Christian Kratky 2016-2020</copyright>
<copyright>Copyright Christian Kratky 2016-2021</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin Blazor</tags>
<dependencies>
<group targetFramework="net452" />


+ 5
- 1
Source/MQTTnet.Extensions.ManagedClient/IManagedMqttClientOptions.cs View File

@@ -18,6 +18,10 @@ namespace MQTTnet.Extensions.ManagedClient

MqttPendingMessagesOverflowStrategy PendingMessagesOverflowStrategy { get; }

int? MaxSubcribeUnsubscribeMessagesAtOnce { get; }
/// <summary>
/// Defines the maximum amount of topic filters which will be sent in a SUBSCRIBE/UNSUBSCRIBE packet.
/// Amazon AWS limits this number to 8. The default is int.MaxValue.
/// </summary>
int MaxTopicFiltersInSubscribeUnsubscribePackets { get; }
}
}

+ 45
- 32
Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs View File

@@ -28,6 +28,7 @@ namespace MQTTnet.Extensions.ManagedClient
/// at reconnect and are solely owned by <see cref="MaintainConnectionAsync"/>.
/// </summary>
readonly Dictionary<string, MqttQualityOfServiceLevel> _reconnectSubscriptions = new Dictionary<string, MqttQualityOfServiceLevel>();

readonly Dictionary<string, MqttQualityOfServiceLevel> _subscriptions = new Dictionary<string, MqttQualityOfServiceLevel>();
readonly HashSet<string> _unsubscriptions = new HashSet<string>();
readonly SemaphoreSlim _subscriptionsQueuedSignal = new SemaphoreSlim(0);
@@ -205,7 +206,6 @@ namespace MQTTnet.Extensions.ManagedClient
await applicationMessageSkippedHandler.HandleApplicationMessageSkippedAsync(applicationMessageSkippedEventArgs).ConfigureAwait(false);
}
}

}
}

@@ -248,6 +248,7 @@ namespace MQTTnet.Extensions.ManagedClient
_unsubscriptions.Add(topic);
}
}

_subscriptionsQueuedSignal.Release();

return Task.FromResult(0);
@@ -481,8 +482,14 @@ namespace MQTTnet.Extensions.ManagedClient

lock (_subscriptions)
{
subscriptions = _subscriptions.Select(i => new MqttTopicFilter { Topic = i.Key, QualityOfServiceLevel = i.Value }).ToList();
subscriptions = _subscriptions.Select(i => new MqttTopicFilter
{
Topic = i.Key,
QualityOfServiceLevel = i.Value
}).ToList();
_subscriptions.Clear();
unsubscriptions = new HashSet<string>(_unsubscriptions);
_unsubscriptions.Clear();
}
@@ -492,7 +499,7 @@ namespace MQTTnet.Extensions.ManagedClient
continue;
}

_logger.Verbose("Publishing {0} subscriptions and {1} unsubscriptions)", subscriptions.Count, unsubscriptions.Count);
_logger.Verbose("Publishing {0} added and {1} removed subscriptions", subscriptions.Count, unsubscriptions.Count);

foreach (var unsubscription in unsubscriptions)
{
@@ -504,48 +511,48 @@ namespace MQTTnet.Extensions.ManagedClient
_reconnectSubscriptions[subscription.Topic] = subscription.QualityOfServiceLevel;
}

List<MqttTopicFilter> subs = new List<MqttTopicFilter>();
foreach (var sub in subscriptions)
var addedTopicFilters = new List<MqttTopicFilter>();
foreach (var subscription in subscriptions)
{
subs.Add(sub);
//aws only allows 8 in a single message
if (subs.Count == Options.MaxSubcribeUnsubscribeMessagesAtOnce)
addedTopicFilters.Add(subscription);
if (addedTopicFilters.Count == Options.MaxTopicFiltersInSubscribeUnsubscribePackets)
{
await SendSubscribeUnsubscribe(subs, null).ConfigureAwait(false);
subs.Clear();
await SendSubscribeUnsubscribe(addedTopicFilters, null).ConfigureAwait(false);
addedTopicFilters.Clear();
}
}

await SendSubscribeUnsubscribe(subs, null);
await SendSubscribeUnsubscribe(addedTopicFilters, null).ConfigureAwait(false);

List<string> unSubs = new List<string>();
var removedTopicFilters = new List<string>();
foreach (var unSub in unsubscriptions)
{
unSubs.Add(unSub);
//aws only allows 8 in a single message
if (unSubs.Count == Options.MaxSubcribeUnsubscribeMessagesAtOnce)
removedTopicFilters.Add(unSub);
if (removedTopicFilters.Count == Options.MaxTopicFiltersInSubscribeUnsubscribePackets)
{
await SendSubscribeUnsubscribe(null, unSubs).ConfigureAwait(false);
unSubs.Clear();
await SendSubscribeUnsubscribe(null, removedTopicFilters).ConfigureAwait(false);
removedTopicFilters.Clear();
}
}

await SendSubscribeUnsubscribe(null, unSubs);
await SendSubscribeUnsubscribe(null, removedTopicFilters).ConfigureAwait(false);
}
}

async Task SendSubscribeUnsubscribe(List<MqttTopicFilter> subscriptions, List<string> unsubscriptions)
async Task SendSubscribeUnsubscribe(List<MqttTopicFilter> addedSubscriptions, List<string> removedSubscriptions)
{
try
{
if (unsubscriptions != null && unsubscriptions.Any())
if (removedSubscriptions != null && removedSubscriptions.Any())
{
await InternalClient.UnsubscribeAsync(unsubscriptions.ToArray()).ConfigureAwait(false);
await InternalClient.UnsubscribeAsync(removedSubscriptions.ToArray()).ConfigureAwait(false);
}

if (subscriptions != null && subscriptions.Any())
if (addedSubscriptions != null && addedSubscriptions.Any())
{
await InternalClient.SubscribeAsync(subscriptions.ToArray()).ConfigureAwait(false);
await InternalClient.SubscribeAsync(addedSubscriptions.ToArray()).ConfigureAwait(false);
}
}
catch (Exception exception)
@@ -562,20 +569,26 @@ namespace MQTTnet.Extensions.ManagedClient
{
if (_reconnectSubscriptions.Any())
{
var subscriptions = _reconnectSubscriptions.Select(i => new MqttTopicFilter { Topic = i.Key, QualityOfServiceLevel = i.Value });
List<MqttTopicFilter> subs = new List<MqttTopicFilter>();
var subscriptions = _reconnectSubscriptions.Select(i => new MqttTopicFilter
{
Topic = i.Key,
QualityOfServiceLevel = i.Value
});
var topicFilters = new List<MqttTopicFilter>();
foreach (var sub in subscriptions)
{
subs.Add(sub);
//aws only allows 8 in a single message
if (subs.Count == Options.MaxSubcribeUnsubscribeMessagesAtOnce)
topicFilters.Add(sub);
if (topicFilters.Count == Options.MaxTopicFiltersInSubscribeUnsubscribePackets)
{
await SendSubscribeUnsubscribe(subs, null).ConfigureAwait(false);
subs.Clear();
await SendSubscribeUnsubscribe(topicFilters, null).ConfigureAwait(false);
topicFilters.Clear();
}
}

await SendSubscribeUnsubscribe(subs, null).ConfigureAwait(false);
await SendSubscribeUnsubscribe(topicFilters, null).ConfigureAwait(false);
}
}
catch (Exception exception)
@@ -665,4 +678,4 @@ namespace MQTTnet.Extensions.ManagedClient
return remainingTime < TimeSpan.Zero ? TimeSpan.Zero : remainingTime;
}
}
}
}

+ 2
- 2
Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClientOptions.cs View File

@@ -4,7 +4,7 @@ using MQTTnet.Server;

namespace MQTTnet.Extensions.ManagedClient
{
public class ManagedMqttClientOptions : IManagedMqttClientOptions
public sealed class ManagedMqttClientOptions : IManagedMqttClientOptions
{
public IMqttClientOptions ClientOptions { get; set; }

@@ -18,6 +18,6 @@ namespace MQTTnet.Extensions.ManagedClient

public MqttPendingMessagesOverflowStrategy PendingMessagesOverflowStrategy { get; set; } = MqttPendingMessagesOverflowStrategy.DropNewMessage;

public int? MaxSubcribeUnsubscribeMessagesAtOnce { get; set; } = null;
public int MaxTopicFiltersInSubscribeUnsubscribePackets { get; set; } = int.MaxValue;
}
}

+ 2
- 2
Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClientOptionsBuilder.cs View File

@@ -69,9 +69,9 @@ namespace MQTTnet.Extensions.ManagedClient
return this;
}

public ManagedMqttClientOptionsBuilder WithMaxSubcribeUnsubscribeMessagesAtOnce(int? value)
public ManagedMqttClientOptionsBuilder WithMaxTopicFiltersInSubscribeUnsubscribePackets(int value)
{
_options.MaxSubcribeUnsubscribeMessagesAtOnce = value;
_options.MaxTopicFiltersInSubscribeUnsubscribePackets = value;
return this;
}



Loading…
Cancel
Save