@@ -18,6 +18,7 @@ | |||
* [Server] Added support for other WebSocket sub protocol formats like mqttv-3.1.1 (thanks to @israellot). | |||
* [Server] The takeover of an existing client sessions is now treated as a _clean_ disconnect of the previous client. | |||
* [Server] The pending messages queue per client is now limited to 250 messages. Overflow strategy and count can be changed via options (thanks to @VladimirAkopyan) | |||
* [Server] Keep alive checking is now suspended while large packages are being received (and thus the client is connected). Keep alive checking continues after a large packet is received completely. | |||
</releaseNotes> | |||
<copyright>Copyright Christian Kratky 2016-2018</copyright> | |||
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags> | |||
@@ -12,7 +12,11 @@ namespace MQTTnet.Adapter | |||
string Endpoint { get; } | |||
IMqttPacketSerializer PacketSerializer { get; } | |||
event EventHandler ReadingPacketStarted; | |||
event EventHandler ReadingPacketCompleted; | |||
Task ConnectAsync(TimeSpan timeout, CancellationToken cancellationToken); | |||
Task DisconnectAsync(TimeSpan timeout, CancellationToken cancellationToken); | |||
@@ -37,6 +37,9 @@ namespace MQTTnet.Adapter | |||
public IMqttPacketSerializer PacketSerializer { get; } | |||
public event EventHandler ReadingPacketStarted; | |||
public event EventHandler ReadingPacketCompleted; | |||
public Task ConnectAsync(TimeSpan timeout, CancellationToken cancellationToken) | |||
{ | |||
ThrowIfDisposed(); | |||
@@ -110,7 +113,7 @@ namespace MQTTnet.Adapter | |||
throw new TaskCanceledException(); | |||
} | |||
packet = PacketSerializer.Deserialize(receivedMqttPacket.Header, receivedMqttPacket.Body); | |||
packet = PacketSerializer.Deserialize(receivedMqttPacket); | |||
if (packet == null) | |||
{ | |||
throw new MqttProtocolViolationException("Received malformed packet."); | |||
@@ -127,46 +130,55 @@ namespace MQTTnet.Adapter | |||
return packet; | |||
} | |||
private static async Task<ReceivedMqttPacket> ReceiveAsync(IMqttChannel channel, CancellationToken cancellationToken) | |||
private async Task<ReceivedMqttPacket> ReceiveAsync(IMqttChannel channel, CancellationToken cancellationToken) | |||
{ | |||
var header = await MqttPacketReader.ReadHeaderAsync(channel, cancellationToken).ConfigureAwait(false); | |||
if (header == null) | |||
var fixedHeader = await MqttPacketReader.ReadFixedHeaderAsync(channel, cancellationToken).ConfigureAwait(false); | |||
if (!fixedHeader.HasValue) | |||
{ | |||
return null; | |||
} | |||
if (header.BodyLength == 0) | |||
{ | |||
return new ReceivedMqttPacket(header, null); | |||
} | |||
var body = new MemoryStream(header.BodyLength); | |||
var buffer = new byte[Math.Min(ReadBufferSize, header.BodyLength)]; | |||
while (body.Length < header.BodyLength) | |||
ReadingPacketStarted?.Invoke(this, EventArgs.Empty); | |||
try | |||
{ | |||
var bytesLeft = header.BodyLength - (int)body.Length; | |||
if (bytesLeft > buffer.Length) | |||
var bodyLength = await MqttPacketReader.ReadBodyLengthAsync(channel, cancellationToken).ConfigureAwait(false); | |||
if (bodyLength == 0) | |||
{ | |||
bytesLeft = buffer.Length; | |||
return new ReceivedMqttPacket(fixedHeader.Value, null); | |||
} | |||
var readBytesCount = await channel.ReadAsync(buffer, 0, bytesLeft, cancellationToken).ConfigureAwait(false); | |||
var body = new MemoryStream(bodyLength); | |||
// Check if the client closed the connection before sending the full body. | |||
if (readBytesCount == 0) | |||
var buffer = new byte[Math.Min(ReadBufferSize, bodyLength)]; | |||
while (body.Length < bodyLength) | |||
{ | |||
throw new MqttCommunicationException("Connection closed while reading remaining packet body."); | |||
} | |||
var bytesLeft = bodyLength - (int)body.Length; | |||
if (bytesLeft > buffer.Length) | |||
{ | |||
bytesLeft = buffer.Length; | |||
} | |||
// Here is no need to await because internally only an array is used and no real I/O operation is made. | |||
// Using async here will only generate overhead. | |||
body.Write(buffer, 0, readBytesCount); | |||
} | |||
var readBytesCount = await channel.ReadAsync(buffer, 0, bytesLeft, cancellationToken).ConfigureAwait(false); | |||
body.Seek(0L, SeekOrigin.Begin); | |||
// Check if the client closed the connection before sending the full body. | |||
if (readBytesCount <= 0) | |||
{ | |||
throw new MqttCommunicationException("Connection closed while reading remaining packet body."); | |||
} | |||
// Here is no need to await because internally only an array is used and no real I/O operation is made. | |||
// Using async here will only generate overhead. | |||
body.Write(buffer, 0, readBytesCount); | |||
} | |||
return new ReceivedMqttPacket(header, body); | |||
body.Seek(0L, SeekOrigin.Begin); | |||
return new ReceivedMqttPacket(fixedHeader.Value, body); | |||
} | |||
finally | |||
{ | |||
ReadingPacketCompleted?.Invoke(this, EventArgs.Empty); | |||
} | |||
} | |||
private static async Task ExecuteAndWrapExceptionAsync(Func<Task> action) | |||
@@ -1,18 +1,17 @@ | |||
using System; | |||
using System.IO; | |||
using MQTTnet.Packets; | |||
namespace MQTTnet.Adapter | |||
{ | |||
public sealed class ReceivedMqttPacket : IDisposable | |||
{ | |||
public ReceivedMqttPacket(MqttPacketHeader header, MemoryStream body) | |||
public ReceivedMqttPacket(byte fixedHeader, MemoryStream body) | |||
{ | |||
Header = header ?? throw new ArgumentNullException(nameof(header)); | |||
FixedHeader = fixedHeader; | |||
Body = body; | |||
} | |||
public MqttPacketHeader Header { get; } | |||
public byte FixedHeader { get; } | |||
public MemoryStream Body { get; } | |||
@@ -1,13 +0,0 @@ | |||
using MQTTnet.Protocol; | |||
namespace MQTTnet.Packets | |||
{ | |||
public class MqttPacketHeader | |||
{ | |||
public MqttControlPacketType ControlPacketType { get; set; } | |||
public byte FixedHeader { get; set; } | |||
public int BodyLength { get; set; } | |||
} | |||
} |
@@ -1,5 +1,5 @@ | |||
using System; | |||
using System.IO; | |||
using MQTTnet.Adapter; | |||
using MQTTnet.Packets; | |||
namespace MQTTnet.Serializer | |||
@@ -10,6 +10,6 @@ namespace MQTTnet.Serializer | |||
ArraySegment<byte> Serialize(MqttBasePacket mqttPacket); | |||
MqttBasePacket Deserialize(MqttPacketHeader header, Stream body); | |||
MqttBasePacket Deserialize(ReceivedMqttPacket receivedMqttPacket); | |||
} | |||
} |
@@ -5,14 +5,12 @@ using System.Threading; | |||
using System.Threading.Tasks; | |||
using MQTTnet.Channel; | |||
using MQTTnet.Exceptions; | |||
using MQTTnet.Packets; | |||
using MQTTnet.Protocol; | |||
namespace MQTTnet.Serializer | |||
{ | |||
public static class MqttPacketReader | |||
{ | |||
public static async Task<MqttPacketHeader> ReadHeaderAsync(IMqttChannel channel, CancellationToken cancellationToken) | |||
public static async Task<byte?> ReadFixedHeaderAsync(IMqttChannel channel, CancellationToken cancellationToken) | |||
{ | |||
if (cancellationToken.IsCancellationRequested) | |||
{ | |||
@@ -29,22 +27,7 @@ namespace MQTTnet.Serializer | |||
return null; | |||
} | |||
var fixedHeader = buffer[0]; | |||
var controlPacketType = fixedHeader >> 4; | |||
if (controlPacketType < 1 || controlPacketType > 14) | |||
{ | |||
throw new MqttProtocolViolationException($"The packet type is invalid ({controlPacketType})."); | |||
} | |||
var bodyLength = await ReadBodyLengthAsync(channel, cancellationToken).ConfigureAwait(false); | |||
return new MqttPacketHeader | |||
{ | |||
FixedHeader = fixedHeader, | |||
ControlPacketType = (MqttControlPacketType)controlPacketType, | |||
BodyLength = bodyLength | |||
}; | |||
return buffer[0]; | |||
} | |||
public static ushort ReadUInt16(this Stream stream) | |||
@@ -80,9 +63,9 @@ namespace MQTTnet.Serializer | |||
return stream.ReadBytes(length); | |||
} | |||
public static byte[] ReadRemainingData(this Stream stream, MqttPacketHeader header) | |||
public static byte[] ReadRemainingData(this Stream stream) | |||
{ | |||
return stream.ReadBytes(header.BodyLength - (int)stream.Position); | |||
return stream.ReadBytes((int)(stream.Length - stream.Position)); | |||
} | |||
public static byte[] ReadBytes(this Stream stream, int count) | |||
@@ -92,7 +75,7 @@ namespace MQTTnet.Serializer | |||
return buffer; | |||
} | |||
private static async Task<int> ReadBodyLengthAsync(IMqttChannel stream, CancellationToken cancellationToken) | |||
public static async Task<int> ReadBodyLengthAsync(IMqttChannel channel, CancellationToken cancellationToken) | |||
{ | |||
// Alorithm taken from https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/errata01/os/mqtt-v3.1.1-errata01-os-complete.html. | |||
var multiplier = 1; | |||
@@ -107,7 +90,7 @@ namespace MQTTnet.Serializer | |||
throw new TaskCanceledException(); | |||
} | |||
var readCount = await stream.ReadAsync(buffer, 0, 1, cancellationToken).ConfigureAwait(false); | |||
var readCount = await channel.ReadAsync(buffer, 0, 1, cancellationToken).ConfigureAwait(false); | |||
if (readCount <= 0) | |||
{ | |||
throw new MqttCommunicationException("Connection closed while reading remaining length data."); | |||
@@ -4,6 +4,7 @@ using MQTTnet.Protocol; | |||
using System; | |||
using System.IO; | |||
using System.Linq; | |||
using MQTTnet.Adapter; | |||
namespace MQTTnet.Serializer | |||
{ | |||
@@ -63,28 +64,34 @@ namespace MQTTnet.Serializer | |||
} | |||
} | |||
public MqttBasePacket Deserialize(MqttPacketHeader header, Stream body) | |||
public MqttBasePacket Deserialize(ReceivedMqttPacket receivedMqttPacket) | |||
{ | |||
if (header == null) throw new ArgumentNullException(nameof(header)); | |||
if (body == null) throw new ArgumentNullException(nameof(body)); | |||
if (receivedMqttPacket == null) throw new ArgumentNullException(nameof(receivedMqttPacket)); | |||
switch (header.ControlPacketType) | |||
var controlPacketType = receivedMqttPacket.FixedHeader >> 4; | |||
if (controlPacketType < 1 || controlPacketType > 14) | |||
{ | |||
case MqttControlPacketType.Connect: return DeserializeConnect(body); | |||
case MqttControlPacketType.ConnAck: return DeserializeConnAck(body); | |||
throw new MqttProtocolViolationException($"The packet type is invalid ({controlPacketType})."); | |||
} | |||
switch ((MqttControlPacketType)controlPacketType) | |||
{ | |||
case MqttControlPacketType.Connect: return DeserializeConnect(receivedMqttPacket.Body); | |||
case MqttControlPacketType.ConnAck: return DeserializeConnAck(receivedMqttPacket.Body); | |||
case MqttControlPacketType.Disconnect: return new MqttDisconnectPacket(); | |||
case MqttControlPacketType.Publish: return DeserializePublish(header, body); | |||
case MqttControlPacketType.PubAck: return DeserializePubAck(body); | |||
case MqttControlPacketType.PubRec: return DeserializePubRec(body); | |||
case MqttControlPacketType.PubRel: return DeserializePubRel(body); | |||
case MqttControlPacketType.PubComp: return DeserializePubComp(body); | |||
case MqttControlPacketType.Publish: return DeserializePublish(receivedMqttPacket); | |||
case MqttControlPacketType.PubAck: return DeserializePubAck(receivedMqttPacket.Body); | |||
case MqttControlPacketType.PubRec: return DeserializePubRec(receivedMqttPacket.Body); | |||
case MqttControlPacketType.PubRel: return DeserializePubRel(receivedMqttPacket.Body); | |||
case MqttControlPacketType.PubComp: return DeserializePubComp(receivedMqttPacket.Body); | |||
case MqttControlPacketType.PingReq: return new MqttPingReqPacket(); | |||
case MqttControlPacketType.PingResp: return new MqttPingRespPacket(); | |||
case MqttControlPacketType.Subscribe: return DeserializeSubscribe(header, body); | |||
case MqttControlPacketType.SubAck: return DeserializeSubAck(header, body); | |||
case MqttControlPacketType.Unsubscibe: return DeserializeUnsubscribe(header, body); | |||
case MqttControlPacketType.UnsubAck: return DeserializeUnsubAck(body); | |||
default: throw new MqttProtocolViolationException($"Packet type ({(int)header.ControlPacketType}) not supported."); | |||
case MqttControlPacketType.Subscribe: return DeserializeSubscribe(receivedMqttPacket.Body); | |||
case MqttControlPacketType.SubAck: return DeserializeSubAck(receivedMqttPacket.Body); | |||
case MqttControlPacketType.Unsubscibe: return DeserializeUnsubscribe(receivedMqttPacket.Body); | |||
case MqttControlPacketType.UnsubAck: return DeserializeUnsubAck(receivedMqttPacket.Body); | |||
default: throw new MqttProtocolViolationException($"Packet type ({controlPacketType}) not supported."); | |||
} | |||
} | |||
@@ -138,7 +145,7 @@ namespace MQTTnet.Serializer | |||
}; | |||
} | |||
private static MqttBasePacket DeserializeUnsubscribe(MqttPacketHeader header, Stream body) | |||
private static MqttBasePacket DeserializeUnsubscribe(Stream body) | |||
{ | |||
ThrowIfBodyIsEmpty(body); | |||
@@ -147,7 +154,7 @@ namespace MQTTnet.Serializer | |||
PacketIdentifier = body.ReadUInt16(), | |||
}; | |||
while (body.Position != header.BodyLength) | |||
while (body.Position != body.Length) | |||
{ | |||
packet.TopicFilters.Add(body.ReadStringWithLengthPrefix()); | |||
} | |||
@@ -155,7 +162,7 @@ namespace MQTTnet.Serializer | |||
return packet; | |||
} | |||
private static MqttBasePacket DeserializeSubscribe(MqttPacketHeader header, Stream body) | |||
private static MqttBasePacket DeserializeSubscribe(Stream body) | |||
{ | |||
ThrowIfBodyIsEmpty(body); | |||
@@ -164,7 +171,7 @@ namespace MQTTnet.Serializer | |||
PacketIdentifier = body.ReadUInt16() | |||
}; | |||
while (body.Position != header.BodyLength) | |||
while (body.Position != body.Length) | |||
{ | |||
packet.TopicFilters.Add(new TopicFilter( | |||
body.ReadStringWithLengthPrefix(), | |||
@@ -174,11 +181,12 @@ namespace MQTTnet.Serializer | |||
return packet; | |||
} | |||
private static MqttBasePacket DeserializePublish(MqttPacketHeader header, Stream body) | |||
private static MqttBasePacket DeserializePublish(ReceivedMqttPacket receivedMqttPacket) | |||
{ | |||
var body = receivedMqttPacket.Body; | |||
ThrowIfBodyIsEmpty(body); | |||
var fixedHeader = new ByteReader(header.FixedHeader); | |||
var fixedHeader = new ByteReader(receivedMqttPacket.FixedHeader); | |||
var retain = fixedHeader.Read(); | |||
var qualityOfServiceLevel = (MqttQualityOfServiceLevel)fixedHeader.Read(2); | |||
var dup = fixedHeader.Read(); | |||
@@ -196,7 +204,7 @@ namespace MQTTnet.Serializer | |||
PacketIdentifier = packetIdentifier, | |||
Retain = retain, | |||
Topic = topic, | |||
Payload = body.ReadRemainingData(header), | |||
Payload = body.ReadRemainingData(), | |||
QualityOfServiceLevel = qualityOfServiceLevel, | |||
Dup = dup | |||
}; | |||
@@ -282,7 +290,7 @@ namespace MQTTnet.Serializer | |||
return packet; | |||
} | |||
private static MqttBasePacket DeserializeSubAck(MqttPacketHeader header, Stream body) | |||
private static MqttBasePacket DeserializeSubAck(Stream body) | |||
{ | |||
ThrowIfBodyIsEmpty(body); | |||
@@ -291,7 +299,7 @@ namespace MQTTnet.Serializer | |||
PacketIdentifier = body.ReadUInt16() | |||
}; | |||
while (body.Position != header.BodyLength) | |||
while (body.Position != body.Length) | |||
{ | |||
packet.SubscribeReturnCodes.Add((MqttSubscribeReturnCode)body.ReadByte()); | |||
} | |||
@@ -16,6 +16,7 @@ namespace MQTTnet.Server | |||
private readonly string _clientId; | |||
private readonly Action _callback; | |||
private bool _isPaused; | |||
private Task _workerTask; | |||
public MqttClientKeepAliveMonitor(string clientId, Action callback, IMqttNetChildLogger logger) | |||
@@ -41,10 +42,25 @@ namespace MQTTnet.Server | |||
_workerTask = Task.Run(() => RunAsync(keepAlivePeriod, cancellationToken), cancellationToken); | |||
} | |||
////public void WaitForCompletion() | |||
////{ | |||
//// SpinWait.SpinUntil(() => _workerTask == null || _workerTask.IsCanceled || _workerTask.IsCompleted || _workerTask.IsFaulted); | |||
////} | |||
public void Pause() | |||
{ | |||
_isPaused = true; | |||
} | |||
public void Resume() | |||
{ | |||
_isPaused = false; | |||
} | |||
public void PacketReceived(MqttBasePacket packet) | |||
{ | |||
_lastPacketReceivedTracker.Restart(); | |||
if (!(packet is MqttPingReqPacket)) | |||
{ | |||
_lastNonKeepAlivePacketReceivedTracker.Restart(); | |||
} | |||
} | |||
private async Task RunAsync(int keepAlivePeriod, CancellationToken cancellationToken) | |||
{ | |||
@@ -56,7 +72,7 @@ namespace MQTTnet.Server | |||
while (!cancellationToken.IsCancellationRequested) | |||
{ | |||
// Values described here: [MQTT-3.1.2-24]. | |||
if (_lastPacketReceivedTracker.Elapsed.TotalSeconds > keepAlivePeriod * 1.5D) | |||
if (!_isPaused && _lastPacketReceivedTracker.Elapsed.TotalSeconds > keepAlivePeriod * 1.5D) | |||
{ | |||
_logger.Warning(null, "Client '{0}': Did not receive any packet or keep alive signal.", _clientId); | |||
_callback(); | |||
@@ -79,15 +95,5 @@ namespace MQTTnet.Server | |||
_logger.Verbose("Client {0}: Stopped checking keep alive timeout.", _clientId); | |||
} | |||
} | |||
public void PacketReceived(MqttBasePacket packet) | |||
{ | |||
_lastPacketReceivedTracker.Restart(); | |||
if (!(packet is MqttPingReqPacket)) | |||
{ | |||
_lastNonKeepAlivePacketReceivedTracker.Restart(); | |||
} | |||
} | |||
} | |||
} |
@@ -73,6 +73,8 @@ namespace MQTTnet.Server | |||
try | |||
{ | |||
_adapter = adapter; | |||
adapter.ReadingPacketStarted += OnAdapterReadingPacketStarted; | |||
adapter.ReadingPacketCompleted += OnAdapterReadingPacketCompleted; | |||
_cancellationTokenSource = new CancellationTokenSource(); | |||
_wasCleanDisconnect = false; | |||
@@ -106,6 +108,12 @@ namespace MQTTnet.Server | |||
} | |||
finally | |||
{ | |||
if (_adapter != null) | |||
{ | |||
_adapter.ReadingPacketStarted -= OnAdapterReadingPacketStarted; | |||
_adapter.ReadingPacketCompleted -= OnAdapterReadingPacketCompleted; | |||
} | |||
_adapter = null; | |||
_cancellationTokenSource?.Dispose(); | |||
@@ -340,5 +348,15 @@ namespace MQTTnet.Server | |||
var response = new MqttPubCompPacket { PacketIdentifier = pubRelPacket.PacketIdentifier }; | |||
return adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, new[] { response }, cancellationToken); | |||
} | |||
private void OnAdapterReadingPacketCompleted(object sender, EventArgs e) | |||
{ | |||
_keepAliveMonitor?.Pause(); | |||
} | |||
private void OnAdapterReadingPacketStarted(object sender, EventArgs e) | |||
{ | |||
_keepAliveMonitor?.Resume(); | |||
} | |||
} | |||
} |
@@ -7,6 +7,7 @@ using BenchmarkDotNet.Attributes.Exporters; | |||
using System; | |||
using System.Threading; | |||
using System.IO; | |||
using MQTTnet.Adapter; | |||
using MQTTnet.Core.Internal; | |||
namespace MQTTnet.Benchmarks | |||
@@ -48,11 +49,14 @@ namespace MQTTnet.Benchmarks | |||
{ | |||
using (var headerStream = new MemoryStream(Join(_serializedPacket))) | |||
{ | |||
var header = MqttPacketReader.ReadHeaderAsync(new TestMqttChannel(headerStream), CancellationToken.None).GetAwaiter().GetResult(); | |||
var channel = new TestMqttChannel(headerStream); | |||
using (var bodyStream = new MemoryStream(Join(_serializedPacket), (int)headerStream.Position, header.BodyLength)) | |||
var header = MqttPacketReader.ReadFixedHeaderAsync(new TestMqttChannel(headerStream), CancellationToken.None).GetAwaiter().GetResult(); | |||
var bodyLength = MqttPacketReader.ReadBodyLengthAsync(channel, CancellationToken.None).GetAwaiter().GetResult(); | |||
using (var bodyStream = new MemoryStream(Join(_serializedPacket), (int)headerStream.Position, bodyLength)) | |||
{ | |||
_serializer.Deserialize(header, bodyStream); | |||
_serializer.Deserialize(new ReceivedMqttPacket((byte)header, bodyStream)); | |||
} | |||
} | |||
} | |||
@@ -12,9 +12,9 @@ namespace MQTTnet.Core.Tests | |||
[TestMethod] | |||
public void MqttPacketReader_EmptyStream() | |||
{ | |||
var header = MqttPacketReader.ReadHeaderAsync(new TestMqttChannel(new MemoryStream()), CancellationToken.None).GetAwaiter().GetResult(); | |||
var header = MqttPacketReader.ReadFixedHeaderAsync(new TestMqttChannel(new MemoryStream()), CancellationToken.None).GetAwaiter().GetResult(); | |||
Assert.IsNull(header); | |||
Assert.AreEqual(-1, header); | |||
} | |||
} | |||
} |
@@ -3,6 +3,7 @@ using System.IO; | |||
using System.Text; | |||
using System.Threading; | |||
using Microsoft.VisualStudio.TestTools.UnitTesting; | |||
using MQTTnet.Adapter; | |||
using MQTTnet.Core.Internal; | |||
using MQTTnet.Packets; | |||
using MQTTnet.Protocol; | |||
@@ -416,11 +417,13 @@ namespace MQTTnet.Core.Tests | |||
using (var headerStream = new MemoryStream(Join(buffer1))) | |||
{ | |||
var header = MqttPacketReader.ReadHeaderAsync(new TestMqttChannel(headerStream), CancellationToken.None).GetAwaiter().GetResult(); | |||
var channel = new TestMqttChannel(headerStream); | |||
var header = MqttPacketReader.ReadFixedHeaderAsync(channel, CancellationToken.None).GetAwaiter().GetResult(); | |||
var bodyLength = MqttPacketReader.ReadBodyLengthAsync(channel, CancellationToken.None).GetAwaiter().GetResult(); | |||
using (var bodyStream = new MemoryStream(Join(buffer1), (int)headerStream.Position, header.BodyLength)) | |||
using (var bodyStream = new MemoryStream(Join(buffer1), (int)headerStream.Position, bodyLength)) | |||
{ | |||
var deserializedPacket = serializer.Deserialize(header, bodyStream); | |||
var deserializedPacket = serializer.Deserialize(new ReceivedMqttPacket((byte)header, bodyStream)); | |||
var buffer2 = serializer.Serialize(deserializedPacket); | |||
Assert.AreEqual(expectedBase64Value, Convert.ToBase64String(Join(buffer2))); | |||
@@ -19,6 +19,9 @@ namespace MQTTnet.Core.Tests | |||
public IMqttPacketSerializer PacketSerializer { get; } = new MqttPacketSerializer(); | |||
public event EventHandler ReadingPacketStarted; | |||
public event EventHandler ReadingPacketCompleted; | |||
public void Dispose() | |||
{ | |||
} | |||