You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

141 lines
5.4 KiB

  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Threading.Tasks;
  5. using MQTTnet.Core.Adapter;
  6. using MQTTnet.Core.Diagnostics;
  7. using MQTTnet.Core.Exceptions;
  8. using MQTTnet.Core.Internal;
  9. using MQTTnet.Core.Packets;
  10. using MQTTnet.Core.Protocol;
  11. namespace MQTTnet.Core.Server
  12. {
  /// <summary>
  /// Owns the server-side <see cref="MqttClientSession"/> instances, keyed by client ID.
  /// Handles the CONNECT handshake for new connections and forwards every received
  /// publish packet to all known sessions.
  /// </summary>
  /// <remarks>
  /// All access to the internal session dictionary is serialized through a private lock.
  /// </remarks>
  public sealed class MqttClientSessionsManager
  {
      // Guards every read and write of _clientSessions.
      private readonly object _syncRoot = new object();
      private readonly Dictionary<string, MqttClientSession> _clientSessions = new Dictionary<string, MqttClientSession>();
      private readonly MqttServerOptions _options;

      /// <summary>
      /// Creates a new manager using the given server options.
      /// </summary>
      /// <param name="options">Server options (timeouts, optional connection validator).</param>
      /// <exception cref="ArgumentNullException"><paramref name="options"/> is null.</exception>
      public MqttClientSessionsManager(MqttServerOptions options)
      {
          _options = options ?? throw new ArgumentNullException(nameof(options));
      }

      /// <summary>
      /// Raised for every publish packet handed to the manager by a session,
      /// before the packet is enqueued to the sessions.
      /// </summary>
      public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived;

      /// <summary>
      /// Performs the CONNECT handshake for a newly connected client and then runs its session.
      /// </summary>
      /// <remarks>
      /// Exceptions raised during the handshake or while the session runs are traced and
      /// swallowed. The client adapter is always disconnected when this method finishes.
      /// </remarks>
      /// <param name="eventArgs">Describes the new connection (identifier and channel adapter).</param>
      /// <returns>A task that completes once the connection has been disconnected.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="eventArgs"/> is null (reported through the returned task).</exception>
      public async Task RunClientSessionAsync(MqttClientConnectedEventArgs eventArgs)
      {
          if (eventArgs == null)
          {
              throw new ArgumentNullException(nameof(eventArgs));
          }

          try
          {
              var connectPacket = await eventArgs.ClientAdapter.ReceivePacketAsync(_options.DefaultCommunicationTimeout) as MqttConnectPacket;
              if (connectPacket == null)
              {
                  throw new MqttProtocolViolationException("The first packet from a client must be a 'CONNECT' packet [MQTT-3.1.0-1].");
              }

              var connectReturnCode = ValidateConnection(connectPacket);
              if (connectReturnCode != MqttConnectReturnCode.ConnectionAccepted)
              {
                  // Rejected: report the reason to the client; the finally block disconnects.
                  await eventArgs.ClientAdapter.SendPacketAsync(new MqttConnAckPacket
                  {
                      ConnectReturnCode = connectReturnCode
                  }, _options.DefaultCommunicationTimeout);

                  return;
              }

              var clientSession = GetOrCreateClientSession(connectPacket);

              await eventArgs.ClientAdapter.SendPacketAsync(new MqttConnAckPacket
              {
                  ConnectReturnCode = connectReturnCode,
                  IsSessionPresent = clientSession.IsExistingSession
              }, _options.DefaultCommunicationTimeout);

              await clientSession.Session.RunAsync(eventArgs.Identifier, connectPacket.WillMessage, eventArgs.ClientAdapter);
          }
          catch (Exception exception)
          {
              // Use this class as the trace source, like every other trace call in this type.
              MqttTrace.Error(nameof(MqttClientSessionsManager), exception, exception.Message);
          }
          finally
          {
              await eventArgs.ClientAdapter.DisconnectAsync();
          }
      }

      /// <summary>
      /// Disposes and removes all sessions.
      /// </summary>
      public void Clear()
      {
          lock (_syncRoot)
          {
              // Dispose before dropping the references, mirroring what happens when a
              // single session is replaced because the client requested a clean session.
              foreach (var session in _clientSessions.Values)
              {
                  session.Dispose();
              }

              _clientSessions.Clear();
          }
      }

      /// <summary>
      /// Returns the IDs of all clients whose session currently reports itself as connected.
      /// </summary>
      /// <returns>A new list containing a snapshot of the connected client IDs.</returns>
      public IList<string> GetConnectedClients()
      {
          lock (_syncRoot)
          {
              return _clientSessions
                  .Where(entry => entry.Value.IsConnected)
                  .Select(entry => entry.Key)
                  .ToList();
          }
      }

      /// <summary>
      /// Asks the configured connection validator (if any) whether the connection is acceptable.
      /// </summary>
      /// <param name="connectPacket">The CONNECT packet sent by the client.</param>
      /// <returns>
      /// The validator's return code, or <c>ConnectionAccepted</c> when no validator is configured.
      /// </returns>
      private MqttConnectReturnCode ValidateConnection(MqttConnectPacket connectPacket)
      {
          var validator = _options.ConnectionValidator;
          if (validator == null)
          {
              return MqttConnectReturnCode.ConnectionAccepted;
          }

          return validator(connectPacket);
      }

      /// <summary>
      /// Looks up the session of the connecting client, or creates a new one.
      /// </summary>
      /// <remarks>
      /// An existing session is reused unless the client asked for a clean session,
      /// in which case the old session is removed and disposed and a new one is created.
      /// </remarks>
      /// <param name="connectPacket">The CONNECT packet sent by the client.</param>
      /// <returns>The session to use and whether it existed before this call.</returns>
      private GetOrCreateClientSessionResult GetOrCreateClientSession(MqttConnectPacket connectPacket)
      {
          lock (_syncRoot)
          {
              var clientId = connectPacket.ClientId;

              if (_clientSessions.TryGetValue(clientId, out var existingSession))
              {
                  if (!connectPacket.CleanSession)
                  {
                      MqttTrace.Verbose(nameof(MqttClientSessionsManager), $"Reusing existing session of client '{clientId}'.");
                      return new GetOrCreateClientSessionResult { IsExistingSession = true, Session = existingSession };
                  }

                  _clientSessions.Remove(clientId);
                  existingSession.Dispose();
                  MqttTrace.Verbose(nameof(MqttClientSessionsManager), $"Disposed existing session of client '{clientId}'.");
              }

              var newSession = new MqttClientSession(clientId, _options, DispatchPublishPacket);
              _clientSessions[clientId] = newSession;
              MqttTrace.Verbose(nameof(MqttClientSessionsManager), $"Created a new session for client '{clientId}'.");

              return new GetOrCreateClientSessionResult { IsExistingSession = false, Session = newSession };
          }
      }

      /// <summary>
      /// Raises <see cref="ApplicationMessageReceived"/> and enqueues the publish packet
      /// to every session known at the time of the call (including the sender's).
      /// </summary>
      /// <param name="senderClientSession">The session that received the packet from its client.</param>
      /// <param name="publishPacket">The packet to distribute.</param>
      private void DispatchPublishPacket(MqttClientSession senderClientSession, MqttPublishPacket publishPacket)
      {
          var eventArgs = new MqttApplicationMessageReceivedEventArgs(senderClientSession.ClientId, publishPacket.ToApplicationMessage());
          ApplicationMessageReceived?.Invoke(this, eventArgs);

          // Take the snapshot under the lock: Dictionary<TKey, TValue> must not be enumerated
          // while another thread adds or removes sessions. Enqueueing happens outside the
          // lock so session code never runs while the manager's lock is held.
          List<MqttClientSession> recipients;
          lock (_syncRoot)
          {
              recipients = _clientSessions.Values.ToList();
          }

          foreach (var recipient in recipients)
          {
              recipient.EnqueuePublishPacket(senderClientSession, publishPacket);
          }
      }
  }
  121. }