From fe88b7551c36a37af76ef35de8ecfe0da31221e8 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Thu, 19 Oct 2017 20:45:37 +0200 Subject: [PATCH] Refactor managed MQTT client --- .../MqttCommunicationAdapterFactory.cs | 5 - .../MQTTnet.NetStandard/MqttClientFactory.cs | 5 +- .../MqttCommunicationAdapterFactory.cs | 6 - .../MqttClientFactory.cs | 5 +- MQTTnet.Core/Client/IMqttClientFactory.cs | 6 +- MQTTnet.Core/Client/IMqttClientManaged.cs | 9 - MQTTnet.Core/Client/MqttClientManaged .cs | 163 ------------------ .../Client/MqttClientManagedOptions.cs | 12 -- MQTTnet.Core/MQTTnet.Core.csproj | 6 - .../IManagedMqttClientOptions.cs | 15 ++ .../IManagedMqttClientStorage.cs} | 4 +- .../ManagedClient/ManagedMqttClient.cs | 156 +++++++++++++++++ .../ManagedMqttClientMessagesManager.cs} | 18 +- .../ManagedClient/ManagedMqttClientOptions.cs | 15 ++ 14 files changed, 206 insertions(+), 219 deletions(-) delete mode 100644 MQTTnet.Core/Client/IMqttClientManaged.cs delete mode 100644 MQTTnet.Core/Client/MqttClientManaged .cs delete mode 100644 MQTTnet.Core/Client/MqttClientManagedOptions.cs create mode 100644 MQTTnet.Core/ManagedClient/IManagedMqttClientOptions.cs rename MQTTnet.Core/{Client/IMqttClientQueuedStorage.cs => ManagedClient/IManagedMqttClientStorage.cs} (73%) create mode 100644 MQTTnet.Core/ManagedClient/ManagedMqttClient.cs rename MQTTnet.Core/{Client/MqttClientQueuedPersistentMessagesManager.cs => ManagedClient/ManagedMqttClientMessagesManager.cs} (76%) create mode 100644 MQTTnet.Core/ManagedClient/ManagedMqttClientOptions.cs diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttCommunicationAdapterFactory.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttCommunicationAdapterFactory.cs index ceaac98..05e6fb6 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttCommunicationAdapterFactory.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttCommunicationAdapterFactory.cs @@ -21,11 +21,6 @@ namespace MQTTnet.Implementations return new MqttChannelCommunicationAdapter(new MqttWebSocketChannel(webSocketOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion }); } - if (options is MqttClientManagedOptions queuedOptions) - { - return new MqttChannelCommunicationAdapter(new MqttTcpChannel(queuedOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion }); - } - throw new NotSupportedException(); } } diff --git a/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs b/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs index 53ce276..0976601 100644 --- a/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs +++ b/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs @@ -1,4 +1,5 @@ using MQTTnet.Core.Client; +using MQTTnet.Core.ManagedClient; using MQTTnet.Implementations; namespace MQTTnet @@ -10,9 +11,9 @@ namespace MQTTnet return new MqttClient(new MqttCommunicationAdapterFactory()); } - public IMqttClientManaged CreateMqttManagedClient() + public ManagedMqttClient CreateManagedMqttClient() { - return new MqttClientManaged(new MqttCommunicationAdapterFactory()); + return new ManagedMqttClient(new MqttCommunicationAdapterFactory()); } } } \ No newline at end of file diff --git a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttCommunicationAdapterFactory.cs b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttCommunicationAdapterFactory.cs index 9e429fd..05e6fb6 100644 --- a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttCommunicationAdapterFactory.cs +++ b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttCommunicationAdapterFactory.cs @@ -21,12 +21,6 @@ namespace MQTTnet.Implementations return new MqttChannelCommunicationAdapter(new MqttWebSocketChannel(webSocketOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion }); } - if (options is MqttClientManagedOptions queuedOptions) - { - return new MqttChannelCommunicationAdapter(new MqttTcpChannel(queuedOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion }); - } - - throw new NotSupportedException(); } } diff --git a/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs b/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs index 53ce276..0976601 100644 --- a/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs +++ b/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs @@ -1,4 +1,5 @@ using MQTTnet.Core.Client; +using MQTTnet.Core.ManagedClient; using MQTTnet.Implementations; namespace MQTTnet @@ -10,9 +11,9 @@ namespace MQTTnet return new MqttClient(new MqttCommunicationAdapterFactory()); } - public IMqttClientManaged CreateMqttManagedClient() + public ManagedMqttClient CreateManagedMqttClient() { - return new MqttClientManaged(new MqttCommunicationAdapterFactory()); + return new ManagedMqttClient(new MqttCommunicationAdapterFactory()); } } } \ No newline at end of file diff --git a/MQTTnet.Core/Client/IMqttClientFactory.cs b/MQTTnet.Core/Client/IMqttClientFactory.cs index 25e03b3..7547906 100644 --- a/MQTTnet.Core/Client/IMqttClientFactory.cs +++ b/MQTTnet.Core/Client/IMqttClientFactory.cs @@ -1,9 +1,11 @@ -namespace MQTTnet.Core.Client +using MQTTnet.Core.ManagedClient; + +namespace MQTTnet.Core.Client { public interface IMqttClientFactory { IMqttClient CreateMqttClient(); - IMqttClientManaged CreateMqttManagedClient(); + ManagedMqttClient CreateManagedMqttClient(); } } \ No newline at end of file diff --git a/MQTTnet.Core/Client/IMqttClientManaged.cs b/MQTTnet.Core/Client/IMqttClientManaged.cs deleted file mode 100644 index 1fc42f0..0000000 --- a/MQTTnet.Core/Client/IMqttClientManaged.cs +++ /dev/null @@ -1,9 +0,0 @@ -using System.Threading.Tasks; - -namespace MQTTnet.Core.Client -{ - public interface IMqttClientManaged : IMqttClient - { - //Task ConnectAsync(MqttClientManagedOptions options); - } -} diff --git a/MQTTnet.Core/Client/MqttClientManaged .cs b/MQTTnet.Core/Client/MqttClientManaged .cs deleted file mode 100644 index 03afc92..0000000 --- a/MQTTnet.Core/Client/MqttClientManaged .cs +++ /dev/null @@ -1,163 +0,0 @@ -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; -using MQTTnet.Core.Adapter; -using MQTTnet.Core.Diagnostics; -using MQTTnet.Core.Exceptions; -using MQTTnet.Core.Packets; -using MQTTnet.Core.Protocol; -using MQTTnet.Core.Internal; - -namespace MQTTnet.Core.Client -{ - public class MqttClientManaged: IMqttClientManaged - { - private MqttClientManagedOptions _options; - private int _latestPacketIdentifier; - private readonly BlockingCollection _inflightQueue; - private bool _usePersistance = false; - private MqttClientQueuedPersistentMessagesManager _persistentMessagesManager; - private readonly MqttClient _baseMqttClient; - - public MqttClientManaged(IMqttCommunicationAdapterFactory communicationChannelFactory) - { - _baseMqttClient = new MqttClient(communicationChannelFactory); - _baseMqttClient.Connected += BaseMqttClient_Connected; - _baseMqttClient.Disconnected += BaseMqttClient_Disconnected; - _baseMqttClient.ApplicationMessageReceived += BaseMqttClient_ApplicationMessageReceived; - _inflightQueue = new BlockingCollection(); - } - - private void BaseMqttClient_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e) - { - ApplicationMessageReceived?.Invoke(this, e); - } - - private void BaseMqttClient_Disconnected(object sender, EventArgs e) - { - Disconnected?.Invoke(this, e); - } - - private void BaseMqttClient_Connected(object sender, EventArgs e) - { - Connected?.Invoke(this, e); - } - - public event EventHandler Connected; - public event EventHandler Disconnected; - public event EventHandler ApplicationMessageReceived; - - public bool IsConnected => _baseMqttClient.IsConnected; - - - public async Task ConnectAsync(IMqttClientOptions options) - { - //TODO VERY BAD - _options = options as MqttClientManagedOptions; - this._usePersistance = _options.Storage != null; - await _baseMqttClient.ConnectAsync(options); - SetupOutgoingPacketProcessingAsync(); - - //load persistentMessages - if (_usePersistance) - { - if (_persistentMessagesManager == null) - _persistentMessagesManager = new MqttClientQueuedPersistentMessagesManager(_options); - await _persistentMessagesManager.LoadMessagesAsync(); - await InternalPublishAsync(_persistentMessagesManager.GetMessages(), false); - } - } - - public async Task DisconnectAsync() - { - await _baseMqttClient.DisconnectAsync(); - } - - public async Task UnsubscribeAsync(IEnumerable topicFilters) - { - await _baseMqttClient.UnsubscribeAsync(topicFilters); - } - - public async Task PublishAsync(IEnumerable applicationMessages) - { - await InternalPublishAsync(applicationMessages, true); - } - - private async Task InternalPublishAsync(IEnumerable applicationMessages, bool appendIfUsePersistance) - { - ThrowIfNotConnected(); - - foreach (var applicationMessage in applicationMessages) - { - if (_usePersistance && appendIfUsePersistance) - await _persistentMessagesManager.SaveMessageAsync(applicationMessage); - - _inflightQueue.Add(applicationMessage); - } - } - - public async Task> SubscribeAsync(IEnumerable topicFilters) - { - return await _baseMqttClient.SubscribeAsync(topicFilters); - } - - private void ThrowIfNotConnected() - { - if (!IsConnected) throw new MqttCommunicationException("The client is not connected."); - } - - private ushort GetNewPacketIdentifier() - { - return (ushort)Interlocked.Increment(ref _latestPacketIdentifier); - } - - - private void SetupOutgoingPacketProcessingAsync() - { - Task.Factory.StartNew( - () => SendPackets(_baseMqttClient._cancellationTokenSource.Token), - _baseMqttClient._cancellationTokenSource.Token, - TaskCreationOptions.LongRunning, - TaskScheduler.Default).ConfigureAwait(false); - } - - private async Task SendPackets(CancellationToken cancellationToken) - { - MqttNetTrace.Information(nameof(MqttClientManaged), "Start sending packets."); - MqttApplicationMessage messageInQueue = null; - - try - { - while (!cancellationToken.IsCancellationRequested) - { - messageInQueue = _inflightQueue.Take(); - await _baseMqttClient.PublishAsync(new List() { messageInQueue }); - if (_usePersistance) - await _persistentMessagesManager.Remove(messageInQueue); - } - } - catch (OperationCanceledException) - { - } - catch (MqttCommunicationException exception) - { - MqttNetTrace.Warning(nameof(MqttClient), exception, "MQTT communication exception while sending packets."); - //message not send, equeue it again - if (messageInQueue != null) - _inflightQueue.Add(messageInQueue); - } - catch (Exception exception) - { - MqttNetTrace.Error(nameof(MqttClient), exception, "Unhandled exception while sending packets."); - await DisconnectAsync().ConfigureAwait(false); - } - finally - { - MqttNetTrace.Information(nameof(MqttClient), "Stopped sending packets."); - } - } - } -} diff --git a/MQTTnet.Core/Client/MqttClientManagedOptions.cs b/MQTTnet.Core/Client/MqttClientManagedOptions.cs deleted file mode 100644 index 77feabe..0000000 --- a/MQTTnet.Core/Client/MqttClientManagedOptions.cs +++ /dev/null @@ -1,12 +0,0 @@ - -using System; - -namespace MQTTnet.Core.Client -{ - public class MqttClientManagedOptions: MqttClientTcpOptions - { - public bool UseAutoReconnect { get; set; } - public TimeSpan AutoReconnectDelay { get; set; } - public IMqttClientQueuedStorage Storage { get; set; } - } -} diff --git a/MQTTnet.Core/MQTTnet.Core.csproj b/MQTTnet.Core/MQTTnet.Core.csproj index 89fdc9c..2f55ab5 100644 --- a/MQTTnet.Core/MQTTnet.Core.csproj +++ b/MQTTnet.Core/MQTTnet.Core.csproj @@ -22,10 +22,4 @@ - - - - - - \ No newline at end of file diff --git a/MQTTnet.Core/ManagedClient/IManagedMqttClientOptions.cs b/MQTTnet.Core/ManagedClient/IManagedMqttClientOptions.cs new file mode 100644 index 0000000..33c91e3 --- /dev/null +++ b/MQTTnet.Core/ManagedClient/IManagedMqttClientOptions.cs @@ -0,0 +1,15 @@ +using System; +using MQTTnet.Core.Client; + +namespace MQTTnet.Core.ManagedClient +{ + public interface IManagedMqttClientOptions + { + IMqttClientOptions ClientOptions { get; } + + bool UseAutoReconnect { get; } + TimeSpan AutoReconnectDelay { get; } + + IManagedMqttClientStorage Storage { get; } + } +} \ No newline at end of file diff --git a/MQTTnet.Core/Client/IMqttClientQueuedStorage.cs b/MQTTnet.Core/ManagedClient/IManagedMqttClientStorage.cs similarity index 73% rename from MQTTnet.Core/Client/IMqttClientQueuedStorage.cs rename to MQTTnet.Core/ManagedClient/IManagedMqttClientStorage.cs index 7909a56..39ea89c 100644 --- a/MQTTnet.Core/Client/IMqttClientQueuedStorage.cs +++ b/MQTTnet.Core/ManagedClient/IManagedMqttClientStorage.cs @@ -1,9 +1,9 @@ using System.Collections.Generic; using System.Threading.Tasks; -namespace MQTTnet.Core.Client +namespace MQTTnet.Core.ManagedClient { - public interface IMqttClientQueuedStorage + public interface IManagedMqttClientStorage { Task SaveQueuedMessagesAsync(IList messages); diff --git a/MQTTnet.Core/ManagedClient/ManagedMqttClient.cs b/MQTTnet.Core/ManagedClient/ManagedMqttClient.cs new file mode 100644 index 0000000..1b86a75 --- /dev/null +++ b/MQTTnet.Core/ManagedClient/ManagedMqttClient.cs @@ -0,0 +1,156 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using MQTTnet.Core.Client; +using MQTTnet.Core.Exceptions; +using MQTTnet.Core.Packets; + +namespace MQTTnet.Core.ManagedClient +{ + public class ManagedMqttClient + { + private readonly List _messageQueue = new List(); + private readonly AutoResetEvent _messageQueueGate = new AutoResetEvent(false); + private readonly MqttClient _mqttClient; + + private IManagedMqttClientOptions _options; + + public ManagedMqttClient(IMqttCommunicationAdapterFactory communicationChannelFactory) + { + if (communicationChannelFactory == null) throw new ArgumentNullException(nameof(communicationChannelFactory)); + + _mqttClient = new MqttClient(communicationChannelFactory); + _mqttClient.Connected += OnConnected; + _mqttClient.Disconnected += OnDisconnected; + _mqttClient.ApplicationMessageReceived += OnApplicationMessageReceived; + } + + private void OnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e) + { + ApplicationMessageReceived?.Invoke(this, e); + } + + private void OnDisconnected(object sender, MqttClientDisconnectedEventArgs eventArgs) + { + //Disconnected?.Invoke(this, e); + } + + private void OnConnected(object sender, EventArgs e) + { + Connected?.Invoke(this, e); + } + + public event EventHandler Connected; + public event EventHandler Disconnected; + public event EventHandler ApplicationMessageReceived; + + public bool IsConnected => _mqttClient.IsConnected; + + + public void Start(IManagedMqttClientOptions options) + { + + } + + public void Stop() + { + + } + + public async Task ConnectAsync(IManagedMqttClientOptions options) + { + //////TODO VERY BAD + ////_options = options as ManagedMqttClientTcpOptions; + ////this._usePersistance = _options.Storage != null; + ////await _mqttClient.ConnectAsync(options); + ////SetupOutgoingPacketProcessingAsync(); + + //////load persistentMessages + ////////if (_usePersistance) + ////////{ + //////// if (_persistentMessagesManager == null) + //////// _persistentMessagesManager = new ManagedMqttClientMessagesManager(_options); + //////// await _persistentMessagesManager.LoadMessagesAsync(); + //////// await InternalPublishAsync(_persistentMessagesManager.GetMessages(), false); + ////////} + } + + public async Task DisconnectAsync() + { + await _mqttClient.DisconnectAsync(); + } + + public async Task UnsubscribeAsync(IEnumerable topicFilters) + { + // TODO: Move all subscriptions to list an subscribe after connection has lost. But only if server session is new. + await _mqttClient.UnsubscribeAsync(topicFilters); + } + + public void Enqueue(IEnumerable applicationMessages) + { + ThrowIfNotConnected(); + + _messageQueue.AddRange(applicationMessages); + _options.Storage?.SaveQueuedMessagesAsync(_messageQueue.ToList()); + + _messageQueueGate.Set(); + } + + public async Task> SubscribeAsync(IEnumerable topicFilters) + { + return await _mqttClient.SubscribeAsync(topicFilters); + } + + private void ThrowIfNotConnected() + { + if (!IsConnected) throw new MqttCommunicationException("The client is not connected."); + } + + private void SetupOutgoingPacketProcessingAsync() + { + //Task.Factory.StartNew( + // () => SendPackets(_mqttClient._cancellationTokenSource.Token), + // _mqttClient._cancellationTokenSource.Token, + // TaskCreationOptions.LongRunning, + // TaskScheduler.Default).ConfigureAwait(false); + } + + private async Task SendPackets(CancellationToken cancellationToken) + { + //MqttNetTrace.Information(nameof(MqttClientManaged), "Start sending packets."); + //MqttApplicationMessage messageInQueue = null; + + //try + //{ + // while (!cancellationToken.IsCancellationRequested) + // { + // messageInQueue = _inflightQueue.Take(); + // await _mqttClient.PublishAsync(new List() { messageInQueue }); + // if (_usePersistance) + // await _persistentMessagesManager.Remove(messageInQueue); + // } + //} + //catch (OperationCanceledException) + //{ + //} + //catch (MqttCommunicationException exception) + //{ + // MqttNetTrace.Warning(nameof(MqttClient), exception, "MQTT communication exception while sending packets."); + // //message not send, equeue it again + // if (messageInQueue != null) + // _inflightQueue.Add(messageInQueue); + //} + //catch (Exception exception) + //{ + // MqttNetTrace.Error(nameof(MqttClient), exception, "Unhandled exception while sending packets."); + // await DisconnectAsync().ConfigureAwait(false); + //} + //finally + //{ + // MqttNetTrace.Information(nameof(MqttClient), "Stopped sending packets."); + //} + } + } +} diff --git a/MQTTnet.Core/Client/MqttClientQueuedPersistentMessagesManager.cs b/MQTTnet.Core/ManagedClient/ManagedMqttClientMessagesManager.cs similarity index 76% rename from MQTTnet.Core/Client/MqttClientQueuedPersistentMessagesManager.cs rename to MQTTnet.Core/ManagedClient/ManagedMqttClientMessagesManager.cs index b012e21..7b667ea 100644 --- a/MQTTnet.Core/Client/MqttClientQueuedPersistentMessagesManager.cs +++ b/MQTTnet.Core/ManagedClient/ManagedMqttClientMessagesManager.cs @@ -1,18 +1,16 @@ -using MQTTnet.Core.Diagnostics; -using MQTTnet.Core.Packets; -using System; -using System.Linq; +using System; using System.Collections.Generic; using System.Threading.Tasks; +using MQTTnet.Core.Diagnostics; -namespace MQTTnet.Core.Client +namespace MQTTnet.Core.ManagedClient { - public class MqttClientQueuedPersistentMessagesManager + public class ManagedMqttClientMessagesManager { private readonly IList _persistedMessages = new List(); - private readonly MqttClientManagedOptions _options; + private readonly ManagedMqttClientOptions _options; - public MqttClientQueuedPersistentMessagesManager(MqttClientManagedOptions options) + public ManagedMqttClientMessagesManager(ManagedMqttClientOptions options) { _options = options ?? throw new ArgumentNullException(nameof(options)); } @@ -33,7 +31,7 @@ namespace MQTTnet.Core.Client } catch (Exception exception) { - MqttNetTrace.Error(nameof(MqttClientQueuedPersistentMessagesManager), exception, "Unhandled exception while loading persistent messages."); + MqttNetTrace.Error(nameof(ManagedMqttClientMessagesManager), exception, "Unhandled exception while loading persistent messages."); } } @@ -55,7 +53,7 @@ namespace MQTTnet.Core.Client } catch (Exception exception) { - MqttNetTrace.Error(nameof(MqttClientQueuedPersistentMessagesManager), exception, "Unhandled exception while saving persistent messages."); + MqttNetTrace.Error(nameof(ManagedMqttClientMessagesManager), exception, "Unhandled exception while saving persistent messages."); } } diff --git a/MQTTnet.Core/ManagedClient/ManagedMqttClientOptions.cs b/MQTTnet.Core/ManagedClient/ManagedMqttClientOptions.cs new file mode 100644 index 0000000..bcddb63 --- /dev/null +++ b/MQTTnet.Core/ManagedClient/ManagedMqttClientOptions.cs @@ -0,0 +1,15 @@ +using System; +using MQTTnet.Core.Client; + +namespace MQTTnet.Core.ManagedClient +{ + public class ManagedMqttClientOptions : IManagedMqttClientOptions + { + public IMqttClientOptions ClientOptions { get; set; } + + public bool UseAutoReconnect { get; set; } = true; + public TimeSpan AutoReconnectDelay { get; set; } = TimeSpan.FromSeconds(5); + + public IManagedMqttClientStorage Storage { get; set; } + } +}