diff --git a/Build/MQTTnet.AspNetCore.nuspec b/Build/MQTTnet.AspNetCore.nuspec index 31ffeca..5918514 100644 --- a/Build/MQTTnet.AspNetCore.nuspec +++ b/Build/MQTTnet.AspNetCore.nuspec @@ -10,7 +10,9 @@ https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png false This is a support library to integrate MQTTnet into AspNetCore. - For release notes please go to MQTTnet release notes (https://www.nuget.org/packages/MQTTnet/). + For release notes please go to MQTTnet release notes (https://www.nuget.org/packages/MQTTnet/). + * fixed concurrent sends with Aspnetcore.Connections.Abstractions based transport + Copyright Christian Kratky 2016-2018 MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin diff --git a/Source/MQTTnet.AspnetCore/MqttConnectionContext.cs b/Source/MQTTnet.AspnetCore/MqttConnectionContext.cs index 361eb6a..e49b36f 100644 --- a/Source/MQTTnet.AspnetCore/MqttConnectionContext.cs +++ b/Source/MQTTnet.AspnetCore/MqttConnectionContext.cs @@ -27,6 +27,8 @@ namespace MQTTnet.AspNetCore public event EventHandler ReadingPacketStarted; public event EventHandler ReadingPacketCompleted; + private readonly SemaphoreSlim _writerSemaphore = new SemaphoreSlim(1, 1); + public Task ConnectAsync(TimeSpan timeout, CancellationToken cancellationToken) { if (Connection is TcpConnection tcp && !tcp.IsConnected) @@ -105,10 +107,20 @@ namespace MQTTnet.AspNetCore return null; } - public Task SendPacketAsync(MqttBasePacket packet, CancellationToken cancellationToken) + public async Task SendPacketAsync(MqttBasePacket packet, CancellationToken cancellationToken) { - var buffer = PacketSerializer.Serialize(packet); - return Connection.Transport.Output.WriteAsync(buffer.AsMemory(), cancellationToken).AsTask(); + var buffer = PacketSerializer.Serialize(packet).AsMemory(); + var output = Connection.Transport.Output; + + await _writerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + try + { + await output.WriteAsync(buffer, cancellationToken); + } + finally + { + _writerSemaphore.Release(); + } } public void Dispose() diff --git a/Tests/MQTTnet.AspNetCore.Tests/MqttConnectionContextTest.cs b/Tests/MQTTnet.AspNetCore.Tests/MqttConnectionContextTest.cs index 7498015..699aa65 100644 --- a/Tests/MQTTnet.AspNetCore.Tests/MqttConnectionContextTest.cs +++ b/Tests/MQTTnet.AspNetCore.Tests/MqttConnectionContextTest.cs @@ -2,8 +2,10 @@ using Microsoft.VisualStudio.TestTools.UnitTesting; using MQTTnet.AspNetCore.Tests.Mockups; using MQTTnet.Exceptions; +using MQTTnet.Packets; using MQTTnet.Serializer; using System; +using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -25,5 +27,25 @@ namespace MQTTnet.AspNetCore.Tests await Assert.ThrowsExceptionAsync(() => ctx.ReceivePacketAsync(TimeSpan.FromSeconds(1), CancellationToken.None)); } + + [TestMethod] + public async Task TestParallelWrites() + { + var serializer = new MqttPacketSerializer { }; + var pipe = new DuplexPipeMockup(); + var connection = new DefaultConnectionContext(); + connection.Transport = pipe; + var ctx = new MqttConnectionContext(serializer, connection); + + var tasks = Enumerable.Range(1, 10).Select(_ => Task.Run(async () => + { + for (int i = 0; i < 100; i++) + { + await ctx.SendPacketAsync(new MqttPublishPacket(), CancellationToken.None).ConfigureAwait(false); + } + })); + + await Task.WhenAll(tasks).ConfigureAwait(false); + } } }