You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

335 line
13 KiB

  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Threading;
  5. using System.Threading.Tasks;
  6. using MQTTnet.Adapter;
  7. using MQTTnet.Diagnostics;
  8. using MQTTnet.Exceptions;
  9. using MQTTnet.Packets;
  10. using MQTTnet.Protocol;
  11. using MQTTnet.Serializer;
  12. namespace MQTTnet.Server
  13. {
  14. public sealed class MqttClientSessionsManager : IDisposable
  15. {
  16. private readonly Dictionary<string, MqttClientSession> _sessions = new Dictionary<string, MqttClientSession>();
  17. private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
  18. private readonly IMqttServerOptions _options;
  19. private readonly MqttRetainedMessagesManager _retainedMessagesManager;
  20. private readonly IMqttNetLogger _logger;
  21. public MqttClientSessionsManager(IMqttServerOptions options, MqttRetainedMessagesManager retainedMessagesManager, IMqttNetLogger logger)
  22. {
  23. _logger = logger ?? throw new ArgumentNullException(nameof(logger));
  24. _options = options ?? throw new ArgumentNullException(nameof(options));
  25. _retainedMessagesManager = retainedMessagesManager ?? throw new ArgumentNullException(nameof(retainedMessagesManager));
  26. }
  27. public Action<ConnectedMqttClient> ClientConnectedCallback { get; set; }
  28. public Action<ConnectedMqttClient, bool> ClientDisconnectedCallback { get; set; }
  29. public Action<string, TopicFilter> ClientSubscribedTopicCallback { get; set; }
  30. public Action<string, string> ClientUnsubscribedTopicCallback { get; set; }
  31. public Action<string, MqttApplicationMessage> ApplicationMessageReceivedCallback { get; set; }
  32. public async Task RunSessionAsync(IMqttChannelAdapter clientAdapter, CancellationToken cancellationToken)
  33. {
  34. var clientId = string.Empty;
  35. var wasCleanDisconnect = false;
  36. MqttClientSession clientSession = null;
  37. try
  38. {
  39. if (!(await clientAdapter.ReceivePacketAsync(_options.DefaultCommunicationTimeout, cancellationToken)
  40. .ConfigureAwait(false) is MqttConnectPacket connectPacket))
  41. {
  42. throw new MqttProtocolViolationException(
  43. "The first packet from a client must be a 'CONNECT' packet [MQTT-3.1.0-1].");
  44. }
  45. clientId = connectPacket.ClientId;
  46. // Switch to the required protocol version before sending any response.
  47. clientAdapter.PacketSerializer.ProtocolVersion = connectPacket.ProtocolVersion;
  48. var connectReturnCode = ValidateConnection(connectPacket);
  49. if (connectReturnCode != MqttConnectReturnCode.ConnectionAccepted)
  50. {
  51. await clientAdapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new[]
  52. {
  53. new MqttConnAckPacket
  54. {
  55. ConnectReturnCode = connectReturnCode
  56. }
  57. }).ConfigureAwait(false);
  58. return;
  59. }
  60. var result = await GetOrCreateClientSessionAsync(connectPacket).ConfigureAwait(false);
  61. clientSession = result.Session;
  62. await clientAdapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new[]
  63. {
  64. new MqttConnAckPacket
  65. {
  66. ConnectReturnCode = connectReturnCode,
  67. IsSessionPresent = result.IsExistingSession
  68. }
  69. }).ConfigureAwait(false);
  70. ClientConnectedCallback?.Invoke(new ConnectedMqttClient
  71. {
  72. ClientId = clientId,
  73. ProtocolVersion = clientAdapter.PacketSerializer.ProtocolVersion
  74. });
  75. wasCleanDisconnect = await clientSession.RunAsync(connectPacket, clientAdapter).ConfigureAwait(false);
  76. }
  77. catch (OperationCanceledException)
  78. {
  79. }
  80. catch (Exception exception)
  81. {
  82. _logger.Error<MqttClientSessionsManager>(exception, exception.Message);
  83. }
  84. finally
  85. {
  86. try
  87. {
  88. await clientAdapter.DisconnectAsync(_options.DefaultCommunicationTimeout).ConfigureAwait(false);
  89. clientAdapter.Dispose();
  90. }
  91. catch (Exception exception)
  92. {
  93. _logger.Error<MqttClientSessionsManager>(exception, exception.Message);
  94. }
  95. ClientDisconnectedCallback?.Invoke(new ConnectedMqttClient
  96. {
  97. ClientId = clientId,
  98. ProtocolVersion = clientAdapter.PacketSerializer.ProtocolVersion,
  99. PendingApplicationMessages = clientSession?.PendingMessagesQueue.Count ?? 0
  100. },
  101. wasCleanDisconnect);
  102. }
  103. }
  104. public async Task StopAsync()
  105. {
  106. await _semaphore.WaitAsync().ConfigureAwait(false);
  107. try
  108. {
  109. foreach (var session in _sessions)
  110. {
  111. await session.Value.StopAsync().ConfigureAwait(false);
  112. }
  113. _sessions.Clear();
  114. }
  115. finally
  116. {
  117. _semaphore.Release();
  118. }
  119. }
  120. public async Task<IList<ConnectedMqttClient>> GetConnectedClientsAsync()
  121. {
  122. await _semaphore.WaitAsync().ConfigureAwait(false);
  123. try
  124. {
  125. return _sessions.Where(s => s.Value.IsConnected).Select(s => new ConnectedMqttClient
  126. {
  127. ClientId = s.Value.ClientId,
  128. ProtocolVersion = s.Value.ProtocolVersion ?? MqttProtocolVersion.V311,
  129. LastPacketReceived = s.Value.KeepAliveMonitor.LastPacketReceived,
  130. LastNonKeepAlivePacketReceived = s.Value.KeepAliveMonitor.LastNonKeepAlivePacketReceived,
  131. PendingApplicationMessages = s.Value.PendingMessagesQueue.Count
  132. }).ToList();
  133. }
  134. finally
  135. {
  136. _semaphore.Release();
  137. }
  138. }
  139. public async Task DispatchApplicationMessageAsync(MqttClientSession senderClientSession, MqttApplicationMessage applicationMessage)
  140. {
  141. try
  142. {
  143. var interceptorContext = InterceptApplicationMessage(senderClientSession, applicationMessage);
  144. if (interceptorContext.CloseConnection)
  145. {
  146. await senderClientSession.StopAsync().ConfigureAwait(false);
  147. }
  148. if (interceptorContext.ApplicationMessage == null || !interceptorContext.AcceptPublish)
  149. {
  150. return;
  151. }
  152. if (applicationMessage.Retain)
  153. {
  154. await _retainedMessagesManager.HandleMessageAsync(senderClientSession?.ClientId, applicationMessage).ConfigureAwait(false);
  155. }
  156. ApplicationMessageReceivedCallback?.Invoke(senderClientSession?.ClientId, applicationMessage);
  157. }
  158. catch (Exception exception)
  159. {
  160. _logger.Error<MqttClientSessionsManager>(exception, "Error while processing application message");
  161. }
  162. await _semaphore.WaitAsync().ConfigureAwait(false);
  163. try
  164. {
  165. foreach (var clientSession in _sessions.Values)
  166. {
  167. await clientSession.EnqueueApplicationMessageAsync(applicationMessage);
  168. }
  169. }
  170. finally
  171. {
  172. _semaphore.Release();
  173. }
  174. }
  175. public async Task SubscribeAsync(string clientId, IList<TopicFilter> topicFilters)
  176. {
  177. if (clientId == null) throw new ArgumentNullException(nameof(clientId));
  178. if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));
  179. await _semaphore.WaitAsync().ConfigureAwait(false);
  180. try
  181. {
  182. if (!_sessions.TryGetValue(clientId, out var session))
  183. {
  184. throw new InvalidOperationException($"Client session {clientId} is unknown.");
  185. }
  186. await session.SubscribeAsync(topicFilters);
  187. }
  188. finally
  189. {
  190. _semaphore.Release();
  191. }
  192. }
  193. public async Task UnsubscribeAsync(string clientId, IList<string> topicFilters)
  194. {
  195. if (clientId == null) throw new ArgumentNullException(nameof(clientId));
  196. if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));
  197. await _semaphore.WaitAsync().ConfigureAwait(false);
  198. try
  199. {
  200. if (!_sessions.TryGetValue(clientId, out var session))
  201. {
  202. throw new InvalidOperationException($"Client session {clientId} is unknown.");
  203. }
  204. await session.UnsubscribeAsync(topicFilters);
  205. }
  206. finally
  207. {
  208. _semaphore.Release();
  209. }
  210. }
  211. private MqttApplicationMessageInterceptorContext InterceptApplicationMessage(MqttClientSession senderClientSession, MqttApplicationMessage applicationMessage)
  212. {
  213. var interceptorContext = new MqttApplicationMessageInterceptorContext(
  214. senderClientSession?.ClientId,
  215. applicationMessage);
  216. var interceptor = _options.ApplicationMessageInterceptor;
  217. if (interceptor == null)
  218. {
  219. return interceptorContext;
  220. }
  221. interceptor(interceptorContext);
  222. return interceptorContext;
  223. }
  224. private MqttConnectReturnCode ValidateConnection(MqttConnectPacket connectPacket)
  225. {
  226. if (_options.ConnectionValidator == null)
  227. {
  228. return MqttConnectReturnCode.ConnectionAccepted;
  229. }
  230. var context = new MqttConnectionValidatorContext(
  231. connectPacket.ClientId,
  232. connectPacket.Username,
  233. connectPacket.Password,
  234. connectPacket.WillMessage);
  235. _options.ConnectionValidator(context);
  236. return context.ReturnCode;
  237. }
  238. private async Task<GetOrCreateClientSessionResult> GetOrCreateClientSessionAsync(MqttConnectPacket connectPacket)
  239. {
  240. await _semaphore.WaitAsync().ConfigureAwait(false);
  241. try
  242. {
  243. var isSessionPresent = _sessions.TryGetValue(connectPacket.ClientId, out var clientSession);
  244. if (isSessionPresent)
  245. {
  246. if (connectPacket.CleanSession)
  247. {
  248. _sessions.Remove(connectPacket.ClientId);
  249. await clientSession.StopAsync().ConfigureAwait(false);
  250. clientSession.Dispose();
  251. clientSession = null;
  252. _logger.Verbose<MqttClientSessionsManager>("Stopped existing session of client '{0}'.", connectPacket.ClientId);
  253. }
  254. else
  255. {
  256. _logger.Verbose<MqttClientSessionsManager>("Reusing existing session of client '{0}'.", connectPacket.ClientId);
  257. }
  258. }
  259. var isExistingSession = true;
  260. if (clientSession == null)
  261. {
  262. isExistingSession = false;
  263. clientSession = new MqttClientSession(connectPacket.ClientId, _options, _retainedMessagesManager, _logger)
  264. {
  265. ApplicationMessageReceivedCallback = DispatchApplicationMessageAsync
  266. };
  267. clientSession.SubscriptionsManager.TopicSubscribedCallback = ClientSubscribedTopicCallback;
  268. clientSession.SubscriptionsManager.TopicUnsubscribedCallback = ClientUnsubscribedTopicCallback;
  269. _sessions[connectPacket.ClientId] = clientSession;
  270. _logger.Verbose<MqttClientSessionsManager>("Created a new session for client '{0}'.", connectPacket.ClientId);
  271. }
  272. return new GetOrCreateClientSessionResult { IsExistingSession = isExistingSession, Session = clientSession };
  273. }
  274. finally
  275. {
  276. _semaphore.Release();
  277. }
  278. }
  279. public void Dispose()
  280. {
  281. ClientConnectedCallback = null;
  282. ClientDisconnectedCallback = null;
  283. ClientSubscribedTopicCallback = null;
  284. ClientUnsubscribedTopicCallback = null;
  285. ApplicationMessageReceivedCallback = null;
  286. _semaphore?.Dispose();
  287. }
  288. }
  289. }