From 137d1d181bd45e8e676b78c3004b246e4a1e98f8 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Tue, 29 May 2018 21:21:30 +0200 Subject: [PATCH] Refactor ManagedMqttClientStorage --- .../ManagedClient/ManagedMqttClient.cs | 7 ++-- .../ManagedMqttClientStorageManager.cs | 36 ++++++++----------- 2 files changed, 18 insertions(+), 25 deletions(-) diff --git a/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs b/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs index 8b63694..7cad356 100644 --- a/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs +++ b/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs @@ -68,10 +68,11 @@ namespace MQTTnet.ManagedClient if (_options.Storage != null) { _storageManager = new ManagedMqttClientStorageManager(_options.Storage); - await _storageManager.LoadQueuedMessagesAsync().ConfigureAwait(false); - foreach (var loadedMessage in _storageManager.ApplicationMessages) + var messages = await _storageManager.LoadQueuedMessagesAsync().ConfigureAwait(false); + + foreach (var message in messages) { - _messageQueue.Add(loadedMessage); + _messageQueue.Add(message); } } diff --git a/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClientStorageManager.cs b/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClientStorageManager.cs index 156fba0..49eaae3 100644 --- a/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClientStorageManager.cs +++ b/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClientStorageManager.cs @@ -2,65 +2,57 @@ using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; +using MQTTnet.Internal; namespace MQTTnet.ManagedClient { public class ManagedMqttClientStorageManager { - private readonly List _applicationMessages = new List(); - private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1); - private readonly IManagedMqttClientStorage _storage; + private readonly List _messages = new List(); + private readonly AsyncLock _messagesLock = new AsyncLock(); - public List ApplicationMessages => _applicationMessages; + private readonly IManagedMqttClientStorage _storage; public ManagedMqttClientStorageManager(IManagedMqttClientStorage storage) { _storage = storage ?? throw new ArgumentNullException(nameof(storage)); } - public async Task LoadQueuedMessagesAsync() + public async Task> LoadQueuedMessagesAsync() { var loadedMessages = await _storage.LoadQueuedMessagesAsync().ConfigureAwait(false); - _applicationMessages.AddRange(loadedMessages); + _messages.AddRange(loadedMessages); + + return _messages; } public async Task AddAsync(MqttApplicationMessage applicationMessage) { - await _semaphore.WaitAsync().ConfigureAwait(false); - try + using (await _messagesLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) { - _applicationMessages.Add(applicationMessage); + _messages.Add(applicationMessage); await SaveAsync().ConfigureAwait(false); } - finally - { - _semaphore.Release(); - } } public async Task RemoveAsync(MqttApplicationMessage applicationMessage) { - await _semaphore.WaitAsync().ConfigureAwait(false); - try + using (await _messagesLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) { - var index = _applicationMessages.IndexOf(applicationMessage); + var index = _messages.IndexOf(applicationMessage); if (index == -1) { return; } - _applicationMessages.RemoveAt(index); + _messages.RemoveAt(index); await SaveAsync().ConfigureAwait(false); } - finally - { - _semaphore.Release(); - } } private Task SaveAsync() { - return _storage.SaveQueuedMessagesAsync(_applicationMessages); + return _storage.SaveQueuedMessagesAsync(_messages); } } }