Browse Source

Performance improvements in area of logging and async/await

release/3.x.x
Christian Kratky 7 years ago
parent
commit
8aec1583df
12 changed files with 153 additions and 132 deletions
  1. +3
    -3
      Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs
  2. +2
    -2
      Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs
  3. +2
    -2
      Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs
  4. +12
    -12
      MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs
  5. +37
    -28
      MQTTnet.Core/Client/MqttClient.cs
  6. +27
    -14
      MQTTnet.Core/Diagnostics/MqttTrace.cs
  7. +18
    -19
      MQTTnet.Core/Serializer/MqttPacketReader.cs
  8. +41
    -41
      MQTTnet.Core/Serializer/MqttPacketSerializer.cs
  9. +3
    -3
      MQTTnet.Core/Server/MqttClientSession.cs
  10. +3
    -3
      MQTTnet.Core/Server/MqttClientSessionsManager.cs
  11. +1
    -1
      MQTTnet.Core/Server/MqttServer.cs
  12. +4
    -4
      Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs

+ 3
- 3
Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs View File

@@ -85,7 +85,7 @@ namespace MQTTnet.Implementations
{ {
try try
{ {
var clientSocket = await Task.Factory.FromAsync(_defaultEndpointSocket.BeginAccept, _defaultEndpointSocket.EndAccept, null).ConfigureAwait(false);
var clientSocket = await Task.Factory.FromAsync(_defaultEndpointSocket.BeginAccept, _defaultEndpointSocket.EndAccept, null);
var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new MqttPacketSerializer()); var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new MqttPacketSerializer());
ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter));
} }
@@ -105,10 +105,10 @@ namespace MQTTnet.Implementations
{ {
try try
{ {
var clientSocket = await Task.Factory.FromAsync(_tlsEndpointSocket.BeginAccept, _tlsEndpointSocket.EndAccept, null).ConfigureAwait(false);
var clientSocket = await Task.Factory.FromAsync(_tlsEndpointSocket.BeginAccept, _tlsEndpointSocket.EndAccept, null);


var sslStream = new SslStream(new NetworkStream(clientSocket)); var sslStream = new SslStream(new NetworkStream(clientSocket));
await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false).ConfigureAwait(false);
await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false);
var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new MqttPacketSerializer()); var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new MqttPacketSerializer());
ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter));


+ 2
- 2
Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs View File

@@ -35,12 +35,12 @@ namespace MQTTnet.Implementations
_socket = new Socket(SocketType.Stream, ProtocolType.Tcp); _socket = new Socket(SocketType.Stream, ProtocolType.Tcp);
} }


await Task.Factory.FromAsync(_socket.BeginConnect, _socket.EndConnect, options.Server, options.GetPort(), null).ConfigureAwait(false);
await Task.Factory.FromAsync(_socket.BeginConnect, _socket.EndConnect, options.Server, options.GetPort(), null);


if (options.TlsOptions.UseTls) if (options.TlsOptions.UseTls)
{ {
_sslStream = new SslStream(new NetworkStream(_socket, true)); _sslStream = new SslStream(new NetworkStream(_socket, true));
await _sslStream.AuthenticateAsClientAsync(options.Server, LoadCertificates(options), SslProtocols.Tls12, options.TlsOptions.CheckCertificateRevocation).ConfigureAwait(false);
await _sslStream.AuthenticateAsClientAsync(options.Server, LoadCertificates(options), SslProtocols.Tls12, options.TlsOptions.CheckCertificateRevocation);
} }
} }
catch (SocketException exception) catch (SocketException exception)


+ 2
- 2
Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs View File

@@ -35,12 +35,12 @@ namespace MQTTnet.Implementations
_socket = new Socket(SocketType.Stream, ProtocolType.Tcp); _socket = new Socket(SocketType.Stream, ProtocolType.Tcp);
} }


await _socket.ConnectAsync(options.Server, options.GetPort()).ConfigureAwait(false);
await _socket.ConnectAsync(options.Server, options.GetPort());
if (options.TlsOptions.UseTls) if (options.TlsOptions.UseTls)
{ {
_sslStream = new SslStream(new NetworkStream(_socket, true)); _sslStream = new SslStream(new NetworkStream(_socket, true));
await _sslStream.AuthenticateAsClientAsync(options.Server, LoadCertificates(options), SslProtocols.Tls12, options.TlsOptions.CheckCertificateRevocation).ConfigureAwait(false);
await _sslStream.AuthenticateAsClientAsync(options.Server, LoadCertificates(options), SslProtocols.Tls12, options.TlsOptions.CheckCertificateRevocation);
} }
} }
catch (SocketException exception) catch (SocketException exception)


+ 12
- 12
MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs View File

@@ -21,21 +21,21 @@ namespace MQTTnet.Core.Adapter


public IMqttPacketSerializer PacketSerializer { get; } public IMqttPacketSerializer PacketSerializer { get; }


public async Task ConnectAsync(MqttClientOptions options, TimeSpan timeout)
public Task ConnectAsync(MqttClientOptions options, TimeSpan timeout)
{ {
await ExecuteWithTimeoutAsync(_channel.ConnectAsync(options), timeout).ConfigureAwait(false);
return ExecuteWithTimeoutAsync(_channel.ConnectAsync(options), timeout);
} }


public async Task DisconnectAsync()
public Task DisconnectAsync()
{ {
await _channel.DisconnectAsync().ConfigureAwait(false);
return _channel.DisconnectAsync();
} }


public async Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout)
public Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout)
{ {
MqttTrace.Information(nameof(MqttChannelCommunicationAdapter), $"TX >>> {packet} [Timeout={timeout}]");
MqttTrace.Information(nameof(MqttChannelCommunicationAdapter), "TX >>> {0} [Timeout={1}]", packet, timeout);


await ExecuteWithTimeoutAsync(PacketSerializer.SerializeAsync(packet, _channel), timeout).ConfigureAwait(false);
return ExecuteWithTimeoutAsync(PacketSerializer.SerializeAsync(packet, _channel), timeout);
} }


public async Task<MqttBasePacket> ReceivePacketAsync(TimeSpan timeout) public async Task<MqttBasePacket> ReceivePacketAsync(TimeSpan timeout)
@@ -43,11 +43,11 @@ namespace MQTTnet.Core.Adapter
MqttBasePacket packet; MqttBasePacket packet;
if (timeout > TimeSpan.Zero) if (timeout > TimeSpan.Zero)
{ {
packet = await ExecuteWithTimeoutAsync(PacketSerializer.DeserializeAsync(_channel), timeout).ConfigureAwait(false);
packet = await ExecuteWithTimeoutAsync(PacketSerializer.DeserializeAsync(_channel), timeout);
} }
else else
{ {
packet = await PacketSerializer.DeserializeAsync(_channel).ConfigureAwait(false);
packet = await PacketSerializer.DeserializeAsync(_channel);
} }


if (packet == null) if (packet == null)
@@ -55,14 +55,14 @@ namespace MQTTnet.Core.Adapter
throw new MqttProtocolViolationException("Received malformed packet."); throw new MqttProtocolViolationException("Received malformed packet.");
} }


MqttTrace.Information(nameof(MqttChannelCommunicationAdapter), $"RX <<< {packet}");
MqttTrace.Information(nameof(MqttChannelCommunicationAdapter), "RX <<< {0}", packet);
return packet; return packet;
} }


private static async Task<TResult> ExecuteWithTimeoutAsync<TResult>(Task<TResult> task, TimeSpan timeout) private static async Task<TResult> ExecuteWithTimeoutAsync<TResult>(Task<TResult> task, TimeSpan timeout)
{ {
var timeoutTask = Task.Delay(timeout); var timeoutTask = Task.Delay(timeout);
if (await Task.WhenAny(timeoutTask, task).ConfigureAwait(false) == timeoutTask)
if (await Task.WhenAny(timeoutTask, task) == timeoutTask)
{ {
throw new MqttCommunicationTimedOutException(); throw new MqttCommunicationTimedOutException();
} }
@@ -78,7 +78,7 @@ namespace MQTTnet.Core.Adapter
private static async Task ExecuteWithTimeoutAsync(Task task, TimeSpan timeout) private static async Task ExecuteWithTimeoutAsync(Task task, TimeSpan timeout)
{ {
var timeoutTask = Task.Delay(timeout); var timeoutTask = Task.Delay(timeout);
if (await Task.WhenAny(timeoutTask, task).ConfigureAwait(false) == timeoutTask)
if (await Task.WhenAny(timeoutTask, task) == timeoutTask)
{ {
throw new MqttCommunicationTimedOutException(); throw new MqttCommunicationTimedOutException();
} }


+ 37
- 28
MQTTnet.Core/Client/MqttClient.cs View File

@@ -53,7 +53,7 @@ namespace MQTTnet.Core.Client
{ {
_disconnectedEventSuspended = false; _disconnectedEventSuspended = false;


await _adapter.ConnectAsync(_options, _options.DefaultCommunicationTimeout).ConfigureAwait(false);
await _adapter.ConnectAsync(_options, _options.DefaultCommunicationTimeout);


MqttTrace.Verbose(nameof(MqttClient), "Connection with server established."); MqttTrace.Verbose(nameof(MqttClient), "Connection with server established.");


@@ -73,10 +73,10 @@ namespace MQTTnet.Core.Client


StartReceivePackets(); StartReceivePackets();


var response = await SendAndReceiveAsync<MqttConnAckPacket>(connectPacket).ConfigureAwait(false);
var response = await SendAndReceiveAsync<MqttConnAckPacket>(connectPacket);
if (response.ConnectReturnCode != MqttConnectReturnCode.ConnectionAccepted) if (response.ConnectReturnCode != MqttConnectReturnCode.ConnectionAccepted)
{ {
await DisconnectInternalAsync().ConfigureAwait(false);
await DisconnectInternalAsync();
throw new MqttConnectingFailedException(response.ConnectReturnCode); throw new MqttConnectingFailedException(response.ConnectReturnCode);
} }


@@ -92,7 +92,7 @@ namespace MQTTnet.Core.Client
} }
catch (Exception) catch (Exception)
{ {
await DisconnectInternalAsync().ConfigureAwait(false);
await DisconnectInternalAsync();
throw; throw;
} }
} }
@@ -101,11 +101,11 @@ namespace MQTTnet.Core.Client
{ {
try try
{ {
await SendAsync(new MqttDisconnectPacket()).ConfigureAwait(false);
await SendAsync(new MqttDisconnectPacket());
} }
finally finally
{ {
await DisconnectInternalAsync().ConfigureAwait(false);
await DisconnectInternalAsync();
} }
} }


@@ -146,7 +146,7 @@ namespace MQTTnet.Core.Client
return Unsubscribe(topicFilters.ToList()); return Unsubscribe(topicFilters.ToList());
} }


public async Task Unsubscribe(IList<string> topicFilters)
public Task Unsubscribe(IList<string> topicFilters)
{ {
if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));
if (!topicFilters.Any()) throw new MqttProtocolViolationException("At least one topic filter must be set [MQTT-3.10.3-2]."); if (!topicFilters.Any()) throw new MqttProtocolViolationException("At least one topic filter must be set [MQTT-3.10.3-2].");
@@ -158,10 +158,10 @@ namespace MQTTnet.Core.Client
TopicFilters = topicFilters TopicFilters = topicFilters
}; };


await SendAndReceiveAsync<MqttUnsubAckPacket>(unsubscribePacket).ConfigureAwait(false);
return SendAndReceiveAsync<MqttUnsubAckPacket>(unsubscribePacket);
} }


public async Task PublishAsync(MqttApplicationMessage applicationMessage)
public Task PublishAsync(MqttApplicationMessage applicationMessage)
{ {
if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage)); if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage));
ThrowIfNotConnected(); ThrowIfNotConnected();
@@ -171,19 +171,28 @@ namespace MQTTnet.Core.Client
if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce) if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce)
{ {
// No packet identifier is used for QoS 0 [3.3.2.2 Packet Identifier] // No packet identifier is used for QoS 0 [3.3.2.2 Packet Identifier]
await SendAsync(publishPacket).ConfigureAwait(false);
return SendAsync(publishPacket);
} }
else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce)

if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce)
{ {
publishPacket.PacketIdentifier = GetNewPacketIdentifier(); publishPacket.PacketIdentifier = GetNewPacketIdentifier();
await SendAndReceiveAsync<MqttPubAckPacket>(publishPacket).ConfigureAwait(false);
return SendAndReceiveAsync<MqttPubAckPacket>(publishPacket);
} }
else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce)

if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce)
{ {
publishPacket.PacketIdentifier = GetNewPacketIdentifier(); publishPacket.PacketIdentifier = GetNewPacketIdentifier();
var pubRecPacket = await SendAndReceiveAsync<MqttPubRecPacket>(publishPacket).ConfigureAwait(false);
await SendAndReceiveAsync<MqttPubCompPacket>(pubRecPacket.CreateResponse<MqttPubRelPacket>()).ConfigureAwait(false);
return PublishExactlyOncePacketAsync(publishPacket);
} }

throw new InvalidOperationException();
}

private async Task PublishExactlyOncePacketAsync(MqttBasePacket publishPacket)
{
var pubRecPacket = await SendAndReceiveAsync<MqttPubRecPacket>(publishPacket);
await SendAndReceiveAsync<MqttPubCompPacket>(pubRecPacket.CreateResponse<MqttPubRelPacket>());
} }


private void ThrowIfNotConnected() private void ThrowIfNotConnected()
@@ -195,7 +204,7 @@ namespace MQTTnet.Core.Client
{ {
try try
{ {
await _adapter.DisconnectAsync().ConfigureAwait(false);
await _adapter.DisconnectAsync();
} }
catch (Exception exception) catch (Exception exception)
{ {
@@ -294,14 +303,14 @@ namespace MQTTnet.Core.Client
throw new MqttCommunicationException("Received a not supported QoS level."); throw new MqttCommunicationException("Received a not supported QoS level.");
} }


private async Task ProcessReceivedPubRelPacket(MqttPubRelPacket pubRelPacket)
private Task ProcessReceivedPubRelPacket(MqttPubRelPacket pubRelPacket)
{ {
lock (_unacknowledgedPublishPackets) lock (_unacknowledgedPublishPackets)
{ {
_unacknowledgedPublishPackets.Remove(pubRelPacket.PacketIdentifier); _unacknowledgedPublishPackets.Remove(pubRelPacket.PacketIdentifier);
} }


await SendAsync(pubRelPacket.CreateResponse<MqttPubCompPacket>()).ConfigureAwait(false);
return SendAsync(pubRelPacket.CreateResponse<MqttPubCompPacket>());
} }


private Task SendAsync(MqttBasePacket packet) private Task SendAsync(MqttBasePacket packet)
@@ -326,8 +335,8 @@ namespace MQTTnet.Core.Client
return pi1.PacketIdentifier == pi2.PacketIdentifier; return pi1.PacketIdentifier == pi2.PacketIdentifier;
} }


await _adapter.SendPacketAsync(requestPacket, _options.DefaultCommunicationTimeout).ConfigureAwait(false);
return (TResponsePacket)await _packetDispatcher.WaitForPacketAsync(ResponsePacketSelector, _options.DefaultCommunicationTimeout).ConfigureAwait(false);
await _adapter.SendPacketAsync(requestPacket, _options.DefaultCommunicationTimeout);
return (TResponsePacket)await _packetDispatcher.WaitForPacketAsync(ResponsePacketSelector, _options.DefaultCommunicationTimeout);
} }


private ushort GetNewPacketIdentifier() private ushort GetNewPacketIdentifier()
@@ -343,19 +352,19 @@ namespace MQTTnet.Core.Client
{ {
while (!cancellationToken.IsCancellationRequested) while (!cancellationToken.IsCancellationRequested)
{ {
await Task.Delay(_options.KeepAlivePeriod, cancellationToken).ConfigureAwait(false);
await SendAndReceiveAsync<MqttPingRespPacket>(new MqttPingReqPacket()).ConfigureAwait(false);
await Task.Delay(_options.KeepAlivePeriod, cancellationToken);
await SendAndReceiveAsync<MqttPingRespPacket>(new MqttPingReqPacket());
} }
} }
catch (MqttCommunicationException exception) catch (MqttCommunicationException exception)
{ {
MqttTrace.Warning(nameof(MqttClient), exception, "MQTT communication error while receiving packets."); MqttTrace.Warning(nameof(MqttClient), exception, "MQTT communication error while receiving packets.");
await DisconnectInternalAsync().ConfigureAwait(false);
await DisconnectInternalAsync();
} }
catch (Exception exception) catch (Exception exception)
{ {
MqttTrace.Warning(nameof(MqttClient), exception, "Error while sending/receiving keep alive packets."); MqttTrace.Warning(nameof(MqttClient), exception, "Error while sending/receiving keep alive packets.");
await DisconnectInternalAsync().ConfigureAwait(false);
await DisconnectInternalAsync();
} }
finally finally
{ {
@@ -370,8 +379,8 @@ namespace MQTTnet.Core.Client
{ {
while (!cancellationToken.IsCancellationRequested) while (!cancellationToken.IsCancellationRequested)
{ {
var packet = await _adapter.ReceivePacketAsync(TimeSpan.Zero).ConfigureAwait(false);
MqttTrace.Information(nameof(MqttClient), $"Received <<< {packet}");
var packet = await _adapter.ReceivePacketAsync(TimeSpan.Zero);
MqttTrace.Information(nameof(MqttClient), "Received <<< {0}", packet);


StartProcessReceivedPacket(packet, cancellationToken); StartProcessReceivedPacket(packet, cancellationToken);
} }
@@ -379,12 +388,12 @@ namespace MQTTnet.Core.Client
catch (MqttCommunicationException exception) catch (MqttCommunicationException exception)
{ {
MqttTrace.Warning(nameof(MqttClient), exception, "MQTT communication exception while receiving packets."); MqttTrace.Warning(nameof(MqttClient), exception, "MQTT communication exception while receiving packets.");
await DisconnectInternalAsync().ConfigureAwait(false);
await DisconnectInternalAsync();
} }
catch (Exception exception) catch (Exception exception)
{ {
MqttTrace.Error(nameof(MqttClient), exception, "Unhandled exception while receiving packets."); MqttTrace.Error(nameof(MqttClient), exception, "Unhandled exception while receiving packets.");
await DisconnectInternalAsync().ConfigureAwait(false);
await DisconnectInternalAsync();
} }
finally finally
{ {


+ 27
- 14
MQTTnet.Core/Diagnostics/MqttTrace.cs View File

@@ -1,4 +1,5 @@
using System; using System;
using System.Linq;


namespace MQTTnet.Core.Diagnostics namespace MQTTnet.Core.Diagnostics
{ {
@@ -6,37 +7,37 @@ namespace MQTTnet.Core.Diagnostics
{ {
public static event EventHandler<MqttTraceMessagePublishedEventArgs> TraceMessagePublished; public static event EventHandler<MqttTraceMessagePublishedEventArgs> TraceMessagePublished;


public static void Verbose(string source, string message)
public static void Verbose(string source, string message, params object[] parameters)
{ {
Publish(source, MqttTraceLevel.Verbose, null, message);
Publish(source, MqttTraceLevel.Verbose, null, message, parameters);
} }


public static void Information(string source, string message)
public static void Information(string source, string message, params object[] parameters)
{ {
Publish(source, MqttTraceLevel.Information, null, message);
Publish(source, MqttTraceLevel.Information, null, message, parameters);
} }


public static void Warning(string source, string message)
public static void Warning(string source, string message, params object[] parameters)
{ {
Publish(source, MqttTraceLevel.Warning, null, message);
Publish(source, MqttTraceLevel.Warning, null, message, parameters);
} }


public static void Warning(string source, Exception exception, string message)
public static void Warning(string source, Exception exception, string message, params object[] parameters)
{ {
Publish(source, MqttTraceLevel.Warning, exception, message);
Publish(source, MqttTraceLevel.Warning, exception, message, parameters);
} }


public static void Error(string source, string message)
public static void Error(string source, string message, params object[] parameters)
{ {
Publish(source, MqttTraceLevel.Error, null, message);
Publish(source, MqttTraceLevel.Error, null, message, parameters);
} }


public static void Error(string source, Exception exception, string message)
public static void Error(string source, Exception exception, string message, params object[] parameters)
{ {
Publish(source, MqttTraceLevel.Error, exception, message);
Publish(source, MqttTraceLevel.Error, exception, message, parameters);
} }


private static void Publish(string source, MqttTraceLevel traceLevel, Exception exception, string message)
private static void Publish(string source, MqttTraceLevel traceLevel, Exception exception, string message, params object[] parameters)
{ {
var handler = TraceMessagePublished; var handler = TraceMessagePublished;
if (handler == null) if (handler == null)
@@ -44,7 +45,19 @@ namespace MQTTnet.Core.Diagnostics
return; return;
} }


message = string.Format(message, 1);
if (parameters?.Length > 0)
{
try
{
message = string.Format(message, parameters);
}
catch (Exception formatException)
{
Error(nameof(MqttTrace), formatException, "Error while tracing message: " + message);
return;
}
}

handler.Invoke(null, new MqttTraceMessagePublishedEventArgs(Environment.CurrentManagedThreadId, source, traceLevel, message, exception)); handler.Invoke(null, new MqttTraceMessagePublishedEventArgs(Environment.CurrentManagedThreadId, source, traceLevel, message, exception));
} }
} }


+ 18
- 19
MQTTnet.Core/Serializer/MqttPacketReader.cs View File

@@ -29,7 +29,7 @@ namespace MQTTnet.Core.Serializer
public async Task ReadToEndAsync() public async Task ReadToEndAsync()
{ {
await ReadFixedHeaderAsync(); await ReadFixedHeaderAsync();
await ReadRemainingLengthAsync().ConfigureAwait(false);
await ReadRemainingLengthAsync();


if (_remainingLength == 0) if (_remainingLength == 0)
{ {
@@ -37,20 +37,20 @@ namespace MQTTnet.Core.Serializer
} }


var buffer = new byte[_remainingLength]; var buffer = new byte[_remainingLength];
await ReadFromSourceAsync(buffer).ConfigureAwait(false);
await ReadFromSourceAsync(buffer);
_remainingData.Write(buffer, 0, buffer.Length); _remainingData.Write(buffer, 0, buffer.Length);
_remainingData.Position = 0; _remainingData.Position = 0;
} }


public async Task<byte> ReadRemainingDataByteAsync()
public byte ReadRemainingDataByte()
{ {
return (await ReadRemainingDataAsync(1).ConfigureAwait(false))[0];
return ReadRemainingData(1)[0];
} }


public async Task<ushort> ReadRemainingDataUShortAsync()
public ushort ReadRemainingDataUShort()
{ {
var buffer = await ReadRemainingDataAsync(2).ConfigureAwait(false);
var buffer = ReadRemainingData(2);


var temp = buffer[0]; var temp = buffer[0];
buffer[0] = buffer[1]; buffer[0] = buffer[1];
@@ -59,28 +59,27 @@ namespace MQTTnet.Core.Serializer
return BitConverter.ToUInt16(buffer, 0); return BitConverter.ToUInt16(buffer, 0);
} }


public async Task<string> ReadRemainingDataStringWithLengthPrefixAsync()
public string ReadRemainingDataStringWithLengthPrefix()
{ {
var buffer = await ReadRemainingDataWithLengthPrefixAsync();
var buffer = ReadRemainingDataWithLengthPrefix();
return Encoding.UTF8.GetString(buffer, 0, buffer.Length); return Encoding.UTF8.GetString(buffer, 0, buffer.Length);
} }


public async Task<byte[]> ReadRemainingDataWithLengthPrefixAsync()
public byte[] ReadRemainingDataWithLengthPrefix()
{ {
var length = await ReadRemainingDataUShortAsync();
return await ReadRemainingDataAsync(length).ConfigureAwait(false);
var length = ReadRemainingDataUShort();
return ReadRemainingData(length);
} }


public Task<byte[]> ReadRemainingDataAsync()
public byte[] ReadRemainingData()
{ {
return ReadRemainingDataAsync(_remainingLength - (int)_remainingData.Position);
return ReadRemainingData(_remainingLength - (int)_remainingData.Position);
} }


public async Task<byte[]> ReadRemainingDataAsync(int length)
public byte[] ReadRemainingData(int length)
{ {
var buffer = new byte[length]; var buffer = new byte[length];
await _remainingData.ReadAsync(buffer, 0, buffer.Length).ConfigureAwait(false);

_remainingData.Read(buffer, 0, buffer.Length);
return buffer; return buffer;
} }


@@ -92,7 +91,7 @@ namespace MQTTnet.Core.Serializer
byte encodedByte; byte encodedByte;
do do
{ {
encodedByte = await ReadStreamByteAsync().ConfigureAwait(false);
encodedByte = await ReadStreamByteAsync();
value += (encodedByte & 127) * multiplier; value += (encodedByte & 127) * multiplier;
multiplier *= 128; multiplier *= 128;
if (multiplier > 128 * 128 * 128) if (multiplier > 128 * 128 * 128)
@@ -119,13 +118,13 @@ namespace MQTTnet.Core.Serializer
private async Task<byte> ReadStreamByteAsync() private async Task<byte> ReadStreamByteAsync()
{ {
var buffer = new byte[1]; var buffer = new byte[1];
await ReadFromSourceAsync(buffer).ConfigureAwait(false);
await ReadFromSourceAsync(buffer);
return buffer[0]; return buffer[0];
} }


private async Task ReadFixedHeaderAsync() private async Task ReadFixedHeaderAsync()
{ {
FixedHeader = await ReadStreamByteAsync().ConfigureAwait(false);
FixedHeader = await ReadStreamByteAsync();


var byteReader = new ByteReader(FixedHeader); var byteReader = new ByteReader(FixedHeader);
byteReader.Read(4); byteReader.Read(4);


+ 41
- 41
MQTTnet.Core/Serializer/MqttPacketSerializer.cs View File

@@ -106,12 +106,12 @@ namespace MQTTnet.Core.Serializer
{ {
case MqttControlPacketType.Connect: case MqttControlPacketType.Connect:
{ {
return await DeserializeConnectAsync(mqttPacketReader).ConfigureAwait(false);
return DeserializeConnect(mqttPacketReader);
} }


case MqttControlPacketType.ConnAck: case MqttControlPacketType.ConnAck:
{ {
return await DeserializeConnAck(mqttPacketReader).ConfigureAwait(false);
return DeserializeConnAck(mqttPacketReader);
} }


case MqttControlPacketType.Disconnect: case MqttControlPacketType.Disconnect:
@@ -121,14 +121,14 @@ namespace MQTTnet.Core.Serializer


case MqttControlPacketType.Publish: case MqttControlPacketType.Publish:
{ {
return await DeserializePublishAsync(mqttPacketReader).ConfigureAwait(false);
return DeserializePublish(mqttPacketReader);
} }


case MqttControlPacketType.PubAck: case MqttControlPacketType.PubAck:
{ {
return new MqttPubAckPacket return new MqttPubAckPacket
{ {
PacketIdentifier = await mqttPacketReader.ReadRemainingDataUShortAsync().ConfigureAwait(false)
PacketIdentifier = mqttPacketReader.ReadRemainingDataUShort()
}; };
} }


@@ -136,7 +136,7 @@ namespace MQTTnet.Core.Serializer
{ {
return new MqttPubRecPacket return new MqttPubRecPacket
{ {
PacketIdentifier = await mqttPacketReader.ReadRemainingDataUShortAsync().ConfigureAwait(false)
PacketIdentifier = mqttPacketReader.ReadRemainingDataUShort()
}; };
} }


@@ -144,7 +144,7 @@ namespace MQTTnet.Core.Serializer
{ {
return new MqttPubRelPacket return new MqttPubRelPacket
{ {
PacketIdentifier = await mqttPacketReader.ReadRemainingDataUShortAsync().ConfigureAwait(false)
PacketIdentifier = mqttPacketReader.ReadRemainingDataUShort()
}; };
} }


@@ -152,7 +152,7 @@ namespace MQTTnet.Core.Serializer
{ {
return new MqttPubCompPacket return new MqttPubCompPacket
{ {
PacketIdentifier = await mqttPacketReader.ReadRemainingDataUShortAsync().ConfigureAwait(false)
PacketIdentifier = mqttPacketReader.ReadRemainingDataUShort()
}; };
} }


@@ -168,24 +168,24 @@ namespace MQTTnet.Core.Serializer


case MqttControlPacketType.Subscribe: case MqttControlPacketType.Subscribe:
{ {
return await DeserializeSubscribeAsync(mqttPacketReader).ConfigureAwait(false);
return DeserializeSubscribe(mqttPacketReader);
} }


case MqttControlPacketType.SubAck: case MqttControlPacketType.SubAck:
{ {
return await DeserializeSubAck(mqttPacketReader).ConfigureAwait(false);
return DeserializeSubAck(mqttPacketReader);
} }


case MqttControlPacketType.Unsubscibe: case MqttControlPacketType.Unsubscibe:
{ {
return await DeserializeUnsubscribeAsync(mqttPacketReader).ConfigureAwait(false);
return DeserializeUnsubscribe(mqttPacketReader);
} }


case MqttControlPacketType.UnsubAck: case MqttControlPacketType.UnsubAck:
{ {
return new MqttUnsubAckPacket return new MqttUnsubAckPacket
{ {
PacketIdentifier = await mqttPacketReader.ReadRemainingDataUShortAsync().ConfigureAwait(false)
PacketIdentifier = mqttPacketReader.ReadRemainingDataUShort()
}; };
} }


@@ -197,51 +197,51 @@ namespace MQTTnet.Core.Serializer
} }
} }


private static async Task<MqttBasePacket> DeserializeUnsubscribeAsync(MqttPacketReader reader)
private static MqttBasePacket DeserializeUnsubscribe(MqttPacketReader reader)
{ {
var packet = new MqttUnsubscribePacket var packet = new MqttUnsubscribePacket
{ {
PacketIdentifier = await reader.ReadRemainingDataUShortAsync().ConfigureAwait(false),
PacketIdentifier = reader.ReadRemainingDataUShort(),
}; };


while (!reader.EndOfRemainingData) while (!reader.EndOfRemainingData)
{ {
packet.TopicFilters.Add(await reader.ReadRemainingDataStringWithLengthPrefixAsync().ConfigureAwait(false));
packet.TopicFilters.Add(reader.ReadRemainingDataStringWithLengthPrefix());
} }


return packet; return packet;
} }


private static async Task<MqttBasePacket> DeserializeSubscribeAsync(MqttPacketReader reader)
private static MqttBasePacket DeserializeSubscribe(MqttPacketReader reader)
{ {
var packet = new MqttSubscribePacket var packet = new MqttSubscribePacket
{ {
PacketIdentifier = await reader.ReadRemainingDataUShortAsync().ConfigureAwait(false),
PacketIdentifier = reader.ReadRemainingDataUShort()
}; };


while (!reader.EndOfRemainingData) while (!reader.EndOfRemainingData)
{ {
packet.TopicFilters.Add(new TopicFilter( packet.TopicFilters.Add(new TopicFilter(
await reader.ReadRemainingDataStringWithLengthPrefixAsync(),
(MqttQualityOfServiceLevel)await reader.ReadRemainingDataByteAsync().ConfigureAwait(false)));
reader.ReadRemainingDataStringWithLengthPrefix(),
(MqttQualityOfServiceLevel)reader.ReadRemainingDataByte()));
} }


return packet; return packet;
} }


private static async Task<MqttBasePacket> DeserializePublishAsync(MqttPacketReader reader)
private static MqttBasePacket DeserializePublish(MqttPacketReader reader)
{ {
var fixedHeader = new ByteReader(reader.FixedHeader); var fixedHeader = new ByteReader(reader.FixedHeader);
var retain = fixedHeader.Read(); var retain = fixedHeader.Read();
var qualityOfServiceLevel = (MqttQualityOfServiceLevel)fixedHeader.Read(2); var qualityOfServiceLevel = (MqttQualityOfServiceLevel)fixedHeader.Read(2);
var dup = fixedHeader.Read(); var dup = fixedHeader.Read();


var topic = await reader.ReadRemainingDataStringWithLengthPrefixAsync().ConfigureAwait(false);
var topic = reader.ReadRemainingDataStringWithLengthPrefix();


ushort packetIdentifier = 0; ushort packetIdentifier = 0;
if (qualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce) if (qualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce)
{ {
packetIdentifier = await reader.ReadRemainingDataUShortAsync().ConfigureAwait(false);
packetIdentifier = reader.ReadRemainingDataUShort();
} }


var packet = new MqttPublishPacket var packet = new MqttPublishPacket
@@ -250,22 +250,22 @@ namespace MQTTnet.Core.Serializer
QualityOfServiceLevel = qualityOfServiceLevel, QualityOfServiceLevel = qualityOfServiceLevel,
Dup = dup, Dup = dup,
Topic = topic, Topic = topic,
Payload = await reader.ReadRemainingDataAsync().ConfigureAwait(false),
Payload = reader.ReadRemainingData(),
PacketIdentifier = packetIdentifier PacketIdentifier = packetIdentifier
}; };


return packet; return packet;
} }


private static async Task<MqttBasePacket> DeserializeConnectAsync(MqttPacketReader reader)
private static MqttBasePacket DeserializeConnect(MqttPacketReader reader)
{ {
await reader.ReadRemainingDataAsync(2).ConfigureAwait(false); // Skip 2 bytes
reader.ReadRemainingData(2); // Skip 2 bytes


MqttProtocolVersion protocolVersion; MqttProtocolVersion protocolVersion;
var protocolName = await reader.ReadRemainingDataAsync(4).ConfigureAwait(false);
var protocolName = reader.ReadRemainingData(4);
if (protocolName.SequenceEqual(ProtocolVersionV310Name)) if (protocolName.SequenceEqual(ProtocolVersionV310Name))
{ {
await reader.ReadRemainingDataAsync(2).ConfigureAwait(false);
reader.ReadRemainingData(2);
protocolVersion = MqttProtocolVersion.V310; protocolVersion = MqttProtocolVersion.V310;
} }
else if (protocolName.SequenceEqual(ProtocolVersionV311Name)) else if (protocolName.SequenceEqual(ProtocolVersionV311Name))
@@ -277,8 +277,8 @@ namespace MQTTnet.Core.Serializer
throw new MqttProtocolViolationException("Protocol name is not supported."); throw new MqttProtocolViolationException("Protocol name is not supported.");
} }


var protocolLevel = await reader.ReadRemainingDataByteAsync().ConfigureAwait(false);
var connectFlags = await reader.ReadRemainingDataByteAsync().ConfigureAwait(false);
var protocolLevel = reader.ReadRemainingDataByte();
var connectFlags = reader.ReadRemainingDataByte();


var connectFlagsReader = new ByteReader(connectFlags); var connectFlagsReader = new ByteReader(connectFlags);
connectFlagsReader.Read(); // Reserved. connectFlagsReader.Read(); // Reserved.
@@ -295,51 +295,51 @@ namespace MQTTnet.Core.Serializer
var passwordFlag = connectFlagsReader.Read(); var passwordFlag = connectFlagsReader.Read();
var usernameFlag = connectFlagsReader.Read(); var usernameFlag = connectFlagsReader.Read();


packet.KeepAlivePeriod = await reader.ReadRemainingDataUShortAsync().ConfigureAwait(false);
packet.ClientId = await reader.ReadRemainingDataStringWithLengthPrefixAsync().ConfigureAwait(false);
packet.KeepAlivePeriod = reader.ReadRemainingDataUShort();
packet.ClientId = reader.ReadRemainingDataStringWithLengthPrefix();


if (willFlag) if (willFlag)
{ {
packet.WillMessage = new MqttApplicationMessage( packet.WillMessage = new MqttApplicationMessage(
await reader.ReadRemainingDataStringWithLengthPrefixAsync().ConfigureAwait(false),
await reader.ReadRemainingDataWithLengthPrefixAsync().ConfigureAwait(false),
reader.ReadRemainingDataStringWithLengthPrefix(),
reader.ReadRemainingDataWithLengthPrefix(),
(MqttQualityOfServiceLevel)willQoS, (MqttQualityOfServiceLevel)willQoS,
willRetain); willRetain);
} }


if (usernameFlag) if (usernameFlag)
{ {
packet.Username = await reader.ReadRemainingDataStringWithLengthPrefixAsync().ConfigureAwait(false);
packet.Username = reader.ReadRemainingDataStringWithLengthPrefix();
} }


if (passwordFlag) if (passwordFlag)
{ {
packet.Password = await reader.ReadRemainingDataStringWithLengthPrefixAsync().ConfigureAwait(false);
packet.Password = reader.ReadRemainingDataStringWithLengthPrefix();
} }


ValidateConnectPacket(packet); ValidateConnectPacket(packet);
return packet; return packet;
} }


private static async Task<MqttBasePacket> DeserializeSubAck(MqttPacketReader reader)
private static MqttBasePacket DeserializeSubAck(MqttPacketReader reader)
{ {
var packet = new MqttSubAckPacket var packet = new MqttSubAckPacket
{ {
PacketIdentifier = await reader.ReadRemainingDataUShortAsync().ConfigureAwait(false)
PacketIdentifier = reader.ReadRemainingDataUShort()
}; };


while (!reader.EndOfRemainingData) while (!reader.EndOfRemainingData)
{ {
packet.SubscribeReturnCodes.Add((MqttSubscribeReturnCode)await reader.ReadRemainingDataByteAsync().ConfigureAwait(false));
packet.SubscribeReturnCodes.Add((MqttSubscribeReturnCode)reader.ReadRemainingDataByte());
} }


return packet; return packet;
} }


private static async Task<MqttBasePacket> DeserializeConnAck(MqttPacketReader reader)
private static MqttBasePacket DeserializeConnAck(MqttPacketReader reader)
{ {
var variableHeader1 = await reader.ReadRemainingDataByteAsync().ConfigureAwait(false);
var variableHeader2 = await reader.ReadRemainingDataByteAsync().ConfigureAwait(false);
var variableHeader1 = reader.ReadRemainingDataByte();
var variableHeader2 = reader.ReadRemainingDataByte();


var packet = new MqttConnAckPacket var packet = new MqttConnAckPacket
{ {
@@ -457,7 +457,7 @@ namespace MQTTnet.Core.Serializer
output.Write(packet.PacketIdentifier); output.Write(packet.PacketIdentifier);


output.InjectFixedHeader(MqttControlPacketType.PubRel, 0x02); output.InjectFixedHeader(MqttControlPacketType.PubRel, 0x02);
await output.WriteToAsync(destination).ConfigureAwait(false);
await output.WriteToAsync(destination);
} }
} }




+ 3
- 3
MQTTnet.Core/Server/MqttClientSession.cs View File

@@ -63,7 +63,7 @@ namespace MQTTnet.Core.Server
} }
catch (Exception exception) catch (Exception exception)
{ {
MqttTrace.Error(nameof(MqttClientSession), exception, $"Client '{_identifier}': Unhandled exception while processing client packets.");
MqttTrace.Error(nameof(MqttClientSession), exception, "Client '{0}': Unhandled exception while processing client packets.", _identifier);
} }
finally finally
{ {
@@ -90,7 +90,7 @@ namespace MQTTnet.Core.Server
} }


_messageQueue.Enqueue(publishPacket); _messageQueue.Enqueue(publishPacket);
MqttTrace.Verbose(nameof(MqttClientSession), $"Client '{_identifier}: Enqueued pending publish packet.");
MqttTrace.Verbose(nameof(MqttClientSession), "Client '{0}': Enqueued pending publish packet.", _identifier);
} }


public void Dispose() public void Dispose()
@@ -143,7 +143,7 @@ namespace MQTTnet.Core.Server
return Task.FromResult((object)null); return Task.FromResult((object)null);
} }


MqttTrace.Warning(nameof(MqttClientSession), $"Client '{_identifier}': Received not supported packet ({packet}). Closing connection.");
MqttTrace.Warning(nameof(MqttClientSession), "Client '{0}': Received not supported packet ({1}). Closing connection.", _identifier, packet);
_cancellationTokenSource.Cancel(); _cancellationTokenSource.Cancel();


return Task.FromResult((object)null); return Task.FromResult((object)null);


+ 3
- 3
MQTTnet.Core/Server/MqttClientSessionsManager.cs View File

@@ -111,11 +111,11 @@ namespace MQTTnet.Core.Server
_clientSessions.Remove(connectPacket.ClientId); _clientSessions.Remove(connectPacket.ClientId);
clientSession.Dispose(); clientSession.Dispose();
clientSession = null; clientSession = null;
MqttTrace.Verbose(nameof(MqttClientSessionsManager), $"Disposed existing session of client '{connectPacket.ClientId}'.");
MqttTrace.Verbose(nameof(MqttClientSessionsManager), "Disposed existing session of client '{0}'.", connectPacket.ClientId);
} }
else else
{ {
MqttTrace.Verbose(nameof(MqttClientSessionsManager), $"Reusing existing session of client '{connectPacket.ClientId}'.");
MqttTrace.Verbose(nameof(MqttClientSessionsManager), "Reusing existing session of client '{0}'.", connectPacket.ClientId);
} }
} }


@@ -127,7 +127,7 @@ namespace MQTTnet.Core.Server
clientSession = new MqttClientSession(connectPacket.ClientId, _options, DispatchPublishPacket); clientSession = new MqttClientSession(connectPacket.ClientId, _options, DispatchPublishPacket);
_clientSessions[connectPacket.ClientId] = clientSession; _clientSessions[connectPacket.ClientId] = clientSession;


MqttTrace.Verbose(nameof(MqttClientSessionsManager), $"Created a new session for client '{connectPacket.ClientId}'.");
MqttTrace.Verbose(nameof(MqttClientSessionsManager), "Created a new session for client '{0}'.", connectPacket.ClientId);
} }


return new GetOrCreateClientSessionResult { IsExistingSession = isExistingSession, Session = clientSession }; return new GetOrCreateClientSessionResult { IsExistingSession = isExistingSession, Session = clientSession };


+ 1
- 1
MQTTnet.Core/Server/MqttServer.cs View File

@@ -84,7 +84,7 @@ namespace MQTTnet.Core.Server


private void OnClientConnected(object sender, MqttClientConnectedEventArgs eventArgs) private void OnClientConnected(object sender, MqttClientConnectedEventArgs eventArgs)
{ {
MqttTrace.Information(nameof(MqttServer), $"Client '{eventArgs.Identifier}': Connected.");
MqttTrace.Information(nameof(MqttServer), "Client '{0}': Connected.", eventArgs.Identifier);
ClientConnected?.Invoke(this, eventArgs); ClientConnected?.Invoke(this, eventArgs);


Task.Run(() => _clientSessionsManager.RunClientSessionAsync(eventArgs), _cancellationTokenSource.Token); Task.Run(() => _clientSessionsManager.RunClientSessionAsync(eventArgs), _cancellationTokenSource.Token);


+ 4
- 4
Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs View File

@@ -18,12 +18,12 @@ namespace MQTTnet.Core.Tests


public async Task ConnectAsync(MqttClientOptions options, TimeSpan timeout) public async Task ConnectAsync(MqttClientOptions options, TimeSpan timeout)
{ {
await Task.FromResult(0).ConfigureAwait(false);
await Task.FromResult(0);
} }


public async Task DisconnectAsync() public async Task DisconnectAsync()
{ {
await Task.FromResult(0).ConfigureAwait(false);
await Task.FromResult(0);
} }


public async Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout) public async Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout)
@@ -31,14 +31,14 @@ namespace MQTTnet.Core.Tests
ThrowIfPartnerIsNull(); ThrowIfPartnerIsNull();


Partner.SendPacketInternal(packet); Partner.SendPacketInternal(packet);
await Task.FromResult(0).ConfigureAwait(false);
await Task.FromResult(0);
} }


public async Task<MqttBasePacket> ReceivePacketAsync(TimeSpan timeout) public async Task<MqttBasePacket> ReceivePacketAsync(TimeSpan timeout)
{ {
ThrowIfPartnerIsNull(); ThrowIfPartnerIsNull();


return await Task.Run(() => _incomingPackets.Take()).ConfigureAwait(false);
return await Task.Run(() => _incomingPackets.Take());
} }


private void SendPacketInternal(MqttBasePacket packet) private void SendPacketInternal(MqttBasePacket packet)


Loading…
Cancel
Save