using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; using MQTTnet.Client; using MQTTnet.Diagnostics; using MQTTnet.Exceptions; using MQTTnet.Internal; using MQTTnet.Protocol; namespace MQTTnet.Extensions.ManagedClient { public class ManagedMqttClient : IManagedMqttClient { private readonly BlockingCollection _messageQueue = new BlockingCollection(); private readonly Dictionary _subscriptions = new Dictionary(); private readonly AsyncLock _subscriptionsLock = new AsyncLock(); private readonly List _unsubscriptions = new List(); private readonly IMqttClient _mqttClient; private readonly IMqttNetChildLogger _logger; private CancellationTokenSource _connectionCancellationToken; private CancellationTokenSource _publishingCancellationToken; private ManagedMqttClientStorageManager _storageManager; private IManagedMqttClientOptions _options; private bool _subscriptionsNotPushed; public ManagedMqttClient(IMqttClient mqttClient, IMqttNetChildLogger logger) { if (logger == null) throw new ArgumentNullException(nameof(logger)); _mqttClient = mqttClient ?? throw new ArgumentNullException(nameof(mqttClient)); _mqttClient.Connected += OnConnected; _mqttClient.Disconnected += OnDisconnected; _mqttClient.ApplicationMessageReceived += OnApplicationMessageReceived; _logger = logger.CreateChildLogger(nameof(ManagedMqttClient)); } public bool IsConnected => _mqttClient.IsConnected; public bool IsStarted => _connectionCancellationToken != null; public event EventHandler Connected; public event EventHandler Disconnected; public event EventHandler ApplicationMessageReceived; public event EventHandler ApplicationMessageProcessed; public event EventHandler SynchronizingSubscriptionsFailed; public async Task StartAsync(IManagedMqttClientOptions options) { if (options == null) throw new ArgumentNullException(nameof(options)); if (options.ClientOptions == null) throw new ArgumentException("The client options are not set.", nameof(options)); if (!options.ClientOptions.CleanSession) { throw new NotSupportedException("The managed client does not support existing sessions."); } if (_connectionCancellationToken != null) throw new InvalidOperationException("The managed client is already started."); _options = options; if (_options.Storage != null) { _storageManager = new ManagedMqttClientStorageManager(_options.Storage); var messages = await _storageManager.LoadQueuedMessagesAsync().ConfigureAwait(false); foreach (var message in messages) { _messageQueue.Add(message); } } _connectionCancellationToken = new CancellationTokenSource(); #pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed Task.Run(() => MaintainConnectionAsync(_connectionCancellationToken.Token), _connectionCancellationToken.Token); #pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed _logger.Info("Started"); } public Task StopAsync() { StopPublishing(); StopMaintainingConnection(); while (_messageQueue.Any()) { _messageQueue.Take(); } return Task.FromResult(0); } public Task PublishAsync(MqttApplicationMessage applicationMessage) { if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage)); return PublishAsync(new ManagedMqttApplicationMessageBuilder().WithApplicationMessage(applicationMessage).Build()); } public async Task PublishAsync(ManagedMqttApplicationMessage applicationMessage) { if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage)); if (_storageManager != null) { await _storageManager.AddAsync(applicationMessage).ConfigureAwait(false); } _messageQueue.Add(applicationMessage); } public async Task SubscribeAsync(IEnumerable topicFilters) { if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); using (await _subscriptionsLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) { foreach (var topicFilter in topicFilters) { _subscriptions[topicFilter.Topic] = topicFilter.QualityOfServiceLevel; _subscriptionsNotPushed = true; } } } public async Task UnsubscribeAsync(IEnumerable topics) { using (await _subscriptionsLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) { foreach (var topic in topics) { if (_subscriptions.Remove(topic)) { _unsubscriptions.Add(topic); _subscriptionsNotPushed = true; } } } } public void Dispose() { _messageQueue?.Dispose(); _subscriptionsLock?.Dispose(); _connectionCancellationToken?.Dispose(); _publishingCancellationToken?.Dispose(); } private async Task MaintainConnectionAsync(CancellationToken cancellationToken) { try { while (!cancellationToken.IsCancellationRequested) { await TryMaintainConnectionAsync(cancellationToken).ConfigureAwait(false); } } catch (OperationCanceledException) { } catch (Exception exception) { _logger.Error(exception, "Unhandled exception while maintaining connection."); } finally { await _mqttClient.DisconnectAsync().ConfigureAwait(false); _logger.Info("Stopped"); } } private async Task TryMaintainConnectionAsync(CancellationToken cancellationToken) { try { var connectionState = await ReconnectIfRequiredAsync().ConfigureAwait(false); if (connectionState == ReconnectionResult.NotConnected) { StopPublishing(); await Task.Delay(_options.AutoReconnectDelay, cancellationToken).ConfigureAwait(false); return; } if (connectionState == ReconnectionResult.Reconnected || _subscriptionsNotPushed) { await SynchronizeSubscriptionsAsync().ConfigureAwait(false); StartPublishing(); return; } if (connectionState == ReconnectionResult.StillConnected) { await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken).ConfigureAwait(false); } } catch (OperationCanceledException) { } catch (MqttCommunicationException exception) { _logger.Warning(exception, "Communication exception while maintaining connection."); } catch (Exception exception) { _logger.Error(exception, "Unhandled exception while maintaining connection."); } } private async Task PublishQueuedMessagesAsync(CancellationToken cancellationToken) { try { while (!cancellationToken.IsCancellationRequested) { var message = _messageQueue.Take(cancellationToken); if (message == null) { continue; } if (cancellationToken.IsCancellationRequested) { continue; } await TryPublishQueuedMessageAsync(message).ConfigureAwait(false); } } catch (OperationCanceledException) { } catch (Exception exception) { _logger.Error(exception, "Unhandled exception while publishing queued application messages."); } finally { _logger.Verbose("Stopped publishing messages."); } } private async Task TryPublishQueuedMessageAsync(ManagedMqttApplicationMessage message) { Exception transmitException = null; try { await _mqttClient.PublishAsync(message.ApplicationMessage).ConfigureAwait(false); if (_storageManager != null) { await _storageManager.RemoveAsync(message).ConfigureAwait(false); } } catch (MqttCommunicationException exception) { transmitException = exception; _logger.Warning(exception, $"Publishing application ({message.Id}) message failed."); if (message.ApplicationMessage.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce) { _messageQueue.Add(message); } } catch (Exception exception) { transmitException = exception; _logger.Error(exception, $"Unhandled exception while publishing application message ({message.Id})."); } finally { ApplicationMessageProcessed?.Invoke(this, new ApplicationMessageProcessedEventArgs(message, transmitException)); } } private async Task SynchronizeSubscriptionsAsync() { _logger.Info(nameof(ManagedMqttClient), "Synchronizing subscriptions"); List subscriptions; List unsubscriptions; using (await _subscriptionsLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) { subscriptions = _subscriptions.Select(i => new TopicFilter(i.Key, i.Value)).ToList(); unsubscriptions = new List(_unsubscriptions); _unsubscriptions.Clear(); _subscriptionsNotPushed = false; } if (!subscriptions.Any() && !unsubscriptions.Any()) { return; } try { if (subscriptions.Any()) { await _mqttClient.SubscribeAsync(subscriptions).ConfigureAwait(false); } if (unsubscriptions.Any()) { await _mqttClient.UnsubscribeAsync(unsubscriptions).ConfigureAwait(false); } } catch (Exception exception) { _logger.Warning(exception, "Synchronizing subscriptions failed."); _subscriptionsNotPushed = true; SynchronizingSubscriptionsFailed?.Invoke(this, EventArgs.Empty); } } private async Task ReconnectIfRequiredAsync() { if (_mqttClient.IsConnected) { return ReconnectionResult.StillConnected; } try { await _mqttClient.ConnectAsync(_options.ClientOptions).ConfigureAwait(false); return ReconnectionResult.Reconnected; } catch (Exception) { return ReconnectionResult.NotConnected; } } private void OnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs eventArgs) { ApplicationMessageReceived?.Invoke(this, eventArgs); } private void OnDisconnected(object sender, MqttClientDisconnectedEventArgs eventArgs) { Disconnected?.Invoke(this, eventArgs); } private void OnConnected(object sender, MqttClientConnectedEventArgs eventArgs) { Connected?.Invoke(this, eventArgs); } private void StartPublishing() { if (_publishingCancellationToken != null) { StopPublishing(); } var cts = new CancellationTokenSource(); _publishingCancellationToken = cts; #pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed Task.Run(() => PublishQueuedMessagesAsync(cts.Token), cts.Token); #pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed } private void StopPublishing() { _publishingCancellationToken?.Cancel(false); _publishingCancellationToken?.Dispose(); _publishingCancellationToken = null; } private void StopMaintainingConnection() { _connectionCancellationToken?.Cancel(false); _connectionCancellationToken?.Dispose(); _connectionCancellationToken = null; } } }