Browse Source

fixed rebase

release/3.x.x
JanEggers 6 years ago
parent
commit
7a7fab0907
2 changed files with 37 additions and 33 deletions
  1. +7
    -1
      Frameworks/MQTTnet.AspnetCore/MqttConnectionContext.cs
  2. +30
    -32
      Frameworks/MQTTnet.AspnetCore/ReaderExtensions.cs

+ 7
- 1
Frameworks/MQTTnet.AspnetCore/MqttConnectionContext.cs View File

@@ -48,7 +48,8 @@ namespace MQTTnet.AspNetCore
while (!cancellationToken.IsCancellationRequested) while (!cancellationToken.IsCancellationRequested)
{ {
ReadResult readResult; ReadResult readResult;
ReadingPacketStarted?.Invoke(this, EventArgs.Empty);

var readTask = input.ReadAsync(cancellationToken); var readTask = input.ReadAsync(cancellationToken);
if (readTask.IsCompleted) if (readTask.IsCompleted)
{ {
@@ -84,6 +85,7 @@ namespace MQTTnet.AspNetCore
// We mark examined as buffer.End so that if we didn't receive a full frame, we'll wait for more data // We mark examined as buffer.End so that if we didn't receive a full frame, we'll wait for more data
// before yielding the read again. // before yielding the read again.
input.AdvanceTo(consumed, observed); input.AdvanceTo(consumed, observed);
ReadingPacketCompleted?.Invoke(this, EventArgs.Empty);
} }
} }


@@ -106,6 +108,10 @@ namespace MQTTnet.AspNetCore
} }


private int messageId; private int messageId;

public event EventHandler ReadingPacketStarted;
public event EventHandler ReadingPacketCompleted;

public Task PublishAsync(MqttPublishPacket packet) public Task PublishAsync(MqttPublishPacket packet)
{ {
if (!packet.PacketIdentifier.HasValue && packet.QualityOfServiceLevel > MQTTnet.Protocol.MqttQualityOfServiceLevel.AtMostOnce) if (!packet.PacketIdentifier.HasValue && packet.QualityOfServiceLevel > MQTTnet.Protocol.MqttQualityOfServiceLevel.AtMostOnce)


+ 30
- 32
Frameworks/MQTTnet.AspnetCore/ReaderExtensions.cs View File

@@ -1,6 +1,7 @@
using System; using System;
using System.Buffers; using System.Buffers;
using System.IO; using System.IO;
using MQTTnet.Adapter;
using MQTTnet.Exceptions; using MQTTnet.Exceptions;
using MQTTnet.Packets; using MQTTnet.Packets;
using MQTTnet.Protocol; using MQTTnet.Protocol;
@@ -10,37 +11,23 @@ namespace MQTTnet.AspNetCore
{ {
public static class ReaderExtensions public static class ReaderExtensions
{ {
public static MqttPacketHeader ReadHeader(this ref ReadOnlySequence<byte> input)
{
if (input.Length < 2)
{
return null;
}

var fixedHeader = input.First.Span[0];
var controlPacketType = (MqttControlPacketType)(fixedHeader >> 4);
var bodyLength = ReadBodyLength(ref input);

return new MqttPacketHeader
{
FixedHeader = fixedHeader,
ControlPacketType = controlPacketType,
BodyLength = bodyLength
};
}

private static int ReadBodyLength(ref ReadOnlySequence<byte> input)
private static bool TryReadBodyLength(ref ReadOnlySequence<byte> input, out int result)
{ {
// Alorithm taken from https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/errata01/os/mqtt-v3.1.1-errata01-os-complete.html. // Alorithm taken from https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/errata01/os/mqtt-v3.1.1-errata01-os-complete.html.
var multiplier = 1; var multiplier = 1;
var value = 0; var value = 0;
byte encodedByte; byte encodedByte;
var index = 1; var index = 1;
result = 0;


var temp = input.Slice(0, Math.Min(5, input.Length)).GetArray(); var temp = input.Slice(0, Math.Min(5, input.Length)).GetArray();


do do
{ {
if (index == temp.Length)
{
return false;
}
encodedByte = temp[index]; encodedByte = temp[index];
index++; index++;


@@ -55,7 +42,8 @@ namespace MQTTnet.AspNetCore


input = input.Slice(index); input = input.Slice(index);


return value;
result = value;
return true;
} }




@@ -75,17 +63,22 @@ namespace MQTTnet.AspNetCore
{ {
packet = null; packet = null;
var copy = input; var copy = input;
var header = copy.ReadHeader();
if (header == null || copy.Length < header.BodyLength)
if (copy.Length < 2)
{
return false;
}

var fixedheader = copy.First.Span[0];
if (!TryReadBodyLength(ref copy, out var bodyLength))
{ {
return false; return false;
} }


input = copy.Slice(header.BodyLength);
var bodySlice = copy.Slice(0, header.BodyLength);
input = copy.Slice(bodyLength);
var bodySlice = copy.Slice(0, bodyLength);
using (var body = new MemoryStream(bodySlice.GetArray())) using (var body = new MemoryStream(bodySlice.GetArray()))
{ {
packet = serializer.Deserialize(header, body);
packet = serializer.Deserialize(new ReceivedMqttPacket(fixedheader, body));
return true; return true;
} }
} }
@@ -93,19 +86,24 @@ namespace MQTTnet.AspNetCore
public static bool TryDeserialize(this IMqttPacketSerializer serializer, in ReadOnlySequence<byte> input, out MqttBasePacket packet, out SequencePosition consumed, out SequencePosition observed) public static bool TryDeserialize(this IMqttPacketSerializer serializer, in ReadOnlySequence<byte> input, out MqttBasePacket packet, out SequencePosition consumed, out SequencePosition observed)
{ {
packet = null; packet = null;
consumed = input.Start;
observed = input.End;
var copy = input; var copy = input;
var header = copy.ReadHeader();
if (header == null || copy.Length < header.BodyLength)
if (copy.Length < 2)
{
return false;
}

var fixedheader = copy.First.Span[0];
if (!TryReadBodyLength(ref copy, out var bodyLength))
{ {
consumed = input.Start;
observed = input.End;
return false; return false;
} }


var bodySlice = copy.Slice(0, header.BodyLength);
var bodySlice = copy.Slice(0, bodyLength);
using (var body = new MemoryStream(bodySlice.GetArray())) using (var body = new MemoryStream(bodySlice.GetArray()))
{ {
packet = serializer.Deserialize(header, body);
packet = serializer.Deserialize(new ReceivedMqttPacket(fixedheader, body));
consumed = bodySlice.End; consumed = bodySlice.End;
observed = bodySlice.End; observed = bodySlice.End;
return true; return true;


Loading…
Cancel
Save