From d0f1c18a2fe9da5af081b3ee59584f3886e44e2e Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Thu, 26 Oct 2017 21:22:58 +0200 Subject: [PATCH] Server refactoring --- Frameworks/MQTTnet.NetStandard/MqttFactory.cs | 11 +++++-- .../ServiceCollectionExtensions.cs | 1 + .../MQTTnet.UniversalWindows/MqttFactory.cs | 11 +++++-- .../ApplicationMessagePublisherExtensions.cs | 16 +++++++++ MQTTnet.Core/Client/MqttClientExtensions.cs | 8 ----- .../IApplicationMessagePublisher.cs | 2 +- .../IApplicationMessageReceiver.cs | 3 +- .../ManagedClient/ManagedMqttClient.cs | 6 ++-- .../ManagedMqttClientExtensions.cs | 12 ++----- ...MqttApplicationMessageReceivedEventArgs.cs | 2 +- MQTTnet.Core/Server/IMqttServer.cs | 1 + MQTTnet.Core/Server/MqttClientSession.cs | 33 ++++++++++++------- .../Server/MqttClientSessionsManager.cs | 33 ++++++++++--------- .../Server/MqttClientSubscriptionsManager.cs | 7 ++++ MQTTnet.Core/Server/MqttServer.cs | 9 +++-- MQTTnet.Core/Server/MqttSubscribeResult.cs | 11 +++++++ .../TestClientSessionFactory.cs | 8 +++-- .../ManagedClientTest.cs | 6 ++-- .../MainPage.xaml | 7 +++- .../MainPage.xaml.cs | 24 ++++++++++++++ 20 files changed, 146 insertions(+), 65 deletions(-) create mode 100644 MQTTnet.Core/ApplicationMessagePublisherExtensions.cs rename MQTTnet.Core/{Client => }/IApplicationMessagePublisher.cs (87%) rename MQTTnet.Core/{Client => }/IApplicationMessageReceiver.cs (78%) rename MQTTnet.Core/{Client => }/MqttApplicationMessageReceivedEventArgs.cs (94%) create mode 100644 MQTTnet.Core/Server/MqttSubscribeResult.cs diff --git a/Frameworks/MQTTnet.NetStandard/MqttFactory.cs b/Frameworks/MQTTnet.NetStandard/MqttFactory.cs index 7d88480..ad0e98d 100644 --- a/Frameworks/MQTTnet.NetStandard/MqttFactory.cs +++ b/Frameworks/MQTTnet.NetStandard/MqttFactory.cs @@ -4,6 +4,7 @@ using MQTTnet.Core.Client; using MQTTnet.Core.Serializer; using Microsoft.Extensions.Logging; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; using MQTTnet.Implementations; using MQTTnet.Core.ManagedClient; using MQTTnet.Core.Server; @@ -90,9 +91,15 @@ namespace MQTTnet }; } - public MqttClientSession CreateClientSession(string sessionId, MqttClientSessionsManager mqttClientSessionsManager) + public MqttClientSession CreateClientSession(string clientId, MqttClientSessionsManager clientSessionsManager) { - return new MqttClientSession(sessionId, mqttClientSessionsManager, _serviceProvider.GetRequiredService>(), _serviceProvider.GetRequiredService>()); + return new MqttClientSession( + clientId, + _serviceProvider.GetRequiredService>(), + clientSessionsManager, + _serviceProvider.GetRequiredService(), + _serviceProvider.GetRequiredService>(), + _serviceProvider.GetRequiredService>()); } public IMqttClient CreateMqttClient() diff --git a/Frameworks/MQTTnet.NetStandard/ServiceCollectionExtensions.cs b/Frameworks/MQTTnet.NetStandard/ServiceCollectionExtensions.cs index fdecb32..aa53e1b 100644 --- a/Frameworks/MQTTnet.NetStandard/ServiceCollectionExtensions.cs +++ b/Frameworks/MQTTnet.NetStandard/ServiceCollectionExtensions.cs @@ -34,6 +34,7 @@ namespace MQTTnet services.AddTransient(); services.AddTransient(); + services.AddTransient(); services.AddTransient(); return services; } diff --git a/Frameworks/MQTTnet.UniversalWindows/MqttFactory.cs b/Frameworks/MQTTnet.UniversalWindows/MqttFactory.cs index 24826cb..9a0721f 100644 --- a/Frameworks/MQTTnet.UniversalWindows/MqttFactory.cs +++ b/Frameworks/MQTTnet.UniversalWindows/MqttFactory.cs @@ -1,6 +1,7 @@ using System; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; using MQTTnet.Core.Adapter; using MQTTnet.Core.Channel; using MQTTnet.Core.Client; @@ -85,9 +86,15 @@ namespace MQTTnet }; } - public MqttClientSession CreateClientSession(string sessionId, MqttClientSessionsManager mqttClientSessionsManager) + public MqttClientSession CreateClientSession(string clientId, MqttClientSessionsManager clientSessionsManager) { - return new MqttClientSession(sessionId, mqttClientSessionsManager, _serviceProvider.GetRequiredService>(), _serviceProvider.GetRequiredService>()); + return new MqttClientSession( + clientId, + _serviceProvider.GetRequiredService>(), + clientSessionsManager, + _serviceProvider.GetRequiredService(), + _serviceProvider.GetRequiredService>(), + _serviceProvider.GetRequiredService>()); } public IMqttClient CreateMqttClient() diff --git a/MQTTnet.Core/ApplicationMessagePublisherExtensions.cs b/MQTTnet.Core/ApplicationMessagePublisherExtensions.cs new file mode 100644 index 0000000..6893a1b --- /dev/null +++ b/MQTTnet.Core/ApplicationMessagePublisherExtensions.cs @@ -0,0 +1,16 @@ +using System; +using System.Threading.Tasks; + +namespace MQTTnet.Core +{ + public static class ApplicationMessagePublisherExtensions + { + public static Task PublishAsync(this IApplicationMessagePublisher client, params MqttApplicationMessage[] applicationMessages) + { + if (client == null) throw new ArgumentNullException(nameof(client)); + if (applicationMessages == null) throw new ArgumentNullException(nameof(applicationMessages)); + + return client.PublishAsync(applicationMessages); + } + } +} diff --git a/MQTTnet.Core/Client/MqttClientExtensions.cs b/MQTTnet.Core/Client/MqttClientExtensions.cs index ff61548..567184d 100644 --- a/MQTTnet.Core/Client/MqttClientExtensions.cs +++ b/MQTTnet.Core/Client/MqttClientExtensions.cs @@ -7,14 +7,6 @@ namespace MQTTnet.Core.Client { public static class MqttClientExtensions { - public static Task PublishAsync(this IApplicationMessagePublisher client, params MqttApplicationMessage[] applicationMessages) - { - if (client == null) throw new ArgumentNullException(nameof(client)); - if (applicationMessages == null) throw new ArgumentNullException(nameof(applicationMessages)); - - return client.PublishAsync(applicationMessages); - } - public static Task> SubscribeAsync(this IMqttClient client, params TopicFilter[] topicFilters) { if (client == null) throw new ArgumentNullException(nameof(client)); diff --git a/MQTTnet.Core/Client/IApplicationMessagePublisher.cs b/MQTTnet.Core/IApplicationMessagePublisher.cs similarity index 87% rename from MQTTnet.Core/Client/IApplicationMessagePublisher.cs rename to MQTTnet.Core/IApplicationMessagePublisher.cs index c77fd3f..2f6715b 100644 --- a/MQTTnet.Core/Client/IApplicationMessagePublisher.cs +++ b/MQTTnet.Core/IApplicationMessagePublisher.cs @@ -1,7 +1,7 @@ using System.Collections.Generic; using System.Threading.Tasks; -namespace MQTTnet.Core.Client +namespace MQTTnet.Core { public interface IApplicationMessagePublisher { diff --git a/MQTTnet.Core/Client/IApplicationMessageReceiver.cs b/MQTTnet.Core/IApplicationMessageReceiver.cs similarity index 78% rename from MQTTnet.Core/Client/IApplicationMessageReceiver.cs rename to MQTTnet.Core/IApplicationMessageReceiver.cs index 2d53cb8..379ba69 100644 --- a/MQTTnet.Core/Client/IApplicationMessageReceiver.cs +++ b/MQTTnet.Core/IApplicationMessageReceiver.cs @@ -1,6 +1,7 @@ using System; +using MQTTnet.Core.Client; -namespace MQTTnet.Core.Client +namespace MQTTnet.Core { public interface IApplicationMessageReceiver { diff --git a/MQTTnet.Core/ManagedClient/ManagedMqttClient.cs b/MQTTnet.Core/ManagedClient/ManagedMqttClient.cs index 61c0067..4eb40b2 100644 --- a/MQTTnet.Core/ManagedClient/ManagedMqttClient.cs +++ b/MQTTnet.Core/ManagedClient/ManagedMqttClient.cs @@ -91,16 +91,15 @@ namespace MQTTnet.Core.ManagedClient return Task.FromResult(0); } - public Task PublishAsync(IEnumerable applicationMessages) + public async Task PublishAsync(IEnumerable applicationMessages) { if (applicationMessages == null) throw new ArgumentNullException(nameof(applicationMessages)); foreach (var applicationMessage in applicationMessages) { + await _storageManager.AddAsync(applicationMessage).ConfigureAwait(false); _messageQueue.Add(applicationMessage); } - - return Task.FromResult(0); } public Task SubscribeAsync(IEnumerable topicFilters) @@ -208,6 +207,7 @@ namespace MQTTnet.Core.ManagedClient } await TryPublishQueuedMessageAsync(message).ConfigureAwait(false); + await _storageManager.RemoveAsync(message).ConfigureAwait(false); } } catch (OperationCanceledException) diff --git a/MQTTnet.Core/ManagedClient/ManagedMqttClientExtensions.cs b/MQTTnet.Core/ManagedClient/ManagedMqttClientExtensions.cs index 62aa166..a9984c4 100644 --- a/MQTTnet.Core/ManagedClient/ManagedMqttClientExtensions.cs +++ b/MQTTnet.Core/ManagedClient/ManagedMqttClientExtensions.cs @@ -1,26 +1,18 @@ using System; using System.Threading.Tasks; -using MQTTnet.Core.Packets; namespace MQTTnet.Core.ManagedClient { public static class ManagedMqttClientExtensions { - public static Task EnqueueAsync(this ManagedMqttClient managedClient, params MqttApplicationMessage[] applicationMessages) - { - if (managedClient == null) throw new ArgumentNullException(nameof(managedClient)); - - return managedClient.EnqueueAsync(applicationMessages); - } - - public static Task SubscribeAsync(this ManagedMqttClient managedClient, params TopicFilter[] topicFilters) + public static Task SubscribeAsync(this IManagedMqttClient managedClient, params TopicFilter[] topicFilters) { if (managedClient == null) throw new ArgumentNullException(nameof(managedClient)); return managedClient.SubscribeAsync(topicFilters); } - public static Task UnsubscribeAsync(this ManagedMqttClient managedClient, params TopicFilter[] topicFilters) + public static Task UnsubscribeAsync(this IManagedMqttClient managedClient, params TopicFilter[] topicFilters) { if (managedClient == null) throw new ArgumentNullException(nameof(managedClient)); diff --git a/MQTTnet.Core/Client/MqttApplicationMessageReceivedEventArgs.cs b/MQTTnet.Core/MqttApplicationMessageReceivedEventArgs.cs similarity index 94% rename from MQTTnet.Core/Client/MqttApplicationMessageReceivedEventArgs.cs rename to MQTTnet.Core/MqttApplicationMessageReceivedEventArgs.cs index 98ebce5..3de0db5 100644 --- a/MQTTnet.Core/Client/MqttApplicationMessageReceivedEventArgs.cs +++ b/MQTTnet.Core/MqttApplicationMessageReceivedEventArgs.cs @@ -1,6 +1,6 @@ using System; -namespace MQTTnet.Core.Client +namespace MQTTnet.Core { public sealed class MqttApplicationMessageReceivedEventArgs : EventArgs { diff --git a/MQTTnet.Core/Server/IMqttServer.cs b/MQTTnet.Core/Server/IMqttServer.cs index 58c4b74..7d76c23 100644 --- a/MQTTnet.Core/Server/IMqttServer.cs +++ b/MQTTnet.Core/Server/IMqttServer.cs @@ -11,6 +11,7 @@ namespace MQTTnet.Core.Server event EventHandler ClientDisconnected; IList GetConnectedClients(); + void Publish(IEnumerable applicationMessages); Task StartAsync(); Task StopAsync(); diff --git a/MQTTnet.Core/Server/MqttClientSession.cs b/MQTTnet.Core/Server/MqttClientSession.cs index 2bd3617..74d93f4 100644 --- a/MQTTnet.Core/Server/MqttClientSession.cs +++ b/MQTTnet.Core/Server/MqttClientSession.cs @@ -9,6 +9,7 @@ using MQTTnet.Core.Packets; using MQTTnet.Core.Protocol; using MQTTnet.Core.Serializer; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; namespace MQTTnet.Core.Server { @@ -16,8 +17,8 @@ namespace MQTTnet.Core.Server { private readonly HashSet _unacknowledgedPublishPackets = new HashSet(); - private readonly MqttClientSubscriptionsManager _subscriptionsManager = new MqttClientSubscriptionsManager(); - private readonly MqttClientSessionsManager _mqttClientSessionsManager; + private readonly MqttClientSubscriptionsManager _subscriptionsManager; + private readonly MqttClientSessionsManager _sessionsManager; private readonly MqttClientPendingMessagesQueue _pendingMessagesQueue; private readonly MqttServerOptions _options; private readonly ILogger _logger; @@ -26,14 +27,22 @@ namespace MQTTnet.Core.Server private CancellationTokenSource _cancellationTokenSource; private MqttApplicationMessage _willMessage; - public MqttClientSession(string clientId, MqttClientSessionsManager mqttClientSessionsManager, ILogger logger, ILogger msgQueueLogger) + public MqttClientSession( + string clientId, + IOptions options, + MqttClientSessionsManager sessionsManager, + MqttClientSubscriptionsManager subscriptionsManager, + ILogger logger, + ILogger messageQueueLogger) { - _mqttClientSessionsManager = mqttClientSessionsManager ?? throw new ArgumentNullException(nameof(mqttClientSessionsManager)); + _sessionsManager = sessionsManager ?? throw new ArgumentNullException(nameof(sessionsManager)); + _subscriptionsManager = subscriptionsManager ?? throw new ArgumentNullException(nameof(sessionsManager)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + ClientId = clientId; - _options = mqttClientSessionsManager.Options; - _pendingMessagesQueue = new MqttClientPendingMessagesQueue(mqttClientSessionsManager.Options, this, msgQueueLogger); + _options = options.Value; + _pendingMessagesQueue = new MqttClientPendingMessagesQueue(_options, this, messageQueueLogger); } public string ClientId { get; } @@ -84,7 +93,7 @@ namespace MQTTnet.Core.Server { if (_willMessage != null) { - _mqttClientSessionsManager.DispatchApplicationMessage(this, _willMessage); + _sessionsManager.DispatchApplicationMessage(this, _willMessage); } } } @@ -177,7 +186,7 @@ namespace MQTTnet.Core.Server private void EnqueueRetainedMessages(MqttSubscribePacket subscribePacket) { - var retainedMessages = _mqttClientSessionsManager.RetainedMessagesManager.GetMessages(subscribePacket); + var retainedMessages = _sessionsManager.RetainedMessagesManager.GetMessages(subscribePacket); foreach (var publishPacket in retainedMessages) { EnqueuePublishPacket(publishPacket.ToPublishPacket()); @@ -191,19 +200,19 @@ namespace MQTTnet.Core.Server if (applicationMessage.Retain) { - await _mqttClientSessionsManager.RetainedMessagesManager.HandleMessageAsync(ClientId, applicationMessage); + await _sessionsManager.RetainedMessagesManager.HandleMessageAsync(ClientId, applicationMessage); } switch (applicationMessage.QualityOfServiceLevel) { case MqttQualityOfServiceLevel.AtMostOnce: { - _mqttClientSessionsManager.DispatchApplicationMessage(this, applicationMessage); + _sessionsManager.DispatchApplicationMessage(this, applicationMessage); return; } case MqttQualityOfServiceLevel.AtLeastOnce: { - _mqttClientSessionsManager.DispatchApplicationMessage(this, applicationMessage); + _sessionsManager.DispatchApplicationMessage(this, applicationMessage); await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier }); @@ -218,7 +227,7 @@ namespace MQTTnet.Core.Server _unacknowledgedPublishPackets.Add(publishPacket.PacketIdentifier); } - _mqttClientSessionsManager.DispatchApplicationMessage(this, applicationMessage); + _sessionsManager.DispatchApplicationMessage(this, applicationMessage); await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }); diff --git a/MQTTnet.Core/Server/MqttClientSessionsManager.cs b/MQTTnet.Core/Server/MqttClientSessionsManager.cs index 09e0781..0e92867 100644 --- a/MQTTnet.Core/Server/MqttClientSessionsManager.cs +++ b/MQTTnet.Core/Server/MqttClientSessionsManager.cs @@ -11,7 +11,6 @@ using MQTTnet.Core.Protocol; using MQTTnet.Core.Serializer; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; -using MQTTnet.Core.Client; namespace MQTTnet.Core.Server { @@ -19,14 +18,19 @@ namespace MQTTnet.Core.Server { private readonly Dictionary _clientSessions = new Dictionary(); private readonly ILogger _logger; - private readonly IMqttClientSesssionFactory _mqttClientSesssionFactory; - - public MqttClientSessionsManager(IOptions options, ILogger logger, MqttClientRetainedMessagesManager retainedMessagesManager, IMqttClientSesssionFactory mqttClientSesssionFactory) + private readonly IMqttClientSesssionFactory _clientSesssionFactory; + private readonly MqttServerOptions _options; + + public MqttClientSessionsManager( + IOptions options, + ILogger logger, + MqttClientRetainedMessagesManager retainedMessagesManager, + IMqttClientSesssionFactory clientSesssionFactory) { _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - Options = options.Value ?? throw new ArgumentNullException(nameof(options)); + _options = options.Value ?? throw new ArgumentNullException(nameof(options)); RetainedMessagesManager = retainedMessagesManager ?? throw new ArgumentNullException(nameof(options)); - _mqttClientSesssionFactory = mqttClientSesssionFactory ?? throw new ArgumentNullException(nameof(mqttClientSesssionFactory)); + _clientSesssionFactory = clientSesssionFactory ?? throw new ArgumentNullException(nameof(clientSesssionFactory)); } public event EventHandler ClientConnected; @@ -34,14 +38,13 @@ namespace MQTTnet.Core.Server public event EventHandler ApplicationMessageReceived; public MqttClientRetainedMessagesManager RetainedMessagesManager { get; } - public MqttServerOptions Options { get; } public async Task RunClientSessionAsync(IMqttCommunicationAdapter clientAdapter) { var clientId = string.Empty; try { - if (!(await clientAdapter.ReceivePacketAsync(Options.DefaultCommunicationTimeout, CancellationToken.None).ConfigureAwait(false) is MqttConnectPacket connectPacket)) + if (!(await clientAdapter.ReceivePacketAsync(_options.DefaultCommunicationTimeout, CancellationToken.None).ConfigureAwait(false) is MqttConnectPacket connectPacket)) { throw new MqttProtocolViolationException("The first packet from a client must be a 'CONNECT' packet [MQTT-3.1.0-1]."); } @@ -50,11 +53,11 @@ namespace MQTTnet.Core.Server // Switch to the required protocol version before sending any response. clientAdapter.PacketSerializer.ProtocolVersion = connectPacket.ProtocolVersion; - + var connectReturnCode = ValidateConnection(connectPacket); if (connectReturnCode != MqttConnectReturnCode.ConnectionAccepted) { - await clientAdapter.SendPacketsAsync(Options.DefaultCommunicationTimeout, CancellationToken.None, new MqttConnAckPacket + await clientAdapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, CancellationToken.None, new MqttConnAckPacket { ConnectReturnCode = connectReturnCode }).ConfigureAwait(false); @@ -64,7 +67,7 @@ namespace MQTTnet.Core.Server var clientSession = GetOrCreateClientSession(connectPacket); - await clientAdapter.SendPacketsAsync(Options.DefaultCommunicationTimeout, CancellationToken.None, new MqttConnAckPacket + await clientAdapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, CancellationToken.None, new MqttConnAckPacket { ConnectReturnCode = connectReturnCode, IsSessionPresent = clientSession.IsExistingSession @@ -89,7 +92,7 @@ namespace MQTTnet.Core.Server { try { - await clientAdapter.DisconnectAsync(Options.DefaultCommunicationTimeout).ConfigureAwait(false); + await clientAdapter.DisconnectAsync(_options.DefaultCommunicationTimeout).ConfigureAwait(false); } catch (Exception) { @@ -147,9 +150,9 @@ namespace MQTTnet.Core.Server private MqttConnectReturnCode ValidateConnection(MqttConnectPacket connectPacket) { - if (Options.ConnectionValidator != null) + if (_options.ConnectionValidator != null) { - return Options.ConnectionValidator(connectPacket); + return _options.ConnectionValidator(connectPacket); } return MqttConnectReturnCode.ConnectionAccepted; @@ -181,7 +184,7 @@ namespace MQTTnet.Core.Server { isExistingSession = false; - clientSession = _mqttClientSesssionFactory.CreateClientSession(connectPacket.ClientId, this); + clientSession = _clientSesssionFactory.CreateClientSession(connectPacket.ClientId, this); _clientSessions[connectPacket.ClientId] = clientSession; _logger.LogTrace("Created a new session for client '{0}'.", connectPacket.ClientId); diff --git a/MQTTnet.Core/Server/MqttClientSubscriptionsManager.cs b/MQTTnet.Core/Server/MqttClientSubscriptionsManager.cs index 390f8f3..14d63d9 100644 --- a/MQTTnet.Core/Server/MqttClientSubscriptionsManager.cs +++ b/MQTTnet.Core/Server/MqttClientSubscriptionsManager.cs @@ -9,6 +9,11 @@ namespace MQTTnet.Core.Server { private readonly Dictionary _subscribedTopics = new Dictionary(); + public MqttClientSubscriptionsManager() + { + + } + public MqttSubAckPacket Subscribe(MqttSubscribePacket subscribePacket) { if (subscribePacket == null) throw new ArgumentNullException(nameof(subscribePacket)); @@ -19,6 +24,8 @@ namespace MQTTnet.Core.Server { foreach (var topicFilter in subscribePacket.TopicFilters) { + + _subscribedTopics[topicFilter.Topic] = topicFilter.QualityOfServiceLevel; responsePacket.SubscribeReturnCodes.Add(MqttSubscribeReturnCode.SuccessMaximumQoS1); // TODO: Add support for QoS 2. } diff --git a/MQTTnet.Core/Server/MqttServer.cs b/MQTTnet.Core/Server/MqttServer.cs index 16e2e2e..ff67e14 100644 --- a/MQTTnet.Core/Server/MqttServer.cs +++ b/MQTTnet.Core/Server/MqttServer.cs @@ -6,7 +6,6 @@ using MQTTnet.Core.Adapter; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using System.Linq; -using MQTTnet.Core.Client; namespace MQTTnet.Core.Server { @@ -46,7 +45,7 @@ namespace MQTTnet.Core.Server public event EventHandler ClientDisconnected; public event EventHandler ApplicationMessageReceived; - public Task PublishAsync(IEnumerable applicationMessages) + public void Publish(IEnumerable applicationMessages) { if (applicationMessages == null) throw new ArgumentNullException(nameof(applicationMessages)); @@ -60,7 +59,11 @@ namespace MQTTnet.Core.Server _options.ApplicationMessageInterceptor?.Invoke(applicationMessage); _clientSessionsManager.DispatchApplicationMessage(null, applicationMessage); } - + } + + public Task PublishAsync(IEnumerable applicationMessages) + { + Publish(applicationMessages); return Task.FromResult(0); } diff --git a/MQTTnet.Core/Server/MqttSubscribeResult.cs b/MQTTnet.Core/Server/MqttSubscribeResult.cs new file mode 100644 index 0000000..ef80af4 --- /dev/null +++ b/MQTTnet.Core/Server/MqttSubscribeResult.cs @@ -0,0 +1,11 @@ +using MQTTnet.Core.Packets; + +namespace MQTTnet.Core.Server +{ + public class MqttSubscribeResult + { + public MqttSubAckPacket ResponsePacket { get; set; } + + public bool CloseConnection { get; set; } + } +} diff --git a/Tests/MQTTnet.Core.Tests/TestClientSessionFactory.cs b/Tests/MQTTnet.Core.Tests/TestClientSessionFactory.cs index 519ef50..638df85 100644 --- a/Tests/MQTTnet.Core.Tests/TestClientSessionFactory.cs +++ b/Tests/MQTTnet.Core.Tests/TestClientSessionFactory.cs @@ -1,12 +1,14 @@ -using MQTTnet.Core.Server; +using System; +using MQTTnet.Core.Server; namespace MQTTnet.Core.Tests { public class TestClientSessionFactory : IMqttClientSesssionFactory { - public MqttClientSession CreateClientSession(string sessionId, MqttClientSessionsManager mqttClientSessionsManager) + public MqttClientSession CreateClientSession(string clientId, MqttClientSessionsManager mqttClientSessionsManager) { - return new MqttClientSession(sessionId, mqttClientSessionsManager, new TestLogger(), new TestLogger()); + throw new NotImplementedException(); + //return new MqttClientSession(clientId, mqttClientSessionsManager, new TestLogger(), new TestLogger()); } } } diff --git a/Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs b/Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs index a02e72c..7f6d430 100644 --- a/Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs +++ b/Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs @@ -50,14 +50,14 @@ namespace MQTTnet.TestApp.NetCore Console.WriteLine(">> RECEIVED: " + e.ApplicationMessage.Topic); }; - await managedClient.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic("Step").WithPayload("1").Build()); - await managedClient.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic("Step").WithPayload("2").WithAtLeastOnceQoS().Build()); + await managedClient.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("Step").WithPayload("1").Build()); + await managedClient.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("Step").WithPayload("2").WithAtLeastOnceQoS().Build()); await managedClient.StartAsync(options); await managedClient.SubscribeAsync(new TopicFilter("xyz", MqttQualityOfServiceLevel.AtMostOnce)); - await managedClient.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic("Step").WithPayload("3").Build()); + await managedClient.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("Step").WithPayload("3").Build()); Console.WriteLine("Managed client started."); Console.ReadLine(); diff --git a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml index e2b0a18..fac1a6c 100644 --- a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml +++ b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml @@ -65,7 +65,6 @@ - Topic: @@ -84,6 +83,12 @@ + + + + + + diff --git a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs index 3663ba4..32ff9f9 100644 --- a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs +++ b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs @@ -9,6 +9,7 @@ using MQTTnet.Core; using MQTTnet.Core.Client; using MQTTnet.Core.Diagnostics; using MQTTnet.Core.Protocol; +using MQTTnet.Core.Server; using MQTTnet.Implementations; namespace MQTTnet.TestApp.UniversalWindows @@ -16,6 +17,7 @@ namespace MQTTnet.TestApp.UniversalWindows public sealed partial class MainPage { private IMqttClient _mqttClient; + private IMqttServer _mqttServer; public MainPage() { @@ -319,5 +321,27 @@ namespace MQTTnet.TestApp.UniversalWindows }; } + + private async void StartServer(object sender, RoutedEventArgs e) + { + if (_mqttServer != null) + { + return; + } + + _mqttServer = new MqttFactory().CreateMqttServer(); + await _mqttServer.StartAsync(); + } + + private async void StopServer(object sender, RoutedEventArgs e) + { + if (_mqttServer == null) + { + return; + } + + await _mqttServer.StopAsync(); + _mqttServer = null; + } } }