diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec
index 4631f24..2583897 100644
--- a/Build/MQTTnet.nuspec
+++ b/Build/MQTTnet.nuspec
@@ -33,11 +33,26 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/Build/build.ps1 b/Build/build.ps1
index 6284473..e74f9f0 100644
--- a/Build/build.ps1
+++ b/Build/build.ps1
@@ -11,7 +11,7 @@ if ($path) {
&$msbuild ..\Frameworks\MQTTnet.Netstandard\MQTTnet.Netstandard.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="net451" /p:FileVersion=$version /p:AssemblyVersion=$version /verbosity:m
&$msbuild ..\Frameworks\MQTTnet.Netstandard\MQTTnet.Netstandard.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netstandard1.3" /p:FileVersion=$version /p:AssemblyVersion=$version /verbosity:m
&$msbuild ..\Frameworks\MQTTnet.Netstandard\MQTTnet.Netstandard.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="uap10.0" /p:FileVersion=$version /p:AssemblyVersion=$version /verbosity:m
- &dotnet build ..\Frameworks\MQTTnet.AspNetCore\MQTTnet.AspNetCore.csproj -c="Release" /p:FileVersion=$version /p:AssemblyVersion=$version
+ &$msbuild ..\Frameworks\MQTTnet.AspNetCore\MQTTnet.AspNetCore.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netstandard2.0" /p:FileVersion=$version /p:AssemblyVersion=$version /verbosity:m
Remove-Item .\NuGet -Force -Recurse
New-Item -ItemType Directory -Force -Path .\NuGet
diff --git a/Frameworks/MQTTnet.AspnetCore/MqttHostedServer.cs b/Frameworks/MQTTnet.AspnetCore/MqttHostedServer.cs
index db72573..8d00518 100644
--- a/Frameworks/MQTTnet.AspnetCore/MqttHostedServer.cs
+++ b/Frameworks/MQTTnet.AspnetCore/MqttHostedServer.cs
@@ -11,8 +11,14 @@ namespace MQTTnet.AspNetCore
{
public class MqttHostedServer : MqttServer, IHostedService
{
- public MqttHostedServer(IOptions options, IEnumerable adapters, ILogger logger, MqttClientSessionsManager clientSessionsManager)
- : base(options, adapters, logger, clientSessionsManager)
+ public MqttHostedServer(
+ IOptions options,
+ IEnumerable adapters,
+ ILogger logger,
+ MqttClientSessionsManager clientSessionsManager,
+ IMqttClientRetainedMessageManager clientRetainedMessageManager
+ )
+ : base(options, adapters, logger, clientSessionsManager, clientRetainedMessageManager)
{
}
diff --git a/Frameworks/MQTTnet.NetStandard/MqttFactory.cs b/Frameworks/MQTTnet.NetStandard/MqttFactory.cs
index 432a371..394374d 100644
--- a/Frameworks/MQTTnet.NetStandard/MqttFactory.cs
+++ b/Frameworks/MQTTnet.NetStandard/MqttFactory.cs
@@ -99,7 +99,9 @@ namespace MQTTnet
clientSessionsManager,
_serviceProvider.GetRequiredService(),
_serviceProvider.GetRequiredService>(),
- _serviceProvider.GetRequiredService>());
+ _serviceProvider.GetRequiredService>(),
+ _serviceProvider.GetRequiredService()
+ );
}
public IMqttClient CreateMqttClient()
diff --git a/Frameworks/MQTTnet.NetStandard/ServiceCollectionExtensions.cs b/Frameworks/MQTTnet.NetStandard/ServiceCollectionExtensions.cs
index aa53e1b..4e6639a 100644
--- a/Frameworks/MQTTnet.NetStandard/ServiceCollectionExtensions.cs
+++ b/Frameworks/MQTTnet.NetStandard/ServiceCollectionExtensions.cs
@@ -33,9 +33,9 @@ namespace MQTTnet
services.AddTransient();
services.AddTransient();
- services.AddTransient();
+ services.AddSingleton();
services.AddTransient();
- services.AddTransient();
+ services.AddSingleton();
return services;
}
diff --git a/MQTTnet.Core/Server/IMqttClientRetainedMessageManager.cs b/MQTTnet.Core/Server/IMqttClientRetainedMessageManager.cs
new file mode 100644
index 0000000..cd3426e
--- /dev/null
+++ b/MQTTnet.Core/Server/IMqttClientRetainedMessageManager.cs
@@ -0,0 +1,15 @@
+using System.Collections.Generic;
+using System.Threading.Tasks;
+using MQTTnet.Core.Packets;
+
+namespace MQTTnet.Core.Server
+{
+ public interface IMqttClientRetainedMessageManager
+ {
+ Task LoadMessagesAsync();
+
+ Task HandleMessageAsync(string clientId, MqttApplicationMessage applicationMessage);
+
+ List GetMessages(MqttSubscribePacket subscribePacket);
+ }
+}
diff --git a/MQTTnet.Core/Server/MqttClientRetainedMessagesManager.cs b/MQTTnet.Core/Server/MqttClientRetainedMessagesManager.cs
index 68a7b99..2213360 100644
--- a/MQTTnet.Core/Server/MqttClientRetainedMessagesManager.cs
+++ b/MQTTnet.Core/Server/MqttClientRetainedMessagesManager.cs
@@ -8,7 +8,7 @@ using Microsoft.Extensions.Options;
namespace MQTTnet.Core.Server
{
- public sealed class MqttClientRetainedMessagesManager
+ public sealed class MqttClientRetainedMessagesManager : IMqttClientRetainedMessageManager
{
private readonly Dictionary _retainedMessages = new Dictionary();
private readonly ILogger _logger;
diff --git a/MQTTnet.Core/Server/MqttClientSession.cs b/MQTTnet.Core/Server/MqttClientSession.cs
index 52f6393..00f906f 100644
--- a/MQTTnet.Core/Server/MqttClientSession.cs
+++ b/MQTTnet.Core/Server/MqttClientSession.cs
@@ -15,6 +15,7 @@ namespace MQTTnet.Core.Server
{
public sealed class MqttClientSession : IDisposable
{
+ private readonly IMqttClientRetainedMessageManager _clientRetainedMessageManager;
private readonly HashSet _unacknowledgedPublishPackets = new HashSet();
private readonly MqttClientSubscriptionsManager _subscriptionsManager;
@@ -33,10 +34,12 @@ namespace MQTTnet.Core.Server
MqttClientSessionsManager sessionsManager,
MqttClientSubscriptionsManager subscriptionsManager,
ILogger logger,
- ILogger messageQueueLogger)
+ ILogger messageQueueLogger,
+ IMqttClientRetainedMessageManager clientRetainedMessageManager)
{
+ _clientRetainedMessageManager = clientRetainedMessageManager ?? throw new ArgumentNullException(nameof(clientRetainedMessageManager));
_sessionsManager = sessionsManager ?? throw new ArgumentNullException(nameof(sessionsManager));
- _subscriptionsManager = subscriptionsManager ?? throw new ArgumentNullException(nameof(sessionsManager));
+ _subscriptionsManager = subscriptionsManager ?? throw new ArgumentNullException(nameof(subscriptionsManager));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
ClientId = clientId;
@@ -206,7 +209,7 @@ namespace MQTTnet.Core.Server
private void EnqueueRetainedMessages(MqttSubscribePacket subscribePacket)
{
- var retainedMessages = _sessionsManager.RetainedMessagesManager.GetMessages(subscribePacket);
+ var retainedMessages = _clientRetainedMessageManager.GetMessages(subscribePacket);
foreach (var publishPacket in retainedMessages)
{
EnqueuePublishPacket(publishPacket.ToPublishPacket());
@@ -227,7 +230,7 @@ namespace MQTTnet.Core.Server
if (applicationMessage.Retain)
{
- await _sessionsManager.RetainedMessagesManager.HandleMessageAsync(ClientId, applicationMessage);
+ await _clientRetainedMessageManager.HandleMessageAsync(ClientId, applicationMessage);
}
switch (applicationMessage.QualityOfServiceLevel)
diff --git a/MQTTnet.Core/Server/MqttClientSessionsManager.cs b/MQTTnet.Core/Server/MqttClientSessionsManager.cs
index 62cc1d8..b2cbdc0 100644
--- a/MQTTnet.Core/Server/MqttClientSessionsManager.cs
+++ b/MQTTnet.Core/Server/MqttClientSessionsManager.cs
@@ -24,12 +24,10 @@ namespace MQTTnet.Core.Server
public MqttClientSessionsManager(
IOptions options,
ILogger logger,
- MqttClientRetainedMessagesManager retainedMessagesManager,
IMqttClientSesssionFactory clientSesssionFactory)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_options = options.Value ?? throw new ArgumentNullException(nameof(options));
- RetainedMessagesManager = retainedMessagesManager ?? throw new ArgumentNullException(nameof(options));
_clientSesssionFactory = clientSesssionFactory ?? throw new ArgumentNullException(nameof(clientSesssionFactory));
}
@@ -37,8 +35,6 @@ namespace MQTTnet.Core.Server
public event EventHandler ClientDisconnected;
public event EventHandler ApplicationMessageReceived;
- public MqttClientRetainedMessagesManager RetainedMessagesManager { get; }
-
public async Task RunClientSessionAsync(IMqttCommunicationAdapter clientAdapter, CancellationToken cancellationToken)
{
var clientId = string.Empty;
diff --git a/MQTTnet.Core/Server/MqttServer.cs b/MQTTnet.Core/Server/MqttServer.cs
index f1ed791..f5a9527 100644
--- a/MQTTnet.Core/Server/MqttServer.cs
+++ b/MQTTnet.Core/Server/MqttServer.cs
@@ -6,11 +6,13 @@ using MQTTnet.Core.Adapter;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System.Linq;
+using Microsoft.Extensions.DependencyInjection;
namespace MQTTnet.Core.Server
{
public class MqttServer : IMqttServer
{
+ private readonly IMqttClientRetainedMessageManager _clientRetainedMessageManager;
private readonly ILogger _logger;
private readonly MqttClientSessionsManager _clientSessionsManager;
private readonly ICollection _adapters;
@@ -18,11 +20,18 @@ namespace MQTTnet.Core.Server
private CancellationTokenSource _cancellationTokenSource;
- public MqttServer(IOptions options, IEnumerable adapters, ILogger logger, MqttClientSessionsManager clientSessionsManager)
+ public MqttServer(
+ IOptions options,
+ IEnumerable adapters,
+ ILogger logger,
+ MqttClientSessionsManager clientSessionsManager,
+ IMqttClientRetainedMessageManager clientRetainedMessageManager
+ )
{
_options = options.Value ?? throw new ArgumentNullException(nameof(options));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_clientSessionsManager = clientSessionsManager ?? throw new ArgumentNullException(nameof(clientSessionsManager));
+ _clientRetainedMessageManager = clientRetainedMessageManager ?? throw new ArgumentNullException(nameof(clientRetainedMessageManager));
if (adapters == null)
{
@@ -79,7 +88,7 @@ namespace MQTTnet.Core.Server
_cancellationTokenSource = new CancellationTokenSource();
- await _clientSessionsManager.RetainedMessagesManager.LoadMessagesAsync();
+ await _clientRetainedMessageManager.LoadMessagesAsync();
foreach (var adapter in _adapters)
{
diff --git a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs
index 106eca6..86ff600 100644
--- a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs
+++ b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
+using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
@@ -8,6 +9,7 @@ using MQTTnet.Core.Client;
using MQTTnet.Core.Protocol;
using MQTTnet.Core.Server;
using Microsoft.Extensions.DependencyInjection;
+using MQTTnet.Core.Packets;
namespace MQTTnet.Core.Tests
{
@@ -210,6 +212,7 @@ namespace MQTTnet.Core.Tests
.BuildServiceProvider();
var s = new MqttFactory(services).CreateMqttServer();
+ var retainMessagemanager = services.GetRequiredService();
var receivedMessagesCount = 0;
try
@@ -220,6 +223,20 @@ namespace MQTTnet.Core.Tests
await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build());
await c1.DisconnectAsync();
+ var subscribe = new MqttSubscribePacket()
+ {
+ TopicFilters = new List()
+ {
+ new TopicFilter("retained", MqttQualityOfServiceLevel.AtMostOnce)
+ }
+ };
+
+ //make shure the retainedMessageManagerreceived the package
+ while (!retainMessagemanager.GetMessages(subscribe).Any())
+ {
+ await Task.Delay(TimeSpan.FromMilliseconds(10));
+ }
+
var c2 = await serverAdapter.ConnectTestClient(s, "c2");
c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++;
await c2.SubscribeAsync(new TopicFilter("retained", MqttQualityOfServiceLevel.AtMostOnce));