Вы не можете выбрать более 25 тем. Темы должны начинаться с буквы или цифры, могут содержать дефисы (-) и должны содержать не более 35 символов.
 
 
 
 

116 строк
3.7 KiB

  1. using System;
  2. using System.Net.Security;
  3. using System.Net.Sockets;
  4. using System.Security.Authentication;
  5. using System.Security.Cryptography.X509Certificates;
  6. using System.Threading.Tasks;
  7. using MQTTnet.Core.Channel;
  8. using MQTTnet.Core.Client;
  9. using System.IO;
  namespace MQTTnet.Implementations
  {
      /// <summary>
      /// Communication channel backed by a TCP socket, optionally wrapped in TLS.
      /// Exposes separate buffered send/receive streams plus the unbuffered raw stream.
      /// </summary>
      public sealed class MqttTcpChannel : IMqttCommunicationChannel, IDisposable
      {
          /// <summary>
          /// Buffer size in bytes used for the buffered send and receive streams (default 4096 * 20).
          /// </summary>
          // ReSharper disable once MemberCanBePrivate.Global
          // ReSharper disable once AutoPropertyCanBeMadeGetOnly.Global
          public static int BufferSize { get; set; } = 4096 * 20; // Can be changed for fine tuning by library user.

          private Socket _socket;
          private SslStream _sslStream;

          /// <summary>
          /// Client-side constructor. The socket is created later by <see cref="ConnectAsync"/>.
          /// </summary>
          public MqttTcpChannel()
          {
          }

          /// <summary>
          /// Server-side constructor. The already connected socket (and optional TLS stream)
          /// are passed in, so <see cref="ConnectAsync"/> is not expected to be called.
          /// </summary>
          /// <param name="socket">The connected socket. Must not be null.</param>
          /// <param name="sslStream">The authenticated TLS stream, or null for a plain TCP connection.</param>
          /// <exception cref="ArgumentNullException"><paramref name="socket"/> is null.</exception>
          public MqttTcpChannel(Socket socket, SslStream sslStream)
          {
              _socket = socket ?? throw new ArgumentNullException(nameof(socket));
              _sslStream = sslStream;
              CreateStreams(socket, sslStream);
          }

          /// <summary>Buffered stream used for writing. Null until connected and after disposal.</summary>
          public Stream SendStream { get; private set; }

          /// <summary>Buffered stream used for reading. Null until connected and after disposal.</summary>
          public Stream ReceiveStream { get; private set; }

          /// <summary>
          /// The unbuffered transport stream (the TLS stream when TLS is used, otherwise a network stream).
          /// Null until connected and after disposal.
          /// </summary>
          public Stream RawReceiveStream { get; private set; }

          /// <summary>
          /// Connects to the server given in <paramref name="options"/>, performs the TLS handshake
          /// when TLS is enabled, and creates the channel streams.
          /// </summary>
          /// <param name="options">The client options providing server, port and TLS settings.</param>
          /// <exception cref="ArgumentNullException"><paramref name="options"/> is null.</exception>
          public async Task ConnectAsync(MqttClientOptions options)
          {
              if (options == null) throw new ArgumentNullException(nameof(options));

              try
              {
                  if (_socket == null)
                  {
                      _socket = new Socket(SocketType.Stream, ProtocolType.Tcp);
                  }

                  await Task.Factory.FromAsync(_socket.BeginConnect, _socket.EndConnect, options.Server, options.GetPort(), null).ConfigureAwait(false);

                  if (options.TlsOptions.UseTls)
                  {
                      // The network stream owns the socket, so disposing the TLS stream also closes the socket.
                      _sslStream = new SslStream(new NetworkStream(_socket, true));
                      await _sslStream.AuthenticateAsClientAsync(options.Server, LoadCertificates(options), SslProtocols.Tls12, options.TlsOptions.CheckCertificateRevocation).ConfigureAwait(false);
                  }

                  CreateStreams(_socket, _sslStream);
              }
              catch
              {
                  // A failed connect or TLS handshake must not leak the socket/TLS stream, and must not
                  // leave a half-initialized socket behind for the next connection attempt.
                  Dispose();
                  throw;
              }
          }

          /// <summary>
          /// Closes the channel by disposing all streams and the socket.
          /// </summary>
          /// <returns>An already completed task.</returns>
          public Task DisconnectAsync()
          {
              Dispose();
              return Task.FromResult(0);
          }

          /// <summary>
          /// Releases the streams and the socket. Safe to call multiple times.
          /// </summary>
          public void Dispose()
          {
              // Detach everything first so the channel is in a consistent "disconnected" state
              // (and a repeated call is a no-op) even if a dispose call below throws.
              var sendStream = SendStream;
              var receiveStream = ReceiveStream;
              var rawStream = RawReceiveStream;
              var sslStream = _sslStream;
              var socket = _socket;

              SendStream = null;
              ReceiveStream = null;
              RawReceiveStream = null;
              _sslStream = null;
              _socket = null;

              try
              {
                  // Dispose the buffered wrappers while the underlying stream is still open:
                  // disposing a BufferedStream flushes pending writes into the underlying stream,
                  // which fails if that stream has already been disposed.
                  sendStream?.Dispose();
                  receiveStream?.Dispose();
              }
              finally
              {
                  // Always release the transport, even when the final flush failed.
                  rawStream?.Dispose();
                  sslStream?.Dispose();
                  socket?.Dispose();
              }
          }

          /// <summary>
          /// Creates the raw stream and the buffered send/receive streams on top of it.
          /// </summary>
          /// <param name="socket">The connected socket, used when no TLS stream is given.</param>
          /// <param name="sslStream">The TLS stream, or null for a plain TCP connection.</param>
          private void CreateStreams(Socket socket, Stream sslStream)
          {
              RawReceiveStream = sslStream ?? new NetworkStream(socket);

              // The raw stream stays exposed because default buffering prevented receiving
              // the first connect message.
              // Two separate buffered streams are needed; otherwise reads and writes would
              // have to be synchronized.
              SendStream = new BufferedStream(RawReceiveStream, BufferSize);
              ReceiveStream = new BufferedStream(RawReceiveStream, BufferSize);
          }

          /// <summary>
          /// Builds the client certificate collection from the TLS options.
          /// </summary>
          /// <param name="options">The client options holding the configured certificates.</param>
          /// <returns>The certificates to present; empty when none are configured.</returns>
          private static X509CertificateCollection LoadCertificates(MqttClientOptions options)
          {
              var certificates = new X509CertificateCollection();
              if (options.TlsOptions.Certificates == null)
              {
                  return certificates;
              }

              foreach (var certificate in options.TlsOptions.Certificates)
              {
                  certificates.Add(new X509Certificate(certificate));
              }

              return certificates;
          }
      }
  }