@@ -23,9 +23,14 @@ namespace MQTTnet.AspNetCore | |||||
_input = Connection.Transport.Input; | _input = Connection.Transport.Input; | ||||
_output = Connection.Transport.Output; | _output = Connection.Transport.Output; | ||||
} | } | ||||
_reader = new SpanBasedMqttPacketBodyReader(); | |||||
} | } | ||||
private PipeReader _input; | private PipeReader _input; | ||||
private PipeWriter _output; | private PipeWriter _output; | ||||
private readonly SpanBasedMqttPacketBodyReader _reader; | |||||
public string Endpoint => Connection.ConnectionId; | public string Endpoint => Connection.ConnectionId; | ||||
public bool IsSecureConnection => false; // TODO: Fix detection (WS vs. WSS). | public bool IsSecureConnection => false; // TODO: Fix detection (WS vs. WSS). | ||||
@@ -88,7 +93,7 @@ namespace MQTTnet.AspNetCore | |||||
{ | { | ||||
if (!buffer.IsEmpty) | if (!buffer.IsEmpty) | ||||
{ | { | ||||
if (PacketFormatterAdapter.TryDecode(buffer, out var packet, out consumed, out observed)) | |||||
if (PacketFormatterAdapter.TryDecode(_reader, buffer, out var packet, out consumed, out observed)) | |||||
{ | { | ||||
return packet; | return packet; | ||||
} | } | ||||
@@ -9,7 +9,7 @@ namespace MQTTnet.AspNetCore | |||||
{ | { | ||||
public static class ReaderExtensions | public static class ReaderExtensions | ||||
{ | { | ||||
public static bool TryDecode(this MqttPacketFormatterAdapter formatter, in ReadOnlySequence<byte> input, out MqttBasePacket packet, out SequencePosition consumed, out SequencePosition observed) | |||||
public static bool TryDecode(this MqttPacketFormatterAdapter formatter, SpanBasedMqttPacketBodyReader reader, in ReadOnlySequence<byte> input, out MqttBasePacket packet, out SequencePosition consumed, out SequencePosition observed) | |||||
{ | { | ||||
if (formatter == null) throw new ArgumentNullException(nameof(formatter)); | if (formatter == null) throw new ArgumentNullException(nameof(formatter)); | ||||
@@ -36,8 +36,6 @@ namespace MQTTnet.AspNetCore | |||||
var bodySlice = copy.Slice(0, bodyLength); | var bodySlice = copy.Slice(0, bodyLength); | ||||
var buffer = bodySlice.GetMemory(); | var buffer = bodySlice.GetMemory(); | ||||
var reader = new SpanBasedMqttPacketBodyReader(); | |||||
reader.SetBuffer(buffer); | reader.SetBuffer(buffer); | ||||
var receivedMqttPacket = new ReceivedMqttPacket(fixedheader, reader, buffer.Length + 2); | var receivedMqttPacket = new ReceivedMqttPacket(fixedheader, reader, buffer.Length + 2); | ||||
@@ -24,21 +24,23 @@ namespace MQTTnet.AspNetCore.Tests | |||||
var observed = part.Start; | var observed = part.Start; | ||||
var result = false; | var result = false; | ||||
var reader = new SpanBasedMqttPacketBodyReader(); | |||||
part = sequence.Slice(sequence.Start, 0); // empty message should fail | part = sequence.Slice(sequence.Start, 0); // empty message should fail | ||||
result = serializer.TryDecode(part, out packet, out consumed, out observed); | |||||
result = serializer.TryDecode(reader, part, out packet, out consumed, out observed); | |||||
Assert.IsFalse(result); | Assert.IsFalse(result); | ||||
part = sequence.Slice(sequence.Start, 1); // partial fixed header should fail | part = sequence.Slice(sequence.Start, 1); // partial fixed header should fail | ||||
result = serializer.TryDecode(part, out packet, out consumed, out observed); | |||||
result = serializer.TryDecode(reader, part, out packet, out consumed, out observed); | |||||
Assert.IsFalse(result); | Assert.IsFalse(result); | ||||
part = sequence.Slice(sequence.Start, 4); // partial body should fail | part = sequence.Slice(sequence.Start, 4); // partial body should fail | ||||
result = serializer.TryDecode(part, out packet, out consumed, out observed); | |||||
result = serializer.TryDecode(reader, part, out packet, out consumed, out observed); | |||||
Assert.IsFalse(result); | Assert.IsFalse(result); | ||||
part = sequence; // complete msg should work | part = sequence; // complete msg should work | ||||
result = serializer.TryDecode(part, out packet, out consumed, out observed); | |||||
result = serializer.TryDecode(reader, part, out packet, out consumed, out observed); | |||||
Assert.IsTrue(result); | Assert.IsTrue(result); | ||||
} | } | ||||
} | } | ||||