diff --git a/Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs b/Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs
index 47dfb45..a5279e0 100644
--- a/Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs
+++ b/Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs
@@ -87,7 +87,9 @@ namespace MQTTnet.Implementations
try
{
var clientSocket = await Task.Factory.FromAsync(_defaultEndpointSocket.BeginAccept, _defaultEndpointSocket.EndAccept, null).ConfigureAwait(false);
- var clientAdapter = new MqttChannelCommunicationAdapter(new BufferedCommunicationChannel(new MqttTcpChannel(clientSocket, null)), new MqttPacketSerializer());
+
+ var tcpChannel = new MqttTcpChannel(clientSocket, null);
+ var clientAdapter = new MqttChannelCommunicationAdapter(tcpChannel, new MqttPacketSerializer());
ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter));
}
catch (Exception exception) when (!(exception is ObjectDisposedException))
@@ -110,8 +112,9 @@ namespace MQTTnet.Implementations
var sslStream = new SslStream(new NetworkStream(clientSocket));
await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false).ConfigureAwait(false);
-
- var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new MqttPacketSerializer());
+
+ var tcpChannel = new MqttTcpChannel(clientSocket, sslStream);
+ var clientAdapter = new MqttChannelCommunicationAdapter(tcpChannel, new MqttPacketSerializer());
ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter));
}
catch (Exception exception)
diff --git a/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs b/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs
index 0ecf373..dede268 100644
--- a/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs
+++ b/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs
@@ -17,6 +17,8 @@ namespace MQTTnet.Implementations
private Socket _socket;
private SslStream _sslStream;
+ public Stream Stream => _dataStream;
+
///
/// called on client sockets are created in connect
///
@@ -79,45 +81,6 @@ namespace MQTTnet.Implementations
}
}
- public async Task WriteAsync(byte[] buffer)
- {
- if (buffer == null) throw new ArgumentNullException(nameof(buffer));
-
- try
- {
- await _dataStream.WriteAsync(buffer, 0, buffer.Length);
- }
- catch (SocketException exception)
- {
- throw new MqttCommunicationException(exception);
- }
- }
-
- public async Task> ReadAsync(int length, byte[] buffer)
- {
- try
- {
- var totalBytes = 0;
-
- do
- {
- var read = await _dataStream.ReadAsync(buffer, totalBytes, length - totalBytes).ConfigureAwait(false);
- if (read == 0)
- {
- throw new MqttCommunicationException(new SocketException((int)SocketError.Disconnecting));
- }
-
- totalBytes += read;
- }
- while (totalBytes < length);
- return new ArraySegment(buffer, 0, length);
- }
- catch (SocketException exception)
- {
- throw new MqttCommunicationException(exception);
- }
- }
-
public void Dispose()
{
_socket?.Dispose();
diff --git a/Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs b/Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs
index 925c818..c3aed02 100644
--- a/Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs
+++ b/Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs
@@ -2,6 +2,7 @@
using MQTTnet.Core.Client;
using MQTTnet.Core.Exceptions;
using System;
+using System.IO;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
@@ -11,8 +12,8 @@ namespace MQTTnet.Implementations
public sealed class MqttWebSocketChannel : IMqttCommunicationChannel, IDisposable
{
private ClientWebSocket _webSocket = new ClientWebSocket();
- private int WebSocketBufferSize;
- private int WebSocketBufferOffset;
+
+ public Stream Stream { get; private set; }
public async Task ConnectAsync(MqttClientOptions options)
{
@@ -22,6 +23,8 @@ namespace MQTTnet.Implementations
{
_webSocket = new ClientWebSocket();
await _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None);
+
+ Stream = new WebSocketStream(_webSocket);
}
catch (WebSocketException exception)
{
@@ -31,6 +34,7 @@ namespace MQTTnet.Implementations
public Task DisconnectAsync()
{
+ Stream = null;
return _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None);
}
@@ -38,62 +42,5 @@ namespace MQTTnet.Implementations
{
_webSocket?.Dispose();
}
-
- public async Task> ReadAsync(int length, byte[] buffer)
- {
- await ReadToBufferAsync(length, buffer).ConfigureAwait(false);
-
- var result = new ArraySegment(buffer, WebSocketBufferOffset, length);
- WebSocketBufferSize -= length;
- WebSocketBufferOffset += length;
-
- return result;
- }
-
- private async Task ReadToBufferAsync(int length, byte[] buffer)
- {
- if (WebSocketBufferSize > 0)
- {
- return;
- }
-
- var offset = 0;
- while (_webSocket.State == WebSocketState.Open && WebSocketBufferSize < length)
- {
- WebSocketReceiveResult response;
- do
- {
- response = await _webSocket.ReceiveAsync(new ArraySegment(buffer, offset, buffer.Length - offset), CancellationToken.None).ConfigureAwait(false);
- offset += response.Count;
- } while (!response.EndOfMessage);
-
- WebSocketBufferSize = response.Count;
- if (response.MessageType == WebSocketMessageType.Close)
- {
- await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None).ConfigureAwait(false);
- }
- }
- }
-
- public async Task WriteAsync(byte[] buffer)
- {
- if (buffer == null) {
- throw new ArgumentNullException(nameof(buffer));
- }
-
- try
- {
- await _webSocket.SendAsync(new ArraySegment(buffer), WebSocketMessageType.Binary, true, CancellationToken.None);
- }
- catch (WebSocketException exception)
- {
- throw new MqttCommunicationException(exception);
- }
- }
-
- public int Peek()
- {
- return WebSocketBufferSize;
- }
}
}
\ No newline at end of file
diff --git a/Frameworks/MQTTnet.NetFramework/Implementations/WebSocketStream.cs b/Frameworks/MQTTnet.NetFramework/Implementations/WebSocketStream.cs
new file mode 100644
index 0000000..4a89a92
--- /dev/null
+++ b/Frameworks/MQTTnet.NetFramework/Implementations/WebSocketStream.cs
@@ -0,0 +1,80 @@
+using System;
+using System.IO;
+using System.Net.WebSockets;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace MQTTnet.Implementations
+{
+ public class WebSocketStream : Stream
+ {
+ private readonly ClientWebSocket _webSocket;
+
+ public WebSocketStream( ClientWebSocket webSocket )
+ {
+ _webSocket = webSocket;
+ }
+
+ public override void Flush()
+ {
+ }
+
+ public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+ {
+ var currentOffset = offset;
+ var targetOffset = offset + count;
+ while (_webSocket.State == WebSocketState.Open && currentOffset < targetOffset)
+ {
+ var response = await _webSocket.ReceiveAsync(new ArraySegment(buffer, currentOffset, count), cancellationToken).ConfigureAwait(false);
+ currentOffset += response.Count;
+
+ if ( response.MessageType == WebSocketMessageType.Close )
+ {
+ await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, cancellationToken).ConfigureAwait(false);
+ }
+ }
+
+ return currentOffset - offset;
+ }
+
+ public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+ {
+ return _webSocket.SendAsync(new ArraySegment( buffer, offset, count ), WebSocketMessageType.Binary, true, cancellationToken);
+ }
+
+ public override int Read( byte[] buffer, int offset, int count )
+ {
+ return ReadAsync(buffer, offset, count).GetAwaiter().GetResult();
+ }
+
+ public override void Write(byte[] buffer, int offset, int count)
+ {
+ WriteAsync(buffer, offset, count).GetAwaiter().GetResult();
+ }
+
+ public override bool CanRead => true;
+ public override bool CanSeek => false;
+ public override bool CanWrite => true;
+
+ public override long Length
+ {
+ get { throw new NotSupportedException(); }
+ }
+
+ public override long Position
+ {
+ get { throw new NotSupportedException(); }
+ set { throw new NotSupportedException(); }
+ }
+
+ public override long Seek(long offset, SeekOrigin origin)
+ {
+ throw new NotSupportedException();
+ }
+
+ public override void SetLength(long value)
+ {
+ throw new NotSupportedException();
+ }
+ }
+}
diff --git a/Frameworks/MQTTnet.NetFramework/MQTTnet.NetFramework.csproj b/Frameworks/MQTTnet.NetFramework/MQTTnet.NetFramework.csproj
index 7578afe..3f2519f 100644
--- a/Frameworks/MQTTnet.NetFramework/MQTTnet.NetFramework.csproj
+++ b/Frameworks/MQTTnet.NetFramework/MQTTnet.NetFramework.csproj
@@ -101,6 +101,7 @@
+
diff --git a/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs b/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs
index b3e0080..b057dde 100644
--- a/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs
+++ b/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs
@@ -22,7 +22,8 @@ namespace MQTTnet
{
case MqttConnectionType.Tcp:
case MqttConnectionType.Tls:
- return new BufferedCommunicationChannel( new MqttTcpChannel() );
+ var tcp = new MqttTcpChannel();
+ return tcp;
case MqttConnectionType.Ws:
case MqttConnectionType.Wss:
return new MqttWebSocketChannel();
diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs
index bddd79d..817ab44 100644
--- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs
+++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs
@@ -17,6 +17,9 @@ namespace MQTTnet.Implementations
private Socket _socket;
private SslStream _sslStream;
+
+ public Stream Stream => _dataStream;
+
///
/// called on client sockets are created in connect
///
@@ -78,45 +81,6 @@ namespace MQTTnet.Implementations
}
}
- public async Task WriteAsync(byte[] buffer)
- {
- if (buffer == null) throw new ArgumentNullException(nameof(buffer));
-
- try
- {
- await _dataStream.WriteAsync(buffer, 0, buffer.Length);
- }
- catch (SocketException exception)
- {
- throw new MqttCommunicationException(exception);
- }
- }
-
- public async Task> ReadAsync(int length, byte[] buffer)
- {
- try
- {
- var totalBytes = 0;
-
- do
- {
- var read = await _dataStream.ReadAsync(buffer, totalBytes, length - totalBytes).ConfigureAwait(false);
- if (read == 0)
- {
- throw new MqttCommunicationException(new SocketException((int)SocketError.Disconnecting));
- }
-
- totalBytes += read;
- }
- while (totalBytes < length);
- return new ArraySegment(buffer, 0, length);
- }
- catch (SocketException exception)
- {
- throw new MqttCommunicationException(exception);
- }
- }
-
public void Dispose()
{
_socket?.Dispose();
diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs
index 5a89ac7..2e0bdf8 100644
--- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs
+++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs
@@ -2,6 +2,7 @@
using MQTTnet.Core.Client;
using MQTTnet.Core.Exceptions;
using System;
+using System.IO;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
@@ -11,8 +12,8 @@ namespace MQTTnet.Implementations
public sealed class MqttWebSocketChannel : IMqttCommunicationChannel, IDisposable
{
private ClientWebSocket _webSocket = new ClientWebSocket();
- private int _bufferSize;
- private int _bufferOffset;
+
+ public Stream Stream { get; private set; }
public async Task ConnectAsync(MqttClientOptions options)
{
@@ -22,6 +23,8 @@ namespace MQTTnet.Implementations
{
_webSocket = new ClientWebSocket();
await _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None);
+
+ Stream = new WebSocketStream(_webSocket);
}
catch (WebSocketException exception)
{
@@ -31,6 +34,7 @@ namespace MQTTnet.Implementations
public Task DisconnectAsync()
{
+ Stream = null;
return _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None);
}
@@ -38,64 +42,5 @@ namespace MQTTnet.Implementations
{
_webSocket?.Dispose();
}
-
- public async Task> ReadAsync(int length, byte[] buffer)
- {
- await ReadToBufferAsync(length, buffer).ConfigureAwait(false);
-
- var result = new ArraySegment(buffer, _bufferOffset, length);
- _bufferSize -= length;
- _bufferOffset += length;
-
- return result;
- }
-
- private async Task ReadToBufferAsync(int length, byte[] buffer)
- {
- if (_bufferSize > 0)
- {
- return;
- }
-
- var offset = 0;
- while (_webSocket.State == WebSocketState.Open && _bufferSize < length)
- {
- WebSocketReceiveResult response;
- do
- {
- response = await _webSocket.ReceiveAsync(new ArraySegment(buffer, offset, buffer.Length - offset), CancellationToken.None).ConfigureAwait(false);
- offset += response.Count;
- } while (!response.EndOfMessage);
-
- _bufferSize = response.Count;
-
- if (response.MessageType == WebSocketMessageType.Close)
- {
- await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None).ConfigureAwait(false);
- }
- }
- }
-
- public async Task WriteAsync(byte[] buffer)
- {
- if (buffer == null)
- {
- throw new ArgumentNullException(nameof(buffer));
- }
-
- try
- {
- await _webSocket.SendAsync(new ArraySegment(buffer), WebSocketMessageType.Binary, true, CancellationToken.None);
- }
- catch (WebSocketException exception)
- {
- throw new MqttCommunicationException(exception);
- }
- }
-
- public int Peek()
- {
- return _bufferSize;
- }
}
}
\ No newline at end of file
diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/WebSocketStream.cs b/Frameworks/MQTTnet.NetStandard/Implementations/WebSocketStream.cs
new file mode 100644
index 0000000..0b74692
--- /dev/null
+++ b/Frameworks/MQTTnet.NetStandard/Implementations/WebSocketStream.cs
@@ -0,0 +1,80 @@
+using System;
+using System.IO;
+using System.Net.WebSockets;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace MQTTnet.Implementations
+{
+ public class WebSocketStream : Stream
+ {
+ private readonly ClientWebSocket _webSocket;
+
+ public WebSocketStream(ClientWebSocket webSocket)
+ {
+ _webSocket = webSocket;
+ }
+
+ public override void Flush()
+ {
+ }
+
+ public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+ {
+ var currentOffset = offset;
+ var targetOffset = offset + count;
+ while ( _webSocket.State == WebSocketState.Open && currentOffset < targetOffset )
+ {
+ var response = await _webSocket.ReceiveAsync(new ArraySegment(buffer, currentOffset, count), cancellationToken).ConfigureAwait(false);
+ currentOffset += response.Count;
+
+ if ( response.MessageType == WebSocketMessageType.Close )
+ {
+ await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, cancellationToken).ConfigureAwait(false);
+ }
+ }
+
+ return currentOffset - offset;
+ }
+
+ public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+ {
+ return _webSocket.SendAsync(new ArraySegment( buffer, offset, count ), WebSocketMessageType.Binary, true, cancellationToken);
+ }
+
+ public override int Read(byte[] buffer, int offset, int count)
+ {
+ return ReadAsync(buffer, offset, count).GetAwaiter().GetResult();
+ }
+
+ public override void Write(byte[] buffer, int offset, int count)
+ {
+ WriteAsync( buffer, offset, count ).GetAwaiter().GetResult();
+ }
+
+ public override bool CanRead => true;
+ public override bool CanSeek => false;
+ public override bool CanWrite => true;
+
+ public override long Length
+ {
+ get { throw new NotSupportedException(); }
+ }
+
+ public override long Position
+ {
+ get { throw new NotSupportedException(); }
+ set { throw new NotSupportedException(); }
+ }
+
+ public override long Seek(long offset, SeekOrigin origin)
+ {
+ throw new NotSupportedException();
+ }
+
+ public override void SetLength(long value)
+ {
+ throw new NotSupportedException();
+ }
+ }
+}
diff --git a/MQTTnet.Core/Channel/BufferedCommunicationChannel.cs b/MQTTnet.Core/Channel/BufferedCommunicationChannel.cs
deleted file mode 100644
index 757f9c2..0000000
--- a/MQTTnet.Core/Channel/BufferedCommunicationChannel.cs
+++ /dev/null
@@ -1,77 +0,0 @@
-using System.Threading.Tasks;
-using MQTTnet.Core.Client;
-using System;
-
-namespace MQTTnet.Core.Channel
-{
- public class BufferedCommunicationChannel : IMqttCommunicationChannel
- {
- private readonly IMqttCommunicationChannel _inner;
- private int _bufferSize;
- private int _bufferOffset;
-
- public BufferedCommunicationChannel(IMqttCommunicationChannel inner)
- {
- _inner = inner;
- }
-
- public Task ConnectAsync(MqttClientOptions options)
- {
- return _inner.ConnectAsync(options);
- }
-
- public Task DisconnectAsync()
- {
- return _inner.DisconnectAsync();
- }
-
- public int Peek()
- {
- return _inner.Peek();
- }
-
- public async Task> ReadAsync(int length, byte[] buffer)
- {
- //read from buffer
- if (_bufferSize > 0)
- {
- return ReadFomBuffer(length, buffer);
- }
-
- var available = _inner.Peek();
- // if there are less or equal bytes available then requested then just read em
- if (available <= length)
- {
- return await _inner.ReadAsync(length, buffer);
- }
-
- //if more bytes are available than requested do buffer them to reduce calls to network buffers
- await WriteToBuffer(available, buffer).ConfigureAwait(false);
- return ReadFomBuffer(length, buffer);
- }
-
- private async Task WriteToBuffer(int available, byte[] buffer)
- {
- await _inner.ReadAsync(available, buffer).ConfigureAwait(false);
- _bufferSize = available;
- _bufferOffset = 0;
- }
-
- private ArraySegment ReadFomBuffer(int length, byte[] buffer)
- {
- var result = new ArraySegment(buffer, _bufferOffset, length);
- _bufferSize -= length;
- _bufferOffset += length;
-
- if (_bufferSize < 0)
- {
- }
- return result;
- }
-
- public Task WriteAsync(byte[] buffer)
- {
- return _inner.WriteAsync(buffer);
- }
- }
-}
diff --git a/MQTTnet.Core/Channel/IMqttCommunicationChannel.cs b/MQTTnet.Core/Channel/IMqttCommunicationChannel.cs
index 0f6ea4b..6fb72f3 100644
--- a/MQTTnet.Core/Channel/IMqttCommunicationChannel.cs
+++ b/MQTTnet.Core/Channel/IMqttCommunicationChannel.cs
@@ -1,6 +1,6 @@
using System.Threading.Tasks;
using MQTTnet.Core.Client;
-using System;
+using System.IO;
namespace MQTTnet.Core.Channel
{
@@ -9,14 +9,7 @@ namespace MQTTnet.Core.Channel
Task ConnectAsync(MqttClientOptions options);
Task DisconnectAsync();
-
- Task WriteAsync(byte[] buffer);
-
- ///
- /// get the currently available number of bytes without reading them
- ///
- int Peek();
-
- Task> ReadAsync(int length, byte[] buffer);
+
+ Stream Stream { get; }
}
}
diff --git a/MQTTnet.Core/MQTTnet.Core.csproj b/MQTTnet.Core/MQTTnet.Core.csproj
index 480b4c7..797bb85 100644
--- a/MQTTnet.Core/MQTTnet.Core.csproj
+++ b/MQTTnet.Core/MQTTnet.Core.csproj
@@ -5,6 +5,7 @@
MQTTnet.Core
MQTTnet.Core
False
+ Full
diff --git a/MQTTnet.Core/Serializer/MqttPacketReader.cs b/MQTTnet.Core/Serializer/MqttPacketReader.cs
index 38d3c7c..ea479c2 100644
--- a/MQTTnet.Core/Serializer/MqttPacketReader.cs
+++ b/MQTTnet.Core/Serializer/MqttPacketReader.cs
@@ -49,13 +49,13 @@ namespace MQTTnet.Core.Serializer
return ReadBytes(_header.BodyLength - (int)BaseStream.Position);
}
- public static async Task ReadHeaderFromSourceAsync(IMqttCommunicationChannel source, byte[] buffer)
+ public static MqttPacketHeader ReadHeaderFromSource(IMqttCommunicationChannel source)
{
- var fixedHeader = await ReadStreamByteAsync(source, buffer).ConfigureAwait(false);
+ var fixedHeader = (byte)source.Stream.ReadByte();
var byteReader = new ByteReader(fixedHeader);
byteReader.Read(4);
var controlPacketType = (MqttControlPacketType)byteReader.Read(4);
- var bodyLength = await ReadBodyLengthFromSourceAsync(source, buffer).ConfigureAwait(false);
+ var bodyLength = ReadBodyLengthFromSource(source);
return new MqttPacketHeader()
{
@@ -64,26 +64,8 @@ namespace MQTTnet.Core.Serializer
BodyLength = bodyLength
};
}
-
- private static async Task ReadStreamByteAsync(IMqttCommunicationChannel source, byte[] readBuffer)
- {
- var result = await ReadFromSourceAsync(source, 1, readBuffer).ConfigureAwait(false);
- return result.Array[result.Offset];
- }
-
- public static async Task> ReadFromSourceAsync(IMqttCommunicationChannel source, int length, byte[] buffer)
- {
- try
- {
- return await source.ReadAsync(length, buffer);
- }
- catch (Exception exception)
- {
- throw new MqttCommunicationException(exception);
- }
- }
- private static async Task ReadBodyLengthFromSourceAsync(IMqttCommunicationChannel source, byte[] buffer)
+ private static int ReadBodyLengthFromSource(IMqttCommunicationChannel source)
{
// Alorithm taken from http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html.
var multiplier = 1;
@@ -91,7 +73,7 @@ namespace MQTTnet.Core.Serializer
byte encodedByte;
do
{
- encodedByte = await ReadStreamByteAsync(source, buffer).ConfigureAwait(false);
+ encodedByte = (byte)source.Stream.ReadByte();
value += (encodedByte & 127) * multiplier;
multiplier *= 128;
if (multiplier > 128 * 128 * 128)
diff --git a/MQTTnet.Core/Serializer/MqttPacketSerializer.cs b/MQTTnet.Core/Serializer/MqttPacketSerializer.cs
index 064b79b..6f2c9ff 100644
--- a/MQTTnet.Core/Serializer/MqttPacketSerializer.cs
+++ b/MQTTnet.Core/Serializer/MqttPacketSerializer.cs
@@ -17,7 +17,7 @@ namespace MQTTnet.Core.Serializer
private static byte[] ProtocolVersionV310Name { get; } = Encoding.UTF8.GetBytes("MQIs");
public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311;
- private byte[] _readBuffer = new byte[BufferConstants.Size];
+ private Task _sendTask = Task.FromResult( 0 );
public async Task SerializeAsync(MqttBasePacket packet, IMqttCommunicationChannel destination)
{
@@ -32,12 +32,21 @@ namespace MQTTnet.Core.Serializer
var body = stream.ToArray();
MqttPacketWriter.BuildLengthHeader(body.Length, header);
-
- await destination.WriteAsync(header.ToArray()).ConfigureAwait(false);
- await destination.WriteAsync(body).ConfigureAwait(false);
+ var headerArray = header.ToArray();
+ var writeBuffer = new byte[header.Count + body.Length];
+ Buffer.BlockCopy( headerArray, 0, writeBuffer, 0, headerArray.Length );
+ Buffer.BlockCopy( body, 0, writeBuffer, headerArray.Length, body.Length );
+
+ _sendTask = Send( writeBuffer, destination );
}
}
+ private async Task Send(byte[] buffer, IMqttCommunicationChannel destination )
+ {
+ await _sendTask.ConfigureAwait( false );
+ await destination.Stream.WriteAsync( buffer, 0, buffer.Length ).ConfigureAwait( false );
+ }
+
private byte SerializePacket(MqttBasePacket packet, MqttPacketWriter writer)
{
if (packet is MqttConnectPacket connectPacket)
@@ -116,117 +125,127 @@ namespace MQTTnet.Core.Serializer
public async Task DeserializeAsync(IMqttCommunicationChannel source)
{
if (source == null) throw new ArgumentNullException(nameof(source));
+
+ var header = MqttPacketReader.ReadHeaderFromSource(source);
+
+ MemoryStream body = null;
+ if ( header.BodyLength > 0 )
+ {
+ var totalRead = 0;
+ var readBuffer = new byte[header.BodyLength];
+ do
+ {
+ var read = await source.Stream.ReadAsync( readBuffer, totalRead, header.BodyLength - totalRead ).ConfigureAwait(false);
+ totalRead += read;
+ } while ( totalRead < header.BodyLength );
+ body = new MemoryStream( readBuffer, 0, header.BodyLength );
+ }
+ else
+ {
+ body = new MemoryStream();
+ }
- var header = await MqttPacketReader.ReadHeaderFromSourceAsync(source, _readBuffer).ConfigureAwait(false);
-
- var body = await GetBody(source, header).ConfigureAwait(false);
-
- using (var mqttPacketReader = new MqttPacketReader(body, header))
+ using (var reader = new MqttPacketReader(body, header))
{
- switch (header.ControlPacketType)
- {
- case MqttControlPacketType.Connect:
- {
- return DeserializeConnect(mqttPacketReader);
- }
-
- case MqttControlPacketType.ConnAck:
- {
- return DeserializeConnAck(mqttPacketReader);
- }
-
- case MqttControlPacketType.Disconnect:
- {
- return new MqttDisconnectPacket();
- }
-
- case MqttControlPacketType.Publish:
- {
- return DeserializePublish(mqttPacketReader, header);
- }
-
- case MqttControlPacketType.PubAck:
- {
- return new MqttPubAckPacket
- {
- PacketIdentifier = mqttPacketReader.ReadUInt16()
- };
- }
-
- case MqttControlPacketType.PubRec:
- {
- return new MqttPubRecPacket
- {
- PacketIdentifier = mqttPacketReader.ReadUInt16()
- };
- }
-
- case MqttControlPacketType.PubRel:
- {
- return new MqttPubRelPacket
- {
- PacketIdentifier = mqttPacketReader.ReadUInt16()
- };
- }
-
- case MqttControlPacketType.PubComp:
- {
- return new MqttPubCompPacket
- {
- PacketIdentifier = mqttPacketReader.ReadUInt16()
- };
- }
-
- case MqttControlPacketType.PingReq:
- {
- return new MqttPingReqPacket();
- }
-
- case MqttControlPacketType.PingResp:
- {
- return new MqttPingRespPacket();
- }
-
- case MqttControlPacketType.Subscribe:
- {
- return DeserializeSubscribe(mqttPacketReader);
- }
-
- case MqttControlPacketType.SubAck:
- {
- return DeserializeSubAck(mqttPacketReader);
- }
-
- case MqttControlPacketType.Unsubscibe:
- {
- return DeserializeUnsubscribe(mqttPacketReader);
- }
-
- case MqttControlPacketType.UnsubAck:
- {
- return new MqttUnsubAckPacket
- {
- PacketIdentifier = mqttPacketReader.ReadUInt16()
- };
- }
-
- default:
- {
- throw new MqttProtocolViolationException($"Packet type ({(int)header.ControlPacketType}) not supported.");
- }
- }
+ return Deserialize( header, reader );
}
}
- private async Task GetBody(IMqttCommunicationChannel source, MqttPacketHeader header)
+ private static MqttBasePacket Deserialize(MqttPacketHeader header, MqttPacketReader reader)
{
- if (header.BodyLength > 0)
+ switch (header.ControlPacketType)
{
- var segment = await MqttPacketReader.ReadFromSourceAsync(source, header.BodyLength, _readBuffer).ConfigureAwait(false);
- return new MemoryStream(segment.Array, segment.Offset, segment.Count);
- }
+ case MqttControlPacketType.Connect:
+ {
+ return DeserializeConnect( reader );
+ }
+
+ case MqttControlPacketType.ConnAck:
+ {
+ return DeserializeConnAck( reader );
+ }
+
+ case MqttControlPacketType.Disconnect:
+ {
+ return new MqttDisconnectPacket();
+ }
+
+ case MqttControlPacketType.Publish:
+ {
+ return DeserializePublish( reader, header );
+ }
+
+ case MqttControlPacketType.PubAck:
+ {
+ return new MqttPubAckPacket
+ {
+ PacketIdentifier = reader.ReadUInt16()
+ };
+ }
+
+ case MqttControlPacketType.PubRec:
+ {
+ return new MqttPubRecPacket
+ {
+ PacketIdentifier = reader.ReadUInt16()
+ };
+ }
- return new MemoryStream();
+ case MqttControlPacketType.PubRel:
+ {
+ return new MqttPubRelPacket
+ {
+ PacketIdentifier = reader.ReadUInt16()
+ };
+ }
+
+ case MqttControlPacketType.PubComp:
+ {
+ return new MqttPubCompPacket
+ {
+ PacketIdentifier = reader.ReadUInt16()
+ };
+ }
+
+ case MqttControlPacketType.PingReq:
+ {
+ return new MqttPingReqPacket();
+ }
+
+ case MqttControlPacketType.PingResp:
+ {
+ return new MqttPingRespPacket();
+ }
+
+ case MqttControlPacketType.Subscribe:
+ {
+ return DeserializeSubscribe( reader );
+ }
+
+ case MqttControlPacketType.SubAck:
+ {
+ return DeserializeSubAck( reader );
+ }
+
+ case MqttControlPacketType.Unsubscibe:
+ {
+ return DeserializeUnsubscribe( reader );
+ }
+
+ case MqttControlPacketType.UnsubAck:
+ {
+ return new MqttUnsubAckPacket
+ {
+ PacketIdentifier = reader.ReadUInt16()
+ };
+ }
+
+ default:
+ {
+ throw new MqttProtocolViolationException(
+ $"Packet type ({(int) header.ControlPacketType}) not supported." );
+ }
+ }
}
private static MqttBasePacket DeserializeUnsubscribe(MqttPacketReader reader)
@@ -268,7 +287,13 @@ namespace MQTTnet.Core.Serializer
var qualityOfServiceLevel = (MqttQualityOfServiceLevel)fixedHeader.Read(2);
var dup = fixedHeader.Read();
- var topic = reader.ReadStringWithLengthPrefix();
+
+ var length = reader.ReadUInt16();
+ if (length != 5)
+ {
+
+ }
+ var topic = Encoding.UTF8.GetString( reader.ReadBytes( length ), 0, length );
ushort packetIdentifier = 0;
if (qualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce)
diff --git a/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs b/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs
index 6a18f50..4f33be3 100644
--- a/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs
+++ b/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs
@@ -391,6 +391,8 @@ namespace MQTTnet.Core.Tests
{
private readonly MemoryStream _stream = new MemoryStream();
+ public Stream Stream => _stream;
+
public bool IsConnected { get; } = true;
public TestChannel()
@@ -413,26 +415,10 @@ namespace MQTTnet.Core.Tests
return Task.FromResult(0);
}
- public Task WriteAsync(byte[] buffer)
- {
- return _stream.WriteAsync(buffer, 0, buffer.Length);
- }
-
- public async Task> ReadAsync(int length, byte[] buffer)
- {
- await _stream.ReadAsync(buffer, 0, length);
- return new ArraySegment(buffer, 0, length);
- }
-
public byte[] ToArray()
{
return _stream.ToArray();
}
-
- public int Peek()
- {
- return (int)_stream.Length - (int)_stream.Position;
- }
}
private static void SerializeAndCompare(MqttBasePacket packet, string expectedBase64Value, MqttProtocolVersion protocolVersion = MqttProtocolVersion.V311)