diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs index c982ed4..5a89ac7 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs @@ -11,8 +11,8 @@ namespace MQTTnet.Implementations public sealed class MqttWebSocketChannel : IMqttCommunicationChannel, IDisposable { private ClientWebSocket _webSocket = new ClientWebSocket(); - private int WebSocketBufferSize; - private int WebSocketBufferOffset; + private int _bufferSize; + private int _bufferOffset; public async Task ConnectAsync(MqttClientOptions options) { @@ -43,22 +43,22 @@ namespace MQTTnet.Implementations { await ReadToBufferAsync(length, buffer).ConfigureAwait(false); - var result = new ArraySegment(buffer, WebSocketBufferOffset, length); - WebSocketBufferSize -= length; - WebSocketBufferOffset += length; + var result = new ArraySegment(buffer, _bufferOffset, length); + _bufferSize -= length; + _bufferOffset += length; return result; } private async Task ReadToBufferAsync(int length, byte[] buffer) { - if (WebSocketBufferSize > 0) + if (_bufferSize > 0) { return; } var offset = 0; - while (_webSocket.State == WebSocketState.Open && WebSocketBufferSize < length) + while (_webSocket.State == WebSocketState.Open && _bufferSize < length) { WebSocketReceiveResult response; do @@ -67,7 +67,8 @@ namespace MQTTnet.Implementations offset += response.Count; } while (!response.EndOfMessage); - WebSocketBufferSize = response.Count; + _bufferSize = response.Count; + if (response.MessageType == WebSocketMessageType.Close) { await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None).ConfigureAwait(false); @@ -94,7 +95,7 @@ namespace MQTTnet.Implementations public int Peek() { - return WebSocketBufferSize; + return _bufferSize; } } } \ No newline at end of file diff --git a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs index 4f5aa98..eeae7a6 100644 --- a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs +++ b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs @@ -89,7 +89,7 @@ namespace MQTTnet.Implementations public int Peek() { - + return 0; } public async Task> ReadAsync(int length, byte[] buffer) diff --git a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttWebSocketChannel.cs b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttWebSocketChannel.cs index b3d1f95..5a89ac7 100644 --- a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttWebSocketChannel.cs +++ b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttWebSocketChannel.cs @@ -11,11 +11,8 @@ namespace MQTTnet.Implementations public sealed class MqttWebSocketChannel : IMqttCommunicationChannel, IDisposable { private ClientWebSocket _webSocket = new ClientWebSocket(); - private const int BufferSize = 4096; - private const int BufferAmplifier = 20; - private readonly byte[] WebSocketBuffer = new byte[BufferSize * BufferAmplifier]; - private int WebSocketBufferSize; - private int WebSocketBufferOffset; + private int _bufferSize; + private int _bufferOffset; public async Task ConnectAsync(MqttClientOptions options) { @@ -42,50 +39,40 @@ namespace MQTTnet.Implementations _webSocket?.Dispose(); } - public Task ReadAsync(byte[] buffer) + public async Task> ReadAsync(int length, byte[] buffer) { - return Task.WhenAll(ReadToBufferAsync(buffer)); + await ReadToBufferAsync(length, buffer).ConfigureAwait(false); + + var result = new ArraySegment(buffer, _bufferOffset, length); + _bufferSize -= length; + _bufferOffset += length; + + return result; } - private async Task ReadToBufferAsync(byte[] buffer) + private async Task ReadToBufferAsync(int length, byte[] buffer) { - var temporaryBuffer = new byte[BufferSize]; - var offset = 0; + if (_bufferSize > 0) + { + return; + } - while (_webSocket.State == WebSocketState.Open) + var offset = 0; + while (_webSocket.State == WebSocketState.Open && _bufferSize < length) { - if (WebSocketBufferSize == 0) + WebSocketReceiveResult response; + do { - WebSocketBufferOffset = 0; - - WebSocketReceiveResult response; - do - { - response = await _webSocket.ReceiveAsync(new ArraySegment(temporaryBuffer), CancellationToken.None); + response = await _webSocket.ReceiveAsync(new ArraySegment(buffer, offset, buffer.Length - offset), CancellationToken.None).ConfigureAwait(false); + offset += response.Count; + } while (!response.EndOfMessage); - temporaryBuffer.CopyTo(WebSocketBuffer, offset); - offset += response.Count; - temporaryBuffer = new byte[BufferSize]; - } while (!response.EndOfMessage); + _bufferSize = response.Count; - WebSocketBufferSize = response.Count; - if (response.MessageType == WebSocketMessageType.Close) - { - await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None); - } - - Buffer.BlockCopy(WebSocketBuffer, 0, buffer, 0, buffer.Length); - WebSocketBufferSize -= buffer.Length; - WebSocketBufferOffset += buffer.Length; - } - else + if (response.MessageType == WebSocketMessageType.Close) { - Buffer.BlockCopy(WebSocketBuffer, WebSocketBufferOffset, buffer, 0, buffer.Length); - WebSocketBufferSize -= buffer.Length; - WebSocketBufferOffset += buffer.Length; + await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None).ConfigureAwait(false); } - - return; } } @@ -105,5 +92,10 @@ namespace MQTTnet.Implementations throw new MqttCommunicationException(exception); } } + + public int Peek() + { + return _bufferSize; + } } } \ No newline at end of file diff --git a/MQTTnet.Core/Channel/BufferedCommunicationChannel.cs b/MQTTnet.Core/Channel/BufferedCommunicationChannel.cs index 34ee586..757f9c2 100644 --- a/MQTTnet.Core/Channel/BufferedCommunicationChannel.cs +++ b/MQTTnet.Core/Channel/BufferedCommunicationChannel.cs @@ -6,9 +6,9 @@ namespace MQTTnet.Core.Channel { public class BufferedCommunicationChannel : IMqttCommunicationChannel { - private IMqttCommunicationChannel _inner { get; } - private int _bufferSize = 0; - private int _bufferOffset = 0; + private readonly IMqttCommunicationChannel _inner; + private int _bufferSize; + private int _bufferOffset; public BufferedCommunicationChannel(IMqttCommunicationChannel inner) {