From 870ed00e3c32c45797815235a34d8bfd2812bb16 Mon Sep 17 00:00:00 2001 From: Gerardo Date: Thu, 19 Oct 2017 13:21:06 +0200 Subject: [PATCH] Refactoring Added almost of the hits. Options modified but must be reviewed completely. --- .../MqttCommunicationAdapterFactory.cs | 4 +- .../Implementations/MqttTcpChannel.cs | 2 +- .../MQTTnet.NetStandard/MqttClientFactory.cs | 4 +- .../MqttCommunicationAdapterFactory.cs | 4 +- .../Implementations/MqttTcpChannel.cs | 2 +- .../MqttClientFactory.cs | 4 +- MQTTnet.Core/Client/IMqttClient.cs | 2 +- MQTTnet.Core/Client/IMqttClientFactory.cs | 2 +- MQTTnet.Core/Client/IMqttClientManaged.cs | 9 + MQTTnet.Core/Client/IMqttClientQueued.cs | 9 - .../Client/IMqttClientQueuedStorage.cs | 4 +- .../IMqttCommunicationAdapterFactory.cs | 2 +- MQTTnet.Core/Client/MqttClient.cs | 5 +- MQTTnet.Core/Client/MqttClientManaged .cs | 163 +++++++++++++++++ .../Client/MqttClientManagedOptions.cs | 12 ++ MQTTnet.Core/Client/MqttClientOptions.cs | 24 +-- MQTTnet.Core/Client/MqttClientQueued.cs | 166 ------------------ .../Client/MqttClientQueuedOptions.cs | 11 -- ...ttClientQueuedPersistentMessagesManager.cs | 8 +- MQTTnet.Core/Client/MqttClientTcpOptions.cs | 26 ++- .../Client/MqttClientWebSocketOptions.cs | 26 ++- .../MqttCommunicationAdapterFactory.cs | 2 +- Tests/MQTTnet.TestApp.NetCore/Program.cs | 11 +- .../MainPage.xaml.cs | 2 +- 24 files changed, 273 insertions(+), 231 deletions(-) create mode 100644 MQTTnet.Core/Client/IMqttClientManaged.cs delete mode 100644 MQTTnet.Core/Client/IMqttClientQueued.cs create mode 100644 MQTTnet.Core/Client/MqttClientManaged .cs create mode 100644 MQTTnet.Core/Client/MqttClientManagedOptions.cs delete mode 100644 MQTTnet.Core/Client/MqttClientQueued.cs delete mode 100644 MQTTnet.Core/Client/MqttClientQueuedOptions.cs diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttCommunicationAdapterFactory.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttCommunicationAdapterFactory.cs index 218bb8c..ceaac98 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttCommunicationAdapterFactory.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttCommunicationAdapterFactory.cs @@ -7,7 +7,7 @@ namespace MQTTnet.Implementations { public class MqttCommunicationAdapterFactory : IMqttCommunicationAdapterFactory { - public IMqttCommunicationAdapter CreateMqttCommunicationAdapter(MqttClientOptions options) + public IMqttCommunicationAdapter CreateMqttCommunicationAdapter(IMqttClientOptions options) { if (options == null) throw new ArgumentNullException(nameof(options)); @@ -21,7 +21,7 @@ namespace MQTTnet.Implementations return new MqttChannelCommunicationAdapter(new MqttWebSocketChannel(webSocketOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion }); } - if (options is MqttClientQueuedOptions queuedOptions) + if (options is MqttClientManagedOptions queuedOptions) { return new MqttChannelCommunicationAdapter(new MqttTcpChannel(queuedOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion }); } diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs index dfe3654..2fe1804 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs @@ -121,7 +121,7 @@ namespace MQTTnet.Implementations return _options.TlsOptions.AllowUntrustedCertificates; } - private static X509CertificateCollection LoadCertificates(MqttClientOptions options) + private static X509CertificateCollection LoadCertificates(IMqttClientOptions options) { var certificates = new X509CertificateCollection(); if (options.TlsOptions.Certificates == null) diff --git a/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs b/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs index 5bc8336..53ce276 100644 --- a/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs +++ b/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs @@ -10,9 +10,9 @@ namespace MQTTnet return new MqttClient(new MqttCommunicationAdapterFactory()); } - public IMqttClientQueued CreateMqttQueuedClient() + public IMqttClientManaged CreateMqttManagedClient() { - return new MqttClientQueued(new MqttCommunicationAdapterFactory()); + return new MqttClientManaged(new MqttCommunicationAdapterFactory()); } } } \ No newline at end of file diff --git a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttCommunicationAdapterFactory.cs b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttCommunicationAdapterFactory.cs index 36a34b1..9e429fd 100644 --- a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttCommunicationAdapterFactory.cs +++ b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttCommunicationAdapterFactory.cs @@ -7,7 +7,7 @@ namespace MQTTnet.Implementations { public class MqttCommunicationAdapterFactory : IMqttCommunicationAdapterFactory { - public IMqttCommunicationAdapter CreateMqttCommunicationAdapter(MqttClientOptions options) + public IMqttCommunicationAdapter CreateMqttCommunicationAdapter(IMqttClientOptions options) { if (options == null) throw new ArgumentNullException(nameof(options)); @@ -21,7 +21,7 @@ namespace MQTTnet.Implementations return new MqttChannelCommunicationAdapter(new MqttWebSocketChannel(webSocketOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion }); } - if (options is MqttClientQueuedOptions queuedOptions) + if (options is MqttClientManagedOptions queuedOptions) { return new MqttChannelCommunicationAdapter(new MqttTcpChannel(queuedOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion }); } diff --git a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs index 03206f1..5335dae 100644 --- a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs +++ b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs @@ -89,7 +89,7 @@ namespace MQTTnet.Implementations RawReceiveStream = ReceiveStream; } - private static Certificate LoadCertificate(MqttClientOptions options) + private static Certificate LoadCertificate(IMqttClientOptions options) { if (options.TlsOptions.Certificates == null || !options.TlsOptions.Certificates.Any()) { diff --git a/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs b/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs index 5bc8336..53ce276 100644 --- a/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs +++ b/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs @@ -10,9 +10,9 @@ namespace MQTTnet return new MqttClient(new MqttCommunicationAdapterFactory()); } - public IMqttClientQueued CreateMqttQueuedClient() + public IMqttClientManaged CreateMqttManagedClient() { - return new MqttClientQueued(new MqttCommunicationAdapterFactory()); + return new MqttClientManaged(new MqttCommunicationAdapterFactory()); } } } \ No newline at end of file diff --git a/MQTTnet.Core/Client/IMqttClient.cs b/MQTTnet.Core/Client/IMqttClient.cs index 7622867..b46a2bc 100644 --- a/MQTTnet.Core/Client/IMqttClient.cs +++ b/MQTTnet.Core/Client/IMqttClient.cs @@ -13,7 +13,7 @@ namespace MQTTnet.Core.Client event EventHandler Connected; event EventHandler Disconnected; - Task ConnectAsync(MqttClientOptions options); + Task ConnectAsync(IMqttClientOptions options); Task DisconnectAsync(); Task> SubscribeAsync(IEnumerable topicFilters); diff --git a/MQTTnet.Core/Client/IMqttClientFactory.cs b/MQTTnet.Core/Client/IMqttClientFactory.cs index f1d6411..25e03b3 100644 --- a/MQTTnet.Core/Client/IMqttClientFactory.cs +++ b/MQTTnet.Core/Client/IMqttClientFactory.cs @@ -4,6 +4,6 @@ { IMqttClient CreateMqttClient(); - IMqttClientQueued CreateMqttQueuedClient(); + IMqttClientManaged CreateMqttManagedClient(); } } \ No newline at end of file diff --git a/MQTTnet.Core/Client/IMqttClientManaged.cs b/MQTTnet.Core/Client/IMqttClientManaged.cs new file mode 100644 index 0000000..1fc42f0 --- /dev/null +++ b/MQTTnet.Core/Client/IMqttClientManaged.cs @@ -0,0 +1,9 @@ +using System.Threading.Tasks; + +namespace MQTTnet.Core.Client +{ + public interface IMqttClientManaged : IMqttClient + { + //Task ConnectAsync(MqttClientManagedOptions options); + } +} diff --git a/MQTTnet.Core/Client/IMqttClientQueued.cs b/MQTTnet.Core/Client/IMqttClientQueued.cs deleted file mode 100644 index c172e0c..0000000 --- a/MQTTnet.Core/Client/IMqttClientQueued.cs +++ /dev/null @@ -1,9 +0,0 @@ -using System.Threading.Tasks; - -namespace MQTTnet.Core.Client -{ - public interface IMqttClientQueued: IMqttClient - { - Task ConnectAsync(MqttClientQueuedOptions options); - } -} diff --git a/MQTTnet.Core/Client/IMqttClientQueuedStorage.cs b/MQTTnet.Core/Client/IMqttClientQueuedStorage.cs index ffc5e5f..7909a56 100644 --- a/MQTTnet.Core/Client/IMqttClientQueuedStorage.cs +++ b/MQTTnet.Core/Client/IMqttClientQueuedStorage.cs @@ -5,8 +5,8 @@ namespace MQTTnet.Core.Client { public interface IMqttClientQueuedStorage { - Task SaveInflightMessagesAsync(IList messages); + Task SaveQueuedMessagesAsync(IList messages); - Task> LoadInflightMessagesAsync(); + Task> LoadQueuedMessagesAsync(); } } diff --git a/MQTTnet.Core/Client/IMqttCommunicationAdapterFactory.cs b/MQTTnet.Core/Client/IMqttCommunicationAdapterFactory.cs index 092ea04..0e29590 100644 --- a/MQTTnet.Core/Client/IMqttCommunicationAdapterFactory.cs +++ b/MQTTnet.Core/Client/IMqttCommunicationAdapterFactory.cs @@ -4,6 +4,6 @@ namespace MQTTnet.Core.Client { public interface IMqttCommunicationAdapterFactory { - IMqttCommunicationAdapter CreateMqttCommunicationAdapter(MqttClientOptions options); + IMqttCommunicationAdapter CreateMqttCommunicationAdapter(IMqttClientOptions options); } } \ No newline at end of file diff --git a/MQTTnet.Core/Client/MqttClient.cs b/MQTTnet.Core/Client/MqttClient.cs index 002ebe8..6cdf051 100644 --- a/MQTTnet.Core/Client/MqttClient.cs +++ b/MQTTnet.Core/Client/MqttClient.cs @@ -18,7 +18,7 @@ namespace MQTTnet.Core.Client private readonly MqttPacketDispatcher _packetDispatcher = new MqttPacketDispatcher(); private readonly IMqttCommunicationAdapterFactory _communicationChannelFactory; - private MqttClientOptions _options; + private IMqttClientOptions _options; private bool _isReceivingPackets; private int _latestPacketIdentifier; internal CancellationTokenSource _cancellationTokenSource; @@ -35,7 +35,8 @@ namespace MQTTnet.Core.Client public bool IsConnected => _cancellationTokenSource != null && !_cancellationTokenSource.IsCancellationRequested; - public async Task ConnectAsync(MqttClientOptions options) + + public async Task ConnectAsync(IMqttClientOptions options) { if (options == null) throw new ArgumentNullException(nameof(options)); diff --git a/MQTTnet.Core/Client/MqttClientManaged .cs b/MQTTnet.Core/Client/MqttClientManaged .cs new file mode 100644 index 0000000..03afc92 --- /dev/null +++ b/MQTTnet.Core/Client/MqttClientManaged .cs @@ -0,0 +1,163 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using MQTTnet.Core.Adapter; +using MQTTnet.Core.Diagnostics; +using MQTTnet.Core.Exceptions; +using MQTTnet.Core.Packets; +using MQTTnet.Core.Protocol; +using MQTTnet.Core.Internal; + +namespace MQTTnet.Core.Client +{ + public class MqttClientManaged: IMqttClientManaged + { + private MqttClientManagedOptions _options; + private int _latestPacketIdentifier; + private readonly BlockingCollection _inflightQueue; + private bool _usePersistance = false; + private MqttClientQueuedPersistentMessagesManager _persistentMessagesManager; + private readonly MqttClient _baseMqttClient; + + public MqttClientManaged(IMqttCommunicationAdapterFactory communicationChannelFactory) + { + _baseMqttClient = new MqttClient(communicationChannelFactory); + _baseMqttClient.Connected += BaseMqttClient_Connected; + _baseMqttClient.Disconnected += BaseMqttClient_Disconnected; + _baseMqttClient.ApplicationMessageReceived += BaseMqttClient_ApplicationMessageReceived; + _inflightQueue = new BlockingCollection(); + } + + private void BaseMqttClient_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e) + { + ApplicationMessageReceived?.Invoke(this, e); + } + + private void BaseMqttClient_Disconnected(object sender, EventArgs e) + { + Disconnected?.Invoke(this, e); + } + + private void BaseMqttClient_Connected(object sender, EventArgs e) + { + Connected?.Invoke(this, e); + } + + public event EventHandler Connected; + public event EventHandler Disconnected; + public event EventHandler ApplicationMessageReceived; + + public bool IsConnected => _baseMqttClient.IsConnected; + + + public async Task ConnectAsync(IMqttClientOptions options) + { + //TODO VERY BAD + _options = options as MqttClientManagedOptions; + this._usePersistance = _options.Storage != null; + await _baseMqttClient.ConnectAsync(options); + SetupOutgoingPacketProcessingAsync(); + + //load persistentMessages + if (_usePersistance) + { + if (_persistentMessagesManager == null) + _persistentMessagesManager = new MqttClientQueuedPersistentMessagesManager(_options); + await _persistentMessagesManager.LoadMessagesAsync(); + await InternalPublishAsync(_persistentMessagesManager.GetMessages(), false); + } + } + + public async Task DisconnectAsync() + { + await _baseMqttClient.DisconnectAsync(); + } + + public async Task UnsubscribeAsync(IEnumerable topicFilters) + { + await _baseMqttClient.UnsubscribeAsync(topicFilters); + } + + public async Task PublishAsync(IEnumerable applicationMessages) + { + await InternalPublishAsync(applicationMessages, true); + } + + private async Task InternalPublishAsync(IEnumerable applicationMessages, bool appendIfUsePersistance) + { + ThrowIfNotConnected(); + + foreach (var applicationMessage in applicationMessages) + { + if (_usePersistance && appendIfUsePersistance) + await _persistentMessagesManager.SaveMessageAsync(applicationMessage); + + _inflightQueue.Add(applicationMessage); + } + } + + public async Task> SubscribeAsync(IEnumerable topicFilters) + { + return await _baseMqttClient.SubscribeAsync(topicFilters); + } + + private void ThrowIfNotConnected() + { + if (!IsConnected) throw new MqttCommunicationException("The client is not connected."); + } + + private ushort GetNewPacketIdentifier() + { + return (ushort)Interlocked.Increment(ref _latestPacketIdentifier); + } + + + private void SetupOutgoingPacketProcessingAsync() + { + Task.Factory.StartNew( + () => SendPackets(_baseMqttClient._cancellationTokenSource.Token), + _baseMqttClient._cancellationTokenSource.Token, + TaskCreationOptions.LongRunning, + TaskScheduler.Default).ConfigureAwait(false); + } + + private async Task SendPackets(CancellationToken cancellationToken) + { + MqttNetTrace.Information(nameof(MqttClientManaged), "Start sending packets."); + MqttApplicationMessage messageInQueue = null; + + try + { + while (!cancellationToken.IsCancellationRequested) + { + messageInQueue = _inflightQueue.Take(); + await _baseMqttClient.PublishAsync(new List() { messageInQueue }); + if (_usePersistance) + await _persistentMessagesManager.Remove(messageInQueue); + } + } + catch (OperationCanceledException) + { + } + catch (MqttCommunicationException exception) + { + MqttNetTrace.Warning(nameof(MqttClient), exception, "MQTT communication exception while sending packets."); + //message not send, equeue it again + if (messageInQueue != null) + _inflightQueue.Add(messageInQueue); + } + catch (Exception exception) + { + MqttNetTrace.Error(nameof(MqttClient), exception, "Unhandled exception while sending packets."); + await DisconnectAsync().ConfigureAwait(false); + } + finally + { + MqttNetTrace.Information(nameof(MqttClient), "Stopped sending packets."); + } + } + } +} diff --git a/MQTTnet.Core/Client/MqttClientManagedOptions.cs b/MQTTnet.Core/Client/MqttClientManagedOptions.cs new file mode 100644 index 0000000..77feabe --- /dev/null +++ b/MQTTnet.Core/Client/MqttClientManagedOptions.cs @@ -0,0 +1,12 @@ + +using System; + +namespace MQTTnet.Core.Client +{ + public class MqttClientManagedOptions: MqttClientTcpOptions + { + public bool UseAutoReconnect { get; set; } + public TimeSpan AutoReconnectDelay { get; set; } + public IMqttClientQueuedStorage Storage { get; set; } + } +} diff --git a/MQTTnet.Core/Client/MqttClientOptions.cs b/MQTTnet.Core/Client/MqttClientOptions.cs index f2e28fb..35cdf5f 100644 --- a/MQTTnet.Core/Client/MqttClientOptions.cs +++ b/MQTTnet.Core/Client/MqttClientOptions.cs @@ -1,26 +1,26 @@ -using System; -using MQTTnet.Core.Serializer; +using MQTTnet.Core.Serializer; +using System; namespace MQTTnet.Core.Client { - public abstract class MqttClientOptions + public interface IMqttClientOptions { - public MqttClientTlsOptions TlsOptions { get; set; } = new MqttClientTlsOptions(); + MqttClientTlsOptions TlsOptions { get; set; } - public MqttApplicationMessage WillMessage { get; set; } + MqttApplicationMessage WillMessage { get; set; } - public string UserName { get; set; } + string UserName { get; set; } - public string Password { get; set; } + string Password { get; set; } - public string ClientId { get; set; } = Guid.NewGuid().ToString().Replace("-", string.Empty); + string ClientId { get; set; } - public bool CleanSession { get; set; } = true; + bool CleanSession { get; set; } - public TimeSpan KeepAlivePeriod { get; set; } = TimeSpan.FromSeconds(5); + TimeSpan KeepAlivePeriod { get; set; } - public TimeSpan DefaultCommunicationTimeout { get; set; } = TimeSpan.FromSeconds(10); + TimeSpan DefaultCommunicationTimeout { get; set; } - public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311; + MqttProtocolVersion ProtocolVersion { get; set; } } } diff --git a/MQTTnet.Core/Client/MqttClientQueued.cs b/MQTTnet.Core/Client/MqttClientQueued.cs deleted file mode 100644 index 05d5766..0000000 --- a/MQTTnet.Core/Client/MqttClientQueued.cs +++ /dev/null @@ -1,166 +0,0 @@ -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; -using MQTTnet.Core.Adapter; -using MQTTnet.Core.Diagnostics; -using MQTTnet.Core.Exceptions; -using MQTTnet.Core.Packets; -using MQTTnet.Core.Protocol; -using MQTTnet.Core.Internal; - -namespace MQTTnet.Core.Client -{ - public class MqttClientQueued : MqttClient, IMqttClientQueued - { - private MqttClientQueuedOptions _options; - private int _latestPacketIdentifier; - private readonly ConcurrentQueue _inflightQueue; - private bool _usePersistance = false; - private MqttClientQueuedPersistentMessagesManager _persistentMessagesManager; - - public MqttClientQueued(IMqttCommunicationAdapterFactory communicationChannelFactory) : base(communicationChannelFactory) - { - _inflightQueue = new ConcurrentQueue(); - } - - - public async Task ConnectAsync(MqttClientQueuedOptions options) - { - try - { - _options = options; - this._usePersistance = _options.UsePersistence; - await base.ConnectAsync(options); - SetupOutgoingPacketProcessingAsync(); - - //load persistentMessages - if (_usePersistance) - { - if (_persistentMessagesManager == null) - _persistentMessagesManager = new MqttClientQueuedPersistentMessagesManager(_options); - await _persistentMessagesManager.LoadMessagesAsync(); - await InternalPublishAsync(_persistentMessagesManager.GetMessages(), false); - } - } - catch (Exception) - { - await DisconnectAsync().ConfigureAwait(false); - throw; - } - } - - public new async Task PublishAsync(IEnumerable applicationMessages) - { - await InternalPublishAsync(applicationMessages, true); - } - - private async Task InternalPublishAsync(IEnumerable applicationMessages, bool appendIfUsePersistance) - { - ThrowIfNotConnected(); - - foreach (var applicationMessage in applicationMessages) - { - if (_usePersistance && appendIfUsePersistance) - await _persistentMessagesManager.SaveMessageAsync(applicationMessage); - - _inflightQueue.Enqueue(applicationMessage); - } - } - - public new async Task> SubscribeAsync(IEnumerable topicFilters) - { - return await base.SubscribeAsync(topicFilters); - } - - private void ThrowIfNotConnected() - { - if (!IsConnected) throw new MqttCommunicationException("The client is not connected."); - } - - private ushort GetNewPacketIdentifier() - { - return (ushort)Interlocked.Increment(ref _latestPacketIdentifier); - } - - - private void SetupOutgoingPacketProcessingAsync() - { -#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - Task.Factory.StartNew( - () => SendPackets(base._cancellationTokenSource.Token), - base._cancellationTokenSource.Token, - TaskCreationOptions.LongRunning, - TaskScheduler.Default).ConfigureAwait(false); -#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - } - - private async Task SendPackets(CancellationToken cancellationToken) - { - MqttNetTrace.Information(nameof(MqttClient), "Start sending packets."); - MqttApplicationMessage messageToSend = null; - try - { - while (!cancellationToken.IsCancellationRequested) - { - while (_inflightQueue.TryDequeue(out messageToSend)) - { - MqttQualityOfServiceLevel qosGroup = messageToSend.QualityOfServiceLevel; - MqttPublishPacket publishPacket = messageToSend.ToPublishPacket(); - switch (qosGroup) - { - case MqttQualityOfServiceLevel.AtMostOnce: - { - // No packet identifier is used for QoS 0 [3.3.2.2 Packet Identifier] - await base._adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, base._cancellationTokenSource.Token, publishPacket); - break; - } - - case MqttQualityOfServiceLevel.AtLeastOnce: - { - publishPacket.PacketIdentifier = GetNewPacketIdentifier(); - await base.SendAndReceiveAsync(publishPacket); - break; - } - case MqttQualityOfServiceLevel.ExactlyOnce: - { - publishPacket.PacketIdentifier = GetNewPacketIdentifier(); - var pubRecPacket = await base.SendAndReceiveAsync(publishPacket).ConfigureAwait(false); - await base.SendAndReceiveAsync(pubRecPacket.CreateResponse()).ConfigureAwait(false); - break; - } - default: - { - throw new InvalidOperationException(); - } - } - //delete from persistence - if (_usePersistance) - await _persistentMessagesManager.Remove(messageToSend); - } - }; - } - catch (OperationCanceledException) - { - } - catch (MqttCommunicationException exception) - { - MqttNetTrace.Warning(nameof(MqttClient), exception, "MQTT communication exception while sending packets."); - //message not send, equeued again - if (messageToSend != null) - _inflightQueue.Enqueue(messageToSend); - } - catch (Exception exception) - { - MqttNetTrace.Error(nameof(MqttClient), exception, "Unhandled exception while sending packets."); - await DisconnectAsync().ConfigureAwait(false); - } - finally - { - MqttNetTrace.Information(nameof(MqttClient), "Stopped sending packets."); - } - } - } -} diff --git a/MQTTnet.Core/Client/MqttClientQueuedOptions.cs b/MQTTnet.Core/Client/MqttClientQueuedOptions.cs deleted file mode 100644 index e7af0d7..0000000 --- a/MQTTnet.Core/Client/MqttClientQueuedOptions.cs +++ /dev/null @@ -1,11 +0,0 @@ - - -namespace MQTTnet.Core.Client -{ - public class MqttClientQueuedOptions: MqttClientTcpOptions - { - public bool UsePersistence { get; set; } - - public IMqttClientQueuedStorage Storage { get; set; } - } -} diff --git a/MQTTnet.Core/Client/MqttClientQueuedPersistentMessagesManager.cs b/MQTTnet.Core/Client/MqttClientQueuedPersistentMessagesManager.cs index 235b2bf..b012e21 100644 --- a/MQTTnet.Core/Client/MqttClientQueuedPersistentMessagesManager.cs +++ b/MQTTnet.Core/Client/MqttClientQueuedPersistentMessagesManager.cs @@ -10,9 +10,9 @@ namespace MQTTnet.Core.Client public class MqttClientQueuedPersistentMessagesManager { private readonly IList _persistedMessages = new List(); - private readonly MqttClientQueuedOptions _options; + private readonly MqttClientManagedOptions _options; - public MqttClientQueuedPersistentMessagesManager(MqttClientQueuedOptions options) + public MqttClientQueuedPersistentMessagesManager(MqttClientManagedOptions options) { _options = options ?? throw new ArgumentNullException(nameof(options)); } @@ -21,7 +21,7 @@ namespace MQTTnet.Core.Client { try { - var persistentMessages = await _options.Storage.LoadInflightMessagesAsync(); + var persistentMessages = await _options.Storage.LoadQueuedMessagesAsync(); lock (_persistedMessages) { _persistedMessages.Clear(); @@ -50,7 +50,7 @@ namespace MQTTnet.Core.Client { if (_options.Storage != null) { - await _options.Storage.SaveInflightMessagesAsync(_persistedMessages); + await _options.Storage.SaveQueuedMessagesAsync(_persistedMessages); } } catch (Exception exception) diff --git a/MQTTnet.Core/Client/MqttClientTcpOptions.cs b/MQTTnet.Core/Client/MqttClientTcpOptions.cs index beaa506..98d27dc 100644 --- a/MQTTnet.Core/Client/MqttClientTcpOptions.cs +++ b/MQTTnet.Core/Client/MqttClientTcpOptions.cs @@ -1,7 +1,29 @@ -namespace MQTTnet.Core.Client +using MQTTnet.Core.Serializer; +using System; + +namespace MQTTnet.Core.Client { - public class MqttClientTcpOptions : MqttClientOptions + public class MqttClientTcpOptions : IMqttClientOptions { + public MqttClientTlsOptions TlsOptions { get; set; } = new MqttClientTlsOptions(); + + public MqttApplicationMessage WillMessage { get; set; } + + public string UserName { get; set; } + + public string Password { get; set; } + + public string ClientId { get; set; } = Guid.NewGuid().ToString().Replace("-", string.Empty); + + public bool CleanSession { get; set; } = true; + + public TimeSpan KeepAlivePeriod { get; set; } = TimeSpan.FromSeconds(5); + + public TimeSpan DefaultCommunicationTimeout { get; set; } = TimeSpan.FromSeconds(10); + + public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311; + + public string Server { get; set; } public int? Port { get; set; } diff --git a/MQTTnet.Core/Client/MqttClientWebSocketOptions.cs b/MQTTnet.Core/Client/MqttClientWebSocketOptions.cs index 4b90524..b7a904d 100644 --- a/MQTTnet.Core/Client/MqttClientWebSocketOptions.cs +++ b/MQTTnet.Core/Client/MqttClientWebSocketOptions.cs @@ -1,7 +1,29 @@ -namespace MQTTnet.Core.Client +using System; +using MQTTnet.Core.Serializer; + +namespace MQTTnet.Core.Client { - public class MqttClientWebSocketOptions : MqttClientOptions + public class MqttClientWebSocketOptions : IMqttClientOptions { public string Uri { get; set; } + + public MqttClientTlsOptions TlsOptions { get; set; } = new MqttClientTlsOptions(); + + public MqttApplicationMessage WillMessage { get; set; } + + public string UserName { get; set; } + + public string Password { get; set; } + + public string ClientId { get; set; } = Guid.NewGuid().ToString().Replace("-", string.Empty); + + public bool CleanSession { get; set; } = true; + + public TimeSpan KeepAlivePeriod { get; set; } = TimeSpan.FromSeconds(5); + + public TimeSpan DefaultCommunicationTimeout { get; set; } = TimeSpan.FromSeconds(10); + + public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311; + } } diff --git a/Tests/MQTTnet.Core.Tests/MqttCommunicationAdapterFactory.cs b/Tests/MQTTnet.Core.Tests/MqttCommunicationAdapterFactory.cs index c83f83a..bed3218 100644 --- a/Tests/MQTTnet.Core.Tests/MqttCommunicationAdapterFactory.cs +++ b/Tests/MQTTnet.Core.Tests/MqttCommunicationAdapterFactory.cs @@ -12,7 +12,7 @@ namespace MQTTnet.Core.Tests _adapter = adapter; } - public IMqttCommunicationAdapter CreateMqttCommunicationAdapter(MqttClientOptions options) + public IMqttCommunicationAdapter CreateMqttCommunicationAdapter(IMqttClientOptions options) { return _adapter; } diff --git a/Tests/MQTTnet.TestApp.NetCore/Program.cs b/Tests/MQTTnet.TestApp.NetCore/Program.cs index 63e4380..eebe4b2 100644 --- a/Tests/MQTTnet.TestApp.NetCore/Program.cs +++ b/Tests/MQTTnet.TestApp.NetCore/Program.cs @@ -57,7 +57,7 @@ namespace MQTTnet.TestApp.NetCore try { - var options = new MqttClientQueuedOptions + var options = new MqttClientManagedOptions { Server = "192.168.0.14", ClientId = "XYZ", @@ -66,11 +66,10 @@ namespace MQTTnet.TestApp.NetCore Password = "passworda", KeepAlivePeriod = TimeSpan.FromSeconds(31), DefaultCommunicationTimeout = TimeSpan.FromSeconds(20), - UsePersistence = true, Storage = new TestStorage(), }; - var client = new MqttClientFactory().CreateMqttQueuedClient(); + var client = new MqttClientFactory().CreateMqttManagedClient(); client.ApplicationMessageReceived += (s, e) => { Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###"); @@ -177,7 +176,7 @@ namespace MQTTnet.TestApp.NetCore await client.SubscribeAsync(new List { - new TopicFilter("#", MqttQualityOfServiceLevel.AtMostOnce) + new TopicFilter("#", MqttQualityOfServiceLevel.ExactlyOnce) }); Console.WriteLine("### SUBSCRIBED ###"); @@ -299,7 +298,7 @@ namespace MQTTnet.TestApp.NetCore string serializationFile = System.IO.Path.Combine(Environment.CurrentDirectory, "messages.bin"); private IList _messages = new List(); - public Task> LoadInflightMessagesAsync() + public Task> LoadQueuedMessagesAsync() { //deserialize // MqttApplicationMessage is not serializable @@ -319,7 +318,7 @@ namespace MQTTnet.TestApp.NetCore return Task.FromResult(_messages); } - public Task SaveInflightMessagesAsync(IList messages) + public Task SaveQueuedMessagesAsync(IList messages) { _messages = messages; //serialize diff --git a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs index 07d5955..4565626 100644 --- a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs +++ b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs @@ -41,7 +41,7 @@ namespace MQTTnet.TestApp.UniversalWindows private async void Connect(object sender, RoutedEventArgs e) { - MqttClientOptions options = null; + IMqttClientOptions options = null; if (UseTcp.IsChecked == true) { options = new MqttClientTcpOptions