using System; using System.IO; using System.Net.WebSockets; using System.Threading; using System.Threading.Tasks; using MQTTnet.Core.Exceptions; namespace MQTTnet.Implementations { public class WebSocketStream : Stream { private readonly WebSocket _webSocket; public WebSocketStream(WebSocket webSocket) { _webSocket = webSocket ?? throw new ArgumentNullException(nameof(webSocket)); } public override void Flush() { } public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { var currentOffset = offset; var targetOffset = offset + count; while (_webSocket.State == WebSocketState.Open && currentOffset < targetOffset) { var response = await _webSocket.ReceiveAsync(new ArraySegment(buffer, currentOffset, count), cancellationToken).ConfigureAwait(false); currentOffset += response.Count; count -= response.Count; if (response.MessageType == WebSocketMessageType.Close) { await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, cancellationToken).ConfigureAwait(false); } } if (_webSocket.State == WebSocketState.Closed) { throw new MqttCommunicationException("connection closed"); } return currentOffset - offset; } public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { return _webSocket.SendAsync(new ArraySegment(buffer, offset, count), WebSocketMessageType.Binary, true, cancellationToken); } public override int Read(byte[] buffer, int offset, int count) { return ReadAsync(buffer, offset, count).GetAwaiter().GetResult(); } public override void Write(byte[] buffer, int offset, int count) { WriteAsync(buffer, offset, count).GetAwaiter().GetResult(); } public override bool CanRead => true; public override bool CanSeek => false; public override bool CanWrite => true; public override long Length => throw new NotSupportedException(); public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); } public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); } public override void SetLength(long value) { throw new NotSupportedException(); } } }