Nevar pievienot vairāk kā 25 tēmas Tēmai ir jāsākas ar burtu vai ciparu, tā var saturēt domu zīmes ('-') un var būt līdz 35 simboliem gara.
 
 
 
 

194 rindas
7.9 KiB

  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Threading;
  5. using System.Threading.Tasks;
  6. using MQTTnet.Core.Adapter;
  7. using MQTTnet.Core.Exceptions;
  8. using MQTTnet.Core.Internal;
  9. using MQTTnet.Core.Packets;
  10. using MQTTnet.Core.Protocol;
  11. using MQTTnet.Core.Serializer;
  12. using Microsoft.Extensions.Logging;
  13. using Microsoft.Extensions.Options;
  14. using MQTTnet.Core.Client;
  15. namespace MQTTnet.Core.Server
  16. {
  17. public sealed class MqttClientSessionsManager
  18. {
  19. private readonly Dictionary<string, MqttClientSession> _clientSessions = new Dictionary<string, MqttClientSession>();
  20. private readonly ILogger<MqttClientSessionsManager> _logger;
  21. private readonly IMqttClientSesssionFactory _mqttClientSesssionFactory;
  22. public MqttClientSessionsManager(IOptions<MqttServerOptions> options, ILogger<MqttClientSessionsManager> logger, MqttClientRetainedMessagesManager retainedMessagesManager, IMqttClientSesssionFactory mqttClientSesssionFactory)
  23. {
  24. _logger = logger ?? throw new ArgumentNullException(nameof(logger));
  25. Options = options.Value ?? throw new ArgumentNullException(nameof(options));
  26. RetainedMessagesManager = retainedMessagesManager ?? throw new ArgumentNullException(nameof(options));
  27. _mqttClientSesssionFactory = mqttClientSesssionFactory ?? throw new ArgumentNullException(nameof(mqttClientSesssionFactory));
  28. }
  29. public event EventHandler<MqttClientConnectedEventArgs> ClientConnected;
  30. public event EventHandler<MqttClientDisconnectedEventArgs> ClientDisconnected;
  31. public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived;
  32. public MqttClientRetainedMessagesManager RetainedMessagesManager { get; }
  33. public MqttServerOptions Options { get; }
  34. public async Task RunClientSessionAsync(IMqttCommunicationAdapter clientAdapter)
  35. {
  36. var clientId = string.Empty;
  37. try
  38. {
  39. if (!(await clientAdapter.ReceivePacketAsync(Options.DefaultCommunicationTimeout, CancellationToken.None).ConfigureAwait(false) is MqttConnectPacket connectPacket))
  40. {
  41. throw new MqttProtocolViolationException("The first packet from a client must be a 'CONNECT' packet [MQTT-3.1.0-1].");
  42. }
  43. clientId = connectPacket.ClientId;
  44. // Switch to the required protocol version before sending any response.
  45. clientAdapter.PacketSerializer.ProtocolVersion = connectPacket.ProtocolVersion;
  46. var connectReturnCode = ValidateConnection(connectPacket);
  47. if (connectReturnCode != MqttConnectReturnCode.ConnectionAccepted)
  48. {
  49. await clientAdapter.SendPacketsAsync(Options.DefaultCommunicationTimeout, CancellationToken.None, new MqttConnAckPacket
  50. {
  51. ConnectReturnCode = connectReturnCode
  52. }).ConfigureAwait(false);
  53. return;
  54. }
  55. var clientSession = GetOrCreateClientSession(connectPacket);
  56. await clientAdapter.SendPacketsAsync(Options.DefaultCommunicationTimeout, CancellationToken.None, new MqttConnAckPacket
  57. {
  58. ConnectReturnCode = connectReturnCode,
  59. IsSessionPresent = clientSession.IsExistingSession
  60. }).ConfigureAwait(false);
  61. ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(new ConnectedMqttClient
  62. {
  63. ClientId = clientId,
  64. ProtocolVersion = clientAdapter.PacketSerializer.ProtocolVersion
  65. }));
  66. using (_logger.BeginScope(clientId))
  67. {
  68. await clientSession.Session.RunAsync(connectPacket.WillMessage, clientAdapter).ConfigureAwait(false);
  69. }
  70. }
  71. catch (Exception exception)
  72. {
  73. _logger.LogError(new EventId(), exception, exception.Message);
  74. }
  75. finally
  76. {
  77. try
  78. {
  79. await clientAdapter.DisconnectAsync(Options.DefaultCommunicationTimeout).ConfigureAwait(false);
  80. }
  81. catch (Exception)
  82. {
  83. //ignored
  84. }
  85. ClientDisconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(new ConnectedMqttClient
  86. {
  87. ClientId = clientId,
  88. ProtocolVersion = clientAdapter.PacketSerializer.ProtocolVersion
  89. }));
  90. }
  91. }
  92. public void Clear()
  93. {
  94. lock (_clientSessions)
  95. {
  96. _clientSessions.Clear();
  97. }
  98. }
  99. public IList<ConnectedMqttClient> GetConnectedClients()
  100. {
  101. lock (_clientSessions)
  102. {
  103. return _clientSessions.Where(s => s.Value.IsConnected).Select(s => new ConnectedMqttClient
  104. {
  105. ClientId = s.Value.ClientId,
  106. ProtocolVersion = s.Value.ProtocolVersion ?? MqttProtocolVersion.V311
  107. }).ToList();
  108. }
  109. }
  110. public void DispatchApplicationMessage(MqttClientSession senderClientSession, MqttApplicationMessage applicationMessage)
  111. {
  112. try
  113. {
  114. var eventArgs = new MqttApplicationMessageReceivedEventArgs(senderClientSession?.ClientId, applicationMessage);
  115. ApplicationMessageReceived?.Invoke(this, eventArgs);
  116. }
  117. catch (Exception exception)
  118. {
  119. _logger.LogError(new EventId(), exception, "Error while processing application message");
  120. }
  121. lock (_clientSessions)
  122. {
  123. foreach (var clientSession in _clientSessions.Values.ToList())
  124. {
  125. clientSession.EnqueuePublishPacket(applicationMessage.ToPublishPacket());
  126. }
  127. }
  128. }
  129. private MqttConnectReturnCode ValidateConnection(MqttConnectPacket connectPacket)
  130. {
  131. if (Options.ConnectionValidator != null)
  132. {
  133. return Options.ConnectionValidator(connectPacket);
  134. }
  135. return MqttConnectReturnCode.ConnectionAccepted;
  136. }
  137. private GetOrCreateClientSessionResult GetOrCreateClientSession(MqttConnectPacket connectPacket)
  138. {
  139. lock (_clientSessions)
  140. {
  141. var isSessionPresent = _clientSessions.TryGetValue(connectPacket.ClientId, out var clientSession);
  142. if (isSessionPresent)
  143. {
  144. if (connectPacket.CleanSession)
  145. {
  146. _clientSessions.Remove(connectPacket.ClientId);
  147. clientSession.Dispose();
  148. clientSession = null;
  149. _logger.LogTrace("Disposed existing session of client '{0}'.", connectPacket.ClientId);
  150. }
  151. else
  152. {
  153. _logger.LogTrace("Reusing existing session of client '{0}'.", connectPacket.ClientId);
  154. }
  155. }
  156. var isExistingSession = true;
  157. if (clientSession == null)
  158. {
  159. isExistingSession = false;
  160. clientSession = _mqttClientSesssionFactory.CreateClientSession(connectPacket.ClientId, this);
  161. _clientSessions[connectPacket.ClientId] = clientSession;
  162. _logger.LogTrace("Created a new session for client '{0}'.", connectPacket.ClientId);
  163. }
  164. return new GetOrCreateClientSessionResult { IsExistingSession = isExistingSession, Session = clientSession };
  165. }
  166. }
  167. }
  168. }