You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

82 rivejä
2.8 KiB

  1. using System;
  2. using System.Collections.Generic;
  3. using System.Threading;
  4. using System.Threading.Tasks;
  5. using MQTTnet.Core.Adapter;
  6. using MQTTnet.Core.Diagnostics;
  7. namespace MQTTnet.Core.Server
  8. {
  /// <summary>
  /// MQTT server front end: starts and stops a set of server adapters and hands every
  /// client they report to the client sessions manager.
  /// </summary>
  public sealed class MqttServer
  {
      private readonly MqttClientSessionsManager _clientSessionsManager;
      private readonly ICollection<IMqttServerAdapter> _adapters;
      private readonly MqttServerOptions _options;

      // Non-null only between Start() and Stop(); also used as the "server is started" flag.
      private CancellationTokenSource _cancellationTokenSource;

      /// <summary>
      /// Creates a server for the given options and adapters. Nothing is started until
      /// <see cref="Start"/> is called.
      /// </summary>
      /// <param name="options">Options given to the sessions manager and to each adapter on start.</param>
      /// <param name="adapters">Adapters that are started and stopped together with the server.</param>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="options"/> or <paramref name="adapters"/> is <c>null</c>.
      /// </exception>
      public MqttServer(MqttServerOptions options, ICollection<IMqttServerAdapter> adapters)
      {
          _options = options ?? throw new ArgumentNullException(nameof(options));
          _adapters = adapters ?? throw new ArgumentNullException(nameof(adapters));

          _clientSessionsManager = new MqttClientSessionsManager(options);
      }

      /// <summary>
      /// Returns the connected clients as reported by the client sessions manager.
      /// </summary>
      public IList<string> GetConnectedClients()
      {
          return _clientSessionsManager.GetConnectedClients();
      }

      /// <summary>
      /// Raised for every client that is accepted by the server, before its session is started.
      /// </summary>
      public event EventHandler<MqttClientConnectedEventArgs> ClientConnected;

      /// <summary>
      /// Hands an externally created communication adapter to the server, which treats it
      /// exactly like a client reported by one of its server adapters.
      /// </summary>
      /// <param name="identifier">Identifier used for the client in the connected event arguments.</param>
      /// <param name="adapter">Communication adapter of the injected client.</param>
      /// <exception cref="ArgumentNullException"><paramref name="adapter"/> is <c>null</c>.</exception>
      /// <exception cref="InvalidOperationException">The server is not started.</exception>
      public void InjectClient(string identifier, IMqttCommunicationAdapter adapter)
      {
          if (adapter == null) throw new ArgumentNullException(nameof(adapter));
          if (_cancellationTokenSource == null) throw new InvalidOperationException("The MQTT server is not started.");

          OnClientConnected(this, new MqttClientConnectedEventArgs(identifier, adapter));
      }

      /// <summary>
      /// Starts the server: subscribes to and starts every adapter.
      /// </summary>
      /// <exception cref="InvalidOperationException">The server is already started.</exception>
      public void Start()
      {
          if (_cancellationTokenSource != null) throw new InvalidOperationException("The MQTT server is already started.");

          _cancellationTokenSource = new CancellationTokenSource();

          foreach (var adapter in _adapters)
          {
              adapter.ClientConnected += OnClientConnected;
              adapter.Start(_options);
          }

          MqttTrace.Information(nameof(MqttServer), "Started.");
      }

      /// <summary>
      /// Stops the server: signals cancellation, detaches from and stops every adapter and
      /// clears the client sessions manager. Calling it on a server that is not started is allowed.
      /// </summary>
      public void Stop()
      {
          // The source is only cancelled, not disposed: a concurrently running
          // OnClientConnected may still hold a reference to it and read its Token.
          _cancellationTokenSource?.Cancel();
          _cancellationTokenSource = null;

          foreach (var adapter in _adapters)
          {
              adapter.ClientConnected -= OnClientConnected;
              adapter.Stop();
          }

          _clientSessionsManager.Clear();

          MqttTrace.Information(nameof(MqttServer), "Stopped.");
      }

      /// <summary>
      /// Handles a client reported by an adapter (or injected via <see cref="InjectClient"/>):
      /// raises <see cref="ClientConnected"/> and starts the client session on the thread pool.
      /// </summary>
      private void OnClientConnected(object sender, MqttClientConnectedEventArgs eventArgs)
      {
          // Snapshot the field once. Stop() clears it before this handler is detached from the
          // adapters, so an adapter can still raise its event while the field is already null;
          // dereferencing the field directly would then throw a NullReferenceException.
          var cancellationTokenSource = _cancellationTokenSource;
          if (cancellationTokenSource == null)
          {
              MqttTrace.Information(nameof(MqttServer), $"Client '{eventArgs.Identifier}': Ignored because the server is stopped.");
              return;
          }

          // Taken before the event is raised so that a Stop() issued while subscribers run
          // results in a cancelled (never started) session task instead of an exception.
          var cancellationToken = cancellationTokenSource.Token;

          MqttTrace.Information(nameof(MqttServer), $"Client '{eventArgs.Identifier}': Connected.");
          ClientConnected?.Invoke(this, eventArgs);

          Task.Run(() => _clientSessionsManager.RunClientSessionAsync(eventArgs), cancellationToken);
      }
  }
  62. }