diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index 9c2e0d3..7823d38 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -19,6 +19,7 @@ * [Client] The disconnected event now contains the exception which was thrown and causing the disconnect. * [Server] Fixed an issue which lets the server block 1 second after accepting a new connection (thanks to @kpreisser). * [Server] The server now allows managing client subscriptions. +* [Server] Added events for topic subscriptions. Copyright Christian Kratky 2016-2018 MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin diff --git a/Frameworks/MQTTnet.NetStandard/Server/ConnectedMqttClient.cs b/Frameworks/MQTTnet.NetStandard/Server/ConnectedMqttClient.cs index cd94193..4fa8e4e 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/ConnectedMqttClient.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/ConnectedMqttClient.cs @@ -12,5 +12,7 @@ namespace MQTTnet.Server public TimeSpan LastPacketReceived { get; set; } public TimeSpan LastNonKeepAlivePacketReceived { get; set; } + + public int PendingApplicationMessages { get; set; } } } diff --git a/Frameworks/MQTTnet.NetStandard/Server/IMqttServer.cs b/Frameworks/MQTTnet.NetStandard/Server/IMqttServer.cs index 99ad0a0..956014b 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/IMqttServer.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/IMqttServer.cs @@ -6,10 +6,13 @@ namespace MQTTnet.Server { public interface IMqttServer : IApplicationMessageReceiver, IApplicationMessagePublisher { - event EventHandler ClientConnected; - event EventHandler ClientDisconnected; event EventHandler Started; + event EventHandler ClientConnected; + event EventHandler ClientDisconnected; + event EventHandler ClientSubscribedTopic; + event EventHandler ClientUnsubscribedTopic; + Task> GetConnectedClientsAsync(); Task SubscribeAsync(string clientId, IList topicFilters); diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs index 62a4d78..27dd49d 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs @@ -15,16 +15,18 @@ namespace MQTTnet.Server private readonly ConcurrentQueue _queue = new ConcurrentQueue(); private readonly SemaphoreSlim _queueWaitSemaphore = new SemaphoreSlim(0); private readonly IMqttServerOptions _options; - private readonly MqttClientSession _session; + private readonly MqttClientSession _clientSession; private readonly IMqttNetLogger _logger; - public MqttClientPendingMessagesQueue(IMqttServerOptions options, MqttClientSession session, IMqttNetLogger logger) + public MqttClientPendingMessagesQueue(IMqttServerOptions options, MqttClientSession clientSession, IMqttNetLogger logger) { - _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - _session = session ?? throw new ArgumentNullException(nameof(session)); _options = options ?? throw new ArgumentNullException(nameof(options)); + _clientSession = clientSession ?? throw new ArgumentNullException(nameof(clientSession)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); } + public int Count => _queue.Count; + public void Start(IMqttChannelAdapter adapter, CancellationToken cancellationToken) { if (adapter == null) throw new ArgumentNullException(nameof(adapter)); @@ -44,7 +46,7 @@ namespace MQTTnet.Server _queue.Enqueue(packet); _queueWaitSemaphore.Release(); - _logger.Trace("Enqueued packet (ClientId: {0}).", _session.ClientId); + _logger.Trace("Enqueued packet (ClientId: {0}).", _clientSession.ClientId); } private async Task SendQueuedPacketsAsync(IMqttChannelAdapter adapter, CancellationToken cancellationToken) @@ -61,7 +63,7 @@ namespace MQTTnet.Server } catch (Exception exception) { - _logger.Error(exception, "Unhandled exception while sending enqueued packet (ClientId: {0}).", _session.ClientId); + _logger.Error(exception, "Unhandled exception while sending enqueued packet (ClientId: {0}).", _clientSession.ClientId); } } @@ -78,24 +80,24 @@ namespace MQTTnet.Server await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, packet).ConfigureAwait(false); - _logger.Trace("Enqueued packet sent (ClientId: {0}).", _session.ClientId); + _logger.Trace("Enqueued packet sent (ClientId: {0}).", _clientSession.ClientId); } catch (Exception exception) { if (exception is MqttCommunicationTimedOutException) { - _logger.Warning(exception, "Sending publish packet failed due to timeout (ClientId: {0}).", _session.ClientId); + _logger.Warning(exception, "Sending publish packet failed due to timeout (ClientId: {0}).", _clientSession.ClientId); } else if (exception is MqttCommunicationException) { - _logger.Warning(exception, "Sending publish packet failed due to communication exception (ClientId: {0}).", _session.ClientId); + _logger.Warning(exception, "Sending publish packet failed due to communication exception (ClientId: {0}).", _clientSession.ClientId); } else if (exception is OperationCanceledException) { } else { - _logger.Error(exception, "Sending publish packet failed (ClientId: {0}).", _session.ClientId); + _logger.Error(exception, "Sending publish packet failed (ClientId: {0}).", _clientSession.ClientId); } if (packet is MqttPublishPacket publishPacket) @@ -108,7 +110,7 @@ namespace MQTTnet.Server } } - await _session.StopAsync(); + await _clientSession.StopAsync(); } } diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs index 4098616..c0e9e0c 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs @@ -21,10 +21,7 @@ namespace MQTTnet.Server private readonly IMqttServerOptions _options; private readonly IMqttNetLogger _logger; - private readonly MqttClientSessionsManager _sessionsManager; private readonly MqttRetainedMessagesManager _retainedMessagesManager; - private readonly MqttClientSubscriptionsManager _subscriptionsManager; - private readonly MqttClientPendingMessagesQueue _pendingMessagesQueue; private IMqttChannelAdapter _adapter; private CancellationTokenSource _cancellationTokenSource; @@ -34,20 +31,24 @@ namespace MQTTnet.Server string clientId, IMqttServerOptions options, MqttRetainedMessagesManager retainedMessagesManager, - MqttClientSessionsManager sessionsManager, IMqttNetLogger logger) { _options = options ?? throw new ArgumentNullException(nameof(options)); _retainedMessagesManager = retainedMessagesManager ?? throw new ArgumentNullException(nameof(retainedMessagesManager)); - _sessionsManager = sessionsManager ?? throw new ArgumentNullException(nameof(sessionsManager)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); ClientId = clientId; - _subscriptionsManager = new MqttClientSubscriptionsManager(_options, clientId); - _pendingMessagesQueue = new MqttClientPendingMessagesQueue(_options, this, _logger); + SubscriptionsManager = new MqttClientSubscriptionsManager(_options, clientId); + PendingMessagesQueue = new MqttClientPendingMessagesQueue(_options, this, _logger); } + public Func ApplicationMessageReceivedCallback { get; set; } + + public MqttClientSubscriptionsManager SubscriptionsManager { get; } + + public MqttClientPendingMessagesQueue PendingMessagesQueue { get; } + public string ClientId { get; } public MqttProtocolVersion? ProtocolVersion => _adapter?.PacketSerializer.ProtocolVersion; @@ -71,7 +72,7 @@ namespace MQTTnet.Server _adapter = adapter; _cancellationTokenSource = cancellationTokenSource; - _pendingMessagesQueue.Start(adapter, cancellationTokenSource.Token); + PendingMessagesQueue.Start(adapter, cancellationTokenSource.Token); _lastPacketReceivedTracker.Restart(); _lastNonKeepAlivePacketReceivedTracker.Restart(); @@ -123,7 +124,7 @@ namespace MQTTnet.Server if (willMessage != null) { _willMessage = null; //clear willmessage so it is send just once - await _sessionsManager.DispatchApplicationMessageAsync(this, willMessage).ConfigureAwait(false); + await ApplicationMessageReceivedCallback(this, willMessage).ConfigureAwait(false); } } } @@ -132,7 +133,7 @@ namespace MQTTnet.Server { if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage)); - var result = await _subscriptionsManager.CheckSubscriptionsAsync(applicationMessage); + var result = await SubscriptionsManager.CheckSubscriptionsAsync(applicationMessage); if (!result.IsSubscribed) { return; @@ -141,30 +142,40 @@ namespace MQTTnet.Server var publishPacket = applicationMessage.ToPublishPacket(); publishPacket.QualityOfServiceLevel = result.QualityOfServiceLevel; - _pendingMessagesQueue.Enqueue(publishPacket); + PendingMessagesQueue.Enqueue(publishPacket); } public Task SubscribeAsync(IList topicFilters) { - return _subscriptionsManager.SubscribeAsync(new MqttSubscribePacket + if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); + + var response = SubscriptionsManager.SubscribeAsync(new MqttSubscribePacket { TopicFilters = topicFilters }); + + return response; } public Task UnsubscribeAsync(IList topicFilters) { if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); - return _subscriptionsManager.UnsubscribeAsync(new MqttUnsubscribePacket + var response = SubscriptionsManager.UnsubscribeAsync(new MqttUnsubscribePacket { TopicFilters = topicFilters }); + + return response; } public void Dispose() { - _pendingMessagesQueue?.Dispose(); + ApplicationMessageReceivedCallback = null; + + SubscriptionsManager?.Dispose(); + PendingMessagesQueue?.Dispose(); + _cancellationTokenSource?.Dispose(); } @@ -250,7 +261,7 @@ namespace MQTTnet.Server private async Task HandleIncomingSubscribePacketAsync(IMqttChannelAdapter adapter, MqttSubscribePacket subscribePacket, CancellationToken cancellationToken) { - var subscribeResult = await _subscriptionsManager.SubscribeAsync(subscribePacket).ConfigureAwait(false); + var subscribeResult = await SubscriptionsManager.SubscribeAsync(subscribePacket).ConfigureAwait(false); await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, subscribeResult.ResponsePacket).ConfigureAwait(false); if (subscribeResult.CloseConnection) @@ -264,7 +275,7 @@ namespace MQTTnet.Server private async Task HandleIncomingUnsubscribePacketAsync(IMqttChannelAdapter adapter, MqttUnsubscribePacket unsubscribePacket, CancellationToken cancellationToken) { - var unsubscribeResult = await _subscriptionsManager.UnsubscribeAsync(unsubscribePacket).ConfigureAwait(false); + var unsubscribeResult = await SubscriptionsManager.UnsubscribeAsync(unsubscribePacket).ConfigureAwait(false); await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, unsubscribeResult); } @@ -285,7 +296,7 @@ namespace MQTTnet.Server { case MqttQualityOfServiceLevel.AtMostOnce: { - return _sessionsManager.DispatchApplicationMessageAsync(this, applicationMessage); + return ApplicationMessageReceivedCallback?.Invoke(this, applicationMessage); } case MqttQualityOfServiceLevel.AtLeastOnce: { @@ -304,7 +315,7 @@ namespace MQTTnet.Server private async Task HandleIncomingPublishPacketWithQoS1(IMqttChannelAdapter adapter, MqttApplicationMessage applicationMessage, MqttPublishPacket publishPacket, CancellationToken cancellationToken) { - await _sessionsManager.DispatchApplicationMessageAsync(this, applicationMessage).ConfigureAwait(false); + await ApplicationMessageReceivedCallback(this, applicationMessage).ConfigureAwait(false); var response = new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier }; await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, response).ConfigureAwait(false); @@ -313,7 +324,7 @@ namespace MQTTnet.Server private async Task HandleIncomingPublishPacketWithQoS2(IMqttChannelAdapter adapter, MqttApplicationMessage applicationMessage, MqttPublishPacket publishPacket, CancellationToken cancellationToken) { // QoS 2 is implement as method "B" [4.3.3 QoS 2: Exactly once delivery] - await _sessionsManager.DispatchApplicationMessageAsync(this, applicationMessage).ConfigureAwait(false); + await ApplicationMessageReceivedCallback(this, applicationMessage).ConfigureAwait(false); var response = new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }; await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, response).ConfigureAwait(false); diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs index 6fc8a71..d580dd2 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs @@ -12,27 +12,32 @@ using MQTTnet.Serializer; namespace MQTTnet.Server { - public sealed class MqttClientSessionsManager + public sealed class MqttClientSessionsManager : IDisposable { private readonly Dictionary _sessions = new Dictionary(); private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1); private readonly IMqttServerOptions _options; - private readonly MqttServer _server; private readonly MqttRetainedMessagesManager _retainedMessagesManager; private readonly IMqttNetLogger _logger; - public MqttClientSessionsManager(IMqttServerOptions options, MqttRetainedMessagesManager retainedMessagesManager, MqttServer server, IMqttNetLogger logger) + public MqttClientSessionsManager(IMqttServerOptions options, MqttRetainedMessagesManager retainedMessagesManager, IMqttNetLogger logger) { - _server = server ?? throw new ArgumentNullException(nameof(server)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _options = options ?? throw new ArgumentNullException(nameof(options)); _retainedMessagesManager = retainedMessagesManager ?? throw new ArgumentNullException(nameof(retainedMessagesManager)); } - public async Task RunClientSessionAsync(IMqttChannelAdapter clientAdapter, CancellationToken cancellationToken) + public Action ClientConnectedCallback { get; set; } + public Action ClientDisconnectedCallback { get; set; } + public Action ClientSubscribedTopicCallback { get; set; } + public Action ClientUnsubscribedTopicCallback { get; set; } + public Action ApplicationMessageReceivedCallback { get; set; } + + public async Task RunSessionAsync(IMqttChannelAdapter clientAdapter, CancellationToken cancellationToken) { var clientId = string.Empty; + MqttClientSession clientSession = null; try { if (!(await clientAdapter.ReceivePacketAsync(_options.DefaultCommunicationTimeout, cancellationToken).ConfigureAwait(false) is MqttConnectPacket connectPacket)) @@ -56,21 +61,22 @@ namespace MQTTnet.Server return; } - var clientSession = await GetOrCreateClientSessionAsync(connectPacket).ConfigureAwait(false); + var result = await GetOrCreateClientSessionAsync(connectPacket).ConfigureAwait(false); + clientSession = result.Session; await clientAdapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new MqttConnAckPacket { ConnectReturnCode = connectReturnCode, - IsSessionPresent = clientSession.IsExistingSession + IsSessionPresent = result.IsExistingSession }).ConfigureAwait(false); - _server.OnClientConnected(new ConnectedMqttClient + ClientConnectedCallback?.Invoke(new ConnectedMqttClient { ClientId = clientId, ProtocolVersion = clientAdapter.PacketSerializer.ProtocolVersion }); - await clientSession.Session.RunAsync(connectPacket, clientAdapter).ConfigureAwait(false); + await clientSession.RunAsync(connectPacket, clientAdapter).ConfigureAwait(false); } catch (Exception exception) { @@ -87,10 +93,11 @@ namespace MQTTnet.Server // ignored } - _server.OnClientDisconnected(new ConnectedMqttClient + ClientDisconnectedCallback?.Invoke(new ConnectedMqttClient { ClientId = clientId, - ProtocolVersion = clientAdapter.PacketSerializer.ProtocolVersion + ProtocolVersion = clientAdapter.PacketSerializer.ProtocolVersion, + PendingApplicationMessages = clientSession?.PendingMessagesQueue.Count ?? 0 }); } } @@ -123,7 +130,8 @@ namespace MQTTnet.Server ClientId = s.Value.ClientId, ProtocolVersion = s.Value.ProtocolVersion ?? MqttProtocolVersion.V311, LastPacketReceived = s.Value.LastPacketReceived, - LastNonKeepAlivePacketReceived = s.Value.LastNonKeepAlivePacketReceived + LastNonKeepAlivePacketReceived = s.Value.LastNonKeepAlivePacketReceived, + PendingApplicationMessages = s.Value.PendingMessagesQueue.Count }).ToList(); } finally @@ -147,7 +155,7 @@ namespace MQTTnet.Server await _retainedMessagesManager.HandleMessageAsync(senderClientSession?.ClientId, applicationMessage).ConfigureAwait(false); } - _server.OnApplicationMessageReceived(senderClientSession?.ClientId, applicationMessage); + ApplicationMessageReceivedCallback?.Invoke(senderClientSession?.ClientId, applicationMessage); } catch (Exception exception) { @@ -271,7 +279,14 @@ namespace MQTTnet.Server { isExistingSession = false; - clientSession = new MqttClientSession(connectPacket.ClientId, _options, _retainedMessagesManager, this, _logger); + clientSession = new MqttClientSession(connectPacket.ClientId, _options, _retainedMessagesManager, _logger) + { + ApplicationMessageReceivedCallback = DispatchApplicationMessageAsync + }; + + clientSession.SubscriptionsManager.TopicSubscribedCallback = ClientSubscribedTopicCallback; + clientSession.SubscriptionsManager.TopicUnsubscribedCallback = ClientUnsubscribedTopicCallback; + _sessions[connectPacket.ClientId] = clientSession; _logger.Trace("Created a new session for client '{0}'.", connectPacket.ClientId); @@ -284,5 +299,16 @@ namespace MQTTnet.Server _semaphore.Release(); } } + + public void Dispose() + { + ClientConnectedCallback = null; + ClientDisconnectedCallback = null; + ClientSubscribedTopicCallback = null; + ClientUnsubscribedTopicCallback = null; + ApplicationMessageReceivedCallback = null; + + _semaphore?.Dispose(); + } } } \ No newline at end of file diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSubscribedTopicEventArgs.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSubscribedTopicEventArgs.cs new file mode 100644 index 0000000..11f5c06 --- /dev/null +++ b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSubscribedTopicEventArgs.cs @@ -0,0 +1,17 @@ +using System; + +namespace MQTTnet.Server +{ + public class MqttClientSubscribedTopicEventArgs : EventArgs + { + public MqttClientSubscribedTopicEventArgs(string clientId, TopicFilter topicFilter) + { + ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId)); + TopicFilter = topicFilter ?? throw new ArgumentNullException(nameof(topicFilter)); + } + + public string ClientId { get; } + + public TopicFilter TopicFilter { get; } + } +} diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSubscriptionsManager.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSubscriptionsManager.cs index 77949d4..97b9f04 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSubscriptionsManager.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSubscriptionsManager.cs @@ -8,19 +8,22 @@ using MQTTnet.Protocol; namespace MQTTnet.Server { - public sealed class MqttClientSubscriptionsManager + public sealed class MqttClientSubscriptionsManager : IDisposable { private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1); private readonly Dictionary _subscriptions = new Dictionary(); private readonly IMqttServerOptions _options; private readonly string _clientId; - + public MqttClientSubscriptionsManager(IMqttServerOptions options, string clientId) { _options = options ?? throw new ArgumentNullException(nameof(options)); _clientId = clientId ?? throw new ArgumentNullException(nameof(clientId)); } + public Action TopicSubscribedCallback { get; set; } + public Action TopicUnsubscribedCallback { get; set; } + public async Task SubscribeAsync(MqttSubscribePacket subscribePacket) { if (subscribePacket == null) throw new ArgumentNullException(nameof(subscribePacket)); @@ -54,6 +57,7 @@ namespace MQTTnet.Server if (interceptorContext.AcceptSubscription) { _subscriptions[topicFilter.Topic] = topicFilter.QualityOfServiceLevel; + TopicSubscribedCallback?.Invoke(_clientId, topicFilter); } } } @@ -75,6 +79,7 @@ namespace MQTTnet.Server foreach (var topicFilter in unsubscribePacket.TopicFilters) { _subscriptions.Remove(topicFilter); + TopicUnsubscribedCallback?.Invoke(_clientId, topicFilter); } } finally @@ -119,6 +124,22 @@ namespace MQTTnet.Server } } + public void Dispose() + { + _semaphore?.Dispose(); + } + + private static MqttSubscribeReturnCode ConvertToMaximumQoS(MqttQualityOfServiceLevel qualityOfServiceLevel) + { + switch (qualityOfServiceLevel) + { + case MqttQualityOfServiceLevel.AtMostOnce: return MqttSubscribeReturnCode.SuccessMaximumQoS0; + case MqttQualityOfServiceLevel.AtLeastOnce: return MqttSubscribeReturnCode.SuccessMaximumQoS1; + case MqttQualityOfServiceLevel.ExactlyOnce: return MqttSubscribeReturnCode.SuccessMaximumQoS2; + default: return MqttSubscribeReturnCode.Failure; + } + } + private MqttSubscriptionInterceptorContext InterceptSubscribe(TopicFilter topicFilter) { var interceptorContext = new MqttSubscriptionInterceptorContext(_clientId, topicFilter); @@ -148,16 +169,5 @@ namespace MQTTnet.Server QualityOfServiceLevel = effectiveQoS }; } - - private static MqttSubscribeReturnCode ConvertToMaximumQoS(MqttQualityOfServiceLevel qualityOfServiceLevel) - { - switch (qualityOfServiceLevel) - { - case MqttQualityOfServiceLevel.AtMostOnce: return MqttSubscribeReturnCode.SuccessMaximumQoS0; - case MqttQualityOfServiceLevel.AtLeastOnce: return MqttSubscribeReturnCode.SuccessMaximumQoS1; - case MqttQualityOfServiceLevel.ExactlyOnce: return MqttSubscribeReturnCode.SuccessMaximumQoS2; - default: return MqttSubscribeReturnCode.Failure; - } - } } } diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttClientUnSubscribedTopicEventArgs.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttClientUnSubscribedTopicEventArgs.cs new file mode 100644 index 0000000..d76347e --- /dev/null +++ b/Frameworks/MQTTnet.NetStandard/Server/MqttClientUnSubscribedTopicEventArgs.cs @@ -0,0 +1,17 @@ +using System; + +namespace MQTTnet.Server +{ + public class MqttClientUnsubscribedTopicEventArgs : EventArgs + { + public MqttClientUnsubscribedTopicEventArgs(string clientId, string topicFilter) + { + ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId)); + TopicFilter = topicFilter ?? throw new ArgumentNullException(nameof(topicFilter)); + } + + public string ClientId { get; } + + public string TopicFilter { get; } + } +} diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttRetainedMessagesManager.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttRetainedMessagesManager.cs index 2c0b260..7b90209 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/MqttRetainedMessagesManager.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/MqttRetainedMessagesManager.cs @@ -8,7 +8,7 @@ using MQTTnet.Packets; namespace MQTTnet.Server { - public sealed class MqttRetainedMessagesManager + public sealed class MqttRetainedMessagesManager : IDisposable { private readonly Dictionary _retainedMessages = new Dictionary(); private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1); @@ -102,6 +102,11 @@ namespace MQTTnet.Server return retainedMessages; } + public void Dispose() + { + _semaphore?.Dispose(); + } + private async Task HandleMessageInternalAsync(string clientId, MqttApplicationMessage applicationMessage) { var saveIsRequired = false; diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttServer.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttServer.cs index ac3b6d1..3a4a018 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/MqttServer.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/MqttServer.cs @@ -27,8 +27,12 @@ namespace MQTTnet.Server } public event EventHandler Started; + public event EventHandler ClientConnected; public event EventHandler ClientDisconnected; + public event EventHandler ClientSubscribedTopic; + public event EventHandler ClientUnsubscribedTopic; + public event EventHandler ApplicationMessageReceived; public Task> GetConnectedClientsAsync() @@ -71,11 +75,19 @@ namespace MQTTnet.Server if (_cancellationTokenSource != null) throw new InvalidOperationException("The server is already started."); _cancellationTokenSource = new CancellationTokenSource(); - _retainedMessagesManager = new MqttRetainedMessagesManager(_options, _logger); - _clientSessionsManager = new MqttClientSessionsManager(_options, _retainedMessagesManager, this, _logger); + _retainedMessagesManager = new MqttRetainedMessagesManager(_options, _logger); await _retainedMessagesManager.LoadMessagesAsync(); + _clientSessionsManager = new MqttClientSessionsManager(_options, _retainedMessagesManager, _logger) + { + ClientConnectedCallback = OnClientConnected, + ClientDisconnectedCallback = OnClientDisconnected, + ClientSubscribedTopicCallback = OnClientSubscribedTopic, + ClientUnsubscribedTopicCallback = OnClientUnsubscribedTopic, + ApplicationMessageReceivedCallback = OnApplicationMessageReceived + }; + foreach (var adapter in _adapters) { adapter.ClientAccepted += OnClientAccepted; @@ -110,39 +122,46 @@ namespace MQTTnet.Server } finally { + _clientSessionsManager?.Dispose(); + _retainedMessagesManager?.Dispose(); + _cancellationTokenSource = null; _retainedMessagesManager = null; _clientSessionsManager = null; } } - internal void OnClientConnected(ConnectedMqttClient client) + private void OnClientConnected(ConnectedMqttClient client) { - if (client == null) throw new ArgumentNullException(nameof(client)); - _logger.Info("Client '{0}': Connected.", client.ClientId); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(client)); } - internal void OnClientDisconnected(ConnectedMqttClient client) + private void OnClientDisconnected(ConnectedMqttClient client) { - if (client == null) throw new ArgumentNullException(nameof(client)); - _logger.Info("Client '{0}': Disconnected.", client.ClientId); ClientDisconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(client)); } - - internal void OnApplicationMessageReceived(string clientId, MqttApplicationMessage applicationMessage) + + private void OnClientSubscribedTopic(string clientId, TopicFilter topicFilter) { - if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage)); + ClientSubscribedTopic?.Invoke(this, new MqttClientSubscribedTopicEventArgs(clientId, topicFilter)); + } + private void OnClientUnsubscribedTopic(string clientId, string topicFilter) + { + ClientUnsubscribedTopic?.Invoke(this, new MqttClientUnsubscribedTopicEventArgs(clientId, topicFilter)); + } + + private void OnApplicationMessageReceived(string clientId, MqttApplicationMessage applicationMessage) + { ApplicationMessageReceived?.Invoke(this, new MqttApplicationMessageReceivedEventArgs(clientId, applicationMessage)); } private void OnClientAccepted(object sender, MqttServerAdapterClientAcceptedEventArgs eventArgs) { eventArgs.SessionTask = Task.Run( - async () => await _clientSessionsManager.RunClientSessionAsync(eventArgs.Client, _cancellationTokenSource.Token).ConfigureAwait(false), + async () => await _clientSessionsManager.RunSessionAsync(eventArgs.Client, _cancellationTokenSource.Token).ConfigureAwait(false), _cancellationTokenSource.Token); } } diff --git a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs index 26c07df..7cf8bf3 100644 --- a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs @@ -79,7 +79,7 @@ namespace MQTTnet.Core.Tests } [TestMethod] - public async Task MqttServer_Unsubscribe() + public async Task MqttServer_SubscribeUnsubscribe() { var serverAdapter = new TestMqttServerAdapter(); var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger()); @@ -99,13 +99,31 @@ namespace MQTTnet.Core.Tests await c2.PublishAsync(message); Assert.AreEqual(0, receivedMessagesCount); + var subscribeEventCalled = false; + s.ClientSubscribedTopic += (_, e) => + { + subscribeEventCalled = e.TopicFilter.Topic == "a" && e.ClientId == "c1"; + }; + await c1.SubscribeAsync(new TopicFilter("a", MqttQualityOfServiceLevel.AtLeastOnce)); + await Task.Delay(100); + Assert.IsTrue(subscribeEventCalled, "Subscribe event not called."); + await c2.PublishAsync(message); await Task.Delay(500); Assert.AreEqual(1, receivedMessagesCount); + var unsubscribeEventCalled = false; + s.ClientUnsubscribedTopic += (_, e) => + { + unsubscribeEventCalled = e.TopicFilter == "a" && e.ClientId == "c1"; + }; + await c1.UnsubscribeAsync("a"); + await Task.Delay(100); + Assert.IsTrue(unsubscribeEventCalled, "Unsubscribe event not called."); + await c2.PublishAsync(message); await Task.Delay(500); @@ -343,7 +361,7 @@ namespace MQTTnet.Core.Tests var c1 = await serverAdapter.ConnectTestClient(s, "c1"); var c2 = await serverAdapter.ConnectTestClient(s, "c2"); - + c1.ApplicationMessageReceived += (_, e) => { if (Encoding.UTF8.GetString(e.ApplicationMessage.Payload) == "The body")