Browse Source

Merge pull request #469 from JanEggers/fixedConcurrentWrites

fixed parallel writes
release/3.x.x
Christian 6 years ago
committed by GitHub
parent
commit
cc4e37fa40
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 40 additions and 4 deletions
  1. +3
    -1
      Build/MQTTnet.AspNetCore.nuspec
  2. +15
    -3
      Source/MQTTnet.AspnetCore/MqttConnectionContext.cs
  3. +22
    -0
      Tests/MQTTnet.AspNetCore.Tests/MqttConnectionContextTest.cs

+ 3
- 1
Build/MQTTnet.AspNetCore.nuspec View File

@@ -10,7 +10,9 @@
<iconUrl>https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png</iconUrl>
<requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>This is a support library to integrate MQTTnet into AspNetCore.</description>
<releaseNotes>For release notes please go to MQTTnet release notes (https://www.nuget.org/packages/MQTTnet/).</releaseNotes>
<releaseNotes>For release notes please go to MQTTnet release notes (https://www.nuget.org/packages/MQTTnet/).
* fixed concurrent sends with Aspnetcore.Connections.Abstractions based transport
</releaseNotes>
<copyright>Copyright Christian Kratky 2016-2018</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags>
<dependencies>


+ 15
- 3
Source/MQTTnet.AspnetCore/MqttConnectionContext.cs View File

@@ -27,6 +27,8 @@ namespace MQTTnet.AspNetCore
public event EventHandler ReadingPacketStarted;
public event EventHandler ReadingPacketCompleted;

private readonly SemaphoreSlim _writerSemaphore = new SemaphoreSlim(1, 1);

public Task ConnectAsync(TimeSpan timeout, CancellationToken cancellationToken)
{
if (Connection is TcpConnection tcp && !tcp.IsConnected)
@@ -105,10 +107,20 @@ namespace MQTTnet.AspNetCore
return null;
}

public Task SendPacketAsync(MqttBasePacket packet, CancellationToken cancellationToken)
public async Task SendPacketAsync(MqttBasePacket packet, CancellationToken cancellationToken)
{
var buffer = PacketSerializer.Serialize(packet);
return Connection.Transport.Output.WriteAsync(buffer.AsMemory(), cancellationToken).AsTask();
var buffer = PacketSerializer.Serialize(packet).AsMemory();
var output = Connection.Transport.Output;

await _writerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
await output.WriteAsync(buffer, cancellationToken);
}
finally
{
_writerSemaphore.Release();
}
}

public void Dispose()


+ 22
- 0
Tests/MQTTnet.AspNetCore.Tests/MqttConnectionContextTest.cs View File

@@ -2,8 +2,10 @@
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.AspNetCore.Tests.Mockups;
using MQTTnet.Exceptions;
using MQTTnet.Packets;
using MQTTnet.Serializer;
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

@@ -25,5 +27,25 @@ namespace MQTTnet.AspNetCore.Tests

await Assert.ThrowsExceptionAsync<MqttCommunicationException>(() => ctx.ReceivePacketAsync(TimeSpan.FromSeconds(1), CancellationToken.None));
}
[TestMethod]
public async Task TestParallelWrites()
{
var serializer = new MqttPacketSerializer { };
var pipe = new DuplexPipeMockup();
var connection = new DefaultConnectionContext();
connection.Transport = pipe;
var ctx = new MqttConnectionContext(serializer, connection);

var tasks = Enumerable.Range(1, 10).Select(_ => Task.Run(async () =>
{
for (int i = 0; i < 100; i++)
{
await ctx.SendPacketAsync(new MqttPublishPacket(), CancellationToken.None).ConfigureAwait(false);
}
}));

await Task.WhenAll(tasks).ConfigureAwait(false);
}
}
}

Loading…
Cancel
Save