diff --git a/Source/MQTTnet/Server/IMqttServer.cs b/Source/MQTTnet/Server/IMqttServer.cs index aceceee..0286720 100644 --- a/Source/MQTTnet/Server/IMqttServer.cs +++ b/Source/MQTTnet/Server/IMqttServer.cs @@ -10,9 +10,16 @@ namespace MQTTnet.Server event EventHandler Started; event EventHandler Stopped; - event EventHandler ClientConnected; - event EventHandler ClientDisconnected; + IMqttServerClientConnectedHandler ClientConnectedHandler { get; set; } + [Obsolete("Use ClientConnectedHandler instead.")] + event EventHandler ClientConnected; + + IMqttServerClientDisconnectedHandler ClientDisconnectedHandler { get; set; } + [Obsolete("Use ClientDisconnectedHandler instead.")] + event EventHandler ClientDisconnected; + event EventHandler ClientSubscribedTopic; + event EventHandler ClientUnsubscribedTopic; IMqttServerOptions Options { get; } diff --git a/Source/MQTTnet/Server/IMqttServerClientConnectedHandler.cs b/Source/MQTTnet/Server/IMqttServerClientConnectedHandler.cs new file mode 100644 index 0000000..b507e41 --- /dev/null +++ b/Source/MQTTnet/Server/IMqttServerClientConnectedHandler.cs @@ -0,0 +1,9 @@ +using System.Threading.Tasks; + +namespace MQTTnet.Server +{ + public interface IMqttServerClientConnectedHandler + { + Task HandleClientConnectedAsync(MqttServerClientConnectedEventArgs eventArgs); + } +} diff --git a/Source/MQTTnet/Server/IMqttServerClientDisconnectedHandler.cs b/Source/MQTTnet/Server/IMqttServerClientDisconnectedHandler.cs new file mode 100644 index 0000000..3614d2d --- /dev/null +++ b/Source/MQTTnet/Server/IMqttServerClientDisconnectedHandler.cs @@ -0,0 +1,9 @@ +using System.Threading.Tasks; + +namespace MQTTnet.Server +{ + public interface IMqttServerClientDisconnectedHandler + { + Task HandleClientDisconnectedAsync(MqttServerClientDisconnectedEventArgs eventArgs); + } +} diff --git a/Source/MQTTnet/Server/IMqttServerFactory.cs b/Source/MQTTnet/Server/IMqttServerFactory.cs index 7d23ed3..e8c7d40 100644 --- a/Source/MQTTnet/Server/IMqttServerFactory.cs +++ b/Source/MQTTnet/Server/IMqttServerFactory.cs @@ -10,6 +10,8 @@ namespace MQTTnet.Server IMqttServer CreateMqttServer(IMqttNetLogger logger); + IMqttServer CreateMqttServer(IEnumerable adapters); + IMqttServer CreateMqttServer(IEnumerable adapters, IMqttNetLogger logger); } } \ No newline at end of file diff --git a/Source/MQTTnet/Server/MqttClientConnection.cs b/Source/MQTTnet/Server/MqttClientConnection.cs index fa1d6a0..0610bb9 100644 --- a/Source/MQTTnet/Server/MqttClientConnection.cs +++ b/Source/MQTTnet/Server/MqttClientConnection.cs @@ -14,62 +14,6 @@ using MQTTnet.Server.Status; namespace MQTTnet.Server { - public class MqttClientSession - { - private readonly DateTime _createdTimestamp = DateTime.UtcNow; - - public MqttClientSession(string clientId, MqttServerEventDispatcher eventDispatcher, IMqttServerOptions serverOptions) - { - ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId)); - - SubscriptionsManager = new MqttClientSubscriptionsManager(clientId, eventDispatcher, serverOptions); - ApplicationMessagesQueue = new MqttClientSessionApplicationMessagesQueue(serverOptions); - } - - public string ClientId { get; } - - public bool IsCleanSession { get; set; } = true; - - public MqttApplicationMessage WillMessage { get; set; } - - public MqttClientSubscriptionsManager SubscriptionsManager { get; } - - public MqttClientSessionApplicationMessagesQueue ApplicationMessagesQueue { get; } - - public void EnqueueApplicationMessage(MqttApplicationMessage applicationMessage, string senderClientId, bool isRetainedApplicationMessage) - { - var checkSubscriptionsResult = SubscriptionsManager.CheckSubscriptions(applicationMessage.Topic, applicationMessage.QualityOfServiceLevel); - if (!checkSubscriptionsResult.IsSubscribed) - { - return; - } - - ApplicationMessagesQueue.Enqueue(applicationMessage, senderClientId, checkSubscriptionsResult.QualityOfServiceLevel, isRetainedApplicationMessage); - } - - public async Task SubscribeAsync(ICollection topicFilters, MqttRetainedMessagesManager retainedMessagesManager) - { - await SubscriptionsManager.SubscribeAsync(topicFilters).ConfigureAwait(false); - var matchingRetainedMessages = await retainedMessagesManager.GetSubscribedMessagesAsync(topicFilters).ConfigureAwait(false); - foreach (var matchingRetainedMessage in matchingRetainedMessages) - { - EnqueueApplicationMessage(matchingRetainedMessage, null, true); - } - } - - public Task UnsubscribeAsync(IEnumerable topicFilters) - { - return SubscriptionsManager.UnsubscribeAsync(topicFilters); - } - - public void FillStatus(MqttSessionStatus status) - { - status.ClientId = ClientId; - status.CreatedTimestamp = _createdTimestamp; - status.PendingApplicationMessagesCount = ApplicationMessagesQueue.Count; - } - } - public class MqttClientConnection : IMqttClientSession, IDisposable { private readonly MqttPacketIdentifierProvider _packetIdentifierProvider = new MqttPacketIdentifierProvider(); diff --git a/Source/MQTTnet/Server/MqttClientSession.cs b/Source/MQTTnet/Server/MqttClientSession.cs new file mode 100644 index 0000000..427fdfa --- /dev/null +++ b/Source/MQTTnet/Server/MqttClientSession.cs @@ -0,0 +1,63 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using MQTTnet.Server.Status; + +namespace MQTTnet.Server +{ + public class MqttClientSession + { + private readonly DateTime _createdTimestamp = DateTime.UtcNow; + + public MqttClientSession(string clientId, MqttServerEventDispatcher eventDispatcher, IMqttServerOptions serverOptions) + { + ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId)); + + SubscriptionsManager = new MqttClientSubscriptionsManager(clientId, eventDispatcher, serverOptions); + ApplicationMessagesQueue = new MqttClientSessionApplicationMessagesQueue(serverOptions); + } + + public string ClientId { get; } + + public bool IsCleanSession { get; set; } = true; + + public MqttApplicationMessage WillMessage { get; set; } + + public MqttClientSubscriptionsManager SubscriptionsManager { get; } + + public MqttClientSessionApplicationMessagesQueue ApplicationMessagesQueue { get; } + + public void EnqueueApplicationMessage(MqttApplicationMessage applicationMessage, string senderClientId, bool isRetainedApplicationMessage) + { + var checkSubscriptionsResult = SubscriptionsManager.CheckSubscriptions(applicationMessage.Topic, applicationMessage.QualityOfServiceLevel); + if (!checkSubscriptionsResult.IsSubscribed) + { + return; + } + + ApplicationMessagesQueue.Enqueue(applicationMessage, senderClientId, checkSubscriptionsResult.QualityOfServiceLevel, isRetainedApplicationMessage); + } + + public async Task SubscribeAsync(ICollection topicFilters, MqttRetainedMessagesManager retainedMessagesManager) + { + await SubscriptionsManager.SubscribeAsync(topicFilters).ConfigureAwait(false); + var matchingRetainedMessages = await retainedMessagesManager.GetSubscribedMessagesAsync(topicFilters).ConfigureAwait(false); + foreach (var matchingRetainedMessage in matchingRetainedMessages) + { + EnqueueApplicationMessage(matchingRetainedMessage, null, true); + } + } + + public Task UnsubscribeAsync(IEnumerable topicFilters) + { + return SubscriptionsManager.UnsubscribeAsync(topicFilters); + } + + public void FillStatus(MqttSessionStatus status) + { + status.ClientId = ClientId; + status.CreatedTimestamp = _createdTimestamp; + status.PendingApplicationMessagesCount = ApplicationMessagesQueue.Count; + } + } +} \ No newline at end of file diff --git a/Source/MQTTnet/Server/MqttServer.cs b/Source/MQTTnet/Server/MqttServer.cs index 4d36e4c..a0a9d44 100644 --- a/Source/MQTTnet/Server/MqttServer.cs +++ b/Source/MQTTnet/Server/MqttServer.cs @@ -50,9 +50,14 @@ namespace MQTTnet.Server public event EventHandler Started; public event EventHandler Stopped; - public event EventHandler ClientConnected; - public event EventHandler ClientDisconnected; + public IMqttServerClientConnectedHandler ClientConnectedHandler { get; set; } + public event EventHandler ClientConnected; + + public IMqttServerClientDisconnectedHandler ClientDisconnectedHandler { get; set; } + public event EventHandler ClientDisconnected; + public event EventHandler ClientSubscribedTopic; + public event EventHandler ClientUnsubscribedTopic; public IMqttApplicationMessageHandler ReceivedApplicationMessageHandler { get; set; } diff --git a/Source/MQTTnet/Server/MqttClientConnectedEventArgs.cs b/Source/MQTTnet/Server/MqttServerClientConnectedEventArgs.cs similarity index 61% rename from Source/MQTTnet/Server/MqttClientConnectedEventArgs.cs rename to Source/MQTTnet/Server/MqttServerClientConnectedEventArgs.cs index 6f043b4..71c42c3 100644 --- a/Source/MQTTnet/Server/MqttClientConnectedEventArgs.cs +++ b/Source/MQTTnet/Server/MqttServerClientConnectedEventArgs.cs @@ -2,9 +2,9 @@ namespace MQTTnet.Server { - public class MqttClientConnectedEventArgs : EventArgs + public class MqttServerClientConnectedEventArgs : EventArgs { - public MqttClientConnectedEventArgs(string clientId) + public MqttServerClientConnectedEventArgs(string clientId) { ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId)); } diff --git a/Source/MQTTnet/Server/MqttServerClientConnectedHandlerDelegate.cs b/Source/MQTTnet/Server/MqttServerClientConnectedHandlerDelegate.cs new file mode 100644 index 0000000..5847744 --- /dev/null +++ b/Source/MQTTnet/Server/MqttServerClientConnectedHandlerDelegate.cs @@ -0,0 +1,31 @@ +using System; +using System.Threading.Tasks; + +namespace MQTTnet.Server +{ + public class MqttServerClientConnectedHandlerDelegate : IMqttServerClientConnectedHandler + { + private readonly Func _handler; + + public MqttServerClientConnectedHandlerDelegate(Action handler) + { + if (handler == null) throw new ArgumentNullException(nameof(handler)); + + _handler = context => + { + handler(context); + return Task.FromResult(0); + }; + } + + public MqttServerClientConnectedHandlerDelegate(Func handler) + { + _handler = handler ?? throw new ArgumentNullException(nameof(handler)); + } + + public Task HandleClientConnectedAsync(MqttServerClientConnectedEventArgs eventArgs) + { + return _handler(eventArgs); + } + } +} diff --git a/Source/MQTTnet/Server/MqttClientDisconnectedEventArgs.cs b/Source/MQTTnet/Server/MqttServerClientDisconnectedEventArgs.cs similarity index 64% rename from Source/MQTTnet/Server/MqttClientDisconnectedEventArgs.cs rename to Source/MQTTnet/Server/MqttServerClientDisconnectedEventArgs.cs index 225d025..9e74274 100644 --- a/Source/MQTTnet/Server/MqttClientDisconnectedEventArgs.cs +++ b/Source/MQTTnet/Server/MqttServerClientDisconnectedEventArgs.cs @@ -2,9 +2,9 @@ namespace MQTTnet.Server { - public class MqttClientDisconnectedEventArgs : EventArgs + public class MqttServerClientDisconnectedEventArgs : EventArgs { - public MqttClientDisconnectedEventArgs(string clientId, MqttClientDisconnectType disconnectType) + public MqttServerClientDisconnectedEventArgs(string clientId, MqttClientDisconnectType disconnectType) { ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId)); DisconnectType = disconnectType; diff --git a/Source/MQTTnet/Server/MqttServerClientDisconnectedHandlerDelegate.cs b/Source/MQTTnet/Server/MqttServerClientDisconnectedHandlerDelegate.cs new file mode 100644 index 0000000..45c46d9 --- /dev/null +++ b/Source/MQTTnet/Server/MqttServerClientDisconnectedHandlerDelegate.cs @@ -0,0 +1,31 @@ +using System; +using System.Threading.Tasks; + +namespace MQTTnet.Server +{ + public class MqttServerClientDisconnectedHandlerDelegate : IMqttServerClientDisconnectedHandler + { + private readonly Func _handler; + + public MqttServerClientDisconnectedHandlerDelegate(Action handler) + { + if (handler == null) throw new ArgumentNullException(nameof(handler)); + + _handler = context => + { + handler(context); + return Task.FromResult(0); + }; + } + + public MqttServerClientDisconnectedHandlerDelegate(Func handler) + { + _handler = handler ?? throw new ArgumentNullException(nameof(handler)); + } + + public Task HandleClientDisconnectedAsync(MqttServerClientDisconnectedEventArgs eventArgs) + { + return _handler(eventArgs); + } + } +} diff --git a/Source/MQTTnet/Server/MqttServerEventDispatcher.cs b/Source/MQTTnet/Server/MqttServerEventDispatcher.cs index 03ed93d..4daa994 100644 --- a/Source/MQTTnet/Server/MqttServerEventDispatcher.cs +++ b/Source/MQTTnet/Server/MqttServerEventDispatcher.cs @@ -8,9 +8,9 @@ namespace MQTTnet.Server public event EventHandler ClientUnsubscribedTopic; - public event EventHandler ClientConnected; + public event EventHandler ClientConnected; - public event EventHandler ClientDisconnected; + public event EventHandler ClientDisconnected; public event EventHandler ApplicationMessageReceived; @@ -26,7 +26,7 @@ namespace MQTTnet.Server public void OnClientDisconnected(string clientId, MqttClientDisconnectType disconnectType) { - ClientDisconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(clientId, disconnectType)); + ClientDisconnected?.Invoke(this, new MqttServerClientDisconnectedEventArgs(clientId, disconnectType)); } public void OnApplicationMessageReceived(string senderClientId, MqttApplicationMessage applicationMessage) @@ -36,7 +36,7 @@ namespace MQTTnet.Server public void OnClientConnected(string clientId) { - ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientId)); + ClientConnected?.Invoke(this, new MqttServerClientConnectedEventArgs(clientId)); } } }