From e6e88e132b73e5805f4a03efe406bc2b718d74e2 Mon Sep 17 00:00:00 2001 From: Jan Eggers Date: Mon, 6 Nov 2017 12:47:31 +0100 Subject: [PATCH] possibly fixed timing issue in test --- .../MQTTnet.AspnetCore/MqttHostedServer.cs | 10 ++++++++-- Frameworks/MQTTnet.NetStandard/MqttFactory.cs | 4 +++- .../ServiceCollectionExtensions.cs | 4 ++-- .../Server/IMqttClientRetainedMessageManager.cs | 15 +++++++++++++++ .../Server/MqttClientRetainedMessagesManager.cs | 2 +- MQTTnet.Core/Server/MqttClientSession.cs | 11 +++++++---- .../Server/MqttClientSessionsManager.cs | 4 ---- MQTTnet.Core/Server/MqttServer.cs | 13 +++++++++++-- Tests/MQTTnet.Core.Tests/MqttServerTests.cs | 17 +++++++++++++++++ 9 files changed, 64 insertions(+), 16 deletions(-) create mode 100644 MQTTnet.Core/Server/IMqttClientRetainedMessageManager.cs diff --git a/Frameworks/MQTTnet.AspnetCore/MqttHostedServer.cs b/Frameworks/MQTTnet.AspnetCore/MqttHostedServer.cs index db72573..8d00518 100644 --- a/Frameworks/MQTTnet.AspnetCore/MqttHostedServer.cs +++ b/Frameworks/MQTTnet.AspnetCore/MqttHostedServer.cs @@ -11,8 +11,14 @@ namespace MQTTnet.AspNetCore { public class MqttHostedServer : MqttServer, IHostedService { - public MqttHostedServer(IOptions options, IEnumerable adapters, ILogger logger, MqttClientSessionsManager clientSessionsManager) - : base(options, adapters, logger, clientSessionsManager) + public MqttHostedServer( + IOptions options, + IEnumerable adapters, + ILogger logger, + MqttClientSessionsManager clientSessionsManager, + IMqttClientRetainedMessageManager clientRetainedMessageManager + ) + : base(options, adapters, logger, clientSessionsManager, clientRetainedMessageManager) { } diff --git a/Frameworks/MQTTnet.NetStandard/MqttFactory.cs b/Frameworks/MQTTnet.NetStandard/MqttFactory.cs index 432a371..394374d 100644 --- a/Frameworks/MQTTnet.NetStandard/MqttFactory.cs +++ b/Frameworks/MQTTnet.NetStandard/MqttFactory.cs @@ -99,7 +99,9 @@ namespace MQTTnet clientSessionsManager, _serviceProvider.GetRequiredService(), _serviceProvider.GetRequiredService>(), - _serviceProvider.GetRequiredService>()); + _serviceProvider.GetRequiredService>(), + _serviceProvider.GetRequiredService() + ); } public IMqttClient CreateMqttClient() diff --git a/Frameworks/MQTTnet.NetStandard/ServiceCollectionExtensions.cs b/Frameworks/MQTTnet.NetStandard/ServiceCollectionExtensions.cs index aa53e1b..4e6639a 100644 --- a/Frameworks/MQTTnet.NetStandard/ServiceCollectionExtensions.cs +++ b/Frameworks/MQTTnet.NetStandard/ServiceCollectionExtensions.cs @@ -33,9 +33,9 @@ namespace MQTTnet services.AddTransient(); services.AddTransient(); - services.AddTransient(); + services.AddSingleton(); services.AddTransient(); - services.AddTransient(); + services.AddSingleton(); return services; } diff --git a/MQTTnet.Core/Server/IMqttClientRetainedMessageManager.cs b/MQTTnet.Core/Server/IMqttClientRetainedMessageManager.cs new file mode 100644 index 0000000..cd3426e --- /dev/null +++ b/MQTTnet.Core/Server/IMqttClientRetainedMessageManager.cs @@ -0,0 +1,15 @@ +using System.Collections.Generic; +using System.Threading.Tasks; +using MQTTnet.Core.Packets; + +namespace MQTTnet.Core.Server +{ + public interface IMqttClientRetainedMessageManager + { + Task LoadMessagesAsync(); + + Task HandleMessageAsync(string clientId, MqttApplicationMessage applicationMessage); + + List GetMessages(MqttSubscribePacket subscribePacket); + } +} diff --git a/MQTTnet.Core/Server/MqttClientRetainedMessagesManager.cs b/MQTTnet.Core/Server/MqttClientRetainedMessagesManager.cs index 68a7b99..2213360 100644 --- a/MQTTnet.Core/Server/MqttClientRetainedMessagesManager.cs +++ b/MQTTnet.Core/Server/MqttClientRetainedMessagesManager.cs @@ -8,7 +8,7 @@ using Microsoft.Extensions.Options; namespace MQTTnet.Core.Server { - public sealed class MqttClientRetainedMessagesManager + public sealed class MqttClientRetainedMessagesManager : IMqttClientRetainedMessageManager { private readonly Dictionary _retainedMessages = new Dictionary(); private readonly ILogger _logger; diff --git a/MQTTnet.Core/Server/MqttClientSession.cs b/MQTTnet.Core/Server/MqttClientSession.cs index 52f6393..00f906f 100644 --- a/MQTTnet.Core/Server/MqttClientSession.cs +++ b/MQTTnet.Core/Server/MqttClientSession.cs @@ -15,6 +15,7 @@ namespace MQTTnet.Core.Server { public sealed class MqttClientSession : IDisposable { + private readonly IMqttClientRetainedMessageManager _clientRetainedMessageManager; private readonly HashSet _unacknowledgedPublishPackets = new HashSet(); private readonly MqttClientSubscriptionsManager _subscriptionsManager; @@ -33,10 +34,12 @@ namespace MQTTnet.Core.Server MqttClientSessionsManager sessionsManager, MqttClientSubscriptionsManager subscriptionsManager, ILogger logger, - ILogger messageQueueLogger) + ILogger messageQueueLogger, + IMqttClientRetainedMessageManager clientRetainedMessageManager) { + _clientRetainedMessageManager = clientRetainedMessageManager ?? throw new ArgumentNullException(nameof(clientRetainedMessageManager)); _sessionsManager = sessionsManager ?? throw new ArgumentNullException(nameof(sessionsManager)); - _subscriptionsManager = subscriptionsManager ?? throw new ArgumentNullException(nameof(sessionsManager)); + _subscriptionsManager = subscriptionsManager ?? throw new ArgumentNullException(nameof(subscriptionsManager)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); ClientId = clientId; @@ -206,7 +209,7 @@ namespace MQTTnet.Core.Server private void EnqueueRetainedMessages(MqttSubscribePacket subscribePacket) { - var retainedMessages = _sessionsManager.RetainedMessagesManager.GetMessages(subscribePacket); + var retainedMessages = _clientRetainedMessageManager.GetMessages(subscribePacket); foreach (var publishPacket in retainedMessages) { EnqueuePublishPacket(publishPacket.ToPublishPacket()); @@ -227,7 +230,7 @@ namespace MQTTnet.Core.Server if (applicationMessage.Retain) { - await _sessionsManager.RetainedMessagesManager.HandleMessageAsync(ClientId, applicationMessage); + await _clientRetainedMessageManager.HandleMessageAsync(ClientId, applicationMessage); } switch (applicationMessage.QualityOfServiceLevel) diff --git a/MQTTnet.Core/Server/MqttClientSessionsManager.cs b/MQTTnet.Core/Server/MqttClientSessionsManager.cs index 62cc1d8..b2cbdc0 100644 --- a/MQTTnet.Core/Server/MqttClientSessionsManager.cs +++ b/MQTTnet.Core/Server/MqttClientSessionsManager.cs @@ -24,12 +24,10 @@ namespace MQTTnet.Core.Server public MqttClientSessionsManager( IOptions options, ILogger logger, - MqttClientRetainedMessagesManager retainedMessagesManager, IMqttClientSesssionFactory clientSesssionFactory) { _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _options = options.Value ?? throw new ArgumentNullException(nameof(options)); - RetainedMessagesManager = retainedMessagesManager ?? throw new ArgumentNullException(nameof(options)); _clientSesssionFactory = clientSesssionFactory ?? throw new ArgumentNullException(nameof(clientSesssionFactory)); } @@ -37,8 +35,6 @@ namespace MQTTnet.Core.Server public event EventHandler ClientDisconnected; public event EventHandler ApplicationMessageReceived; - public MqttClientRetainedMessagesManager RetainedMessagesManager { get; } - public async Task RunClientSessionAsync(IMqttCommunicationAdapter clientAdapter, CancellationToken cancellationToken) { var clientId = string.Empty; diff --git a/MQTTnet.Core/Server/MqttServer.cs b/MQTTnet.Core/Server/MqttServer.cs index f1ed791..f5a9527 100644 --- a/MQTTnet.Core/Server/MqttServer.cs +++ b/MQTTnet.Core/Server/MqttServer.cs @@ -6,11 +6,13 @@ using MQTTnet.Core.Adapter; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using System.Linq; +using Microsoft.Extensions.DependencyInjection; namespace MQTTnet.Core.Server { public class MqttServer : IMqttServer { + private readonly IMqttClientRetainedMessageManager _clientRetainedMessageManager; private readonly ILogger _logger; private readonly MqttClientSessionsManager _clientSessionsManager; private readonly ICollection _adapters; @@ -18,11 +20,18 @@ namespace MQTTnet.Core.Server private CancellationTokenSource _cancellationTokenSource; - public MqttServer(IOptions options, IEnumerable adapters, ILogger logger, MqttClientSessionsManager clientSessionsManager) + public MqttServer( + IOptions options, + IEnumerable adapters, + ILogger logger, + MqttClientSessionsManager clientSessionsManager, + IMqttClientRetainedMessageManager clientRetainedMessageManager + ) { _options = options.Value ?? throw new ArgumentNullException(nameof(options)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _clientSessionsManager = clientSessionsManager ?? throw new ArgumentNullException(nameof(clientSessionsManager)); + _clientRetainedMessageManager = clientRetainedMessageManager ?? throw new ArgumentNullException(nameof(clientRetainedMessageManager)); if (adapters == null) { @@ -79,7 +88,7 @@ namespace MQTTnet.Core.Server _cancellationTokenSource = new CancellationTokenSource(); - await _clientSessionsManager.RetainedMessagesManager.LoadMessagesAsync(); + await _clientRetainedMessageManager.LoadMessagesAsync(); foreach (var adapter in _adapters) { diff --git a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs index 106eca6..44d489b 100644 --- a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Linq; using System.Text; using System.Threading.Tasks; using Microsoft.VisualStudio.TestTools.UnitTesting; @@ -8,6 +9,7 @@ using MQTTnet.Core.Client; using MQTTnet.Core.Protocol; using MQTTnet.Core.Server; using Microsoft.Extensions.DependencyInjection; +using MQTTnet.Core.Packets; namespace MQTTnet.Core.Tests { @@ -210,6 +212,7 @@ namespace MQTTnet.Core.Tests .BuildServiceProvider(); var s = new MqttFactory(services).CreateMqttServer(); + var retainMessagemanager = services.GetRequiredService(); var receivedMessagesCount = 0; try @@ -220,6 +223,20 @@ namespace MQTTnet.Core.Tests await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build()); await c1.DisconnectAsync(); + var subscribe = new MqttSubscribePacket() + { + TopicFilters = new List() + { + new TopicFilter("retained", MqttQualityOfServiceLevel.AtMostOnce) + } + }; + + //make shure the retainedMessageManagerreceived the package + while (retainMessagemanager.GetMessages(subscribe).Any()) + { + await Task.Delay(TimeSpan.FromMilliseconds(10)); + } + var c2 = await serverAdapter.ConnectTestClient(s, "c2"); c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; await c2.SubscribeAsync(new TopicFilter("retained", MqttQualityOfServiceLevel.AtMostOnce));