diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttCommunicationAdapterFactory.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttCommunicationAdapterFactory.cs index 799ccdd..218bb8c 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttCommunicationAdapterFactory.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttCommunicationAdapterFactory.cs @@ -21,6 +21,11 @@ namespace MQTTnet.Implementations return new MqttChannelCommunicationAdapter(new MqttWebSocketChannel(webSocketOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion }); } + if (options is MqttClientQueuedOptions queuedOptions) + { + return new MqttChannelCommunicationAdapter(new MqttTcpChannel(queuedOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion }); + } + throw new NotSupportedException(); } } diff --git a/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs b/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs index 9358390..5bc8336 100644 --- a/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs +++ b/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs @@ -10,7 +10,7 @@ namespace MQTTnet return new MqttClient(new MqttCommunicationAdapterFactory()); } - public IMqttClient CreateMqttQueuedClient() + public IMqttClientQueued CreateMqttQueuedClient() { return new MqttClientQueued(new MqttCommunicationAdapterFactory()); } diff --git a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttCommunicationAdapterFactory.cs b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttCommunicationAdapterFactory.cs index 799ccdd..2bb7c5e 100644 --- a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttCommunicationAdapterFactory.cs +++ b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttCommunicationAdapterFactory.cs @@ -7,7 +7,7 @@ namespace MQTTnet.Implementations { public class MqttCommunicationAdapterFactory : IMqttCommunicationAdapterFactory { - public IMqttCommunicationAdapter CreateMqttCommunicationAdapter(MqttClientOptions options) + public IMqttCommunicationAdapter CreateMqttCommunicationAdapter(MqttClientQueuedOptions options) { if (options == null) throw new ArgumentNullException(nameof(options)); diff --git a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs index 03206f1..db10291 100644 --- a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs +++ b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs @@ -89,7 +89,7 @@ namespace MQTTnet.Implementations RawReceiveStream = ReceiveStream; } - private static Certificate LoadCertificate(MqttClientOptions options) + private static Certificate LoadCertificate(MqttClientQueuedOptions options) { if (options.TlsOptions.Certificates == null || !options.TlsOptions.Certificates.Any()) { diff --git a/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs b/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs index 15da033..5bc8336 100644 --- a/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs +++ b/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs @@ -9,5 +9,10 @@ namespace MQTTnet { return new MqttClient(new MqttCommunicationAdapterFactory()); } + + public IMqttClientQueued CreateMqttQueuedClient() + { + return new MqttClientQueued(new MqttCommunicationAdapterFactory()); + } } } \ No newline at end of file diff --git a/MQTTnet.Core/Client/IMqttClientFactory.cs b/MQTTnet.Core/Client/IMqttClientFactory.cs index 033eb99..f1d6411 100644 --- a/MQTTnet.Core/Client/IMqttClientFactory.cs +++ b/MQTTnet.Core/Client/IMqttClientFactory.cs @@ -3,5 +3,7 @@ public interface IMqttClientFactory { IMqttClient CreateMqttClient(); + + IMqttClientQueued CreateMqttQueuedClient(); } } \ No newline at end of file diff --git a/MQTTnet.Core/Client/IMqttClientQueued.cs b/MQTTnet.Core/Client/IMqttClientQueued.cs new file mode 100644 index 0000000..c172e0c --- /dev/null +++ b/MQTTnet.Core/Client/IMqttClientQueued.cs @@ -0,0 +1,9 @@ +using System.Threading.Tasks; + +namespace MQTTnet.Core.Client +{ + public interface IMqttClientQueued: IMqttClient + { + Task ConnectAsync(MqttClientQueuedOptions options); + } +} diff --git a/MQTTnet.Core/Client/IMqttClientQueuedStorage.cs b/MQTTnet.Core/Client/IMqttClientQueuedStorage.cs new file mode 100644 index 0000000..ffc5e5f --- /dev/null +++ b/MQTTnet.Core/Client/IMqttClientQueuedStorage.cs @@ -0,0 +1,12 @@ +using System.Collections.Generic; +using System.Threading.Tasks; + +namespace MQTTnet.Core.Client +{ + public interface IMqttClientQueuedStorage + { + Task SaveInflightMessagesAsync(IList messages); + + Task> LoadInflightMessagesAsync(); + } +} diff --git a/MQTTnet.Core/Client/MqttClient.cs b/MQTTnet.Core/Client/MqttClient.cs index 80b850a..002ebe8 100644 --- a/MQTTnet.Core/Client/MqttClient.cs +++ b/MQTTnet.Core/Client/MqttClient.cs @@ -21,8 +21,8 @@ namespace MQTTnet.Core.Client private MqttClientOptions _options; private bool _isReceivingPackets; private int _latestPacketIdentifier; - private CancellationTokenSource _cancellationTokenSource; - private IMqttCommunicationAdapter _adapter; + internal CancellationTokenSource _cancellationTokenSource; + internal IMqttCommunicationAdapter _adapter; public MqttClient(IMqttCommunicationAdapterFactory communicationChannelFactory) { @@ -344,7 +344,7 @@ namespace MQTTnet.Core.Client return _adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, packet); } - private async Task SendAndReceiveAsync(MqttBasePacket requestPacket) where TResponsePacket : MqttBasePacket + internal async Task SendAndReceiveAsync(MqttBasePacket requestPacket) where TResponsePacket : MqttBasePacket { var packetAwaiter = _packetDispatcher.WaitForPacketAsync(requestPacket, typeof(TResponsePacket), _options.DefaultCommunicationTimeout); await _adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, requestPacket).ConfigureAwait(false); diff --git a/MQTTnet.Core/Client/MqttClientQueued.cs b/MQTTnet.Core/Client/MqttClientQueued.cs index 925b965..05d5766 100644 --- a/MQTTnet.Core/Client/MqttClientQueued.cs +++ b/MQTTnet.Core/Client/MqttClientQueued.cs @@ -13,454 +13,88 @@ using MQTTnet.Core.Internal; namespace MQTTnet.Core.Client { - public class MqttClientQueued : IMqttClient + public class MqttClientQueued : MqttClient, IMqttClientQueued { - #region Fields - private readonly IMqttCommunicationAdapterFactory _communicationChannelFactory; - private readonly MqttPacketDispatcher _packetDispatcher; - private readonly HashSet _unacknowledgedPublishPackets; - - private MqttClientOptions _options; - private bool _isReceivingPackets; + private MqttClientQueuedOptions _options; private int _latestPacketIdentifier; - private CancellationTokenSource _cancellationTokenSource; - private IMqttCommunicationAdapter _adapter; - - //added private readonly ConcurrentQueue _inflightQueue; private bool _usePersistance = false; - private ConcurrentQueue _persistentQueue; - #endregion + private MqttClientQueuedPersistentMessagesManager _persistentMessagesManager; - #region Ctrs - public MqttClientQueued(IMqttCommunicationAdapterFactory communicationChannelFactory) + public MqttClientQueued(IMqttCommunicationAdapterFactory communicationChannelFactory) : base(communicationChannelFactory) { - _communicationChannelFactory = communicationChannelFactory ?? throw new ArgumentNullException(nameof(communicationChannelFactory)); - _packetDispatcher = new MqttPacketDispatcher(); - _unacknowledgedPublishPackets = new HashSet(); _inflightQueue = new ConcurrentQueue(); - _persistentQueue = new ConcurrentQueue(); } - #endregion - - #region Events - public event EventHandler Connected; - public event EventHandler Disconnected; - public event EventHandler ApplicationMessageReceived; - #endregion - - #region Poperties - public bool IsConnected => _cancellationTokenSource != null && !_cancellationTokenSource.IsCancellationRequested; - #endregion - #region MqttClient Methods - public async Task ConnectAsync(MqttClientOptions options) + public async Task ConnectAsync(MqttClientQueuedOptions options) { - if (options == null) throw new ArgumentNullException(nameof(options)); - - ThrowIfConnected("It is not allowed to connect with a server after the connection is established."); - try - { + { _options = options; - _cancellationTokenSource = new CancellationTokenSource(); - _latestPacketIdentifier = 0; - _packetDispatcher.Reset(); - - _adapter = _communicationChannelFactory.CreateMqttCommunicationAdapter(options); - - MqttNetTrace.Verbose(nameof(MqttClient), "Trying to connect with server."); - await _adapter.ConnectAsync(_options.DefaultCommunicationTimeout).ConfigureAwait(false); - MqttNetTrace.Verbose(nameof(MqttClient), "Connection with server established."); - - await SetupIncomingPacketProcessingAsync(); + this._usePersistance = _options.UsePersistence; + await base.ConnectAsync(options); SetupOutgoingPacketProcessingAsync(); - await AuthenticateAsync(options.WillMessage); - MqttNetTrace.Verbose(nameof(MqttClient), "MQTT connection with server established."); - - if (_options.KeepAlivePeriod != TimeSpan.Zero) + //load persistentMessages + if (_usePersistance) { - StartSendKeepAliveMessages(_cancellationTokenSource.Token); + if (_persistentMessagesManager == null) + _persistentMessagesManager = new MqttClientQueuedPersistentMessagesManager(_options); + await _persistentMessagesManager.LoadMessagesAsync(); + await InternalPublishAsync(_persistentMessagesManager.GetMessages(), false); } - - Connected?.Invoke(this, EventArgs.Empty); } catch (Exception) { - await DisconnectInternalAsync().ConfigureAwait(false); + await DisconnectAsync().ConfigureAwait(false); throw; } } - public async Task DisconnectAsync() + public new async Task PublishAsync(IEnumerable applicationMessages) { - if (!IsConnected) - { - return; - } - - try - { - await SendAsync(new MqttDisconnectPacket()).ConfigureAwait(false); - } - finally - { - await DisconnectInternalAsync().ConfigureAwait(false); - } + await InternalPublishAsync(applicationMessages, true); } - public async Task PublishAsync(IEnumerable applicationMessages) + private async Task InternalPublishAsync(IEnumerable applicationMessages, bool appendIfUsePersistance) { -#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed ThrowIfNotConnected(); foreach (var applicationMessage in applicationMessages) { - if (_usePersistance) - _persistentQueue.Enqueue(applicationMessage); - _inflightQueue.Enqueue(applicationMessage); - } -#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - } - - public async Task> SubscribeAsync(IEnumerable topicFilters) - { - if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); - - ThrowIfNotConnected(); - - var subscribePacket = new MqttSubscribePacket - { - PacketIdentifier = GetNewPacketIdentifier(), - TopicFilters = topicFilters.ToList() - }; - - var response = await SendAndReceiveAsync(subscribePacket).ConfigureAwait(false); + if (_usePersistance && appendIfUsePersistance) + await _persistentMessagesManager.SaveMessageAsync(applicationMessage); - if (response.SubscribeReturnCodes.Count != subscribePacket.TopicFilters.Count) - { - throw new MqttProtocolViolationException("The return codes are not matching the topic filters [MQTT-3.9.3-1]."); + _inflightQueue.Enqueue(applicationMessage); } - - return subscribePacket.TopicFilters.Select((t, i) => new MqttSubscribeResult(t, response.SubscribeReturnCodes[i])).ToList(); } - public async Task UnsubscribeAsync(IEnumerable topicFilters) + public new async Task> SubscribeAsync(IEnumerable topicFilters) { - if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); - - ThrowIfNotConnected(); - - var unsubscribePacket = new MqttUnsubscribePacket - { - PacketIdentifier = GetNewPacketIdentifier(), - TopicFilters = topicFilters.ToList() - }; - - await SendAndReceiveAsync(unsubscribePacket); + return await base.SubscribeAsync(topicFilters); } - #region private private void ThrowIfNotConnected() { if (!IsConnected) throw new MqttCommunicationException("The client is not connected."); } - private void ThrowIfConnected(string message) - { - if (IsConnected) throw new MqttProtocolViolationException(message); - } - - private async Task SetupIncomingPacketProcessingAsync() - { - _isReceivingPackets = false; - -#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - Task.Factory.StartNew( - () => ReceivePackets(_cancellationTokenSource.Token), - _cancellationTokenSource.Token, - TaskCreationOptions.LongRunning, - TaskScheduler.Default).ConfigureAwait(false); -#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - - while (!_isReceivingPackets && _cancellationTokenSource != null && !_cancellationTokenSource.IsCancellationRequested) - { - await Task.Delay(TimeSpan.FromMilliseconds(100)); - } - } - - private async Task AuthenticateAsync(MqttApplicationMessage willApplicationMessage) - { - var connectPacket = new MqttConnectPacket - { - ClientId = _options.ClientId, - Username = _options.UserName, - Password = _options.Password, - CleanSession = _options.CleanSession, - KeepAlivePeriod = (ushort)_options.KeepAlivePeriod.TotalSeconds, - WillMessage = willApplicationMessage - }; - - var response = await SendAndReceiveAsync(connectPacket).ConfigureAwait(false); - if (response.ConnectReturnCode != MqttConnectReturnCode.ConnectionAccepted) - { - throw new MqttConnectingFailedException(response.ConnectReturnCode); - } - } - - private async Task ReceivePackets(CancellationToken cancellationToken) - { - MqttNetTrace.Information(nameof(MqttClient), "Start receiving packets."); - - try - { - while (!cancellationToken.IsCancellationRequested) - { - _isReceivingPackets = true; - - var packet = await _adapter.ReceivePacketAsync(TimeSpan.Zero, cancellationToken).ConfigureAwait(false); - if (cancellationToken.IsCancellationRequested) - { - return; - } - - StartProcessReceivedPacket(packet, cancellationToken); - } - } - catch (OperationCanceledException) - { - } - catch (MqttCommunicationException exception) - { - if (cancellationToken.IsCancellationRequested) - { - return; - } - - MqttNetTrace.Warning(nameof(MqttClient), exception, "MQTT communication exception while receiving packets."); - await DisconnectInternalAsync().ConfigureAwait(false); - } - catch (Exception exception) - { - MqttNetTrace.Error(nameof(MqttClient), exception, "Unhandled exception while receiving packets."); - await DisconnectInternalAsync().ConfigureAwait(false); - } - finally - { - MqttNetTrace.Information(nameof(MqttClient), "Stopped receiving packets."); - } - } - - private void StartProcessReceivedPacket(MqttBasePacket packet, CancellationToken cancellationToken) - { -#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - Task.Run(() => ProcessReceivedPacketAsync(packet), cancellationToken).ConfigureAwait(false); -#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - } - - private async Task ProcessReceivedPacketAsync(MqttBasePacket packet) - { - try - { - MqttNetTrace.Information(nameof(MqttClient), "Received <<< {0}", packet); - - if (packet is MqttPingReqPacket) - { - await SendAsync(new MqttPingRespPacket()); - return; - } - - if (packet is MqttDisconnectPacket) - { - await DisconnectAsync(); - return; - } - - if (packet is MqttPublishPacket publishPacket) - { - await ProcessReceivedPublishPacket(publishPacket); - return; - } - - if (packet is MqttPubRelPacket pubRelPacket) - { - await ProcessReceivedPubRelPacket(pubRelPacket); - return; - } - - _packetDispatcher.Dispatch(packet); - } - catch (Exception exception) - { - MqttNetTrace.Error(nameof(MqttClient), exception, "Unhandled exception while processing received packet."); - } - } - - private async Task ProcessReceivedPublishPacket(MqttPublishPacket publishPacket) - { - if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce) - { - FireApplicationMessageReceivedEvent(publishPacket); - return; - } - - if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce) - { - FireApplicationMessageReceivedEvent(publishPacket); - await SendAsync(new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier }); - return; - } - - if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce) - { - // QoS 2 is implement as method "B" [4.3.3 QoS 2: Exactly once delivery] - lock (_unacknowledgedPublishPackets) - { - _unacknowledgedPublishPackets.Add(publishPacket.PacketIdentifier); - } - - FireApplicationMessageReceivedEvent(publishPacket); - await SendAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }); - return; - } - - throw new MqttCommunicationException("Received a not supported QoS level."); - } - - private Task ProcessReceivedPubRelPacket(MqttPubRelPacket pubRelPacket) - { - lock (_unacknowledgedPublishPackets) - { - _unacknowledgedPublishPackets.Remove(pubRelPacket.PacketIdentifier); - } - - return SendAsync(pubRelPacket.CreateResponse()); - } - - private async Task DisconnectInternalAsync() - { - var cts = _cancellationTokenSource; - if (cts == null || cts.IsCancellationRequested) - { - return; - } - - cts.Cancel(false); - cts.Dispose(); - _cancellationTokenSource = null; - - try - { - await _adapter.DisconnectAsync(_options.DefaultCommunicationTimeout).ConfigureAwait(false); - MqttNetTrace.Information(nameof(MqttClient), "Disconnected from adapter."); - } - catch (Exception exception) - { - MqttNetTrace.Warning(nameof(MqttClient), exception, "Error while disconnecting from adapter."); - } - finally - { - Disconnected?.Invoke(this, EventArgs.Empty); - } - } - - private void StartSendKeepAliveMessages(CancellationToken cancellationToken) - { -#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - Task.Factory.StartNew(() => SendKeepAliveMessagesAsync(cancellationToken), cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default).ConfigureAwait(false); -#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - } - - private async Task SendKeepAliveMessagesAsync(CancellationToken cancellationToken) - { - MqttNetTrace.Information(nameof(MqttClient), "Start sending keep alive packets."); - - try - { - while (!cancellationToken.IsCancellationRequested) - { - await Task.Delay(_options.KeepAlivePeriod, cancellationToken).ConfigureAwait(false); - if (cancellationToken.IsCancellationRequested) - { - return; - } - - await SendAndReceiveAsync(new MqttPingReqPacket()).ConfigureAwait(false); - } - } - catch (OperationCanceledException) - { - } - catch (MqttCommunicationException exception) - { - if (cancellationToken.IsCancellationRequested) - { - return; - } - - MqttNetTrace.Warning(nameof(MqttClient), exception, "MQTT communication exception while sending/receiving keep alive packets."); - await DisconnectInternalAsync().ConfigureAwait(false); - } - catch (Exception exception) - { - MqttNetTrace.Warning(nameof(MqttClient), exception, "Unhandled exception while sending/receiving keep alive packets."); - await DisconnectInternalAsync().ConfigureAwait(false); - } - finally - { - MqttNetTrace.Information(nameof(MqttClient), "Stopped sending keep alive packets."); - } - } - - private async Task SendAndReceiveAsync(MqttBasePacket requestPacket) where TResponsePacket : MqttBasePacket - { - var packetAwaiter = _packetDispatcher.WaitForPacketAsync(requestPacket, typeof(TResponsePacket), _options.DefaultCommunicationTimeout); - await _adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, requestPacket).ConfigureAwait(false); - return (TResponsePacket)await packetAwaiter.ConfigureAwait(false); - } - - private Task SendAsync(MqttBasePacket packet) - { - return _adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, packet); - } - private ushort GetNewPacketIdentifier() { return (ushort)Interlocked.Increment(ref _latestPacketIdentifier); } + - private void FireApplicationMessageReceivedEvent(MqttPublishPacket publishPacket) - { - try - { - var applicationMessage = publishPacket.ToApplicationMessage(); - ApplicationMessageReceived?.Invoke(this, new MqttApplicationMessageReceivedEventArgs(applicationMessage)); - } - catch (Exception exception) - { - MqttNetTrace.Error(nameof(MqttClient), exception, "Unhandled exception while handling application message."); - } - } - #endregion - #endregion - - #region MqttClientQueued Methos private void SetupOutgoingPacketProcessingAsync() { #pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed Task.Factory.StartNew( - () => SendPackets(_cancellationTokenSource.Token), - _cancellationTokenSource.Token, + () => SendPackets(base._cancellationTokenSource.Token), + base._cancellationTokenSource.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default).ConfigureAwait(false); #pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - - //while (_cancellationTokenSource != null && !_cancellationTokenSource.IsCancellationRequested) - //{ - // await Task.Delay(TimeSpan.FromMilliseconds(100)); - //} } private async Task SendPackets(CancellationToken cancellationToken) @@ -480,21 +114,21 @@ namespace MQTTnet.Core.Client case MqttQualityOfServiceLevel.AtMostOnce: { // No packet identifier is used for QoS 0 [3.3.2.2 Packet Identifier] - await _adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, publishPacket); + await base._adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, base._cancellationTokenSource.Token, publishPacket); break; } case MqttQualityOfServiceLevel.AtLeastOnce: { publishPacket.PacketIdentifier = GetNewPacketIdentifier(); - await SendAndReceiveAsync(publishPacket); + await base.SendAndReceiveAsync(publishPacket); break; } case MqttQualityOfServiceLevel.ExactlyOnce: { publishPacket.PacketIdentifier = GetNewPacketIdentifier(); - var pubRecPacket = await SendAndReceiveAsync(publishPacket).ConfigureAwait(false); - await SendAndReceiveAsync(pubRecPacket.CreateResponse()).ConfigureAwait(false); + var pubRecPacket = await base.SendAndReceiveAsync(publishPacket).ConfigureAwait(false); + await base.SendAndReceiveAsync(pubRecPacket.CreateResponse()).ConfigureAwait(false); break; } default: @@ -502,6 +136,9 @@ namespace MQTTnet.Core.Client throw new InvalidOperationException(); } } + //delete from persistence + if (_usePersistance) + await _persistentMessagesManager.Remove(messageToSend); } }; } @@ -518,13 +155,12 @@ namespace MQTTnet.Core.Client catch (Exception exception) { MqttNetTrace.Error(nameof(MqttClient), exception, "Unhandled exception while sending packets."); - await DisconnectInternalAsync().ConfigureAwait(false); + await DisconnectAsync().ConfigureAwait(false); } finally { MqttNetTrace.Information(nameof(MqttClient), "Stopped sending packets."); } } - #endregion } } diff --git a/MQTTnet.Core/Client/MqttClientQueuedOptions.cs b/MQTTnet.Core/Client/MqttClientQueuedOptions.cs new file mode 100644 index 0000000..e7af0d7 --- /dev/null +++ b/MQTTnet.Core/Client/MqttClientQueuedOptions.cs @@ -0,0 +1,11 @@ + + +namespace MQTTnet.Core.Client +{ + public class MqttClientQueuedOptions: MqttClientTcpOptions + { + public bool UsePersistence { get; set; } + + public IMqttClientQueuedStorage Storage { get; set; } + } +} diff --git a/MQTTnet.Core/Client/MqttClientQueuedPersistentMessagesManager.cs b/MQTTnet.Core/Client/MqttClientQueuedPersistentMessagesManager.cs new file mode 100644 index 0000000..235b2bf --- /dev/null +++ b/MQTTnet.Core/Client/MqttClientQueuedPersistentMessagesManager.cs @@ -0,0 +1,84 @@ +using MQTTnet.Core.Diagnostics; +using MQTTnet.Core.Packets; +using System; +using System.Linq; +using System.Collections.Generic; +using System.Threading.Tasks; + +namespace MQTTnet.Core.Client +{ + public class MqttClientQueuedPersistentMessagesManager + { + private readonly IList _persistedMessages = new List(); + private readonly MqttClientQueuedOptions _options; + + public MqttClientQueuedPersistentMessagesManager(MqttClientQueuedOptions options) + { + _options = options ?? throw new ArgumentNullException(nameof(options)); + } + + public async Task LoadMessagesAsync() + { + try + { + var persistentMessages = await _options.Storage.LoadInflightMessagesAsync(); + lock (_persistedMessages) + { + _persistedMessages.Clear(); + foreach (var persistentMessage in persistentMessages) + { + _persistedMessages.Add(persistentMessage); + } + } + } + catch (Exception exception) + { + MqttNetTrace.Error(nameof(MqttClientQueuedPersistentMessagesManager), exception, "Unhandled exception while loading persistent messages."); + } + } + + public async Task SaveMessageAsync(MqttApplicationMessage applicationMessage) + { + if (applicationMessage != null) + { + lock (_persistedMessages) + { + _persistedMessages.Add(applicationMessage); + } + } + try + { + if (_options.Storage != null) + { + await _options.Storage.SaveInflightMessagesAsync(_persistedMessages); + } + } + catch (Exception exception) + { + MqttNetTrace.Error(nameof(MqttClientQueuedPersistentMessagesManager), exception, "Unhandled exception while saving persistent messages."); + } + } + + public List GetMessages() + { + var persistedMessages = new List(); + lock (_persistedMessages) + { + foreach (var persistedMessage in _persistedMessages) + { + persistedMessages.Add(persistedMessage); + } + } + + return persistedMessages; + } + + public async Task Remove(MqttApplicationMessage message) + { + lock (_persistedMessages) + _persistedMessages.Remove(message); + + await SaveMessageAsync(null); + } + } +} diff --git a/MQTTnet.Core/MQTTnet.Core.csproj b/MQTTnet.Core/MQTTnet.Core.csproj index 1e507cb..5080593 100644 --- a/MQTTnet.Core/MQTTnet.Core.csproj +++ b/MQTTnet.Core/MQTTnet.Core.csproj @@ -22,20 +22,6 @@ - - - - - - - - - - - - - - \ No newline at end of file diff --git a/Tests/MQTTnet.Core.Tests/MqttCommunicationAdapterFactory.cs b/Tests/MQTTnet.Core.Tests/MqttCommunicationAdapterFactory.cs index c83f83a..1b451ee 100644 --- a/Tests/MQTTnet.Core.Tests/MqttCommunicationAdapterFactory.cs +++ b/Tests/MQTTnet.Core.Tests/MqttCommunicationAdapterFactory.cs @@ -12,7 +12,7 @@ namespace MQTTnet.Core.Tests _adapter = adapter; } - public IMqttCommunicationAdapter CreateMqttCommunicationAdapter(MqttClientOptions options) + public IMqttCommunicationAdapter CreateMqttCommunicationAdapter(MqttClientQueuedOptions options) { return _adapter; } diff --git a/Tests/MQTTnet.TestApp.NetCore/Program.cs b/Tests/MQTTnet.TestApp.NetCore/Program.cs index f2a44e0..be69c9a 100644 --- a/Tests/MQTTnet.TestApp.NetCore/Program.cs +++ b/Tests/MQTTnet.TestApp.NetCore/Program.cs @@ -6,6 +6,7 @@ using MQTTnet.Core.Protocol; using MQTTnet.Core.Server; using System; using System.Collections.Generic; +using System.IO; using System.Text; using System.Threading; using System.Threading.Tasks; @@ -57,7 +58,7 @@ namespace MQTTnet.TestApp.NetCore try { - var options = new MqttClientTcpOptions + var options = new MqttClientQueuedOptions { Server = "192.168.0.14", ClientId = "XYZ", @@ -66,7 +67,8 @@ namespace MQTTnet.TestApp.NetCore Password = "passworda", KeepAlivePeriod = TimeSpan.FromSeconds(31), DefaultCommunicationTimeout = TimeSpan.FromSeconds(20), - + UsePersistence = true, + Storage = new TestStorage(), }; var client = new MqttClientFactory().CreateMqttQueuedClient(); @@ -272,5 +274,70 @@ namespace MQTTnet.TestApp.NetCore Console.ReadLine(); } + + [Serializable] + public sealed class TemporaryApplicationMessage + { + public TemporaryApplicationMessage(string topic, byte[] payload, MqttQualityOfServiceLevel qualityOfServiceLevel, bool retain) + { + Topic = topic ?? throw new ArgumentNullException(nameof(topic)); + Payload = payload ?? throw new ArgumentNullException(nameof(payload)); + QualityOfServiceLevel = qualityOfServiceLevel; + Retain = retain; + } + + public string Topic { get; } + + public byte[] Payload { get; } + + public MqttQualityOfServiceLevel QualityOfServiceLevel { get; } + + public bool Retain { get; } + } + + private class TestStorage : IMqttClientQueuedStorage + { + string serializationFile = Path.Combine(Environment.CurrentDirectory, "messages.bin"); + private IList _messages = new List(); + + public Task> LoadInflightMessagesAsync() + { + //deserialize + // MqttApplicationMessage is not serializable + if (File.Exists(serializationFile)) + { + using (Stream stream = File.Open(serializationFile, FileMode.Open)) + { + var bformatter = new System.Runtime.Serialization.Formatters.Binary.BinaryFormatter(); + + var temp = (List)bformatter.Deserialize(stream); + foreach (var a in temp) + { + _messages.Add(new MqttApplicationMessage(a.Topic, a.Payload, a.QualityOfServiceLevel, a.Retain)); + } + } + } + return Task.FromResult(_messages); + } + + public Task SaveInflightMessagesAsync(IList messages) + { + _messages = messages; + //serialize + // MqttApplicationMessage is not serializable + using (System.IO.Stream stream = File.Open(serializationFile, FileMode.Create)) + { + var bformatter = new System.Runtime.Serialization.Formatters.Binary.BinaryFormatter(); + IList temp = new List(); + foreach (var m in _messages) + { + temp.Add(new TemporaryApplicationMessage(m.Topic, m.Payload, m.QualityOfServiceLevel, m.Retain)); + } + bformatter.Serialize(stream, temp); + } + + return Task.FromResult(0); + } + } } } diff --git a/Tests/MQTTnet.TestApp.NetCore/messages.bin b/Tests/MQTTnet.TestApp.NetCore/messages.bin new file mode 100644 index 0000000..59505fa Binary files /dev/null and b/Tests/MQTTnet.TestApp.NetCore/messages.bin differ diff --git a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs index 07d5955..8a22f5e 100644 --- a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs +++ b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs @@ -41,7 +41,7 @@ namespace MQTTnet.TestApp.UniversalWindows private async void Connect(object sender, RoutedEventArgs e) { - MqttClientOptions options = null; + MqttClientQueuedOptions options = null; if (UseTcp.IsChecked == true) { options = new MqttClientTcpOptions