Browse Source

Add server events for subscribed and unsubscribed topics. Add pending messages count for connected clients.

release/3.x.x
Christian 6 years ago
parent
commit
59dcbf4c0c
12 changed files with 205 additions and 74 deletions
  1. +1
    -0
      Build/MQTTnet.nuspec
  2. +2
    -0
      Frameworks/MQTTnet.NetStandard/Server/ConnectedMqttClient.cs
  3. +5
    -2
      Frameworks/MQTTnet.NetStandard/Server/IMqttServer.cs
  4. +13
    -11
      Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs
  5. +30
    -19
      Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs
  6. +40
    -14
      Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs
  7. +17
    -0
      Frameworks/MQTTnet.NetStandard/Server/MqttClientSubscribedTopicEventArgs.cs
  8. +23
    -13
      Frameworks/MQTTnet.NetStandard/Server/MqttClientSubscriptionsManager.cs
  9. +17
    -0
      Frameworks/MQTTnet.NetStandard/Server/MqttClientUnSubscribedTopicEventArgs.cs
  10. +6
    -1
      Frameworks/MQTTnet.NetStandard/Server/MqttRetainedMessagesManager.cs
  11. +31
    -12
      Frameworks/MQTTnet.NetStandard/Server/MqttServer.cs
  12. +20
    -2
      Tests/MQTTnet.Core.Tests/MqttServerTests.cs

+ 1
- 0
Build/MQTTnet.nuspec View File

@@ -19,6 +19,7 @@
* [Client] The disconnected event now contains the exception which was thrown and causing the disconnect. * [Client] The disconnected event now contains the exception which was thrown and causing the disconnect.
* [Server] Fixed an issue which lets the server block 1 second after accepting a new connection (thanks to @kpreisser). * [Server] Fixed an issue which lets the server block 1 second after accepting a new connection (thanks to @kpreisser).
* [Server] The server now allows managing client subscriptions. * [Server] The server now allows managing client subscriptions.
* [Server] Added events for topic subscriptions.
</releaseNotes> </releaseNotes>
<copyright>Copyright Christian Kratky 2016-2018</copyright> <copyright>Copyright Christian Kratky 2016-2018</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags> <tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags>


+ 2
- 0
Frameworks/MQTTnet.NetStandard/Server/ConnectedMqttClient.cs View File

@@ -12,5 +12,7 @@ namespace MQTTnet.Server
public TimeSpan LastPacketReceived { get; set; } public TimeSpan LastPacketReceived { get; set; }


public TimeSpan LastNonKeepAlivePacketReceived { get; set; } public TimeSpan LastNonKeepAlivePacketReceived { get; set; }

public int PendingApplicationMessages { get; set; }
} }
} }

+ 5
- 2
Frameworks/MQTTnet.NetStandard/Server/IMqttServer.cs View File

@@ -6,10 +6,13 @@ namespace MQTTnet.Server
{ {
public interface IMqttServer : IApplicationMessageReceiver, IApplicationMessagePublisher public interface IMqttServer : IApplicationMessageReceiver, IApplicationMessagePublisher
{ {
event EventHandler<MqttClientConnectedEventArgs> ClientConnected;
event EventHandler<MqttClientDisconnectedEventArgs> ClientDisconnected;
event EventHandler<MqttServerStartedEventArgs> Started; event EventHandler<MqttServerStartedEventArgs> Started;


event EventHandler<MqttClientConnectedEventArgs> ClientConnected;
event EventHandler<MqttClientDisconnectedEventArgs> ClientDisconnected;
event EventHandler<MqttClientSubscribedTopicEventArgs> ClientSubscribedTopic;
event EventHandler<MqttClientUnsubscribedTopicEventArgs> ClientUnsubscribedTopic;
Task<IList<ConnectedMqttClient>> GetConnectedClientsAsync(); Task<IList<ConnectedMqttClient>> GetConnectedClientsAsync();


Task SubscribeAsync(string clientId, IList<TopicFilter> topicFilters); Task SubscribeAsync(string clientId, IList<TopicFilter> topicFilters);


+ 13
- 11
Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs View File

@@ -15,16 +15,18 @@ namespace MQTTnet.Server
private readonly ConcurrentQueue<MqttBasePacket> _queue = new ConcurrentQueue<MqttBasePacket>(); private readonly ConcurrentQueue<MqttBasePacket> _queue = new ConcurrentQueue<MqttBasePacket>();
private readonly SemaphoreSlim _queueWaitSemaphore = new SemaphoreSlim(0); private readonly SemaphoreSlim _queueWaitSemaphore = new SemaphoreSlim(0);
private readonly IMqttServerOptions _options; private readonly IMqttServerOptions _options;
private readonly MqttClientSession _session;
private readonly MqttClientSession _clientSession;
private readonly IMqttNetLogger _logger; private readonly IMqttNetLogger _logger;


public MqttClientPendingMessagesQueue(IMqttServerOptions options, MqttClientSession session, IMqttNetLogger logger)
public MqttClientPendingMessagesQueue(IMqttServerOptions options, MqttClientSession clientSession, IMqttNetLogger logger)
{ {
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_session = session ?? throw new ArgumentNullException(nameof(session));
_options = options ?? throw new ArgumentNullException(nameof(options)); _options = options ?? throw new ArgumentNullException(nameof(options));
_clientSession = clientSession ?? throw new ArgumentNullException(nameof(clientSession));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
} }


public int Count => _queue.Count;

public void Start(IMqttChannelAdapter adapter, CancellationToken cancellationToken) public void Start(IMqttChannelAdapter adapter, CancellationToken cancellationToken)
{ {
if (adapter == null) throw new ArgumentNullException(nameof(adapter)); if (adapter == null) throw new ArgumentNullException(nameof(adapter));
@@ -44,7 +46,7 @@ namespace MQTTnet.Server
_queue.Enqueue(packet); _queue.Enqueue(packet);
_queueWaitSemaphore.Release(); _queueWaitSemaphore.Release();


_logger.Trace<MqttClientPendingMessagesQueue>("Enqueued packet (ClientId: {0}).", _session.ClientId);
_logger.Trace<MqttClientPendingMessagesQueue>("Enqueued packet (ClientId: {0}).", _clientSession.ClientId);
} }


private async Task SendQueuedPacketsAsync(IMqttChannelAdapter adapter, CancellationToken cancellationToken) private async Task SendQueuedPacketsAsync(IMqttChannelAdapter adapter, CancellationToken cancellationToken)
@@ -61,7 +63,7 @@ namespace MQTTnet.Server
} }
catch (Exception exception) catch (Exception exception)
{ {
_logger.Error<MqttClientPendingMessagesQueue>(exception, "Unhandled exception while sending enqueued packet (ClientId: {0}).", _session.ClientId);
_logger.Error<MqttClientPendingMessagesQueue>(exception, "Unhandled exception while sending enqueued packet (ClientId: {0}).", _clientSession.ClientId);
} }
} }


@@ -78,24 +80,24 @@ namespace MQTTnet.Server


await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, packet).ConfigureAwait(false); await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, packet).ConfigureAwait(false);


_logger.Trace<MqttClientPendingMessagesQueue>("Enqueued packet sent (ClientId: {0}).", _session.ClientId);
_logger.Trace<MqttClientPendingMessagesQueue>("Enqueued packet sent (ClientId: {0}).", _clientSession.ClientId);
} }
catch (Exception exception) catch (Exception exception)
{ {
if (exception is MqttCommunicationTimedOutException) if (exception is MqttCommunicationTimedOutException)
{ {
_logger.Warning<MqttClientPendingMessagesQueue>(exception, "Sending publish packet failed due to timeout (ClientId: {0}).", _session.ClientId);
_logger.Warning<MqttClientPendingMessagesQueue>(exception, "Sending publish packet failed due to timeout (ClientId: {0}).", _clientSession.ClientId);
} }
else if (exception is MqttCommunicationException) else if (exception is MqttCommunicationException)
{ {
_logger.Warning<MqttClientPendingMessagesQueue>(exception, "Sending publish packet failed due to communication exception (ClientId: {0}).", _session.ClientId);
_logger.Warning<MqttClientPendingMessagesQueue>(exception, "Sending publish packet failed due to communication exception (ClientId: {0}).", _clientSession.ClientId);
} }
else if (exception is OperationCanceledException) else if (exception is OperationCanceledException)
{ {
} }
else else
{ {
_logger.Error<MqttClientPendingMessagesQueue>(exception, "Sending publish packet failed (ClientId: {0}).", _session.ClientId);
_logger.Error<MqttClientPendingMessagesQueue>(exception, "Sending publish packet failed (ClientId: {0}).", _clientSession.ClientId);
} }


if (packet is MqttPublishPacket publishPacket) if (packet is MqttPublishPacket publishPacket)
@@ -108,7 +110,7 @@ namespace MQTTnet.Server
} }
} }


await _session.StopAsync();
await _clientSession.StopAsync();
} }
} }




+ 30
- 19
Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs View File

@@ -21,10 +21,7 @@ namespace MQTTnet.Server
private readonly IMqttServerOptions _options; private readonly IMqttServerOptions _options;
private readonly IMqttNetLogger _logger; private readonly IMqttNetLogger _logger;


private readonly MqttClientSessionsManager _sessionsManager;
private readonly MqttRetainedMessagesManager _retainedMessagesManager; private readonly MqttRetainedMessagesManager _retainedMessagesManager;
private readonly MqttClientSubscriptionsManager _subscriptionsManager;
private readonly MqttClientPendingMessagesQueue _pendingMessagesQueue;


private IMqttChannelAdapter _adapter; private IMqttChannelAdapter _adapter;
private CancellationTokenSource _cancellationTokenSource; private CancellationTokenSource _cancellationTokenSource;
@@ -34,20 +31,24 @@ namespace MQTTnet.Server
string clientId, string clientId,
IMqttServerOptions options, IMqttServerOptions options,
MqttRetainedMessagesManager retainedMessagesManager, MqttRetainedMessagesManager retainedMessagesManager,
MqttClientSessionsManager sessionsManager,
IMqttNetLogger logger) IMqttNetLogger logger)
{ {
_options = options ?? throw new ArgumentNullException(nameof(options)); _options = options ?? throw new ArgumentNullException(nameof(options));
_retainedMessagesManager = retainedMessagesManager ?? throw new ArgumentNullException(nameof(retainedMessagesManager)); _retainedMessagesManager = retainedMessagesManager ?? throw new ArgumentNullException(nameof(retainedMessagesManager));
_sessionsManager = sessionsManager ?? throw new ArgumentNullException(nameof(sessionsManager));
_logger = logger ?? throw new ArgumentNullException(nameof(logger)); _logger = logger ?? throw new ArgumentNullException(nameof(logger));


ClientId = clientId; ClientId = clientId;
_subscriptionsManager = new MqttClientSubscriptionsManager(_options, clientId);
_pendingMessagesQueue = new MqttClientPendingMessagesQueue(_options, this, _logger);
SubscriptionsManager = new MqttClientSubscriptionsManager(_options, clientId);
PendingMessagesQueue = new MqttClientPendingMessagesQueue(_options, this, _logger);
} }


public Func<MqttClientSession, MqttApplicationMessage, Task> ApplicationMessageReceivedCallback { get; set; }

public MqttClientSubscriptionsManager SubscriptionsManager { get; }

public MqttClientPendingMessagesQueue PendingMessagesQueue { get; }

public string ClientId { get; } public string ClientId { get; }


public MqttProtocolVersion? ProtocolVersion => _adapter?.PacketSerializer.ProtocolVersion; public MqttProtocolVersion? ProtocolVersion => _adapter?.PacketSerializer.ProtocolVersion;
@@ -71,7 +72,7 @@ namespace MQTTnet.Server
_adapter = adapter; _adapter = adapter;
_cancellationTokenSource = cancellationTokenSource; _cancellationTokenSource = cancellationTokenSource;


_pendingMessagesQueue.Start(adapter, cancellationTokenSource.Token);
PendingMessagesQueue.Start(adapter, cancellationTokenSource.Token);


_lastPacketReceivedTracker.Restart(); _lastPacketReceivedTracker.Restart();
_lastNonKeepAlivePacketReceivedTracker.Restart(); _lastNonKeepAlivePacketReceivedTracker.Restart();
@@ -123,7 +124,7 @@ namespace MQTTnet.Server
if (willMessage != null) if (willMessage != null)
{ {
_willMessage = null; //clear willmessage so it is send just once _willMessage = null; //clear willmessage so it is send just once
await _sessionsManager.DispatchApplicationMessageAsync(this, willMessage).ConfigureAwait(false);
await ApplicationMessageReceivedCallback(this, willMessage).ConfigureAwait(false);
} }
} }
} }
@@ -132,7 +133,7 @@ namespace MQTTnet.Server
{ {
if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage)); if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage));


var result = await _subscriptionsManager.CheckSubscriptionsAsync(applicationMessage);
var result = await SubscriptionsManager.CheckSubscriptionsAsync(applicationMessage);
if (!result.IsSubscribed) if (!result.IsSubscribed)
{ {
return; return;
@@ -141,30 +142,40 @@ namespace MQTTnet.Server
var publishPacket = applicationMessage.ToPublishPacket(); var publishPacket = applicationMessage.ToPublishPacket();
publishPacket.QualityOfServiceLevel = result.QualityOfServiceLevel; publishPacket.QualityOfServiceLevel = result.QualityOfServiceLevel;


_pendingMessagesQueue.Enqueue(publishPacket);
PendingMessagesQueue.Enqueue(publishPacket);
} }


public Task SubscribeAsync(IList<TopicFilter> topicFilters) public Task SubscribeAsync(IList<TopicFilter> topicFilters)
{ {
return _subscriptionsManager.SubscribeAsync(new MqttSubscribePacket
if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));

var response = SubscriptionsManager.SubscribeAsync(new MqttSubscribePacket
{ {
TopicFilters = topicFilters TopicFilters = topicFilters
}); });

return response;
} }


public Task UnsubscribeAsync(IList<string> topicFilters) public Task UnsubscribeAsync(IList<string> topicFilters)
{ {
if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));


return _subscriptionsManager.UnsubscribeAsync(new MqttUnsubscribePacket
var response = SubscriptionsManager.UnsubscribeAsync(new MqttUnsubscribePacket
{ {
TopicFilters = topicFilters TopicFilters = topicFilters
}); });

return response;
} }


public void Dispose() public void Dispose()
{ {
_pendingMessagesQueue?.Dispose();
ApplicationMessageReceivedCallback = null;

SubscriptionsManager?.Dispose();
PendingMessagesQueue?.Dispose();

_cancellationTokenSource?.Dispose(); _cancellationTokenSource?.Dispose();
} }


@@ -250,7 +261,7 @@ namespace MQTTnet.Server


private async Task HandleIncomingSubscribePacketAsync(IMqttChannelAdapter adapter, MqttSubscribePacket subscribePacket, CancellationToken cancellationToken) private async Task HandleIncomingSubscribePacketAsync(IMqttChannelAdapter adapter, MqttSubscribePacket subscribePacket, CancellationToken cancellationToken)
{ {
var subscribeResult = await _subscriptionsManager.SubscribeAsync(subscribePacket).ConfigureAwait(false);
var subscribeResult = await SubscriptionsManager.SubscribeAsync(subscribePacket).ConfigureAwait(false);
await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, subscribeResult.ResponsePacket).ConfigureAwait(false); await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, subscribeResult.ResponsePacket).ConfigureAwait(false);


if (subscribeResult.CloseConnection) if (subscribeResult.CloseConnection)
@@ -264,7 +275,7 @@ namespace MQTTnet.Server


private async Task HandleIncomingUnsubscribePacketAsync(IMqttChannelAdapter adapter, MqttUnsubscribePacket unsubscribePacket, CancellationToken cancellationToken) private async Task HandleIncomingUnsubscribePacketAsync(IMqttChannelAdapter adapter, MqttUnsubscribePacket unsubscribePacket, CancellationToken cancellationToken)
{ {
var unsubscribeResult = await _subscriptionsManager.UnsubscribeAsync(unsubscribePacket).ConfigureAwait(false);
var unsubscribeResult = await SubscriptionsManager.UnsubscribeAsync(unsubscribePacket).ConfigureAwait(false);
await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, unsubscribeResult); await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, unsubscribeResult);
} }


@@ -285,7 +296,7 @@ namespace MQTTnet.Server
{ {
case MqttQualityOfServiceLevel.AtMostOnce: case MqttQualityOfServiceLevel.AtMostOnce:
{ {
return _sessionsManager.DispatchApplicationMessageAsync(this, applicationMessage);
return ApplicationMessageReceivedCallback?.Invoke(this, applicationMessage);
} }
case MqttQualityOfServiceLevel.AtLeastOnce: case MqttQualityOfServiceLevel.AtLeastOnce:
{ {
@@ -304,7 +315,7 @@ namespace MQTTnet.Server


private async Task HandleIncomingPublishPacketWithQoS1(IMqttChannelAdapter adapter, MqttApplicationMessage applicationMessage, MqttPublishPacket publishPacket, CancellationToken cancellationToken) private async Task HandleIncomingPublishPacketWithQoS1(IMqttChannelAdapter adapter, MqttApplicationMessage applicationMessage, MqttPublishPacket publishPacket, CancellationToken cancellationToken)
{ {
await _sessionsManager.DispatchApplicationMessageAsync(this, applicationMessage).ConfigureAwait(false);
await ApplicationMessageReceivedCallback(this, applicationMessage).ConfigureAwait(false);


var response = new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier }; var response = new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier };
await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, response).ConfigureAwait(false); await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, response).ConfigureAwait(false);
@@ -313,7 +324,7 @@ namespace MQTTnet.Server
private async Task HandleIncomingPublishPacketWithQoS2(IMqttChannelAdapter adapter, MqttApplicationMessage applicationMessage, MqttPublishPacket publishPacket, CancellationToken cancellationToken) private async Task HandleIncomingPublishPacketWithQoS2(IMqttChannelAdapter adapter, MqttApplicationMessage applicationMessage, MqttPublishPacket publishPacket, CancellationToken cancellationToken)
{ {
// QoS 2 is implement as method "B" [4.3.3 QoS 2: Exactly once delivery] // QoS 2 is implement as method "B" [4.3.3 QoS 2: Exactly once delivery]
await _sessionsManager.DispatchApplicationMessageAsync(this, applicationMessage).ConfigureAwait(false);
await ApplicationMessageReceivedCallback(this, applicationMessage).ConfigureAwait(false);


var response = new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }; var response = new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier };
await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, response).ConfigureAwait(false); await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, response).ConfigureAwait(false);


+ 40
- 14
Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs View File

@@ -12,27 +12,32 @@ using MQTTnet.Serializer;


namespace MQTTnet.Server namespace MQTTnet.Server
{ {
public sealed class MqttClientSessionsManager
public sealed class MqttClientSessionsManager : IDisposable
{ {
private readonly Dictionary<string, MqttClientSession> _sessions = new Dictionary<string, MqttClientSession>(); private readonly Dictionary<string, MqttClientSession> _sessions = new Dictionary<string, MqttClientSession>();
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1); private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);


private readonly IMqttServerOptions _options; private readonly IMqttServerOptions _options;
private readonly MqttServer _server;
private readonly MqttRetainedMessagesManager _retainedMessagesManager; private readonly MqttRetainedMessagesManager _retainedMessagesManager;
private readonly IMqttNetLogger _logger; private readonly IMqttNetLogger _logger;


public MqttClientSessionsManager(IMqttServerOptions options, MqttRetainedMessagesManager retainedMessagesManager, MqttServer server, IMqttNetLogger logger)
public MqttClientSessionsManager(IMqttServerOptions options, MqttRetainedMessagesManager retainedMessagesManager, IMqttNetLogger logger)
{ {
_server = server ?? throw new ArgumentNullException(nameof(server));
_logger = logger ?? throw new ArgumentNullException(nameof(logger)); _logger = logger ?? throw new ArgumentNullException(nameof(logger));
_options = options ?? throw new ArgumentNullException(nameof(options)); _options = options ?? throw new ArgumentNullException(nameof(options));
_retainedMessagesManager = retainedMessagesManager ?? throw new ArgumentNullException(nameof(retainedMessagesManager)); _retainedMessagesManager = retainedMessagesManager ?? throw new ArgumentNullException(nameof(retainedMessagesManager));
} }


public async Task RunClientSessionAsync(IMqttChannelAdapter clientAdapter, CancellationToken cancellationToken)
public Action<ConnectedMqttClient> ClientConnectedCallback { get; set; }
public Action<ConnectedMqttClient> ClientDisconnectedCallback { get; set; }
public Action<string, TopicFilter> ClientSubscribedTopicCallback { get; set; }
public Action<string, string> ClientUnsubscribedTopicCallback { get; set; }
public Action<string, MqttApplicationMessage> ApplicationMessageReceivedCallback { get; set; }
public async Task RunSessionAsync(IMqttChannelAdapter clientAdapter, CancellationToken cancellationToken)
{ {
var clientId = string.Empty; var clientId = string.Empty;
MqttClientSession clientSession = null;
try try
{ {
if (!(await clientAdapter.ReceivePacketAsync(_options.DefaultCommunicationTimeout, cancellationToken).ConfigureAwait(false) is MqttConnectPacket connectPacket)) if (!(await clientAdapter.ReceivePacketAsync(_options.DefaultCommunicationTimeout, cancellationToken).ConfigureAwait(false) is MqttConnectPacket connectPacket))
@@ -56,21 +61,22 @@ namespace MQTTnet.Server
return; return;
} }


var clientSession = await GetOrCreateClientSessionAsync(connectPacket).ConfigureAwait(false);
var result = await GetOrCreateClientSessionAsync(connectPacket).ConfigureAwait(false);
clientSession = result.Session;


await clientAdapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new MqttConnAckPacket await clientAdapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new MqttConnAckPacket
{ {
ConnectReturnCode = connectReturnCode, ConnectReturnCode = connectReturnCode,
IsSessionPresent = clientSession.IsExistingSession
IsSessionPresent = result.IsExistingSession
}).ConfigureAwait(false); }).ConfigureAwait(false);


_server.OnClientConnected(new ConnectedMqttClient
ClientConnectedCallback?.Invoke(new ConnectedMqttClient
{ {
ClientId = clientId, ClientId = clientId,
ProtocolVersion = clientAdapter.PacketSerializer.ProtocolVersion ProtocolVersion = clientAdapter.PacketSerializer.ProtocolVersion
}); });


await clientSession.Session.RunAsync(connectPacket, clientAdapter).ConfigureAwait(false);
await clientSession.RunAsync(connectPacket, clientAdapter).ConfigureAwait(false);
} }
catch (Exception exception) catch (Exception exception)
{ {
@@ -87,10 +93,11 @@ namespace MQTTnet.Server
// ignored // ignored
} }


_server.OnClientDisconnected(new ConnectedMqttClient
ClientDisconnectedCallback?.Invoke(new ConnectedMqttClient
{ {
ClientId = clientId, ClientId = clientId,
ProtocolVersion = clientAdapter.PacketSerializer.ProtocolVersion
ProtocolVersion = clientAdapter.PacketSerializer.ProtocolVersion,
PendingApplicationMessages = clientSession?.PendingMessagesQueue.Count ?? 0
}); });
} }
} }
@@ -123,7 +130,8 @@ namespace MQTTnet.Server
ClientId = s.Value.ClientId, ClientId = s.Value.ClientId,
ProtocolVersion = s.Value.ProtocolVersion ?? MqttProtocolVersion.V311, ProtocolVersion = s.Value.ProtocolVersion ?? MqttProtocolVersion.V311,
LastPacketReceived = s.Value.LastPacketReceived, LastPacketReceived = s.Value.LastPacketReceived,
LastNonKeepAlivePacketReceived = s.Value.LastNonKeepAlivePacketReceived
LastNonKeepAlivePacketReceived = s.Value.LastNonKeepAlivePacketReceived,
PendingApplicationMessages = s.Value.PendingMessagesQueue.Count
}).ToList(); }).ToList();
} }
finally finally
@@ -147,7 +155,7 @@ namespace MQTTnet.Server
await _retainedMessagesManager.HandleMessageAsync(senderClientSession?.ClientId, applicationMessage).ConfigureAwait(false); await _retainedMessagesManager.HandleMessageAsync(senderClientSession?.ClientId, applicationMessage).ConfigureAwait(false);
} }


_server.OnApplicationMessageReceived(senderClientSession?.ClientId, applicationMessage);
ApplicationMessageReceivedCallback?.Invoke(senderClientSession?.ClientId, applicationMessage);
} }
catch (Exception exception) catch (Exception exception)
{ {
@@ -271,7 +279,14 @@ namespace MQTTnet.Server
{ {
isExistingSession = false; isExistingSession = false;


clientSession = new MqttClientSession(connectPacket.ClientId, _options, _retainedMessagesManager, this, _logger);
clientSession = new MqttClientSession(connectPacket.ClientId, _options, _retainedMessagesManager, _logger)
{
ApplicationMessageReceivedCallback = DispatchApplicationMessageAsync
};

clientSession.SubscriptionsManager.TopicSubscribedCallback = ClientSubscribedTopicCallback;
clientSession.SubscriptionsManager.TopicUnsubscribedCallback = ClientUnsubscribedTopicCallback;

_sessions[connectPacket.ClientId] = clientSession; _sessions[connectPacket.ClientId] = clientSession;


_logger.Trace<MqttClientSessionsManager>("Created a new session for client '{0}'.", connectPacket.ClientId); _logger.Trace<MqttClientSessionsManager>("Created a new session for client '{0}'.", connectPacket.ClientId);
@@ -284,5 +299,16 @@ namespace MQTTnet.Server
_semaphore.Release(); _semaphore.Release();
} }
} }

public void Dispose()
{
ClientConnectedCallback = null;
ClientDisconnectedCallback = null;
ClientSubscribedTopicCallback = null;
ClientUnsubscribedTopicCallback = null;
ApplicationMessageReceivedCallback = null;
_semaphore?.Dispose();
}
} }
} }

+ 17
- 0
Frameworks/MQTTnet.NetStandard/Server/MqttClientSubscribedTopicEventArgs.cs View File

@@ -0,0 +1,17 @@
using System;

namespace MQTTnet.Server
{
public class MqttClientSubscribedTopicEventArgs : EventArgs
{
public MqttClientSubscribedTopicEventArgs(string clientId, TopicFilter topicFilter)
{
ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId));
TopicFilter = topicFilter ?? throw new ArgumentNullException(nameof(topicFilter));
}

public string ClientId { get; }

public TopicFilter TopicFilter { get; }
}
}

+ 23
- 13
Frameworks/MQTTnet.NetStandard/Server/MqttClientSubscriptionsManager.cs View File

@@ -8,19 +8,22 @@ using MQTTnet.Protocol;


namespace MQTTnet.Server namespace MQTTnet.Server
{ {
public sealed class MqttClientSubscriptionsManager
public sealed class MqttClientSubscriptionsManager : IDisposable
{ {
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1); private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
private readonly Dictionary<string, MqttQualityOfServiceLevel> _subscriptions = new Dictionary<string, MqttQualityOfServiceLevel>(); private readonly Dictionary<string, MqttQualityOfServiceLevel> _subscriptions = new Dictionary<string, MqttQualityOfServiceLevel>();
private readonly IMqttServerOptions _options; private readonly IMqttServerOptions _options;
private readonly string _clientId; private readonly string _clientId;
public MqttClientSubscriptionsManager(IMqttServerOptions options, string clientId) public MqttClientSubscriptionsManager(IMqttServerOptions options, string clientId)
{ {
_options = options ?? throw new ArgumentNullException(nameof(options)); _options = options ?? throw new ArgumentNullException(nameof(options));
_clientId = clientId ?? throw new ArgumentNullException(nameof(clientId)); _clientId = clientId ?? throw new ArgumentNullException(nameof(clientId));
} }


public Action<string, TopicFilter> TopicSubscribedCallback { get; set; }
public Action<string, string> TopicUnsubscribedCallback { get; set; }

public async Task<MqttClientSubscribeResult> SubscribeAsync(MqttSubscribePacket subscribePacket) public async Task<MqttClientSubscribeResult> SubscribeAsync(MqttSubscribePacket subscribePacket)
{ {
if (subscribePacket == null) throw new ArgumentNullException(nameof(subscribePacket)); if (subscribePacket == null) throw new ArgumentNullException(nameof(subscribePacket));
@@ -54,6 +57,7 @@ namespace MQTTnet.Server
if (interceptorContext.AcceptSubscription) if (interceptorContext.AcceptSubscription)
{ {
_subscriptions[topicFilter.Topic] = topicFilter.QualityOfServiceLevel; _subscriptions[topicFilter.Topic] = topicFilter.QualityOfServiceLevel;
TopicSubscribedCallback?.Invoke(_clientId, topicFilter);
} }
} }
} }
@@ -75,6 +79,7 @@ namespace MQTTnet.Server
foreach (var topicFilter in unsubscribePacket.TopicFilters) foreach (var topicFilter in unsubscribePacket.TopicFilters)
{ {
_subscriptions.Remove(topicFilter); _subscriptions.Remove(topicFilter);
TopicUnsubscribedCallback?.Invoke(_clientId, topicFilter);
} }
} }
finally finally
@@ -119,6 +124,22 @@ namespace MQTTnet.Server
} }
} }


public void Dispose()
{
_semaphore?.Dispose();
}

private static MqttSubscribeReturnCode ConvertToMaximumQoS(MqttQualityOfServiceLevel qualityOfServiceLevel)
{
switch (qualityOfServiceLevel)
{
case MqttQualityOfServiceLevel.AtMostOnce: return MqttSubscribeReturnCode.SuccessMaximumQoS0;
case MqttQualityOfServiceLevel.AtLeastOnce: return MqttSubscribeReturnCode.SuccessMaximumQoS1;
case MqttQualityOfServiceLevel.ExactlyOnce: return MqttSubscribeReturnCode.SuccessMaximumQoS2;
default: return MqttSubscribeReturnCode.Failure;
}
}

private MqttSubscriptionInterceptorContext InterceptSubscribe(TopicFilter topicFilter) private MqttSubscriptionInterceptorContext InterceptSubscribe(TopicFilter topicFilter)
{ {
var interceptorContext = new MqttSubscriptionInterceptorContext(_clientId, topicFilter); var interceptorContext = new MqttSubscriptionInterceptorContext(_clientId, topicFilter);
@@ -148,16 +169,5 @@ namespace MQTTnet.Server
QualityOfServiceLevel = effectiveQoS QualityOfServiceLevel = effectiveQoS
}; };
} }

private static MqttSubscribeReturnCode ConvertToMaximumQoS(MqttQualityOfServiceLevel qualityOfServiceLevel)
{
switch (qualityOfServiceLevel)
{
case MqttQualityOfServiceLevel.AtMostOnce: return MqttSubscribeReturnCode.SuccessMaximumQoS0;
case MqttQualityOfServiceLevel.AtLeastOnce: return MqttSubscribeReturnCode.SuccessMaximumQoS1;
case MqttQualityOfServiceLevel.ExactlyOnce: return MqttSubscribeReturnCode.SuccessMaximumQoS2;
default: return MqttSubscribeReturnCode.Failure;
}
}
} }
} }

+ 17
- 0
Frameworks/MQTTnet.NetStandard/Server/MqttClientUnSubscribedTopicEventArgs.cs View File

@@ -0,0 +1,17 @@
using System;

namespace MQTTnet.Server
{
public class MqttClientUnsubscribedTopicEventArgs : EventArgs
{
public MqttClientUnsubscribedTopicEventArgs(string clientId, string topicFilter)
{
ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId));
TopicFilter = topicFilter ?? throw new ArgumentNullException(nameof(topicFilter));
}

public string ClientId { get; }

public string TopicFilter { get; }
}
}

+ 6
- 1
Frameworks/MQTTnet.NetStandard/Server/MqttRetainedMessagesManager.cs View File

@@ -8,7 +8,7 @@ using MQTTnet.Packets;


namespace MQTTnet.Server namespace MQTTnet.Server
{ {
public sealed class MqttRetainedMessagesManager
public sealed class MqttRetainedMessagesManager : IDisposable
{ {
private readonly Dictionary<string, MqttApplicationMessage> _retainedMessages = new Dictionary<string, MqttApplicationMessage>(); private readonly Dictionary<string, MqttApplicationMessage> _retainedMessages = new Dictionary<string, MqttApplicationMessage>();
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1); private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
@@ -102,6 +102,11 @@ namespace MQTTnet.Server
return retainedMessages; return retainedMessages;
} }


public void Dispose()
{
_semaphore?.Dispose();
}

private async Task HandleMessageInternalAsync(string clientId, MqttApplicationMessage applicationMessage) private async Task HandleMessageInternalAsync(string clientId, MqttApplicationMessage applicationMessage)
{ {
var saveIsRequired = false; var saveIsRequired = false;


+ 31
- 12
Frameworks/MQTTnet.NetStandard/Server/MqttServer.cs View File

@@ -27,8 +27,12 @@ namespace MQTTnet.Server
} }


public event EventHandler<MqttServerStartedEventArgs> Started; public event EventHandler<MqttServerStartedEventArgs> Started;

public event EventHandler<MqttClientConnectedEventArgs> ClientConnected; public event EventHandler<MqttClientConnectedEventArgs> ClientConnected;
public event EventHandler<MqttClientDisconnectedEventArgs> ClientDisconnected; public event EventHandler<MqttClientDisconnectedEventArgs> ClientDisconnected;
public event EventHandler<MqttClientSubscribedTopicEventArgs> ClientSubscribedTopic;
public event EventHandler<MqttClientUnsubscribedTopicEventArgs> ClientUnsubscribedTopic;

public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived; public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived;


public Task<IList<ConnectedMqttClient>> GetConnectedClientsAsync() public Task<IList<ConnectedMqttClient>> GetConnectedClientsAsync()
@@ -71,11 +75,19 @@ namespace MQTTnet.Server
if (_cancellationTokenSource != null) throw new InvalidOperationException("The server is already started."); if (_cancellationTokenSource != null) throw new InvalidOperationException("The server is already started.");


_cancellationTokenSource = new CancellationTokenSource(); _cancellationTokenSource = new CancellationTokenSource();
_retainedMessagesManager = new MqttRetainedMessagesManager(_options, _logger);
_clientSessionsManager = new MqttClientSessionsManager(_options, _retainedMessagesManager, this, _logger);


_retainedMessagesManager = new MqttRetainedMessagesManager(_options, _logger);
await _retainedMessagesManager.LoadMessagesAsync(); await _retainedMessagesManager.LoadMessagesAsync();


_clientSessionsManager = new MqttClientSessionsManager(_options, _retainedMessagesManager, _logger)
{
ClientConnectedCallback = OnClientConnected,
ClientDisconnectedCallback = OnClientDisconnected,
ClientSubscribedTopicCallback = OnClientSubscribedTopic,
ClientUnsubscribedTopicCallback = OnClientUnsubscribedTopic,
ApplicationMessageReceivedCallback = OnApplicationMessageReceived
};

foreach (var adapter in _adapters) foreach (var adapter in _adapters)
{ {
adapter.ClientAccepted += OnClientAccepted; adapter.ClientAccepted += OnClientAccepted;
@@ -110,39 +122,46 @@ namespace MQTTnet.Server
} }
finally finally
{ {
_clientSessionsManager?.Dispose();
_retainedMessagesManager?.Dispose();

_cancellationTokenSource = null; _cancellationTokenSource = null;
_retainedMessagesManager = null; _retainedMessagesManager = null;
_clientSessionsManager = null; _clientSessionsManager = null;
} }
} }


internal void OnClientConnected(ConnectedMqttClient client)
private void OnClientConnected(ConnectedMqttClient client)
{ {
if (client == null) throw new ArgumentNullException(nameof(client));

_logger.Info<MqttServer>("Client '{0}': Connected.", client.ClientId); _logger.Info<MqttServer>("Client '{0}': Connected.", client.ClientId);
ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(client)); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(client));
} }


internal void OnClientDisconnected(ConnectedMqttClient client)
private void OnClientDisconnected(ConnectedMqttClient client)
{ {
if (client == null) throw new ArgumentNullException(nameof(client));

_logger.Info<MqttServer>("Client '{0}': Disconnected.", client.ClientId); _logger.Info<MqttServer>("Client '{0}': Disconnected.", client.ClientId);
ClientDisconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(client)); ClientDisconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(client));
} }
internal void OnApplicationMessageReceived(string clientId, MqttApplicationMessage applicationMessage)
private void OnClientSubscribedTopic(string clientId, TopicFilter topicFilter)
{ {
if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage));
ClientSubscribedTopic?.Invoke(this, new MqttClientSubscribedTopicEventArgs(clientId, topicFilter));
}


private void OnClientUnsubscribedTopic(string clientId, string topicFilter)
{
ClientUnsubscribedTopic?.Invoke(this, new MqttClientUnsubscribedTopicEventArgs(clientId, topicFilter));
}
private void OnApplicationMessageReceived(string clientId, MqttApplicationMessage applicationMessage)
{
ApplicationMessageReceived?.Invoke(this, new MqttApplicationMessageReceivedEventArgs(clientId, applicationMessage)); ApplicationMessageReceived?.Invoke(this, new MqttApplicationMessageReceivedEventArgs(clientId, applicationMessage));
} }


private void OnClientAccepted(object sender, MqttServerAdapterClientAcceptedEventArgs eventArgs) private void OnClientAccepted(object sender, MqttServerAdapterClientAcceptedEventArgs eventArgs)
{ {
eventArgs.SessionTask = Task.Run( eventArgs.SessionTask = Task.Run(
async () => await _clientSessionsManager.RunClientSessionAsync(eventArgs.Client, _cancellationTokenSource.Token).ConfigureAwait(false),
async () => await _clientSessionsManager.RunSessionAsync(eventArgs.Client, _cancellationTokenSource.Token).ConfigureAwait(false),
_cancellationTokenSource.Token); _cancellationTokenSource.Token);
} }
} }


+ 20
- 2
Tests/MQTTnet.Core.Tests/MqttServerTests.cs View File

@@ -79,7 +79,7 @@ namespace MQTTnet.Core.Tests
} }


[TestMethod] [TestMethod]
public async Task MqttServer_Unsubscribe()
public async Task MqttServer_SubscribeUnsubscribe()
{ {
var serverAdapter = new TestMqttServerAdapter(); var serverAdapter = new TestMqttServerAdapter();
var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger()); var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger());
@@ -99,13 +99,31 @@ namespace MQTTnet.Core.Tests
await c2.PublishAsync(message); await c2.PublishAsync(message);
Assert.AreEqual(0, receivedMessagesCount); Assert.AreEqual(0, receivedMessagesCount);


var subscribeEventCalled = false;
s.ClientSubscribedTopic += (_, e) =>
{
subscribeEventCalled = e.TopicFilter.Topic == "a" && e.ClientId == "c1";
};

await c1.SubscribeAsync(new TopicFilter("a", MqttQualityOfServiceLevel.AtLeastOnce)); await c1.SubscribeAsync(new TopicFilter("a", MqttQualityOfServiceLevel.AtLeastOnce));
await Task.Delay(100);
Assert.IsTrue(subscribeEventCalled, "Subscribe event not called.");

await c2.PublishAsync(message); await c2.PublishAsync(message);


await Task.Delay(500); await Task.Delay(500);
Assert.AreEqual(1, receivedMessagesCount); Assert.AreEqual(1, receivedMessagesCount);


var unsubscribeEventCalled = false;
s.ClientUnsubscribedTopic += (_, e) =>
{
unsubscribeEventCalled = e.TopicFilter == "a" && e.ClientId == "c1";
};

await c1.UnsubscribeAsync("a"); await c1.UnsubscribeAsync("a");
await Task.Delay(100);
Assert.IsTrue(unsubscribeEventCalled, "Unsubscribe event not called.");

await c2.PublishAsync(message); await c2.PublishAsync(message);


await Task.Delay(500); await Task.Delay(500);
@@ -343,7 +361,7 @@ namespace MQTTnet.Core.Tests


var c1 = await serverAdapter.ConnectTestClient(s, "c1"); var c1 = await serverAdapter.ConnectTestClient(s, "c1");
var c2 = await serverAdapter.ConnectTestClient(s, "c2"); var c2 = await serverAdapter.ConnectTestClient(s, "c2");
c1.ApplicationMessageReceived += (_, e) => c1.ApplicationMessageReceived += (_, e) =>
{ {
if (Encoding.UTF8.GetString(e.ApplicationMessage.Payload) == "The body") if (Encoding.UTF8.GetString(e.ApplicationMessage.Payload) == "The body")


Loading…
Cancel
Save