Não pode escolher mais do que 25 tópicos Os tópicos devem começar com uma letra ou um número, podem incluir traços ('-') e podem ter até 35 caracteres.
 
 
 
 

323 linhas
12 KiB

  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Threading;
  5. using System.Threading.Tasks;
  6. using MQTTnet.Adapter;
  7. using MQTTnet.Diagnostics;
  8. using MQTTnet.Exceptions;
  9. using MQTTnet.Packets;
  10. using MQTTnet.Protocol;
  11. namespace MQTTnet.Server
  12. {
  13. public sealed class MqttClientSessionsManager : IDisposable
  14. {
  15. private readonly Dictionary<string, MqttClientSession> _sessions = new Dictionary<string, MqttClientSession>();
  16. private readonly SemaphoreSlim _sessionsLock = new SemaphoreSlim(1, 1);
  17. private readonly MqttRetainedMessagesManager _retainedMessagesManager;
  18. private readonly IMqttServerOptions _options;
  19. private readonly IMqttNetLogger _logger;
  20. public MqttClientSessionsManager(IMqttServerOptions options, MqttServer server, MqttRetainedMessagesManager retainedMessagesManager, IMqttNetLogger logger)
  21. {
  22. _logger = logger ?? throw new ArgumentNullException(nameof(logger));
  23. _options = options ?? throw new ArgumentNullException(nameof(options));
  24. Server = server ?? throw new ArgumentNullException(nameof(server));
  25. _retainedMessagesManager = retainedMessagesManager ?? throw new ArgumentNullException(nameof(retainedMessagesManager));
  26. }
  27. public MqttServer Server { get; }
  28. public async Task RunSessionAsync(IMqttChannelAdapter clientAdapter, CancellationToken cancellationToken)
  29. {
  30. var clientId = string.Empty;
  31. var wasCleanDisconnect = false;
  32. MqttClientSession clientSession = null;
  33. try
  34. {
  35. if (!(await clientAdapter.ReceivePacketAsync(_options.DefaultCommunicationTimeout, cancellationToken)
  36. .ConfigureAwait(false) is MqttConnectPacket connectPacket))
  37. {
  38. throw new MqttProtocolViolationException(
  39. "The first packet from a client must be a 'CONNECT' packet [MQTT-3.1.0-1].");
  40. }
  41. clientId = connectPacket.ClientId;
  42. // Switch to the required protocol version before sending any response.
  43. clientAdapter.PacketSerializer.ProtocolVersion = connectPacket.ProtocolVersion;
  44. var connectReturnCode = ValidateConnection(connectPacket);
  45. if (connectReturnCode != MqttConnectReturnCode.ConnectionAccepted)
  46. {
  47. await clientAdapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, new[]
  48. {
  49. new MqttConnAckPacket
  50. {
  51. ConnectReturnCode = connectReturnCode
  52. }
  53. }, cancellationToken).ConfigureAwait(false);
  54. return;
  55. }
  56. var result = await PrepareClientSessionAsync(connectPacket).ConfigureAwait(false);
  57. clientSession = result.Session;
  58. await clientAdapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, new[]
  59. {
  60. new MqttConnAckPacket
  61. {
  62. ConnectReturnCode = connectReturnCode,
  63. IsSessionPresent = result.IsExistingSession
  64. }
  65. }, cancellationToken).ConfigureAwait(false);
  66. Server.OnClientConnected(new ConnectedMqttClient
  67. {
  68. ClientId = clientId,
  69. ProtocolVersion = clientAdapter.PacketSerializer.ProtocolVersion
  70. });
  71. wasCleanDisconnect = await clientSession.RunAsync(connectPacket, clientAdapter).ConfigureAwait(false);
  72. }
  73. catch (OperationCanceledException)
  74. {
  75. }
  76. catch (Exception exception)
  77. {
  78. _logger.Error<MqttClientSessionsManager>(exception, exception.Message);
  79. }
  80. finally
  81. {
  82. try
  83. {
  84. await clientAdapter.DisconnectAsync(_options.DefaultCommunicationTimeout, CancellationToken.None).ConfigureAwait(false);
  85. clientAdapter.Dispose();
  86. }
  87. catch (Exception exception)
  88. {
  89. _logger.Error<MqttClientSessionsManager>(exception, exception.Message);
  90. }
  91. Server.OnClientDisconnected(new ConnectedMqttClient
  92. {
  93. ClientId = clientId,
  94. ProtocolVersion = clientAdapter.PacketSerializer.ProtocolVersion,
  95. PendingApplicationMessages = clientSession?.PendingMessagesQueue.Count ?? 0
  96. },
  97. wasCleanDisconnect);
  98. }
  99. }
  100. public async Task StopAsync()
  101. {
  102. await _sessionsLock.WaitAsync().ConfigureAwait(false);
  103. try
  104. {
  105. foreach (var session in _sessions)
  106. {
  107. session.Value.Stop(MqttClientDisconnectType.NotClean);
  108. }
  109. _sessions.Clear();
  110. }
  111. finally
  112. {
  113. _sessionsLock.Release();
  114. }
  115. }
  116. public async Task<IList<ConnectedMqttClient>> GetConnectedClientsAsync()
  117. {
  118. await _sessionsLock.WaitAsync().ConfigureAwait(false);
  119. try
  120. {
  121. return _sessions.Where(s => s.Value.IsConnected).Select(s => new ConnectedMqttClient
  122. {
  123. ClientId = s.Value.ClientId,
  124. ProtocolVersion = s.Value.ProtocolVersion,
  125. LastPacketReceived = s.Value.KeepAliveMonitor.LastPacketReceived,
  126. LastNonKeepAlivePacketReceived = s.Value.KeepAliveMonitor.LastNonKeepAlivePacketReceived,
  127. PendingApplicationMessages = s.Value.PendingMessagesQueue.Count
  128. }).ToList();
  129. }
  130. finally
  131. {
  132. _sessionsLock.Release();
  133. }
  134. }
  135. public void StartDispatchApplicationMessage(MqttClientSession senderClientSession, MqttApplicationMessage applicationMessage)
  136. {
  137. Task.Run(() => DispatchApplicationMessageAsync(senderClientSession, applicationMessage));
  138. }
  139. public async Task SubscribeAsync(string clientId, IList<TopicFilter> topicFilters)
  140. {
  141. if (clientId == null) throw new ArgumentNullException(nameof(clientId));
  142. if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));
  143. await _sessionsLock.WaitAsync().ConfigureAwait(false);
  144. try
  145. {
  146. if (!_sessions.TryGetValue(clientId, out var session))
  147. {
  148. throw new InvalidOperationException($"Client session '{clientId}' is unknown.");
  149. }
  150. await session.SubscribeAsync(topicFilters).ConfigureAwait(false);
  151. }
  152. finally
  153. {
  154. _sessionsLock.Release();
  155. }
  156. }
  157. public async Task UnsubscribeAsync(string clientId, IList<string> topicFilters)
  158. {
  159. if (clientId == null) throw new ArgumentNullException(nameof(clientId));
  160. if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));
  161. await _sessionsLock.WaitAsync().ConfigureAwait(false);
  162. try
  163. {
  164. if (!_sessions.TryGetValue(clientId, out var session))
  165. {
  166. throw new InvalidOperationException($"Client session '{clientId}' is unknown.");
  167. }
  168. await session.UnsubscribeAsync(topicFilters).ConfigureAwait(false);
  169. }
  170. finally
  171. {
  172. _sessionsLock.Release();
  173. }
  174. }
  175. public void Dispose()
  176. {
  177. _sessionsLock?.Dispose();
  178. }
  179. private MqttConnectReturnCode ValidateConnection(MqttConnectPacket connectPacket)
  180. {
  181. if (_options.ConnectionValidator == null)
  182. {
  183. return MqttConnectReturnCode.ConnectionAccepted;
  184. }
  185. var context = new MqttConnectionValidatorContext(
  186. connectPacket.ClientId,
  187. connectPacket.Username,
  188. connectPacket.Password,
  189. connectPacket.WillMessage);
  190. _options.ConnectionValidator(context);
  191. return context.ReturnCode;
  192. }
  193. private async Task<GetOrCreateClientSessionResult> PrepareClientSessionAsync(MqttConnectPacket connectPacket)
  194. {
  195. await _sessionsLock.WaitAsync().ConfigureAwait(false);
  196. try
  197. {
  198. var isSessionPresent = _sessions.TryGetValue(connectPacket.ClientId, out var clientSession);
  199. if (isSessionPresent)
  200. {
  201. if (connectPacket.CleanSession)
  202. {
  203. _sessions.Remove(connectPacket.ClientId);
  204. clientSession.Stop(MqttClientDisconnectType.Clean);
  205. clientSession.Dispose();
  206. clientSession = null;
  207. _logger.Verbose<MqttClientSessionsManager>("Stopped existing session of client '{0}'.", connectPacket.ClientId);
  208. }
  209. else
  210. {
  211. _logger.Verbose<MqttClientSessionsManager>("Reusing existing session of client '{0}'.", connectPacket.ClientId);
  212. }
  213. }
  214. var isExistingSession = true;
  215. if (clientSession == null)
  216. {
  217. isExistingSession = false;
  218. clientSession = new MqttClientSession(connectPacket.ClientId, _options, this, _retainedMessagesManager, _logger);
  219. _sessions[connectPacket.ClientId] = clientSession;
  220. _logger.Verbose<MqttClientSessionsManager>("Created a new session for client '{0}'.", connectPacket.ClientId);
  221. }
  222. return new GetOrCreateClientSessionResult { IsExistingSession = isExistingSession, Session = clientSession };
  223. }
  224. finally
  225. {
  226. _sessionsLock.Release();
  227. }
  228. }
  229. private async Task DispatchApplicationMessageAsync(MqttClientSession senderClientSession, MqttApplicationMessage applicationMessage)
  230. {
  231. try
  232. {
  233. var interceptorContext = InterceptApplicationMessage(senderClientSession, applicationMessage);
  234. if (interceptorContext.CloseConnection)
  235. {
  236. senderClientSession.Stop(MqttClientDisconnectType.NotClean);
  237. }
  238. if (interceptorContext.ApplicationMessage == null || !interceptorContext.AcceptPublish)
  239. {
  240. return;
  241. }
  242. if (applicationMessage.Retain)
  243. {
  244. await _retainedMessagesManager.HandleMessageAsync(senderClientSession?.ClientId, applicationMessage).ConfigureAwait(false);
  245. }
  246. Server.OnApplicationMessageReceived(senderClientSession?.ClientId, applicationMessage);
  247. }
  248. catch (Exception exception)
  249. {
  250. _logger.Error<MqttClientSessionsManager>(exception, "Error while processing application message");
  251. }
  252. await _sessionsLock.WaitAsync().ConfigureAwait(false);
  253. try
  254. {
  255. foreach (var clientSession in _sessions.Values)
  256. {
  257. await clientSession.EnqueueApplicationMessageAsync(applicationMessage).ConfigureAwait(false);
  258. }
  259. }
  260. finally
  261. {
  262. _sessionsLock.Release();
  263. }
  264. }
  265. private MqttApplicationMessageInterceptorContext InterceptApplicationMessage(MqttClientSession senderClientSession, MqttApplicationMessage applicationMessage)
  266. {
  267. var interceptorContext = new MqttApplicationMessageInterceptorContext(
  268. senderClientSession?.ClientId,
  269. applicationMessage);
  270. var interceptor = _options.ApplicationMessageInterceptor;
  271. if (interceptor == null)
  272. {
  273. return interceptorContext;
  274. }
  275. interceptor(interceptorContext);
  276. return interceptorContext;
  277. }
  278. }
  279. }