From 3193381b693402d564ba9d23c13ab0addbbd0f4a Mon Sep 17 00:00:00 2001 From: Christian Date: Sun, 25 Mar 2018 14:42:12 +0200 Subject: [PATCH] Refactor web socket stream. Add mqtt as default sub protocol. --- .../Client/MqttClientWebSocketOptions.cs | 2 +- .../Implementations/WebSocketStream.cs | 36 +++++++++++-------- 2 files changed, 23 insertions(+), 15 deletions(-) diff --git a/Frameworks/MQTTnet.NetStandard/Client/MqttClientWebSocketOptions.cs b/Frameworks/MQTTnet.NetStandard/Client/MqttClientWebSocketOptions.cs index 1819daa..114283d 100644 --- a/Frameworks/MQTTnet.NetStandard/Client/MqttClientWebSocketOptions.cs +++ b/Frameworks/MQTTnet.NetStandard/Client/MqttClientWebSocketOptions.cs @@ -9,7 +9,7 @@ namespace MQTTnet.Client public IDictionary RequestHeaders { get; set; } - public ICollection SubProtocols { get; set; } + public ICollection SubProtocols { get; set; } = new List { "mqtt" }; public CookieContainer CookieContainer { get; set; } diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/WebSocketStream.cs b/Frameworks/MQTTnet.NetStandard/Implementations/WebSocketStream.cs index 1aa02a6..bad0393 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/WebSocketStream.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/WebSocketStream.cs @@ -11,9 +11,9 @@ namespace MQTTnet.Implementations { public class WebSocketStream : Stream { - private readonly WebSocket _webSocket; - private readonly byte[] _chunkBuffer = new byte[MqttWebSocketChannel.BufferSize]; + private readonly byte[] _chunckBuffer = new byte[MqttWebSocketChannel.BufferSize]; private readonly Queue _buffer = new Queue(MqttWebSocketChannel.BufferSize); + private readonly WebSocket _webSocket; public WebSocketStream(WebSocket webSocket) { @@ -55,25 +55,28 @@ namespace MQTTnet.Implementations // Use existing date from buffer. while (count > 0 && _buffer.Any()) { - buffer[offset++] = _buffer.Dequeue(); + buffer[offset] = _buffer.Dequeue(); count--; bytesRead++; + offset++; } if (count == 0) { return bytesRead; } - + + // Fetch new data if the buffer is not full. while (_webSocket.State == WebSocketState.Open) { - await FetchChunkAsync(cancellationToken); + await FetchChunkAsync(cancellationToken).ConfigureAwait(false); while (count > 0 && _buffer.Any()) { - buffer[offset++] = _buffer.Dequeue(); + buffer[offset] = _buffer.Dequeue(); count--; bytesRead++; + offset++; } if (count == 0) @@ -111,19 +114,24 @@ namespace MQTTnet.Implementations } private async Task FetchChunkAsync(CancellationToken cancellationToken) - { - var response = await _webSocket.ReceiveAsync(new ArraySegment(_chunkBuffer, 0, _chunkBuffer.Length), cancellationToken).ConfigureAwait(false); - - for (var i = 0; i < response.Count; i++) - { - var @byte = _chunkBuffer[i]; - _buffer.Enqueue(@byte); - } + { + var response = await _webSocket.ReceiveAsync(new ArraySegment(_chunckBuffer, 0, _chunckBuffer.Length), cancellationToken).ConfigureAwait(false); if (response.MessageType == WebSocketMessageType.Close) { await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, cancellationToken).ConfigureAwait(false); } + else if (response.MessageType == WebSocketMessageType.Binary) + { + for (var i = 0; i < response.Count; i++) + { + _buffer.Enqueue(_chunckBuffer[i]); + } + } + else if (response.MessageType == WebSocketMessageType.Text) + { + throw new MqttProtocolViolationException("WebSocket channel received TEXT message."); + } } } }