@@ -11,8 +11,14 @@ namespace MQTTnet.AspNetCore | |||
{ | |||
public class MqttHostedServer : MqttServer, IHostedService | |||
{ | |||
public MqttHostedServer(IOptions<MqttServerOptions> options, IEnumerable<IMqttServerAdapter> adapters, ILogger<MqttServer> logger, MqttClientSessionsManager clientSessionsManager) | |||
: base(options, adapters, logger, clientSessionsManager) | |||
public MqttHostedServer( | |||
IOptions<MqttServerOptions> options, | |||
IEnumerable<IMqttServerAdapter> adapters, | |||
ILogger<MqttServer> logger, | |||
MqttClientSessionsManager clientSessionsManager, | |||
IMqttClientRetainedMessageManager clientRetainedMessageManager | |||
) | |||
: base(options, adapters, logger, clientSessionsManager, clientRetainedMessageManager) | |||
{ | |||
} | |||
@@ -99,7 +99,9 @@ namespace MQTTnet | |||
clientSessionsManager, | |||
_serviceProvider.GetRequiredService<MqttClientSubscriptionsManager>(), | |||
_serviceProvider.GetRequiredService<ILogger<MqttClientSession>>(), | |||
_serviceProvider.GetRequiredService<ILogger<MqttClientPendingMessagesQueue>>()); | |||
_serviceProvider.GetRequiredService<ILogger<MqttClientPendingMessagesQueue>>(), | |||
_serviceProvider.GetRequiredService<IMqttClientRetainedMessageManager>() | |||
); | |||
} | |||
public IMqttClient CreateMqttClient() | |||
@@ -33,9 +33,9 @@ namespace MQTTnet | |||
services.AddTransient<IMqttServerAdapter, MqttServerAdapter>(); | |||
services.AddTransient<IMqttPacketSerializer, MqttPacketSerializer>(); | |||
services.AddTransient<MqttClientSessionsManager>(); | |||
services.AddSingleton<MqttClientSessionsManager>(); | |||
services.AddTransient<MqttClientSubscriptionsManager>(); | |||
services.AddTransient<MqttClientRetainedMessagesManager>(); | |||
services.AddSingleton<IMqttClientRetainedMessageManager, MqttClientRetainedMessagesManager>(); | |||
return services; | |||
} | |||
@@ -0,0 +1,15 @@ | |||
using System.Collections.Generic; | |||
using System.Threading.Tasks; | |||
using MQTTnet.Core.Packets; | |||
namespace MQTTnet.Core.Server | |||
{ | |||
public interface IMqttClientRetainedMessageManager | |||
{ | |||
Task LoadMessagesAsync(); | |||
Task HandleMessageAsync(string clientId, MqttApplicationMessage applicationMessage); | |||
List<MqttApplicationMessage> GetMessages(MqttSubscribePacket subscribePacket); | |||
} | |||
} |
@@ -8,7 +8,7 @@ using Microsoft.Extensions.Options; | |||
namespace MQTTnet.Core.Server | |||
{ | |||
public sealed class MqttClientRetainedMessagesManager | |||
public sealed class MqttClientRetainedMessagesManager : IMqttClientRetainedMessageManager | |||
{ | |||
private readonly Dictionary<string, MqttApplicationMessage> _retainedMessages = new Dictionary<string, MqttApplicationMessage>(); | |||
private readonly ILogger<MqttClientRetainedMessagesManager> _logger; | |||
@@ -15,6 +15,7 @@ namespace MQTTnet.Core.Server | |||
{ | |||
public sealed class MqttClientSession : IDisposable | |||
{ | |||
private readonly IMqttClientRetainedMessageManager _clientRetainedMessageManager; | |||
private readonly HashSet<ushort> _unacknowledgedPublishPackets = new HashSet<ushort>(); | |||
private readonly MqttClientSubscriptionsManager _subscriptionsManager; | |||
@@ -33,10 +34,12 @@ namespace MQTTnet.Core.Server | |||
MqttClientSessionsManager sessionsManager, | |||
MqttClientSubscriptionsManager subscriptionsManager, | |||
ILogger<MqttClientSession> logger, | |||
ILogger<MqttClientPendingMessagesQueue> messageQueueLogger) | |||
ILogger<MqttClientPendingMessagesQueue> messageQueueLogger, | |||
IMqttClientRetainedMessageManager clientRetainedMessageManager) | |||
{ | |||
_clientRetainedMessageManager = clientRetainedMessageManager ?? throw new ArgumentNullException(nameof(clientRetainedMessageManager)); | |||
_sessionsManager = sessionsManager ?? throw new ArgumentNullException(nameof(sessionsManager)); | |||
_subscriptionsManager = subscriptionsManager ?? throw new ArgumentNullException(nameof(sessionsManager)); | |||
_subscriptionsManager = subscriptionsManager ?? throw new ArgumentNullException(nameof(subscriptionsManager)); | |||
_logger = logger ?? throw new ArgumentNullException(nameof(logger)); | |||
ClientId = clientId; | |||
@@ -206,7 +209,7 @@ namespace MQTTnet.Core.Server | |||
private void EnqueueRetainedMessages(MqttSubscribePacket subscribePacket) | |||
{ | |||
var retainedMessages = _sessionsManager.RetainedMessagesManager.GetMessages(subscribePacket); | |||
var retainedMessages = _clientRetainedMessageManager.GetMessages(subscribePacket); | |||
foreach (var publishPacket in retainedMessages) | |||
{ | |||
EnqueuePublishPacket(publishPacket.ToPublishPacket()); | |||
@@ -227,7 +230,7 @@ namespace MQTTnet.Core.Server | |||
if (applicationMessage.Retain) | |||
{ | |||
await _sessionsManager.RetainedMessagesManager.HandleMessageAsync(ClientId, applicationMessage); | |||
await _clientRetainedMessageManager.HandleMessageAsync(ClientId, applicationMessage); | |||
} | |||
switch (applicationMessage.QualityOfServiceLevel) | |||
@@ -24,12 +24,10 @@ namespace MQTTnet.Core.Server | |||
public MqttClientSessionsManager( | |||
IOptions<MqttServerOptions> options, | |||
ILogger<MqttClientSessionsManager> logger, | |||
MqttClientRetainedMessagesManager retainedMessagesManager, | |||
IMqttClientSesssionFactory clientSesssionFactory) | |||
{ | |||
_logger = logger ?? throw new ArgumentNullException(nameof(logger)); | |||
_options = options.Value ?? throw new ArgumentNullException(nameof(options)); | |||
RetainedMessagesManager = retainedMessagesManager ?? throw new ArgumentNullException(nameof(options)); | |||
_clientSesssionFactory = clientSesssionFactory ?? throw new ArgumentNullException(nameof(clientSesssionFactory)); | |||
} | |||
@@ -37,8 +35,6 @@ namespace MQTTnet.Core.Server | |||
public event EventHandler<MqttClientDisconnectedEventArgs> ClientDisconnected; | |||
public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived; | |||
public MqttClientRetainedMessagesManager RetainedMessagesManager { get; } | |||
public async Task RunClientSessionAsync(IMqttCommunicationAdapter clientAdapter, CancellationToken cancellationToken) | |||
{ | |||
var clientId = string.Empty; | |||
@@ -6,11 +6,13 @@ using MQTTnet.Core.Adapter; | |||
using Microsoft.Extensions.Logging; | |||
using Microsoft.Extensions.Options; | |||
using System.Linq; | |||
using Microsoft.Extensions.DependencyInjection; | |||
namespace MQTTnet.Core.Server | |||
{ | |||
public class MqttServer : IMqttServer | |||
{ | |||
private readonly IMqttClientRetainedMessageManager _clientRetainedMessageManager; | |||
private readonly ILogger<MqttServer> _logger; | |||
private readonly MqttClientSessionsManager _clientSessionsManager; | |||
private readonly ICollection<IMqttServerAdapter> _adapters; | |||
@@ -18,11 +20,18 @@ namespace MQTTnet.Core.Server | |||
private CancellationTokenSource _cancellationTokenSource; | |||
public MqttServer(IOptions<MqttServerOptions> options, IEnumerable<IMqttServerAdapter> adapters, ILogger<MqttServer> logger, MqttClientSessionsManager clientSessionsManager) | |||
public MqttServer( | |||
IOptions<MqttServerOptions> options, | |||
IEnumerable<IMqttServerAdapter> adapters, | |||
ILogger<MqttServer> logger, | |||
MqttClientSessionsManager clientSessionsManager, | |||
IMqttClientRetainedMessageManager clientRetainedMessageManager | |||
) | |||
{ | |||
_options = options.Value ?? throw new ArgumentNullException(nameof(options)); | |||
_logger = logger ?? throw new ArgumentNullException(nameof(logger)); | |||
_clientSessionsManager = clientSessionsManager ?? throw new ArgumentNullException(nameof(clientSessionsManager)); | |||
_clientRetainedMessageManager = clientRetainedMessageManager ?? throw new ArgumentNullException(nameof(clientRetainedMessageManager)); | |||
if (adapters == null) | |||
{ | |||
@@ -79,7 +88,7 @@ namespace MQTTnet.Core.Server | |||
_cancellationTokenSource = new CancellationTokenSource(); | |||
await _clientSessionsManager.RetainedMessagesManager.LoadMessagesAsync(); | |||
await _clientRetainedMessageManager.LoadMessagesAsync(); | |||
foreach (var adapter in _adapters) | |||
{ | |||
@@ -1,5 +1,6 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Linq; | |||
using System.Text; | |||
using System.Threading.Tasks; | |||
using Microsoft.VisualStudio.TestTools.UnitTesting; | |||
@@ -8,6 +9,7 @@ using MQTTnet.Core.Client; | |||
using MQTTnet.Core.Protocol; | |||
using MQTTnet.Core.Server; | |||
using Microsoft.Extensions.DependencyInjection; | |||
using MQTTnet.Core.Packets; | |||
namespace MQTTnet.Core.Tests | |||
{ | |||
@@ -210,6 +212,7 @@ namespace MQTTnet.Core.Tests | |||
.BuildServiceProvider(); | |||
var s = new MqttFactory(services).CreateMqttServer(); | |||
var retainMessagemanager = services.GetRequiredService<IMqttClientRetainedMessageManager>(); | |||
var receivedMessagesCount = 0; | |||
try | |||
@@ -220,6 +223,20 @@ namespace MQTTnet.Core.Tests | |||
await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build()); | |||
await c1.DisconnectAsync(); | |||
var subscribe = new MqttSubscribePacket() | |||
{ | |||
TopicFilters = new List<TopicFilter>() | |||
{ | |||
new TopicFilter("retained", MqttQualityOfServiceLevel.AtMostOnce) | |||
} | |||
}; | |||
//make shure the retainedMessageManagerreceived the package | |||
while (retainMessagemanager.GetMessages(subscribe).Any()) | |||
{ | |||
await Task.Delay(TimeSpan.FromMilliseconds(10)); | |||
} | |||
var c2 = await serverAdapter.ConnectTestClient(s, "c2"); | |||
c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; | |||
await c2.SubscribeAsync(new TopicFilter("retained", MqttQualityOfServiceLevel.AtMostOnce)); | |||