Kaynağa Gözat

Make storage update for managed client async.

release/3.x.x
Christian Kratky 5 yıl önce
ebeveyn
işleme
dd96cf98a8
1 değiştirilmiş dosya ile 32 ekleme ve 25 silme
  1. +32
    -25
      Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs

+ 32
- 25
Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs Dosyayı Görüntüle

@@ -31,7 +31,7 @@ namespace MQTTnet.Extensions.ManagedClient

private ManagedMqttClientStorageManager _storageManager;

private bool _disposed = false;
private bool _disposed;
private bool _subscriptionsNotPushed;

public ManagedMqttClient(IMqttClient mqttClient, IMqttNetChildLogger logger)
@@ -102,9 +102,8 @@ namespace MQTTnet.Extensions.ManagedClient

_connectionCancellationToken = new CancellationTokenSource();

#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
_maintainConnectionTask = Task.Run(() => MaintainConnectionAsync(_connectionCancellationToken.Token), _connectionCancellationToken.Token);
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
_maintainConnectionTask.Forget(_logger);

_logger.Info("Started");
}
@@ -333,20 +332,20 @@ namespace MQTTnet.Extensions.ManagedClient
}
}

private void PublishQueuedMessages(CancellationToken cancellationToken)
private async Task PublishQueuedMessagesAsync(CancellationToken cancellationToken)
{
try
{
while (!cancellationToken.IsCancellationRequested && _mqttClient.IsConnected)
{
//Peek at the message without dequeueing in order to prevent the
//possibility of the queue growing beyond the configured cap.
//Previously, messages could be re-enqueued if there was an
//exception, and this re-enqueueing did not honor the cap.
//Furthermore, because re-enqueueing would shuffle the order
//of the messages, the DropOldestQueuedMessage strategy would
//be unable to know which message is actually the oldest and would
//instead drop the first item in the queue.
// Peek at the message without dequeueing in order to prevent the
// possibility of the queue growing beyond the configured cap.
// Previously, messages could be re-enqueued if there was an
// exception, and this re-enqueueing did not honor the cap.
// Furthermore, because re-enqueueing would shuffle the order
// of the messages, the DropOldestQueuedMessage strategy would
// be unable to know which message is actually the oldest and would
// instead drop the first item in the queue.
var message = _messageQueue.PeekAndWait();
if (message == null)
{
@@ -355,7 +354,7 @@ namespace MQTTnet.Extensions.ManagedClient

cancellationToken.ThrowIfCancellationRequested();

TryPublishQueuedMessage(message);
await TryPublishQueuedMessageAsync(message).ConfigureAwait(false);
}
}
catch (OperationCanceledException)
@@ -371,25 +370,27 @@ namespace MQTTnet.Extensions.ManagedClient
}
}

private void TryPublishQueuedMessage(ManagedMqttApplicationMessage message)
private async Task TryPublishQueuedMessageAsync(ManagedMqttApplicationMessage message)
{
Exception transmitException = null;
try
{
_mqttClient.PublishAsync(message.ApplicationMessage).GetAwaiter().GetResult();
await _mqttClient.PublishAsync(message.ApplicationMessage).ConfigureAwait(false);

lock (_messageQueue) //lock to avoid conflict with this.PublishAsync
{
//While publishing this message, this.PublishAsync could have booted this
//message off the queue to make room for another (when using a cap
//with the DropOldestQueuedMessage strategy). If the first item
//in the queue is equal to this message, then it's safe to remove
//it from the queue. If not, that means this.PublishAsync has already
//removed it, in which case we don't want to do anything.
// While publishing this message, this.PublishAsync could have booted this
// message off the queue to make room for another (when using a cap
// with the DropOldestQueuedMessage strategy). If the first item
// in the queue is equal to this message, then it's safe to remove
// it from the queue. If not, that means this.PublishAsync has already
// removed it, in which case we don't want to do anything.
_messageQueue.RemoveFirst(i => i.Id.Equals(message.Id));
}

if (_storageManager != null)
{
_storageManager.RemoveAsync(message).GetAwaiter().GetResult();
await _storageManager.RemoveAsync(message).ConfigureAwait(false);
}
}
catch (MqttCommunicationException exception)
@@ -411,9 +412,10 @@ namespace MQTTnet.Extensions.ManagedClient
{
_messageQueue.RemoveFirst(i => i.Id.Equals(message.Id));
}

if (_storageManager != null)
{
_storageManager.RemoveAsync(message).GetAwaiter().GetResult();
await _storageManager.RemoveAsync(message).ConfigureAwait(false);
}
}
}
@@ -424,7 +426,12 @@ namespace MQTTnet.Extensions.ManagedClient
}
finally
{
ApplicationMessageProcessedHandler?.HandleApplicationMessageProcessedAsync(new ApplicationMessageProcessedEventArgs(message, transmitException)).GetAwaiter().GetResult();
var eventHandler = ApplicationMessageProcessedHandler;
if (eventHandler != null)
{
var eventArguments = new ApplicationMessageProcessedEventArgs(message, transmitException);
await eventHandler.HandleApplicationMessageProcessedAsync(eventArguments).ConfigureAwait(false);
}
}
}

@@ -509,7 +516,7 @@ namespace MQTTnet.Extensions.ManagedClient
var cts = new CancellationTokenSource();
_publishingCancellationToken = cts;

Task.Factory.StartNew(() => PublishQueuedMessages(cts.Token), cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
Task.Run(() => PublishQueuedMessagesAsync(cts.Token), cts.Token).Forget(_logger);
}

private void StopPublishing()


Yükleniyor…
İptal
Kaydet