@@ -14,8 +14,8 @@ namespace MQTTnet.Extensions.ManagedClient
public class ManagedMqttClient : IManagedMqttClient
{
private readonly BlockingCollection<ManagedMqttApplicationMessage> _messageQueue = new BlockingCollection<ManagedMqttApplicationMessage>();
private readonly Concurrent Dictionary<string, MqttQualityOfServiceLevel> _subscriptions = new Concurrent Dictionary<string, MqttQualityOfServiceLevel>();
private readonly List<string> _unsubscriptions = new Lis t<string>();
private readonly Dictionary<string, MqttQualityOfServiceLevel> _subscriptions = new Dictionary<string, MqttQualityOfServiceLevel>();
private readonly HashSet<string> _unsubscriptions = new HashSe t<string>();
private readonly IMqttClient _mqttClient;
private readonly IMqttNetChildLogger _logger;
@@ -43,12 +43,16 @@ namespace MQTTnet.Extensions.ManagedClient
public bool IsConnected => _mqttClient.IsConnected;
public bool IsStarted => _connectionCancellationToken != null;
public int PendingApplicationMessagesCount => _messageQueue.Count;
public event EventHandler<MqttClientConnectedEventArgs> Connected;
public event EventHandler<MqttClientDisconnectedEventArgs> Disconnected;
public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived;
public event EventHandler<ApplicationMessageProcessedEventArgs> ApplicationMessageProcessed;
public event EventHandler SynchronizingSubscriptionsFailed;
public event EventHandler<MqttManagedProcessFailedEventArgs> ConnectingFailed;
public event EventHandler<MqttManagedProcessFailedEventArgs> SynchronizingSubscriptionsFailed;
public async Task StartAsync(IManagedMqttClientOptions options)
{
@@ -120,10 +124,13 @@ namespace MQTTnet.Extensions.ManagedClient
{
if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));
foreach (var topicFilter in topicFilter s)
lock (_subscription s)
{
_subscriptions[topicFilter.Topic] = topicFilter.QualityOfServiceLevel;
_subscriptionsNotPushed = true;
foreach (var topicFilter in topicFilters)
{
_subscriptions[topicFilter.Topic] = topicFilter.QualityOfServiceLevel;
_subscriptionsNotPushed = true;
}
}
return Task.FromResult(0);
@@ -131,12 +138,17 @@ namespace MQTTnet.Extensions.ManagedClient
public Task UnsubscribeAsync(IEnumerable<string> topics)
{
foreach (var topic in topics)
if (topics == null) throw new ArgumentNullException(nameof(topics));
lock (_subscriptions)
{
if (_subscriptions.TryRemove(topic, out _))
foreach (var topic in topics )
{
_unsubscriptions.Add(topic);
_subscriptionsNotPushed = true;
if (_subscriptions.Remove(topic))
{
_unsubscriptions.Add(topic);
_subscriptionsNotPushed = true;
}
}
}
@@ -210,7 +222,7 @@ namespace MQTTnet.Extensions.ManagedClient
}
}
private async Task PublishQueuedMessagesAsync (CancellationToken cancellationToken)
private void PublishQueuedMessages (CancellationToken cancellationToken)
{
try
{
@@ -222,12 +234,9 @@ namespace MQTTnet.Extensions.ManagedClient
continue;
}
if (cancellationToken.IsCancellationRequested)
{
continue;
}
cancellationToken.ThrowIfCancellationRequested();
await TryPublishQueuedMessageAsync (message).ConfigureAwait(fals e);
TryPublishQueuedMessage(message);
}
}
catch (OperationCanceledException)
@@ -243,17 +252,13 @@ namespace MQTTnet.Extensions.ManagedClient
}
}
private async Task TryPublishQueuedMessageAsync (ManagedMqttApplicationMessage message)
private void TryPublishQueuedMessage (ManagedMqttApplicationMessage message)
{
Exception transmitException = null;
try
{
await _mqttClient.PublishAsync(message.ApplicationMessage).ConfigureAwait(false);
if (_storageManager != null)
{
await _storageManager.RemoveAsync(message).ConfigureAwait(false);
}
_mqttClient.PublishAsync(message.ApplicationMessage).GetAwaiter().GetResult();
_storageManager?.RemoveAsync(message).GetAwaiter().GetResult();
}
catch (MqttCommunicationException exception)
{
@@ -282,13 +287,13 @@ namespace MQTTnet.Extensions.ManagedClient
_logger.Info(nameof(ManagedMqttClient), "Synchronizing subscriptions");
List<TopicFilter> subscriptions;
Lis t<string> unsubscriptions;
HashSe t<string> unsubscriptions;
lock (_subscriptions)
{
subscriptions = _subscriptions.Select(i => new TopicFilter(i.Key, i.Value)).ToList();
unsubscriptions = new Lis t<string>(_unsubscriptions);
unsubscriptions = new HashSe t<string>(_unsubscriptions);
_unsubscriptions.Clear();
_subscriptionsNotPushed = false;
@@ -316,7 +321,7 @@ namespace MQTTnet.Extensions.ManagedClient
_logger.Warning(exception, "Synchronizing subscriptions failed.");
_subscriptionsNotPushed = true;
SynchronizingSubscriptionsFailed?.Invoke(this, EventArgs.Empty );
SynchronizingSubscriptionsFailed?.Invoke(this, new MqttManagedProcessFailedEventArgs(exception) );
}
}
@@ -332,8 +337,9 @@ namespace MQTTnet.Extensions.ManagedClient
await _mqttClient.ConnectAsync(_options.ClientOptions).ConfigureAwait(false);
return ReconnectionResult.Reconnected;
}
catch (Exception)
catch (Exception exception )
{
ConnectingFailed?.Invoke(this, new MqttManagedProcessFailedEventArgs(exception));
return ReconnectionResult.NotConnected;
}
}
@@ -364,9 +370,7 @@ namespace MQTTnet.Extensions.ManagedClient
_publishingCancellationToken = cts;
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
Task.Run(() => PublishQueuedMessagesAsync(cts.Token), cts.Token);
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
Task.Factory.StartNew(() => PublishQueuedMessages(cts.Token), cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
}
private void StopPublishing()