You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

WebSocket4NetMqttClientAdapterFactory.cs 4.7 KiB

пре 6 година
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. using MQTTnet.Client;
  2. using System;
  3. using System.Collections.Concurrent;
  4. using System.Linq;
  5. using System.Security.Authentication;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. using MQTTnet.Adapter;
  9. using MQTTnet.Channel;
  10. using MQTTnet.Diagnostics;
  11. using MQTTnet.Serializer;
  12. using WebSocket4Net;
  13. namespace MQTTnet.TestApp.NetCore
  14. {
  15. public class WebSocket4NetMqttClientAdapterFactory : IMqttClientAdapterFactory
  16. {
  /// <summary>
  /// Creates a channel adapter that wraps a <c>WebSocket4NetMqttChannel</c> built from the given client options.
  /// </summary>
  /// <param name="options">The client options; <c>ChannelOptions</c> must be a <see cref="MqttClientWebSocketOptions"/>.</param>
  /// <param name="logger">The logger handed to the created adapter.</param>
  /// <returns>A new <see cref="MqttChannelAdapter"/> using a <see cref="MqttPacketSerializer"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="options"/> or <paramref name="logger"/> is null.</exception>
  /// <exception cref="NotSupportedException">The channel options are not WebSocket options.</exception>
  public IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetChildLogger logger)
  {
      if (options == null)
      {
          throw new ArgumentNullException(nameof(options));
      }

      if (logger == null)
      {
          throw new ArgumentNullException(nameof(logger));
      }

      // This factory only knows how to build a WebSocket transport.
      var isWebSocketTransport = options.ChannelOptions is MqttClientWebSocketOptions;
      if (!isWebSocketTransport)
      {
          throw new NotSupportedException("Only WebSocket connections are supported.");
      }

      var channel = new WebSocket4NetMqttChannel(options);
      var serializer = new MqttPacketSerializer();
      return new MqttChannelAdapter(channel, serializer, logger);
  }
  /// <summary>
  /// <see cref="IMqttChannel"/> implementation that sends and receives raw bytes through a
  /// WebSocket4Net <see cref="WebSocket4Net.WebSocket"/>. Incoming data is queued byte by byte
  /// in a <see cref="BlockingCollection{T}"/> and drained by <see cref="ReadAsync"/>.
  /// </summary>
  private class WebSocket4NetMqttChannel : IMqttChannel
  {
      private readonly BlockingCollection<byte> _receiveBuffer = new BlockingCollection<byte>();
      private readonly IMqttClientOptions _clientOptions;

      private WebSocket4Net.WebSocket _webSocket;

      /// <summary>
      /// Creates a channel for the given client options.
      /// </summary>
      /// <param name="clientOptions">The client options (channel options and communication timeout are read from it).</param>
      /// <exception cref="ArgumentNullException"><paramref name="clientOptions"/> is null.</exception>
      public WebSocket4NetMqttChannel(IMqttClientOptions clientOptions)
      {
          _clientOptions = clientOptions ?? throw new ArgumentNullException(nameof(clientOptions));
      }

      /// <summary>
      /// Always an empty string in this implementation.
      /// </summary>
      public string Endpoint { get; } = "";

      /// <summary>
      /// Opens the web socket and waits (synchronously) until it reports the open state.
      /// </summary>
      /// <param name="cancellationToken">Aborts the wait for the open state.</param>
      /// <returns>A completed task once the socket is open.</returns>
      /// <exception cref="OperationCanceledException">The token was cancelled while waiting.</exception>
      /// <exception cref="TimeoutException">The socket did not open within the communication timeout.</exception>
      public Task ConnectAsync(CancellationToken cancellationToken)
      {
          var channelOptions = (MqttClientWebSocketOptions)_clientOptions.ChannelOptions;

          // TlsOptions and SubProtocols are treated as optional so a null value does not crash the connect.
          var useTls = channelOptions.TlsOptions?.UseTls == true;
          var uri = BuildUri(channelOptions.Uri, useTls);
          var sslProtocols = useTls ? SslProtocols.Tls12 : SslProtocols.None;
          var subProtocol = channelOptions.SubProtocols?.FirstOrDefault() ?? string.Empty;

          var webSocket = new WebSocket4Net.WebSocket(uri, subProtocol, sslProtocols: sslProtocols);
          webSocket.DataReceived += OnDataReceived;
          _webSocket = webSocket;

          webSocket.Open();

          // SpinUntil returns false when the timeout elapsed before the predicate became true.
          var completed = SpinWait.SpinUntil(
              () => webSocket.State == WebSocketState.Open || cancellationToken.IsCancellationRequested,
              _clientOptions.CommunicationTimeout);

          cancellationToken.ThrowIfCancellationRequested();

          if (!completed)
          {
              // Previously a timeout was silently reported as a successful connect.
              throw new TimeoutException("Timeout while waiting for the WebSocket connection to open.");
          }

          return Task.FromResult(0);
      }

      /// <summary>
      /// Closes the web socket (if any), waits up to the communication timeout for the closed state
      /// and then disposes the socket.
      /// </summary>
      /// <returns>A completed task.</returns>
      public Task DisconnectAsync()
      {
          // Work on a local copy so the wait predicate never observes a cleared field.
          var webSocket = _webSocket;
          _webSocket = null;

          if (webSocket != null)
          {
              webSocket.DataReceived -= OnDataReceived;
              webSocket.Close();
              SpinWait.SpinUntil(() => webSocket.State == WebSocketState.Closed, _clientOptions.CommunicationTimeout);

              // The reference is dropped above, so this is the last chance to release the socket.
              webSocket.Dispose();
          }

          return Task.FromResult(0);
      }

      /// <summary>
      /// Copies received bytes into <paramref name="buffer"/>. Blocks the calling thread until at least
      /// one byte is available, then returns as soon as the receive queue is empty or
      /// <paramref name="count"/> bytes were copied.
      /// </summary>
      /// <param name="buffer">Destination buffer.</param>
      /// <param name="offset">Index in <paramref name="buffer"/> of the first byte to write.</param>
      /// <param name="count">Maximum number of bytes to copy.</param>
      /// <param name="cancellationToken">Cancels the blocking wait for the first byte and stops the copy loop.</param>
      /// <returns>A completed task carrying the number of bytes copied.</returns>
      public Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
      {
          var readBytes = 0;
          while (count > 0 && !cancellationToken.IsCancellationRequested)
          {
              byte @byte;
              if (readBytes == 0)
              {
                  // Block until at least one byte was received.
                  @byte = _receiveBuffer.Take(cancellationToken);
              }
              else
              {
                  // Do not block for further bytes; hand back what is available right now.
                  if (!_receiveBuffer.TryTake(out @byte))
                  {
                      return Task.FromResult(readBytes);
                  }
              }

              buffer[offset] = @byte;
              offset++;
              count--;
              readBytes++;
          }

          return Task.FromResult(readBytes);
      }

      /// <summary>
      /// Sends the given buffer segment through the web socket.
      /// </summary>
      /// <param name="buffer">Source buffer.</param>
      /// <param name="offset">Index of the first byte to send.</param>
      /// <param name="count">Number of bytes to send.</param>
      /// <param name="cancellationToken">Not used by this implementation.</param>
      /// <returns>A completed task.</returns>
      /// <exception cref="InvalidOperationException">The channel has no web socket (not connected or already disconnected).</exception>
      public Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
      {
          var webSocket = _webSocket;
          if (webSocket == null)
          {
              throw new InvalidOperationException("The WebSocket is not connected.");
          }

          webSocket.Send(buffer, offset, count);
          return Task.FromResult(0);
      }

      /// <summary>
      /// Detaches the receive handler and disposes the web socket. Safe to call more than once.
      /// </summary>
      public void Dispose()
      {
          var webSocket = _webSocket;
          _webSocket = null;

          if (webSocket != null)
          {
              webSocket.DataReceived -= OnDataReceived;
              webSocket.Dispose();
          }
      }

      /// <summary>
      /// Returns <paramref name="uri"/> unchanged when it already starts with "ws://" or "wss://";
      /// otherwise prefixes it with "wss://" (TLS) or "ws://" (no TLS).
      /// </summary>
      private static string BuildUri(string uri, bool useTls)
      {
          if (uri != null &&
              (uri.StartsWith("ws://", StringComparison.OrdinalIgnoreCase) ||
               uri.StartsWith("wss://", StringComparison.OrdinalIgnoreCase)))
          {
              return uri;
          }

          return (useTls ? "wss://" : "ws://") + uri;
      }

      /// <summary>
      /// Queues every received byte so that <see cref="ReadAsync"/> can consume it.
      /// </summary>
      private void OnDataReceived(object sender, WebSocket4Net.DataReceivedEventArgs e)
      {
          foreach (var @byte in e.Data)
          {
              _receiveBuffer.Add(@byte);
          }
      }
  }
  111. }
  112. }