From ea5616002a8a68eccebf4c24dd8f79e26166c6b9 Mon Sep 17 00:00:00 2001 From: Gerardo Date: Tue, 17 Oct 2017 14:02:41 +0200 Subject: [PATCH 1/4] Add support to not persistent Queue Added new class MqttClientQueued --- .../MQTTnet.NetStandard/MqttClientFactory.cs | 5 + MQTTnet.Core/Client/MqttClientQueued.cs | 530 ++++++++++++++++++ MQTTnet.Core/MQTTnet.Core.csproj | 6 + Tests/MQTTnet.TestApp.NetCore/Program.cs | 111 +++- 4 files changed, 647 insertions(+), 5 deletions(-) create mode 100644 MQTTnet.Core/Client/MqttClientQueued.cs diff --git a/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs b/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs index 15da033..9358390 100644 --- a/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs +++ b/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs @@ -9,5 +9,10 @@ namespace MQTTnet { return new MqttClient(new MqttCommunicationAdapterFactory()); } + + public IMqttClient CreateMqttQueuedClient() + { + return new MqttClientQueued(new MqttCommunicationAdapterFactory()); + } } } \ No newline at end of file diff --git a/MQTTnet.Core/Client/MqttClientQueued.cs b/MQTTnet.Core/Client/MqttClientQueued.cs new file mode 100644 index 0000000..925b965 --- /dev/null +++ b/MQTTnet.Core/Client/MqttClientQueued.cs @@ -0,0 +1,530 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using MQTTnet.Core.Adapter; +using MQTTnet.Core.Diagnostics; +using MQTTnet.Core.Exceptions; +using MQTTnet.Core.Packets; +using MQTTnet.Core.Protocol; +using MQTTnet.Core.Internal; + +namespace MQTTnet.Core.Client +{ + public class MqttClientQueued : IMqttClient + { + #region Fields + private readonly IMqttCommunicationAdapterFactory _communicationChannelFactory; + private readonly MqttPacketDispatcher _packetDispatcher; + private readonly HashSet _unacknowledgedPublishPackets; + + private MqttClientOptions _options; + private bool _isReceivingPackets; + private int _latestPacketIdentifier; + private CancellationTokenSource _cancellationTokenSource; + private IMqttCommunicationAdapter _adapter; + + //added + private readonly ConcurrentQueue _inflightQueue; + private bool _usePersistance = false; + private ConcurrentQueue _persistentQueue; + #endregion + + #region Ctrs + public MqttClientQueued(IMqttCommunicationAdapterFactory communicationChannelFactory) + { + _communicationChannelFactory = communicationChannelFactory ?? throw new ArgumentNullException(nameof(communicationChannelFactory)); + _packetDispatcher = new MqttPacketDispatcher(); + _unacknowledgedPublishPackets = new HashSet(); + _inflightQueue = new ConcurrentQueue(); + _persistentQueue = new ConcurrentQueue(); + } + #endregion + + #region Events + public event EventHandler Connected; + public event EventHandler Disconnected; + public event EventHandler ApplicationMessageReceived; + #endregion + + #region Poperties + public bool IsConnected => _cancellationTokenSource != null && !_cancellationTokenSource.IsCancellationRequested; + #endregion + + #region MqttClient Methods + + public async Task ConnectAsync(MqttClientOptions options) + { + if (options == null) throw new ArgumentNullException(nameof(options)); + + ThrowIfConnected("It is not allowed to connect with a server after the connection is established."); + + try + { + _options = options; + _cancellationTokenSource = new CancellationTokenSource(); + _latestPacketIdentifier = 0; + _packetDispatcher.Reset(); + + _adapter = _communicationChannelFactory.CreateMqttCommunicationAdapter(options); + + MqttNetTrace.Verbose(nameof(MqttClient), "Trying to connect with server."); + await _adapter.ConnectAsync(_options.DefaultCommunicationTimeout).ConfigureAwait(false); + MqttNetTrace.Verbose(nameof(MqttClient), "Connection with server established."); + + await SetupIncomingPacketProcessingAsync(); + SetupOutgoingPacketProcessingAsync(); + await AuthenticateAsync(options.WillMessage); + + MqttNetTrace.Verbose(nameof(MqttClient), "MQTT connection with server established."); + + if (_options.KeepAlivePeriod != TimeSpan.Zero) + { + StartSendKeepAliveMessages(_cancellationTokenSource.Token); + } + + Connected?.Invoke(this, EventArgs.Empty); + } + catch (Exception) + { + await DisconnectInternalAsync().ConfigureAwait(false); + throw; + } + } + + public async Task DisconnectAsync() + { + if (!IsConnected) + { + return; + } + + try + { + await SendAsync(new MqttDisconnectPacket()).ConfigureAwait(false); + } + finally + { + await DisconnectInternalAsync().ConfigureAwait(false); + } + } + + public async Task PublishAsync(IEnumerable applicationMessages) + { +#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + ThrowIfNotConnected(); + + foreach (var applicationMessage in applicationMessages) + { + if (_usePersistance) + _persistentQueue.Enqueue(applicationMessage); + _inflightQueue.Enqueue(applicationMessage); + } +#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + } + + public async Task> SubscribeAsync(IEnumerable topicFilters) + { + if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); + + ThrowIfNotConnected(); + + var subscribePacket = new MqttSubscribePacket + { + PacketIdentifier = GetNewPacketIdentifier(), + TopicFilters = topicFilters.ToList() + }; + + var response = await SendAndReceiveAsync(subscribePacket).ConfigureAwait(false); + + if (response.SubscribeReturnCodes.Count != subscribePacket.TopicFilters.Count) + { + throw new MqttProtocolViolationException("The return codes are not matching the topic filters [MQTT-3.9.3-1]."); + } + + return subscribePacket.TopicFilters.Select((t, i) => new MqttSubscribeResult(t, response.SubscribeReturnCodes[i])).ToList(); + } + + public async Task UnsubscribeAsync(IEnumerable topicFilters) + { + if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); + + ThrowIfNotConnected(); + + var unsubscribePacket = new MqttUnsubscribePacket + { + PacketIdentifier = GetNewPacketIdentifier(), + TopicFilters = topicFilters.ToList() + }; + + await SendAndReceiveAsync(unsubscribePacket); + } + + #region private + private void ThrowIfNotConnected() + { + if (!IsConnected) throw new MqttCommunicationException("The client is not connected."); + } + + private void ThrowIfConnected(string message) + { + if (IsConnected) throw new MqttProtocolViolationException(message); + } + + private async Task SetupIncomingPacketProcessingAsync() + { + _isReceivingPackets = false; + +#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + Task.Factory.StartNew( + () => ReceivePackets(_cancellationTokenSource.Token), + _cancellationTokenSource.Token, + TaskCreationOptions.LongRunning, + TaskScheduler.Default).ConfigureAwait(false); +#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + + while (!_isReceivingPackets && _cancellationTokenSource != null && !_cancellationTokenSource.IsCancellationRequested) + { + await Task.Delay(TimeSpan.FromMilliseconds(100)); + } + } + + private async Task AuthenticateAsync(MqttApplicationMessage willApplicationMessage) + { + var connectPacket = new MqttConnectPacket + { + ClientId = _options.ClientId, + Username = _options.UserName, + Password = _options.Password, + CleanSession = _options.CleanSession, + KeepAlivePeriod = (ushort)_options.KeepAlivePeriod.TotalSeconds, + WillMessage = willApplicationMessage + }; + + var response = await SendAndReceiveAsync(connectPacket).ConfigureAwait(false); + if (response.ConnectReturnCode != MqttConnectReturnCode.ConnectionAccepted) + { + throw new MqttConnectingFailedException(response.ConnectReturnCode); + } + } + + private async Task ReceivePackets(CancellationToken cancellationToken) + { + MqttNetTrace.Information(nameof(MqttClient), "Start receiving packets."); + + try + { + while (!cancellationToken.IsCancellationRequested) + { + _isReceivingPackets = true; + + var packet = await _adapter.ReceivePacketAsync(TimeSpan.Zero, cancellationToken).ConfigureAwait(false); + if (cancellationToken.IsCancellationRequested) + { + return; + } + + StartProcessReceivedPacket(packet, cancellationToken); + } + } + catch (OperationCanceledException) + { + } + catch (MqttCommunicationException exception) + { + if (cancellationToken.IsCancellationRequested) + { + return; + } + + MqttNetTrace.Warning(nameof(MqttClient), exception, "MQTT communication exception while receiving packets."); + await DisconnectInternalAsync().ConfigureAwait(false); + } + catch (Exception exception) + { + MqttNetTrace.Error(nameof(MqttClient), exception, "Unhandled exception while receiving packets."); + await DisconnectInternalAsync().ConfigureAwait(false); + } + finally + { + MqttNetTrace.Information(nameof(MqttClient), "Stopped receiving packets."); + } + } + + private void StartProcessReceivedPacket(MqttBasePacket packet, CancellationToken cancellationToken) + { +#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + Task.Run(() => ProcessReceivedPacketAsync(packet), cancellationToken).ConfigureAwait(false); +#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + } + + private async Task ProcessReceivedPacketAsync(MqttBasePacket packet) + { + try + { + MqttNetTrace.Information(nameof(MqttClient), "Received <<< {0}", packet); + + if (packet is MqttPingReqPacket) + { + await SendAsync(new MqttPingRespPacket()); + return; + } + + if (packet is MqttDisconnectPacket) + { + await DisconnectAsync(); + return; + } + + if (packet is MqttPublishPacket publishPacket) + { + await ProcessReceivedPublishPacket(publishPacket); + return; + } + + if (packet is MqttPubRelPacket pubRelPacket) + { + await ProcessReceivedPubRelPacket(pubRelPacket); + return; + } + + _packetDispatcher.Dispatch(packet); + } + catch (Exception exception) + { + MqttNetTrace.Error(nameof(MqttClient), exception, "Unhandled exception while processing received packet."); + } + } + + private async Task ProcessReceivedPublishPacket(MqttPublishPacket publishPacket) + { + if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce) + { + FireApplicationMessageReceivedEvent(publishPacket); + return; + } + + if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce) + { + FireApplicationMessageReceivedEvent(publishPacket); + await SendAsync(new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier }); + return; + } + + if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce) + { + // QoS 2 is implement as method "B" [4.3.3 QoS 2: Exactly once delivery] + lock (_unacknowledgedPublishPackets) + { + _unacknowledgedPublishPackets.Add(publishPacket.PacketIdentifier); + } + + FireApplicationMessageReceivedEvent(publishPacket); + await SendAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }); + return; + } + + throw new MqttCommunicationException("Received a not supported QoS level."); + } + + private Task ProcessReceivedPubRelPacket(MqttPubRelPacket pubRelPacket) + { + lock (_unacknowledgedPublishPackets) + { + _unacknowledgedPublishPackets.Remove(pubRelPacket.PacketIdentifier); + } + + return SendAsync(pubRelPacket.CreateResponse()); + } + + private async Task DisconnectInternalAsync() + { + var cts = _cancellationTokenSource; + if (cts == null || cts.IsCancellationRequested) + { + return; + } + + cts.Cancel(false); + cts.Dispose(); + _cancellationTokenSource = null; + + try + { + await _adapter.DisconnectAsync(_options.DefaultCommunicationTimeout).ConfigureAwait(false); + MqttNetTrace.Information(nameof(MqttClient), "Disconnected from adapter."); + } + catch (Exception exception) + { + MqttNetTrace.Warning(nameof(MqttClient), exception, "Error while disconnecting from adapter."); + } + finally + { + Disconnected?.Invoke(this, EventArgs.Empty); + } + } + + private void StartSendKeepAliveMessages(CancellationToken cancellationToken) + { +#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + Task.Factory.StartNew(() => SendKeepAliveMessagesAsync(cancellationToken), cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default).ConfigureAwait(false); +#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + } + + private async Task SendKeepAliveMessagesAsync(CancellationToken cancellationToken) + { + MqttNetTrace.Information(nameof(MqttClient), "Start sending keep alive packets."); + + try + { + while (!cancellationToken.IsCancellationRequested) + { + await Task.Delay(_options.KeepAlivePeriod, cancellationToken).ConfigureAwait(false); + if (cancellationToken.IsCancellationRequested) + { + return; + } + + await SendAndReceiveAsync(new MqttPingReqPacket()).ConfigureAwait(false); + } + } + catch (OperationCanceledException) + { + } + catch (MqttCommunicationException exception) + { + if (cancellationToken.IsCancellationRequested) + { + return; + } + + MqttNetTrace.Warning(nameof(MqttClient), exception, "MQTT communication exception while sending/receiving keep alive packets."); + await DisconnectInternalAsync().ConfigureAwait(false); + } + catch (Exception exception) + { + MqttNetTrace.Warning(nameof(MqttClient), exception, "Unhandled exception while sending/receiving keep alive packets."); + await DisconnectInternalAsync().ConfigureAwait(false); + } + finally + { + MqttNetTrace.Information(nameof(MqttClient), "Stopped sending keep alive packets."); + } + } + + private async Task SendAndReceiveAsync(MqttBasePacket requestPacket) where TResponsePacket : MqttBasePacket + { + var packetAwaiter = _packetDispatcher.WaitForPacketAsync(requestPacket, typeof(TResponsePacket), _options.DefaultCommunicationTimeout); + await _adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, requestPacket).ConfigureAwait(false); + return (TResponsePacket)await packetAwaiter.ConfigureAwait(false); + } + + private Task SendAsync(MqttBasePacket packet) + { + return _adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, packet); + } + + private ushort GetNewPacketIdentifier() + { + return (ushort)Interlocked.Increment(ref _latestPacketIdentifier); + } + + private void FireApplicationMessageReceivedEvent(MqttPublishPacket publishPacket) + { + try + { + var applicationMessage = publishPacket.ToApplicationMessage(); + ApplicationMessageReceived?.Invoke(this, new MqttApplicationMessageReceivedEventArgs(applicationMessage)); + } + catch (Exception exception) + { + MqttNetTrace.Error(nameof(MqttClient), exception, "Unhandled exception while handling application message."); + } + } + #endregion + #endregion + + #region MqttClientQueued Methos + private void SetupOutgoingPacketProcessingAsync() + { +#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + Task.Factory.StartNew( + () => SendPackets(_cancellationTokenSource.Token), + _cancellationTokenSource.Token, + TaskCreationOptions.LongRunning, + TaskScheduler.Default).ConfigureAwait(false); +#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + + //while (_cancellationTokenSource != null && !_cancellationTokenSource.IsCancellationRequested) + //{ + // await Task.Delay(TimeSpan.FromMilliseconds(100)); + //} + } + + private async Task SendPackets(CancellationToken cancellationToken) + { + MqttNetTrace.Information(nameof(MqttClient), "Start sending packets."); + MqttApplicationMessage messageToSend = null; + try + { + while (!cancellationToken.IsCancellationRequested) + { + while (_inflightQueue.TryDequeue(out messageToSend)) + { + MqttQualityOfServiceLevel qosGroup = messageToSend.QualityOfServiceLevel; + MqttPublishPacket publishPacket = messageToSend.ToPublishPacket(); + switch (qosGroup) + { + case MqttQualityOfServiceLevel.AtMostOnce: + { + // No packet identifier is used for QoS 0 [3.3.2.2 Packet Identifier] + await _adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, publishPacket); + break; + } + + case MqttQualityOfServiceLevel.AtLeastOnce: + { + publishPacket.PacketIdentifier = GetNewPacketIdentifier(); + await SendAndReceiveAsync(publishPacket); + break; + } + case MqttQualityOfServiceLevel.ExactlyOnce: + { + publishPacket.PacketIdentifier = GetNewPacketIdentifier(); + var pubRecPacket = await SendAndReceiveAsync(publishPacket).ConfigureAwait(false); + await SendAndReceiveAsync(pubRecPacket.CreateResponse()).ConfigureAwait(false); + break; + } + default: + { + throw new InvalidOperationException(); + } + } + } + }; + } + catch (OperationCanceledException) + { + } + catch (MqttCommunicationException exception) + { + MqttNetTrace.Warning(nameof(MqttClient), exception, "MQTT communication exception while sending packets."); + //message not send, equeued again + if (messageToSend != null) + _inflightQueue.Enqueue(messageToSend); + } + catch (Exception exception) + { + MqttNetTrace.Error(nameof(MqttClient), exception, "Unhandled exception while sending packets."); + await DisconnectInternalAsync().ConfigureAwait(false); + } + finally + { + MqttNetTrace.Information(nameof(MqttClient), "Stopped sending packets."); + } + } + #endregion + } +} diff --git a/MQTTnet.Core/MQTTnet.Core.csproj b/MQTTnet.Core/MQTTnet.Core.csproj index 0e9b965..1e507cb 100644 --- a/MQTTnet.Core/MQTTnet.Core.csproj +++ b/MQTTnet.Core/MQTTnet.Core.csproj @@ -32,4 +32,10 @@ + + + + + + \ No newline at end of file diff --git a/Tests/MQTTnet.TestApp.NetCore/Program.cs b/Tests/MQTTnet.TestApp.NetCore/Program.cs index 821d861..f2a44e0 100644 --- a/Tests/MQTTnet.TestApp.NetCore/Program.cs +++ b/Tests/MQTTnet.TestApp.NetCore/Program.cs @@ -18,26 +18,127 @@ namespace MQTTnet.TestApp.NetCore { Console.WriteLine("MQTTnet - TestApp.NetFramework"); Console.WriteLine("1 = Start client"); - Console.WriteLine("2 = Start server"); - Console.WriteLine("3 = Start performance test"); + Console.WriteLine("2 = Start queued client"); + Console.WriteLine("3 = Start server"); + Console.WriteLine("4 = Start performance test"); var pressedKey = Console.ReadKey(true); if (pressedKey.Key == ConsoleKey.D1) { Task.Run(() => RunClientAsync(args)); Thread.Sleep(Timeout.Infinite); } - else if (pressedKey.Key == ConsoleKey.D2) + if (pressedKey.Key == ConsoleKey.D2) { - Task.Run(() => RunServerAsync(args)); + Task.Run(() => RunClientQueuedAsync(args)); Thread.Sleep(Timeout.Infinite); } else if (pressedKey.Key == ConsoleKey.D3) + { + Task.Run(() => RunServerAsync(args)); + Thread.Sleep(Timeout.Infinite); + } + else if (pressedKey.Key == ConsoleKey.D4) { Task.Run(PerformanceTest.RunAsync); Thread.Sleep(Timeout.Infinite); } } + private static async Task RunClientQueuedAsync(string[] arguments) + { + MqttNetTrace.TraceMessagePublished += (s, e) => + { + Console.WriteLine($">> [{e.ThreadId}] [{e.Source}] [{e.Level}]: {e.Message}"); + if (e.Exception != null) + { + Console.WriteLine(e.Exception); + } + }; + + try + { + var options = new MqttClientTcpOptions + { + Server = "192.168.0.14", + ClientId = "XYZ", + CleanSession = true, + UserName = "lobu", + Password = "passworda", + KeepAlivePeriod = TimeSpan.FromSeconds(31), + DefaultCommunicationTimeout = TimeSpan.FromSeconds(20), + + }; + + var client = new MqttClientFactory().CreateMqttQueuedClient(); + client.ApplicationMessageReceived += (s, e) => + { + Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###"); + Console.WriteLine($"+ Topic = {e.ApplicationMessage.Topic}"); + Console.WriteLine($"+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}"); + Console.WriteLine($"+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}"); + Console.WriteLine($"+ Retain = {e.ApplicationMessage.Retain}"); + Console.WriteLine(); + }; + + client.Connected += async (s, e) => + { + Console.WriteLine("### CONNECTED WITH SERVER ###"); + + await client.SubscribeAsync(new List + { + new TopicFilter("#", MqttQualityOfServiceLevel.AtMostOnce) + }); + + Console.WriteLine("### SUBSCRIBED ###"); + }; + + client.Disconnected += async (s, e) => + { + Console.WriteLine("### DISCONNECTED FROM SERVER ###"); + await Task.Delay(TimeSpan.FromSeconds(5)); + + try + { + await client.ConnectAsync(options); + } + catch + { + Console.WriteLine("### RECONNECTING FAILED ###"); + } + }; + + try + { + await client.ConnectAsync(options); + } + catch (Exception exception) + { + Console.WriteLine("### CONNECTING FAILED ###" + Environment.NewLine + exception); + } + + Console.WriteLine("### WAITING FOR APPLICATION MESSAGES ###"); + + int i = 0; + while (true) + { + Console.ReadLine(); + i++; + var applicationMessage = new MqttApplicationMessage( + "PLNMAIN", + Encoding.UTF8.GetBytes(string.Format("Hello World {0}", i)), + MqttQualityOfServiceLevel.ExactlyOnce, + false + ); + + await client.PublishAsync(applicationMessage); + } + } + catch (Exception exception) + { + Console.WriteLine(exception); + } + } + private static async Task RunClientAsync(string[] arguments) { MqttNetTrace.TraceMessagePublished += (s, e) => @@ -55,7 +156,7 @@ namespace MQTTnet.TestApp.NetCore { Uri = "localhost", ClientId = "XYZ", - CleanSession = true + CleanSession = true, }; var client = new MqttClientFactory().CreateMqttClient(); From bc19f66812ca54265793a3ca820c0de1d6c5e8c4 Mon Sep 17 00:00:00 2001 From: Gerardo Date: Wed, 18 Oct 2017 16:41:54 +0200 Subject: [PATCH 2/4] Added suport to Persistence and refoactoring --- .../MqttCommunicationAdapterFactory.cs | 5 + .../MQTTnet.NetStandard/MqttClientFactory.cs | 2 +- .../MqttCommunicationAdapterFactory.cs | 2 +- .../Implementations/MqttTcpChannel.cs | 2 +- .../MqttClientFactory.cs | 5 + MQTTnet.Core/Client/IMqttClientFactory.cs | 2 + MQTTnet.Core/Client/IMqttClientQueued.cs | 9 + .../Client/IMqttClientQueuedStorage.cs | 12 + MQTTnet.Core/Client/MqttClient.cs | 6 +- MQTTnet.Core/Client/MqttClientQueued.cs | 432 ++---------------- .../Client/MqttClientQueuedOptions.cs | 11 + ...ttClientQueuedPersistentMessagesManager.cs | 84 ++++ MQTTnet.Core/MQTTnet.Core.csproj | 14 - .../MqttCommunicationAdapterFactory.cs | 2 +- Tests/MQTTnet.TestApp.NetCore/Program.cs | 71 ++- Tests/MQTTnet.TestApp.NetCore/messages.bin | Bin 0 -> 474 bytes .../MainPage.xaml.cs | 2 +- 17 files changed, 239 insertions(+), 422 deletions(-) create mode 100644 MQTTnet.Core/Client/IMqttClientQueued.cs create mode 100644 MQTTnet.Core/Client/IMqttClientQueuedStorage.cs create mode 100644 MQTTnet.Core/Client/MqttClientQueuedOptions.cs create mode 100644 MQTTnet.Core/Client/MqttClientQueuedPersistentMessagesManager.cs create mode 100644 Tests/MQTTnet.TestApp.NetCore/messages.bin diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttCommunicationAdapterFactory.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttCommunicationAdapterFactory.cs index 799ccdd..218bb8c 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttCommunicationAdapterFactory.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttCommunicationAdapterFactory.cs @@ -21,6 +21,11 @@ namespace MQTTnet.Implementations return new MqttChannelCommunicationAdapter(new MqttWebSocketChannel(webSocketOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion }); } + if (options is MqttClientQueuedOptions queuedOptions) + { + return new MqttChannelCommunicationAdapter(new MqttTcpChannel(queuedOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion }); + } + throw new NotSupportedException(); } } diff --git a/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs b/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs index 9358390..5bc8336 100644 --- a/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs +++ b/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs @@ -10,7 +10,7 @@ namespace MQTTnet return new MqttClient(new MqttCommunicationAdapterFactory()); } - public IMqttClient CreateMqttQueuedClient() + public IMqttClientQueued CreateMqttQueuedClient() { return new MqttClientQueued(new MqttCommunicationAdapterFactory()); } diff --git a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttCommunicationAdapterFactory.cs b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttCommunicationAdapterFactory.cs index 799ccdd..2bb7c5e 100644 --- a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttCommunicationAdapterFactory.cs +++ b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttCommunicationAdapterFactory.cs @@ -7,7 +7,7 @@ namespace MQTTnet.Implementations { public class MqttCommunicationAdapterFactory : IMqttCommunicationAdapterFactory { - public IMqttCommunicationAdapter CreateMqttCommunicationAdapter(MqttClientOptions options) + public IMqttCommunicationAdapter CreateMqttCommunicationAdapter(MqttClientQueuedOptions options) { if (options == null) throw new ArgumentNullException(nameof(options)); diff --git a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs index 03206f1..db10291 100644 --- a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs +++ b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs @@ -89,7 +89,7 @@ namespace MQTTnet.Implementations RawReceiveStream = ReceiveStream; } - private static Certificate LoadCertificate(MqttClientOptions options) + private static Certificate LoadCertificate(MqttClientQueuedOptions options) { if (options.TlsOptions.Certificates == null || !options.TlsOptions.Certificates.Any()) { diff --git a/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs b/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs index 15da033..5bc8336 100644 --- a/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs +++ b/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs @@ -9,5 +9,10 @@ namespace MQTTnet { return new MqttClient(new MqttCommunicationAdapterFactory()); } + + public IMqttClientQueued CreateMqttQueuedClient() + { + return new MqttClientQueued(new MqttCommunicationAdapterFactory()); + } } } \ No newline at end of file diff --git a/MQTTnet.Core/Client/IMqttClientFactory.cs b/MQTTnet.Core/Client/IMqttClientFactory.cs index 033eb99..f1d6411 100644 --- a/MQTTnet.Core/Client/IMqttClientFactory.cs +++ b/MQTTnet.Core/Client/IMqttClientFactory.cs @@ -3,5 +3,7 @@ public interface IMqttClientFactory { IMqttClient CreateMqttClient(); + + IMqttClientQueued CreateMqttQueuedClient(); } } \ No newline at end of file diff --git a/MQTTnet.Core/Client/IMqttClientQueued.cs b/MQTTnet.Core/Client/IMqttClientQueued.cs new file mode 100644 index 0000000..c172e0c --- /dev/null +++ b/MQTTnet.Core/Client/IMqttClientQueued.cs @@ -0,0 +1,9 @@ +using System.Threading.Tasks; + +namespace MQTTnet.Core.Client +{ + public interface IMqttClientQueued: IMqttClient + { + Task ConnectAsync(MqttClientQueuedOptions options); + } +} diff --git a/MQTTnet.Core/Client/IMqttClientQueuedStorage.cs b/MQTTnet.Core/Client/IMqttClientQueuedStorage.cs new file mode 100644 index 0000000..ffc5e5f --- /dev/null +++ b/MQTTnet.Core/Client/IMqttClientQueuedStorage.cs @@ -0,0 +1,12 @@ +using System.Collections.Generic; +using System.Threading.Tasks; + +namespace MQTTnet.Core.Client +{ + public interface IMqttClientQueuedStorage + { + Task SaveInflightMessagesAsync(IList messages); + + Task> LoadInflightMessagesAsync(); + } +} diff --git a/MQTTnet.Core/Client/MqttClient.cs b/MQTTnet.Core/Client/MqttClient.cs index 80b850a..002ebe8 100644 --- a/MQTTnet.Core/Client/MqttClient.cs +++ b/MQTTnet.Core/Client/MqttClient.cs @@ -21,8 +21,8 @@ namespace MQTTnet.Core.Client private MqttClientOptions _options; private bool _isReceivingPackets; private int _latestPacketIdentifier; - private CancellationTokenSource _cancellationTokenSource; - private IMqttCommunicationAdapter _adapter; + internal CancellationTokenSource _cancellationTokenSource; + internal IMqttCommunicationAdapter _adapter; public MqttClient(IMqttCommunicationAdapterFactory communicationChannelFactory) { @@ -344,7 +344,7 @@ namespace MQTTnet.Core.Client return _adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, packet); } - private async Task SendAndReceiveAsync(MqttBasePacket requestPacket) where TResponsePacket : MqttBasePacket + internal async Task SendAndReceiveAsync(MqttBasePacket requestPacket) where TResponsePacket : MqttBasePacket { var packetAwaiter = _packetDispatcher.WaitForPacketAsync(requestPacket, typeof(TResponsePacket), _options.DefaultCommunicationTimeout); await _adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, requestPacket).ConfigureAwait(false); diff --git a/MQTTnet.Core/Client/MqttClientQueued.cs b/MQTTnet.Core/Client/MqttClientQueued.cs index 925b965..05d5766 100644 --- a/MQTTnet.Core/Client/MqttClientQueued.cs +++ b/MQTTnet.Core/Client/MqttClientQueued.cs @@ -13,454 +13,88 @@ using MQTTnet.Core.Internal; namespace MQTTnet.Core.Client { - public class MqttClientQueued : IMqttClient + public class MqttClientQueued : MqttClient, IMqttClientQueued { - #region Fields - private readonly IMqttCommunicationAdapterFactory _communicationChannelFactory; - private readonly MqttPacketDispatcher _packetDispatcher; - private readonly HashSet _unacknowledgedPublishPackets; - - private MqttClientOptions _options; - private bool _isReceivingPackets; + private MqttClientQueuedOptions _options; private int _latestPacketIdentifier; - private CancellationTokenSource _cancellationTokenSource; - private IMqttCommunicationAdapter _adapter; - - //added private readonly ConcurrentQueue _inflightQueue; private bool _usePersistance = false; - private ConcurrentQueue _persistentQueue; - #endregion + private MqttClientQueuedPersistentMessagesManager _persistentMessagesManager; - #region Ctrs - public MqttClientQueued(IMqttCommunicationAdapterFactory communicationChannelFactory) + public MqttClientQueued(IMqttCommunicationAdapterFactory communicationChannelFactory) : base(communicationChannelFactory) { - _communicationChannelFactory = communicationChannelFactory ?? throw new ArgumentNullException(nameof(communicationChannelFactory)); - _packetDispatcher = new MqttPacketDispatcher(); - _unacknowledgedPublishPackets = new HashSet(); _inflightQueue = new ConcurrentQueue(); - _persistentQueue = new ConcurrentQueue(); } - #endregion - - #region Events - public event EventHandler Connected; - public event EventHandler Disconnected; - public event EventHandler ApplicationMessageReceived; - #endregion - - #region Poperties - public bool IsConnected => _cancellationTokenSource != null && !_cancellationTokenSource.IsCancellationRequested; - #endregion - #region MqttClient Methods - public async Task ConnectAsync(MqttClientOptions options) + public async Task ConnectAsync(MqttClientQueuedOptions options) { - if (options == null) throw new ArgumentNullException(nameof(options)); - - ThrowIfConnected("It is not allowed to connect with a server after the connection is established."); - try - { + { _options = options; - _cancellationTokenSource = new CancellationTokenSource(); - _latestPacketIdentifier = 0; - _packetDispatcher.Reset(); - - _adapter = _communicationChannelFactory.CreateMqttCommunicationAdapter(options); - - MqttNetTrace.Verbose(nameof(MqttClient), "Trying to connect with server."); - await _adapter.ConnectAsync(_options.DefaultCommunicationTimeout).ConfigureAwait(false); - MqttNetTrace.Verbose(nameof(MqttClient), "Connection with server established."); - - await SetupIncomingPacketProcessingAsync(); + this._usePersistance = _options.UsePersistence; + await base.ConnectAsync(options); SetupOutgoingPacketProcessingAsync(); - await AuthenticateAsync(options.WillMessage); - MqttNetTrace.Verbose(nameof(MqttClient), "MQTT connection with server established."); - - if (_options.KeepAlivePeriod != TimeSpan.Zero) + //load persistentMessages + if (_usePersistance) { - StartSendKeepAliveMessages(_cancellationTokenSource.Token); + if (_persistentMessagesManager == null) + _persistentMessagesManager = new MqttClientQueuedPersistentMessagesManager(_options); + await _persistentMessagesManager.LoadMessagesAsync(); + await InternalPublishAsync(_persistentMessagesManager.GetMessages(), false); } - - Connected?.Invoke(this, EventArgs.Empty); } catch (Exception) { - await DisconnectInternalAsync().ConfigureAwait(false); + await DisconnectAsync().ConfigureAwait(false); throw; } } - public async Task DisconnectAsync() + public new async Task PublishAsync(IEnumerable applicationMessages) { - if (!IsConnected) - { - return; - } - - try - { - await SendAsync(new MqttDisconnectPacket()).ConfigureAwait(false); - } - finally - { - await DisconnectInternalAsync().ConfigureAwait(false); - } + await InternalPublishAsync(applicationMessages, true); } - public async Task PublishAsync(IEnumerable applicationMessages) + private async Task InternalPublishAsync(IEnumerable applicationMessages, bool appendIfUsePersistance) { -#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed ThrowIfNotConnected(); foreach (var applicationMessage in applicationMessages) { - if (_usePersistance) - _persistentQueue.Enqueue(applicationMessage); - _inflightQueue.Enqueue(applicationMessage); - } -#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - } - - public async Task> SubscribeAsync(IEnumerable topicFilters) - { - if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); - - ThrowIfNotConnected(); - - var subscribePacket = new MqttSubscribePacket - { - PacketIdentifier = GetNewPacketIdentifier(), - TopicFilters = topicFilters.ToList() - }; - - var response = await SendAndReceiveAsync(subscribePacket).ConfigureAwait(false); + if (_usePersistance && appendIfUsePersistance) + await _persistentMessagesManager.SaveMessageAsync(applicationMessage); - if (response.SubscribeReturnCodes.Count != subscribePacket.TopicFilters.Count) - { - throw new MqttProtocolViolationException("The return codes are not matching the topic filters [MQTT-3.9.3-1]."); + _inflightQueue.Enqueue(applicationMessage); } - - return subscribePacket.TopicFilters.Select((t, i) => new MqttSubscribeResult(t, response.SubscribeReturnCodes[i])).ToList(); } - public async Task UnsubscribeAsync(IEnumerable topicFilters) + public new async Task> SubscribeAsync(IEnumerable topicFilters) { - if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); - - ThrowIfNotConnected(); - - var unsubscribePacket = new MqttUnsubscribePacket - { - PacketIdentifier = GetNewPacketIdentifier(), - TopicFilters = topicFilters.ToList() - }; - - await SendAndReceiveAsync(unsubscribePacket); + return await base.SubscribeAsync(topicFilters); } - #region private private void ThrowIfNotConnected() { if (!IsConnected) throw new MqttCommunicationException("The client is not connected."); } - private void ThrowIfConnected(string message) - { - if (IsConnected) throw new MqttProtocolViolationException(message); - } - - private async Task SetupIncomingPacketProcessingAsync() - { - _isReceivingPackets = false; - -#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - Task.Factory.StartNew( - () => ReceivePackets(_cancellationTokenSource.Token), - _cancellationTokenSource.Token, - TaskCreationOptions.LongRunning, - TaskScheduler.Default).ConfigureAwait(false); -#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - - while (!_isReceivingPackets && _cancellationTokenSource != null && !_cancellationTokenSource.IsCancellationRequested) - { - await Task.Delay(TimeSpan.FromMilliseconds(100)); - } - } - - private async Task AuthenticateAsync(MqttApplicationMessage willApplicationMessage) - { - var connectPacket = new MqttConnectPacket - { - ClientId = _options.ClientId, - Username = _options.UserName, - Password = _options.Password, - CleanSession = _options.CleanSession, - KeepAlivePeriod = (ushort)_options.KeepAlivePeriod.TotalSeconds, - WillMessage = willApplicationMessage - }; - - var response = await SendAndReceiveAsync(connectPacket).ConfigureAwait(false); - if (response.ConnectReturnCode != MqttConnectReturnCode.ConnectionAccepted) - { - throw new MqttConnectingFailedException(response.ConnectReturnCode); - } - } - - private async Task ReceivePackets(CancellationToken cancellationToken) - { - MqttNetTrace.Information(nameof(MqttClient), "Start receiving packets."); - - try - { - while (!cancellationToken.IsCancellationRequested) - { - _isReceivingPackets = true; - - var packet = await _adapter.ReceivePacketAsync(TimeSpan.Zero, cancellationToken).ConfigureAwait(false); - if (cancellationToken.IsCancellationRequested) - { - return; - } - - StartProcessReceivedPacket(packet, cancellationToken); - } - } - catch (OperationCanceledException) - { - } - catch (MqttCommunicationException exception) - { - if (cancellationToken.IsCancellationRequested) - { - return; - } - - MqttNetTrace.Warning(nameof(MqttClient), exception, "MQTT communication exception while receiving packets."); - await DisconnectInternalAsync().ConfigureAwait(false); - } - catch (Exception exception) - { - MqttNetTrace.Error(nameof(MqttClient), exception, "Unhandled exception while receiving packets."); - await DisconnectInternalAsync().ConfigureAwait(false); - } - finally - { - MqttNetTrace.Information(nameof(MqttClient), "Stopped receiving packets."); - } - } - - private void StartProcessReceivedPacket(MqttBasePacket packet, CancellationToken cancellationToken) - { -#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - Task.Run(() => ProcessReceivedPacketAsync(packet), cancellationToken).ConfigureAwait(false); -#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - } - - private async Task ProcessReceivedPacketAsync(MqttBasePacket packet) - { - try - { - MqttNetTrace.Information(nameof(MqttClient), "Received <<< {0}", packet); - - if (packet is MqttPingReqPacket) - { - await SendAsync(new MqttPingRespPacket()); - return; - } - - if (packet is MqttDisconnectPacket) - { - await DisconnectAsync(); - return; - } - - if (packet is MqttPublishPacket publishPacket) - { - await ProcessReceivedPublishPacket(publishPacket); - return; - } - - if (packet is MqttPubRelPacket pubRelPacket) - { - await ProcessReceivedPubRelPacket(pubRelPacket); - return; - } - - _packetDispatcher.Dispatch(packet); - } - catch (Exception exception) - { - MqttNetTrace.Error(nameof(MqttClient), exception, "Unhandled exception while processing received packet."); - } - } - - private async Task ProcessReceivedPublishPacket(MqttPublishPacket publishPacket) - { - if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce) - { - FireApplicationMessageReceivedEvent(publishPacket); - return; - } - - if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce) - { - FireApplicationMessageReceivedEvent(publishPacket); - await SendAsync(new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier }); - return; - } - - if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce) - { - // QoS 2 is implement as method "B" [4.3.3 QoS 2: Exactly once delivery] - lock (_unacknowledgedPublishPackets) - { - _unacknowledgedPublishPackets.Add(publishPacket.PacketIdentifier); - } - - FireApplicationMessageReceivedEvent(publishPacket); - await SendAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }); - return; - } - - throw new MqttCommunicationException("Received a not supported QoS level."); - } - - private Task ProcessReceivedPubRelPacket(MqttPubRelPacket pubRelPacket) - { - lock (_unacknowledgedPublishPackets) - { - _unacknowledgedPublishPackets.Remove(pubRelPacket.PacketIdentifier); - } - - return SendAsync(pubRelPacket.CreateResponse()); - } - - private async Task DisconnectInternalAsync() - { - var cts = _cancellationTokenSource; - if (cts == null || cts.IsCancellationRequested) - { - return; - } - - cts.Cancel(false); - cts.Dispose(); - _cancellationTokenSource = null; - - try - { - await _adapter.DisconnectAsync(_options.DefaultCommunicationTimeout).ConfigureAwait(false); - MqttNetTrace.Information(nameof(MqttClient), "Disconnected from adapter."); - } - catch (Exception exception) - { - MqttNetTrace.Warning(nameof(MqttClient), exception, "Error while disconnecting from adapter."); - } - finally - { - Disconnected?.Invoke(this, EventArgs.Empty); - } - } - - private void StartSendKeepAliveMessages(CancellationToken cancellationToken) - { -#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - Task.Factory.StartNew(() => SendKeepAliveMessagesAsync(cancellationToken), cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default).ConfigureAwait(false); -#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - } - - private async Task SendKeepAliveMessagesAsync(CancellationToken cancellationToken) - { - MqttNetTrace.Information(nameof(MqttClient), "Start sending keep alive packets."); - - try - { - while (!cancellationToken.IsCancellationRequested) - { - await Task.Delay(_options.KeepAlivePeriod, cancellationToken).ConfigureAwait(false); - if (cancellationToken.IsCancellationRequested) - { - return; - } - - await SendAndReceiveAsync(new MqttPingReqPacket()).ConfigureAwait(false); - } - } - catch (OperationCanceledException) - { - } - catch (MqttCommunicationException exception) - { - if (cancellationToken.IsCancellationRequested) - { - return; - } - - MqttNetTrace.Warning(nameof(MqttClient), exception, "MQTT communication exception while sending/receiving keep alive packets."); - await DisconnectInternalAsync().ConfigureAwait(false); - } - catch (Exception exception) - { - MqttNetTrace.Warning(nameof(MqttClient), exception, "Unhandled exception while sending/receiving keep alive packets."); - await DisconnectInternalAsync().ConfigureAwait(false); - } - finally - { - MqttNetTrace.Information(nameof(MqttClient), "Stopped sending keep alive packets."); - } - } - - private async Task SendAndReceiveAsync(MqttBasePacket requestPacket) where TResponsePacket : MqttBasePacket - { - var packetAwaiter = _packetDispatcher.WaitForPacketAsync(requestPacket, typeof(TResponsePacket), _options.DefaultCommunicationTimeout); - await _adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, requestPacket).ConfigureAwait(false); - return (TResponsePacket)await packetAwaiter.ConfigureAwait(false); - } - - private Task SendAsync(MqttBasePacket packet) - { - return _adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, packet); - } - private ushort GetNewPacketIdentifier() { return (ushort)Interlocked.Increment(ref _latestPacketIdentifier); } + - private void FireApplicationMessageReceivedEvent(MqttPublishPacket publishPacket) - { - try - { - var applicationMessage = publishPacket.ToApplicationMessage(); - ApplicationMessageReceived?.Invoke(this, new MqttApplicationMessageReceivedEventArgs(applicationMessage)); - } - catch (Exception exception) - { - MqttNetTrace.Error(nameof(MqttClient), exception, "Unhandled exception while handling application message."); - } - } - #endregion - #endregion - - #region MqttClientQueued Methos private void SetupOutgoingPacketProcessingAsync() { #pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed Task.Factory.StartNew( - () => SendPackets(_cancellationTokenSource.Token), - _cancellationTokenSource.Token, + () => SendPackets(base._cancellationTokenSource.Token), + base._cancellationTokenSource.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default).ConfigureAwait(false); #pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - - //while (_cancellationTokenSource != null && !_cancellationTokenSource.IsCancellationRequested) - //{ - // await Task.Delay(TimeSpan.FromMilliseconds(100)); - //} } private async Task SendPackets(CancellationToken cancellationToken) @@ -480,21 +114,21 @@ namespace MQTTnet.Core.Client case MqttQualityOfServiceLevel.AtMostOnce: { // No packet identifier is used for QoS 0 [3.3.2.2 Packet Identifier] - await _adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, publishPacket); + await base._adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, base._cancellationTokenSource.Token, publishPacket); break; } case MqttQualityOfServiceLevel.AtLeastOnce: { publishPacket.PacketIdentifier = GetNewPacketIdentifier(); - await SendAndReceiveAsync(publishPacket); + await base.SendAndReceiveAsync(publishPacket); break; } case MqttQualityOfServiceLevel.ExactlyOnce: { publishPacket.PacketIdentifier = GetNewPacketIdentifier(); - var pubRecPacket = await SendAndReceiveAsync(publishPacket).ConfigureAwait(false); - await SendAndReceiveAsync(pubRecPacket.CreateResponse()).ConfigureAwait(false); + var pubRecPacket = await base.SendAndReceiveAsync(publishPacket).ConfigureAwait(false); + await base.SendAndReceiveAsync(pubRecPacket.CreateResponse()).ConfigureAwait(false); break; } default: @@ -502,6 +136,9 @@ namespace MQTTnet.Core.Client throw new InvalidOperationException(); } } + //delete from persistence + if (_usePersistance) + await _persistentMessagesManager.Remove(messageToSend); } }; } @@ -518,13 +155,12 @@ namespace MQTTnet.Core.Client catch (Exception exception) { MqttNetTrace.Error(nameof(MqttClient), exception, "Unhandled exception while sending packets."); - await DisconnectInternalAsync().ConfigureAwait(false); + await DisconnectAsync().ConfigureAwait(false); } finally { MqttNetTrace.Information(nameof(MqttClient), "Stopped sending packets."); } } - #endregion } } diff --git a/MQTTnet.Core/Client/MqttClientQueuedOptions.cs b/MQTTnet.Core/Client/MqttClientQueuedOptions.cs new file mode 100644 index 0000000..e7af0d7 --- /dev/null +++ b/MQTTnet.Core/Client/MqttClientQueuedOptions.cs @@ -0,0 +1,11 @@ + + +namespace MQTTnet.Core.Client +{ + public class MqttClientQueuedOptions: MqttClientTcpOptions + { + public bool UsePersistence { get; set; } + + public IMqttClientQueuedStorage Storage { get; set; } + } +} diff --git a/MQTTnet.Core/Client/MqttClientQueuedPersistentMessagesManager.cs b/MQTTnet.Core/Client/MqttClientQueuedPersistentMessagesManager.cs new file mode 100644 index 0000000..235b2bf --- /dev/null +++ b/MQTTnet.Core/Client/MqttClientQueuedPersistentMessagesManager.cs @@ -0,0 +1,84 @@ +using MQTTnet.Core.Diagnostics; +using MQTTnet.Core.Packets; +using System; +using System.Linq; +using System.Collections.Generic; +using System.Threading.Tasks; + +namespace MQTTnet.Core.Client +{ + public class MqttClientQueuedPersistentMessagesManager + { + private readonly IList _persistedMessages = new List(); + private readonly MqttClientQueuedOptions _options; + + public MqttClientQueuedPersistentMessagesManager(MqttClientQueuedOptions options) + { + _options = options ?? throw new ArgumentNullException(nameof(options)); + } + + public async Task LoadMessagesAsync() + { + try + { + var persistentMessages = await _options.Storage.LoadInflightMessagesAsync(); + lock (_persistedMessages) + { + _persistedMessages.Clear(); + foreach (var persistentMessage in persistentMessages) + { + _persistedMessages.Add(persistentMessage); + } + } + } + catch (Exception exception) + { + MqttNetTrace.Error(nameof(MqttClientQueuedPersistentMessagesManager), exception, "Unhandled exception while loading persistent messages."); + } + } + + public async Task SaveMessageAsync(MqttApplicationMessage applicationMessage) + { + if (applicationMessage != null) + { + lock (_persistedMessages) + { + _persistedMessages.Add(applicationMessage); + } + } + try + { + if (_options.Storage != null) + { + await _options.Storage.SaveInflightMessagesAsync(_persistedMessages); + } + } + catch (Exception exception) + { + MqttNetTrace.Error(nameof(MqttClientQueuedPersistentMessagesManager), exception, "Unhandled exception while saving persistent messages."); + } + } + + public List GetMessages() + { + var persistedMessages = new List(); + lock (_persistedMessages) + { + foreach (var persistedMessage in _persistedMessages) + { + persistedMessages.Add(persistedMessage); + } + } + + return persistedMessages; + } + + public async Task Remove(MqttApplicationMessage message) + { + lock (_persistedMessages) + _persistedMessages.Remove(message); + + await SaveMessageAsync(null); + } + } +} diff --git a/MQTTnet.Core/MQTTnet.Core.csproj b/MQTTnet.Core/MQTTnet.Core.csproj index 1e507cb..5080593 100644 --- a/MQTTnet.Core/MQTTnet.Core.csproj +++ b/MQTTnet.Core/MQTTnet.Core.csproj @@ -22,20 +22,6 @@ - - - - - - - - - - - - - - \ No newline at end of file diff --git a/Tests/MQTTnet.Core.Tests/MqttCommunicationAdapterFactory.cs b/Tests/MQTTnet.Core.Tests/MqttCommunicationAdapterFactory.cs index c83f83a..1b451ee 100644 --- a/Tests/MQTTnet.Core.Tests/MqttCommunicationAdapterFactory.cs +++ b/Tests/MQTTnet.Core.Tests/MqttCommunicationAdapterFactory.cs @@ -12,7 +12,7 @@ namespace MQTTnet.Core.Tests _adapter = adapter; } - public IMqttCommunicationAdapter CreateMqttCommunicationAdapter(MqttClientOptions options) + public IMqttCommunicationAdapter CreateMqttCommunicationAdapter(MqttClientQueuedOptions options) { return _adapter; } diff --git a/Tests/MQTTnet.TestApp.NetCore/Program.cs b/Tests/MQTTnet.TestApp.NetCore/Program.cs index f2a44e0..be69c9a 100644 --- a/Tests/MQTTnet.TestApp.NetCore/Program.cs +++ b/Tests/MQTTnet.TestApp.NetCore/Program.cs @@ -6,6 +6,7 @@ using MQTTnet.Core.Protocol; using MQTTnet.Core.Server; using System; using System.Collections.Generic; +using System.IO; using System.Text; using System.Threading; using System.Threading.Tasks; @@ -57,7 +58,7 @@ namespace MQTTnet.TestApp.NetCore try { - var options = new MqttClientTcpOptions + var options = new MqttClientQueuedOptions { Server = "192.168.0.14", ClientId = "XYZ", @@ -66,7 +67,8 @@ namespace MQTTnet.TestApp.NetCore Password = "passworda", KeepAlivePeriod = TimeSpan.FromSeconds(31), DefaultCommunicationTimeout = TimeSpan.FromSeconds(20), - + UsePersistence = true, + Storage = new TestStorage(), }; var client = new MqttClientFactory().CreateMqttQueuedClient(); @@ -272,5 +274,70 @@ namespace MQTTnet.TestApp.NetCore Console.ReadLine(); } + + [Serializable] + public sealed class TemporaryApplicationMessage + { + public TemporaryApplicationMessage(string topic, byte[] payload, MqttQualityOfServiceLevel qualityOfServiceLevel, bool retain) + { + Topic = topic ?? throw new ArgumentNullException(nameof(topic)); + Payload = payload ?? throw new ArgumentNullException(nameof(payload)); + QualityOfServiceLevel = qualityOfServiceLevel; + Retain = retain; + } + + public string Topic { get; } + + public byte[] Payload { get; } + + public MqttQualityOfServiceLevel QualityOfServiceLevel { get; } + + public bool Retain { get; } + } + + private class TestStorage : IMqttClientQueuedStorage + { + string serializationFile = Path.Combine(Environment.CurrentDirectory, "messages.bin"); + private IList _messages = new List(); + + public Task> LoadInflightMessagesAsync() + { + //deserialize + // MqttApplicationMessage is not serializable + if (File.Exists(serializationFile)) + { + using (Stream stream = File.Open(serializationFile, FileMode.Open)) + { + var bformatter = new System.Runtime.Serialization.Formatters.Binary.BinaryFormatter(); + + var temp = (List)bformatter.Deserialize(stream); + foreach (var a in temp) + { + _messages.Add(new MqttApplicationMessage(a.Topic, a.Payload, a.QualityOfServiceLevel, a.Retain)); + } + } + } + return Task.FromResult(_messages); + } + + public Task SaveInflightMessagesAsync(IList messages) + { + _messages = messages; + //serialize + // MqttApplicationMessage is not serializable + using (System.IO.Stream stream = File.Open(serializationFile, FileMode.Create)) + { + var bformatter = new System.Runtime.Serialization.Formatters.Binary.BinaryFormatter(); + IList temp = new List(); + foreach (var m in _messages) + { + temp.Add(new TemporaryApplicationMessage(m.Topic, m.Payload, m.QualityOfServiceLevel, m.Retain)); + } + bformatter.Serialize(stream, temp); + } + + return Task.FromResult(0); + } + } } } diff --git a/Tests/MQTTnet.TestApp.NetCore/messages.bin b/Tests/MQTTnet.TestApp.NetCore/messages.bin new file mode 100644 index 0000000000000000000000000000000000000000..59505fa898386f36e09b72ce0a44d1f4a8c2c083 GIT binary patch literal 474 zcmbu6K}*9h7=|;~xeomiy<5U|9z948JLty5uuCajRz5~BP0IHrLj6_yhngrm7DV4b zLJoP}CkdM{#y&&h8gqXWrrq{M2m_7_Aotk!d$Sltn}J>gu;M# zD0QCg{Yk6#2?k-`z!b)7okaL$@;Z=%j+a(zXkDNX-+%#BEq_+zj;pE~>E(O0XO!K8 zfUdVFaR^eOMMlaNh~zoAjU4`?bX`w_^=+eqGtC-OACNZhKi5wfD@NLWqgvJRk<)Y$ S8?KpuZ?4~vtpCA}^t&%d=9cgP literal 0 HcmV?d00001 diff --git a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs index 07d5955..8a22f5e 100644 --- a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs +++ b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs @@ -41,7 +41,7 @@ namespace MQTTnet.TestApp.UniversalWindows private async void Connect(object sender, RoutedEventArgs e) { - MqttClientOptions options = null; + MqttClientQueuedOptions options = null; if (UseTcp.IsChecked == true) { options = new MqttClientTcpOptions From 1f3b24d20faba1fd91be4c1d25ebd512c7b4ad10 Mon Sep 17 00:00:00 2001 From: Gerardo Date: Wed, 18 Oct 2017 17:14:13 +0200 Subject: [PATCH 3/4] Refactoring --- .../Implementations/MqttCommunicationAdapterFactory.cs | 8 +++++++- .../Implementations/MqttTcpChannel.cs | 2 +- .../MqttCommunicationAdapterFactory.cs | 2 +- Tests/MQTTnet.TestApp.NetCore/Program.cs | 9 ++++----- Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs | 2 +- 5 files changed, 14 insertions(+), 9 deletions(-) diff --git a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttCommunicationAdapterFactory.cs b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttCommunicationAdapterFactory.cs index 2bb7c5e..36a34b1 100644 --- a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttCommunicationAdapterFactory.cs +++ b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttCommunicationAdapterFactory.cs @@ -7,7 +7,7 @@ namespace MQTTnet.Implementations { public class MqttCommunicationAdapterFactory : IMqttCommunicationAdapterFactory { - public IMqttCommunicationAdapter CreateMqttCommunicationAdapter(MqttClientQueuedOptions options) + public IMqttCommunicationAdapter CreateMqttCommunicationAdapter(MqttClientOptions options) { if (options == null) throw new ArgumentNullException(nameof(options)); @@ -21,6 +21,12 @@ namespace MQTTnet.Implementations return new MqttChannelCommunicationAdapter(new MqttWebSocketChannel(webSocketOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion }); } + if (options is MqttClientQueuedOptions queuedOptions) + { + return new MqttChannelCommunicationAdapter(new MqttTcpChannel(queuedOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion }); + } + + throw new NotSupportedException(); } } diff --git a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs index db10291..03206f1 100644 --- a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs +++ b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs @@ -89,7 +89,7 @@ namespace MQTTnet.Implementations RawReceiveStream = ReceiveStream; } - private static Certificate LoadCertificate(MqttClientQueuedOptions options) + private static Certificate LoadCertificate(MqttClientOptions options) { if (options.TlsOptions.Certificates == null || !options.TlsOptions.Certificates.Any()) { diff --git a/Tests/MQTTnet.Core.Tests/MqttCommunicationAdapterFactory.cs b/Tests/MQTTnet.Core.Tests/MqttCommunicationAdapterFactory.cs index 1b451ee..c83f83a 100644 --- a/Tests/MQTTnet.Core.Tests/MqttCommunicationAdapterFactory.cs +++ b/Tests/MQTTnet.Core.Tests/MqttCommunicationAdapterFactory.cs @@ -12,7 +12,7 @@ namespace MQTTnet.Core.Tests _adapter = adapter; } - public IMqttCommunicationAdapter CreateMqttCommunicationAdapter(MqttClientQueuedOptions options) + public IMqttCommunicationAdapter CreateMqttCommunicationAdapter(MqttClientOptions options) { return _adapter; } diff --git a/Tests/MQTTnet.TestApp.NetCore/Program.cs b/Tests/MQTTnet.TestApp.NetCore/Program.cs index be69c9a..63e4380 100644 --- a/Tests/MQTTnet.TestApp.NetCore/Program.cs +++ b/Tests/MQTTnet.TestApp.NetCore/Program.cs @@ -6,7 +6,6 @@ using MQTTnet.Core.Protocol; using MQTTnet.Core.Server; using System; using System.Collections.Generic; -using System.IO; using System.Text; using System.Threading; using System.Threading.Tasks; @@ -297,16 +296,16 @@ namespace MQTTnet.TestApp.NetCore private class TestStorage : IMqttClientQueuedStorage { - string serializationFile = Path.Combine(Environment.CurrentDirectory, "messages.bin"); + string serializationFile = System.IO.Path.Combine(Environment.CurrentDirectory, "messages.bin"); private IList _messages = new List(); public Task> LoadInflightMessagesAsync() { //deserialize // MqttApplicationMessage is not serializable - if (File.Exists(serializationFile)) + if (System.IO.File.Exists(serializationFile)) { - using (Stream stream = File.Open(serializationFile, FileMode.Open)) + using (System.IO.Stream stream = System.IO.File.Open(serializationFile, System.IO.FileMode.Open)) { var bformatter = new System.Runtime.Serialization.Formatters.Binary.BinaryFormatter(); @@ -325,7 +324,7 @@ namespace MQTTnet.TestApp.NetCore _messages = messages; //serialize // MqttApplicationMessage is not serializable - using (System.IO.Stream stream = File.Open(serializationFile, FileMode.Create)) + using (System.IO.Stream stream = System.IO.File.Open(serializationFile, System.IO.FileMode.Create)) { var bformatter = new System.Runtime.Serialization.Formatters.Binary.BinaryFormatter(); IList temp = new List(); diff --git a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs index 8a22f5e..07d5955 100644 --- a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs +++ b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs @@ -41,7 +41,7 @@ namespace MQTTnet.TestApp.UniversalWindows private async void Connect(object sender, RoutedEventArgs e) { - MqttClientQueuedOptions options = null; + MqttClientOptions options = null; if (UseTcp.IsChecked == true) { options = new MqttClientTcpOptions From 870ed00e3c32c45797815235a34d8bfd2812bb16 Mon Sep 17 00:00:00 2001 From: Gerardo Date: Thu, 19 Oct 2017 13:21:06 +0200 Subject: [PATCH 4/4] Refactoring Added almost of the hits. Options modified but must be reviewed completely. --- .../MqttCommunicationAdapterFactory.cs | 4 +- .../Implementations/MqttTcpChannel.cs | 2 +- .../MQTTnet.NetStandard/MqttClientFactory.cs | 4 +- .../MqttCommunicationAdapterFactory.cs | 4 +- .../Implementations/MqttTcpChannel.cs | 2 +- .../MqttClientFactory.cs | 4 +- MQTTnet.Core/Client/IMqttClient.cs | 2 +- MQTTnet.Core/Client/IMqttClientFactory.cs | 2 +- MQTTnet.Core/Client/IMqttClientManaged.cs | 9 + MQTTnet.Core/Client/IMqttClientQueued.cs | 9 - .../Client/IMqttClientQueuedStorage.cs | 4 +- .../IMqttCommunicationAdapterFactory.cs | 2 +- MQTTnet.Core/Client/MqttClient.cs | 5 +- MQTTnet.Core/Client/MqttClientManaged .cs | 163 +++++++++++++++++ .../Client/MqttClientManagedOptions.cs | 12 ++ MQTTnet.Core/Client/MqttClientOptions.cs | 24 +-- MQTTnet.Core/Client/MqttClientQueued.cs | 166 ------------------ .../Client/MqttClientQueuedOptions.cs | 11 -- ...ttClientQueuedPersistentMessagesManager.cs | 8 +- MQTTnet.Core/Client/MqttClientTcpOptions.cs | 26 ++- .../Client/MqttClientWebSocketOptions.cs | 26 ++- .../MqttCommunicationAdapterFactory.cs | 2 +- Tests/MQTTnet.TestApp.NetCore/Program.cs | 11 +- .../MainPage.xaml.cs | 2 +- 24 files changed, 273 insertions(+), 231 deletions(-) create mode 100644 MQTTnet.Core/Client/IMqttClientManaged.cs delete mode 100644 MQTTnet.Core/Client/IMqttClientQueued.cs create mode 100644 MQTTnet.Core/Client/MqttClientManaged .cs create mode 100644 MQTTnet.Core/Client/MqttClientManagedOptions.cs delete mode 100644 MQTTnet.Core/Client/MqttClientQueued.cs delete mode 100644 MQTTnet.Core/Client/MqttClientQueuedOptions.cs diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttCommunicationAdapterFactory.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttCommunicationAdapterFactory.cs index 218bb8c..ceaac98 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttCommunicationAdapterFactory.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttCommunicationAdapterFactory.cs @@ -7,7 +7,7 @@ namespace MQTTnet.Implementations { public class MqttCommunicationAdapterFactory : IMqttCommunicationAdapterFactory { - public IMqttCommunicationAdapter CreateMqttCommunicationAdapter(MqttClientOptions options) + public IMqttCommunicationAdapter CreateMqttCommunicationAdapter(IMqttClientOptions options) { if (options == null) throw new ArgumentNullException(nameof(options)); @@ -21,7 +21,7 @@ namespace MQTTnet.Implementations return new MqttChannelCommunicationAdapter(new MqttWebSocketChannel(webSocketOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion }); } - if (options is MqttClientQueuedOptions queuedOptions) + if (options is MqttClientManagedOptions queuedOptions) { return new MqttChannelCommunicationAdapter(new MqttTcpChannel(queuedOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion }); } diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs index dfe3654..2fe1804 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs @@ -121,7 +121,7 @@ namespace MQTTnet.Implementations return _options.TlsOptions.AllowUntrustedCertificates; } - private static X509CertificateCollection LoadCertificates(MqttClientOptions options) + private static X509CertificateCollection LoadCertificates(IMqttClientOptions options) { var certificates = new X509CertificateCollection(); if (options.TlsOptions.Certificates == null) diff --git a/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs b/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs index 5bc8336..53ce276 100644 --- a/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs +++ b/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs @@ -10,9 +10,9 @@ namespace MQTTnet return new MqttClient(new MqttCommunicationAdapterFactory()); } - public IMqttClientQueued CreateMqttQueuedClient() + public IMqttClientManaged CreateMqttManagedClient() { - return new MqttClientQueued(new MqttCommunicationAdapterFactory()); + return new MqttClientManaged(new MqttCommunicationAdapterFactory()); } } } \ No newline at end of file diff --git a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttCommunicationAdapterFactory.cs b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttCommunicationAdapterFactory.cs index 36a34b1..9e429fd 100644 --- a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttCommunicationAdapterFactory.cs +++ b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttCommunicationAdapterFactory.cs @@ -7,7 +7,7 @@ namespace MQTTnet.Implementations { public class MqttCommunicationAdapterFactory : IMqttCommunicationAdapterFactory { - public IMqttCommunicationAdapter CreateMqttCommunicationAdapter(MqttClientOptions options) + public IMqttCommunicationAdapter CreateMqttCommunicationAdapter(IMqttClientOptions options) { if (options == null) throw new ArgumentNullException(nameof(options)); @@ -21,7 +21,7 @@ namespace MQTTnet.Implementations return new MqttChannelCommunicationAdapter(new MqttWebSocketChannel(webSocketOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion }); } - if (options is MqttClientQueuedOptions queuedOptions) + if (options is MqttClientManagedOptions queuedOptions) { return new MqttChannelCommunicationAdapter(new MqttTcpChannel(queuedOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion }); } diff --git a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs index 03206f1..5335dae 100644 --- a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs +++ b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs @@ -89,7 +89,7 @@ namespace MQTTnet.Implementations RawReceiveStream = ReceiveStream; } - private static Certificate LoadCertificate(MqttClientOptions options) + private static Certificate LoadCertificate(IMqttClientOptions options) { if (options.TlsOptions.Certificates == null || !options.TlsOptions.Certificates.Any()) { diff --git a/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs b/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs index 5bc8336..53ce276 100644 --- a/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs +++ b/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs @@ -10,9 +10,9 @@ namespace MQTTnet return new MqttClient(new MqttCommunicationAdapterFactory()); } - public IMqttClientQueued CreateMqttQueuedClient() + public IMqttClientManaged CreateMqttManagedClient() { - return new MqttClientQueued(new MqttCommunicationAdapterFactory()); + return new MqttClientManaged(new MqttCommunicationAdapterFactory()); } } } \ No newline at end of file diff --git a/MQTTnet.Core/Client/IMqttClient.cs b/MQTTnet.Core/Client/IMqttClient.cs index 7622867..b46a2bc 100644 --- a/MQTTnet.Core/Client/IMqttClient.cs +++ b/MQTTnet.Core/Client/IMqttClient.cs @@ -13,7 +13,7 @@ namespace MQTTnet.Core.Client event EventHandler Connected; event EventHandler Disconnected; - Task ConnectAsync(MqttClientOptions options); + Task ConnectAsync(IMqttClientOptions options); Task DisconnectAsync(); Task> SubscribeAsync(IEnumerable topicFilters); diff --git a/MQTTnet.Core/Client/IMqttClientFactory.cs b/MQTTnet.Core/Client/IMqttClientFactory.cs index f1d6411..25e03b3 100644 --- a/MQTTnet.Core/Client/IMqttClientFactory.cs +++ b/MQTTnet.Core/Client/IMqttClientFactory.cs @@ -4,6 +4,6 @@ { IMqttClient CreateMqttClient(); - IMqttClientQueued CreateMqttQueuedClient(); + IMqttClientManaged CreateMqttManagedClient(); } } \ No newline at end of file diff --git a/MQTTnet.Core/Client/IMqttClientManaged.cs b/MQTTnet.Core/Client/IMqttClientManaged.cs new file mode 100644 index 0000000..1fc42f0 --- /dev/null +++ b/MQTTnet.Core/Client/IMqttClientManaged.cs @@ -0,0 +1,9 @@ +using System.Threading.Tasks; + +namespace MQTTnet.Core.Client +{ + public interface IMqttClientManaged : IMqttClient + { + //Task ConnectAsync(MqttClientManagedOptions options); + } +} diff --git a/MQTTnet.Core/Client/IMqttClientQueued.cs b/MQTTnet.Core/Client/IMqttClientQueued.cs deleted file mode 100644 index c172e0c..0000000 --- a/MQTTnet.Core/Client/IMqttClientQueued.cs +++ /dev/null @@ -1,9 +0,0 @@ -using System.Threading.Tasks; - -namespace MQTTnet.Core.Client -{ - public interface IMqttClientQueued: IMqttClient - { - Task ConnectAsync(MqttClientQueuedOptions options); - } -} diff --git a/MQTTnet.Core/Client/IMqttClientQueuedStorage.cs b/MQTTnet.Core/Client/IMqttClientQueuedStorage.cs index ffc5e5f..7909a56 100644 --- a/MQTTnet.Core/Client/IMqttClientQueuedStorage.cs +++ b/MQTTnet.Core/Client/IMqttClientQueuedStorage.cs @@ -5,8 +5,8 @@ namespace MQTTnet.Core.Client { public interface IMqttClientQueuedStorage { - Task SaveInflightMessagesAsync(IList messages); + Task SaveQueuedMessagesAsync(IList messages); - Task> LoadInflightMessagesAsync(); + Task> LoadQueuedMessagesAsync(); } } diff --git a/MQTTnet.Core/Client/IMqttCommunicationAdapterFactory.cs b/MQTTnet.Core/Client/IMqttCommunicationAdapterFactory.cs index 092ea04..0e29590 100644 --- a/MQTTnet.Core/Client/IMqttCommunicationAdapterFactory.cs +++ b/MQTTnet.Core/Client/IMqttCommunicationAdapterFactory.cs @@ -4,6 +4,6 @@ namespace MQTTnet.Core.Client { public interface IMqttCommunicationAdapterFactory { - IMqttCommunicationAdapter CreateMqttCommunicationAdapter(MqttClientOptions options); + IMqttCommunicationAdapter CreateMqttCommunicationAdapter(IMqttClientOptions options); } } \ No newline at end of file diff --git a/MQTTnet.Core/Client/MqttClient.cs b/MQTTnet.Core/Client/MqttClient.cs index 002ebe8..6cdf051 100644 --- a/MQTTnet.Core/Client/MqttClient.cs +++ b/MQTTnet.Core/Client/MqttClient.cs @@ -18,7 +18,7 @@ namespace MQTTnet.Core.Client private readonly MqttPacketDispatcher _packetDispatcher = new MqttPacketDispatcher(); private readonly IMqttCommunicationAdapterFactory _communicationChannelFactory; - private MqttClientOptions _options; + private IMqttClientOptions _options; private bool _isReceivingPackets; private int _latestPacketIdentifier; internal CancellationTokenSource _cancellationTokenSource; @@ -35,7 +35,8 @@ namespace MQTTnet.Core.Client public bool IsConnected => _cancellationTokenSource != null && !_cancellationTokenSource.IsCancellationRequested; - public async Task ConnectAsync(MqttClientOptions options) + + public async Task ConnectAsync(IMqttClientOptions options) { if (options == null) throw new ArgumentNullException(nameof(options)); diff --git a/MQTTnet.Core/Client/MqttClientManaged .cs b/MQTTnet.Core/Client/MqttClientManaged .cs new file mode 100644 index 0000000..03afc92 --- /dev/null +++ b/MQTTnet.Core/Client/MqttClientManaged .cs @@ -0,0 +1,163 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using MQTTnet.Core.Adapter; +using MQTTnet.Core.Diagnostics; +using MQTTnet.Core.Exceptions; +using MQTTnet.Core.Packets; +using MQTTnet.Core.Protocol; +using MQTTnet.Core.Internal; + +namespace MQTTnet.Core.Client +{ + public class MqttClientManaged: IMqttClientManaged + { + private MqttClientManagedOptions _options; + private int _latestPacketIdentifier; + private readonly BlockingCollection _inflightQueue; + private bool _usePersistance = false; + private MqttClientQueuedPersistentMessagesManager _persistentMessagesManager; + private readonly MqttClient _baseMqttClient; + + public MqttClientManaged(IMqttCommunicationAdapterFactory communicationChannelFactory) + { + _baseMqttClient = new MqttClient(communicationChannelFactory); + _baseMqttClient.Connected += BaseMqttClient_Connected; + _baseMqttClient.Disconnected += BaseMqttClient_Disconnected; + _baseMqttClient.ApplicationMessageReceived += BaseMqttClient_ApplicationMessageReceived; + _inflightQueue = new BlockingCollection(); + } + + private void BaseMqttClient_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e) + { + ApplicationMessageReceived?.Invoke(this, e); + } + + private void BaseMqttClient_Disconnected(object sender, EventArgs e) + { + Disconnected?.Invoke(this, e); + } + + private void BaseMqttClient_Connected(object sender, EventArgs e) + { + Connected?.Invoke(this, e); + } + + public event EventHandler Connected; + public event EventHandler Disconnected; + public event EventHandler ApplicationMessageReceived; + + public bool IsConnected => _baseMqttClient.IsConnected; + + + public async Task ConnectAsync(IMqttClientOptions options) + { + //TODO VERY BAD + _options = options as MqttClientManagedOptions; + this._usePersistance = _options.Storage != null; + await _baseMqttClient.ConnectAsync(options); + SetupOutgoingPacketProcessingAsync(); + + //load persistentMessages + if (_usePersistance) + { + if (_persistentMessagesManager == null) + _persistentMessagesManager = new MqttClientQueuedPersistentMessagesManager(_options); + await _persistentMessagesManager.LoadMessagesAsync(); + await InternalPublishAsync(_persistentMessagesManager.GetMessages(), false); + } + } + + public async Task DisconnectAsync() + { + await _baseMqttClient.DisconnectAsync(); + } + + public async Task UnsubscribeAsync(IEnumerable topicFilters) + { + await _baseMqttClient.UnsubscribeAsync(topicFilters); + } + + public async Task PublishAsync(IEnumerable applicationMessages) + { + await InternalPublishAsync(applicationMessages, true); + } + + private async Task InternalPublishAsync(IEnumerable applicationMessages, bool appendIfUsePersistance) + { + ThrowIfNotConnected(); + + foreach (var applicationMessage in applicationMessages) + { + if (_usePersistance && appendIfUsePersistance) + await _persistentMessagesManager.SaveMessageAsync(applicationMessage); + + _inflightQueue.Add(applicationMessage); + } + } + + public async Task> SubscribeAsync(IEnumerable topicFilters) + { + return await _baseMqttClient.SubscribeAsync(topicFilters); + } + + private void ThrowIfNotConnected() + { + if (!IsConnected) throw new MqttCommunicationException("The client is not connected."); + } + + private ushort GetNewPacketIdentifier() + { + return (ushort)Interlocked.Increment(ref _latestPacketIdentifier); + } + + + private void SetupOutgoingPacketProcessingAsync() + { + Task.Factory.StartNew( + () => SendPackets(_baseMqttClient._cancellationTokenSource.Token), + _baseMqttClient._cancellationTokenSource.Token, + TaskCreationOptions.LongRunning, + TaskScheduler.Default).ConfigureAwait(false); + } + + private async Task SendPackets(CancellationToken cancellationToken) + { + MqttNetTrace.Information(nameof(MqttClientManaged), "Start sending packets."); + MqttApplicationMessage messageInQueue = null; + + try + { + while (!cancellationToken.IsCancellationRequested) + { + messageInQueue = _inflightQueue.Take(); + await _baseMqttClient.PublishAsync(new List() { messageInQueue }); + if (_usePersistance) + await _persistentMessagesManager.Remove(messageInQueue); + } + } + catch (OperationCanceledException) + { + } + catch (MqttCommunicationException exception) + { + MqttNetTrace.Warning(nameof(MqttClient), exception, "MQTT communication exception while sending packets."); + //message not send, equeue it again + if (messageInQueue != null) + _inflightQueue.Add(messageInQueue); + } + catch (Exception exception) + { + MqttNetTrace.Error(nameof(MqttClient), exception, "Unhandled exception while sending packets."); + await DisconnectAsync().ConfigureAwait(false); + } + finally + { + MqttNetTrace.Information(nameof(MqttClient), "Stopped sending packets."); + } + } + } +} diff --git a/MQTTnet.Core/Client/MqttClientManagedOptions.cs b/MQTTnet.Core/Client/MqttClientManagedOptions.cs new file mode 100644 index 0000000..77feabe --- /dev/null +++ b/MQTTnet.Core/Client/MqttClientManagedOptions.cs @@ -0,0 +1,12 @@ + +using System; + +namespace MQTTnet.Core.Client +{ + public class MqttClientManagedOptions: MqttClientTcpOptions + { + public bool UseAutoReconnect { get; set; } + public TimeSpan AutoReconnectDelay { get; set; } + public IMqttClientQueuedStorage Storage { get; set; } + } +} diff --git a/MQTTnet.Core/Client/MqttClientOptions.cs b/MQTTnet.Core/Client/MqttClientOptions.cs index f2e28fb..35cdf5f 100644 --- a/MQTTnet.Core/Client/MqttClientOptions.cs +++ b/MQTTnet.Core/Client/MqttClientOptions.cs @@ -1,26 +1,26 @@ -using System; -using MQTTnet.Core.Serializer; +using MQTTnet.Core.Serializer; +using System; namespace MQTTnet.Core.Client { - public abstract class MqttClientOptions + public interface IMqttClientOptions { - public MqttClientTlsOptions TlsOptions { get; set; } = new MqttClientTlsOptions(); + MqttClientTlsOptions TlsOptions { get; set; } - public MqttApplicationMessage WillMessage { get; set; } + MqttApplicationMessage WillMessage { get; set; } - public string UserName { get; set; } + string UserName { get; set; } - public string Password { get; set; } + string Password { get; set; } - public string ClientId { get; set; } = Guid.NewGuid().ToString().Replace("-", string.Empty); + string ClientId { get; set; } - public bool CleanSession { get; set; } = true; + bool CleanSession { get; set; } - public TimeSpan KeepAlivePeriod { get; set; } = TimeSpan.FromSeconds(5); + TimeSpan KeepAlivePeriod { get; set; } - public TimeSpan DefaultCommunicationTimeout { get; set; } = TimeSpan.FromSeconds(10); + TimeSpan DefaultCommunicationTimeout { get; set; } - public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311; + MqttProtocolVersion ProtocolVersion { get; set; } } } diff --git a/MQTTnet.Core/Client/MqttClientQueued.cs b/MQTTnet.Core/Client/MqttClientQueued.cs deleted file mode 100644 index 05d5766..0000000 --- a/MQTTnet.Core/Client/MqttClientQueued.cs +++ /dev/null @@ -1,166 +0,0 @@ -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; -using MQTTnet.Core.Adapter; -using MQTTnet.Core.Diagnostics; -using MQTTnet.Core.Exceptions; -using MQTTnet.Core.Packets; -using MQTTnet.Core.Protocol; -using MQTTnet.Core.Internal; - -namespace MQTTnet.Core.Client -{ - public class MqttClientQueued : MqttClient, IMqttClientQueued - { - private MqttClientQueuedOptions _options; - private int _latestPacketIdentifier; - private readonly ConcurrentQueue _inflightQueue; - private bool _usePersistance = false; - private MqttClientQueuedPersistentMessagesManager _persistentMessagesManager; - - public MqttClientQueued(IMqttCommunicationAdapterFactory communicationChannelFactory) : base(communicationChannelFactory) - { - _inflightQueue = new ConcurrentQueue(); - } - - - public async Task ConnectAsync(MqttClientQueuedOptions options) - { - try - { - _options = options; - this._usePersistance = _options.UsePersistence; - await base.ConnectAsync(options); - SetupOutgoingPacketProcessingAsync(); - - //load persistentMessages - if (_usePersistance) - { - if (_persistentMessagesManager == null) - _persistentMessagesManager = new MqttClientQueuedPersistentMessagesManager(_options); - await _persistentMessagesManager.LoadMessagesAsync(); - await InternalPublishAsync(_persistentMessagesManager.GetMessages(), false); - } - } - catch (Exception) - { - await DisconnectAsync().ConfigureAwait(false); - throw; - } - } - - public new async Task PublishAsync(IEnumerable applicationMessages) - { - await InternalPublishAsync(applicationMessages, true); - } - - private async Task InternalPublishAsync(IEnumerable applicationMessages, bool appendIfUsePersistance) - { - ThrowIfNotConnected(); - - foreach (var applicationMessage in applicationMessages) - { - if (_usePersistance && appendIfUsePersistance) - await _persistentMessagesManager.SaveMessageAsync(applicationMessage); - - _inflightQueue.Enqueue(applicationMessage); - } - } - - public new async Task> SubscribeAsync(IEnumerable topicFilters) - { - return await base.SubscribeAsync(topicFilters); - } - - private void ThrowIfNotConnected() - { - if (!IsConnected) throw new MqttCommunicationException("The client is not connected."); - } - - private ushort GetNewPacketIdentifier() - { - return (ushort)Interlocked.Increment(ref _latestPacketIdentifier); - } - - - private void SetupOutgoingPacketProcessingAsync() - { -#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - Task.Factory.StartNew( - () => SendPackets(base._cancellationTokenSource.Token), - base._cancellationTokenSource.Token, - TaskCreationOptions.LongRunning, - TaskScheduler.Default).ConfigureAwait(false); -#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - } - - private async Task SendPackets(CancellationToken cancellationToken) - { - MqttNetTrace.Information(nameof(MqttClient), "Start sending packets."); - MqttApplicationMessage messageToSend = null; - try - { - while (!cancellationToken.IsCancellationRequested) - { - while (_inflightQueue.TryDequeue(out messageToSend)) - { - MqttQualityOfServiceLevel qosGroup = messageToSend.QualityOfServiceLevel; - MqttPublishPacket publishPacket = messageToSend.ToPublishPacket(); - switch (qosGroup) - { - case MqttQualityOfServiceLevel.AtMostOnce: - { - // No packet identifier is used for QoS 0 [3.3.2.2 Packet Identifier] - await base._adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, base._cancellationTokenSource.Token, publishPacket); - break; - } - - case MqttQualityOfServiceLevel.AtLeastOnce: - { - publishPacket.PacketIdentifier = GetNewPacketIdentifier(); - await base.SendAndReceiveAsync(publishPacket); - break; - } - case MqttQualityOfServiceLevel.ExactlyOnce: - { - publishPacket.PacketIdentifier = GetNewPacketIdentifier(); - var pubRecPacket = await base.SendAndReceiveAsync(publishPacket).ConfigureAwait(false); - await base.SendAndReceiveAsync(pubRecPacket.CreateResponse()).ConfigureAwait(false); - break; - } - default: - { - throw new InvalidOperationException(); - } - } - //delete from persistence - if (_usePersistance) - await _persistentMessagesManager.Remove(messageToSend); - } - }; - } - catch (OperationCanceledException) - { - } - catch (MqttCommunicationException exception) - { - MqttNetTrace.Warning(nameof(MqttClient), exception, "MQTT communication exception while sending packets."); - //message not send, equeued again - if (messageToSend != null) - _inflightQueue.Enqueue(messageToSend); - } - catch (Exception exception) - { - MqttNetTrace.Error(nameof(MqttClient), exception, "Unhandled exception while sending packets."); - await DisconnectAsync().ConfigureAwait(false); - } - finally - { - MqttNetTrace.Information(nameof(MqttClient), "Stopped sending packets."); - } - } - } -} diff --git a/MQTTnet.Core/Client/MqttClientQueuedOptions.cs b/MQTTnet.Core/Client/MqttClientQueuedOptions.cs deleted file mode 100644 index e7af0d7..0000000 --- a/MQTTnet.Core/Client/MqttClientQueuedOptions.cs +++ /dev/null @@ -1,11 +0,0 @@ - - -namespace MQTTnet.Core.Client -{ - public class MqttClientQueuedOptions: MqttClientTcpOptions - { - public bool UsePersistence { get; set; } - - public IMqttClientQueuedStorage Storage { get; set; } - } -} diff --git a/MQTTnet.Core/Client/MqttClientQueuedPersistentMessagesManager.cs b/MQTTnet.Core/Client/MqttClientQueuedPersistentMessagesManager.cs index 235b2bf..b012e21 100644 --- a/MQTTnet.Core/Client/MqttClientQueuedPersistentMessagesManager.cs +++ b/MQTTnet.Core/Client/MqttClientQueuedPersistentMessagesManager.cs @@ -10,9 +10,9 @@ namespace MQTTnet.Core.Client public class MqttClientQueuedPersistentMessagesManager { private readonly IList _persistedMessages = new List(); - private readonly MqttClientQueuedOptions _options; + private readonly MqttClientManagedOptions _options; - public MqttClientQueuedPersistentMessagesManager(MqttClientQueuedOptions options) + public MqttClientQueuedPersistentMessagesManager(MqttClientManagedOptions options) { _options = options ?? throw new ArgumentNullException(nameof(options)); } @@ -21,7 +21,7 @@ namespace MQTTnet.Core.Client { try { - var persistentMessages = await _options.Storage.LoadInflightMessagesAsync(); + var persistentMessages = await _options.Storage.LoadQueuedMessagesAsync(); lock (_persistedMessages) { _persistedMessages.Clear(); @@ -50,7 +50,7 @@ namespace MQTTnet.Core.Client { if (_options.Storage != null) { - await _options.Storage.SaveInflightMessagesAsync(_persistedMessages); + await _options.Storage.SaveQueuedMessagesAsync(_persistedMessages); } } catch (Exception exception) diff --git a/MQTTnet.Core/Client/MqttClientTcpOptions.cs b/MQTTnet.Core/Client/MqttClientTcpOptions.cs index beaa506..98d27dc 100644 --- a/MQTTnet.Core/Client/MqttClientTcpOptions.cs +++ b/MQTTnet.Core/Client/MqttClientTcpOptions.cs @@ -1,7 +1,29 @@ -namespace MQTTnet.Core.Client +using MQTTnet.Core.Serializer; +using System; + +namespace MQTTnet.Core.Client { - public class MqttClientTcpOptions : MqttClientOptions + public class MqttClientTcpOptions : IMqttClientOptions { + public MqttClientTlsOptions TlsOptions { get; set; } = new MqttClientTlsOptions(); + + public MqttApplicationMessage WillMessage { get; set; } + + public string UserName { get; set; } + + public string Password { get; set; } + + public string ClientId { get; set; } = Guid.NewGuid().ToString().Replace("-", string.Empty); + + public bool CleanSession { get; set; } = true; + + public TimeSpan KeepAlivePeriod { get; set; } = TimeSpan.FromSeconds(5); + + public TimeSpan DefaultCommunicationTimeout { get; set; } = TimeSpan.FromSeconds(10); + + public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311; + + public string Server { get; set; } public int? Port { get; set; } diff --git a/MQTTnet.Core/Client/MqttClientWebSocketOptions.cs b/MQTTnet.Core/Client/MqttClientWebSocketOptions.cs index 4b90524..b7a904d 100644 --- a/MQTTnet.Core/Client/MqttClientWebSocketOptions.cs +++ b/MQTTnet.Core/Client/MqttClientWebSocketOptions.cs @@ -1,7 +1,29 @@ -namespace MQTTnet.Core.Client +using System; +using MQTTnet.Core.Serializer; + +namespace MQTTnet.Core.Client { - public class MqttClientWebSocketOptions : MqttClientOptions + public class MqttClientWebSocketOptions : IMqttClientOptions { public string Uri { get; set; } + + public MqttClientTlsOptions TlsOptions { get; set; } = new MqttClientTlsOptions(); + + public MqttApplicationMessage WillMessage { get; set; } + + public string UserName { get; set; } + + public string Password { get; set; } + + public string ClientId { get; set; } = Guid.NewGuid().ToString().Replace("-", string.Empty); + + public bool CleanSession { get; set; } = true; + + public TimeSpan KeepAlivePeriod { get; set; } = TimeSpan.FromSeconds(5); + + public TimeSpan DefaultCommunicationTimeout { get; set; } = TimeSpan.FromSeconds(10); + + public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311; + } } diff --git a/Tests/MQTTnet.Core.Tests/MqttCommunicationAdapterFactory.cs b/Tests/MQTTnet.Core.Tests/MqttCommunicationAdapterFactory.cs index c83f83a..bed3218 100644 --- a/Tests/MQTTnet.Core.Tests/MqttCommunicationAdapterFactory.cs +++ b/Tests/MQTTnet.Core.Tests/MqttCommunicationAdapterFactory.cs @@ -12,7 +12,7 @@ namespace MQTTnet.Core.Tests _adapter = adapter; } - public IMqttCommunicationAdapter CreateMqttCommunicationAdapter(MqttClientOptions options) + public IMqttCommunicationAdapter CreateMqttCommunicationAdapter(IMqttClientOptions options) { return _adapter; } diff --git a/Tests/MQTTnet.TestApp.NetCore/Program.cs b/Tests/MQTTnet.TestApp.NetCore/Program.cs index 63e4380..eebe4b2 100644 --- a/Tests/MQTTnet.TestApp.NetCore/Program.cs +++ b/Tests/MQTTnet.TestApp.NetCore/Program.cs @@ -57,7 +57,7 @@ namespace MQTTnet.TestApp.NetCore try { - var options = new MqttClientQueuedOptions + var options = new MqttClientManagedOptions { Server = "192.168.0.14", ClientId = "XYZ", @@ -66,11 +66,10 @@ namespace MQTTnet.TestApp.NetCore Password = "passworda", KeepAlivePeriod = TimeSpan.FromSeconds(31), DefaultCommunicationTimeout = TimeSpan.FromSeconds(20), - UsePersistence = true, Storage = new TestStorage(), }; - var client = new MqttClientFactory().CreateMqttQueuedClient(); + var client = new MqttClientFactory().CreateMqttManagedClient(); client.ApplicationMessageReceived += (s, e) => { Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###"); @@ -177,7 +176,7 @@ namespace MQTTnet.TestApp.NetCore await client.SubscribeAsync(new List { - new TopicFilter("#", MqttQualityOfServiceLevel.AtMostOnce) + new TopicFilter("#", MqttQualityOfServiceLevel.ExactlyOnce) }); Console.WriteLine("### SUBSCRIBED ###"); @@ -299,7 +298,7 @@ namespace MQTTnet.TestApp.NetCore string serializationFile = System.IO.Path.Combine(Environment.CurrentDirectory, "messages.bin"); private IList _messages = new List(); - public Task> LoadInflightMessagesAsync() + public Task> LoadQueuedMessagesAsync() { //deserialize // MqttApplicationMessage is not serializable @@ -319,7 +318,7 @@ namespace MQTTnet.TestApp.NetCore return Task.FromResult(_messages); } - public Task SaveInflightMessagesAsync(IList messages) + public Task SaveQueuedMessagesAsync(IList messages) { _messages = messages; //serialize diff --git a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs index 07d5955..4565626 100644 --- a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs +++ b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs @@ -41,7 +41,7 @@ namespace MQTTnet.TestApp.UniversalWindows private async void Connect(object sender, RoutedEventArgs e) { - MqttClientOptions options = null; + IMqttClientOptions options = null; if (UseTcp.IsChecked == true) { options = new MqttClientTcpOptions