25'ten fazla konu seçemezsiniz Konular bir harf veya rakamla başlamalı, kısa çizgiler ('-') içerebilir ve en fazla 35 karakter uzunluğunda olabilir.
 
 
 
 

135 satır
5.0 KiB

  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Threading.Tasks;
  5. using MQTTnet.Core.Adapter;
  6. using MQTTnet.Core.Diagnostics;
  7. using MQTTnet.Core.Exceptions;
  8. using MQTTnet.Core.Packets;
  9. using MQTTnet.Core.Protocol;
  10. namespace MQTTnet.Core.Server
  11. {
  12. public sealed class MqttClientSessionsManager
  13. {
  14. private readonly object _syncRoot = new object();
  15. private readonly Dictionary<string, MqttClientSession> _clientSessions = new Dictionary<string, MqttClientSession>();
  16. private readonly MqttServerOptions _options;
  17. public MqttClientSessionsManager(MqttServerOptions options)
  18. {
  19. _options = options ?? throw new ArgumentNullException(nameof(options));
  20. }
  21. public async Task RunClientSessionAsync(MqttClientConnectedEventArgs eventArgs)
  22. {
  23. try
  24. {
  25. var connectPacket = await eventArgs.ClientAdapter.ReceivePacketAsync(_options.DefaultCommunicationTimeout) as MqttConnectPacket;
  26. if (connectPacket == null)
  27. {
  28. throw new MqttProtocolViolationException("The first packet from a client must be a 'CONNECT' packet [MQTT-3.1.0-1].");
  29. }
  30. var connectReturnCode = ValidateConnection(connectPacket);
  31. if (connectReturnCode != MqttConnectReturnCode.ConnectionAccepted)
  32. {
  33. await eventArgs.ClientAdapter.SendPacketAsync(new MqttConnAckPacket
  34. {
  35. ConnectReturnCode = connectReturnCode
  36. }, _options.DefaultCommunicationTimeout);
  37. return;
  38. }
  39. var clientSession = GetOrCreateClientSession(connectPacket);
  40. await eventArgs.ClientAdapter.SendPacketAsync(new MqttConnAckPacket
  41. {
  42. ConnectReturnCode = connectReturnCode,
  43. IsSessionPresent = clientSession.IsExistingSession
  44. }, _options.DefaultCommunicationTimeout);
  45. await clientSession.Session.RunAsync(eventArgs.Identifier, connectPacket.WillMessage, eventArgs.ClientAdapter);
  46. }
  47. catch (Exception exception)
  48. {
  49. MqttTrace.Error(nameof(MqttServer), exception, exception.Message);
  50. }
  51. finally
  52. {
  53. await eventArgs.ClientAdapter.DisconnectAsync();
  54. }
  55. }
  56. public void Clear()
  57. {
  58. lock (_syncRoot)
  59. {
  60. _clientSessions.Clear();
  61. }
  62. }
  63. public IList<string> GetConnectedClients()
  64. {
  65. lock (_syncRoot)
  66. {
  67. return _clientSessions.Where(s => s.Value.IsConnected).Select(s => s.Key).ToList();
  68. }
  69. }
  70. private MqttConnectReturnCode ValidateConnection(MqttConnectPacket connectPacket)
  71. {
  72. if (_options.ConnectionValidator != null)
  73. {
  74. return _options.ConnectionValidator(connectPacket);
  75. }
  76. return MqttConnectReturnCode.ConnectionAccepted;
  77. }
  78. private GetOrCreateClientSessionResult GetOrCreateClientSession(MqttConnectPacket connectPacket)
  79. {
  80. lock (_syncRoot)
  81. {
  82. MqttClientSession clientSession;
  83. var isSessionPresent = _clientSessions.TryGetValue(connectPacket.ClientId, out clientSession);
  84. if (isSessionPresent)
  85. {
  86. if (connectPacket.CleanSession)
  87. {
  88. _clientSessions.Remove(connectPacket.ClientId);
  89. clientSession.Dispose();
  90. clientSession = null;
  91. MqttTrace.Verbose(nameof(MqttClientSessionsManager), $"Disposed existing session of client '{connectPacket.ClientId}'.");
  92. }
  93. else
  94. {
  95. MqttTrace.Verbose(nameof(MqttClientSessionsManager), $"Reusing existing session of client '{connectPacket.ClientId}'.");
  96. }
  97. }
  98. var isExistingSession = true;
  99. if (clientSession == null)
  100. {
  101. isExistingSession = false;
  102. clientSession = new MqttClientSession(_options, DispatchPublishPacket);
  103. _clientSessions[connectPacket.ClientId] = clientSession;
  104. MqttTrace.Verbose(nameof(MqttClientSessionsManager), $"Created a new session for client '{connectPacket.ClientId}'.");
  105. }
  106. return new GetOrCreateClientSessionResult { IsExistingSession = isExistingSession, Session = clientSession };
  107. }
  108. }
  109. private void DispatchPublishPacket(MqttClientSession senderClientSession, MqttPublishPacket publishPacket)
  110. {
  111. foreach (var clientSession in _clientSessions.Values.ToList())
  112. {
  113. clientSession.EnqueuePublishPacket(senderClientSession, publishPacket);
  114. }
  115. }
  116. }
  117. }