Browse Source

Merge pull request #157 from kpreisser/trueAsync

Use asynchronous methods for blocking operations
release/3.x.x
Christian 7 years ago
committed by GitHub
parent
commit
5becf7980a
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 25 additions and 17 deletions
  1. +0
    -1
      .gitignore
  2. +1
    -1
      Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs
  3. +13
    -10
      Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketReader.cs
  4. +11
    -5
      Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs
  5. BIN
     

+ 0
- 1
.gitignore View File

@@ -292,5 +292,4 @@ Build/nuget.exe
*.js
*.map

*codeSigningKey.pfx
/Tests/MQTTnet.TestApp.NetCore/RetainedMessages.json

+ 1
- 1
Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs View File

@@ -120,7 +120,7 @@ namespace MQTTnet.Adapter

private static async Task<ReceivedMqttPacket> ReceiveAsync(Stream stream, CancellationToken cancellationToken)
{
var header = MqttPacketReader.ReadHeaderFromSource(stream, cancellationToken);
var header = await MqttPacketReader.ReadHeaderFromSourceAsync(stream, cancellationToken).ConfigureAwait(false);
if (header == null)
{
return null;


+ 13
- 10
Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketReader.cs View File

@@ -22,17 +22,18 @@ namespace MQTTnet.Serializer

public bool EndOfRemainingData => BaseStream.Position == _header.BodyLength;

public static MqttPacketHeader ReadHeaderFromSource(Stream stream, CancellationToken cancellationToken)
public static async Task<MqttPacketHeader> ReadHeaderFromSourceAsync(Stream stream, CancellationToken cancellationToken)
{
var buffer = stream.ReadByte();
if (buffer == -1)
byte[] singleByteBuf = new byte[1];
var readCount = await stream.ReadAsync(singleByteBuf, 0, singleByteBuf.Length).ConfigureAwait(false);
if (readCount <= 0)
{
return null;
}
var fixedHeader = (byte)buffer;
var fixedHeader = singleByteBuf[0];
var controlPacketType = (MqttControlPacketType)(fixedHeader >> 4);
var bodyLength = ReadBodyLengthFromSource(stream, cancellationToken);
var bodyLength = await ReadBodyLengthFromSourceAsync(stream, cancellationToken).ConfigureAwait(false);

return new MqttPacketHeader
{
@@ -80,13 +81,15 @@ namespace MQTTnet.Serializer
return ReadBytes(_header.BodyLength - (int)BaseStream.Position);
}

private static int ReadBodyLengthFromSource(Stream stream, CancellationToken cancellationToken)
private static async Task<int> ReadBodyLengthFromSourceAsync(Stream stream, CancellationToken cancellationToken)
{
// Alorithm taken from http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html.
var multiplier = 1;
var value = 0;
byte encodedByte;

byte[] singleByteBuf = new byte[1];

var readBytes = new List<byte>();
do
{
@@ -95,13 +98,13 @@ namespace MQTTnet.Serializer
throw new TaskCanceledException();
}

var buffer = stream.ReadByte();
if (buffer == -1)
int readCount = await stream.ReadAsync(singleByteBuf, 0, singleByteBuf.Length).ConfigureAwait(false);
if (readCount <= 0)
{
throw new MqttCommunicationException("Connection closed while reading remaining length data.");
}

encodedByte = (byte)buffer;
encodedByte = singleByteBuf[0];
readBytes.Add(encodedByte);

value += (byte)(encodedByte & 127) * multiplier;


+ 11
- 5
Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs View File

@@ -12,7 +12,8 @@ namespace MQTTnet.Server
{
public sealed class MqttClientPendingMessagesQueue : IDisposable
{
private readonly BlockingCollection<MqttBasePacket> _queue = new BlockingCollection<MqttBasePacket>();
private readonly ConcurrentQueue<MqttBasePacket> _queue = new ConcurrentQueue<MqttBasePacket>();
private readonly SemaphoreSlim _queueWaitSemaphore = new SemaphoreSlim(0);
private readonly IMqttServerOptions _options;
private readonly MqttClientSession _session;
private readonly IMqttNetLogger _logger;
@@ -40,7 +41,8 @@ namespace MQTTnet.Server
{
if (packet == null) throw new ArgumentNullException(nameof(packet));

_queue.Add(packet);
_queue.Enqueue(packet);
_queueWaitSemaphore.Release();
_logger.Trace<MqttClientPendingMessagesQueue>("Enqueued packet (ClientId: {0}).", _session.ClientId);
}

@@ -67,7 +69,10 @@ namespace MQTTnet.Server
MqttBasePacket packet = null;
try
{
packet = _queue.Take(cancellationToken);
await _queueWaitSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
if (!_queue.TryDequeue(out packet)) {
throw new InvalidOperationException(); // should not happen
}
await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, packet).ConfigureAwait(false);

_logger.Trace<MqttClientPendingMessagesQueue>("Enqueued packet sent (ClientId: {0}).", _session.ClientId);
@@ -95,7 +100,8 @@ namespace MQTTnet.Server
if (publishPacket.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce)
{
publishPacket.Dup = true;
_queue.Add(packet, CancellationToken.None);
_queue.Enqueue(packet);
_queueWaitSemaphore.Release();
}
}

@@ -105,7 +111,7 @@ namespace MQTTnet.Server

public void Dispose()
{
_queue?.Dispose();
_queueWaitSemaphore?.Dispose();
}
}
}

BIN
View File


Loading…
Cancel
Save