diff --git a/Frameworks/MQTTnet.AspnetCore/MqttConnectionContext.cs b/Frameworks/MQTTnet.AspnetCore/MqttConnectionContext.cs index 0bd2b4e..7683fd0 100644 --- a/Frameworks/MQTTnet.AspnetCore/MqttConnectionContext.cs +++ b/Frameworks/MQTTnet.AspnetCore/MqttConnectionContext.cs @@ -48,7 +48,8 @@ namespace MQTTnet.AspNetCore while (!cancellationToken.IsCancellationRequested) { ReadResult readResult; - + ReadingPacketStarted?.Invoke(this, EventArgs.Empty); + var readTask = input.ReadAsync(cancellationToken); if (readTask.IsCompleted) { @@ -84,6 +85,7 @@ namespace MQTTnet.AspNetCore // We mark examined as buffer.End so that if we didn't receive a full frame, we'll wait for more data // before yielding the read again. input.AdvanceTo(consumed, observed); + ReadingPacketCompleted?.Invoke(this, EventArgs.Empty); } } @@ -106,6 +108,10 @@ namespace MQTTnet.AspNetCore } private int messageId; + + public event EventHandler ReadingPacketStarted; + public event EventHandler ReadingPacketCompleted; + public Task PublishAsync(MqttPublishPacket packet) { if (!packet.PacketIdentifier.HasValue && packet.QualityOfServiceLevel > MQTTnet.Protocol.MqttQualityOfServiceLevel.AtMostOnce) diff --git a/Frameworks/MQTTnet.AspnetCore/ReaderExtensions.cs b/Frameworks/MQTTnet.AspnetCore/ReaderExtensions.cs index a4e5e02..718538b 100644 --- a/Frameworks/MQTTnet.AspnetCore/ReaderExtensions.cs +++ b/Frameworks/MQTTnet.AspnetCore/ReaderExtensions.cs @@ -1,6 +1,7 @@ using System; using System.Buffers; using System.IO; +using MQTTnet.Adapter; using MQTTnet.Exceptions; using MQTTnet.Packets; using MQTTnet.Protocol; @@ -10,37 +11,23 @@ namespace MQTTnet.AspNetCore { public static class ReaderExtensions { - public static MqttPacketHeader ReadHeader(this ref ReadOnlySequence input) - { - if (input.Length < 2) - { - return null; - } - - var fixedHeader = input.First.Span[0]; - var controlPacketType = (MqttControlPacketType)(fixedHeader >> 4); - var bodyLength = ReadBodyLength(ref input); - - return new MqttPacketHeader - { - FixedHeader = fixedHeader, - ControlPacketType = controlPacketType, - BodyLength = bodyLength - }; - } - - private static int ReadBodyLength(ref ReadOnlySequence input) + private static bool TryReadBodyLength(ref ReadOnlySequence input, out int result) { // Alorithm taken from https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/errata01/os/mqtt-v3.1.1-errata01-os-complete.html. var multiplier = 1; var value = 0; byte encodedByte; var index = 1; + result = 0; var temp = input.Slice(0, Math.Min(5, input.Length)).GetArray(); do { + if (index == temp.Length) + { + return false; + } encodedByte = temp[index]; index++; @@ -55,7 +42,8 @@ namespace MQTTnet.AspNetCore input = input.Slice(index); - return value; + result = value; + return true; } @@ -75,17 +63,22 @@ namespace MQTTnet.AspNetCore { packet = null; var copy = input; - var header = copy.ReadHeader(); - if (header == null || copy.Length < header.BodyLength) + if (copy.Length < 2) + { + return false; + } + + var fixedheader = copy.First.Span[0]; + if (!TryReadBodyLength(ref copy, out var bodyLength)) { return false; } - input = copy.Slice(header.BodyLength); - var bodySlice = copy.Slice(0, header.BodyLength); + input = copy.Slice(bodyLength); + var bodySlice = copy.Slice(0, bodyLength); using (var body = new MemoryStream(bodySlice.GetArray())) { - packet = serializer.Deserialize(header, body); + packet = serializer.Deserialize(new ReceivedMqttPacket(fixedheader, body)); return true; } } @@ -93,19 +86,24 @@ namespace MQTTnet.AspNetCore public static bool TryDeserialize(this IMqttPacketSerializer serializer, in ReadOnlySequence input, out MqttBasePacket packet, out SequencePosition consumed, out SequencePosition observed) { packet = null; + consumed = input.Start; + observed = input.End; var copy = input; - var header = copy.ReadHeader(); - if (header == null || copy.Length < header.BodyLength) + if (copy.Length < 2) + { + return false; + } + + var fixedheader = copy.First.Span[0]; + if (!TryReadBodyLength(ref copy, out var bodyLength)) { - consumed = input.Start; - observed = input.End; return false; } - var bodySlice = copy.Slice(0, header.BodyLength); + var bodySlice = copy.Slice(0, bodyLength); using (var body = new MemoryStream(bodySlice.GetArray())) { - packet = serializer.Deserialize(header, body); + packet = serializer.Deserialize(new ReceivedMqttPacket(fixedheader, body)); consumed = bodySlice.End; observed = bodySlice.End; return true;