Browse Source

Refactor ManagedClient

release/3.x.x
Christian 6 years ago
parent
commit
c26555aca5
1 changed files with 7 additions and 10 deletions
  1. +7
    -10
      Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs

+ 7
- 10
Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs View File

@@ -69,7 +69,7 @@ namespace MQTTnet.ManagedClient
_connectionCancellationToken = new CancellationTokenSource(); _connectionCancellationToken = new CancellationTokenSource();


#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed #pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
Task.Run(async () => await MaintainConnectionAsync(_connectionCancellationToken.Token), _connectionCancellationToken.Token).ConfigureAwait(false);
Task.Run(async () => await MaintainConnectionAsync(_connectionCancellationToken.Token).ConfigureAwait(false), _connectionCancellationToken.Token).ConfigureAwait(false);
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed #pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed


_logger.Info<ManagedMqttClient>("Started"); _logger.Info<ManagedMqttClient>("Started");
@@ -156,7 +156,7 @@ namespace MQTTnet.ManagedClient
{ {
while (!cancellationToken.IsCancellationRequested) while (!cancellationToken.IsCancellationRequested)
{ {
await TryMaintainConnectionAsync(cancellationToken);
await TryMaintainConnectionAsync(cancellationToken).ConfigureAwait(false);
} }
} }
catch (OperationCanceledException) catch (OperationCanceledException)
@@ -187,7 +187,7 @@ namespace MQTTnet.ManagedClient


if (connectionState == ReconnectionResult.Reconnected || _subscriptionsNotPushed) if (connectionState == ReconnectionResult.Reconnected || _subscriptionsNotPushed)
{ {
await PushSubscriptionsAsync().ConfigureAwait(false);
await SynchronizeSubscriptionsAsync().ConfigureAwait(false);


StartPublishing(); StartPublishing();


@@ -280,7 +280,7 @@ namespace MQTTnet.ManagedClient
} }
} }


private async Task PushSubscriptionsAsync()
private async Task SynchronizeSubscriptionsAsync()
{ {
_logger.Info<ManagedMqttClient>(nameof(ManagedMqttClient), "Synchronizing subscriptions"); _logger.Info<ManagedMqttClient>(nameof(ManagedMqttClient), "Synchronizing subscriptions");


@@ -301,12 +301,8 @@ namespace MQTTnet.ManagedClient
{ {
_subscriptionsSemaphore.Release(); _subscriptionsSemaphore.Release();
} }
if (unsubscriptions.Any())
{
await _mqttClient.UnsubscribeAsync(unsubscriptions);
}

if (!subscriptions.Any())
if (!subscriptions.Any() && !unsubscriptions.Any())
{ {
return; return;
} }
@@ -314,6 +310,7 @@ namespace MQTTnet.ManagedClient
try try
{ {
await _mqttClient.SubscribeAsync(subscriptions).ConfigureAwait(false); await _mqttClient.SubscribeAsync(subscriptions).ConfigureAwait(false);
await _mqttClient.UnsubscribeAsync(unsubscriptions).ConfigureAwait(false);
} }
catch (Exception exception) catch (Exception exception)
{ {


Loading…
Cancel
Save