diff --git a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs index dc57e40..74d148d 100644 --- a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs +++ b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs @@ -31,7 +31,7 @@ namespace MQTTnet.Extensions.ManagedClient private ManagedMqttClientStorageManager _storageManager; - private bool _disposed = false; + private bool _disposed; private bool _subscriptionsNotPushed; public ManagedMqttClient(IMqttClient mqttClient, IMqttNetChildLogger logger) @@ -102,9 +102,8 @@ namespace MQTTnet.Extensions.ManagedClient _connectionCancellationToken = new CancellationTokenSource(); -#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed _maintainConnectionTask = Task.Run(() => MaintainConnectionAsync(_connectionCancellationToken.Token), _connectionCancellationToken.Token); -#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + _maintainConnectionTask.Forget(_logger); _logger.Info("Started"); } @@ -333,20 +332,20 @@ namespace MQTTnet.Extensions.ManagedClient } } - private void PublishQueuedMessages(CancellationToken cancellationToken) + private async Task PublishQueuedMessagesAsync(CancellationToken cancellationToken) { try { while (!cancellationToken.IsCancellationRequested && _mqttClient.IsConnected) { - //Peek at the message without dequeueing in order to prevent the - //possibility of the queue growing beyond the configured cap. - //Previously, messages could be re-enqueued if there was an - //exception, and this re-enqueueing did not honor the cap. - //Furthermore, because re-enqueueing would shuffle the order - //of the messages, the DropOldestQueuedMessage strategy would - //be unable to know which message is actually the oldest and would - //instead drop the first item in the queue. + // Peek at the message without dequeueing in order to prevent the + // possibility of the queue growing beyond the configured cap. + // Previously, messages could be re-enqueued if there was an + // exception, and this re-enqueueing did not honor the cap. + // Furthermore, because re-enqueueing would shuffle the order + // of the messages, the DropOldestQueuedMessage strategy would + // be unable to know which message is actually the oldest and would + // instead drop the first item in the queue. var message = _messageQueue.PeekAndWait(); if (message == null) { @@ -355,7 +354,7 @@ namespace MQTTnet.Extensions.ManagedClient cancellationToken.ThrowIfCancellationRequested(); - TryPublishQueuedMessage(message); + await TryPublishQueuedMessageAsync(message).ConfigureAwait(false); } } catch (OperationCanceledException) @@ -371,25 +370,27 @@ namespace MQTTnet.Extensions.ManagedClient } } - private void TryPublishQueuedMessage(ManagedMqttApplicationMessage message) + private async Task TryPublishQueuedMessageAsync(ManagedMqttApplicationMessage message) { Exception transmitException = null; try { - _mqttClient.PublishAsync(message.ApplicationMessage).GetAwaiter().GetResult(); + await _mqttClient.PublishAsync(message.ApplicationMessage).ConfigureAwait(false); + lock (_messageQueue) //lock to avoid conflict with this.PublishAsync { - //While publishing this message, this.PublishAsync could have booted this - //message off the queue to make room for another (when using a cap - //with the DropOldestQueuedMessage strategy). If the first item - //in the queue is equal to this message, then it's safe to remove - //it from the queue. If not, that means this.PublishAsync has already - //removed it, in which case we don't want to do anything. + // While publishing this message, this.PublishAsync could have booted this + // message off the queue to make room for another (when using a cap + // with the DropOldestQueuedMessage strategy). If the first item + // in the queue is equal to this message, then it's safe to remove + // it from the queue. If not, that means this.PublishAsync has already + // removed it, in which case we don't want to do anything. _messageQueue.RemoveFirst(i => i.Id.Equals(message.Id)); } + if (_storageManager != null) { - _storageManager.RemoveAsync(message).GetAwaiter().GetResult(); + await _storageManager.RemoveAsync(message).ConfigureAwait(false); } } catch (MqttCommunicationException exception) @@ -411,9 +412,10 @@ namespace MQTTnet.Extensions.ManagedClient { _messageQueue.RemoveFirst(i => i.Id.Equals(message.Id)); } + if (_storageManager != null) { - _storageManager.RemoveAsync(message).GetAwaiter().GetResult(); + await _storageManager.RemoveAsync(message).ConfigureAwait(false); } } } @@ -424,7 +426,12 @@ namespace MQTTnet.Extensions.ManagedClient } finally { - ApplicationMessageProcessedHandler?.HandleApplicationMessageProcessedAsync(new ApplicationMessageProcessedEventArgs(message, transmitException)).GetAwaiter().GetResult(); + var eventHandler = ApplicationMessageProcessedHandler; + if (eventHandler != null) + { + var eventArguments = new ApplicationMessageProcessedEventArgs(message, transmitException); + await eventHandler.HandleApplicationMessageProcessedAsync(eventArguments).ConfigureAwait(false); + } } } @@ -509,7 +516,7 @@ namespace MQTTnet.Extensions.ManagedClient var cts = new CancellationTokenSource(); _publishingCancellationToken = cts; - Task.Factory.StartNew(() => PublishQueuedMessages(cts.Token), cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default); + Task.Run(() => PublishQueuedMessagesAsync(cts.Token), cts.Token).Forget(_logger); } private void StopPublishing()