You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

72 lines
2.8 KiB

  1. using System;
  2. using System.Threading.Tasks;
  3. using MQTTnet.Core.Adapter;
  4. using MQTTnet.Core.Server;
  5. using Windows.Networking.Sockets;
  6. using Microsoft.Extensions.Logging;
  7. namespace MQTTnet.Implementations
  8. {
  9. public class MqttServerAdapter : IMqttServerAdapter, IDisposable
  10. {
  11. private readonly ILogger<MqttServerAdapter> _logger;
  12. private readonly IMqttCommunicationAdapterFactory _mqttCommunicationAdapterFactory;
  13. private StreamSocketListener _defaultEndpointSocket;
  14. public MqttServerAdapter(ILogger<MqttServerAdapter> logger, IMqttCommunicationAdapterFactory mqttCommunicationAdapterFactory)
  15. {
  16. _logger = logger ?? throw new ArgumentNullException(nameof(logger));
  17. _mqttCommunicationAdapterFactory = mqttCommunicationAdapterFactory ?? throw new ArgumentNullException(nameof(mqttCommunicationAdapterFactory));
  18. }
  19. public event EventHandler<MqttServerAdapterClientAcceptedEventArgs> ClientAccepted;
  20. public async Task StartAsync(MqttServerOptions options)
  21. {
  22. if (options == null) throw new ArgumentNullException(nameof(options));
  23. if (_defaultEndpointSocket != null) throw new InvalidOperationException("Server is already started.");
  24. if (options.DefaultEndpointOptions.IsEnabled)
  25. {
  26. _defaultEndpointSocket = new StreamSocketListener();
  27. _defaultEndpointSocket.Control.NoDelay = true;
  28. await _defaultEndpointSocket.BindServiceNameAsync(options.GetDefaultEndpointPort().ToString(), SocketProtectionLevel.PlainSocket);
  29. _defaultEndpointSocket.ConnectionReceived += AcceptDefaultEndpointConnectionsAsync;
  30. }
  31. if (options.TlsEndpointOptions.IsEnabled)
  32. {
  33. throw new NotSupportedException("TLS servers are not supported for UWP apps.");
  34. }
  35. }
  36. public Task StopAsync()
  37. {
  38. _defaultEndpointSocket?.Dispose();
  39. _defaultEndpointSocket = null;
  40. return Task.FromResult(0);
  41. }
  42. public void Dispose()
  43. {
  44. StopAsync();
  45. }
  46. private void AcceptDefaultEndpointConnectionsAsync(StreamSocketListener sender, StreamSocketListenerConnectionReceivedEventArgs args)
  47. {
  48. try
  49. {
  50. args.Socket.Control.NoDelay = true;
  51. var clientAdapter = _mqttCommunicationAdapterFactory.CreateServerMqttCommunicationAdapter(new MqttTcpChannel(args.Socket));
  52. ClientAccepted?.Invoke(this, new MqttServerAdapterClientAcceptedEventArgs(clientAdapter));
  53. }
  54. catch (Exception exception)
  55. {
  56. _logger.LogError(new EventId(), exception, "Error while accepting connection at default endpoint.");
  57. }
  58. }
  59. }
  60. }