Browse Source

Migrate events in server to async handlers.

release/3.x.x
Christian 5 years ago
parent
commit
dffc27c1f4
12 changed files with 169 additions and 68 deletions
  1. +9
    -2
      Source/MQTTnet/Server/IMqttServer.cs
  2. +9
    -0
      Source/MQTTnet/Server/IMqttServerClientConnectedHandler.cs
  3. +9
    -0
      Source/MQTTnet/Server/IMqttServerClientDisconnectedHandler.cs
  4. +2
    -0
      Source/MQTTnet/Server/IMqttServerFactory.cs
  5. +0
    -56
      Source/MQTTnet/Server/MqttClientConnection.cs
  6. +63
    -0
      Source/MQTTnet/Server/MqttClientSession.cs
  7. +7
    -2
      Source/MQTTnet/Server/MqttServer.cs
  8. +2
    -2
      Source/MQTTnet/Server/MqttServerClientConnectedEventArgs.cs
  9. +31
    -0
      Source/MQTTnet/Server/MqttServerClientConnectedHandlerDelegate.cs
  10. +2
    -2
      Source/MQTTnet/Server/MqttServerClientDisconnectedEventArgs.cs
  11. +31
    -0
      Source/MQTTnet/Server/MqttServerClientDisconnectedHandlerDelegate.cs
  12. +4
    -4
      Source/MQTTnet/Server/MqttServerEventDispatcher.cs

+ 9
- 2
Source/MQTTnet/Server/IMqttServer.cs View File

@@ -10,9 +10,16 @@ namespace MQTTnet.Server
event EventHandler Started;
event EventHandler Stopped;

event EventHandler<MqttClientConnectedEventArgs> ClientConnected;
event EventHandler<MqttClientDisconnectedEventArgs> ClientDisconnected;
IMqttServerClientConnectedHandler ClientConnectedHandler { get; set; }
[Obsolete("Use ClientConnectedHandler instead.")]
event EventHandler<MqttServerClientConnectedEventArgs> ClientConnected;

IMqttServerClientDisconnectedHandler ClientDisconnectedHandler { get; set; }
[Obsolete("Use ClientDisconnectedHandler instead.")]
event EventHandler<MqttServerClientDisconnectedEventArgs> ClientDisconnected;

event EventHandler<MqttClientSubscribedTopicEventArgs> ClientSubscribedTopic;

event EventHandler<MqttClientUnsubscribedTopicEventArgs> ClientUnsubscribedTopic;
IMqttServerOptions Options { get; }


+ 9
- 0
Source/MQTTnet/Server/IMqttServerClientConnectedHandler.cs View File

@@ -0,0 +1,9 @@
using System.Threading.Tasks;

namespace MQTTnet.Server
{
public interface IMqttServerClientConnectedHandler
{
Task HandleClientConnectedAsync(MqttServerClientConnectedEventArgs eventArgs);
}
}

+ 9
- 0
Source/MQTTnet/Server/IMqttServerClientDisconnectedHandler.cs View File

@@ -0,0 +1,9 @@
using System.Threading.Tasks;

namespace MQTTnet.Server
{
public interface IMqttServerClientDisconnectedHandler
{
Task HandleClientDisconnectedAsync(MqttServerClientDisconnectedEventArgs eventArgs);
}
}

+ 2
- 0
Source/MQTTnet/Server/IMqttServerFactory.cs View File

@@ -10,6 +10,8 @@ namespace MQTTnet.Server

IMqttServer CreateMqttServer(IMqttNetLogger logger);

IMqttServer CreateMqttServer(IEnumerable<IMqttServerAdapter> adapters);

IMqttServer CreateMqttServer(IEnumerable<IMqttServerAdapter> adapters, IMqttNetLogger logger);
}
}

+ 0
- 56
Source/MQTTnet/Server/MqttClientConnection.cs View File

@@ -14,62 +14,6 @@ using MQTTnet.Server.Status;

namespace MQTTnet.Server
{
public class MqttClientSession
{
private readonly DateTime _createdTimestamp = DateTime.UtcNow;

public MqttClientSession(string clientId, MqttServerEventDispatcher eventDispatcher, IMqttServerOptions serverOptions)
{
ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId));

SubscriptionsManager = new MqttClientSubscriptionsManager(clientId, eventDispatcher, serverOptions);
ApplicationMessagesQueue = new MqttClientSessionApplicationMessagesQueue(serverOptions);
}

public string ClientId { get; }

public bool IsCleanSession { get; set; } = true;

public MqttApplicationMessage WillMessage { get; set; }

public MqttClientSubscriptionsManager SubscriptionsManager { get; }

public MqttClientSessionApplicationMessagesQueue ApplicationMessagesQueue { get; }

public void EnqueueApplicationMessage(MqttApplicationMessage applicationMessage, string senderClientId, bool isRetainedApplicationMessage)
{
var checkSubscriptionsResult = SubscriptionsManager.CheckSubscriptions(applicationMessage.Topic, applicationMessage.QualityOfServiceLevel);
if (!checkSubscriptionsResult.IsSubscribed)
{
return;
}

ApplicationMessagesQueue.Enqueue(applicationMessage, senderClientId, checkSubscriptionsResult.QualityOfServiceLevel, isRetainedApplicationMessage);
}

public async Task SubscribeAsync(ICollection<TopicFilter> topicFilters, MqttRetainedMessagesManager retainedMessagesManager)
{
await SubscriptionsManager.SubscribeAsync(topicFilters).ConfigureAwait(false);
var matchingRetainedMessages = await retainedMessagesManager.GetSubscribedMessagesAsync(topicFilters).ConfigureAwait(false);
foreach (var matchingRetainedMessage in matchingRetainedMessages)
{
EnqueueApplicationMessage(matchingRetainedMessage, null, true);
}
}

public Task UnsubscribeAsync(IEnumerable<string> topicFilters)
{
return SubscriptionsManager.UnsubscribeAsync(topicFilters);
}

public void FillStatus(MqttSessionStatus status)
{
status.ClientId = ClientId;
status.CreatedTimestamp = _createdTimestamp;
status.PendingApplicationMessagesCount = ApplicationMessagesQueue.Count;
}
}

public class MqttClientConnection : IMqttClientSession, IDisposable
{
private readonly MqttPacketIdentifierProvider _packetIdentifierProvider = new MqttPacketIdentifierProvider();


+ 63
- 0
Source/MQTTnet/Server/MqttClientSession.cs View File

@@ -0,0 +1,63 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using MQTTnet.Server.Status;

namespace MQTTnet.Server
{
public class MqttClientSession
{
private readonly DateTime _createdTimestamp = DateTime.UtcNow;

public MqttClientSession(string clientId, MqttServerEventDispatcher eventDispatcher, IMqttServerOptions serverOptions)
{
ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId));

SubscriptionsManager = new MqttClientSubscriptionsManager(clientId, eventDispatcher, serverOptions);
ApplicationMessagesQueue = new MqttClientSessionApplicationMessagesQueue(serverOptions);
}

public string ClientId { get; }

public bool IsCleanSession { get; set; } = true;

public MqttApplicationMessage WillMessage { get; set; }

public MqttClientSubscriptionsManager SubscriptionsManager { get; }

public MqttClientSessionApplicationMessagesQueue ApplicationMessagesQueue { get; }

public void EnqueueApplicationMessage(MqttApplicationMessage applicationMessage, string senderClientId, bool isRetainedApplicationMessage)
{
var checkSubscriptionsResult = SubscriptionsManager.CheckSubscriptions(applicationMessage.Topic, applicationMessage.QualityOfServiceLevel);
if (!checkSubscriptionsResult.IsSubscribed)
{
return;
}

ApplicationMessagesQueue.Enqueue(applicationMessage, senderClientId, checkSubscriptionsResult.QualityOfServiceLevel, isRetainedApplicationMessage);
}

public async Task SubscribeAsync(ICollection<TopicFilter> topicFilters, MqttRetainedMessagesManager retainedMessagesManager)
{
await SubscriptionsManager.SubscribeAsync(topicFilters).ConfigureAwait(false);
var matchingRetainedMessages = await retainedMessagesManager.GetSubscribedMessagesAsync(topicFilters).ConfigureAwait(false);
foreach (var matchingRetainedMessage in matchingRetainedMessages)
{
EnqueueApplicationMessage(matchingRetainedMessage, null, true);
}
}

public Task UnsubscribeAsync(IEnumerable<string> topicFilters)
{
return SubscriptionsManager.UnsubscribeAsync(topicFilters);
}

public void FillStatus(MqttSessionStatus status)
{
status.ClientId = ClientId;
status.CreatedTimestamp = _createdTimestamp;
status.PendingApplicationMessagesCount = ApplicationMessagesQueue.Count;
}
}
}

+ 7
- 2
Source/MQTTnet/Server/MqttServer.cs View File

@@ -50,9 +50,14 @@ namespace MQTTnet.Server
public event EventHandler Started;
public event EventHandler Stopped;

public event EventHandler<MqttClientConnectedEventArgs> ClientConnected;
public event EventHandler<MqttClientDisconnectedEventArgs> ClientDisconnected;
public IMqttServerClientConnectedHandler ClientConnectedHandler { get; set; }
public event EventHandler<MqttServerClientConnectedEventArgs> ClientConnected;

public IMqttServerClientDisconnectedHandler ClientDisconnectedHandler { get; set; }
public event EventHandler<MqttServerClientDisconnectedEventArgs> ClientDisconnected;

public event EventHandler<MqttClientSubscribedTopicEventArgs> ClientSubscribedTopic;

public event EventHandler<MqttClientUnsubscribedTopicEventArgs> ClientUnsubscribedTopic;

public IMqttApplicationMessageHandler ReceivedApplicationMessageHandler { get; set; }


Source/MQTTnet/Server/MqttClientConnectedEventArgs.cs → Source/MQTTnet/Server/MqttServerClientConnectedEventArgs.cs View File

@@ -2,9 +2,9 @@

namespace MQTTnet.Server
{
public class MqttClientConnectedEventArgs : EventArgs
public class MqttServerClientConnectedEventArgs : EventArgs
{
public MqttClientConnectedEventArgs(string clientId)
public MqttServerClientConnectedEventArgs(string clientId)
{
ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId));
}

+ 31
- 0
Source/MQTTnet/Server/MqttServerClientConnectedHandlerDelegate.cs View File

@@ -0,0 +1,31 @@
using System;
using System.Threading.Tasks;

namespace MQTTnet.Server
{
public class MqttServerClientConnectedHandlerDelegate : IMqttServerClientConnectedHandler
{
private readonly Func<MqttServerClientConnectedEventArgs, Task> _handler;

public MqttServerClientConnectedHandlerDelegate(Action<MqttServerClientConnectedEventArgs> handler)
{
if (handler == null) throw new ArgumentNullException(nameof(handler));

_handler = context =>
{
handler(context);
return Task.FromResult(0);
};
}

public MqttServerClientConnectedHandlerDelegate(Func<MqttServerClientConnectedEventArgs, Task> handler)
{
_handler = handler ?? throw new ArgumentNullException(nameof(handler));
}

public Task HandleClientConnectedAsync(MqttServerClientConnectedEventArgs eventArgs)
{
return _handler(eventArgs);
}
}
}

Source/MQTTnet/Server/MqttClientDisconnectedEventArgs.cs → Source/MQTTnet/Server/MqttServerClientDisconnectedEventArgs.cs View File

@@ -2,9 +2,9 @@

namespace MQTTnet.Server
{
public class MqttClientDisconnectedEventArgs : EventArgs
public class MqttServerClientDisconnectedEventArgs : EventArgs
{
public MqttClientDisconnectedEventArgs(string clientId, MqttClientDisconnectType disconnectType)
public MqttServerClientDisconnectedEventArgs(string clientId, MqttClientDisconnectType disconnectType)
{
ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId));
DisconnectType = disconnectType;

+ 31
- 0
Source/MQTTnet/Server/MqttServerClientDisconnectedHandlerDelegate.cs View File

@@ -0,0 +1,31 @@
using System;
using System.Threading.Tasks;

namespace MQTTnet.Server
{
public class MqttServerClientDisconnectedHandlerDelegate : IMqttServerClientDisconnectedHandler
{
private readonly Func<MqttServerClientDisconnectedEventArgs, Task> _handler;

public MqttServerClientDisconnectedHandlerDelegate(Action<MqttServerClientDisconnectedEventArgs> handler)
{
if (handler == null) throw new ArgumentNullException(nameof(handler));

_handler = context =>
{
handler(context);
return Task.FromResult(0);
};
}

public MqttServerClientDisconnectedHandlerDelegate(Func<MqttServerClientDisconnectedEventArgs, Task> handler)
{
_handler = handler ?? throw new ArgumentNullException(nameof(handler));
}

public Task HandleClientDisconnectedAsync(MqttServerClientDisconnectedEventArgs eventArgs)
{
return _handler(eventArgs);
}
}
}

+ 4
- 4
Source/MQTTnet/Server/MqttServerEventDispatcher.cs View File

@@ -8,9 +8,9 @@ namespace MQTTnet.Server

public event EventHandler<MqttClientUnsubscribedTopicEventArgs> ClientUnsubscribedTopic;

public event EventHandler<MqttClientConnectedEventArgs> ClientConnected;
public event EventHandler<MqttServerClientConnectedEventArgs> ClientConnected;

public event EventHandler<MqttClientDisconnectedEventArgs> ClientDisconnected;
public event EventHandler<MqttServerClientDisconnectedEventArgs> ClientDisconnected;

public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived;

@@ -26,7 +26,7 @@ namespace MQTTnet.Server

public void OnClientDisconnected(string clientId, MqttClientDisconnectType disconnectType)
{
ClientDisconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(clientId, disconnectType));
ClientDisconnected?.Invoke(this, new MqttServerClientDisconnectedEventArgs(clientId, disconnectType));
}

public void OnApplicationMessageReceived(string senderClientId, MqttApplicationMessage applicationMessage)
@@ -36,7 +36,7 @@ namespace MQTTnet.Server

public void OnClientConnected(string clientId)
{
ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientId));
ClientConnected?.Invoke(this, new MqttServerClientConnectedEventArgs(clientId));
}
}
}

Loading…
Cancel
Save