diff --git a/Source/MQTTnet.AspnetCore/MqttConnectionContext.cs b/Source/MQTTnet.AspnetCore/MqttConnectionContext.cs index d798be8..d3e583f 100644 --- a/Source/MQTTnet.AspnetCore/MqttConnectionContext.cs +++ b/Source/MQTTnet.AspnetCore/MqttConnectionContext.cs @@ -18,6 +18,8 @@ namespace MQTTnet.AspNetCore PacketFormatterAdapter = packetFormatterAdapter ?? throw new ArgumentNullException(nameof(packetFormatterAdapter)); Connection = connection ?? throw new ArgumentNullException(nameof(connection)); } + private PipeReader _input; + private PipeWriter _output; public string Endpoint => Connection.ConnectionId; public bool IsSecureConnection => false; // TODO: Fix detection (WS vs. WSS). @@ -33,20 +35,21 @@ namespace MQTTnet.AspNetCore private readonly SemaphoreSlim _writerSemaphore = new SemaphoreSlim(1, 1); - public Task ConnectAsync(TimeSpan timeout, CancellationToken cancellationToken) + public async Task ConnectAsync(TimeSpan timeout, CancellationToken cancellationToken) { if (Connection is TcpConnection tcp && !tcp.IsConnected) { - return tcp.StartAsync(); + await tcp.StartAsync().ConfigureAwait(false); } - return Task.CompletedTask; + _input = Connection.Transport.Input; + _output = Connection.Transport.Output; } public Task DisconnectAsync(TimeSpan timeout, CancellationToken cancellationToken) { - Connection.Transport.Input.Complete(); - Connection.Transport.Output.Complete(); + _input?.Complete(); + _output?.Complete(); return Task.CompletedTask; } @@ -54,7 +57,6 @@ namespace MQTTnet.AspNetCore public async Task ReceivePacketAsync(TimeSpan timeout, CancellationToken cancellationToken) { var input = Connection.Transport.Input; - var reader = new SpanBasedMqttPacketBodyReader(); try { diff --git a/Source/MQTTnet.AspnetCore/SpanBasedMqttPacketBodyReader.cs b/Source/MQTTnet.AspnetCore/SpanBasedMqttPacketBodyReader.cs index 01ccb3a..9e16dac 100644 --- a/Source/MQTTnet.AspnetCore/SpanBasedMqttPacketBodyReader.cs +++ b/Source/MQTTnet.AspnetCore/SpanBasedMqttPacketBodyReader.cs @@ -30,7 +30,7 @@ namespace MQTTnet.AspNetCore public byte[] ReadRemainingData() { - return _buffer.ToArray(); + return _buffer.Slice(_offset).ToArray(); } public ushort ReadUInt16() @@ -47,13 +47,13 @@ namespace MQTTnet.AspNetCore return ReadSegmentWithLengthPrefix().ToArray(); } - private ReadOnlyMemory ReadSegmentWithLengthPrefix() + private ReadOnlySpan ReadSegmentWithLengthPrefix() { var length = ReadUInt16(); ValidateReceiveBuffer(length); - var result = _buffer.Slice(_offset, length); + var result = _buffer.Slice(_offset, length).Span; _offset += length; return result; } @@ -69,7 +69,7 @@ namespace MQTTnet.AspNetCore public unsafe string ReadStringWithLengthPrefix() { var buffer = ReadSegmentWithLengthPrefix(); - fixed (byte* bytes = &buffer.Span.GetPinnableReference()) + fixed (byte* bytes = &buffer.GetPinnableReference()) { var result = Encoding.UTF8.GetString(bytes, buffer.Length); return result;