Browse Source

Refactor ManagedMqttClientStorage

release/3.x.x
Christian Kratky 6 years ago
parent
commit
137d1d181b
2 changed files with 18 additions and 25 deletions
  1. +4
    -3
      Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs
  2. +14
    -22
      Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClientStorageManager.cs

+ 4
- 3
Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs View File

@@ -68,10 +68,11 @@ namespace MQTTnet.ManagedClient
if (_options.Storage != null)
{
_storageManager = new ManagedMqttClientStorageManager(_options.Storage);
await _storageManager.LoadQueuedMessagesAsync().ConfigureAwait(false);
foreach (var loadedMessage in _storageManager.ApplicationMessages)
var messages = await _storageManager.LoadQueuedMessagesAsync().ConfigureAwait(false);

foreach (var message in messages)
{
_messageQueue.Add(loadedMessage);
_messageQueue.Add(message);
}
}



+ 14
- 22
Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClientStorageManager.cs View File

@@ -2,65 +2,57 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Internal;

namespace MQTTnet.ManagedClient
{
public class ManagedMqttClientStorageManager
{
private readonly List<MqttApplicationMessage> _applicationMessages = new List<MqttApplicationMessage>();
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
private readonly IManagedMqttClientStorage _storage;
private readonly List<MqttApplicationMessage> _messages = new List<MqttApplicationMessage>();
private readonly AsyncLock _messagesLock = new AsyncLock();

public List<MqttApplicationMessage> ApplicationMessages => _applicationMessages;
private readonly IManagedMqttClientStorage _storage;

public ManagedMqttClientStorageManager(IManagedMqttClientStorage storage)
{
_storage = storage ?? throw new ArgumentNullException(nameof(storage));
}

public async Task LoadQueuedMessagesAsync()
public async Task<List<MqttApplicationMessage>> LoadQueuedMessagesAsync()
{
var loadedMessages = await _storage.LoadQueuedMessagesAsync().ConfigureAwait(false);
_applicationMessages.AddRange(loadedMessages);
_messages.AddRange(loadedMessages);

return _messages;
}

public async Task AddAsync(MqttApplicationMessage applicationMessage)
{
await _semaphore.WaitAsync().ConfigureAwait(false);
try
using (await _messagesLock.LockAsync(CancellationToken.None).ConfigureAwait(false))
{
_applicationMessages.Add(applicationMessage);
_messages.Add(applicationMessage);
await SaveAsync().ConfigureAwait(false);
}
finally
{
_semaphore.Release();
}
}

public async Task RemoveAsync(MqttApplicationMessage applicationMessage)
{
await _semaphore.WaitAsync().ConfigureAwait(false);
try
using (await _messagesLock.LockAsync(CancellationToken.None).ConfigureAwait(false))
{
var index = _applicationMessages.IndexOf(applicationMessage);
var index = _messages.IndexOf(applicationMessage);
if (index == -1)
{
return;
}

_applicationMessages.RemoveAt(index);
_messages.RemoveAt(index);
await SaveAsync().ConfigureAwait(false);
}
finally
{
_semaphore.Release();
}
}

private Task SaveAsync()
{
return _storage.SaveQueuedMessagesAsync(_applicationMessages);
return _storage.SaveQueuedMessagesAsync(_messages);
}
}
}

Loading…
Cancel
Save