You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

WebSocketStream.cs 2.8 KiB

7 年之前
7 年之前
7 年之前
7 年之前
7 年之前
7 年之前
7 年之前
7 年之前
7 年之前
7 年之前
7 年之前
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. using System;
  2. using System.IO;
  3. using System.Net.WebSockets;
  4. using System.Threading;
  5. using System.Threading.Tasks;
  6. using MQTTnet.Core.Exceptions;
  7. namespace MQTTnet.Implementations
  8. {
  /// <summary>
  /// Exposes a <see cref="WebSocket"/> through the <see cref="Stream"/> API.
  /// Reads receive data from the socket and writes send binary messages;
  /// seeking, length and position are not supported.
  /// </summary>
  public class WebSocketStream : Stream
  {
      private readonly WebSocket _webSocket;

      /// <summary>
      /// Creates a stream on top of the given WebSocket.
      /// </summary>
      /// <param name="webSocket">The socket used for all reads and writes.</param>
      /// <exception cref="ArgumentNullException"><paramref name="webSocket"/> is null.</exception>
      public WebSocketStream(WebSocket webSocket)
      {
          if (webSocket is null)
          {
              throw new ArgumentNullException(nameof(webSocket));
          }

          _webSocket = webSocket;
      }

      /// <summary>Always <c>true</c>.</summary>
      public override bool CanRead
      {
          get { return true; }
      }

      /// <summary>Always <c>false</c>.</summary>
      public override bool CanSeek
      {
          get { return false; }
      }

      /// <summary>Always <c>true</c>.</summary>
      public override bool CanWrite
      {
          get { return true; }
      }

      /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
      public override long Length
      {
          get { throw new NotSupportedException(); }
      }

      /// <summary>Not supported; both accessors throw <see cref="NotSupportedException"/>.</summary>
      public override long Position
      {
          get { throw new NotSupportedException(); }
          set { throw new NotSupportedException(); }
      }

      /// <summary>
      /// Does nothing: this class keeps no buffer of its own, every write is handed
      /// directly to the socket.
      /// </summary>
      public override void Flush()
      {
      }

      /// <summary>
      /// Receives data from the WebSocket into <paramref name="buffer"/>, repeating until
      /// <paramref name="count"/> bytes have been stored or the socket is no longer in the
      /// <see cref="WebSocketState.Open"/> state.
      /// </summary>
      /// <param name="buffer">Destination array.</param>
      /// <param name="offset">Index in <paramref name="buffer"/> where the first byte is stored.</param>
      /// <param name="count">Number of bytes to read.</param>
      /// <param name="cancellationToken">Token passed to the receive and close calls.</param>
      /// <returns>The number of bytes stored in <paramref name="buffer"/>.</returns>
      /// <exception cref="MqttCommunicationException">
      /// The socket state is <see cref="WebSocketState.Closed"/> once receiving has stopped.
      /// </exception>
      public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
      {
          var writeIndex = offset;
          var endIndex = offset + count;
          var remaining = count;

          while (_webSocket.State == WebSocketState.Open && writeIndex < endIndex)
          {
              // Only offer the part of the buffer that is still unfilled.
              var segment = new ArraySegment<byte>(buffer, writeIndex, remaining);
              var result = await _webSocket.ReceiveAsync(segment, cancellationToken).ConfigureAwait(false);

              writeIndex += result.Count;
              remaining -= result.Count;

              if (result.MessageType != WebSocketMessageType.Close)
              {
                  continue;
              }

              // The peer sent a close message: answer it. The loop condition then
              // re-checks the socket state before attempting another receive.
              await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, cancellationToken).ConfigureAwait(false);
          }

          if (_webSocket.State != WebSocketState.Closed)
          {
              return writeIndex - offset;
          }

          throw new MqttCommunicationException( "connection closed" );
      }

      /// <summary>
      /// Sends the given buffer range over the WebSocket as one complete binary message.
      /// </summary>
      /// <param name="buffer">Source array.</param>
      /// <param name="offset">Index of the first byte to send.</param>
      /// <param name="count">Number of bytes to send.</param>
      /// <param name="cancellationToken">Token passed to the send call.</param>
      /// <returns>The task returned by the socket's send operation.</returns>
      public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
      {
          var payload = new ArraySegment<byte>(buffer, offset, count);
          return _webSocket.SendAsync(payload, WebSocketMessageType.Binary, true, cancellationToken);
      }

      /// <summary>
      /// Synchronous read: starts the asynchronous read without supplying a
      /// cancellation token and blocks the calling thread until it completes.
      /// </summary>
      /// <param name="buffer">Destination array.</param>
      /// <param name="offset">Index in <paramref name="buffer"/> where the first byte is stored.</param>
      /// <param name="count">Number of bytes to read.</param>
      /// <returns>The number of bytes stored in <paramref name="buffer"/>.</returns>
      public override int Read(byte[] buffer, int offset, int count)
      {
          var pendingRead = ReadAsync(buffer, offset, count);
          return pendingRead.GetAwaiter().GetResult();
      }

      /// <summary>
      /// Synchronous write: starts the asynchronous write without supplying a
      /// cancellation token and blocks the calling thread until it completes.
      /// </summary>
      /// <param name="buffer">Source array.</param>
      /// <param name="offset">Index of the first byte to send.</param>
      /// <param name="count">Number of bytes to send.</param>
      public override void Write(byte[] buffer, int offset, int count)
      {
          var pendingWrite = WriteAsync(buffer, offset, count);
          pendingWrite.GetAwaiter().GetResult();
      }

      /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
      public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();

      /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
      public override void SetLength(long value) => throw new NotSupportedException();
  }
  69. }