瀏覽代碼

use streams

release/3.x.x
Eggers Jan 7 年之前
父節點
當前提交
16a184a92f
共有 15 個檔案被更改,包括 330 行新增436 行删除
  1. +6
    -3
      Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs
  2. +2
    -39
      Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs
  3. +6
    -59
      Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs
  4. +80
    -0
      Frameworks/MQTTnet.NetFramework/Implementations/WebSocketStream.cs
  5. +1
    -0
      Frameworks/MQTTnet.NetFramework/MQTTnet.NetFramework.csproj
  6. +2
    -1
      Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs
  7. +3
    -39
      Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs
  8. +6
    -61
      Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs
  9. +80
    -0
      Frameworks/MQTTnet.NetStandard/Implementations/WebSocketStream.cs
  10. +0
    -77
      MQTTnet.Core/Channel/BufferedCommunicationChannel.cs
  11. +3
    -10
      MQTTnet.Core/Channel/IMqttCommunicationChannel.cs
  12. +1
    -0
      MQTTnet.Core/MQTTnet.Core.csproj
  13. +5
    -23
      MQTTnet.Core/Serializer/MqttPacketReader.cs
  14. +133
    -108
      MQTTnet.Core/Serializer/MqttPacketSerializer.cs
  15. +2
    -16
      Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs

+ 6
- 3
Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs 查看文件

@@ -87,7 +87,9 @@ namespace MQTTnet.Implementations
try
{
var clientSocket = await Task.Factory.FromAsync(_defaultEndpointSocket.BeginAccept, _defaultEndpointSocket.EndAccept, null).ConfigureAwait(false);
var clientAdapter = new MqttChannelCommunicationAdapter(new BufferedCommunicationChannel(new MqttTcpChannel(clientSocket, null)), new MqttPacketSerializer());

var tcpChannel = new MqttTcpChannel(clientSocket, null);
var clientAdapter = new MqttChannelCommunicationAdapter(tcpChannel, new MqttPacketSerializer());
ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter));
}
catch (Exception exception) when (!(exception is ObjectDisposedException))
@@ -110,8 +112,9 @@ namespace MQTTnet.Implementations

var sslStream = new SslStream(new NetworkStream(clientSocket));
await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false).ConfigureAwait(false);
var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new MqttPacketSerializer());

var tcpChannel = new MqttTcpChannel(clientSocket, sslStream);
var clientAdapter = new MqttChannelCommunicationAdapter(tcpChannel, new MqttPacketSerializer());
ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter));
}
catch (Exception exception)


+ 2
- 39
Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs 查看文件

@@ -17,6 +17,8 @@ namespace MQTTnet.Implementations
private Socket _socket;
private SslStream _sslStream;

public Stream Stream => _dataStream;

/// <summary>
/// called on client sockets are created in connect
/// </summary>
@@ -79,45 +81,6 @@ namespace MQTTnet.Implementations
}
}

public async Task WriteAsync(byte[] buffer)
{
if (buffer == null) throw new ArgumentNullException(nameof(buffer));

try
{
await _dataStream.WriteAsync(buffer, 0, buffer.Length);
}
catch (SocketException exception)
{
throw new MqttCommunicationException(exception);
}
}

public async Task<ArraySegment<byte>> ReadAsync(int length, byte[] buffer)
{
try
{
var totalBytes = 0;

do
{
var read = await _dataStream.ReadAsync(buffer, totalBytes, length - totalBytes).ConfigureAwait(false);
if (read == 0)
{
throw new MqttCommunicationException(new SocketException((int)SocketError.Disconnecting));
}

totalBytes += read;
}
while (totalBytes < length);
return new ArraySegment<byte>(buffer, 0, length);
}
catch (SocketException exception)
{
throw new MqttCommunicationException(exception);
}
}

public void Dispose()
{
_socket?.Dispose();


+ 6
- 59
Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs 查看文件

@@ -2,6 +2,7 @@
using MQTTnet.Core.Client;
using MQTTnet.Core.Exceptions;
using System;
using System.IO;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
@@ -11,8 +12,8 @@ namespace MQTTnet.Implementations
public sealed class MqttWebSocketChannel : IMqttCommunicationChannel, IDisposable
{
private ClientWebSocket _webSocket = new ClientWebSocket();
private int WebSocketBufferSize;
private int WebSocketBufferOffset;
public Stream Stream { get; private set; }

public async Task ConnectAsync(MqttClientOptions options)
{
@@ -22,6 +23,8 @@ namespace MQTTnet.Implementations
{
_webSocket = new ClientWebSocket();
await _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None);

Stream = new WebSocketStream(_webSocket);
}
catch (WebSocketException exception)
{
@@ -31,6 +34,7 @@ namespace MQTTnet.Implementations

public Task DisconnectAsync()
{
Stream = null;
return _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None);
}

@@ -38,62 +42,5 @@ namespace MQTTnet.Implementations
{
_webSocket?.Dispose();
}

public async Task<ArraySegment<byte>> ReadAsync(int length, byte[] buffer)
{
await ReadToBufferAsync(length, buffer).ConfigureAwait(false);
var result = new ArraySegment<byte>(buffer, WebSocketBufferOffset, length);
WebSocketBufferSize -= length;
WebSocketBufferOffset += length;
return result;
}

private async Task ReadToBufferAsync(int length, byte[] buffer)
{
if (WebSocketBufferSize > 0)
{
return;
}

var offset = 0;
while (_webSocket.State == WebSocketState.Open && WebSocketBufferSize < length)
{
WebSocketReceiveResult response;
do
{
response = await _webSocket.ReceiveAsync(new ArraySegment<byte>(buffer, offset, buffer.Length - offset), CancellationToken.None).ConfigureAwait(false);
offset += response.Count;
} while (!response.EndOfMessage);

WebSocketBufferSize = response.Count;
if (response.MessageType == WebSocketMessageType.Close)
{
await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None).ConfigureAwait(false);
}
}
}

public async Task WriteAsync(byte[] buffer)
{
if (buffer == null) {
throw new ArgumentNullException(nameof(buffer));
}

try
{
await _webSocket.SendAsync(new ArraySegment<byte>(buffer), WebSocketMessageType.Binary, true, CancellationToken.None);
}
catch (WebSocketException exception)
{
throw new MqttCommunicationException(exception);
}
}

public int Peek()
{
return WebSocketBufferSize;
}
}
}

+ 80
- 0
Frameworks/MQTTnet.NetFramework/Implementations/WebSocketStream.cs 查看文件

@@ -0,0 +1,80 @@
using System;
using System.IO;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;

namespace MQTTnet.Implementations
{
public class WebSocketStream : Stream
{
private readonly ClientWebSocket _webSocket;

public WebSocketStream( ClientWebSocket webSocket )
{
_webSocket = webSocket;
}

public override void Flush()
{
}

public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
var currentOffset = offset;
var targetOffset = offset + count;
while (_webSocket.State == WebSocketState.Open && currentOffset < targetOffset)
{
var response = await _webSocket.ReceiveAsync(new ArraySegment<byte>(buffer, currentOffset, count), cancellationToken).ConfigureAwait(false);
currentOffset += response.Count;

if ( response.MessageType == WebSocketMessageType.Close )
{
await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, cancellationToken).ConfigureAwait(false);
}
}

return currentOffset - offset;
}

public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
return _webSocket.SendAsync(new ArraySegment<byte>( buffer, offset, count ), WebSocketMessageType.Binary, true, cancellationToken);
}

public override int Read( byte[] buffer, int offset, int count )
{
return ReadAsync(buffer, offset, count).GetAwaiter().GetResult();
}

public override void Write(byte[] buffer, int offset, int count)
{
WriteAsync(buffer, offset, count).GetAwaiter().GetResult();
}

public override bool CanRead => true;
public override bool CanSeek => false;
public override bool CanWrite => true;

public override long Length
{
get { throw new NotSupportedException(); }
}

public override long Position
{
get { throw new NotSupportedException(); }
set { throw new NotSupportedException(); }
}

public override long Seek(long offset, SeekOrigin origin)
{
throw new NotSupportedException();
}

public override void SetLength(long value)
{
throw new NotSupportedException();
}
}
}

+ 1
- 0
Frameworks/MQTTnet.NetFramework/MQTTnet.NetFramework.csproj 查看文件

@@ -101,6 +101,7 @@
</ItemGroup>
<ItemGroup>
<Compile Include="Implementations\MqttWebSocketChannel.cs" />
<Compile Include="Implementations\WebSocketStream.cs" />
<Compile Include="MqttClientFactory.cs" />
<Compile Include="MqttServerFactory.cs" />
<Compile Include="Implementations\MqttServerAdapter.cs" />


+ 2
- 1
Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs 查看文件

@@ -22,7 +22,8 @@ namespace MQTTnet
{
case MqttConnectionType.Tcp:
case MqttConnectionType.Tls:
return new BufferedCommunicationChannel( new MqttTcpChannel() );
var tcp = new MqttTcpChannel();
return tcp;
case MqttConnectionType.Ws:
case MqttConnectionType.Wss:
return new MqttWebSocketChannel();


+ 3
- 39
Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs 查看文件

@@ -17,6 +17,9 @@ namespace MQTTnet.Implementations
private Socket _socket;
private SslStream _sslStream;


public Stream Stream => _dataStream;

/// <summary>
/// called on client sockets are created in connect
/// </summary>
@@ -78,45 +81,6 @@ namespace MQTTnet.Implementations
}
}

public async Task WriteAsync(byte[] buffer)
{
if (buffer == null) throw new ArgumentNullException(nameof(buffer));

try
{
await _dataStream.WriteAsync(buffer, 0, buffer.Length);
}
catch (SocketException exception)
{
throw new MqttCommunicationException(exception);
}
}

public async Task<ArraySegment<byte>> ReadAsync(int length, byte[] buffer)
{
try
{
var totalBytes = 0;

do
{
var read = await _dataStream.ReadAsync(buffer, totalBytes, length - totalBytes).ConfigureAwait(false);
if (read == 0)
{
throw new MqttCommunicationException(new SocketException((int)SocketError.Disconnecting));
}

totalBytes += read;
}
while (totalBytes < length);
return new ArraySegment<byte>(buffer, 0, length);
}
catch (SocketException exception)
{
throw new MqttCommunicationException(exception);
}
}

public void Dispose()
{
_socket?.Dispose();


+ 6
- 61
Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs 查看文件

@@ -2,6 +2,7 @@
using MQTTnet.Core.Client;
using MQTTnet.Core.Exceptions;
using System;
using System.IO;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
@@ -11,8 +12,8 @@ namespace MQTTnet.Implementations
public sealed class MqttWebSocketChannel : IMqttCommunicationChannel, IDisposable
{
private ClientWebSocket _webSocket = new ClientWebSocket();
private int _bufferSize;
private int _bufferOffset;
public Stream Stream { get; private set; }

public async Task ConnectAsync(MqttClientOptions options)
{
@@ -22,6 +23,8 @@ namespace MQTTnet.Implementations
{
_webSocket = new ClientWebSocket();
await _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None);

Stream = new WebSocketStream(_webSocket);
}
catch (WebSocketException exception)
{
@@ -31,6 +34,7 @@ namespace MQTTnet.Implementations

public Task DisconnectAsync()
{
Stream = null;
return _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None);
}

@@ -38,64 +42,5 @@ namespace MQTTnet.Implementations
{
_webSocket?.Dispose();
}

public async Task<ArraySegment<byte>> ReadAsync(int length, byte[] buffer)
{
await ReadToBufferAsync(length, buffer).ConfigureAwait(false);

var result = new ArraySegment<byte>(buffer, _bufferOffset, length);
_bufferSize -= length;
_bufferOffset += length;

return result;
}

private async Task ReadToBufferAsync(int length, byte[] buffer)
{
if (_bufferSize > 0)
{
return;
}

var offset = 0;
while (_webSocket.State == WebSocketState.Open && _bufferSize < length)
{
WebSocketReceiveResult response;
do
{
response = await _webSocket.ReceiveAsync(new ArraySegment<byte>(buffer, offset, buffer.Length - offset), CancellationToken.None).ConfigureAwait(false);
offset += response.Count;
} while (!response.EndOfMessage);

_bufferSize = response.Count;

if (response.MessageType == WebSocketMessageType.Close)
{
await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None).ConfigureAwait(false);
}
}
}

public async Task WriteAsync(byte[] buffer)
{
if (buffer == null)
{
throw new ArgumentNullException(nameof(buffer));
}

try
{
await _webSocket.SendAsync(new ArraySegment<byte>(buffer), WebSocketMessageType.Binary, true, CancellationToken.None);
}
catch (WebSocketException exception)
{
throw new MqttCommunicationException(exception);
}
}

public int Peek()
{
return _bufferSize;
}
}
}

+ 80
- 0
Frameworks/MQTTnet.NetStandard/Implementations/WebSocketStream.cs 查看文件

@@ -0,0 +1,80 @@
using System;
using System.IO;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;

namespace MQTTnet.Implementations
{
public class WebSocketStream : Stream
{
private readonly ClientWebSocket _webSocket;

public WebSocketStream(ClientWebSocket webSocket)
{
_webSocket = webSocket;
}

public override void Flush()
{
}

public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
var currentOffset = offset;
var targetOffset = offset + count;
while ( _webSocket.State == WebSocketState.Open && currentOffset < targetOffset )
{
var response = await _webSocket.ReceiveAsync(new ArraySegment<byte>(buffer, currentOffset, count), cancellationToken).ConfigureAwait(false);
currentOffset += response.Count;

if ( response.MessageType == WebSocketMessageType.Close )
{
await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, cancellationToken).ConfigureAwait(false);
}
}

return currentOffset - offset;
}

public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
return _webSocket.SendAsync(new ArraySegment<byte>( buffer, offset, count ), WebSocketMessageType.Binary, true, cancellationToken);
}

public override int Read(byte[] buffer, int offset, int count)
{
return ReadAsync(buffer, offset, count).GetAwaiter().GetResult();
}

public override void Write(byte[] buffer, int offset, int count)
{
WriteAsync( buffer, offset, count ).GetAwaiter().GetResult();
}

public override bool CanRead => true;
public override bool CanSeek => false;
public override bool CanWrite => true;

public override long Length
{
get { throw new NotSupportedException(); }
}

public override long Position
{
get { throw new NotSupportedException(); }
set { throw new NotSupportedException(); }
}

public override long Seek(long offset, SeekOrigin origin)
{
throw new NotSupportedException();
}

public override void SetLength(long value)
{
throw new NotSupportedException();
}
}
}

+ 0
- 77
MQTTnet.Core/Channel/BufferedCommunicationChannel.cs 查看文件

@@ -1,77 +0,0 @@
using System.Threading.Tasks;
using MQTTnet.Core.Client;
using System;

namespace MQTTnet.Core.Channel
{
public class BufferedCommunicationChannel : IMqttCommunicationChannel
{
private readonly IMqttCommunicationChannel _inner;
private int _bufferSize;
private int _bufferOffset;

public BufferedCommunicationChannel(IMqttCommunicationChannel inner)
{
_inner = inner;
}

public Task ConnectAsync(MqttClientOptions options)
{
return _inner.ConnectAsync(options);
}

public Task DisconnectAsync()
{
return _inner.DisconnectAsync();
}

public int Peek()
{
return _inner.Peek();
}

public async Task<ArraySegment<byte>> ReadAsync(int length, byte[] buffer)
{
//read from buffer
if (_bufferSize > 0)
{
return ReadFomBuffer(length, buffer);
}

var available = _inner.Peek();
// if there are less or equal bytes available then requested then just read em
if (available <= length)
{
return await _inner.ReadAsync(length, buffer);
}

//if more bytes are available than requested do buffer them to reduce calls to network buffers
await WriteToBuffer(available, buffer).ConfigureAwait(false);
return ReadFomBuffer(length, buffer);
}

private async Task WriteToBuffer(int available, byte[] buffer)
{
await _inner.ReadAsync(available, buffer).ConfigureAwait(false);
_bufferSize = available;
_bufferOffset = 0;
}

private ArraySegment<byte> ReadFomBuffer(int length, byte[] buffer)
{
var result = new ArraySegment<byte>(buffer, _bufferOffset, length);
_bufferSize -= length;
_bufferOffset += length;

if (_bufferSize < 0)
{
}
return result;
}

public Task WriteAsync(byte[] buffer)
{
return _inner.WriteAsync(buffer);
}
}
}

+ 3
- 10
MQTTnet.Core/Channel/IMqttCommunicationChannel.cs 查看文件

@@ -1,6 +1,6 @@
using System.Threading.Tasks;
using MQTTnet.Core.Client;
using System;
using System.IO;

namespace MQTTnet.Core.Channel
{
@@ -9,14 +9,7 @@ namespace MQTTnet.Core.Channel
Task ConnectAsync(MqttClientOptions options);

Task DisconnectAsync();

Task WriteAsync(byte[] buffer);

/// <summary>
/// get the currently available number of bytes without reading them
/// </summary>
int Peek();

Task<ArraySegment<byte>> ReadAsync(int length, byte[] buffer);
Stream Stream { get; }
}
}

+ 1
- 0
MQTTnet.Core/MQTTnet.Core.csproj 查看文件

@@ -5,6 +5,7 @@
<AssemblyName>MQTTnet.Core</AssemblyName>
<RootNamespace>MQTTnet.Core</RootNamespace>
<GeneratePackageOnBuild>False</GeneratePackageOnBuild>
<DebugType>Full</DebugType>
<Description></Description>
<Product></Product>
<Company></Company>


+ 5
- 23
MQTTnet.Core/Serializer/MqttPacketReader.cs 查看文件

@@ -49,13 +49,13 @@ namespace MQTTnet.Core.Serializer
return ReadBytes(_header.BodyLength - (int)BaseStream.Position);
}

public static async Task<MqttPacketHeader> ReadHeaderFromSourceAsync(IMqttCommunicationChannel source, byte[] buffer)
public static MqttPacketHeader ReadHeaderFromSource(IMqttCommunicationChannel source)
{
var fixedHeader = await ReadStreamByteAsync(source, buffer).ConfigureAwait(false);
var fixedHeader = (byte)source.Stream.ReadByte();
var byteReader = new ByteReader(fixedHeader);
byteReader.Read(4);
var controlPacketType = (MqttControlPacketType)byteReader.Read(4);
var bodyLength = await ReadBodyLengthFromSourceAsync(source, buffer).ConfigureAwait(false);
var bodyLength = ReadBodyLengthFromSource(source);

return new MqttPacketHeader()
{
@@ -64,26 +64,8 @@ namespace MQTTnet.Core.Serializer
BodyLength = bodyLength
};
}

private static async Task<byte> ReadStreamByteAsync(IMqttCommunicationChannel source, byte[] readBuffer)
{
var result = await ReadFromSourceAsync(source, 1, readBuffer).ConfigureAwait(false);
return result.Array[result.Offset];
}

public static async Task<ArraySegment<byte>> ReadFromSourceAsync(IMqttCommunicationChannel source, int length, byte[] buffer)
{
try
{
return await source.ReadAsync(length, buffer);
}
catch (Exception exception)
{
throw new MqttCommunicationException(exception);
}
}
private static async Task<int> ReadBodyLengthFromSourceAsync(IMqttCommunicationChannel source, byte[] buffer)
private static int ReadBodyLengthFromSource(IMqttCommunicationChannel source)
{
// Alorithm taken from http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html.
var multiplier = 1;
@@ -91,7 +73,7 @@ namespace MQTTnet.Core.Serializer
byte encodedByte;
do
{
encodedByte = await ReadStreamByteAsync(source, buffer).ConfigureAwait(false);
encodedByte = (byte)source.Stream.ReadByte();
value += (encodedByte & 127) * multiplier;
multiplier *= 128;
if (multiplier > 128 * 128 * 128)


+ 133
- 108
MQTTnet.Core/Serializer/MqttPacketSerializer.cs 查看文件

@@ -17,7 +17,7 @@ namespace MQTTnet.Core.Serializer
private static byte[] ProtocolVersionV310Name { get; } = Encoding.UTF8.GetBytes("MQIs");

public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311;
private byte[] _readBuffer = new byte[BufferConstants.Size];
private Task _sendTask = Task.FromResult( 0 );

public async Task SerializeAsync(MqttBasePacket packet, IMqttCommunicationChannel destination)
{
@@ -32,12 +32,21 @@ namespace MQTTnet.Core.Serializer

var body = stream.ToArray();
MqttPacketWriter.BuildLengthHeader(body.Length, header);
await destination.WriteAsync(header.ToArray()).ConfigureAwait(false);
await destination.WriteAsync(body).ConfigureAwait(false);
var headerArray = header.ToArray();
var writeBuffer = new byte[header.Count + body.Length];
Buffer.BlockCopy( headerArray, 0, writeBuffer, 0, headerArray.Length );
Buffer.BlockCopy( body, 0, writeBuffer, headerArray.Length, body.Length );

_sendTask = Send( writeBuffer, destination );
}
}

private async Task Send(byte[] buffer, IMqttCommunicationChannel destination )
{
await _sendTask.ConfigureAwait( false );
await destination.Stream.WriteAsync( buffer, 0, buffer.Length ).ConfigureAwait( false );
}

private byte SerializePacket(MqttBasePacket packet, MqttPacketWriter writer)
{
if (packet is MqttConnectPacket connectPacket)
@@ -116,117 +125,127 @@ namespace MQTTnet.Core.Serializer
public async Task<MqttBasePacket> DeserializeAsync(IMqttCommunicationChannel source)
{
if (source == null) throw new ArgumentNullException(nameof(source));
var header = MqttPacketReader.ReadHeaderFromSource(source);
MemoryStream body = null;
if ( header.BodyLength > 0 )
{
var totalRead = 0;
var readBuffer = new byte[header.BodyLength];
do
{
var read = await source.Stream.ReadAsync( readBuffer, totalRead, header.BodyLength - totalRead ).ConfigureAwait(false);
totalRead += read;
} while ( totalRead < header.BodyLength );
body = new MemoryStream( readBuffer, 0, header.BodyLength );
}
else
{
body = new MemoryStream();
}

var header = await MqttPacketReader.ReadHeaderFromSourceAsync(source, _readBuffer).ConfigureAwait(false);

var body = await GetBody(source, header).ConfigureAwait(false);

using (var mqttPacketReader = new MqttPacketReader(body, header))
using (var reader = new MqttPacketReader(body, header))
{
switch (header.ControlPacketType)
{
case MqttControlPacketType.Connect:
{
return DeserializeConnect(mqttPacketReader);
}

case MqttControlPacketType.ConnAck:
{
return DeserializeConnAck(mqttPacketReader);
}

case MqttControlPacketType.Disconnect:
{
return new MqttDisconnectPacket();
}

case MqttControlPacketType.Publish:
{
return DeserializePublish(mqttPacketReader, header);
}

case MqttControlPacketType.PubAck:
{
return new MqttPubAckPacket
{
PacketIdentifier = mqttPacketReader.ReadUInt16()
};
}

case MqttControlPacketType.PubRec:
{
return new MqttPubRecPacket
{
PacketIdentifier = mqttPacketReader.ReadUInt16()
};
}

case MqttControlPacketType.PubRel:
{
return new MqttPubRelPacket
{
PacketIdentifier = mqttPacketReader.ReadUInt16()
};
}

case MqttControlPacketType.PubComp:
{
return new MqttPubCompPacket
{
PacketIdentifier = mqttPacketReader.ReadUInt16()
};
}

case MqttControlPacketType.PingReq:
{
return new MqttPingReqPacket();
}

case MqttControlPacketType.PingResp:
{
return new MqttPingRespPacket();
}

case MqttControlPacketType.Subscribe:
{
return DeserializeSubscribe(mqttPacketReader);
}

case MqttControlPacketType.SubAck:
{
return DeserializeSubAck(mqttPacketReader);
}

case MqttControlPacketType.Unsubscibe:
{
return DeserializeUnsubscribe(mqttPacketReader);
}

case MqttControlPacketType.UnsubAck:
{
return new MqttUnsubAckPacket
{
PacketIdentifier = mqttPacketReader.ReadUInt16()
};
}

default:
{
throw new MqttProtocolViolationException($"Packet type ({(int)header.ControlPacketType}) not supported.");
}
}
return Deserialize( header, reader );
}
}

private async Task<MemoryStream> GetBody(IMqttCommunicationChannel source, MqttPacketHeader header)
private static MqttBasePacket Deserialize(MqttPacketHeader header, MqttPacketReader reader)
{
if (header.BodyLength > 0)
switch (header.ControlPacketType)
{
var segment = await MqttPacketReader.ReadFromSourceAsync(source, header.BodyLength, _readBuffer).ConfigureAwait(false);
return new MemoryStream(segment.Array, segment.Offset, segment.Count);
}
case MqttControlPacketType.Connect:
{
return DeserializeConnect( reader );
}

case MqttControlPacketType.ConnAck:
{
return DeserializeConnAck( reader );
}

case MqttControlPacketType.Disconnect:
{
return new MqttDisconnectPacket();
}

case MqttControlPacketType.Publish:
{
return DeserializePublish( reader, header );
}

case MqttControlPacketType.PubAck:
{
return new MqttPubAckPacket
{
PacketIdentifier = reader.ReadUInt16()
};
}

case MqttControlPacketType.PubRec:
{
return new MqttPubRecPacket
{
PacketIdentifier = reader.ReadUInt16()
};
}

return new MemoryStream();
case MqttControlPacketType.PubRel:
{
return new MqttPubRelPacket
{
PacketIdentifier = reader.ReadUInt16()
};
}

case MqttControlPacketType.PubComp:
{
return new MqttPubCompPacket
{
PacketIdentifier = reader.ReadUInt16()
};
}

case MqttControlPacketType.PingReq:
{
return new MqttPingReqPacket();
}

case MqttControlPacketType.PingResp:
{
return new MqttPingRespPacket();
}

case MqttControlPacketType.Subscribe:
{
return DeserializeSubscribe( reader );
}

case MqttControlPacketType.SubAck:
{
return DeserializeSubAck( reader );
}

case MqttControlPacketType.Unsubscibe:
{
return DeserializeUnsubscribe( reader );
}

case MqttControlPacketType.UnsubAck:
{
return new MqttUnsubAckPacket
{
PacketIdentifier = reader.ReadUInt16()
};
}

default:
{
throw new MqttProtocolViolationException(
$"Packet type ({(int) header.ControlPacketType}) not supported." );
}
}
}

private static MqttBasePacket DeserializeUnsubscribe(MqttPacketReader reader)
@@ -268,7 +287,13 @@ namespace MQTTnet.Core.Serializer
var qualityOfServiceLevel = (MqttQualityOfServiceLevel)fixedHeader.Read(2);
var dup = fixedHeader.Read();

var topic = reader.ReadStringWithLengthPrefix();

var length = reader.ReadUInt16();
if (length != 5)
{
}
var topic = Encoding.UTF8.GetString( reader.ReadBytes( length ), 0, length );

ushort packetIdentifier = 0;
if (qualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce)


+ 2
- 16
Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs 查看文件

@@ -391,6 +391,8 @@ namespace MQTTnet.Core.Tests
{
private readonly MemoryStream _stream = new MemoryStream();

public Stream Stream => _stream;

public bool IsConnected { get; } = true;

public TestChannel()
@@ -413,26 +415,10 @@ namespace MQTTnet.Core.Tests
return Task.FromResult(0);
}

public Task WriteAsync(byte[] buffer)
{
return _stream.WriteAsync(buffer, 0, buffer.Length);
}

public async Task<ArraySegment<byte>> ReadAsync(int length, byte[] buffer)
{
await _stream.ReadAsync(buffer, 0, length);
return new ArraySegment<byte>(buffer, 0, length);
}

public byte[] ToArray()
{
return _stream.ToArray();
}

public int Peek()
{
return (int)_stream.Length - (int)_stream.Position;
}
}

private static void SerializeAndCompare(MqttBasePacket packet, string expectedBase64Value, MqttProtocolVersion protocolVersion = MqttProtocolVersion.V311)


Loading…
取消
儲存