Browse Source

Merge branch 'develop' of https://github.com/chkr1011/MQTTnet into develop

release/3.x.x
Christian Kratky 7 years ago
parent
commit
68aae70b44
8 changed files with 86 additions and 90 deletions
  1. +3
    -3
      Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs
  2. +2
    -2
      Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs
  3. +2
    -2
      Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs
  4. +7
    -7
      MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs
  5. +24
    -28
      MQTTnet.Core/Client/MqttClient.cs
  6. +9
    -9
      MQTTnet.Core/Serializer/MqttPacketReader.cs
  7. +35
    -35
      MQTTnet.Core/Serializer/MqttPacketSerializer.cs
  8. +4
    -4
      Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs

+ 3
- 3
Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs View File

@@ -85,7 +85,7 @@ namespace MQTTnet.Implementations
{
try
{
var clientSocket = await Task.Factory.FromAsync(_defaultEndpointSocket.BeginAccept, _defaultEndpointSocket.EndAccept, null);
var clientSocket = await Task.Factory.FromAsync(_defaultEndpointSocket.BeginAccept, _defaultEndpointSocket.EndAccept, null).ConfigureAwait(false);
var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new MqttPacketSerializer());
ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter));
}
@@ -105,10 +105,10 @@ namespace MQTTnet.Implementations
{
try
{
var clientSocket = await Task.Factory.FromAsync(_tlsEndpointSocket.BeginAccept, _tlsEndpointSocket.EndAccept, null);
var clientSocket = await Task.Factory.FromAsync(_tlsEndpointSocket.BeginAccept, _tlsEndpointSocket.EndAccept, null).ConfigureAwait(false);

var sslStream = new SslStream(new NetworkStream(clientSocket));
await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false);
await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false).ConfigureAwait(false);
var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new MqttPacketSerializer());
ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter));


+ 2
- 2
Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs View File

@@ -35,12 +35,12 @@ namespace MQTTnet.Implementations
_socket = new Socket(SocketType.Stream, ProtocolType.Tcp);
}

await Task.Factory.FromAsync(_socket.BeginConnect, _socket.EndConnect, options.Server, options.GetPort(), null);
await Task.Factory.FromAsync(_socket.BeginConnect, _socket.EndConnect, options.Server, options.GetPort(), null).ConfigureAwait(false);

if (options.TlsOptions.UseTls)
{
_sslStream = new SslStream(new NetworkStream(_socket, true));
await _sslStream.AuthenticateAsClientAsync(options.Server, LoadCertificates(options), SslProtocols.Tls12, options.TlsOptions.CheckCertificateRevocation);
await _sslStream.AuthenticateAsClientAsync(options.Server, LoadCertificates(options), SslProtocols.Tls12, options.TlsOptions.CheckCertificateRevocation).ConfigureAwait(false);
}
}
catch (SocketException exception)


+ 2
- 2
Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs View File

@@ -35,12 +35,12 @@ namespace MQTTnet.Implementations
_socket = new Socket(SocketType.Stream, ProtocolType.Tcp);
}

await _socket.ConnectAsync(options.Server, options.GetPort());
await _socket.ConnectAsync(options.Server, options.GetPort()).ConfigureAwait(false);
if (options.TlsOptions.UseTls)
{
_sslStream = new SslStream(new NetworkStream(_socket, true));
await _sslStream.AuthenticateAsClientAsync(options.Server, LoadCertificates(options), SslProtocols.Tls12, options.TlsOptions.CheckCertificateRevocation);
await _sslStream.AuthenticateAsClientAsync(options.Server, LoadCertificates(options), SslProtocols.Tls12, options.TlsOptions.CheckCertificateRevocation).ConfigureAwait(false);
}
}
catch (SocketException exception)


+ 7
- 7
MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs View File

@@ -23,19 +23,19 @@ namespace MQTTnet.Core.Adapter

public async Task ConnectAsync(MqttClientOptions options, TimeSpan timeout)
{
await ExecuteWithTimeoutAsync(_channel.ConnectAsync(options), timeout);
await ExecuteWithTimeoutAsync(_channel.ConnectAsync(options), timeout).ConfigureAwait(false);
}

public async Task DisconnectAsync()
{
await _channel.DisconnectAsync();
await _channel.DisconnectAsync().ConfigureAwait(false);
}

public async Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout)
{
MqttTrace.Information(nameof(MqttChannelCommunicationAdapter), $"TX >>> {packet} [Timeout={timeout}]");

await ExecuteWithTimeoutAsync(PacketSerializer.SerializeAsync(packet, _channel), timeout);
await ExecuteWithTimeoutAsync(PacketSerializer.SerializeAsync(packet, _channel), timeout).ConfigureAwait(false);
}

public async Task<MqttBasePacket> ReceivePacketAsync(TimeSpan timeout)
@@ -43,11 +43,11 @@ namespace MQTTnet.Core.Adapter
MqttBasePacket packet;
if (timeout > TimeSpan.Zero)
{
packet = await ExecuteWithTimeoutAsync(PacketSerializer.DeserializeAsync(_channel), timeout);
packet = await ExecuteWithTimeoutAsync(PacketSerializer.DeserializeAsync(_channel), timeout).ConfigureAwait(false);
}
else
{
packet = await PacketSerializer.DeserializeAsync(_channel);
packet = await PacketSerializer.DeserializeAsync(_channel).ConfigureAwait(false);
}

if (packet == null)
@@ -62,7 +62,7 @@ namespace MQTTnet.Core.Adapter
private static async Task<TResult> ExecuteWithTimeoutAsync<TResult>(Task<TResult> task, TimeSpan timeout)
{
var timeoutTask = Task.Delay(timeout);
if (await Task.WhenAny(timeoutTask, task) == timeoutTask)
if (await Task.WhenAny(timeoutTask, task).ConfigureAwait(false) == timeoutTask)
{
throw new MqttCommunicationTimedOutException();
}
@@ -78,7 +78,7 @@ namespace MQTTnet.Core.Adapter
private static async Task ExecuteWithTimeoutAsync(Task task, TimeSpan timeout)
{
var timeoutTask = Task.Delay(timeout);
if (await Task.WhenAny(timeoutTask, task) == timeoutTask)
if (await Task.WhenAny(timeoutTask, task).ConfigureAwait(false) == timeoutTask)
{
throw new MqttCommunicationTimedOutException();
}


+ 24
- 28
MQTTnet.Core/Client/MqttClient.cs View File

@@ -53,7 +53,7 @@ namespace MQTTnet.Core.Client
{
_disconnectedEventSuspended = false;

await _adapter.ConnectAsync(_options, _options.DefaultCommunicationTimeout);
await _adapter.ConnectAsync(_options, _options.DefaultCommunicationTimeout).ConfigureAwait(false);

MqttTrace.Verbose(nameof(MqttClient), "Connection with server established.");

@@ -73,10 +73,10 @@ namespace MQTTnet.Core.Client

StartReceivePackets();

var response = await SendAndReceiveAsync<MqttConnAckPacket>(connectPacket);
var response = await SendAndReceiveAsync<MqttConnAckPacket>(connectPacket).ConfigureAwait(false);
if (response.ConnectReturnCode != MqttConnectReturnCode.ConnectionAccepted)
{
await DisconnectInternalAsync();
await DisconnectInternalAsync().ConfigureAwait(false);
throw new MqttConnectingFailedException(response.ConnectReturnCode);
}

@@ -92,7 +92,7 @@ namespace MQTTnet.Core.Client
}
catch (Exception)
{
await DisconnectInternalAsync();
await DisconnectInternalAsync().ConfigureAwait(false);
throw;
}
}
@@ -101,11 +101,11 @@ namespace MQTTnet.Core.Client
{
try
{
await SendAsync(new MqttDisconnectPacket());
await SendAsync(new MqttDisconnectPacket()).ConfigureAwait(false);
}
finally
{
await DisconnectInternalAsync();
await DisconnectInternalAsync().ConfigureAwait(false);
}
}

@@ -158,7 +158,7 @@ namespace MQTTnet.Core.Client
TopicFilters = topicFilters
};

await SendAndReceiveAsync<MqttUnsubAckPacket>(unsubscribePacket);
await SendAndReceiveAsync<MqttUnsubAckPacket>(unsubscribePacket).ConfigureAwait(false);
}

public async Task PublishAsync(MqttApplicationMessage applicationMessage)
@@ -171,18 +171,18 @@ namespace MQTTnet.Core.Client
if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce)
{
// No packet identifier is used for QoS 0 [3.3.2.2 Packet Identifier]
await SendAsync(publishPacket);
await SendAsync(publishPacket).ConfigureAwait(false);
}
else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce)
{
publishPacket.PacketIdentifier = GetNewPacketIdentifier();
await SendAndReceiveAsync<MqttPubAckPacket>(publishPacket);
await SendAndReceiveAsync<MqttPubAckPacket>(publishPacket).ConfigureAwait(false);
}
else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce)
{
publishPacket.PacketIdentifier = GetNewPacketIdentifier();
var pubRecPacket = await SendAndReceiveAsync<MqttPubRecPacket>(publishPacket);
await SendAndReceiveAsync<MqttPubCompPacket>(pubRecPacket.CreateResponse<MqttPubRelPacket>());
var pubRecPacket = await SendAndReceiveAsync<MqttPubRecPacket>(publishPacket).ConfigureAwait(false);
await SendAndReceiveAsync<MqttPubCompPacket>(pubRecPacket.CreateResponse<MqttPubRelPacket>()).ConfigureAwait(false);
}
}

@@ -195,7 +195,7 @@ namespace MQTTnet.Core.Client
{
try
{
await _adapter.DisconnectAsync();
await _adapter.DisconnectAsync().ConfigureAwait(false);
}
catch (Exception exception)
{
@@ -301,7 +301,7 @@ namespace MQTTnet.Core.Client
_unacknowledgedPublishPackets.Remove(pubRelPacket.PacketIdentifier);
}

await SendAsync(pubRelPacket.CreateResponse<MqttPubCompPacket>());
await SendAsync(pubRelPacket.CreateResponse<MqttPubCompPacket>()).ConfigureAwait(false);
}

private Task SendAsync(MqttBasePacket packet)
@@ -313,16 +313,12 @@ namespace MQTTnet.Core.Client
{
bool ResponsePacketSelector(MqttBasePacket p)
{
var p1 = p as TResponsePacket;
if (p1 == null)
if (!(p is TResponsePacket p1))
{
return false;
}

var pi1 = requestPacket as IMqttPacketWithIdentifier;
var pi2 = p as IMqttPacketWithIdentifier;

if (pi1 == null || pi2 == null)
if (!(requestPacket is IMqttPacketWithIdentifier pi1) || !(p is IMqttPacketWithIdentifier pi2))
{
return true;
}
@@ -330,8 +326,8 @@ namespace MQTTnet.Core.Client
return pi1.PacketIdentifier == pi2.PacketIdentifier;
}

await _adapter.SendPacketAsync(requestPacket, _options.DefaultCommunicationTimeout);
return (TResponsePacket)await _packetDispatcher.WaitForPacketAsync(ResponsePacketSelector, _options.DefaultCommunicationTimeout);
await _adapter.SendPacketAsync(requestPacket, _options.DefaultCommunicationTimeout).ConfigureAwait(false);
return (TResponsePacket)await _packetDispatcher.WaitForPacketAsync(ResponsePacketSelector, _options.DefaultCommunicationTimeout).ConfigureAwait(false);
}

private ushort GetNewPacketIdentifier()
@@ -347,19 +343,19 @@ namespace MQTTnet.Core.Client
{
while (!cancellationToken.IsCancellationRequested)
{
await Task.Delay(_options.KeepAlivePeriod, cancellationToken);
await SendAndReceiveAsync<MqttPingRespPacket>(new MqttPingReqPacket());
await Task.Delay(_options.KeepAlivePeriod, cancellationToken).ConfigureAwait(false);
await SendAndReceiveAsync<MqttPingRespPacket>(new MqttPingReqPacket()).ConfigureAwait(false);
}
}
catch (MqttCommunicationException exception)
{
MqttTrace.Warning(nameof(MqttClient), exception, "MQTT communication error while receiving packets.");
await DisconnectInternalAsync();
await DisconnectInternalAsync().ConfigureAwait(false);
}
catch (Exception exception)
{
MqttTrace.Warning(nameof(MqttClient), exception, "Error while sending/receiving keep alive packets.");
await DisconnectInternalAsync();
await DisconnectInternalAsync().ConfigureAwait(false);
}
finally
{
@@ -374,7 +370,7 @@ namespace MQTTnet.Core.Client
{
while (!cancellationToken.IsCancellationRequested)
{
var packet = await _adapter.ReceivePacketAsync(TimeSpan.Zero);
var packet = await _adapter.ReceivePacketAsync(TimeSpan.Zero).ConfigureAwait(false);
MqttTrace.Information(nameof(MqttClient), $"Received <<< {packet}");

StartProcessReceivedPacket(packet, cancellationToken);
@@ -383,12 +379,12 @@ namespace MQTTnet.Core.Client
catch (MqttCommunicationException exception)
{
MqttTrace.Warning(nameof(MqttClient), exception, "MQTT communication exception while receiving packets.");
await DisconnectInternalAsync();
await DisconnectInternalAsync().ConfigureAwait(false);
}
catch (Exception exception)
{
MqttTrace.Error(nameof(MqttClient), exception, "Unhandled exception while receiving packets.");
await DisconnectInternalAsync();
await DisconnectInternalAsync().ConfigureAwait(false);
}
finally
{


+ 9
- 9
MQTTnet.Core/Serializer/MqttPacketReader.cs View File

@@ -29,7 +29,7 @@ namespace MQTTnet.Core.Serializer
public async Task ReadToEndAsync()
{
await ReadFixedHeaderAsync();
await ReadRemainingLengthAsync();
await ReadRemainingLengthAsync().ConfigureAwait(false);

if (_remainingLength == 0)
{
@@ -37,7 +37,7 @@ namespace MQTTnet.Core.Serializer
}

var buffer = new byte[_remainingLength];
await ReadFromSourceAsync(buffer);
await ReadFromSourceAsync(buffer).ConfigureAwait(false);
_remainingData.Write(buffer, 0, buffer.Length);
_remainingData.Position = 0;
@@ -45,12 +45,12 @@ namespace MQTTnet.Core.Serializer

public async Task<byte> ReadRemainingDataByteAsync()
{
return (await ReadRemainingDataAsync(1))[0];
return (await ReadRemainingDataAsync(1).ConfigureAwait(false))[0];
}

public async Task<ushort> ReadRemainingDataUShortAsync()
{
var buffer = await ReadRemainingDataAsync(2);
var buffer = await ReadRemainingDataAsync(2).ConfigureAwait(false);

var temp = buffer[0];
buffer[0] = buffer[1];
@@ -68,7 +68,7 @@ namespace MQTTnet.Core.Serializer
public async Task<byte[]> ReadRemainingDataWithLengthPrefixAsync()
{
var length = await ReadRemainingDataUShortAsync();
return await ReadRemainingDataAsync(length);
return await ReadRemainingDataAsync(length).ConfigureAwait(false);
}

public Task<byte[]> ReadRemainingDataAsync()
@@ -79,7 +79,7 @@ namespace MQTTnet.Core.Serializer
public async Task<byte[]> ReadRemainingDataAsync(int length)
{
var buffer = new byte[length];
await _remainingData.ReadAsync(buffer, 0, buffer.Length);
await _remainingData.ReadAsync(buffer, 0, buffer.Length).ConfigureAwait(false);

return buffer;
}
@@ -92,7 +92,7 @@ namespace MQTTnet.Core.Serializer
byte encodedByte;
do
{
encodedByte = await ReadStreamByteAsync();
encodedByte = await ReadStreamByteAsync().ConfigureAwait(false);
value += (encodedByte & 127) * multiplier;
multiplier *= 128;
if (multiplier > 128 * 128 * 128)
@@ -119,13 +119,13 @@ namespace MQTTnet.Core.Serializer
private async Task<byte> ReadStreamByteAsync()
{
var buffer = new byte[1];
await ReadFromSourceAsync(buffer);
await ReadFromSourceAsync(buffer).ConfigureAwait(false);
return buffer[0];
}

private async Task ReadFixedHeaderAsync()
{
FixedHeader = await ReadStreamByteAsync();
FixedHeader = await ReadStreamByteAsync().ConfigureAwait(false);

var byteReader = new ByteReader(FixedHeader);
byteReader.Read(4);


+ 35
- 35
MQTTnet.Core/Serializer/MqttPacketSerializer.cs View File

@@ -100,18 +100,18 @@ namespace MQTTnet.Core.Serializer

using (var mqttPacketReader = new MqttPacketReader(source))
{
await mqttPacketReader.ReadToEndAsync();
await mqttPacketReader.ReadToEndAsync().ConfigureAwait(false);

switch (mqttPacketReader.ControlPacketType)
{
case MqttControlPacketType.Connect:
{
return await DeserializeConnectAsync(mqttPacketReader);
return await DeserializeConnectAsync(mqttPacketReader).ConfigureAwait(false);
}

case MqttControlPacketType.ConnAck:
{
return await DeserializeConnAck(mqttPacketReader);
return await DeserializeConnAck(mqttPacketReader).ConfigureAwait(false);
}

case MqttControlPacketType.Disconnect:
@@ -121,14 +121,14 @@ namespace MQTTnet.Core.Serializer

case MqttControlPacketType.Publish:
{
return await DeserializePublishAsync(mqttPacketReader);
return await DeserializePublishAsync(mqttPacketReader).ConfigureAwait(false);
}

case MqttControlPacketType.PubAck:
{
return new MqttPubAckPacket
{
PacketIdentifier = await mqttPacketReader.ReadRemainingDataUShortAsync()
PacketIdentifier = await mqttPacketReader.ReadRemainingDataUShortAsync().ConfigureAwait(false)
};
}

@@ -136,7 +136,7 @@ namespace MQTTnet.Core.Serializer
{
return new MqttPubRecPacket
{
PacketIdentifier = await mqttPacketReader.ReadRemainingDataUShortAsync()
PacketIdentifier = await mqttPacketReader.ReadRemainingDataUShortAsync().ConfigureAwait(false)
};
}

@@ -144,7 +144,7 @@ namespace MQTTnet.Core.Serializer
{
return new MqttPubRelPacket
{
PacketIdentifier = await mqttPacketReader.ReadRemainingDataUShortAsync()
PacketIdentifier = await mqttPacketReader.ReadRemainingDataUShortAsync().ConfigureAwait(false)
};
}

@@ -152,7 +152,7 @@ namespace MQTTnet.Core.Serializer
{
return new MqttPubCompPacket
{
PacketIdentifier = await mqttPacketReader.ReadRemainingDataUShortAsync()
PacketIdentifier = await mqttPacketReader.ReadRemainingDataUShortAsync().ConfigureAwait(false)
};
}

@@ -168,24 +168,24 @@ namespace MQTTnet.Core.Serializer

case MqttControlPacketType.Subscribe:
{
return await DeserializeSubscribeAsync(mqttPacketReader);
return await DeserializeSubscribeAsync(mqttPacketReader).ConfigureAwait(false);
}

case MqttControlPacketType.SubAck:
{
return await DeserializeSubAck(mqttPacketReader);
return await DeserializeSubAck(mqttPacketReader).ConfigureAwait(false);
}

case MqttControlPacketType.Unsubscibe:
{
return await DeserializeUnsubscribeAsync(mqttPacketReader);
return await DeserializeUnsubscribeAsync(mqttPacketReader).ConfigureAwait(false);
}

case MqttControlPacketType.UnsubAck:
{
return new MqttUnsubAckPacket
{
PacketIdentifier = await mqttPacketReader.ReadRemainingDataUShortAsync()
PacketIdentifier = await mqttPacketReader.ReadRemainingDataUShortAsync().ConfigureAwait(false)
};
}

@@ -201,12 +201,12 @@ namespace MQTTnet.Core.Serializer
{
var packet = new MqttUnsubscribePacket
{
PacketIdentifier = await reader.ReadRemainingDataUShortAsync(),
PacketIdentifier = await reader.ReadRemainingDataUShortAsync().ConfigureAwait(false),
};

while (!reader.EndOfRemainingData)
{
packet.TopicFilters.Add(await reader.ReadRemainingDataStringWithLengthPrefixAsync());
packet.TopicFilters.Add(await reader.ReadRemainingDataStringWithLengthPrefixAsync().ConfigureAwait(false));
}

return packet;
@@ -216,14 +216,14 @@ namespace MQTTnet.Core.Serializer
{
var packet = new MqttSubscribePacket
{
PacketIdentifier = await reader.ReadRemainingDataUShortAsync(),
PacketIdentifier = await reader.ReadRemainingDataUShortAsync().ConfigureAwait(false),
};

while (!reader.EndOfRemainingData)
{
packet.TopicFilters.Add(new TopicFilter(
await reader.ReadRemainingDataStringWithLengthPrefixAsync(),
(MqttQualityOfServiceLevel)await reader.ReadRemainingDataByteAsync()));
(MqttQualityOfServiceLevel)await reader.ReadRemainingDataByteAsync().ConfigureAwait(false)));
}

return packet;
@@ -236,12 +236,12 @@ namespace MQTTnet.Core.Serializer
var qualityOfServiceLevel = (MqttQualityOfServiceLevel)fixedHeader.Read(2);
var dup = fixedHeader.Read();

var topic = await reader.ReadRemainingDataStringWithLengthPrefixAsync();
var topic = await reader.ReadRemainingDataStringWithLengthPrefixAsync().ConfigureAwait(false);

ushort packetIdentifier = 0;
if (qualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce)
{
packetIdentifier = await reader.ReadRemainingDataUShortAsync();
packetIdentifier = await reader.ReadRemainingDataUShortAsync().ConfigureAwait(false);
}

var packet = new MqttPublishPacket
@@ -250,7 +250,7 @@ namespace MQTTnet.Core.Serializer
QualityOfServiceLevel = qualityOfServiceLevel,
Dup = dup,
Topic = topic,
Payload = await reader.ReadRemainingDataAsync(),
Payload = await reader.ReadRemainingDataAsync().ConfigureAwait(false),
PacketIdentifier = packetIdentifier
};

@@ -259,13 +259,13 @@ namespace MQTTnet.Core.Serializer

private static async Task<MqttBasePacket> DeserializeConnectAsync(MqttPacketReader reader)
{
await reader.ReadRemainingDataAsync(2); // Skip 2 bytes
await reader.ReadRemainingDataAsync(2).ConfigureAwait(false); // Skip 2 bytes

MqttProtocolVersion protocolVersion;
var protocolName = await reader.ReadRemainingDataAsync(4);
var protocolName = await reader.ReadRemainingDataAsync(4).ConfigureAwait(false);
if (protocolName.SequenceEqual(ProtocolVersionV310Name))
{
await reader.ReadRemainingDataAsync(2);
await reader.ReadRemainingDataAsync(2).ConfigureAwait(false);
protocolVersion = MqttProtocolVersion.V310;
}
else if (protocolName.SequenceEqual(ProtocolVersionV311Name))
@@ -277,8 +277,8 @@ namespace MQTTnet.Core.Serializer
throw new MqttProtocolViolationException("Protocol name is not supported.");
}

var protocolLevel = await reader.ReadRemainingDataByteAsync();
var connectFlags = await reader.ReadRemainingDataByteAsync();
var protocolLevel = await reader.ReadRemainingDataByteAsync().ConfigureAwait(false);
var connectFlags = await reader.ReadRemainingDataByteAsync().ConfigureAwait(false);

var connectFlagsReader = new ByteReader(connectFlags);
connectFlagsReader.Read(); // Reserved.
@@ -295,26 +295,26 @@ namespace MQTTnet.Core.Serializer
var passwordFlag = connectFlagsReader.Read();
var usernameFlag = connectFlagsReader.Read();

packet.KeepAlivePeriod = await reader.ReadRemainingDataUShortAsync();
packet.ClientId = await reader.ReadRemainingDataStringWithLengthPrefixAsync();
packet.KeepAlivePeriod = await reader.ReadRemainingDataUShortAsync().ConfigureAwait(false);
packet.ClientId = await reader.ReadRemainingDataStringWithLengthPrefixAsync().ConfigureAwait(false);

if (willFlag)
{
packet.WillMessage = new MqttApplicationMessage(
await reader.ReadRemainingDataStringWithLengthPrefixAsync(),
await reader.ReadRemainingDataWithLengthPrefixAsync(),
await reader.ReadRemainingDataStringWithLengthPrefixAsync().ConfigureAwait(false),
await reader.ReadRemainingDataWithLengthPrefixAsync().ConfigureAwait(false),
(MqttQualityOfServiceLevel)willQoS,
willRetain);
}

if (usernameFlag)
{
packet.Username = await reader.ReadRemainingDataStringWithLengthPrefixAsync();
packet.Username = await reader.ReadRemainingDataStringWithLengthPrefixAsync().ConfigureAwait(false);
}

if (passwordFlag)
{
packet.Password = await reader.ReadRemainingDataStringWithLengthPrefixAsync();
packet.Password = await reader.ReadRemainingDataStringWithLengthPrefixAsync().ConfigureAwait(false);
}

ValidateConnectPacket(packet);
@@ -325,12 +325,12 @@ namespace MQTTnet.Core.Serializer
{
var packet = new MqttSubAckPacket
{
PacketIdentifier = await reader.ReadRemainingDataUShortAsync()
PacketIdentifier = await reader.ReadRemainingDataUShortAsync().ConfigureAwait(false)
};

while (!reader.EndOfRemainingData)
{
packet.SubscribeReturnCodes.Add((MqttSubscribeReturnCode)await reader.ReadRemainingDataByteAsync());
packet.SubscribeReturnCodes.Add((MqttSubscribeReturnCode)await reader.ReadRemainingDataByteAsync().ConfigureAwait(false));
}

return packet;
@@ -338,8 +338,8 @@ namespace MQTTnet.Core.Serializer

private static async Task<MqttBasePacket> DeserializeConnAck(MqttPacketReader reader)
{
var variableHeader1 = await reader.ReadRemainingDataByteAsync();
var variableHeader2 = await reader.ReadRemainingDataByteAsync();
var variableHeader1 = await reader.ReadRemainingDataByteAsync().ConfigureAwait(false);
var variableHeader2 = await reader.ReadRemainingDataByteAsync().ConfigureAwait(false);

var packet = new MqttConnAckPacket
{
@@ -457,7 +457,7 @@ namespace MQTTnet.Core.Serializer
output.Write(packet.PacketIdentifier);

output.InjectFixedHeader(MqttControlPacketType.PubRel, 0x02);
await output.WriteToAsync(destination);
await output.WriteToAsync(destination).ConfigureAwait(false);
}
}



+ 4
- 4
Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs View File

@@ -18,12 +18,12 @@ namespace MQTTnet.Core.Tests

public async Task ConnectAsync(MqttClientOptions options, TimeSpan timeout)
{
await Task.FromResult(0);
await Task.FromResult(0).ConfigureAwait(false);
}

public async Task DisconnectAsync()
{
await Task.FromResult(0);
await Task.FromResult(0).ConfigureAwait(false);
}

public async Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout)
@@ -31,14 +31,14 @@ namespace MQTTnet.Core.Tests
ThrowIfPartnerIsNull();

Partner.SendPacketInternal(packet);
await Task.FromResult(0);
await Task.FromResult(0).ConfigureAwait(false);
}

public async Task<MqttBasePacket> ReceivePacketAsync(TimeSpan timeout)
{
ThrowIfPartnerIsNull();

return await Task.Run(() => _incomingPackets.Take());
return await Task.Run(() => _incomingPackets.Take()).ConfigureAwait(false);
}

private void SendPacketInternal(MqttBasePacket packet)


Loading…
Cancel
Save