diff --git a/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs b/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs index 888539c..f537c41 100644 --- a/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs +++ b/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs @@ -87,7 +87,7 @@ namespace MQTTnet.Core.Adapter { try { - await _semaphore.WaitAsync(cancellationToken); + await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false); foreach (var packet in packets) { diff --git a/MQTTnet.Core/Client/MqttClient.cs b/MQTTnet.Core/Client/MqttClient.cs index 5cc70cd..202cbc5 100644 --- a/MQTTnet.Core/Client/MqttClient.cs +++ b/MQTTnet.Core/Client/MqttClient.cs @@ -151,7 +151,7 @@ namespace MQTTnet.Core.Client case MqttQualityOfServiceLevel.AtMostOnce: { // No packet identifier is used for QoS 0 [3.3.2.2 Packet Identifier] - await _adapter.SendPacketsAsync(_options.CommunicationTimeout, _cancellationTokenSource.Token, qosPackets); + await _adapter.SendPacketsAsync(_options.CommunicationTimeout, _cancellationTokenSource.Token, qosPackets).ConfigureAwait(false); break; } case MqttQualityOfServiceLevel.AtLeastOnce: @@ -159,7 +159,7 @@ namespace MQTTnet.Core.Client foreach (var publishPacket in qosPackets) { publishPacket.PacketIdentifier = GetNewPacketIdentifier(); - await SendAndReceiveAsync(publishPacket); + await SendAndReceiveAsync(publishPacket).ConfigureAwait(false); } break; diff --git a/MQTTnet.Core/Diagnostics/MqttNetLogger.cs b/MQTTnet.Core/Diagnostics/MqttNetLogger.cs index 54ab6d4..32d9609 100644 --- a/MQTTnet.Core/Diagnostics/MqttNetLogger.cs +++ b/MQTTnet.Core/Diagnostics/MqttNetLogger.cs @@ -21,15 +21,19 @@ namespace MQTTnet.Core.Diagnostics throw new ArgumentNullException(nameof(formatter)); } - var message = formatter(state, exception); + if (!MqttNetTrace.HasListeners) + { + return; + } + var message = formatter(state, exception); var traceMessage = new MqttNetTraceMessage(DateTime.Now, Environment.CurrentManagedThreadId, _categoryName, logLevel, message, exception); _mqttNetTrace.Publish(traceMessage); } public bool IsEnabled(LogLevel logLevel) { - return true; + return MqttNetTrace.HasListeners; } //not supported: async local requires netstandard1.3 diff --git a/MQTTnet.Core/Diagnostics/MqttNetTrace.cs b/MQTTnet.Core/Diagnostics/MqttNetTrace.cs index c14d493..f75c7a9 100644 --- a/MQTTnet.Core/Diagnostics/MqttNetTrace.cs +++ b/MQTTnet.Core/Diagnostics/MqttNetTrace.cs @@ -10,9 +10,11 @@ namespace MQTTnet.Core.Diagnostics public static event EventHandler TraceMessagePublished; - public void Publish(MqttNetTraceMessage msg) + public static bool HasListeners => TraceMessagePublished != null; + + public void Publish(MqttNetTraceMessage traceMessage) { - TraceMessagePublished?.Invoke(this, new MqttNetTraceMessagePublishedEventArgs(msg)); + TraceMessagePublished?.Invoke(this, new MqttNetTraceMessagePublishedEventArgs(traceMessage)); } public void Dispose() diff --git a/MQTTnet.Core/Serializer/MqttPacketSerializer.cs b/MQTTnet.Core/Serializer/MqttPacketSerializer.cs index 00906f4..6c2fc56 100644 --- a/MQTTnet.Core/Serializer/MqttPacketSerializer.cs +++ b/MQTTnet.Core/Serializer/MqttPacketSerializer.cs @@ -24,15 +24,18 @@ namespace MQTTnet.Core.Serializer using (var stream = new MemoryStream()) using (var writer = new MqttPacketWriter(stream)) { - var header = new List { SerializePacket(packet, writer) }; + var fixedHeader = SerializePacket(packet, writer); + var headerBuffer = new List { fixedHeader }; + MqttPacketWriter.WriteRemainingLength((int)stream.Length, headerBuffer); + + var header = headerBuffer.ToArray(); var body = stream.ToArray(); - MqttPacketWriter.BuildLengthHeader(body.Length, header); - var headerArray = header.ToArray(); - var writeBuffer = new byte[header.Count + body.Length]; - Buffer.BlockCopy(headerArray, 0, writeBuffer, 0, headerArray.Length); - Buffer.BlockCopy(body, 0, writeBuffer, headerArray.Length, body.Length); - return writeBuffer; + var buffer = new byte[header.Length + body.Length]; + Buffer.BlockCopy(header, 0, buffer, 0, header.Length); + Buffer.BlockCopy(body, 0, buffer, header.Length, body.Length); + + return buffer; } } diff --git a/MQTTnet.Core/Serializer/MqttPacketWriter.cs b/MQTTnet.Core/Serializer/MqttPacketWriter.cs index d8c997d..fcd883c 100644 --- a/MQTTnet.Core/Serializer/MqttPacketWriter.cs +++ b/MQTTnet.Core/Serializer/MqttPacketWriter.cs @@ -56,11 +56,11 @@ namespace MQTTnet.Core.Serializer Write(value); } - public static void BuildLengthHeader(int length, List header) + public static void WriteRemainingLength(int length, List target) { if (length == 0) { - header.Add(0); + target.Add(0); return; } @@ -75,7 +75,7 @@ namespace MQTTnet.Core.Serializer encodedByte = encodedByte | 128; } - header.Add((byte)encodedByte); + target.Add((byte)encodedByte); } while (x > 0); } } diff --git a/MQTTnet.Core/Server/MqttClientRetainedMessagesManager.cs b/MQTTnet.Core/Server/MqttClientRetainedMessagesManager.cs index dbdb1d7..ab1d487 100644 --- a/MQTTnet.Core/Server/MqttClientRetainedMessagesManager.cs +++ b/MQTTnet.Core/Server/MqttClientRetainedMessagesManager.cs @@ -54,7 +54,7 @@ namespace MQTTnet.Core.Server { if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage)); - await _gate.WaitAsync(); + await _gate.WaitAsync().ConfigureAwait(false); try { var saveIsRequired = false; @@ -108,7 +108,7 @@ namespace MQTTnet.Core.Server { var retainedMessages = new List(); - await _gate.WaitAsync(); + await _gate.WaitAsync().ConfigureAwait(false); try { foreach (var retainedMessage in _retainedMessages.Values) diff --git a/MQTTnet.Core/Server/MqttClientSession.cs b/MQTTnet.Core/Server/MqttClientSession.cs index 9b14e58..fb859b8 100644 --- a/MQTTnet.Core/Server/MqttClientSession.cs +++ b/MQTTnet.Core/Server/MqttClientSession.cs @@ -29,11 +29,11 @@ namespace MQTTnet.Core.Server private MqttApplicationMessage _willMessage; public MqttClientSession( - string clientId, + string clientId, IOptions options, MqttClientSessionsManager sessionsManager, MqttClientSubscriptionsManager subscriptionsManager, - ILogger logger, + ILogger logger, ILogger messageQueueLogger, IMqttClientRetainedMessageManager clientRetainedMessageManager) { @@ -65,7 +65,7 @@ namespace MQTTnet.Core.Server _cancellationTokenSource = new CancellationTokenSource(); _pendingMessagesQueue.Start(adapter, _cancellationTokenSource.Token); - await ReceivePacketsAsync(adapter, _cancellationTokenSource.Token); + await ReceivePacketsAsync(adapter, _cancellationTokenSource.Token).ConfigureAwait(false); } catch (OperationCanceledException) { @@ -175,7 +175,7 @@ namespace MQTTnet.Core.Server { return adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, _subscriptionsManager.Unsubscribe(unsubscribePacket)); } - + if (packet is MqttDisconnectPacket || packet is MqttConnectPacket) { Stop(); @@ -192,19 +192,19 @@ namespace MQTTnet.Core.Server { var subscribeResult = _subscriptionsManager.Subscribe(subscribePacket, ClientId); - await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, subscribeResult.ResponsePacket); - await EnqueueSubscribedRetainedMessagesAsync(subscribePacket); + await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, subscribeResult.ResponsePacket).ConfigureAwait(false); + await EnqueueSubscribedRetainedMessagesAsync(subscribePacket).ConfigureAwait(false); if (subscribeResult.CloseConnection) { - await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new MqttDisconnectPacket()); + await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new MqttDisconnectPacket()).ConfigureAwait(false); Stop(); } } private async Task EnqueueSubscribedRetainedMessagesAsync(MqttSubscribePacket subscribePacket) { - var retainedMessages = await _clientRetainedMessageManager.GetSubscribedMessagesAsync(subscribePacket); + var retainedMessages = await _clientRetainedMessageManager.GetSubscribedMessagesAsync(subscribePacket).ConfigureAwait(false); foreach (var publishPacket in retainedMessages) { EnqueuePublishPacket(publishPacket.ToPublishPacket()); @@ -225,7 +225,7 @@ namespace MQTTnet.Core.Server if (applicationMessage.Retain) { - await _clientRetainedMessageManager.HandleMessageAsync(ClientId, applicationMessage); + await _clientRetainedMessageManager.HandleMessageAsync(ClientId, applicationMessage).ConfigureAwait(false); } switch (applicationMessage.QualityOfServiceLevel) @@ -255,7 +255,7 @@ namespace MQTTnet.Core.Server _sessionsManager.DispatchApplicationMessage(this, applicationMessage); await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, - new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }); + new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }).ConfigureAwait(false); return; } diff --git a/Tests/MQTTnet.TestApp.NetCore/PerformanceTest.cs b/Tests/MQTTnet.TestApp.NetCore/PerformanceTest.cs index 6ef7668..e21a7ea 100644 --- a/Tests/MQTTnet.TestApp.NetCore/PerformanceTest.cs +++ b/Tests/MQTTnet.TestApp.NetCore/PerformanceTest.cs @@ -41,7 +41,7 @@ namespace MQTTnet.TestApp.NetCore .AddLogging() .BuildServiceProvider(); - services.GetService().AddConsole(LogLevel.Warning, true); + //services.GetService().AddConsole(LogLevel.Warning, true); Console.WriteLine("Press 'c' for concurrent sends. Otherwise in one batch."); var concurrent = Console.ReadKey(true).KeyChar == 'c'; @@ -120,11 +120,15 @@ namespace MQTTnet.TestApp.NetCore stopwatch.Stop(); Console.WriteLine($"Sent 10.000 messages within {stopwatch.ElapsedMilliseconds} ms ({stopwatch.ElapsedMilliseconds / (float)testMessageCount} ms / message)."); - stopwatch.Restart(); + + var messages = new[] { message }; var sentMessagesCount = 0; + + stopwatch.Restart(); + while (stopwatch.ElapsedMilliseconds < 1000) { - await client.PublishAsync(message); + await client.PublishAsync(messages).ConfigureAwait(false); sentMessagesCount++; } @@ -180,7 +184,7 @@ namespace MQTTnet.TestApp.NetCore { Topic = "A/B/C", Payload = Encoding.UTF8.GetBytes("Hello World"), - QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce + QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce }; }