diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttCommunicationAdapterFactory.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttCommunicationAdapterFactory.cs index 799ccdd..ceaac98 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttCommunicationAdapterFactory.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttCommunicationAdapterFactory.cs @@ -7,7 +7,7 @@ namespace MQTTnet.Implementations { public class MqttCommunicationAdapterFactory : IMqttCommunicationAdapterFactory { - public IMqttCommunicationAdapter CreateMqttCommunicationAdapter(MqttClientOptions options) + public IMqttCommunicationAdapter CreateMqttCommunicationAdapter(IMqttClientOptions options) { if (options == null) throw new ArgumentNullException(nameof(options)); @@ -21,6 +21,11 @@ namespace MQTTnet.Implementations return new MqttChannelCommunicationAdapter(new MqttWebSocketChannel(webSocketOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion }); } + if (options is MqttClientManagedOptions queuedOptions) + { + return new MqttChannelCommunicationAdapter(new MqttTcpChannel(queuedOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion }); + } + throw new NotSupportedException(); } } diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs index dfe3654..2fe1804 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs @@ -121,7 +121,7 @@ namespace MQTTnet.Implementations return _options.TlsOptions.AllowUntrustedCertificates; } - private static X509CertificateCollection LoadCertificates(MqttClientOptions options) + private static X509CertificateCollection LoadCertificates(IMqttClientOptions options) { var certificates = new X509CertificateCollection(); if (options.TlsOptions.Certificates == null) diff --git a/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs b/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs index 15da033..53ce276 100644 --- a/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs +++ b/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs @@ -9,5 +9,10 @@ namespace MQTTnet { return new MqttClient(new MqttCommunicationAdapterFactory()); } + + public IMqttClientManaged CreateMqttManagedClient() + { + return new MqttClientManaged(new MqttCommunicationAdapterFactory()); + } } } \ No newline at end of file diff --git a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttCommunicationAdapterFactory.cs b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttCommunicationAdapterFactory.cs index 799ccdd..9e429fd 100644 --- a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttCommunicationAdapterFactory.cs +++ b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttCommunicationAdapterFactory.cs @@ -7,7 +7,7 @@ namespace MQTTnet.Implementations { public class MqttCommunicationAdapterFactory : IMqttCommunicationAdapterFactory { - public IMqttCommunicationAdapter CreateMqttCommunicationAdapter(MqttClientOptions options) + public IMqttCommunicationAdapter CreateMqttCommunicationAdapter(IMqttClientOptions options) { if (options == null) throw new ArgumentNullException(nameof(options)); @@ -21,6 +21,12 @@ namespace MQTTnet.Implementations return new MqttChannelCommunicationAdapter(new MqttWebSocketChannel(webSocketOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion }); } + if (options is MqttClientManagedOptions queuedOptions) + { + return new MqttChannelCommunicationAdapter(new MqttTcpChannel(queuedOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion }); + } + + throw new NotSupportedException(); } } diff --git a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs index 03206f1..5335dae 100644 --- a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs +++ b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs @@ -89,7 +89,7 @@ namespace MQTTnet.Implementations RawReceiveStream = ReceiveStream; } - private static Certificate LoadCertificate(MqttClientOptions options) + private static Certificate LoadCertificate(IMqttClientOptions options) { if (options.TlsOptions.Certificates == null || !options.TlsOptions.Certificates.Any()) { diff --git a/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs b/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs index 15da033..53ce276 100644 --- a/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs +++ b/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs @@ -9,5 +9,10 @@ namespace MQTTnet { return new MqttClient(new MqttCommunicationAdapterFactory()); } + + public IMqttClientManaged CreateMqttManagedClient() + { + return new MqttClientManaged(new MqttCommunicationAdapterFactory()); + } } } \ No newline at end of file diff --git a/MQTTnet.Core/Client/IMqttClient.cs b/MQTTnet.Core/Client/IMqttClient.cs index 7622867..b46a2bc 100644 --- a/MQTTnet.Core/Client/IMqttClient.cs +++ b/MQTTnet.Core/Client/IMqttClient.cs @@ -13,7 +13,7 @@ namespace MQTTnet.Core.Client event EventHandler Connected; event EventHandler Disconnected; - Task ConnectAsync(MqttClientOptions options); + Task ConnectAsync(IMqttClientOptions options); Task DisconnectAsync(); Task> SubscribeAsync(IEnumerable topicFilters); diff --git a/MQTTnet.Core/Client/IMqttClientFactory.cs b/MQTTnet.Core/Client/IMqttClientFactory.cs index 033eb99..25e03b3 100644 --- a/MQTTnet.Core/Client/IMqttClientFactory.cs +++ b/MQTTnet.Core/Client/IMqttClientFactory.cs @@ -3,5 +3,7 @@ public interface IMqttClientFactory { IMqttClient CreateMqttClient(); + + IMqttClientManaged CreateMqttManagedClient(); } } \ No newline at end of file diff --git a/MQTTnet.Core/Client/IMqttClientManaged.cs b/MQTTnet.Core/Client/IMqttClientManaged.cs new file mode 100644 index 0000000..1fc42f0 --- /dev/null +++ b/MQTTnet.Core/Client/IMqttClientManaged.cs @@ -0,0 +1,9 @@ +using System.Threading.Tasks; + +namespace MQTTnet.Core.Client +{ + public interface IMqttClientManaged : IMqttClient + { + //Task ConnectAsync(MqttClientManagedOptions options); + } +} diff --git a/MQTTnet.Core/Client/IMqttClientQueuedStorage.cs b/MQTTnet.Core/Client/IMqttClientQueuedStorage.cs new file mode 100644 index 0000000..7909a56 --- /dev/null +++ b/MQTTnet.Core/Client/IMqttClientQueuedStorage.cs @@ -0,0 +1,12 @@ +using System.Collections.Generic; +using System.Threading.Tasks; + +namespace MQTTnet.Core.Client +{ + public interface IMqttClientQueuedStorage + { + Task SaveQueuedMessagesAsync(IList messages); + + Task> LoadQueuedMessagesAsync(); + } +} diff --git a/MQTTnet.Core/Client/IMqttCommunicationAdapterFactory.cs b/MQTTnet.Core/Client/IMqttCommunicationAdapterFactory.cs index 092ea04..0e29590 100644 --- a/MQTTnet.Core/Client/IMqttCommunicationAdapterFactory.cs +++ b/MQTTnet.Core/Client/IMqttCommunicationAdapterFactory.cs @@ -4,6 +4,6 @@ namespace MQTTnet.Core.Client { public interface IMqttCommunicationAdapterFactory { - IMqttCommunicationAdapter CreateMqttCommunicationAdapter(MqttClientOptions options); + IMqttCommunicationAdapter CreateMqttCommunicationAdapter(IMqttClientOptions options); } } \ No newline at end of file diff --git a/MQTTnet.Core/Client/MqttClient.cs b/MQTTnet.Core/Client/MqttClient.cs index 80b850a..6cdf051 100644 --- a/MQTTnet.Core/Client/MqttClient.cs +++ b/MQTTnet.Core/Client/MqttClient.cs @@ -18,11 +18,11 @@ namespace MQTTnet.Core.Client private readonly MqttPacketDispatcher _packetDispatcher = new MqttPacketDispatcher(); private readonly IMqttCommunicationAdapterFactory _communicationChannelFactory; - private MqttClientOptions _options; + private IMqttClientOptions _options; private bool _isReceivingPackets; private int _latestPacketIdentifier; - private CancellationTokenSource _cancellationTokenSource; - private IMqttCommunicationAdapter _adapter; + internal CancellationTokenSource _cancellationTokenSource; + internal IMqttCommunicationAdapter _adapter; public MqttClient(IMqttCommunicationAdapterFactory communicationChannelFactory) { @@ -35,7 +35,8 @@ namespace MQTTnet.Core.Client public bool IsConnected => _cancellationTokenSource != null && !_cancellationTokenSource.IsCancellationRequested; - public async Task ConnectAsync(MqttClientOptions options) + + public async Task ConnectAsync(IMqttClientOptions options) { if (options == null) throw new ArgumentNullException(nameof(options)); @@ -344,7 +345,7 @@ namespace MQTTnet.Core.Client return _adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, packet); } - private async Task SendAndReceiveAsync(MqttBasePacket requestPacket) where TResponsePacket : MqttBasePacket + internal async Task SendAndReceiveAsync(MqttBasePacket requestPacket) where TResponsePacket : MqttBasePacket { var packetAwaiter = _packetDispatcher.WaitForPacketAsync(requestPacket, typeof(TResponsePacket), _options.DefaultCommunicationTimeout); await _adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, requestPacket).ConfigureAwait(false); diff --git a/MQTTnet.Core/Client/MqttClientManaged .cs b/MQTTnet.Core/Client/MqttClientManaged .cs new file mode 100644 index 0000000..03afc92 --- /dev/null +++ b/MQTTnet.Core/Client/MqttClientManaged .cs @@ -0,0 +1,163 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using MQTTnet.Core.Adapter; +using MQTTnet.Core.Diagnostics; +using MQTTnet.Core.Exceptions; +using MQTTnet.Core.Packets; +using MQTTnet.Core.Protocol; +using MQTTnet.Core.Internal; + +namespace MQTTnet.Core.Client +{ + public class MqttClientManaged: IMqttClientManaged + { + private MqttClientManagedOptions _options; + private int _latestPacketIdentifier; + private readonly BlockingCollection _inflightQueue; + private bool _usePersistance = false; + private MqttClientQueuedPersistentMessagesManager _persistentMessagesManager; + private readonly MqttClient _baseMqttClient; + + public MqttClientManaged(IMqttCommunicationAdapterFactory communicationChannelFactory) + { + _baseMqttClient = new MqttClient(communicationChannelFactory); + _baseMqttClient.Connected += BaseMqttClient_Connected; + _baseMqttClient.Disconnected += BaseMqttClient_Disconnected; + _baseMqttClient.ApplicationMessageReceived += BaseMqttClient_ApplicationMessageReceived; + _inflightQueue = new BlockingCollection(); + } + + private void BaseMqttClient_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e) + { + ApplicationMessageReceived?.Invoke(this, e); + } + + private void BaseMqttClient_Disconnected(object sender, EventArgs e) + { + Disconnected?.Invoke(this, e); + } + + private void BaseMqttClient_Connected(object sender, EventArgs e) + { + Connected?.Invoke(this, e); + } + + public event EventHandler Connected; + public event EventHandler Disconnected; + public event EventHandler ApplicationMessageReceived; + + public bool IsConnected => _baseMqttClient.IsConnected; + + + public async Task ConnectAsync(IMqttClientOptions options) + { + //TODO VERY BAD + _options = options as MqttClientManagedOptions; + this._usePersistance = _options.Storage != null; + await _baseMqttClient.ConnectAsync(options); + SetupOutgoingPacketProcessingAsync(); + + //load persistentMessages + if (_usePersistance) + { + if (_persistentMessagesManager == null) + _persistentMessagesManager = new MqttClientQueuedPersistentMessagesManager(_options); + await _persistentMessagesManager.LoadMessagesAsync(); + await InternalPublishAsync(_persistentMessagesManager.GetMessages(), false); + } + } + + public async Task DisconnectAsync() + { + await _baseMqttClient.DisconnectAsync(); + } + + public async Task UnsubscribeAsync(IEnumerable topicFilters) + { + await _baseMqttClient.UnsubscribeAsync(topicFilters); + } + + public async Task PublishAsync(IEnumerable applicationMessages) + { + await InternalPublishAsync(applicationMessages, true); + } + + private async Task InternalPublishAsync(IEnumerable applicationMessages, bool appendIfUsePersistance) + { + ThrowIfNotConnected(); + + foreach (var applicationMessage in applicationMessages) + { + if (_usePersistance && appendIfUsePersistance) + await _persistentMessagesManager.SaveMessageAsync(applicationMessage); + + _inflightQueue.Add(applicationMessage); + } + } + + public async Task> SubscribeAsync(IEnumerable topicFilters) + { + return await _baseMqttClient.SubscribeAsync(topicFilters); + } + + private void ThrowIfNotConnected() + { + if (!IsConnected) throw new MqttCommunicationException("The client is not connected."); + } + + private ushort GetNewPacketIdentifier() + { + return (ushort)Interlocked.Increment(ref _latestPacketIdentifier); + } + + + private void SetupOutgoingPacketProcessingAsync() + { + Task.Factory.StartNew( + () => SendPackets(_baseMqttClient._cancellationTokenSource.Token), + _baseMqttClient._cancellationTokenSource.Token, + TaskCreationOptions.LongRunning, + TaskScheduler.Default).ConfigureAwait(false); + } + + private async Task SendPackets(CancellationToken cancellationToken) + { + MqttNetTrace.Information(nameof(MqttClientManaged), "Start sending packets."); + MqttApplicationMessage messageInQueue = null; + + try + { + while (!cancellationToken.IsCancellationRequested) + { + messageInQueue = _inflightQueue.Take(); + await _baseMqttClient.PublishAsync(new List() { messageInQueue }); + if (_usePersistance) + await _persistentMessagesManager.Remove(messageInQueue); + } + } + catch (OperationCanceledException) + { + } + catch (MqttCommunicationException exception) + { + MqttNetTrace.Warning(nameof(MqttClient), exception, "MQTT communication exception while sending packets."); + //message not send, equeue it again + if (messageInQueue != null) + _inflightQueue.Add(messageInQueue); + } + catch (Exception exception) + { + MqttNetTrace.Error(nameof(MqttClient), exception, "Unhandled exception while sending packets."); + await DisconnectAsync().ConfigureAwait(false); + } + finally + { + MqttNetTrace.Information(nameof(MqttClient), "Stopped sending packets."); + } + } + } +} diff --git a/MQTTnet.Core/Client/MqttClientManagedOptions.cs b/MQTTnet.Core/Client/MqttClientManagedOptions.cs new file mode 100644 index 0000000..77feabe --- /dev/null +++ b/MQTTnet.Core/Client/MqttClientManagedOptions.cs @@ -0,0 +1,12 @@ + +using System; + +namespace MQTTnet.Core.Client +{ + public class MqttClientManagedOptions: MqttClientTcpOptions + { + public bool UseAutoReconnect { get; set; } + public TimeSpan AutoReconnectDelay { get; set; } + public IMqttClientQueuedStorage Storage { get; set; } + } +} diff --git a/MQTTnet.Core/Client/MqttClientOptions.cs b/MQTTnet.Core/Client/MqttClientOptions.cs index f2e28fb..35cdf5f 100644 --- a/MQTTnet.Core/Client/MqttClientOptions.cs +++ b/MQTTnet.Core/Client/MqttClientOptions.cs @@ -1,26 +1,26 @@ -using System; -using MQTTnet.Core.Serializer; +using MQTTnet.Core.Serializer; +using System; namespace MQTTnet.Core.Client { - public abstract class MqttClientOptions + public interface IMqttClientOptions { - public MqttClientTlsOptions TlsOptions { get; set; } = new MqttClientTlsOptions(); + MqttClientTlsOptions TlsOptions { get; set; } - public MqttApplicationMessage WillMessage { get; set; } + MqttApplicationMessage WillMessage { get; set; } - public string UserName { get; set; } + string UserName { get; set; } - public string Password { get; set; } + string Password { get; set; } - public string ClientId { get; set; } = Guid.NewGuid().ToString().Replace("-", string.Empty); + string ClientId { get; set; } - public bool CleanSession { get; set; } = true; + bool CleanSession { get; set; } - public TimeSpan KeepAlivePeriod { get; set; } = TimeSpan.FromSeconds(5); + TimeSpan KeepAlivePeriod { get; set; } - public TimeSpan DefaultCommunicationTimeout { get; set; } = TimeSpan.FromSeconds(10); + TimeSpan DefaultCommunicationTimeout { get; set; } - public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311; + MqttProtocolVersion ProtocolVersion { get; set; } } } diff --git a/MQTTnet.Core/Client/MqttClientQueuedPersistentMessagesManager.cs b/MQTTnet.Core/Client/MqttClientQueuedPersistentMessagesManager.cs new file mode 100644 index 0000000..b012e21 --- /dev/null +++ b/MQTTnet.Core/Client/MqttClientQueuedPersistentMessagesManager.cs @@ -0,0 +1,84 @@ +using MQTTnet.Core.Diagnostics; +using MQTTnet.Core.Packets; +using System; +using System.Linq; +using System.Collections.Generic; +using System.Threading.Tasks; + +namespace MQTTnet.Core.Client +{ + public class MqttClientQueuedPersistentMessagesManager + { + private readonly IList _persistedMessages = new List(); + private readonly MqttClientManagedOptions _options; + + public MqttClientQueuedPersistentMessagesManager(MqttClientManagedOptions options) + { + _options = options ?? throw new ArgumentNullException(nameof(options)); + } + + public async Task LoadMessagesAsync() + { + try + { + var persistentMessages = await _options.Storage.LoadQueuedMessagesAsync(); + lock (_persistedMessages) + { + _persistedMessages.Clear(); + foreach (var persistentMessage in persistentMessages) + { + _persistedMessages.Add(persistentMessage); + } + } + } + catch (Exception exception) + { + MqttNetTrace.Error(nameof(MqttClientQueuedPersistentMessagesManager), exception, "Unhandled exception while loading persistent messages."); + } + } + + public async Task SaveMessageAsync(MqttApplicationMessage applicationMessage) + { + if (applicationMessage != null) + { + lock (_persistedMessages) + { + _persistedMessages.Add(applicationMessage); + } + } + try + { + if (_options.Storage != null) + { + await _options.Storage.SaveQueuedMessagesAsync(_persistedMessages); + } + } + catch (Exception exception) + { + MqttNetTrace.Error(nameof(MqttClientQueuedPersistentMessagesManager), exception, "Unhandled exception while saving persistent messages."); + } + } + + public List GetMessages() + { + var persistedMessages = new List(); + lock (_persistedMessages) + { + foreach (var persistedMessage in _persistedMessages) + { + persistedMessages.Add(persistedMessage); + } + } + + return persistedMessages; + } + + public async Task Remove(MqttApplicationMessage message) + { + lock (_persistedMessages) + _persistedMessages.Remove(message); + + await SaveMessageAsync(null); + } + } +} diff --git a/MQTTnet.Core/Client/MqttClientTcpOptions.cs b/MQTTnet.Core/Client/MqttClientTcpOptions.cs index beaa506..98d27dc 100644 --- a/MQTTnet.Core/Client/MqttClientTcpOptions.cs +++ b/MQTTnet.Core/Client/MqttClientTcpOptions.cs @@ -1,7 +1,29 @@ -namespace MQTTnet.Core.Client +using MQTTnet.Core.Serializer; +using System; + +namespace MQTTnet.Core.Client { - public class MqttClientTcpOptions : MqttClientOptions + public class MqttClientTcpOptions : IMqttClientOptions { + public MqttClientTlsOptions TlsOptions { get; set; } = new MqttClientTlsOptions(); + + public MqttApplicationMessage WillMessage { get; set; } + + public string UserName { get; set; } + + public string Password { get; set; } + + public string ClientId { get; set; } = Guid.NewGuid().ToString().Replace("-", string.Empty); + + public bool CleanSession { get; set; } = true; + + public TimeSpan KeepAlivePeriod { get; set; } = TimeSpan.FromSeconds(5); + + public TimeSpan DefaultCommunicationTimeout { get; set; } = TimeSpan.FromSeconds(10); + + public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311; + + public string Server { get; set; } public int? Port { get; set; } diff --git a/MQTTnet.Core/Client/MqttClientWebSocketOptions.cs b/MQTTnet.Core/Client/MqttClientWebSocketOptions.cs index 4b90524..b7a904d 100644 --- a/MQTTnet.Core/Client/MqttClientWebSocketOptions.cs +++ b/MQTTnet.Core/Client/MqttClientWebSocketOptions.cs @@ -1,7 +1,29 @@ -namespace MQTTnet.Core.Client +using System; +using MQTTnet.Core.Serializer; + +namespace MQTTnet.Core.Client { - public class MqttClientWebSocketOptions : MqttClientOptions + public class MqttClientWebSocketOptions : IMqttClientOptions { public string Uri { get; set; } + + public MqttClientTlsOptions TlsOptions { get; set; } = new MqttClientTlsOptions(); + + public MqttApplicationMessage WillMessage { get; set; } + + public string UserName { get; set; } + + public string Password { get; set; } + + public string ClientId { get; set; } = Guid.NewGuid().ToString().Replace("-", string.Empty); + + public bool CleanSession { get; set; } = true; + + public TimeSpan KeepAlivePeriod { get; set; } = TimeSpan.FromSeconds(5); + + public TimeSpan DefaultCommunicationTimeout { get; set; } = TimeSpan.FromSeconds(10); + + public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311; + } } diff --git a/MQTTnet.Core/MQTTnet.Core.csproj b/MQTTnet.Core/MQTTnet.Core.csproj index 0e9b965..5080593 100644 --- a/MQTTnet.Core/MQTTnet.Core.csproj +++ b/MQTTnet.Core/MQTTnet.Core.csproj @@ -22,14 +22,6 @@ - - - - - - - - \ No newline at end of file diff --git a/Tests/MQTTnet.Core.Tests/MqttCommunicationAdapterFactory.cs b/Tests/MQTTnet.Core.Tests/MqttCommunicationAdapterFactory.cs index c83f83a..bed3218 100644 --- a/Tests/MQTTnet.Core.Tests/MqttCommunicationAdapterFactory.cs +++ b/Tests/MQTTnet.Core.Tests/MqttCommunicationAdapterFactory.cs @@ -12,7 +12,7 @@ namespace MQTTnet.Core.Tests _adapter = adapter; } - public IMqttCommunicationAdapter CreateMqttCommunicationAdapter(MqttClientOptions options) + public IMqttCommunicationAdapter CreateMqttCommunicationAdapter(IMqttClientOptions options) { return _adapter; } diff --git a/Tests/MQTTnet.TestApp.NetCore/Program.cs b/Tests/MQTTnet.TestApp.NetCore/Program.cs index 31d26db..2e43623 100644 --- a/Tests/MQTTnet.TestApp.NetCore/Program.cs +++ b/Tests/MQTTnet.TestApp.NetCore/Program.cs @@ -19,26 +19,127 @@ namespace MQTTnet.TestApp.NetCore { Console.WriteLine("MQTTnet - TestApp.NetFramework"); Console.WriteLine("1 = Start client"); - Console.WriteLine("2 = Start server"); - Console.WriteLine("3 = Start performance test"); + Console.WriteLine("2 = Start queued client"); + Console.WriteLine("3 = Start server"); + Console.WriteLine("4 = Start performance test"); var pressedKey = Console.ReadKey(true); if (pressedKey.Key == ConsoleKey.D1) { Task.Run(() => RunClientAsync(args)); Thread.Sleep(Timeout.Infinite); } - else if (pressedKey.Key == ConsoleKey.D2) + if (pressedKey.Key == ConsoleKey.D2) { - Task.Run(() => RunServerAsync(args)); + Task.Run(() => RunClientQueuedAsync(args)); Thread.Sleep(Timeout.Infinite); } else if (pressedKey.Key == ConsoleKey.D3) + { + Task.Run(() => RunServerAsync(args)); + Thread.Sleep(Timeout.Infinite); + } + else if (pressedKey.Key == ConsoleKey.D4) { Task.Run(PerformanceTest.RunAsync); Thread.Sleep(Timeout.Infinite); } } + private static async Task RunClientQueuedAsync(string[] arguments) + { + MqttNetTrace.TraceMessagePublished += (s, e) => + { + Console.WriteLine($">> [{e.ThreadId}] [{e.Source}] [{e.Level}]: {e.Message}"); + if (e.Exception != null) + { + Console.WriteLine(e.Exception); + } + }; + + try + { + var options = new MqttClientManagedOptions + { + Server = "192.168.0.14", + ClientId = "XYZ", + CleanSession = true, + UserName = "lobu", + Password = "passworda", + KeepAlivePeriod = TimeSpan.FromSeconds(31), + DefaultCommunicationTimeout = TimeSpan.FromSeconds(20), + Storage = new TestStorage(), + }; + + var client = new MqttClientFactory().CreateMqttManagedClient(); + client.ApplicationMessageReceived += (s, e) => + { + Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###"); + Console.WriteLine($"+ Topic = {e.ApplicationMessage.Topic}"); + Console.WriteLine($"+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}"); + Console.WriteLine($"+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}"); + Console.WriteLine($"+ Retain = {e.ApplicationMessage.Retain}"); + Console.WriteLine(); + }; + + client.Connected += async (s, e) => + { + Console.WriteLine("### CONNECTED WITH SERVER ###"); + + await client.SubscribeAsync(new List + { + new TopicFilter("#", MqttQualityOfServiceLevel.AtMostOnce) + }); + + Console.WriteLine("### SUBSCRIBED ###"); + }; + + client.Disconnected += async (s, e) => + { + Console.WriteLine("### DISCONNECTED FROM SERVER ###"); + await Task.Delay(TimeSpan.FromSeconds(5)); + + try + { + await client.ConnectAsync(options); + } + catch + { + Console.WriteLine("### RECONNECTING FAILED ###"); + } + }; + + try + { + await client.ConnectAsync(options); + } + catch (Exception exception) + { + Console.WriteLine("### CONNECTING FAILED ###" + Environment.NewLine + exception); + } + + Console.WriteLine("### WAITING FOR APPLICATION MESSAGES ###"); + + int i = 0; + while (true) + { + Console.ReadLine(); + i++; + var applicationMessage = new MqttApplicationMessage( + "PLNMAIN", + Encoding.UTF8.GetBytes(string.Format("Hello World {0}", i)), + MqttQualityOfServiceLevel.ExactlyOnce, + false + ); + + await client.PublishAsync(applicationMessage); + } + } + catch (Exception exception) + { + Console.WriteLine(exception); + } + } + private static async Task RunClientAsync(string[] arguments) { MqttNetTrace.TraceMessagePublished += (s, e) => @@ -56,7 +157,7 @@ namespace MQTTnet.TestApp.NetCore { Uri = "localhost", ClientId = "XYZ", - CleanSession = true + CleanSession = true, }; var client = new MqttClientFactory().CreateMqttClient(); @@ -76,7 +177,7 @@ namespace MQTTnet.TestApp.NetCore await client.SubscribeAsync(new List { - new TopicFilter("#", MqttQualityOfServiceLevel.AtMostOnce) + new TopicFilter("#", MqttQualityOfServiceLevel.ExactlyOnce) }); Console.WriteLine("### SUBSCRIBED ###"); @@ -175,5 +276,70 @@ namespace MQTTnet.TestApp.NetCore Console.ReadLine(); } + + [Serializable] + public sealed class TemporaryApplicationMessage + { + public TemporaryApplicationMessage(string topic, byte[] payload, MqttQualityOfServiceLevel qualityOfServiceLevel, bool retain) + { + Topic = topic ?? throw new ArgumentNullException(nameof(topic)); + Payload = payload ?? throw new ArgumentNullException(nameof(payload)); + QualityOfServiceLevel = qualityOfServiceLevel; + Retain = retain; + } + + public string Topic { get; } + + public byte[] Payload { get; } + + public MqttQualityOfServiceLevel QualityOfServiceLevel { get; } + + public bool Retain { get; } + } + + private class TestStorage : IMqttClientQueuedStorage + { + string serializationFile = System.IO.Path.Combine(Environment.CurrentDirectory, "messages.bin"); + private IList _messages = new List(); + + public Task> LoadQueuedMessagesAsync() + { + //deserialize + // MqttApplicationMessage is not serializable + if (System.IO.File.Exists(serializationFile)) + { + using (System.IO.Stream stream = System.IO.File.Open(serializationFile, System.IO.FileMode.Open)) + { + var bformatter = new System.Runtime.Serialization.Formatters.Binary.BinaryFormatter(); + + var temp = (List)bformatter.Deserialize(stream); + foreach (var a in temp) + { + _messages.Add(new MqttApplicationMessage(a.Topic, a.Payload, a.QualityOfServiceLevel, a.Retain)); + } + } + } + return Task.FromResult(_messages); + } + + public Task SaveQueuedMessagesAsync(IList messages) + { + _messages = messages; + //serialize + // MqttApplicationMessage is not serializable + using (System.IO.Stream stream = System.IO.File.Open(serializationFile, System.IO.FileMode.Create)) + { + var bformatter = new System.Runtime.Serialization.Formatters.Binary.BinaryFormatter(); + IList temp = new List(); + foreach (var m in _messages) + { + temp.Add(new TemporaryApplicationMessage(m.Topic, m.Payload, m.QualityOfServiceLevel, m.Retain)); + } + bformatter.Serialize(stream, temp); + } + + return Task.FromResult(0); + } + } } } diff --git a/Tests/MQTTnet.TestApp.NetCore/messages.bin b/Tests/MQTTnet.TestApp.NetCore/messages.bin new file mode 100644 index 0000000..59505fa Binary files /dev/null and b/Tests/MQTTnet.TestApp.NetCore/messages.bin differ diff --git a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs index 07d5955..4565626 100644 --- a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs +++ b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs @@ -41,7 +41,7 @@ namespace MQTTnet.TestApp.UniversalWindows private async void Connect(object sender, RoutedEventArgs e) { - MqttClientOptions options = null; + IMqttClientOptions options = null; if (UseTcp.IsChecked == true) { options = new MqttClientTcpOptions