From cb09bac246209a8045c6a50808f0f923de4a9e99 Mon Sep 17 00:00:00 2001 From: Christian Date: Wed, 18 Apr 2018 23:10:00 +0200 Subject: [PATCH] Refactor the WebSocket channel and remove the internal queue. --- .../Client/MqttClientWebSocketOptions.cs | 2 + .../Implementations/MqttWebSocketChannel.cs | 80 ++++++++----------- 2 files changed, 37 insertions(+), 45 deletions(-) diff --git a/Frameworks/MQTTnet.NetStandard/Client/MqttClientWebSocketOptions.cs b/Frameworks/MQTTnet.NetStandard/Client/MqttClientWebSocketOptions.cs index 114283d..9718298 100644 --- a/Frameworks/MQTTnet.NetStandard/Client/MqttClientWebSocketOptions.cs +++ b/Frameworks/MQTTnet.NetStandard/Client/MqttClientWebSocketOptions.cs @@ -13,6 +13,8 @@ namespace MQTTnet.Client public CookieContainer CookieContainer { get; set; } + public int BufferSize { get; set; } = 4096; + public MqttClientTlsOptions TlsOptions { get; set; } = new MqttClientTlsOptions(); } } diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs index 4a9fe65..9688ced 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs @@ -1,6 +1,4 @@ using System; -using System.Collections.Generic; -using System.Linq; using System.Net.WebSockets; using System.Security.Cryptography.X509Certificates; using System.Threading; @@ -13,19 +11,19 @@ namespace MQTTnet.Implementations { public sealed class MqttWebSocketChannel : IMqttChannel { - // ReSharper disable once MemberCanBePrivate.Global - // ReSharper disable once AutoPropertyCanBeMadeGetOnly.Global - public static int BufferSize { get; set; } = 4096; // Can be changed for fine tuning by library user. - - private readonly byte[] _chunckBuffer = new byte[BufferSize]; - private readonly Queue _buffer = new Queue(BufferSize); private readonly MqttClientWebSocketOptions _options; + private readonly byte[] _chunkBuffer; + + private int _chunkBufferLength; + private int _chunkBufferOffset; private WebSocket _webSocket; public MqttWebSocketChannel(MqttClientWebSocketOptions options) { _options = options ?? throw new ArgumentNullException(nameof(options)); + + _chunkBuffer = new byte[options.BufferSize]; } public MqttWebSocketChannel(WebSocket webSocket) @@ -49,13 +47,13 @@ namespace MQTTnet.Implementations } var clientWebSocket = new ClientWebSocket(); - + if (_options.RequestHeaders != null) { foreach (var requestHeader in _options.RequestHeaders) { clientWebSocket.Options.SetRequestHeader(requestHeader.Key, requestHeader.Value); - } + } } if (_options.SubProtocols != null) @@ -103,28 +101,18 @@ namespace MQTTnet.Implementations { var bytesRead = 0; - // Use existing date from buffer. - while (count > 0 && _buffer.Any()) - { - buffer[offset] = _buffer.Dequeue(); - count--; - bytesRead++; - offset++; - } - - if (count == 0) - { - return bytesRead; - } - - // Fetch new data if the buffer is not full. while (_webSocket.State == WebSocketState.Open) { - await FetchChunkAsync(cancellationToken).ConfigureAwait(false); + await EnsureFilledChunkBufferAsync(cancellationToken).ConfigureAwait(false); + if (_chunkBufferLength == 0) + { + return 0; + } - while (count > 0 && _buffer.Any()) + while (count > 0 && _chunkBufferOffset < _chunkBufferLength) { - buffer[offset] = _buffer.Dequeue(); + buffer[offset] = _chunkBuffer[_chunkBufferOffset]; + _chunkBufferOffset++; count--; bytesRead++; offset++; @@ -135,12 +123,7 @@ namespace MQTTnet.Implementations return bytesRead; } } - - if (_webSocket.State == WebSocketState.Closed) - { - throw new MqttCommunicationException("WebSocket connection closed."); - } - + return bytesRead; } @@ -161,25 +144,32 @@ namespace MQTTnet.Implementations finally { _webSocket = null; - } + } } - private async Task FetchChunkAsync(CancellationToken cancellationToken) + private async Task EnsureFilledChunkBufferAsync(CancellationToken cancellationToken) { - var response = await _webSocket.ReceiveAsync(new ArraySegment(_chunckBuffer, 0, _chunckBuffer.Length), cancellationToken).ConfigureAwait(false); + if (_chunkBufferOffset < _chunkBufferLength) + { + return; + } - if (response.MessageType == WebSocketMessageType.Close) + if (_webSocket.State == WebSocketState.Closed) { - await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, cancellationToken).ConfigureAwait(false); + throw new MqttCommunicationException("WebSocket connection closed."); } - else if (response.MessageType == WebSocketMessageType.Binary) + + var response = await _webSocket.ReceiveAsync(new ArraySegment(_chunkBuffer, 0, _chunkBuffer.Length), cancellationToken).ConfigureAwait(false); + _chunkBufferLength = response.Count; + _chunkBufferOffset = 0; + + if (response.MessageType == WebSocketMessageType.Close) { - for (var i = 0; i < response.Count; i++) - { - _buffer.Enqueue(_chunckBuffer[i]); - } + throw new MqttCommunicationException("The WebSocket server closed the connection."); + //await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, cancellationToken).ConfigureAwait(false); } - else if (response.MessageType == WebSocketMessageType.Text) + + if (response.MessageType == WebSocketMessageType.Text) { throw new MqttProtocolViolationException("WebSocket channel received TEXT message."); }