Browse Source

Server refactoring

release/3.x.x
Christian Kratky 7 years ago
parent
commit
d0f1c18a2f
20 changed files with 146 additions and 65 deletions
  1. +9
    -2
      Frameworks/MQTTnet.NetStandard/MqttFactory.cs
  2. +1
    -0
      Frameworks/MQTTnet.NetStandard/ServiceCollectionExtensions.cs
  3. +9
    -2
      Frameworks/MQTTnet.UniversalWindows/MqttFactory.cs
  4. +16
    -0
      MQTTnet.Core/ApplicationMessagePublisherExtensions.cs
  5. +0
    -8
      MQTTnet.Core/Client/MqttClientExtensions.cs
  6. +1
    -1
      MQTTnet.Core/IApplicationMessagePublisher.cs
  7. +2
    -1
      MQTTnet.Core/IApplicationMessageReceiver.cs
  8. +3
    -3
      MQTTnet.Core/ManagedClient/ManagedMqttClient.cs
  9. +2
    -10
      MQTTnet.Core/ManagedClient/ManagedMqttClientExtensions.cs
  10. +1
    -1
      MQTTnet.Core/MqttApplicationMessageReceivedEventArgs.cs
  11. +1
    -0
      MQTTnet.Core/Server/IMqttServer.cs
  12. +21
    -12
      MQTTnet.Core/Server/MqttClientSession.cs
  13. +18
    -15
      MQTTnet.Core/Server/MqttClientSessionsManager.cs
  14. +7
    -0
      MQTTnet.Core/Server/MqttClientSubscriptionsManager.cs
  15. +6
    -3
      MQTTnet.Core/Server/MqttServer.cs
  16. +11
    -0
      MQTTnet.Core/Server/MqttSubscribeResult.cs
  17. +5
    -3
      Tests/MQTTnet.Core.Tests/TestClientSessionFactory.cs
  18. +3
    -3
      Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs
  19. +6
    -1
      Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml
  20. +24
    -0
      Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs

+ 9
- 2
Frameworks/MQTTnet.NetStandard/MqttFactory.cs View File

@@ -4,6 +4,7 @@ using MQTTnet.Core.Client;
using MQTTnet.Core.Serializer; using MQTTnet.Core.Serializer;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using MQTTnet.Implementations; using MQTTnet.Implementations;
using MQTTnet.Core.ManagedClient; using MQTTnet.Core.ManagedClient;
using MQTTnet.Core.Server; using MQTTnet.Core.Server;
@@ -90,9 +91,15 @@ namespace MQTTnet
}; };
} }


public MqttClientSession CreateClientSession(string sessionId, MqttClientSessionsManager mqttClientSessionsManager)
public MqttClientSession CreateClientSession(string clientId, MqttClientSessionsManager clientSessionsManager)
{ {
return new MqttClientSession(sessionId, mqttClientSessionsManager, _serviceProvider.GetRequiredService<ILogger<MqttClientSession>>(), _serviceProvider.GetRequiredService<ILogger<MqttClientPendingMessagesQueue>>());
return new MqttClientSession(
clientId,
_serviceProvider.GetRequiredService<IOptions<MqttServerOptions>>(),
clientSessionsManager,
_serviceProvider.GetRequiredService<MqttClientSubscriptionsManager>(),
_serviceProvider.GetRequiredService<ILogger<MqttClientSession>>(),
_serviceProvider.GetRequiredService<ILogger<MqttClientPendingMessagesQueue>>());
} }


public IMqttClient CreateMqttClient() public IMqttClient CreateMqttClient()


+ 1
- 0
Frameworks/MQTTnet.NetStandard/ServiceCollectionExtensions.cs View File

@@ -34,6 +34,7 @@ namespace MQTTnet
services.AddTransient<IMqttPacketSerializer, MqttPacketSerializer>(); services.AddTransient<IMqttPacketSerializer, MqttPacketSerializer>();


services.AddTransient<MqttClientSessionsManager>(); services.AddTransient<MqttClientSessionsManager>();
services.AddTransient<MqttClientSubscriptionsManager>();
services.AddTransient<MqttClientRetainedMessagesManager>(); services.AddTransient<MqttClientRetainedMessagesManager>();
return services; return services;
} }


+ 9
- 2
Frameworks/MQTTnet.UniversalWindows/MqttFactory.cs View File

@@ -1,6 +1,7 @@
using System; using System;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MQTTnet.Core.Adapter; using MQTTnet.Core.Adapter;
using MQTTnet.Core.Channel; using MQTTnet.Core.Channel;
using MQTTnet.Core.Client; using MQTTnet.Core.Client;
@@ -85,9 +86,15 @@ namespace MQTTnet
}; };
} }


public MqttClientSession CreateClientSession(string sessionId, MqttClientSessionsManager mqttClientSessionsManager)
public MqttClientSession CreateClientSession(string clientId, MqttClientSessionsManager clientSessionsManager)
{ {
return new MqttClientSession(sessionId, mqttClientSessionsManager, _serviceProvider.GetRequiredService<ILogger<MqttClientSession>>(), _serviceProvider.GetRequiredService<ILogger<MqttClientPendingMessagesQueue>>());
return new MqttClientSession(
clientId,
_serviceProvider.GetRequiredService<IOptions<MqttServerOptions>>(),
clientSessionsManager,
_serviceProvider.GetRequiredService<MqttClientSubscriptionsManager>(),
_serviceProvider.GetRequiredService<ILogger<MqttClientSession>>(),
_serviceProvider.GetRequiredService<ILogger<MqttClientPendingMessagesQueue>>());
} }


public IMqttClient CreateMqttClient() public IMqttClient CreateMqttClient()


+ 16
- 0
MQTTnet.Core/ApplicationMessagePublisherExtensions.cs View File

@@ -0,0 +1,16 @@
using System;
using System.Threading.Tasks;

namespace MQTTnet.Core
{
public static class ApplicationMessagePublisherExtensions
{
public static Task PublishAsync(this IApplicationMessagePublisher client, params MqttApplicationMessage[] applicationMessages)
{
if (client == null) throw new ArgumentNullException(nameof(client));
if (applicationMessages == null) throw new ArgumentNullException(nameof(applicationMessages));

return client.PublishAsync(applicationMessages);
}
}
}

+ 0
- 8
MQTTnet.Core/Client/MqttClientExtensions.cs View File

@@ -7,14 +7,6 @@ namespace MQTTnet.Core.Client
{ {
public static class MqttClientExtensions public static class MqttClientExtensions
{ {
public static Task PublishAsync(this IApplicationMessagePublisher client, params MqttApplicationMessage[] applicationMessages)
{
if (client == null) throw new ArgumentNullException(nameof(client));
if (applicationMessages == null) throw new ArgumentNullException(nameof(applicationMessages));

return client.PublishAsync(applicationMessages);
}

public static Task<IList<MqttSubscribeResult>> SubscribeAsync(this IMqttClient client, params TopicFilter[] topicFilters) public static Task<IList<MqttSubscribeResult>> SubscribeAsync(this IMqttClient client, params TopicFilter[] topicFilters)
{ {
if (client == null) throw new ArgumentNullException(nameof(client)); if (client == null) throw new ArgumentNullException(nameof(client));


MQTTnet.Core/Client/IApplicationMessagePublisher.cs → MQTTnet.Core/IApplicationMessagePublisher.cs View File

@@ -1,7 +1,7 @@
using System.Collections.Generic; using System.Collections.Generic;
using System.Threading.Tasks; using System.Threading.Tasks;


namespace MQTTnet.Core.Client
namespace MQTTnet.Core
{ {
public interface IApplicationMessagePublisher public interface IApplicationMessagePublisher
{ {

MQTTnet.Core/Client/IApplicationMessageReceiver.cs → MQTTnet.Core/IApplicationMessageReceiver.cs View File

@@ -1,6 +1,7 @@
using System; using System;
using MQTTnet.Core.Client;


namespace MQTTnet.Core.Client
namespace MQTTnet.Core
{ {
public interface IApplicationMessageReceiver public interface IApplicationMessageReceiver
{ {

+ 3
- 3
MQTTnet.Core/ManagedClient/ManagedMqttClient.cs View File

@@ -91,16 +91,15 @@ namespace MQTTnet.Core.ManagedClient
return Task.FromResult(0); return Task.FromResult(0);
} }


public Task PublishAsync(IEnumerable<MqttApplicationMessage> applicationMessages)
public async Task PublishAsync(IEnumerable<MqttApplicationMessage> applicationMessages)
{ {
if (applicationMessages == null) throw new ArgumentNullException(nameof(applicationMessages)); if (applicationMessages == null) throw new ArgumentNullException(nameof(applicationMessages));


foreach (var applicationMessage in applicationMessages) foreach (var applicationMessage in applicationMessages)
{ {
await _storageManager.AddAsync(applicationMessage).ConfigureAwait(false);
_messageQueue.Add(applicationMessage); _messageQueue.Add(applicationMessage);
} }

return Task.FromResult(0);
} }


public Task SubscribeAsync(IEnumerable<TopicFilter> topicFilters) public Task SubscribeAsync(IEnumerable<TopicFilter> topicFilters)
@@ -208,6 +207,7 @@ namespace MQTTnet.Core.ManagedClient
} }


await TryPublishQueuedMessageAsync(message).ConfigureAwait(false); await TryPublishQueuedMessageAsync(message).ConfigureAwait(false);
await _storageManager.RemoveAsync(message).ConfigureAwait(false);
} }
} }
catch (OperationCanceledException) catch (OperationCanceledException)


+ 2
- 10
MQTTnet.Core/ManagedClient/ManagedMqttClientExtensions.cs View File

@@ -1,26 +1,18 @@
using System; using System;
using System.Threading.Tasks; using System.Threading.Tasks;
using MQTTnet.Core.Packets;


namespace MQTTnet.Core.ManagedClient namespace MQTTnet.Core.ManagedClient
{ {
public static class ManagedMqttClientExtensions public static class ManagedMqttClientExtensions
{ {
public static Task EnqueueAsync(this ManagedMqttClient managedClient, params MqttApplicationMessage[] applicationMessages)
{
if (managedClient == null) throw new ArgumentNullException(nameof(managedClient));

return managedClient.EnqueueAsync(applicationMessages);
}

public static Task SubscribeAsync(this ManagedMqttClient managedClient, params TopicFilter[] topicFilters)
public static Task SubscribeAsync(this IManagedMqttClient managedClient, params TopicFilter[] topicFilters)
{ {
if (managedClient == null) throw new ArgumentNullException(nameof(managedClient)); if (managedClient == null) throw new ArgumentNullException(nameof(managedClient));


return managedClient.SubscribeAsync(topicFilters); return managedClient.SubscribeAsync(topicFilters);
} }


public static Task UnsubscribeAsync(this ManagedMqttClient managedClient, params TopicFilter[] topicFilters)
public static Task UnsubscribeAsync(this IManagedMqttClient managedClient, params TopicFilter[] topicFilters)
{ {
if (managedClient == null) throw new ArgumentNullException(nameof(managedClient)); if (managedClient == null) throw new ArgumentNullException(nameof(managedClient));




MQTTnet.Core/Client/MqttApplicationMessageReceivedEventArgs.cs → MQTTnet.Core/MqttApplicationMessageReceivedEventArgs.cs View File

@@ -1,6 +1,6 @@
using System; using System;


namespace MQTTnet.Core.Client
namespace MQTTnet.Core
{ {
public sealed class MqttApplicationMessageReceivedEventArgs : EventArgs public sealed class MqttApplicationMessageReceivedEventArgs : EventArgs
{ {

+ 1
- 0
MQTTnet.Core/Server/IMqttServer.cs View File

@@ -11,6 +11,7 @@ namespace MQTTnet.Core.Server
event EventHandler<MqttClientDisconnectedEventArgs> ClientDisconnected; event EventHandler<MqttClientDisconnectedEventArgs> ClientDisconnected;


IList<ConnectedMqttClient> GetConnectedClients(); IList<ConnectedMqttClient> GetConnectedClients();
void Publish(IEnumerable<MqttApplicationMessage> applicationMessages);


Task StartAsync(); Task StartAsync();
Task StopAsync(); Task StopAsync();


+ 21
- 12
MQTTnet.Core/Server/MqttClientSession.cs View File

@@ -9,6 +9,7 @@ using MQTTnet.Core.Packets;
using MQTTnet.Core.Protocol; using MQTTnet.Core.Protocol;
using MQTTnet.Core.Serializer; using MQTTnet.Core.Serializer;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;


namespace MQTTnet.Core.Server namespace MQTTnet.Core.Server
{ {
@@ -16,8 +17,8 @@ namespace MQTTnet.Core.Server
{ {
private readonly HashSet<ushort> _unacknowledgedPublishPackets = new HashSet<ushort>(); private readonly HashSet<ushort> _unacknowledgedPublishPackets = new HashSet<ushort>();


private readonly MqttClientSubscriptionsManager _subscriptionsManager = new MqttClientSubscriptionsManager();
private readonly MqttClientSessionsManager _mqttClientSessionsManager;
private readonly MqttClientSubscriptionsManager _subscriptionsManager;
private readonly MqttClientSessionsManager _sessionsManager;
private readonly MqttClientPendingMessagesQueue _pendingMessagesQueue; private readonly MqttClientPendingMessagesQueue _pendingMessagesQueue;
private readonly MqttServerOptions _options; private readonly MqttServerOptions _options;
private readonly ILogger<MqttClientSession> _logger; private readonly ILogger<MqttClientSession> _logger;
@@ -26,14 +27,22 @@ namespace MQTTnet.Core.Server
private CancellationTokenSource _cancellationTokenSource; private CancellationTokenSource _cancellationTokenSource;
private MqttApplicationMessage _willMessage; private MqttApplicationMessage _willMessage;


public MqttClientSession(string clientId, MqttClientSessionsManager mqttClientSessionsManager, ILogger<MqttClientSession> logger, ILogger<MqttClientPendingMessagesQueue> msgQueueLogger)
public MqttClientSession(
string clientId,
IOptions<MqttServerOptions> options,
MqttClientSessionsManager sessionsManager,
MqttClientSubscriptionsManager subscriptionsManager,
ILogger<MqttClientSession> logger,
ILogger<MqttClientPendingMessagesQueue> messageQueueLogger)
{ {
_mqttClientSessionsManager = mqttClientSessionsManager ?? throw new ArgumentNullException(nameof(mqttClientSessionsManager));
_sessionsManager = sessionsManager ?? throw new ArgumentNullException(nameof(sessionsManager));
_subscriptionsManager = subscriptionsManager ?? throw new ArgumentNullException(nameof(sessionsManager));
_logger = logger ?? throw new ArgumentNullException(nameof(logger)); _logger = logger ?? throw new ArgumentNullException(nameof(logger));

ClientId = clientId; ClientId = clientId;


_options = mqttClientSessionsManager.Options;
_pendingMessagesQueue = new MqttClientPendingMessagesQueue(mqttClientSessionsManager.Options, this, msgQueueLogger);
_options = options.Value;
_pendingMessagesQueue = new MqttClientPendingMessagesQueue(_options, this, messageQueueLogger);
} }


public string ClientId { get; } public string ClientId { get; }
@@ -84,7 +93,7 @@ namespace MQTTnet.Core.Server
{ {
if (_willMessage != null) if (_willMessage != null)
{ {
_mqttClientSessionsManager.DispatchApplicationMessage(this, _willMessage);
_sessionsManager.DispatchApplicationMessage(this, _willMessage);
} }
} }
} }
@@ -177,7 +186,7 @@ namespace MQTTnet.Core.Server


private void EnqueueRetainedMessages(MqttSubscribePacket subscribePacket) private void EnqueueRetainedMessages(MqttSubscribePacket subscribePacket)
{ {
var retainedMessages = _mqttClientSessionsManager.RetainedMessagesManager.GetMessages(subscribePacket);
var retainedMessages = _sessionsManager.RetainedMessagesManager.GetMessages(subscribePacket);
foreach (var publishPacket in retainedMessages) foreach (var publishPacket in retainedMessages)
{ {
EnqueuePublishPacket(publishPacket.ToPublishPacket()); EnqueuePublishPacket(publishPacket.ToPublishPacket());
@@ -191,19 +200,19 @@ namespace MQTTnet.Core.Server


if (applicationMessage.Retain) if (applicationMessage.Retain)
{ {
await _mqttClientSessionsManager.RetainedMessagesManager.HandleMessageAsync(ClientId, applicationMessage);
await _sessionsManager.RetainedMessagesManager.HandleMessageAsync(ClientId, applicationMessage);
} }


switch (applicationMessage.QualityOfServiceLevel) switch (applicationMessage.QualityOfServiceLevel)
{ {
case MqttQualityOfServiceLevel.AtMostOnce: case MqttQualityOfServiceLevel.AtMostOnce:
{ {
_mqttClientSessionsManager.DispatchApplicationMessage(this, applicationMessage);
_sessionsManager.DispatchApplicationMessage(this, applicationMessage);
return; return;
} }
case MqttQualityOfServiceLevel.AtLeastOnce: case MqttQualityOfServiceLevel.AtLeastOnce:
{ {
_mqttClientSessionsManager.DispatchApplicationMessage(this, applicationMessage);
_sessionsManager.DispatchApplicationMessage(this, applicationMessage);


await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token,
new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier }); new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier });
@@ -218,7 +227,7 @@ namespace MQTTnet.Core.Server
_unacknowledgedPublishPackets.Add(publishPacket.PacketIdentifier); _unacknowledgedPublishPackets.Add(publishPacket.PacketIdentifier);
} }


_mqttClientSessionsManager.DispatchApplicationMessage(this, applicationMessage);
_sessionsManager.DispatchApplicationMessage(this, applicationMessage);


await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token,
new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }); new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier });


+ 18
- 15
MQTTnet.Core/Server/MqttClientSessionsManager.cs View File

@@ -11,7 +11,6 @@ using MQTTnet.Core.Protocol;
using MQTTnet.Core.Serializer; using MQTTnet.Core.Serializer;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using MQTTnet.Core.Client;


namespace MQTTnet.Core.Server namespace MQTTnet.Core.Server
{ {
@@ -19,14 +18,19 @@ namespace MQTTnet.Core.Server
{ {
private readonly Dictionary<string, MqttClientSession> _clientSessions = new Dictionary<string, MqttClientSession>(); private readonly Dictionary<string, MqttClientSession> _clientSessions = new Dictionary<string, MqttClientSession>();
private readonly ILogger<MqttClientSessionsManager> _logger; private readonly ILogger<MqttClientSessionsManager> _logger;
private readonly IMqttClientSesssionFactory _mqttClientSesssionFactory;

public MqttClientSessionsManager(IOptions<MqttServerOptions> options, ILogger<MqttClientSessionsManager> logger, MqttClientRetainedMessagesManager retainedMessagesManager, IMqttClientSesssionFactory mqttClientSesssionFactory)
private readonly IMqttClientSesssionFactory _clientSesssionFactory;
private readonly MqttServerOptions _options;

public MqttClientSessionsManager(
IOptions<MqttServerOptions> options,
ILogger<MqttClientSessionsManager> logger,
MqttClientRetainedMessagesManager retainedMessagesManager,
IMqttClientSesssionFactory clientSesssionFactory)
{ {
_logger = logger ?? throw new ArgumentNullException(nameof(logger)); _logger = logger ?? throw new ArgumentNullException(nameof(logger));
Options = options.Value ?? throw new ArgumentNullException(nameof(options));
_options = options.Value ?? throw new ArgumentNullException(nameof(options));
RetainedMessagesManager = retainedMessagesManager ?? throw new ArgumentNullException(nameof(options)); RetainedMessagesManager = retainedMessagesManager ?? throw new ArgumentNullException(nameof(options));
_mqttClientSesssionFactory = mqttClientSesssionFactory ?? throw new ArgumentNullException(nameof(mqttClientSesssionFactory));
_clientSesssionFactory = clientSesssionFactory ?? throw new ArgumentNullException(nameof(clientSesssionFactory));
} }


public event EventHandler<MqttClientConnectedEventArgs> ClientConnected; public event EventHandler<MqttClientConnectedEventArgs> ClientConnected;
@@ -34,14 +38,13 @@ namespace MQTTnet.Core.Server
public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived; public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived;


public MqttClientRetainedMessagesManager RetainedMessagesManager { get; } public MqttClientRetainedMessagesManager RetainedMessagesManager { get; }
public MqttServerOptions Options { get; }


public async Task RunClientSessionAsync(IMqttCommunicationAdapter clientAdapter) public async Task RunClientSessionAsync(IMqttCommunicationAdapter clientAdapter)
{ {
var clientId = string.Empty; var clientId = string.Empty;
try try
{ {
if (!(await clientAdapter.ReceivePacketAsync(Options.DefaultCommunicationTimeout, CancellationToken.None).ConfigureAwait(false) is MqttConnectPacket connectPacket))
if (!(await clientAdapter.ReceivePacketAsync(_options.DefaultCommunicationTimeout, CancellationToken.None).ConfigureAwait(false) is MqttConnectPacket connectPacket))
{ {
throw new MqttProtocolViolationException("The first packet from a client must be a 'CONNECT' packet [MQTT-3.1.0-1]."); throw new MqttProtocolViolationException("The first packet from a client must be a 'CONNECT' packet [MQTT-3.1.0-1].");
} }
@@ -50,11 +53,11 @@ namespace MQTTnet.Core.Server


// Switch to the required protocol version before sending any response. // Switch to the required protocol version before sending any response.
clientAdapter.PacketSerializer.ProtocolVersion = connectPacket.ProtocolVersion; clientAdapter.PacketSerializer.ProtocolVersion = connectPacket.ProtocolVersion;
var connectReturnCode = ValidateConnection(connectPacket); var connectReturnCode = ValidateConnection(connectPacket);
if (connectReturnCode != MqttConnectReturnCode.ConnectionAccepted) if (connectReturnCode != MqttConnectReturnCode.ConnectionAccepted)
{ {
await clientAdapter.SendPacketsAsync(Options.DefaultCommunicationTimeout, CancellationToken.None, new MqttConnAckPacket
await clientAdapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, CancellationToken.None, new MqttConnAckPacket
{ {
ConnectReturnCode = connectReturnCode ConnectReturnCode = connectReturnCode
}).ConfigureAwait(false); }).ConfigureAwait(false);
@@ -64,7 +67,7 @@ namespace MQTTnet.Core.Server


var clientSession = GetOrCreateClientSession(connectPacket); var clientSession = GetOrCreateClientSession(connectPacket);


await clientAdapter.SendPacketsAsync(Options.DefaultCommunicationTimeout, CancellationToken.None, new MqttConnAckPacket
await clientAdapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, CancellationToken.None, new MqttConnAckPacket
{ {
ConnectReturnCode = connectReturnCode, ConnectReturnCode = connectReturnCode,
IsSessionPresent = clientSession.IsExistingSession IsSessionPresent = clientSession.IsExistingSession
@@ -89,7 +92,7 @@ namespace MQTTnet.Core.Server
{ {
try try
{ {
await clientAdapter.DisconnectAsync(Options.DefaultCommunicationTimeout).ConfigureAwait(false);
await clientAdapter.DisconnectAsync(_options.DefaultCommunicationTimeout).ConfigureAwait(false);
} }
catch (Exception) catch (Exception)
{ {
@@ -147,9 +150,9 @@ namespace MQTTnet.Core.Server


private MqttConnectReturnCode ValidateConnection(MqttConnectPacket connectPacket) private MqttConnectReturnCode ValidateConnection(MqttConnectPacket connectPacket)
{ {
if (Options.ConnectionValidator != null)
if (_options.ConnectionValidator != null)
{ {
return Options.ConnectionValidator(connectPacket);
return _options.ConnectionValidator(connectPacket);
} }


return MqttConnectReturnCode.ConnectionAccepted; return MqttConnectReturnCode.ConnectionAccepted;
@@ -181,7 +184,7 @@ namespace MQTTnet.Core.Server
{ {
isExistingSession = false; isExistingSession = false;


clientSession = _mqttClientSesssionFactory.CreateClientSession(connectPacket.ClientId, this);
clientSession = _clientSesssionFactory.CreateClientSession(connectPacket.ClientId, this);
_clientSessions[connectPacket.ClientId] = clientSession; _clientSessions[connectPacket.ClientId] = clientSession;


_logger.LogTrace("Created a new session for client '{0}'.", connectPacket.ClientId); _logger.LogTrace("Created a new session for client '{0}'.", connectPacket.ClientId);


+ 7
- 0
MQTTnet.Core/Server/MqttClientSubscriptionsManager.cs View File

@@ -9,6 +9,11 @@ namespace MQTTnet.Core.Server
{ {
private readonly Dictionary<string, MqttQualityOfServiceLevel> _subscribedTopics = new Dictionary<string, MqttQualityOfServiceLevel>(); private readonly Dictionary<string, MqttQualityOfServiceLevel> _subscribedTopics = new Dictionary<string, MqttQualityOfServiceLevel>();


public MqttClientSubscriptionsManager()
{
}

public MqttSubAckPacket Subscribe(MqttSubscribePacket subscribePacket) public MqttSubAckPacket Subscribe(MqttSubscribePacket subscribePacket)
{ {
if (subscribePacket == null) throw new ArgumentNullException(nameof(subscribePacket)); if (subscribePacket == null) throw new ArgumentNullException(nameof(subscribePacket));
@@ -19,6 +24,8 @@ namespace MQTTnet.Core.Server
{ {
foreach (var topicFilter in subscribePacket.TopicFilters) foreach (var topicFilter in subscribePacket.TopicFilters)
{ {


_subscribedTopics[topicFilter.Topic] = topicFilter.QualityOfServiceLevel; _subscribedTopics[topicFilter.Topic] = topicFilter.QualityOfServiceLevel;
responsePacket.SubscribeReturnCodes.Add(MqttSubscribeReturnCode.SuccessMaximumQoS1); // TODO: Add support for QoS 2. responsePacket.SubscribeReturnCodes.Add(MqttSubscribeReturnCode.SuccessMaximumQoS1); // TODO: Add support for QoS 2.
} }


+ 6
- 3
MQTTnet.Core/Server/MqttServer.cs View File

@@ -6,7 +6,6 @@ using MQTTnet.Core.Adapter;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using System.Linq; using System.Linq;
using MQTTnet.Core.Client;


namespace MQTTnet.Core.Server namespace MQTTnet.Core.Server
{ {
@@ -46,7 +45,7 @@ namespace MQTTnet.Core.Server
public event EventHandler<MqttClientDisconnectedEventArgs> ClientDisconnected; public event EventHandler<MqttClientDisconnectedEventArgs> ClientDisconnected;
public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived; public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived;


public Task PublishAsync(IEnumerable<MqttApplicationMessage> applicationMessages)
public void Publish(IEnumerable<MqttApplicationMessage> applicationMessages)
{ {
if (applicationMessages == null) throw new ArgumentNullException(nameof(applicationMessages)); if (applicationMessages == null) throw new ArgumentNullException(nameof(applicationMessages));


@@ -60,7 +59,11 @@ namespace MQTTnet.Core.Server
_options.ApplicationMessageInterceptor?.Invoke(applicationMessage); _options.ApplicationMessageInterceptor?.Invoke(applicationMessage);
_clientSessionsManager.DispatchApplicationMessage(null, applicationMessage); _clientSessionsManager.DispatchApplicationMessage(null, applicationMessage);
} }
}

public Task PublishAsync(IEnumerable<MqttApplicationMessage> applicationMessages)
{
Publish(applicationMessages);
return Task.FromResult(0); return Task.FromResult(0);
} }




+ 11
- 0
MQTTnet.Core/Server/MqttSubscribeResult.cs View File

@@ -0,0 +1,11 @@
using MQTTnet.Core.Packets;

namespace MQTTnet.Core.Server
{
public class MqttSubscribeResult
{
public MqttSubAckPacket ResponsePacket { get; set; }

public bool CloseConnection { get; set; }
}
}

+ 5
- 3
Tests/MQTTnet.Core.Tests/TestClientSessionFactory.cs View File

@@ -1,12 +1,14 @@
using MQTTnet.Core.Server;
using System;
using MQTTnet.Core.Server;


namespace MQTTnet.Core.Tests namespace MQTTnet.Core.Tests
{ {
public class TestClientSessionFactory : IMqttClientSesssionFactory public class TestClientSessionFactory : IMqttClientSesssionFactory
{ {
public MqttClientSession CreateClientSession(string sessionId, MqttClientSessionsManager mqttClientSessionsManager)
public MqttClientSession CreateClientSession(string clientId, MqttClientSessionsManager mqttClientSessionsManager)
{ {
return new MqttClientSession(sessionId, mqttClientSessionsManager, new TestLogger<MqttClientSession>(), new TestLogger<MqttClientPendingMessagesQueue>());
throw new NotImplementedException();
//return new MqttClientSession(clientId, mqttClientSessionsManager, new TestLogger<MqttClientSession>(), new TestLogger<MqttClientPendingMessagesQueue>());
} }
} }
} }

+ 3
- 3
Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs View File

@@ -50,14 +50,14 @@ namespace MQTTnet.TestApp.NetCore
Console.WriteLine(">> RECEIVED: " + e.ApplicationMessage.Topic); Console.WriteLine(">> RECEIVED: " + e.ApplicationMessage.Topic);
}; };


await managedClient.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic("Step").WithPayload("1").Build());
await managedClient.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic("Step").WithPayload("2").WithAtLeastOnceQoS().Build());
await managedClient.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("Step").WithPayload("1").Build());
await managedClient.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("Step").WithPayload("2").WithAtLeastOnceQoS().Build());


await managedClient.StartAsync(options); await managedClient.StartAsync(options);


await managedClient.SubscribeAsync(new TopicFilter("xyz", MqttQualityOfServiceLevel.AtMostOnce)); await managedClient.SubscribeAsync(new TopicFilter("xyz", MqttQualityOfServiceLevel.AtMostOnce));


await managedClient.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic("Step").WithPayload("3").Build());
await managedClient.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("Step").WithPayload("3").Build());


Console.WriteLine("Managed client started."); Console.WriteLine("Managed client started.");
Console.ReadLine(); Console.ReadLine();


+ 6
- 1
Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml View File

@@ -65,7 +65,6 @@
<Button Click="Publish" Width="120">Publish</Button> <Button Click="Publish" Width="120">Publish</Button>
</StackPanel> </StackPanel>
</PivotItem> </PivotItem>

<PivotItem Header="Subscribe"> <PivotItem Header="Subscribe">
<StackPanel Background="{ThemeResource ApplicationPageBackgroundThemeBrush}"> <StackPanel Background="{ThemeResource ApplicationPageBackgroundThemeBrush}">
<TextBlock>Topic:</TextBlock> <TextBlock>Topic:</TextBlock>
@@ -84,6 +83,12 @@
</StackPanel> </StackPanel>
</StackPanel> </StackPanel>
</PivotItem> </PivotItem>
<PivotItem Header="Server">
<StackPanel Orientation="Horizontal">
<Button Width="120" Margin="0,0,10,0" Click="StartServer">Start</Button>
<Button Width="120" Margin="0,0,10,0" Click="StopServer">Stop</Button>
</StackPanel>
</PivotItem>
</Pivot> </Pivot>


<ScrollViewer Margin="12,10,12,0" Grid.Row="1" Background="Black" Foreground="WhiteSmoke" FontFamily="Consolas" HorizontalScrollMode="Enabled" VerticalScrollMode="Enabled" HorizontalScrollBarVisibility="Visible" VerticalScrollBarVisibility="Visible"> <ScrollViewer Margin="12,10,12,0" Grid.Row="1" Background="Black" Foreground="WhiteSmoke" FontFamily="Consolas" HorizontalScrollMode="Enabled" VerticalScrollMode="Enabled" HorizontalScrollBarVisibility="Visible" VerticalScrollBarVisibility="Visible">


+ 24
- 0
Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs View File

@@ -9,6 +9,7 @@ using MQTTnet.Core;
using MQTTnet.Core.Client; using MQTTnet.Core.Client;
using MQTTnet.Core.Diagnostics; using MQTTnet.Core.Diagnostics;
using MQTTnet.Core.Protocol; using MQTTnet.Core.Protocol;
using MQTTnet.Core.Server;
using MQTTnet.Implementations; using MQTTnet.Implementations;


namespace MQTTnet.TestApp.UniversalWindows namespace MQTTnet.TestApp.UniversalWindows
@@ -16,6 +17,7 @@ namespace MQTTnet.TestApp.UniversalWindows
public sealed partial class MainPage public sealed partial class MainPage
{ {
private IMqttClient _mqttClient; private IMqttClient _mqttClient;
private IMqttServer _mqttServer;


public MainPage() public MainPage()
{ {
@@ -319,5 +321,27 @@ namespace MQTTnet.TestApp.UniversalWindows
}; };


} }

private async void StartServer(object sender, RoutedEventArgs e)
{
if (_mqttServer != null)
{
return;
}

_mqttServer = new MqttFactory().CreateMqttServer();
await _mqttServer.StartAsync();
}

private async void StopServer(object sender, RoutedEventArgs e)
{
if (_mqttServer == null)
{
return;
}

await _mqttServer.StopAsync();
_mqttServer = null;
}
} }
} }

Loading…
Cancel
Save