using System;
using System.IO;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;

namespace MQTTnet.Implementations
{
    /// <summary>
    /// Exposes a <see cref="ClientWebSocket"/> as a forward-only, non-seekable <see cref="Stream"/>.
    /// Reads pull data from incoming WebSocket messages; every write is sent as one
    /// complete binary message.
    /// </summary>
    public class WebSocketStream : Stream
    {
        private readonly ClientWebSocket _webSocket;

        /// <summary>
        /// Creates a stream on top of the given WebSocket.
        /// </summary>
        /// <param name="webSocket">The WebSocket used for all reads and writes.</param>
        /// <exception cref="ArgumentNullException"><paramref name="webSocket"/> is <c>null</c>.</exception>
        public WebSocketStream(ClientWebSocket webSocket)
        {
            if (webSocket == null)
            {
                throw new ArgumentNullException(nameof(webSocket));
            }

            _webSocket = webSocket;
        }

        /// <summary>Always <c>true</c>.</summary>
        public override bool CanRead => true;

        /// <summary>Always <c>false</c>; seeking is not supported.</summary>
        public override bool CanSeek => false;

        /// <summary>Always <c>true</c>.</summary>
        public override bool CanWrite => true;

        /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
        public override long Length
        {
            get { throw new NotSupportedException(); }
        }

        /// <summary>Not supported; both accessors always throw <see cref="NotSupportedException"/>.</summary>
        public override long Position
        {
            get { throw new NotSupportedException(); }
            set { throw new NotSupportedException(); }
        }

        /// <summary>
        /// Does nothing; this stream keeps no buffer of its own.
        /// </summary>
        public override void Flush()
        {
        }

        /// <summary>
        /// Receives data from the WebSocket into <paramref name="buffer"/>. The method keeps
        /// receiving until <paramref name="count"/> bytes have been stored, the socket is no
        /// longer in the <see cref="WebSocketState.Open"/> state, or a close message arrives.
        /// </summary>
        /// <param name="buffer">Destination buffer.</param>
        /// <param name="offset">Index in <paramref name="buffer"/> at which to start storing data.</param>
        /// <param name="count">Number of bytes to read.</param>
        /// <param name="cancellationToken">Token passed to the underlying receive and close calls.</param>
        /// <returns>The number of bytes actually stored in <paramref name="buffer"/>.</returns>
        public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
        {
            var currentOffset = offset;
            var targetOffset = offset + count;

            while (_webSocket.State == WebSocketState.Open && currentOffset < targetOffset)
            {
                // Only request the bytes that are still missing. Reusing the original count after a
                // partial receive would make the segment reach past the caller's requested range.
                var remaining = targetOffset - currentOffset;
                var segment = new ArraySegment<byte>(buffer, currentOffset, remaining);

                var response = await _webSocket.ReceiveAsync(segment, cancellationToken).ConfigureAwait(false);
                currentOffset += response.Count;

                if (response.MessageType == WebSocketMessageType.Close)
                {
                    await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, cancellationToken).ConfigureAwait(false);

                    // Stop here explicitly rather than relying on the socket state to end the loop.
                    break;
                }
            }

            return currentOffset - offset;
        }

        /// <summary>
        /// Sends the given range of <paramref name="buffer"/> as a single, complete binary message.
        /// </summary>
        /// <param name="buffer">Source buffer.</param>
        /// <param name="offset">Index of the first byte to send.</param>
        /// <param name="count">Number of bytes to send.</param>
        /// <param name="cancellationToken">Token passed to the underlying send call.</param>
        /// <returns>The task returned by the WebSocket send operation.</returns>
        public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
        {
            var segment = new ArraySegment<byte>(buffer, offset, count);
            return _webSocket.SendAsync(segment, WebSocketMessageType.Binary, true, cancellationToken);
        }

        /// <summary>
        /// Synchronous wrapper that blocks on <see cref="ReadAsync(byte[], int, int, CancellationToken)"/>.
        /// </summary>
        /// <returns>The number of bytes stored in <paramref name="buffer"/>.</returns>
        public override int Read(byte[] buffer, int offset, int count)
        {
            return ReadAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult();
        }

        /// <summary>
        /// Synchronous wrapper that blocks on <see cref="WriteAsync(byte[], int, int, CancellationToken)"/>.
        /// </summary>
        public override void Write(byte[] buffer, int offset, int count)
        {
            WriteAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult();
        }

        /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
        public override long Seek(long offset, SeekOrigin origin)
        {
            throw new NotSupportedException();
        }

        /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
        public override void SetLength(long value)
        {
            throw new NotSupportedException();
        }
    }
}