@@ -11,10 +11,10 @@ using MQTTnet.Protocol;
namespace MQTTnet.ManagedClient
{
public class ManagedMqttClient : IManagedMqttClient
public class ManagedMqttClient : IManagedMqttClient, IDisposable
{
private readonly BlockingCollection<MqttApplicationMessage> _messageQueue = new BlockingCollection<MqttApplicationMessage>();
private readonly HashSet<TopicFilter> _subscriptions = new HashSet<TopicFilter >();
private readonly Dictionary<string, MqttQualityOfServiceLevel> _subscriptions = new Dictionary<string, MqttQualityOfServiceLevel >();
private readonly SemaphoreSlim _subscriptionsSemaphore = new SemaphoreSlim(1, 1);
private readonly IMqttClient _mqttClient;
@@ -75,11 +75,8 @@ namespace MQTTnet.ManagedClient
public Task StopAsync()
{
_connectionCancellationToken?.Cancel(false);
_connectionCancellationToken = null;
_publishingCancellationToken?.Cancel(false);
_publishingCancellationToken = null;
StopPublishing();
StopMaintainingConnection();
while (_messageQueue.Any())
{
@@ -88,7 +85,7 @@ namespace MQTTnet.ManagedClient
return Task.FromResult(0);
}
public async Task PublishAsync(IEnumerable<MqttApplicationMessage> applicationMessages)
{
if (applicationMessages == null) throw new ArgumentNullException(nameof(applicationMessages));
@@ -113,10 +110,8 @@ namespace MQTTnet.ManagedClient
{
foreach (var topicFilter in topicFilters)
{
if (_subscriptions.Add(topicFilter))
{
_subscriptionsNotPushed = true;
}
_subscriptions[topicFilter.Topic] = topicFilter.QualityOfServiceLevel;
_subscriptionsNotPushed = true;
}
}
finally
@@ -125,14 +120,14 @@ namespace MQTTnet.ManagedClient
}
}
public async Task UnsubscribeAsync(IEnumerable<TopicFilter> topicFilter s)
public async Task UnsubscribeAsync(IEnumerable<string> topic s)
{
await _subscriptionsSemaphore.WaitAsync().ConfigureAwait(false);
try
{
foreach (var topicFilter in topicFilter s)
foreach (var topic in topics)
{
if (_subscriptions.Remove(topicFilter ))
if (_subscriptions.Remove(topic))
{
_subscriptionsNotPushed = true;
}
@@ -144,6 +139,14 @@ namespace MQTTnet.ManagedClient
}
}
public void Dispose()
{
_messageQueue?.Dispose();
_subscriptionsSemaphore?.Dispose();
_connectionCancellationToken?.Dispose();
_publishingCancellationToken?.Dispose();
}
private async Task MaintainConnectionAsync(CancellationToken cancellationToken)
{
try
@@ -174,9 +177,7 @@ namespace MQTTnet.ManagedClient
var connectionState = await ReconnectIfRequiredAsync().ConfigureAwait(false);
if (connectionState == ReconnectionResult.NotConnected)
{
_publishingCancellationToken?.Cancel(false);
_publishingCancellationToken = null;
StopPublishing();
await Task.Delay(_options.AutoReconnectDelay, cancellationToken).ConfigureAwait(false);
return;
}
@@ -185,18 +186,15 @@ namespace MQTTnet.ManagedClient
{
await PushSubscriptionsAsync().ConfigureAwait(false);
_publishingCancellationToken = new CancellationTokenSource();
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
Task.Run(async () => await PublishQueuedMessagesAsync(_publishingCancellationToken.Token).ConfigureAwait(false), _publishingCancellationToken.Token).ConfigureAwait(false);
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
StartPublishing();
return;
}
if (connectionState == ReconnectionResult.StillConnected)
{
await Task.Delay(TimeSpan.FromSeconds(1), _ connectionC ancellationToken. Token).ConfigureAwait(false);
await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken).ConfigureAwait(false);
}
}
catch (OperationCanceledException)
@@ -211,7 +209,7 @@ namespace MQTTnet.ManagedClient
_logger.Error<ManagedMqttClient>(exception, "Unhandled exception while maintaining connection.");
}
}
private async Task PublishQueuedMessagesAsync(CancellationToken cancellationToken)
{
try
@@ -279,7 +277,7 @@ namespace MQTTnet.ManagedClient
await _subscriptionsSemaphore.WaitAsync().ConfigureAwait(false);
try
{
subscriptions = _subscriptions.ToList();
subscriptions = _subscriptions.Select(i => new TopicFilter(i.Key, i.Value)). ToList();
_subscriptionsNotPushed = false;
}
finally
@@ -335,5 +333,35 @@ namespace MQTTnet.ManagedClient
{
Connected?.Invoke(this, eventArgs);
}
private void StartPublishing()
{
if (_publishingCancellationToken != null)
{
StopPublishing();
}
var cts = new CancellationTokenSource();
_publishingCancellationToken = cts;
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
Task.Run(async () => await PublishQueuedMessagesAsync(cts.Token).ConfigureAwait(false), cts.Token).ConfigureAwait(false);
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
}
private void StopPublishing()
{
_publishingCancellationToken?.Cancel(false);
_publishingCancellationToken?.Dispose();
_publishingCancellationToken = null;
}
private void StopMaintainingConnection()
{
_connectionCancellationToken?.Cancel(false);
_connectionCancellationToken?.Dispose();
_connectionCancellationToken = null;
}
}
}