ソースを参照

Optimized persisting of retained messages to avoid useless writes to the filesystem

release/3.x.x
Christian Kratky 7年前
コミット
d2e481719c
8個のファイルの変更154行の追加27行の削除
  1. +1
    -1
      MQTTnet.Core/Server/IMqttClientRetainedMessageManager.cs
  2. +51
    -20
      MQTTnet.Core/Server/MqttClientRetainedMessagesManager.cs
  3. +4
    -3
      MQTTnet.Core/Server/MqttClientSession.cs
  4. +1
    -1
      Tests/MQTTnet.Core.Tests/MqttServerTests.cs
  5. +50
    -0
      Tests/MQTTnet.TestApp.UniversalWindows/JsonServerStorage.cs
  6. +1
    -0
      Tests/MQTTnet.TestApp.UniversalWindows/MQTTnet.TestApp.UniversalWindows.csproj
  7. +15
    -2
      Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml
  8. +31
    -0
      Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs

+ 1
- 1
MQTTnet.Core/Server/IMqttClientRetainedMessageManager.cs ファイルの表示

@@ -10,6 +10,6 @@ namespace MQTTnet.Core.Server

Task HandleMessageAsync(string clientId, MqttApplicationMessage applicationMessage);

List<MqttApplicationMessage> GetMessages(MqttSubscribePacket subscribePacket);
Task<List<MqttApplicationMessage>> GetSubscribedMessagesAsync(MqttSubscribePacket subscribePacket);
}
}

+ 51
- 20
MQTTnet.Core/Server/MqttClientRetainedMessagesManager.cs ファイルの表示

@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Core.Packets;
using Microsoft.Extensions.Logging;
@@ -11,6 +12,7 @@ namespace MQTTnet.Core.Server
public sealed class MqttClientRetainedMessagesManager : IMqttClientRetainedMessageManager
{
private readonly Dictionary<string, MqttApplicationMessage> _retainedMessages = new Dictionary<string, MqttApplicationMessage>();
private readonly SemaphoreSlim _gate = new SemaphoreSlim(1, 1);
private readonly ILogger<MqttClientRetainedMessagesManager> _logger;
private readonly MqttServerOptions _options;

@@ -27,62 +29,87 @@ namespace MQTTnet.Core.Server
return;
}

await _gate.WaitAsync();
try
{
var retainedMessages = await _options.Storage.LoadRetainedMessagesAsync();
lock (_retainedMessages)

_retainedMessages.Clear();
foreach (var retainedMessage in retainedMessages)
{
_retainedMessages.Clear();
foreach (var retainedMessage in retainedMessages)
{
_retainedMessages[retainedMessage.Topic] = retainedMessage;
}
_retainedMessages[retainedMessage.Topic] = retainedMessage;
}
}
catch (Exception exception)
{
_logger.LogError(new EventId(), exception, "Unhandled exception while loading retained messages.");
}
finally
{
_gate.Release();
}
}

public async Task HandleMessageAsync(string clientId, MqttApplicationMessage applicationMessage)
{
if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage));

List<MqttApplicationMessage> allRetainedMessages;
lock (_retainedMessages)
await _gate.WaitAsync();
try
{
var saveIsRequired = false;

if (applicationMessage.Payload?.Any() == false)
{
_retainedMessages.Remove(applicationMessage.Topic);
saveIsRequired = _retainedMessages.Remove(applicationMessage.Topic);
_logger.LogInformation("Client '{0}' cleared retained message for topic '{1}'.", clientId, applicationMessage.Topic);
}
else
{
_retainedMessages[applicationMessage.Topic] = applicationMessage;
_logger.LogInformation("Client '{0}' updated retained message for topic '{1}'.", clientId, applicationMessage.Topic);
if (!_retainedMessages.ContainsKey(applicationMessage.Topic))
{
_retainedMessages[applicationMessage.Topic] = applicationMessage;
saveIsRequired = true;
}
else
{
var existingMessage = _retainedMessages[applicationMessage.Topic];
if (existingMessage.QualityOfServiceLevel != applicationMessage.QualityOfServiceLevel || !existingMessage.Payload.SequenceEqual(applicationMessage.Payload ?? new byte[0]))
{
_retainedMessages[applicationMessage.Topic] = applicationMessage;
saveIsRequired = true;
}
}
_logger.LogInformation("Client '{0}' set retained message for topic '{1}'.", clientId, applicationMessage.Topic);
}

allRetainedMessages = new List<MqttApplicationMessage>(_retainedMessages.Values);
}
if (!saveIsRequired)
{
_logger.LogTrace("Skipped saving retained messages because no changes were detected.");
}

try
{
if (_options.Storage != null)
if (saveIsRequired && _options.Storage != null)
{
await _options.Storage.SaveRetainedMessagesAsync(allRetainedMessages);
await _options.Storage.SaveRetainedMessagesAsync(_retainedMessages.Values.ToList());
}
}
catch (Exception exception)
{
_logger.LogError(new EventId(), exception, "Unhandled exception while saving retained messages.");
_logger.LogError(new EventId(), exception, "Unhandled exception while handling retained messages.");
}
finally
{
_gate.Release();
}
}

public List<MqttApplicationMessage> GetMessages(MqttSubscribePacket subscribePacket)
public async Task<List<MqttApplicationMessage>> GetSubscribedMessagesAsync(MqttSubscribePacket subscribePacket)
{
var retainedMessages = new List<MqttApplicationMessage>();
lock (_retainedMessages)

await _gate.WaitAsync();
try
{
foreach (var retainedMessage in _retainedMessages.Values)
{
@@ -103,6 +130,10 @@ namespace MQTTnet.Core.Server
}
}
}
finally
{
_gate.Release();
}

return retainedMessages;
}


+ 4
- 3
MQTTnet.Core/Server/MqttClientSession.cs ファイルの表示

@@ -191,8 +191,9 @@ namespace MQTTnet.Core.Server
private async Task HandleIncomingSubscribePacketAsync(IMqttCommunicationAdapter adapter, MqttSubscribePacket subscribePacket, CancellationToken cancellationToken)
{
var subscribeResult = _subscriptionsManager.Subscribe(subscribePacket, ClientId);

await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, subscribeResult.ResponsePacket);
EnqueueRetainedMessages(subscribePacket);
await EnqueueSubscribedRetainedMessagesAsync(subscribePacket);

if (subscribeResult.CloseConnection)
{
@@ -201,9 +202,9 @@ namespace MQTTnet.Core.Server
}
}

private void EnqueueRetainedMessages(MqttSubscribePacket subscribePacket)
private async Task EnqueueSubscribedRetainedMessagesAsync(MqttSubscribePacket subscribePacket)
{
var retainedMessages = _clientRetainedMessageManager.GetMessages(subscribePacket);
var retainedMessages = await _clientRetainedMessageManager.GetSubscribedMessagesAsync(subscribePacket);
foreach (var publishPacket in retainedMessages)
{
EnqueuePublishPacket(publishPacket.ToPublishPacket());


+ 1
- 1
Tests/MQTTnet.Core.Tests/MqttServerTests.cs ファイルの表示

@@ -232,7 +232,7 @@ namespace MQTTnet.Core.Tests
};

//make shure the retainedMessageManagerreceived the package
while (!retainMessagemanager.GetMessages(subscribe).Any())
while (!(await retainMessagemanager.GetSubscribedMessagesAsync(subscribe)).Any())
{
await Task.Delay(TimeSpan.FromMilliseconds(10));
}


+ 50
- 0
Tests/MQTTnet.TestApp.UniversalWindows/JsonServerStorage.cs ファイルの表示

@@ -0,0 +1,50 @@
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using MQTTnet.Core;
using MQTTnet.Core.Server;
using Newtonsoft.Json;

namespace MQTTnet.TestApp.UniversalWindows
{
public class JsonServerStorage : IMqttServerStorage
{
private readonly string _filename = Path.Combine(Windows.Storage.ApplicationData.Current.LocalFolder.Path, "Retained.json");

public async Task SaveRetainedMessagesAsync(IList<MqttApplicationMessage> messages)
{
await Task.CompletedTask;

var json = JsonConvert.SerializeObject(messages);
File.WriteAllText(_filename, json);
}

public async Task<IList<MqttApplicationMessage>> LoadRetainedMessagesAsync()
{
await Task.CompletedTask;

if (!File.Exists(_filename))
{
return new List<MqttApplicationMessage>();
}

try
{
var json = File.ReadAllText(_filename);
return JsonConvert.DeserializeObject<List<MqttApplicationMessage>>(json);
}
catch
{
return new List<MqttApplicationMessage>();
}
}

public void Clear()
{
if (File.Exists(_filename))
{
File.Delete(_filename);
}
}
}
}

+ 1
- 0
Tests/MQTTnet.TestApp.UniversalWindows/MQTTnet.TestApp.UniversalWindows.csproj ファイルの表示

@@ -94,6 +94,7 @@
<Compile Include="App.xaml.cs">
<DependentUpon>App.xaml</DependentUpon>
</Compile>
<Compile Include="JsonServerStorage.cs" />
<Compile Include="MainPage.xaml.cs">
<DependentUpon>MainPage.xaml</DependentUpon>
</Compile>


+ 15
- 2
Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml ファイルの表示

@@ -27,7 +27,7 @@
<TextBox x:Name="ClientId"></TextBox>
<TextBlock>Clean session:</TextBlock>
<CheckBox x:Name="CleanSession" IsChecked="True"></CheckBox>
<StackPanel Orientation="Horizontal">
<RadioButton x:Name="UseTcp" IsChecked="True" GroupName="connection">TCP</RadioButton>
<RadioButton x:Name="UseWs" GroupName="connection">WS</RadioButton>
@@ -77,9 +77,19 @@
<RadioButton Margin="0,0,10,0" x:Name="SubscribeQoS2" GroupName="sqos">2 (Exactly once)</RadioButton>
</StackPanel>

<TextBlock>Received messages:</TextBlock>
<ListBox MinHeight="50" MaxHeight="250" x:Name="ReceivedMessages" Margin="0,0,0,10">
<ListBox.ItemTemplate>
<DataTemplate>
<ContentPresenter Content="{Binding}" FontFamily="Consolas" FontSize="12"></ContentPresenter>
</DataTemplate>
</ListBox.ItemTemplate>
</ListBox>
<StackPanel Orientation="Horizontal">
<Button Click="Subscribe" Width="120" Margin="0,0,10,0">Subscribe</Button>
<Button Click="Unsubscribe" Width="120">Unsubscribe</Button>
<Button Click="Unsubscribe" Width="120" Margin="0,0,10,0">Unsubscribe</Button>
<Button Click="ClearReceivedMessages" Width="200">Clear received messages</Button>
</StackPanel>
</StackPanel>
</PivotItem>
@@ -88,6 +98,9 @@
<TextBlock>Port:</TextBlock>
<TextBox x:Name="ServerPort" Text="1883"></TextBox>

<CheckBox x:Name="ServerPersistRetainedMessages" IsChecked="True">Persist retained messages in JSON format</CheckBox>
<CheckBox x:Name="ServerClearRetainedMessages">Clear previously retained messages on startup</CheckBox>
<StackPanel Orientation="Horizontal">
<Button Width="120" Margin="0,0,10,0" Click="StartServer">Start</Button>
<Button Width="120" Margin="0,0,10,0" Click="StopServer">Stop</Button>


+ 31
- 0
Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs ファイルの表示

@@ -88,10 +88,13 @@ namespace MQTTnet.TestApp.UniversalWindows
if (_mqttClient != null)
{
await _mqttClient.DisconnectAsync();
_mqttClient.ApplicationMessageReceived -= OnApplicationMessageReceived;
}

var factory = new MqttFactory();
_mqttClient = factory.CreateMqttClient();
_mqttClient.ApplicationMessageReceived += OnApplicationMessageReceived;

await _mqttClient.ConnectAsync(options);
}
catch (Exception exception)
@@ -100,6 +103,17 @@ namespace MQTTnet.TestApp.UniversalWindows
}
}

private async void OnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs eventArgs)
{
var item = $"Timestamp: {DateTime.Now:O} | Topic: {eventArgs.ApplicationMessage.Topic} | Payload: {Encoding.UTF8.GetString(eventArgs.ApplicationMessage.Payload)} | QoS: {eventArgs.ApplicationMessage.QualityOfServiceLevel}";

await Dispatcher.RunAsync(CoreDispatcherPriority.Normal, () =>
{
ReceivedMessages.Items.Add(item);
});
}

private async void Publish(object sender, RoutedEventArgs e)
{
if (_mqttClient == null)
@@ -332,9 +346,21 @@ namespace MQTTnet.TestApp.UniversalWindows
return;
}

JsonServerStorage storage = null;
if (ServerPersistRetainedMessages.IsChecked == true)
{
storage = new JsonServerStorage();

if (ServerClearRetainedMessages.IsChecked == true)
{
storage.Clear();
}
}

_mqttServer = new MqttFactory().CreateMqttServer(o =>
{
o.DefaultEndpointOptions.Port = int.Parse(ServerPort.Text);
o.Storage = storage;
});

await _mqttServer.StartAsync();
@@ -350,5 +376,10 @@ namespace MQTTnet.TestApp.UniversalWindows
await _mqttServer.StopAsync();
_mqttServer = null;
}

private void ClearReceivedMessages(object sender, RoutedEventArgs e)
{
ReceivedMessages.Items.Clear();
}
}
}

読み込み中…
キャンセル
保存