From 4f66614cdd0a8dc5fd6428482c289e21d154354f Mon Sep 17 00:00:00 2001 From: Jan Eggers Date: Tue, 9 Jul 2019 14:13:20 +0200 Subject: [PATCH] fixed #691 --- .../MqttConnectionContext.cs | 9 ++--- .../Mockups/DuplexPipeMockup.cs | 12 +++++-- .../Mockups/LimitedMemoryPool.cs | 18 ++++++++++ .../Mockups/MemoryOwner.cs | 33 +++++++++++++++++++ .../MqttConnectionContextTest.cs | 16 +++++++++ 5 files changed, 82 insertions(+), 6 deletions(-) create mode 100644 Tests/MQTTnet.AspNetCore.Tests/Mockups/LimitedMemoryPool.cs create mode 100644 Tests/MQTTnet.AspNetCore.Tests/Mockups/MemoryOwner.cs diff --git a/Source/MQTTnet.AspnetCore/MqttConnectionContext.cs b/Source/MQTTnet.AspnetCore/MqttConnectionContext.cs index f50d85d..2aa9842 100644 --- a/Source/MQTTnet.AspnetCore/MqttConnectionContext.cs +++ b/Source/MQTTnet.AspnetCore/MqttConnectionContext.cs @@ -162,11 +162,12 @@ namespace MQTTnet.AspNetCore var buffer = formatter.Encode(packet); var msg = buffer.AsMemory(); var output = _output; - msg.CopyTo(output.GetMemory(msg.Length)); - BytesSent += msg.Length; + var result = await output.WriteAsync(msg, cancellationToken).ConfigureAwait(false); + if (result.IsCompleted) + { + BytesSent += msg.Length; + } PacketFormatterAdapter.FreeBuffer(); - output.Advance(msg.Length); - await output.FlushAsync().ConfigureAwait(false); } finally { diff --git a/Tests/MQTTnet.AspNetCore.Tests/Mockups/DuplexPipeMockup.cs b/Tests/MQTTnet.AspNetCore.Tests/Mockups/DuplexPipeMockup.cs index 1774f18..306749b 100644 --- a/Tests/MQTTnet.AspNetCore.Tests/Mockups/DuplexPipeMockup.cs +++ b/Tests/MQTTnet.AspNetCore.Tests/Mockups/DuplexPipeMockup.cs @@ -4,13 +4,21 @@ namespace MQTTnet.AspNetCore.Tests.Mockups { public class DuplexPipeMockup : IDuplexPipe { + public DuplexPipeMockup() + { + var pool = new LimitedMemoryPool(); + var pipeOptions = new PipeOptions(pool); + Receive = new Pipe(pipeOptions); + Send = new Pipe(pipeOptions); + } + PipeReader IDuplexPipe.Input => Receive.Reader; PipeWriter IDuplexPipe.Output => Send.Writer; - public Pipe Receive { get; set; } = new Pipe(); + public Pipe Receive { get; set; } - public Pipe Send { get; set; } = new Pipe(); + public Pipe Send { get; set; } } } diff --git a/Tests/MQTTnet.AspNetCore.Tests/Mockups/LimitedMemoryPool.cs b/Tests/MQTTnet.AspNetCore.Tests/Mockups/LimitedMemoryPool.cs new file mode 100644 index 0000000..ac5c23c --- /dev/null +++ b/Tests/MQTTnet.AspNetCore.Tests/Mockups/LimitedMemoryPool.cs @@ -0,0 +1,18 @@ +using System.Buffers; + +namespace MQTTnet.AspNetCore.Tests.Mockups +{ + public class LimitedMemoryPool : MemoryPool + { + protected override void Dispose(bool disposing) + { + } + + public override IMemoryOwner Rent(int minBufferSize = -1) + { + return new MemoryOwner(minBufferSize); + } + + public override int MaxBufferSize { get; } + } +} \ No newline at end of file diff --git a/Tests/MQTTnet.AspNetCore.Tests/Mockups/MemoryOwner.cs b/Tests/MQTTnet.AspNetCore.Tests/Mockups/MemoryOwner.cs new file mode 100644 index 0000000..1b7b02f --- /dev/null +++ b/Tests/MQTTnet.AspNetCore.Tests/Mockups/MemoryOwner.cs @@ -0,0 +1,33 @@ +using System; +using System.Buffers; + +namespace MQTTnet.AspNetCore.Tests.Mockups +{ + public class MemoryOwner : IMemoryOwner + { + private readonly byte[] _raw; + + public MemoryOwner(int size) + { + if (size <= 0) + { + size = 1024; + } + + if (size > 4096) + { + size = 4096; + } + + _raw = ArrayPool.Shared.Rent(size); + Memory = _raw; + } + + public void Dispose() + { + ArrayPool.Shared.Return(_raw); + } + + public Memory Memory { get; } + } +} \ No newline at end of file diff --git a/Tests/MQTTnet.AspNetCore.Tests/MqttConnectionContextTest.cs b/Tests/MQTTnet.AspNetCore.Tests/MqttConnectionContextTest.cs index 8fb74ae..f916779 100644 --- a/Tests/MQTTnet.AspNetCore.Tests/MqttConnectionContextTest.cs +++ b/Tests/MQTTnet.AspNetCore.Tests/MqttConnectionContextTest.cs @@ -47,5 +47,21 @@ namespace MQTTnet.AspNetCore.Tests await Task.WhenAll(tasks).ConfigureAwait(false); } + + + [TestMethod] + public async Task TestLargePacket() + { + var serializer = new MqttPacketFormatterAdapter(MqttProtocolVersion.V311); + var pipe = new DuplexPipeMockup(); + var connection = new DefaultConnectionContext(); + connection.Transport = pipe; + var ctx = new MqttConnectionContext(serializer, connection); + + await ctx.SendPacketAsync(new MqttPublishPacket() { Payload = new byte[20_000] }, TimeSpan.Zero, CancellationToken.None).ConfigureAwait(false); + + var readResult = await pipe.Send.Reader.ReadAsync(); + Assert.IsTrue(readResult.Buffer.Length > 20000); + } } }