From 8aec1583df007287f82cea9ec5d5f98c99f13d4f Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Sat, 9 Sep 2017 22:40:37 +0200 Subject: [PATCH] Performance improvements in area of logging and async/await --- .../Implementations/MqttServerAdapter.cs | 6 +- .../Implementations/MqttTcpChannel.cs | 4 +- .../Implementations/MqttTcpChannel.cs | 4 +- .../MqttChannelCommunicationAdapter.cs | 24 +++--- MQTTnet.Core/Client/MqttClient.cs | 65 ++++++++------- MQTTnet.Core/Diagnostics/MqttTrace.cs | 41 ++++++---- MQTTnet.Core/Serializer/MqttPacketReader.cs | 37 ++++----- .../Serializer/MqttPacketSerializer.cs | 82 +++++++++---------- MQTTnet.Core/Server/MqttClientSession.cs | 6 +- .../Server/MqttClientSessionsManager.cs | 6 +- MQTTnet.Core/Server/MqttServer.cs | 2 +- .../TestMqttCommunicationAdapter.cs | 8 +- 12 files changed, 153 insertions(+), 132 deletions(-) diff --git a/Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs b/Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs index faa5eff..92e071e 100644 --- a/Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs +++ b/Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs @@ -85,7 +85,7 @@ namespace MQTTnet.Implementations { try { - var clientSocket = await Task.Factory.FromAsync(_defaultEndpointSocket.BeginAccept, _defaultEndpointSocket.EndAccept, null).ConfigureAwait(false); + var clientSocket = await Task.Factory.FromAsync(_defaultEndpointSocket.BeginAccept, _defaultEndpointSocket.EndAccept, null); var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new MqttPacketSerializer()); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); } @@ -105,10 +105,10 @@ namespace MQTTnet.Implementations { try { - var clientSocket = await Task.Factory.FromAsync(_tlsEndpointSocket.BeginAccept, _tlsEndpointSocket.EndAccept, null).ConfigureAwait(false); + var clientSocket = await Task.Factory.FromAsync(_tlsEndpointSocket.BeginAccept, _tlsEndpointSocket.EndAccept, null); var sslStream = new SslStream(new NetworkStream(clientSocket)); - await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false).ConfigureAwait(false); + await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false); var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new MqttPacketSerializer()); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); diff --git a/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs b/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs index f5dd326..c9c080d 100644 --- a/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs +++ b/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs @@ -35,12 +35,12 @@ namespace MQTTnet.Implementations _socket = new Socket(SocketType.Stream, ProtocolType.Tcp); } - await Task.Factory.FromAsync(_socket.BeginConnect, _socket.EndConnect, options.Server, options.GetPort(), null).ConfigureAwait(false); + await Task.Factory.FromAsync(_socket.BeginConnect, _socket.EndConnect, options.Server, options.GetPort(), null); if (options.TlsOptions.UseTls) { _sslStream = new SslStream(new NetworkStream(_socket, true)); - await _sslStream.AuthenticateAsClientAsync(options.Server, LoadCertificates(options), SslProtocols.Tls12, options.TlsOptions.CheckCertificateRevocation).ConfigureAwait(false); + await _sslStream.AuthenticateAsClientAsync(options.Server, LoadCertificates(options), SslProtocols.Tls12, options.TlsOptions.CheckCertificateRevocation); } } catch (SocketException exception) diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs index 75fedba..a4247b0 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs @@ -35,12 +35,12 @@ namespace MQTTnet.Implementations _socket = new Socket(SocketType.Stream, ProtocolType.Tcp); } - await _socket.ConnectAsync(options.Server, options.GetPort()).ConfigureAwait(false); + await _socket.ConnectAsync(options.Server, options.GetPort()); if (options.TlsOptions.UseTls) { _sslStream = new SslStream(new NetworkStream(_socket, true)); - await _sslStream.AuthenticateAsClientAsync(options.Server, LoadCertificates(options), SslProtocols.Tls12, options.TlsOptions.CheckCertificateRevocation).ConfigureAwait(false); + await _sslStream.AuthenticateAsClientAsync(options.Server, LoadCertificates(options), SslProtocols.Tls12, options.TlsOptions.CheckCertificateRevocation); } } catch (SocketException exception) diff --git a/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs b/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs index fcea65a..5042167 100644 --- a/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs +++ b/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs @@ -21,21 +21,21 @@ namespace MQTTnet.Core.Adapter public IMqttPacketSerializer PacketSerializer { get; } - public async Task ConnectAsync(MqttClientOptions options, TimeSpan timeout) + public Task ConnectAsync(MqttClientOptions options, TimeSpan timeout) { - await ExecuteWithTimeoutAsync(_channel.ConnectAsync(options), timeout).ConfigureAwait(false); + return ExecuteWithTimeoutAsync(_channel.ConnectAsync(options), timeout); } - public async Task DisconnectAsync() + public Task DisconnectAsync() { - await _channel.DisconnectAsync().ConfigureAwait(false); + return _channel.DisconnectAsync(); } - public async Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout) + public Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout) { - MqttTrace.Information(nameof(MqttChannelCommunicationAdapter), $"TX >>> {packet} [Timeout={timeout}]"); + MqttTrace.Information(nameof(MqttChannelCommunicationAdapter), "TX >>> {0} [Timeout={1}]", packet, timeout); - await ExecuteWithTimeoutAsync(PacketSerializer.SerializeAsync(packet, _channel), timeout).ConfigureAwait(false); + return ExecuteWithTimeoutAsync(PacketSerializer.SerializeAsync(packet, _channel), timeout); } public async Task ReceivePacketAsync(TimeSpan timeout) @@ -43,11 +43,11 @@ namespace MQTTnet.Core.Adapter MqttBasePacket packet; if (timeout > TimeSpan.Zero) { - packet = await ExecuteWithTimeoutAsync(PacketSerializer.DeserializeAsync(_channel), timeout).ConfigureAwait(false); + packet = await ExecuteWithTimeoutAsync(PacketSerializer.DeserializeAsync(_channel), timeout); } else { - packet = await PacketSerializer.DeserializeAsync(_channel).ConfigureAwait(false); + packet = await PacketSerializer.DeserializeAsync(_channel); } if (packet == null) @@ -55,14 +55,14 @@ namespace MQTTnet.Core.Adapter throw new MqttProtocolViolationException("Received malformed packet."); } - MqttTrace.Information(nameof(MqttChannelCommunicationAdapter), $"RX <<< {packet}"); + MqttTrace.Information(nameof(MqttChannelCommunicationAdapter), "RX <<< {0}", packet); return packet; } private static async Task ExecuteWithTimeoutAsync(Task task, TimeSpan timeout) { var timeoutTask = Task.Delay(timeout); - if (await Task.WhenAny(timeoutTask, task).ConfigureAwait(false) == timeoutTask) + if (await Task.WhenAny(timeoutTask, task) == timeoutTask) { throw new MqttCommunicationTimedOutException(); } @@ -78,7 +78,7 @@ namespace MQTTnet.Core.Adapter private static async Task ExecuteWithTimeoutAsync(Task task, TimeSpan timeout) { var timeoutTask = Task.Delay(timeout); - if (await Task.WhenAny(timeoutTask, task).ConfigureAwait(false) == timeoutTask) + if (await Task.WhenAny(timeoutTask, task) == timeoutTask) { throw new MqttCommunicationTimedOutException(); } diff --git a/MQTTnet.Core/Client/MqttClient.cs b/MQTTnet.Core/Client/MqttClient.cs index efd794d..518eb37 100644 --- a/MQTTnet.Core/Client/MqttClient.cs +++ b/MQTTnet.Core/Client/MqttClient.cs @@ -53,7 +53,7 @@ namespace MQTTnet.Core.Client { _disconnectedEventSuspended = false; - await _adapter.ConnectAsync(_options, _options.DefaultCommunicationTimeout).ConfigureAwait(false); + await _adapter.ConnectAsync(_options, _options.DefaultCommunicationTimeout); MqttTrace.Verbose(nameof(MqttClient), "Connection with server established."); @@ -73,10 +73,10 @@ namespace MQTTnet.Core.Client StartReceivePackets(); - var response = await SendAndReceiveAsync(connectPacket).ConfigureAwait(false); + var response = await SendAndReceiveAsync(connectPacket); if (response.ConnectReturnCode != MqttConnectReturnCode.ConnectionAccepted) { - await DisconnectInternalAsync().ConfigureAwait(false); + await DisconnectInternalAsync(); throw new MqttConnectingFailedException(response.ConnectReturnCode); } @@ -92,7 +92,7 @@ namespace MQTTnet.Core.Client } catch (Exception) { - await DisconnectInternalAsync().ConfigureAwait(false); + await DisconnectInternalAsync(); throw; } } @@ -101,11 +101,11 @@ namespace MQTTnet.Core.Client { try { - await SendAsync(new MqttDisconnectPacket()).ConfigureAwait(false); + await SendAsync(new MqttDisconnectPacket()); } finally { - await DisconnectInternalAsync().ConfigureAwait(false); + await DisconnectInternalAsync(); } } @@ -146,7 +146,7 @@ namespace MQTTnet.Core.Client return Unsubscribe(topicFilters.ToList()); } - public async Task Unsubscribe(IList topicFilters) + public Task Unsubscribe(IList topicFilters) { if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); if (!topicFilters.Any()) throw new MqttProtocolViolationException("At least one topic filter must be set [MQTT-3.10.3-2]."); @@ -158,10 +158,10 @@ namespace MQTTnet.Core.Client TopicFilters = topicFilters }; - await SendAndReceiveAsync(unsubscribePacket).ConfigureAwait(false); + return SendAndReceiveAsync(unsubscribePacket); } - public async Task PublishAsync(MqttApplicationMessage applicationMessage) + public Task PublishAsync(MqttApplicationMessage applicationMessage) { if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage)); ThrowIfNotConnected(); @@ -171,19 +171,28 @@ namespace MQTTnet.Core.Client if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce) { // No packet identifier is used for QoS 0 [3.3.2.2 Packet Identifier] - await SendAsync(publishPacket).ConfigureAwait(false); + return SendAsync(publishPacket); } - else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce) + + if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce) { publishPacket.PacketIdentifier = GetNewPacketIdentifier(); - await SendAndReceiveAsync(publishPacket).ConfigureAwait(false); + return SendAndReceiveAsync(publishPacket); } - else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce) + + if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce) { publishPacket.PacketIdentifier = GetNewPacketIdentifier(); - var pubRecPacket = await SendAndReceiveAsync(publishPacket).ConfigureAwait(false); - await SendAndReceiveAsync(pubRecPacket.CreateResponse()).ConfigureAwait(false); + return PublishExactlyOncePacketAsync(publishPacket); } + + throw new InvalidOperationException(); + } + + private async Task PublishExactlyOncePacketAsync(MqttBasePacket publishPacket) + { + var pubRecPacket = await SendAndReceiveAsync(publishPacket); + await SendAndReceiveAsync(pubRecPacket.CreateResponse()); } private void ThrowIfNotConnected() @@ -195,7 +204,7 @@ namespace MQTTnet.Core.Client { try { - await _adapter.DisconnectAsync().ConfigureAwait(false); + await _adapter.DisconnectAsync(); } catch (Exception exception) { @@ -294,14 +303,14 @@ namespace MQTTnet.Core.Client throw new MqttCommunicationException("Received a not supported QoS level."); } - private async Task ProcessReceivedPubRelPacket(MqttPubRelPacket pubRelPacket) + private Task ProcessReceivedPubRelPacket(MqttPubRelPacket pubRelPacket) { lock (_unacknowledgedPublishPackets) { _unacknowledgedPublishPackets.Remove(pubRelPacket.PacketIdentifier); } - await SendAsync(pubRelPacket.CreateResponse()).ConfigureAwait(false); + return SendAsync(pubRelPacket.CreateResponse()); } private Task SendAsync(MqttBasePacket packet) @@ -326,8 +335,8 @@ namespace MQTTnet.Core.Client return pi1.PacketIdentifier == pi2.PacketIdentifier; } - await _adapter.SendPacketAsync(requestPacket, _options.DefaultCommunicationTimeout).ConfigureAwait(false); - return (TResponsePacket)await _packetDispatcher.WaitForPacketAsync(ResponsePacketSelector, _options.DefaultCommunicationTimeout).ConfigureAwait(false); + await _adapter.SendPacketAsync(requestPacket, _options.DefaultCommunicationTimeout); + return (TResponsePacket)await _packetDispatcher.WaitForPacketAsync(ResponsePacketSelector, _options.DefaultCommunicationTimeout); } private ushort GetNewPacketIdentifier() @@ -343,19 +352,19 @@ namespace MQTTnet.Core.Client { while (!cancellationToken.IsCancellationRequested) { - await Task.Delay(_options.KeepAlivePeriod, cancellationToken).ConfigureAwait(false); - await SendAndReceiveAsync(new MqttPingReqPacket()).ConfigureAwait(false); + await Task.Delay(_options.KeepAlivePeriod, cancellationToken); + await SendAndReceiveAsync(new MqttPingReqPacket()); } } catch (MqttCommunicationException exception) { MqttTrace.Warning(nameof(MqttClient), exception, "MQTT communication error while receiving packets."); - await DisconnectInternalAsync().ConfigureAwait(false); + await DisconnectInternalAsync(); } catch (Exception exception) { MqttTrace.Warning(nameof(MqttClient), exception, "Error while sending/receiving keep alive packets."); - await DisconnectInternalAsync().ConfigureAwait(false); + await DisconnectInternalAsync(); } finally { @@ -370,8 +379,8 @@ namespace MQTTnet.Core.Client { while (!cancellationToken.IsCancellationRequested) { - var packet = await _adapter.ReceivePacketAsync(TimeSpan.Zero).ConfigureAwait(false); - MqttTrace.Information(nameof(MqttClient), $"Received <<< {packet}"); + var packet = await _adapter.ReceivePacketAsync(TimeSpan.Zero); + MqttTrace.Information(nameof(MqttClient), "Received <<< {0}", packet); StartProcessReceivedPacket(packet, cancellationToken); } @@ -379,12 +388,12 @@ namespace MQTTnet.Core.Client catch (MqttCommunicationException exception) { MqttTrace.Warning(nameof(MqttClient), exception, "MQTT communication exception while receiving packets."); - await DisconnectInternalAsync().ConfigureAwait(false); + await DisconnectInternalAsync(); } catch (Exception exception) { MqttTrace.Error(nameof(MqttClient), exception, "Unhandled exception while receiving packets."); - await DisconnectInternalAsync().ConfigureAwait(false); + await DisconnectInternalAsync(); } finally { diff --git a/MQTTnet.Core/Diagnostics/MqttTrace.cs b/MQTTnet.Core/Diagnostics/MqttTrace.cs index c6104dc..1036028 100644 --- a/MQTTnet.Core/Diagnostics/MqttTrace.cs +++ b/MQTTnet.Core/Diagnostics/MqttTrace.cs @@ -1,4 +1,5 @@ using System; +using System.Linq; namespace MQTTnet.Core.Diagnostics { @@ -6,37 +7,37 @@ namespace MQTTnet.Core.Diagnostics { public static event EventHandler TraceMessagePublished; - public static void Verbose(string source, string message) + public static void Verbose(string source, string message, params object[] parameters) { - Publish(source, MqttTraceLevel.Verbose, null, message); + Publish(source, MqttTraceLevel.Verbose, null, message, parameters); } - public static void Information(string source, string message) + public static void Information(string source, string message, params object[] parameters) { - Publish(source, MqttTraceLevel.Information, null, message); + Publish(source, MqttTraceLevel.Information, null, message, parameters); } - public static void Warning(string source, string message) + public static void Warning(string source, string message, params object[] parameters) { - Publish(source, MqttTraceLevel.Warning, null, message); + Publish(source, MqttTraceLevel.Warning, null, message, parameters); } - public static void Warning(string source, Exception exception, string message) + public static void Warning(string source, Exception exception, string message, params object[] parameters) { - Publish(source, MqttTraceLevel.Warning, exception, message); + Publish(source, MqttTraceLevel.Warning, exception, message, parameters); } - public static void Error(string source, string message) + public static void Error(string source, string message, params object[] parameters) { - Publish(source, MqttTraceLevel.Error, null, message); + Publish(source, MqttTraceLevel.Error, null, message, parameters); } - public static void Error(string source, Exception exception, string message) + public static void Error(string source, Exception exception, string message, params object[] parameters) { - Publish(source, MqttTraceLevel.Error, exception, message); + Publish(source, MqttTraceLevel.Error, exception, message, parameters); } - private static void Publish(string source, MqttTraceLevel traceLevel, Exception exception, string message) + private static void Publish(string source, MqttTraceLevel traceLevel, Exception exception, string message, params object[] parameters) { var handler = TraceMessagePublished; if (handler == null) @@ -44,7 +45,19 @@ namespace MQTTnet.Core.Diagnostics return; } - message = string.Format(message, 1); + if (parameters?.Length > 0) + { + try + { + message = string.Format(message, parameters); + } + catch (Exception formatException) + { + Error(nameof(MqttTrace), formatException, "Error while tracing message: " + message); + return; + } + } + handler.Invoke(null, new MqttTraceMessagePublishedEventArgs(Environment.CurrentManagedThreadId, source, traceLevel, message, exception)); } } diff --git a/MQTTnet.Core/Serializer/MqttPacketReader.cs b/MQTTnet.Core/Serializer/MqttPacketReader.cs index 349c137..6a31fca 100644 --- a/MQTTnet.Core/Serializer/MqttPacketReader.cs +++ b/MQTTnet.Core/Serializer/MqttPacketReader.cs @@ -29,7 +29,7 @@ namespace MQTTnet.Core.Serializer public async Task ReadToEndAsync() { await ReadFixedHeaderAsync(); - await ReadRemainingLengthAsync().ConfigureAwait(false); + await ReadRemainingLengthAsync(); if (_remainingLength == 0) { @@ -37,20 +37,20 @@ namespace MQTTnet.Core.Serializer } var buffer = new byte[_remainingLength]; - await ReadFromSourceAsync(buffer).ConfigureAwait(false); + await ReadFromSourceAsync(buffer); _remainingData.Write(buffer, 0, buffer.Length); _remainingData.Position = 0; } - public async Task ReadRemainingDataByteAsync() + public byte ReadRemainingDataByte() { - return (await ReadRemainingDataAsync(1).ConfigureAwait(false))[0]; + return ReadRemainingData(1)[0]; } - public async Task ReadRemainingDataUShortAsync() + public ushort ReadRemainingDataUShort() { - var buffer = await ReadRemainingDataAsync(2).ConfigureAwait(false); + var buffer = ReadRemainingData(2); var temp = buffer[0]; buffer[0] = buffer[1]; @@ -59,28 +59,27 @@ namespace MQTTnet.Core.Serializer return BitConverter.ToUInt16(buffer, 0); } - public async Task ReadRemainingDataStringWithLengthPrefixAsync() + public string ReadRemainingDataStringWithLengthPrefix() { - var buffer = await ReadRemainingDataWithLengthPrefixAsync(); + var buffer = ReadRemainingDataWithLengthPrefix(); return Encoding.UTF8.GetString(buffer, 0, buffer.Length); } - public async Task ReadRemainingDataWithLengthPrefixAsync() + public byte[] ReadRemainingDataWithLengthPrefix() { - var length = await ReadRemainingDataUShortAsync(); - return await ReadRemainingDataAsync(length).ConfigureAwait(false); + var length = ReadRemainingDataUShort(); + return ReadRemainingData(length); } - public Task ReadRemainingDataAsync() + public byte[] ReadRemainingData() { - return ReadRemainingDataAsync(_remainingLength - (int)_remainingData.Position); + return ReadRemainingData(_remainingLength - (int)_remainingData.Position); } - public async Task ReadRemainingDataAsync(int length) + public byte[] ReadRemainingData(int length) { var buffer = new byte[length]; - await _remainingData.ReadAsync(buffer, 0, buffer.Length).ConfigureAwait(false); - + _remainingData.Read(buffer, 0, buffer.Length); return buffer; } @@ -92,7 +91,7 @@ namespace MQTTnet.Core.Serializer byte encodedByte; do { - encodedByte = await ReadStreamByteAsync().ConfigureAwait(false); + encodedByte = await ReadStreamByteAsync(); value += (encodedByte & 127) * multiplier; multiplier *= 128; if (multiplier > 128 * 128 * 128) @@ -119,13 +118,13 @@ namespace MQTTnet.Core.Serializer private async Task ReadStreamByteAsync() { var buffer = new byte[1]; - await ReadFromSourceAsync(buffer).ConfigureAwait(false); + await ReadFromSourceAsync(buffer); return buffer[0]; } private async Task ReadFixedHeaderAsync() { - FixedHeader = await ReadStreamByteAsync().ConfigureAwait(false); + FixedHeader = await ReadStreamByteAsync(); var byteReader = new ByteReader(FixedHeader); byteReader.Read(4); diff --git a/MQTTnet.Core/Serializer/MqttPacketSerializer.cs b/MQTTnet.Core/Serializer/MqttPacketSerializer.cs index 7d6bb81..38017b6 100644 --- a/MQTTnet.Core/Serializer/MqttPacketSerializer.cs +++ b/MQTTnet.Core/Serializer/MqttPacketSerializer.cs @@ -106,12 +106,12 @@ namespace MQTTnet.Core.Serializer { case MqttControlPacketType.Connect: { - return await DeserializeConnectAsync(mqttPacketReader).ConfigureAwait(false); + return DeserializeConnect(mqttPacketReader); } case MqttControlPacketType.ConnAck: { - return await DeserializeConnAck(mqttPacketReader).ConfigureAwait(false); + return DeserializeConnAck(mqttPacketReader); } case MqttControlPacketType.Disconnect: @@ -121,14 +121,14 @@ namespace MQTTnet.Core.Serializer case MqttControlPacketType.Publish: { - return await DeserializePublishAsync(mqttPacketReader).ConfigureAwait(false); + return DeserializePublish(mqttPacketReader); } case MqttControlPacketType.PubAck: { return new MqttPubAckPacket { - PacketIdentifier = await mqttPacketReader.ReadRemainingDataUShortAsync().ConfigureAwait(false) + PacketIdentifier = mqttPacketReader.ReadRemainingDataUShort() }; } @@ -136,7 +136,7 @@ namespace MQTTnet.Core.Serializer { return new MqttPubRecPacket { - PacketIdentifier = await mqttPacketReader.ReadRemainingDataUShortAsync().ConfigureAwait(false) + PacketIdentifier = mqttPacketReader.ReadRemainingDataUShort() }; } @@ -144,7 +144,7 @@ namespace MQTTnet.Core.Serializer { return new MqttPubRelPacket { - PacketIdentifier = await mqttPacketReader.ReadRemainingDataUShortAsync().ConfigureAwait(false) + PacketIdentifier = mqttPacketReader.ReadRemainingDataUShort() }; } @@ -152,7 +152,7 @@ namespace MQTTnet.Core.Serializer { return new MqttPubCompPacket { - PacketIdentifier = await mqttPacketReader.ReadRemainingDataUShortAsync().ConfigureAwait(false) + PacketIdentifier = mqttPacketReader.ReadRemainingDataUShort() }; } @@ -168,24 +168,24 @@ namespace MQTTnet.Core.Serializer case MqttControlPacketType.Subscribe: { - return await DeserializeSubscribeAsync(mqttPacketReader).ConfigureAwait(false); + return DeserializeSubscribe(mqttPacketReader); } case MqttControlPacketType.SubAck: { - return await DeserializeSubAck(mqttPacketReader).ConfigureAwait(false); + return DeserializeSubAck(mqttPacketReader); } case MqttControlPacketType.Unsubscibe: { - return await DeserializeUnsubscribeAsync(mqttPacketReader).ConfigureAwait(false); + return DeserializeUnsubscribe(mqttPacketReader); } case MqttControlPacketType.UnsubAck: { return new MqttUnsubAckPacket { - PacketIdentifier = await mqttPacketReader.ReadRemainingDataUShortAsync().ConfigureAwait(false) + PacketIdentifier = mqttPacketReader.ReadRemainingDataUShort() }; } @@ -197,51 +197,51 @@ namespace MQTTnet.Core.Serializer } } - private static async Task DeserializeUnsubscribeAsync(MqttPacketReader reader) + private static MqttBasePacket DeserializeUnsubscribe(MqttPacketReader reader) { var packet = new MqttUnsubscribePacket { - PacketIdentifier = await reader.ReadRemainingDataUShortAsync().ConfigureAwait(false), + PacketIdentifier = reader.ReadRemainingDataUShort(), }; while (!reader.EndOfRemainingData) { - packet.TopicFilters.Add(await reader.ReadRemainingDataStringWithLengthPrefixAsync().ConfigureAwait(false)); + packet.TopicFilters.Add(reader.ReadRemainingDataStringWithLengthPrefix()); } return packet; } - private static async Task DeserializeSubscribeAsync(MqttPacketReader reader) + private static MqttBasePacket DeserializeSubscribe(MqttPacketReader reader) { var packet = new MqttSubscribePacket { - PacketIdentifier = await reader.ReadRemainingDataUShortAsync().ConfigureAwait(false), + PacketIdentifier = reader.ReadRemainingDataUShort() }; while (!reader.EndOfRemainingData) { packet.TopicFilters.Add(new TopicFilter( - await reader.ReadRemainingDataStringWithLengthPrefixAsync(), - (MqttQualityOfServiceLevel)await reader.ReadRemainingDataByteAsync().ConfigureAwait(false))); + reader.ReadRemainingDataStringWithLengthPrefix(), + (MqttQualityOfServiceLevel)reader.ReadRemainingDataByte())); } return packet; } - private static async Task DeserializePublishAsync(MqttPacketReader reader) + private static MqttBasePacket DeserializePublish(MqttPacketReader reader) { var fixedHeader = new ByteReader(reader.FixedHeader); var retain = fixedHeader.Read(); var qualityOfServiceLevel = (MqttQualityOfServiceLevel)fixedHeader.Read(2); var dup = fixedHeader.Read(); - var topic = await reader.ReadRemainingDataStringWithLengthPrefixAsync().ConfigureAwait(false); + var topic = reader.ReadRemainingDataStringWithLengthPrefix(); ushort packetIdentifier = 0; if (qualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce) { - packetIdentifier = await reader.ReadRemainingDataUShortAsync().ConfigureAwait(false); + packetIdentifier = reader.ReadRemainingDataUShort(); } var packet = new MqttPublishPacket @@ -250,22 +250,22 @@ namespace MQTTnet.Core.Serializer QualityOfServiceLevel = qualityOfServiceLevel, Dup = dup, Topic = topic, - Payload = await reader.ReadRemainingDataAsync().ConfigureAwait(false), + Payload = reader.ReadRemainingData(), PacketIdentifier = packetIdentifier }; return packet; } - private static async Task DeserializeConnectAsync(MqttPacketReader reader) + private static MqttBasePacket DeserializeConnect(MqttPacketReader reader) { - await reader.ReadRemainingDataAsync(2).ConfigureAwait(false); // Skip 2 bytes + reader.ReadRemainingData(2); // Skip 2 bytes MqttProtocolVersion protocolVersion; - var protocolName = await reader.ReadRemainingDataAsync(4).ConfigureAwait(false); + var protocolName = reader.ReadRemainingData(4); if (protocolName.SequenceEqual(ProtocolVersionV310Name)) { - await reader.ReadRemainingDataAsync(2).ConfigureAwait(false); + reader.ReadRemainingData(2); protocolVersion = MqttProtocolVersion.V310; } else if (protocolName.SequenceEqual(ProtocolVersionV311Name)) @@ -277,8 +277,8 @@ namespace MQTTnet.Core.Serializer throw new MqttProtocolViolationException("Protocol name is not supported."); } - var protocolLevel = await reader.ReadRemainingDataByteAsync().ConfigureAwait(false); - var connectFlags = await reader.ReadRemainingDataByteAsync().ConfigureAwait(false); + var protocolLevel = reader.ReadRemainingDataByte(); + var connectFlags = reader.ReadRemainingDataByte(); var connectFlagsReader = new ByteReader(connectFlags); connectFlagsReader.Read(); // Reserved. @@ -295,51 +295,51 @@ namespace MQTTnet.Core.Serializer var passwordFlag = connectFlagsReader.Read(); var usernameFlag = connectFlagsReader.Read(); - packet.KeepAlivePeriod = await reader.ReadRemainingDataUShortAsync().ConfigureAwait(false); - packet.ClientId = await reader.ReadRemainingDataStringWithLengthPrefixAsync().ConfigureAwait(false); + packet.KeepAlivePeriod = reader.ReadRemainingDataUShort(); + packet.ClientId = reader.ReadRemainingDataStringWithLengthPrefix(); if (willFlag) { packet.WillMessage = new MqttApplicationMessage( - await reader.ReadRemainingDataStringWithLengthPrefixAsync().ConfigureAwait(false), - await reader.ReadRemainingDataWithLengthPrefixAsync().ConfigureAwait(false), + reader.ReadRemainingDataStringWithLengthPrefix(), + reader.ReadRemainingDataWithLengthPrefix(), (MqttQualityOfServiceLevel)willQoS, willRetain); } if (usernameFlag) { - packet.Username = await reader.ReadRemainingDataStringWithLengthPrefixAsync().ConfigureAwait(false); + packet.Username = reader.ReadRemainingDataStringWithLengthPrefix(); } if (passwordFlag) { - packet.Password = await reader.ReadRemainingDataStringWithLengthPrefixAsync().ConfigureAwait(false); + packet.Password = reader.ReadRemainingDataStringWithLengthPrefix(); } ValidateConnectPacket(packet); return packet; } - private static async Task DeserializeSubAck(MqttPacketReader reader) + private static MqttBasePacket DeserializeSubAck(MqttPacketReader reader) { var packet = new MqttSubAckPacket { - PacketIdentifier = await reader.ReadRemainingDataUShortAsync().ConfigureAwait(false) + PacketIdentifier = reader.ReadRemainingDataUShort() }; while (!reader.EndOfRemainingData) { - packet.SubscribeReturnCodes.Add((MqttSubscribeReturnCode)await reader.ReadRemainingDataByteAsync().ConfigureAwait(false)); + packet.SubscribeReturnCodes.Add((MqttSubscribeReturnCode)reader.ReadRemainingDataByte()); } return packet; } - private static async Task DeserializeConnAck(MqttPacketReader reader) + private static MqttBasePacket DeserializeConnAck(MqttPacketReader reader) { - var variableHeader1 = await reader.ReadRemainingDataByteAsync().ConfigureAwait(false); - var variableHeader2 = await reader.ReadRemainingDataByteAsync().ConfigureAwait(false); + var variableHeader1 = reader.ReadRemainingDataByte(); + var variableHeader2 = reader.ReadRemainingDataByte(); var packet = new MqttConnAckPacket { @@ -457,7 +457,7 @@ namespace MQTTnet.Core.Serializer output.Write(packet.PacketIdentifier); output.InjectFixedHeader(MqttControlPacketType.PubRel, 0x02); - await output.WriteToAsync(destination).ConfigureAwait(false); + await output.WriteToAsync(destination); } } diff --git a/MQTTnet.Core/Server/MqttClientSession.cs b/MQTTnet.Core/Server/MqttClientSession.cs index bd99fe3..0b69425 100644 --- a/MQTTnet.Core/Server/MqttClientSession.cs +++ b/MQTTnet.Core/Server/MqttClientSession.cs @@ -63,7 +63,7 @@ namespace MQTTnet.Core.Server } catch (Exception exception) { - MqttTrace.Error(nameof(MqttClientSession), exception, $"Client '{_identifier}': Unhandled exception while processing client packets."); + MqttTrace.Error(nameof(MqttClientSession), exception, "Client '{0}': Unhandled exception while processing client packets.", _identifier); } finally { @@ -90,7 +90,7 @@ namespace MQTTnet.Core.Server } _messageQueue.Enqueue(publishPacket); - MqttTrace.Verbose(nameof(MqttClientSession), $"Client '{_identifier}: Enqueued pending publish packet."); + MqttTrace.Verbose(nameof(MqttClientSession), "Client '{0}': Enqueued pending publish packet.", _identifier); } public void Dispose() @@ -143,7 +143,7 @@ namespace MQTTnet.Core.Server return Task.FromResult((object)null); } - MqttTrace.Warning(nameof(MqttClientSession), $"Client '{_identifier}': Received not supported packet ({packet}). Closing connection."); + MqttTrace.Warning(nameof(MqttClientSession), "Client '{0}': Received not supported packet ({1}). Closing connection.", _identifier, packet); _cancellationTokenSource.Cancel(); return Task.FromResult((object)null); diff --git a/MQTTnet.Core/Server/MqttClientSessionsManager.cs b/MQTTnet.Core/Server/MqttClientSessionsManager.cs index 6165143..79823e7 100644 --- a/MQTTnet.Core/Server/MqttClientSessionsManager.cs +++ b/MQTTnet.Core/Server/MqttClientSessionsManager.cs @@ -111,11 +111,11 @@ namespace MQTTnet.Core.Server _clientSessions.Remove(connectPacket.ClientId); clientSession.Dispose(); clientSession = null; - MqttTrace.Verbose(nameof(MqttClientSessionsManager), $"Disposed existing session of client '{connectPacket.ClientId}'."); + MqttTrace.Verbose(nameof(MqttClientSessionsManager), "Disposed existing session of client '{0}'.", connectPacket.ClientId); } else { - MqttTrace.Verbose(nameof(MqttClientSessionsManager), $"Reusing existing session of client '{connectPacket.ClientId}'."); + MqttTrace.Verbose(nameof(MqttClientSessionsManager), "Reusing existing session of client '{0}'.", connectPacket.ClientId); } } @@ -127,7 +127,7 @@ namespace MQTTnet.Core.Server clientSession = new MqttClientSession(connectPacket.ClientId, _options, DispatchPublishPacket); _clientSessions[connectPacket.ClientId] = clientSession; - MqttTrace.Verbose(nameof(MqttClientSessionsManager), $"Created a new session for client '{connectPacket.ClientId}'."); + MqttTrace.Verbose(nameof(MqttClientSessionsManager), "Created a new session for client '{0}'.", connectPacket.ClientId); } return new GetOrCreateClientSessionResult { IsExistingSession = isExistingSession, Session = clientSession }; diff --git a/MQTTnet.Core/Server/MqttServer.cs b/MQTTnet.Core/Server/MqttServer.cs index 231cfcf..f3025e6 100644 --- a/MQTTnet.Core/Server/MqttServer.cs +++ b/MQTTnet.Core/Server/MqttServer.cs @@ -84,7 +84,7 @@ namespace MQTTnet.Core.Server private void OnClientConnected(object sender, MqttClientConnectedEventArgs eventArgs) { - MqttTrace.Information(nameof(MqttServer), $"Client '{eventArgs.Identifier}': Connected."); + MqttTrace.Information(nameof(MqttServer), "Client '{0}': Connected.", eventArgs.Identifier); ClientConnected?.Invoke(this, eventArgs); Task.Run(() => _clientSessionsManager.RunClientSessionAsync(eventArgs), _cancellationTokenSource.Token); diff --git a/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs b/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs index 01ec484..c8c6da6 100644 --- a/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs +++ b/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs @@ -18,12 +18,12 @@ namespace MQTTnet.Core.Tests public async Task ConnectAsync(MqttClientOptions options, TimeSpan timeout) { - await Task.FromResult(0).ConfigureAwait(false); + await Task.FromResult(0); } public async Task DisconnectAsync() { - await Task.FromResult(0).ConfigureAwait(false); + await Task.FromResult(0); } public async Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout) @@ -31,14 +31,14 @@ namespace MQTTnet.Core.Tests ThrowIfPartnerIsNull(); Partner.SendPacketInternal(packet); - await Task.FromResult(0).ConfigureAwait(false); + await Task.FromResult(0); } public async Task ReceivePacketAsync(TimeSpan timeout) { ThrowIfPartnerIsNull(); - return await Task.Run(() => _incomingPackets.Take()).ConfigureAwait(false); + return await Task.Run(() => _incomingPackets.Take()); } private void SendPacketInternal(MqttBasePacket packet)