Browse Source

Partially fix UWP implementations

release/3.x.x
Christian Kratky 7 years ago
parent
commit
ff0b022357
4 changed files with 44 additions and 51 deletions
  1. +10
    -9
      Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs
  2. +1
    -1
      Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs
  3. +30
    -38
      Frameworks/MQTTnet.UniversalWindows/Implementations/MqttWebSocketChannel.cs
  4. +3
    -3
      MQTTnet.Core/Channel/BufferedCommunicationChannel.cs

+ 10
- 9
Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs View File

@@ -11,8 +11,8 @@ namespace MQTTnet.Implementations
public sealed class MqttWebSocketChannel : IMqttCommunicationChannel, IDisposable public sealed class MqttWebSocketChannel : IMqttCommunicationChannel, IDisposable
{ {
private ClientWebSocket _webSocket = new ClientWebSocket(); private ClientWebSocket _webSocket = new ClientWebSocket();
private int WebSocketBufferSize;
private int WebSocketBufferOffset;
private int _bufferSize;
private int _bufferOffset;


public async Task ConnectAsync(MqttClientOptions options) public async Task ConnectAsync(MqttClientOptions options)
{ {
@@ -43,22 +43,22 @@ namespace MQTTnet.Implementations
{ {
await ReadToBufferAsync(length, buffer).ConfigureAwait(false); await ReadToBufferAsync(length, buffer).ConfigureAwait(false);


var result = new ArraySegment<byte>(buffer, WebSocketBufferOffset, length);
WebSocketBufferSize -= length;
WebSocketBufferOffset += length;
var result = new ArraySegment<byte>(buffer, _bufferOffset, length);
_bufferSize -= length;
_bufferOffset += length;


return result; return result;
} }


private async Task ReadToBufferAsync(int length, byte[] buffer) private async Task ReadToBufferAsync(int length, byte[] buffer)
{ {
if (WebSocketBufferSize > 0)
if (_bufferSize > 0)
{ {
return; return;
} }


var offset = 0; var offset = 0;
while (_webSocket.State == WebSocketState.Open && WebSocketBufferSize < length)
while (_webSocket.State == WebSocketState.Open && _bufferSize < length)
{ {
WebSocketReceiveResult response; WebSocketReceiveResult response;
do do
@@ -67,7 +67,8 @@ namespace MQTTnet.Implementations
offset += response.Count; offset += response.Count;
} while (!response.EndOfMessage); } while (!response.EndOfMessage);


WebSocketBufferSize = response.Count;
_bufferSize = response.Count;

if (response.MessageType == WebSocketMessageType.Close) if (response.MessageType == WebSocketMessageType.Close)
{ {
await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None).ConfigureAwait(false); await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None).ConfigureAwait(false);
@@ -94,7 +95,7 @@ namespace MQTTnet.Implementations


public int Peek() public int Peek()
{ {
return WebSocketBufferSize;
return _bufferSize;
} }
} }
} }

+ 1
- 1
Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs View File

@@ -89,7 +89,7 @@ namespace MQTTnet.Implementations


public int Peek() public int Peek()
{ {
return 0;
} }


public async Task<ArraySegment<byte>> ReadAsync(int length, byte[] buffer) public async Task<ArraySegment<byte>> ReadAsync(int length, byte[] buffer)


+ 30
- 38
Frameworks/MQTTnet.UniversalWindows/Implementations/MqttWebSocketChannel.cs View File

@@ -11,11 +11,8 @@ namespace MQTTnet.Implementations
public sealed class MqttWebSocketChannel : IMqttCommunicationChannel, IDisposable public sealed class MqttWebSocketChannel : IMqttCommunicationChannel, IDisposable
{ {
private ClientWebSocket _webSocket = new ClientWebSocket(); private ClientWebSocket _webSocket = new ClientWebSocket();
private const int BufferSize = 4096;
private const int BufferAmplifier = 20;
private readonly byte[] WebSocketBuffer = new byte[BufferSize * BufferAmplifier];
private int WebSocketBufferSize;
private int WebSocketBufferOffset;
private int _bufferSize;
private int _bufferOffset;


public async Task ConnectAsync(MqttClientOptions options) public async Task ConnectAsync(MqttClientOptions options)
{ {
@@ -42,50 +39,40 @@ namespace MQTTnet.Implementations
_webSocket?.Dispose(); _webSocket?.Dispose();
} }


public Task ReadAsync(byte[] buffer)
public async Task<ArraySegment<byte>> ReadAsync(int length, byte[] buffer)
{ {
return Task.WhenAll(ReadToBufferAsync(buffer));
await ReadToBufferAsync(length, buffer).ConfigureAwait(false);

var result = new ArraySegment<byte>(buffer, _bufferOffset, length);
_bufferSize -= length;
_bufferOffset += length;

return result;
} }


private async Task ReadToBufferAsync(byte[] buffer)
private async Task ReadToBufferAsync(int length, byte[] buffer)
{ {
var temporaryBuffer = new byte[BufferSize];
var offset = 0;
if (_bufferSize > 0)
{
return;
}


while (_webSocket.State == WebSocketState.Open)
var offset = 0;
while (_webSocket.State == WebSocketState.Open && _bufferSize < length)
{ {
if (WebSocketBufferSize == 0)
WebSocketReceiveResult response;
do
{ {
WebSocketBufferOffset = 0;

WebSocketReceiveResult response;
do
{
response = await _webSocket.ReceiveAsync(new ArraySegment<byte>(temporaryBuffer), CancellationToken.None);
response = await _webSocket.ReceiveAsync(new ArraySegment<byte>(buffer, offset, buffer.Length - offset), CancellationToken.None).ConfigureAwait(false);
offset += response.Count;
} while (!response.EndOfMessage);


temporaryBuffer.CopyTo(WebSocketBuffer, offset);
offset += response.Count;
temporaryBuffer = new byte[BufferSize];
} while (!response.EndOfMessage);
_bufferSize = response.Count;


WebSocketBufferSize = response.Count;
if (response.MessageType == WebSocketMessageType.Close)
{
await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None);
}

Buffer.BlockCopy(WebSocketBuffer, 0, buffer, 0, buffer.Length);
WebSocketBufferSize -= buffer.Length;
WebSocketBufferOffset += buffer.Length;
}
else
if (response.MessageType == WebSocketMessageType.Close)
{ {
Buffer.BlockCopy(WebSocketBuffer, WebSocketBufferOffset, buffer, 0, buffer.Length);
WebSocketBufferSize -= buffer.Length;
WebSocketBufferOffset += buffer.Length;
await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None).ConfigureAwait(false);
} }

return;
} }
} }


@@ -105,5 +92,10 @@ namespace MQTTnet.Implementations
throw new MqttCommunicationException(exception); throw new MqttCommunicationException(exception);
} }
} }

public int Peek()
{
return _bufferSize;
}
} }
} }

+ 3
- 3
MQTTnet.Core/Channel/BufferedCommunicationChannel.cs View File

@@ -6,9 +6,9 @@ namespace MQTTnet.Core.Channel
{ {
public class BufferedCommunicationChannel : IMqttCommunicationChannel public class BufferedCommunicationChannel : IMqttCommunicationChannel
{ {
private IMqttCommunicationChannel _inner { get; }
private int _bufferSize = 0;
private int _bufferOffset = 0;
private readonly IMqttCommunicationChannel _inner;
private int _bufferSize;
private int _bufferOffset;


public BufferedCommunicationChannel(IMqttCommunicationChannel inner) public BufferedCommunicationChannel(IMqttCommunicationChannel inner)
{ {


Loading…
Cancel
Save