From 8df8cb1eb6fc88923f6180aed62a43b9bd1fe1e0 Mon Sep 17 00:00:00 2001 From: Christian Date: Fri, 11 May 2018 22:00:54 +0200 Subject: [PATCH] Refactor logging and add benchmark. --- .../MQTTnet.AspnetCore/MqttHostedServer.cs | 2 +- .../MqttWebSocketServerAdapter.cs | 2 +- .../Adapter/MqttChannelAdapter.cs | 17 ++--- .../Client/IMqttClientAdapterFactory.cs | 2 +- .../MQTTnet.NetStandard/Client/MqttClient.cs | 46 +++++++------- .../Diagnostics/IMqttNetChildLogger.cs | 17 +++++ .../Diagnostics/IMqttNetLogger.cs | 16 +---- .../Diagnostics/MqttNetChildLogger.cs | 51 +++++++++++++++ .../Diagnostics/MqttNetLogger.cs | 62 +++++-------------- .../MqttClientAdapterFactory.cs | 2 +- .../MqttTcpServerAdapter.Uwp.cs | 10 +-- .../Implementations/MqttTcpServerAdapter.cs | 12 ++-- .../ManagedClient/ManagedMqttClient.cs | 31 +++++----- Frameworks/MQTTnet.NetStandard/MqttFactory.cs | 8 +-- .../Server/MqttClientKeepAliveMonitor.cs | 14 +++-- .../Server/MqttClientPendingMessagesQueue.cs | 21 ++++--- .../Server/MqttClientSession.cs | 27 ++++---- .../Server/MqttClientSessionsManager.cs | 48 +++++++------- .../Server/MqttRetainedMessagesManager.cs | 56 +++++++++-------- .../MQTTnet.NetStandard/Server/MqttServer.cs | 15 ++--- Tests/MQTTnet.Benchmarks/LoggerBenchmark.cs | 54 ++++++++++++++++ .../MQTTnet.Benchmarks.csproj | 1 + Tests/MQTTnet.Benchmarks/Program.cs | 5 +- .../MQTTnet.Benchmarks/SerializerBenchmark.cs | 2 - .../MqttKeepAliveMonitorTests.cs | 4 +- .../MqttSubscriptionsManagerTests.cs | 10 +-- .../TestMqttCommunicationAdapterFactory.cs | 2 +- 27 files changed, 318 insertions(+), 219 deletions(-) create mode 100644 Frameworks/MQTTnet.NetStandard/Diagnostics/IMqttNetChildLogger.cs create mode 100644 Frameworks/MQTTnet.NetStandard/Diagnostics/MqttNetChildLogger.cs create mode 100644 Tests/MQTTnet.Benchmarks/LoggerBenchmark.cs diff --git a/Frameworks/MQTTnet.AspnetCore/MqttHostedServer.cs b/Frameworks/MQTTnet.AspnetCore/MqttHostedServer.cs index 8e6761a..708d383 100644 --- a/Frameworks/MQTTnet.AspnetCore/MqttHostedServer.cs +++ b/Frameworks/MQTTnet.AspnetCore/MqttHostedServer.cs @@ -13,7 +13,7 @@ namespace MQTTnet.AspNetCore { private readonly IMqttServerOptions _options; - public MqttHostedServer(IMqttServerOptions options, IEnumerable adapters, IMqttNetLogger logger) : base(adapters, logger) + public MqttHostedServer(IMqttServerOptions options, IEnumerable adapters, IMqttNetLogger logger) : base(adapters, logger.CreateChildLogger(nameof(MqttHostedServer))) { _options = options ?? throw new ArgumentNullException(nameof(options)); } diff --git a/Frameworks/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs b/Frameworks/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs index babc59b..666c71e 100644 --- a/Frameworks/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs +++ b/Frameworks/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs @@ -27,7 +27,7 @@ namespace MQTTnet.AspNetCore { if (webSocket == null) throw new ArgumentNullException(nameof(webSocket)); - var clientAdapter = new MqttChannelAdapter(new MqttWebSocketChannel(webSocket), new MqttPacketSerializer(), new MqttNetLogger()); + var clientAdapter = new MqttChannelAdapter(new MqttWebSocketChannel(webSocket), new MqttPacketSerializer(), new MqttNetLogger().CreateChildLogger(nameof(MqttWebSocketServerAdapter))); var eventArgs = new MqttServerAdapterClientAcceptedEventArgs(clientAdapter); ClientAccepted?.Invoke(this, eventArgs); diff --git a/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs b/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs index c531add..9af6031 100644 --- a/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs +++ b/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs @@ -18,16 +18,19 @@ namespace MQTTnet.Adapter private const uint ErrorOperationAborted = 0x800703E3; private const int ReadBufferSize = 4096; // TODO: Move buffer size to config - private readonly IMqttNetLogger _logger; + private readonly IMqttNetChildLogger _logger; private readonly IMqttChannel _channel; private bool _isDisposed; - public MqttChannelAdapter(IMqttChannel channel, IMqttPacketSerializer serializer, IMqttNetLogger logger) + public MqttChannelAdapter(IMqttChannel channel, IMqttPacketSerializer serializer, IMqttNetChildLogger logger) { - _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + if (logger == null) throw new ArgumentNullException(nameof(logger)); + _channel = channel ?? throw new ArgumentNullException(nameof(channel)); PacketSerializer = serializer ?? throw new ArgumentNullException(nameof(serializer)); + + _logger = logger.CreateChildLogger(nameof(MqttChannelAdapter)); } public IMqttPacketSerializer PacketSerializer { get; } @@ -35,7 +38,7 @@ namespace MQTTnet.Adapter public Task ConnectAsync(TimeSpan timeout, CancellationToken cancellationToken) { ThrowIfDisposed(); - _logger.Verbose("Connecting [Timeout={0}]", timeout); + _logger.Verbose("Connecting [Timeout={0}]", timeout); return ExecuteAndWrapExceptionAsync(() => Internal.TaskExtensions.TimeoutAfter(ct => _channel.ConnectAsync(ct), timeout, cancellationToken)); @@ -44,7 +47,7 @@ namespace MQTTnet.Adapter public Task DisconnectAsync(TimeSpan timeout, CancellationToken cancellationToken) { ThrowIfDisposed(); - _logger.Verbose("Disconnecting [Timeout={0}]", timeout); + _logger.Verbose("Disconnecting [Timeout={0}]", timeout); return ExecuteAndWrapExceptionAsync(() => Internal.TaskExtensions.TimeoutAfter(ct => _channel.DisconnectAsync(), timeout, cancellationToken)); @@ -69,7 +72,7 @@ namespace MQTTnet.Adapter { return ExecuteAndWrapExceptionAsync(() => { - _logger.Verbose("TX >>> {0} [Timeout={1}]", packet, timeout); + _logger.Verbose("TX >>> {0} [Timeout={1}]", packet, timeout); var packetData = PacketSerializer.Serialize(packet); @@ -111,7 +114,7 @@ namespace MQTTnet.Adapter throw new MqttProtocolViolationException("Received malformed packet."); } - _logger.Verbose("RX <<< {0}", packet); + _logger.Verbose("RX <<< {0}", packet); } finally { diff --git a/Frameworks/MQTTnet.NetStandard/Client/IMqttClientAdapterFactory.cs b/Frameworks/MQTTnet.NetStandard/Client/IMqttClientAdapterFactory.cs index c2a7e74..197858b 100644 --- a/Frameworks/MQTTnet.NetStandard/Client/IMqttClientAdapterFactory.cs +++ b/Frameworks/MQTTnet.NetStandard/Client/IMqttClientAdapterFactory.cs @@ -5,6 +5,6 @@ namespace MQTTnet.Client { public interface IMqttClientAdapterFactory { - IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetLogger logger); + IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetChildLogger logger); } } diff --git a/Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs b/Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs index 4f28a90..b342ddc 100644 --- a/Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs +++ b/Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs @@ -21,7 +21,7 @@ namespace MQTTnet.Client private readonly MqttPacketDispatcher _packetDispatcher = new MqttPacketDispatcher(); private readonly IMqttClientAdapterFactory _adapterFactory; - private readonly IMqttNetLogger _logger; + private readonly IMqttNetChildLogger _logger; private IMqttClientOptions _options; private CancellationTokenSource _cancellationTokenSource; @@ -31,8 +31,10 @@ namespace MQTTnet.Client public MqttClient(IMqttClientAdapterFactory channelFactory, IMqttNetLogger logger) { + if (logger == null) throw new ArgumentNullException(nameof(logger)); + _adapterFactory = channelFactory ?? throw new ArgumentNullException(nameof(channelFactory)); - _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + _logger = logger.CreateChildLogger(nameof(MqttClient)); } public event EventHandler Connected; @@ -57,14 +59,14 @@ namespace MQTTnet.Client _adapter = _adapterFactory.CreateClientAdapter(options, _logger); - _logger.Verbose(this, "Trying to connect with server."); + _logger.Verbose("Trying to connect with server."); await _adapter.ConnectAsync(_options.CommunicationTimeout, _cancellationTokenSource.Token).ConfigureAwait(false); - _logger.Verbose(this, "Connection with server established."); + _logger.Verbose("Connection with server established."); StartReceivingPackets(_cancellationTokenSource.Token); var connectResponse = await AuthenticateAsync(options.WillMessage, _cancellationTokenSource.Token).ConfigureAwait(false); - _logger.Verbose(this, "MQTT connection with server established."); + _logger.Verbose("MQTT connection with server established."); _sendTracker.Restart(); @@ -76,12 +78,12 @@ namespace MQTTnet.Client IsConnected = true; Connected?.Invoke(this, new MqttClientConnectedEventArgs(connectResponse.IsSessionPresent)); - _logger.Info(this, "Connected."); + _logger.Info("Connected."); return new MqttClientConnectResult(connectResponse.IsSessionPresent); } catch (Exception exception) { - _logger.Error(this, exception, "Error while connecting with server."); + _logger.Error(exception, "Error while connecting with server."); await DisconnectInternalAsync(null, exception).ConfigureAwait(false); throw; @@ -245,7 +247,7 @@ namespace MQTTnet.Client } catch (Exception adapterException) { - _logger.Warning(this, adapterException, "Error while disconnecting from adapter."); + _logger.Warning(adapterException, "Error while disconnecting from adapter."); } finally { @@ -265,11 +267,11 @@ namespace MQTTnet.Client await _adapter.DisconnectAsync(_options.CommunicationTimeout, CancellationToken.None).ConfigureAwait(false); } - _logger.Verbose(this, "Disconnected from adapter."); + _logger.Verbose("Disconnected from adapter."); } catch (Exception adapterException) { - _logger.Warning(this, adapterException, "Error while disconnecting from adapter."); + _logger.Warning(adapterException, "Error while disconnecting from adapter."); } finally { @@ -278,7 +280,7 @@ namespace MQTTnet.Client _cancellationTokenSource?.Dispose(); _cancellationTokenSource = null; - _logger.Info(this, "Disconnected."); + _logger.Info("Disconnected."); Disconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(clientWasConnected, exception)); } } @@ -324,7 +326,7 @@ namespace MQTTnet.Client } catch (MqttCommunicationTimedOutException) { - _logger.Warning(this, null, "Timeout while waiting for packet of type '{0}'.", typeof(TResponsePacket).Namespace); + _logger.Warning(null, "Timeout while waiting for packet of type '{0}'.", typeof(TResponsePacket).Namespace); throw; } finally @@ -335,7 +337,7 @@ namespace MQTTnet.Client private async Task SendKeepAliveMessagesAsync(CancellationToken cancellationToken) { - _logger.Verbose(this, "Start sending keep alive packets."); + _logger.Verbose("Start sending keep alive packets."); try { @@ -362,24 +364,24 @@ namespace MQTTnet.Client } else if (exception is MqttCommunicationException) { - _logger.Warning(this, exception, "MQTT communication exception while sending/receiving keep alive packets."); + _logger.Warning(exception, "MQTT communication exception while sending/receiving keep alive packets."); } else { - _logger.Error(this, exception, "Unhandled exception while sending/receiving keep alive packets."); + _logger.Error(exception, "Unhandled exception while sending/receiving keep alive packets."); } await DisconnectInternalAsync(_keepAliveMessageSenderTask, exception).ConfigureAwait(false); } finally { - _logger.Verbose(this, "Stopped sending keep alive packets."); + _logger.Verbose("Stopped sending keep alive packets."); } } private async Task ReceivePacketsAsync(CancellationToken cancellationToken) { - _logger.Verbose(this, "Start receiving packets."); + _logger.Verbose("Start receiving packets."); try { @@ -414,11 +416,11 @@ namespace MQTTnet.Client } else if (exception is MqttCommunicationException) { - _logger.Warning(this, exception, "MQTT communication exception while receiving packets."); + _logger.Warning(exception, "MQTT communication exception while receiving packets."); } else { - _logger.Error(this, exception, "Unhandled exception while receiving packets."); + _logger.Error(exception, "Unhandled exception while receiving packets."); } await DisconnectInternalAsync(_packetReceiverTask, exception).ConfigureAwait(false); @@ -426,7 +428,7 @@ namespace MQTTnet.Client } finally { - _logger.Verbose(this, "Stopped receiving packets."); + _logger.Verbose("Stopped receiving packets."); } } @@ -462,7 +464,7 @@ namespace MQTTnet.Client } catch (Exception exception) { - _logger.Error(this, exception, "Unhandled exception while processing received packet."); + _logger.Error(exception, "Unhandled exception while processing received packet."); } } @@ -534,7 +536,7 @@ namespace MQTTnet.Client } catch (Exception exception) { - _logger.Error(this, exception, "Unhandled exception while handling application message."); + _logger.Error(exception, "Unhandled exception while handling application message."); } } diff --git a/Frameworks/MQTTnet.NetStandard/Diagnostics/IMqttNetChildLogger.cs b/Frameworks/MQTTnet.NetStandard/Diagnostics/IMqttNetChildLogger.cs new file mode 100644 index 0000000..55d99e5 --- /dev/null +++ b/Frameworks/MQTTnet.NetStandard/Diagnostics/IMqttNetChildLogger.cs @@ -0,0 +1,17 @@ +using System; + +namespace MQTTnet.Diagnostics +{ + public interface IMqttNetChildLogger + { + IMqttNetChildLogger CreateChildLogger(string source); + + void Verbose(string message, params object[] parameters); + + void Info(string message, params object[] parameters); + + void Warning(Exception exception, string message, params object[] parameters); + + void Error(Exception exception, string message, params object[] parameters); + } +} diff --git a/Frameworks/MQTTnet.NetStandard/Diagnostics/IMqttNetLogger.cs b/Frameworks/MQTTnet.NetStandard/Diagnostics/IMqttNetLogger.cs index b7d85fe..d2aacf4 100644 --- a/Frameworks/MQTTnet.NetStandard/Diagnostics/IMqttNetLogger.cs +++ b/Frameworks/MQTTnet.NetStandard/Diagnostics/IMqttNetLogger.cs @@ -6,20 +6,8 @@ namespace MQTTnet.Diagnostics { event EventHandler LogMessagePublished; - void Verbose(string message, params object[] parameters); + IMqttNetChildLogger CreateChildLogger(string source); - void Verbose(object source, string message, params object[] parameters); - - void Info(string message, params object[] parameters); - - void Info(object source, string message, params object[] parameters); - - void Warning(Exception exception, string message, params object[] parameters); - - void Warning(object source, Exception exception, string message, params object[] parameters); - - void Error(Exception exception, string message, params object[] parameters); - - void Error(object source, Exception exception, string message, params object[] parameters); + void Publish(MqttNetLogLevel logLevel, string source, string message, object[] parameters, Exception exception); } } diff --git a/Frameworks/MQTTnet.NetStandard/Diagnostics/MqttNetChildLogger.cs b/Frameworks/MQTTnet.NetStandard/Diagnostics/MqttNetChildLogger.cs new file mode 100644 index 0000000..5055731 --- /dev/null +++ b/Frameworks/MQTTnet.NetStandard/Diagnostics/MqttNetChildLogger.cs @@ -0,0 +1,51 @@ +using System; + +namespace MQTTnet.Diagnostics +{ + public sealed class MqttNetChildLogger : IMqttNetChildLogger + { + private readonly IMqttNetLogger _logger; + private readonly string _source; + + public MqttNetChildLogger(IMqttNetLogger logger, string source) + { + _logger = logger; + _source = source; + } + + public IMqttNetChildLogger CreateChildLogger(string source) + { + string childSource; + if (!string.IsNullOrEmpty(_source)) + { + childSource = _source + "." + source; + } + else + { + childSource = source; + } + + return new MqttNetChildLogger(_logger, childSource); + } + + public void Verbose(string message, params object[] parameters) + { + _logger.Publish(MqttNetLogLevel.Verbose, _source, message, parameters, null); + } + + public void Info(string message, params object[] parameters) + { + _logger.Publish(MqttNetLogLevel.Info, _source, message, parameters, null); + } + + public void Warning(Exception exception, string message, params object[] parameters) + { + _logger.Publish(MqttNetLogLevel.Warning, _source, message, parameters, exception); + } + + public void Error(Exception exception, string message, params object[] parameters) + { + _logger.Publish(MqttNetLogLevel.Error, _source, message, parameters, exception); + } + } +} diff --git a/Frameworks/MQTTnet.NetStandard/Diagnostics/MqttNetLogger.cs b/Frameworks/MQTTnet.NetStandard/Diagnostics/MqttNetLogger.cs index 9421921..868e5d7 100644 --- a/Frameworks/MQTTnet.NetStandard/Diagnostics/MqttNetLogger.cs +++ b/Frameworks/MQTTnet.NetStandard/Diagnostics/MqttNetLogger.cs @@ -2,7 +2,7 @@ namespace MQTTnet.Diagnostics { - public class MqttNetLogger : IMqttNetLogger + public sealed class MqttNetLogger : IMqttNetLogger { private readonly string _logId; @@ -13,47 +13,12 @@ namespace MQTTnet.Diagnostics public event EventHandler LogMessagePublished; - public void Verbose(string message, params object[] parameters) + public IMqttNetChildLogger CreateChildLogger(string source) { - Publish(MqttNetLogLevel.Verbose, typeof(TSource), message, parameters, null); + return new MqttNetChildLogger(this, source); } - public void Verbose(object source, string message, params object[] parameters) - { - Publish(MqttNetLogLevel.Verbose, source, message, parameters, null); - } - - public void Info(string message, params object[] parameters) - { - Publish(MqttNetLogLevel.Info, typeof(TSource), message, parameters, null); - } - - public void Info(object source, string message, params object[] parameters) - { - Publish(MqttNetLogLevel.Info, source, message, parameters, null); - } - - public void Warning(Exception exception, string message, params object[] parameters) - { - Publish(MqttNetLogLevel.Warning, typeof(TSource), message, parameters, null); - } - - public void Warning(object source, Exception exception, string message, params object[] parameters) - { - Publish(MqttNetLogLevel.Warning, source, message, parameters, null); - } - - public void Error(Exception exception, string message, params object[] parameters) - { - Publish(MqttNetLogLevel.Error, typeof(TSource), message, parameters, null); - } - - public void Error(object source, Exception exception, string message, params object[] parameters) - { - Publish(MqttNetLogLevel.Error, source, message, parameters, null); - } - - private void Publish(MqttNetLogLevel logLevel, object source, string message, object[] parameters, Exception exception) + public void Publish(MqttNetLogLevel logLevel, string source, string message, object[] parameters, Exception exception) { var hasLocalListeners = LogMessagePublished != null; var hasGlobalListeners = MqttNetGlobalLogger.HasListeners; @@ -63,18 +28,19 @@ namespace MQTTnet.Diagnostics return; } - if (parameters.Length > 0) - { - message = string.Format(message, parameters); - } - - string sourceName = null; - if (source != null) + if (parameters?.Length > 0) { - sourceName = source.GetType().Name; + try + { + message = string.Format(message, parameters); + } + catch + { + message = "MESSAGE FORMAT INVALID: " + message; + } } - var traceMessage = new MqttNetLogMessage(_logId, DateTime.Now, Environment.CurrentManagedThreadId, sourceName, logLevel, message, exception); + var traceMessage = new MqttNetLogMessage(_logId, DateTime.UtcNow, Environment.CurrentManagedThreadId, source, logLevel, message, exception); if (hasGlobalListeners) { diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttClientAdapterFactory.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttClientAdapterFactory.cs index 4692d7e..5256da3 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttClientAdapterFactory.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttClientAdapterFactory.cs @@ -8,7 +8,7 @@ namespace MQTTnet.Implementations { public class MqttClientAdapterFactory : IMqttClientAdapterFactory { - public IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetLogger logger) + public IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetChildLogger logger) { if (options == null) throw new ArgumentNullException(nameof(options)); diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpServerAdapter.Uwp.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpServerAdapter.Uwp.cs index b2c3110..26bae2e 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpServerAdapter.Uwp.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpServerAdapter.Uwp.cs @@ -11,12 +11,14 @@ namespace MQTTnet.Implementations { public class MqttTcpServerAdapter : IMqttServerAdapter { - private readonly IMqttNetLogger _logger; + private readonly IMqttNetChildLogger _logger; private StreamSocketListener _defaultEndpointSocket; - public MqttTcpServerAdapter(IMqttNetLogger logger) + public MqttTcpServerAdapter(IMqttNetChildLogger logger) { - _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + if (logger == null) throw new ArgumentNullException(nameof(logger)); + + _logger = logger.CreateChildLogger(nameof(MqttTcpServerAdapter)); } public event EventHandler ClientAccepted; @@ -74,7 +76,7 @@ namespace MQTTnet.Implementations } catch (Exception exception) { - _logger.Error(exception, "Error while accepting connection at default endpoint."); + _logger.Error(exception, "Error while accepting connection at default endpoint."); } } } diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpServerAdapter.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpServerAdapter.cs index 0404de3..85c15ce 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpServerAdapter.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpServerAdapter.cs @@ -16,16 +16,18 @@ namespace MQTTnet.Implementations { public class MqttTcpServerAdapter : IMqttServerAdapter { - private readonly IMqttNetLogger _logger; + private readonly IMqttNetChildLogger _logger; private CancellationTokenSource _cancellationTokenSource; private Socket _defaultEndpointSocket; private Socket _tlsEndpointSocket; private X509Certificate2 _tlsCertificate; - public MqttTcpServerAdapter(IMqttNetLogger logger) + public MqttTcpServerAdapter(IMqttNetChildLogger logger) { - _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + if (logger == null) throw new ArgumentNullException(nameof(logger)); + + _logger = logger.CreateChildLogger(nameof(MqttTcpServerAdapter)); } public event EventHandler ClientAccepted; @@ -127,7 +129,7 @@ namespace MQTTnet.Implementations return; } - _logger.Error(exception, "Error while accepting connection at default endpoint."); + _logger.Error(exception, "Error while accepting connection at default endpoint."); await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken).ConfigureAwait(false); } } @@ -162,7 +164,7 @@ namespace MQTTnet.Implementations return; } - _logger.Error(exception, "Error while accepting connection at TLS endpoint."); + _logger.Error(exception, "Error while accepting connection at TLS endpoint."); await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken).ConfigureAwait(false); } } diff --git a/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs b/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs index bbfc1c7..af9cc83 100644 --- a/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs +++ b/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs @@ -19,7 +19,7 @@ namespace MQTTnet.ManagedClient private readonly List _unsubscriptions = new List(); private readonly IMqttClient _mqttClient; - private readonly IMqttNetLogger _logger; + private readonly IMqttNetChildLogger _logger; private CancellationTokenSource _connectionCancellationToken; private CancellationTokenSource _publishingCancellationToken; @@ -29,14 +29,17 @@ namespace MQTTnet.ManagedClient private bool _subscriptionsNotPushed; - public ManagedMqttClient(IMqttClient mqttClient, IMqttNetLogger logger) + public ManagedMqttClient(IMqttClient mqttClient, IMqttNetChildLogger logger) { - _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + if (logger == null) throw new ArgumentNullException(nameof(logger)); + _mqttClient = mqttClient ?? throw new ArgumentNullException(nameof(mqttClient)); _mqttClient.Connected += OnConnected; _mqttClient.Disconnected += OnDisconnected; _mqttClient.ApplicationMessageReceived += OnApplicationMessageReceived; + + _logger = logger.CreateChildLogger(nameof(ManagedMqttClient)); } public bool IsConnected => _mqttClient.IsConnected; @@ -74,7 +77,7 @@ namespace MQTTnet.ManagedClient Task.Run(() => MaintainConnectionAsync(_connectionCancellationToken.Token), _connectionCancellationToken.Token); #pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - _logger.Info("Started"); + _logger.Info("Started"); } public Task StopAsync() @@ -166,12 +169,12 @@ namespace MQTTnet.ManagedClient } catch (Exception exception) { - _logger.Error(exception, "Unhandled exception while maintaining connection."); + _logger.Error(exception, "Unhandled exception while maintaining connection."); } finally { await _mqttClient.DisconnectAsync().ConfigureAwait(false); - _logger.Info("Stopped"); + _logger.Info("Stopped"); } } @@ -204,11 +207,11 @@ namespace MQTTnet.ManagedClient } catch (MqttCommunicationException exception) { - _logger.Warning(exception, "Communication exception while maintaining connection."); + _logger.Warning(exception, "Communication exception while maintaining connection."); } catch (Exception exception) { - _logger.Error(exception, "Unhandled exception while maintaining connection."); + _logger.Error(exception, "Unhandled exception while maintaining connection."); } } @@ -237,11 +240,11 @@ namespace MQTTnet.ManagedClient } catch (Exception exception) { - _logger.Error(exception, "Unhandled exception while publishing queued application messages."); + _logger.Error(exception, "Unhandled exception while publishing queued application messages."); } finally { - _logger.Verbose("Stopped publishing messages."); + _logger.Verbose("Stopped publishing messages."); } } @@ -261,7 +264,7 @@ namespace MQTTnet.ManagedClient { transmitException = exception; - _logger.Warning(exception, "Publishing application message failed."); + _logger.Warning(exception, "Publishing application message failed."); if (message.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce) { @@ -271,7 +274,7 @@ namespace MQTTnet.ManagedClient catch (Exception exception) { transmitException = exception; - _logger.Error(exception, "Unhandled exception while publishing queued application message."); + _logger.Error(exception, "Unhandled exception while publishing queued application message."); } finally { @@ -281,7 +284,7 @@ namespace MQTTnet.ManagedClient private async Task SynchronizeSubscriptionsAsync() { - _logger.Info(nameof(ManagedMqttClient), "Synchronizing subscriptions"); + _logger.Info(nameof(ManagedMqttClient), "Synchronizing subscriptions"); List subscriptions; List unsubscriptions; @@ -320,7 +323,7 @@ namespace MQTTnet.ManagedClient } catch (Exception exception) { - _logger.Warning(exception, "Synchronizing subscriptions failed."); + _logger.Warning(exception, "Synchronizing subscriptions failed."); _subscriptionsNotPushed = true; SynchronizingSubscriptionsFailed?.Invoke(this, EventArgs.Empty); diff --git a/Frameworks/MQTTnet.NetStandard/MqttFactory.cs b/Frameworks/MQTTnet.NetStandard/MqttFactory.cs index e3b3549..d9aa944 100644 --- a/Frameworks/MQTTnet.NetStandard/MqttFactory.cs +++ b/Frameworks/MQTTnet.NetStandard/MqttFactory.cs @@ -25,14 +25,14 @@ namespace MQTTnet public IManagedMqttClient CreateManagedMqttClient() { - return new ManagedMqttClient(CreateMqttClient(), new MqttNetLogger()); + return new ManagedMqttClient(CreateMqttClient(), new MqttNetLogger().CreateChildLogger(string.Empty)); } public IManagedMqttClient CreateManagedMqttClient(IMqttNetLogger logger) { if (logger == null) throw new ArgumentNullException(nameof(logger)); - return new ManagedMqttClient(CreateMqttClient(), logger); + return new ManagedMqttClient(CreateMqttClient(), logger.CreateChildLogger(string.Empty)); } public IMqttServer CreateMqttServer() @@ -45,7 +45,7 @@ namespace MQTTnet { if (logger == null) throw new ArgumentNullException(nameof(logger)); - return CreateMqttServer(new List { new MqttTcpServerAdapter(logger) }, logger); + return CreateMqttServer(new List { new MqttTcpServerAdapter(logger.CreateChildLogger(string.Empty)) }, logger); } public IMqttServer CreateMqttServer(IEnumerable adapters, IMqttNetLogger logger) @@ -53,7 +53,7 @@ namespace MQTTnet if (adapters == null) throw new ArgumentNullException(nameof(adapters)); if (logger == null) throw new ArgumentNullException(nameof(logger)); - return new MqttServer(adapters, logger); + return new MqttServer(adapters, logger.CreateChildLogger(string.Empty)); } } } \ No newline at end of file diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttClientKeepAliveMonitor.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttClientKeepAliveMonitor.cs index ac95807..4619600 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/MqttClientKeepAliveMonitor.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/MqttClientKeepAliveMonitor.cs @@ -14,15 +14,17 @@ namespace MQTTnet.Server private readonly string _clientId; private readonly Action _timeoutCallback; - private readonly IMqttNetLogger _logger; + private readonly IMqttNetChildLogger _logger; private Task _workerTask; - public MqttClientKeepAliveMonitor(string clientId, Action timeoutCallback, IMqttNetLogger logger) + public MqttClientKeepAliveMonitor(string clientId, Action timeoutCallback, IMqttNetChildLogger logger) { + if (logger == null) throw new ArgumentNullException(nameof(logger)); + _clientId = clientId; _timeoutCallback = timeoutCallback; - _logger = logger; + _logger = logger.CreateChildLogger(nameof(MqttClientKeepAliveMonitor)); } public TimeSpan LastPacketReceived => _lastPacketReceivedTracker.Elapsed; @@ -59,7 +61,7 @@ namespace MQTTnet.Server // Values described here: [MQTT-3.1.2-24]. if (_lastPacketReceivedTracker.Elapsed.TotalSeconds > keepAlivePeriod * 1.5D) { - _logger.Warning(this, null, "Client '{0}': Did not receive any packet or keep alive signal.", _clientId); + _logger.Warning(null, "Client '{0}': Did not receive any packet or keep alive signal.", _clientId); _timeoutCallback?.Invoke(); @@ -74,11 +76,11 @@ namespace MQTTnet.Server } catch (Exception exception) { - _logger.Error(this, exception, "Client '{0}': Unhandled exception while checking keep alive timeouts.", _clientId); + _logger.Error(exception, "Client '{0}': Unhandled exception while checking keep alive timeouts.", _clientId); } finally { - _logger.Verbose(this, "Client {0}: Stopped checking keep alive timeout.", _clientId); + _logger.Verbose("Client {0}: Stopped checking keep alive timeout.", _clientId); } } diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs index a6e2204..fc74d4c 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs @@ -17,15 +17,17 @@ namespace MQTTnet.Server private readonly AsyncAutoResetEvent _queueAutoResetEvent = new AsyncAutoResetEvent(); private readonly IMqttServerOptions _options; private readonly MqttClientSession _clientSession; - private readonly IMqttNetLogger _logger; + private readonly IMqttNetChildLogger _logger; private Task _workerTask; - public MqttClientPendingMessagesQueue(IMqttServerOptions options, MqttClientSession clientSession, IMqttNetLogger logger) + public MqttClientPendingMessagesQueue(IMqttServerOptions options, MqttClientSession clientSession, IMqttNetChildLogger logger) { + if (logger == null) throw new ArgumentNullException(nameof(logger)); _options = options ?? throw new ArgumentNullException(nameof(options)); _clientSession = clientSession ?? throw new ArgumentNullException(nameof(clientSession)); - _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + + _logger = logger.CreateChildLogger(nameof(MqttClientPendingMessagesQueue)); } public int Count => _queue.Count; @@ -70,7 +72,7 @@ namespace MQTTnet.Server _queue.Enqueue(packet); _queueAutoResetEvent.Set(); - _logger.Verbose(this, "Enqueued packet (ClientId: {0}).", _clientSession.ClientId); + _logger.Verbose("Enqueued packet (ClientId: {0}).", _clientSession.ClientId); } private async Task SendQueuedPacketsAsync(IMqttChannelAdapter adapter, CancellationToken cancellationToken) @@ -87,7 +89,7 @@ namespace MQTTnet.Server } catch (Exception exception) { - _logger.Error(this, exception, "Unhandled exception while sending enqueued packet (ClientId: {0}).", _clientSession.ClientId); + _logger.Error(exception, "Unhandled exception while sending enqueued packet (ClientId: {0}).", _clientSession.ClientId); } } @@ -113,25 +115,24 @@ namespace MQTTnet.Server await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, new[] { packet }, cancellationToken).ConfigureAwait(false); - _logger.Verbose("Enqueued packet sent (ClientId: {0}).", - _clientSession.ClientId); + _logger.Verbose("Enqueued packet sent (ClientId: {0}).", _clientSession.ClientId); } catch (Exception exception) { if (exception is MqttCommunicationTimedOutException) { - _logger.Warning(this, exception, "Sending publish packet failed: Timeout (ClientId: {0}).", _clientSession.ClientId); + _logger.Warning(exception, "Sending publish packet failed: Timeout (ClientId: {0}).", _clientSession.ClientId); } else if (exception is MqttCommunicationException) { - _logger.Warning(this, exception, "Sending publish packet failed: Communication exception (ClientId: {0}).", _clientSession.ClientId); + _logger.Warning(exception, "Sending publish packet failed: Communication exception (ClientId: {0}).", _clientSession.ClientId); } else if (exception is OperationCanceledException) { } else { - _logger.Error(this, exception, "Sending publish packet failed (ClientId: {0}).", _clientSession.ClientId); + _logger.Error(exception, "Sending publish packet failed (ClientId: {0}).", _clientSession.ClientId); } if (packet is MqttPublishPacket publishPacket) diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs index f4f8c99..1d0b77d 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs @@ -17,7 +17,7 @@ namespace MQTTnet.Server { private readonly MqttPacketIdentifierProvider _packetIdentifierProvider = new MqttPacketIdentifierProvider(); private readonly MqttRetainedMessagesManager _retainedMessagesManager; - private readonly IMqttNetLogger _logger; + private readonly IMqttNetChildLogger _logger; private readonly IMqttServerOptions _options; private readonly MqttClientSessionsManager _sessionsManager; @@ -30,15 +30,18 @@ namespace MQTTnet.Server IMqttServerOptions options, MqttClientSessionsManager sessionsManager, MqttRetainedMessagesManager retainedMessagesManager, - IMqttNetLogger logger) + IMqttNetChildLogger logger) { + if (logger == null) throw new ArgumentNullException(nameof(logger)); + _options = options ?? throw new ArgumentNullException(nameof(options)); _sessionsManager = sessionsManager; _retainedMessagesManager = retainedMessagesManager ?? throw new ArgumentNullException(nameof(retainedMessagesManager)); - _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - + ClientId = clientId; + _logger = logger.CreateChildLogger(nameof(MqttClientSession)); + KeepAliveMonitor = new MqttClientKeepAliveMonitor(clientId, StopDueToKeepAliveTimeout, _logger); SubscriptionsManager = new MqttClientSubscriptionsManager(clientId, _options, sessionsManager.Server); PendingMessagesQueue = new MqttClientPendingMessagesQueue(_options, this, _logger); @@ -81,13 +84,11 @@ namespace MQTTnet.Server } catch (MqttCommunicationException exception) { - _logger.Warning(exception, - "Client '{0}': Communication exception while processing client packets.", ClientId); + _logger.Warning(exception, "Client '{0}': Communication exception while processing client packets.", ClientId); } catch (Exception exception) { - _logger.Error(exception, - "Client '{0}': Unhandled exception while processing client packets.", ClientId); + _logger.Error(exception, "Client '{0}': Unhandled exception while processing client packets.", ClientId); } finally { @@ -126,7 +127,7 @@ namespace MQTTnet.Server } finally { - _logger.Info("Client '{0}': Session stopped.", ClientId); + _logger.Info("Client '{0}': Session stopped.", ClientId); } } @@ -185,7 +186,7 @@ namespace MQTTnet.Server private void StopDueToKeepAliveTimeout() { - _logger.Info("Client '{0}': Timeout while waiting for KeepAlive packet.", ClientId); + _logger.Info("Client '{0}': Timeout while waiting for KeepAlive packet.", ClientId); Stop(MqttClientDisconnectType.NotClean); } @@ -207,11 +208,11 @@ namespace MQTTnet.Server { if (exception is MqttCommunicationException) { - _logger.Warning(exception, "Client '{0}': Communication exception while processing client packets.", ClientId); + _logger.Warning(exception, "Client '{0}': Communication exception while processing client packets.", ClientId); } else { - _logger.Error(exception, "Client '{0}': Unhandled exception while processing client packets.", ClientId); + _logger.Error(exception, "Client '{0}': Unhandled exception while processing client packets.", ClientId); } Stop(MqttClientDisconnectType.NotClean); @@ -273,7 +274,7 @@ namespace MQTTnet.Server return Task.FromResult(0); } - _logger.Warning(this, null, "Client '{0}': Received not supported packet ({1}). Closing connection.", ClientId, packet); + _logger.Warning(null, "Client '{0}': Received not supported packet ({1}). Closing connection.", ClientId, packet); Stop(MqttClientDisconnectType.NotClean); return Task.FromResult(0); diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs index 98e5835..e50b8c2 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs @@ -6,6 +6,7 @@ using System.Threading.Tasks; using MQTTnet.Adapter; using MQTTnet.Diagnostics; using MQTTnet.Exceptions; +using MQTTnet.Internal; using MQTTnet.Packets; using MQTTnet.Protocol; @@ -14,15 +15,18 @@ namespace MQTTnet.Server public sealed class MqttClientSessionsManager : IDisposable { private readonly Dictionary _sessions = new Dictionary(); - private readonly SemaphoreSlim _sessionsLock = new SemaphoreSlim(1, 1); + private readonly AsyncLock _sessionsLock = new AsyncLock(); private readonly MqttRetainedMessagesManager _retainedMessagesManager; private readonly IMqttServerOptions _options; - private readonly IMqttNetLogger _logger; + private readonly IMqttNetChildLogger _logger; - public MqttClientSessionsManager(IMqttServerOptions options, MqttServer server, MqttRetainedMessagesManager retainedMessagesManager, IMqttNetLogger logger) + public MqttClientSessionsManager(IMqttServerOptions options, MqttServer server, MqttRetainedMessagesManager retainedMessagesManager, IMqttNetChildLogger logger) { - _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + if (logger == null) throw new ArgumentNullException(nameof(logger)); + + _logger = logger.CreateChildLogger(nameof(MqttClientSessionsManager)); + _options = options ?? throw new ArgumentNullException(nameof(options)); Server = server ?? throw new ArgumentNullException(nameof(server)); _retainedMessagesManager = retainedMessagesManager ?? throw new ArgumentNullException(nameof(retainedMessagesManager)); @@ -89,7 +93,7 @@ namespace MQTTnet.Server } catch (Exception exception) { - _logger.Error(exception, exception.Message); + _logger.Error(exception, exception.Message); } finally { @@ -100,7 +104,7 @@ namespace MQTTnet.Server } catch (Exception exception) { - _logger.Error(exception, exception.Message); + _logger.Error(exception, exception.Message); } Server.OnClientDisconnected(new ConnectedMqttClient @@ -115,7 +119,7 @@ namespace MQTTnet.Server public async Task StopAsync() { - await _sessionsLock.WaitAsync().ConfigureAwait(false); + await _sessionsLock.EnterAsync(CancellationToken.None).ConfigureAwait(false); try { foreach (var session in _sessions) @@ -127,13 +131,13 @@ namespace MQTTnet.Server } finally { - _sessionsLock.Release(); + _sessionsLock.Exit(); } } public async Task> GetConnectedClientsAsync() { - await _sessionsLock.WaitAsync().ConfigureAwait(false); + await _sessionsLock.EnterAsync(CancellationToken.None).ConfigureAwait(false); try { return _sessions.Where(s => s.Value.IsConnected).Select(s => new ConnectedMqttClient @@ -147,7 +151,7 @@ namespace MQTTnet.Server } finally { - _sessionsLock.Release(); + _sessionsLock.Exit(); } } @@ -161,7 +165,7 @@ namespace MQTTnet.Server if (clientId == null) throw new ArgumentNullException(nameof(clientId)); if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); - await _sessionsLock.WaitAsync().ConfigureAwait(false); + await _sessionsLock.EnterAsync(CancellationToken.None).ConfigureAwait(false); try { if (!_sessions.TryGetValue(clientId, out var session)) @@ -173,7 +177,7 @@ namespace MQTTnet.Server } finally { - _sessionsLock.Release(); + _sessionsLock.Exit(); } } @@ -182,7 +186,7 @@ namespace MQTTnet.Server if (clientId == null) throw new ArgumentNullException(nameof(clientId)); if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); - await _sessionsLock.WaitAsync().ConfigureAwait(false); + await _sessionsLock.EnterAsync(CancellationToken.None).ConfigureAwait(false); try { if (!_sessions.TryGetValue(clientId, out var session)) @@ -194,7 +198,7 @@ namespace MQTTnet.Server } finally { - _sessionsLock.Release(); + _sessionsLock.Exit(); } } @@ -222,7 +226,7 @@ namespace MQTTnet.Server private async Task PrepareClientSessionAsync(MqttConnectPacket connectPacket) { - await _sessionsLock.WaitAsync().ConfigureAwait(false); + await _sessionsLock.EnterAsync(CancellationToken.None).ConfigureAwait(false); try { var isSessionPresent = _sessions.TryGetValue(connectPacket.ClientId, out var clientSession); @@ -236,11 +240,11 @@ namespace MQTTnet.Server clientSession.Dispose(); clientSession = null; - _logger.Verbose("Stopped existing session of client '{0}'.", connectPacket.ClientId); + _logger.Verbose("Stopped existing session of client '{0}'.", connectPacket.ClientId); } else { - _logger.Verbose("Reusing existing session of client '{0}'.", connectPacket.ClientId); + _logger.Verbose("Reusing existing session of client '{0}'.", connectPacket.ClientId); } } @@ -252,14 +256,14 @@ namespace MQTTnet.Server clientSession = new MqttClientSession(connectPacket.ClientId, _options, this, _retainedMessagesManager, _logger); _sessions[connectPacket.ClientId] = clientSession; - _logger.Verbose("Created a new session for client '{0}'.", connectPacket.ClientId); + _logger.Verbose("Created a new session for client '{0}'.", connectPacket.ClientId); } return new GetOrCreateClientSessionResult { IsExistingSession = isExistingSession, Session = clientSession }; } finally { - _sessionsLock.Release(); + _sessionsLock.Exit(); } } @@ -287,10 +291,10 @@ namespace MQTTnet.Server } catch (Exception exception) { - _logger.Error(exception, "Error while processing application message"); + _logger.Error(exception, "Error while processing application message"); } - await _sessionsLock.WaitAsync().ConfigureAwait(false); + await _sessionsLock.EnterAsync(CancellationToken.None).ConfigureAwait(false); try { foreach (var clientSession in _sessions.Values) @@ -300,7 +304,7 @@ namespace MQTTnet.Server } finally { - _sessionsLock.Release(); + _sessionsLock.Exit(); } } diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttRetainedMessagesManager.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttRetainedMessagesManager.cs index f998557..be0a5b7 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/MqttRetainedMessagesManager.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/MqttRetainedMessagesManager.cs @@ -4,19 +4,21 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; using MQTTnet.Diagnostics; +using MQTTnet.Internal; namespace MQTTnet.Server { public sealed class MqttRetainedMessagesManager : IDisposable { - private readonly Dictionary _retainedMessages = new Dictionary(); - private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1); - private readonly IMqttNetLogger _logger; + private readonly Dictionary _messages = new Dictionary(); + private readonly AsyncLock _messagesLock = new AsyncLock(); + private readonly IMqttNetChildLogger _logger; private readonly IMqttServerOptions _options; - public MqttRetainedMessagesManager(IMqttServerOptions options, IMqttNetLogger logger) + public MqttRetainedMessagesManager(IMqttServerOptions options, IMqttNetChildLogger logger) { - _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + if (logger == null) throw new ArgumentNullException(nameof(logger)); + _logger = logger.CreateChildLogger(nameof(MqttRetainedMessagesManager)); _options = options ?? throw new ArgumentNullException(nameof(options)); } @@ -27,24 +29,24 @@ namespace MQTTnet.Server return; } - await _semaphore.WaitAsync().ConfigureAwait(false); + await _messagesLock.EnterAsync(CancellationToken.None).ConfigureAwait(false); try { - var retainedMessages = await _options.Storage.LoadRetainedMessagesAsync(); + var retainedMessages = await _options.Storage.LoadRetainedMessagesAsync().ConfigureAwait(false); - _retainedMessages.Clear(); + _messages.Clear(); foreach (var retainedMessage in retainedMessages) { - _retainedMessages[retainedMessage.Topic] = retainedMessage; + _messages[retainedMessage.Topic] = retainedMessage; } } catch (Exception exception) { - _logger.Error(exception, "Unhandled exception while loading retained messages."); + _logger.Error(exception, "Unhandled exception while loading retained messages."); } finally { - _semaphore.Release(); + _messagesLock.Exit(); } } @@ -52,18 +54,18 @@ namespace MQTTnet.Server { if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage)); - await _semaphore.WaitAsync().ConfigureAwait(false); + await _messagesLock.EnterAsync(CancellationToken.None).ConfigureAwait(false); try { await HandleMessageInternalAsync(clientId, applicationMessage); } catch (Exception exception) { - _logger.Error(exception, "Unhandled exception while handling retained messages."); + _logger.Error(exception, "Unhandled exception while handling retained messages."); } finally { - _semaphore.Release(); + _messagesLock.Exit(); } } @@ -71,10 +73,10 @@ namespace MQTTnet.Server { var retainedMessages = new List(); - await _semaphore.WaitAsync().ConfigureAwait(false); + await _messagesLock.EnterAsync(CancellationToken.None).ConfigureAwait(false); try { - foreach (var retainedMessage in _retainedMessages.Values) + foreach (var retainedMessage in _messages.Values) { foreach (var topicFilter in topicFilters) { @@ -90,7 +92,7 @@ namespace MQTTnet.Server } finally { - _semaphore.Release(); + _messagesLock.Exit(); } return retainedMessages; @@ -98,7 +100,7 @@ namespace MQTTnet.Server public void Dispose() { - _semaphore?.Dispose(); + _messagesLock?.Dispose(); } private async Task HandleMessageInternalAsync(string clientId, MqttApplicationMessage applicationMessage) @@ -107,37 +109,37 @@ namespace MQTTnet.Server if (applicationMessage.Payload?.Any() == false) { - saveIsRequired = _retainedMessages.Remove(applicationMessage.Topic); - _logger.Info("Client '{0}' cleared retained message for topic '{1}'.", clientId, applicationMessage.Topic); + saveIsRequired = _messages.Remove(applicationMessage.Topic); + _logger.Info("Client '{0}' cleared retained message for topic '{1}'.", clientId, applicationMessage.Topic); } else { - if (!_retainedMessages.ContainsKey(applicationMessage.Topic)) + if (!_messages.ContainsKey(applicationMessage.Topic)) { - _retainedMessages[applicationMessage.Topic] = applicationMessage; + _messages[applicationMessage.Topic] = applicationMessage; saveIsRequired = true; } else { - var existingMessage = _retainedMessages[applicationMessage.Topic]; + var existingMessage = _messages[applicationMessage.Topic]; if (existingMessage.QualityOfServiceLevel != applicationMessage.QualityOfServiceLevel || !existingMessage.Payload.SequenceEqual(applicationMessage.Payload ?? new byte[0])) { - _retainedMessages[applicationMessage.Topic] = applicationMessage; + _messages[applicationMessage.Topic] = applicationMessage; saveIsRequired = true; } } - _logger.Info("Client '{0}' set retained message for topic '{1}'.", clientId, applicationMessage.Topic); + _logger.Info("Client '{0}' set retained message for topic '{1}'.", clientId, applicationMessage.Topic); } if (!saveIsRequired) { - _logger.Verbose("Skipped saving retained messages because no changes were detected."); + _logger.Verbose("Skipped saving retained messages because no changes were detected."); } if (saveIsRequired && _options.Storage != null) { - await _options.Storage.SaveRetainedMessagesAsync(_retainedMessages.Values.ToList()); + await _options.Storage.SaveRetainedMessagesAsync(_messages.Values.ToList()).ConfigureAwait(false); } } } diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttServer.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttServer.cs index 0324134..a10d19c 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/MqttServer.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/MqttServer.cs @@ -11,16 +11,17 @@ namespace MQTTnet.Server public class MqttServer : IMqttServer { private readonly ICollection _adapters; - private readonly IMqttNetLogger _logger; + private readonly IMqttNetChildLogger _logger; private MqttClientSessionsManager _clientSessionsManager; private MqttRetainedMessagesManager _retainedMessagesManager; private CancellationTokenSource _cancellationTokenSource; - public MqttServer(IEnumerable adapters, IMqttNetLogger logger) + public MqttServer(IEnumerable adapters, IMqttNetChildLogger logger) { if (adapters == null) throw new ArgumentNullException(nameof(adapters)); - _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + if (logger == null) throw new ArgumentNullException(nameof(logger)); + _logger = logger.CreateChildLogger(nameof(MqttServer)); _adapters = adapters.ToList(); } @@ -90,7 +91,7 @@ namespace MQTTnet.Server await adapter.StartAsync(Options); } - _logger.Info("Started."); + _logger.Info("Started."); Started?.Invoke(this, new MqttServerStartedEventArgs()); } @@ -114,7 +115,7 @@ namespace MQTTnet.Server await _clientSessionsManager.StopAsync(); - _logger.Info("Stopped."); + _logger.Info("Stopped."); } finally { @@ -129,13 +130,13 @@ namespace MQTTnet.Server internal void OnClientConnected(ConnectedMqttClient client) { - _logger.Info("Client '{0}': Connected.", client.ClientId); + _logger.Info("Client '{0}': Connected.", client.ClientId); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(client)); } internal void OnClientDisconnected(ConnectedMqttClient client, bool wasCleanDisconnect) { - _logger.Info("Client '{0}': Disconnected (clean={1}).", client.ClientId, wasCleanDisconnect); + _logger.Info("Client '{0}': Disconnected (clean={1}).", client.ClientId, wasCleanDisconnect); ClientDisconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(client, wasCleanDisconnect)); } diff --git a/Tests/MQTTnet.Benchmarks/LoggerBenchmark.cs b/Tests/MQTTnet.Benchmarks/LoggerBenchmark.cs new file mode 100644 index 0000000..2dce560 --- /dev/null +++ b/Tests/MQTTnet.Benchmarks/LoggerBenchmark.cs @@ -0,0 +1,54 @@ +using BenchmarkDotNet.Attributes; +using BenchmarkDotNet.Attributes.Exporters; +using BenchmarkDotNet.Attributes.Jobs; +using MQTTnet.Diagnostics; + +namespace MQTTnet.Benchmarks +{ + [ClrJob] + [RPlotExporter] + [MemoryDiagnoser] + public class LoggerBenchmark + { + private IMqttNetLogger _logger; + private IMqttNetChildLogger _childLogger; + private bool _useHandler; + + [GlobalSetup] + public void Setup() + { + _logger = new MqttNetLogger("1"); + _childLogger = _logger.CreateChildLogger("child"); + + MqttNetGlobalLogger.LogMessagePublished += OnLogMessagePublished; + } + + private void OnLogMessagePublished(object sender, MqttNetLogMessagePublishedEventArgs eventArgs) + { + if (_useHandler) + { + eventArgs.TraceMessage.ToString(); + } + } + + [Benchmark] + public void Log_10000_Messages_NoHandler() + { + _useHandler = false; + for (var i = 0; i < 10000; i++) + { + _childLogger.Verbose("test log message {0}", "parameter"); + } + } + + [Benchmark] + public void Log_10000_Messages_WithHandler() + { + _useHandler = true; + for (var i = 0; i < 10000; i++) + { + _childLogger.Verbose("test log message {0}", "parameter"); + } + } + } +} diff --git a/Tests/MQTTnet.Benchmarks/MQTTnet.Benchmarks.csproj b/Tests/MQTTnet.Benchmarks/MQTTnet.Benchmarks.csproj index 3b52bba..e068fa9 100644 --- a/Tests/MQTTnet.Benchmarks/MQTTnet.Benchmarks.csproj +++ b/Tests/MQTTnet.Benchmarks/MQTTnet.Benchmarks.csproj @@ -147,6 +147,7 @@ + diff --git a/Tests/MQTTnet.Benchmarks/Program.cs b/Tests/MQTTnet.Benchmarks/Program.cs index ad2c363..363cf8f 100644 --- a/Tests/MQTTnet.Benchmarks/Program.cs +++ b/Tests/MQTTnet.Benchmarks/Program.cs @@ -1,5 +1,4 @@ using System; -using System.Threading; using BenchmarkDotNet.Running; using MQTTnet.Diagnostics; @@ -12,6 +11,7 @@ namespace MQTTnet.Benchmarks Console.WriteLine($"MQTTnet - BenchmarkApp.{TargetFrameworkInfoProvider.TargetFramework}"); Console.WriteLine("1 = MessageProcessingBenchmark"); Console.WriteLine("2 = SerializerBenchmark"); + Console.WriteLine("3 = LoggerBenchmark"); var pressedKey = Console.ReadKey(true); switch (pressedKey.KeyChar) @@ -22,7 +22,8 @@ namespace MQTTnet.Benchmarks case '2': BenchmarkRunner.Run(); break; - default: + case '3': + BenchmarkRunner.Run(); break; } diff --git a/Tests/MQTTnet.Benchmarks/SerializerBenchmark.cs b/Tests/MQTTnet.Benchmarks/SerializerBenchmark.cs index bcfff1c..fdc03b6 100644 --- a/Tests/MQTTnet.Benchmarks/SerializerBenchmark.cs +++ b/Tests/MQTTnet.Benchmarks/SerializerBenchmark.cs @@ -1,9 +1,7 @@ using BenchmarkDotNet.Attributes; -using MQTTnet.Client; using MQTTnet.Packets; using MQTTnet.Serializer; using MQTTnet.Internal; -using MQTTnet.Server; using BenchmarkDotNet.Attributes.Jobs; using BenchmarkDotNet.Attributes.Exporters; using System; diff --git a/Tests/MQTTnet.Core.Tests/MqttKeepAliveMonitorTests.cs b/Tests/MQTTnet.Core.Tests/MqttKeepAliveMonitorTests.cs index f3c4320..9563a41 100644 --- a/Tests/MQTTnet.Core.Tests/MqttKeepAliveMonitorTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttKeepAliveMonitorTests.cs @@ -17,7 +17,7 @@ namespace MQTTnet.Core.Tests var monitor = new MqttClientKeepAliveMonitor(string.Empty, delegate { timeoutCalledCount++; - }, new MqttNetLogger()); + }, new MqttNetLogger().CreateChildLogger("")); Assert.AreEqual(0, timeoutCalledCount); @@ -38,7 +38,7 @@ namespace MQTTnet.Core.Tests var monitor = new MqttClientKeepAliveMonitor(string.Empty, delegate { timeoutCalledCount++; - }, new MqttNetLogger()); + }, new MqttNetLogger().CreateChildLogger("")); Assert.AreEqual(0, timeoutCalledCount); diff --git a/Tests/MQTTnet.Core.Tests/MqttSubscriptionsManagerTests.cs b/Tests/MQTTnet.Core.Tests/MqttSubscriptionsManagerTests.cs index 6f16ab5..cf922a1 100644 --- a/Tests/MQTTnet.Core.Tests/MqttSubscriptionsManagerTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttSubscriptionsManagerTests.cs @@ -13,7 +13,7 @@ namespace MQTTnet.Core.Tests [TestMethod] public void MqttSubscriptionsManager_SubscribeSingleSuccess() { - var sm = new MqttClientSubscriptionsManager("", new MqttServerOptions(), new MqttServer(new IMqttServerAdapter[0], new MqttNetLogger())); + var sm = new MqttClientSubscriptionsManager("", new MqttServerOptions(), new MqttServer(new IMqttServerAdapter[0], new MqttNetLogger().CreateChildLogger(""))); var sp = new MqttSubscribePacket(); sp.TopicFilters.Add(new TopicFilterBuilder().WithTopic("A/B/C").Build()); @@ -34,7 +34,7 @@ namespace MQTTnet.Core.Tests [TestMethod] public void MqttSubscriptionsManager_SubscribeDifferentQoSSuccess() { - var sm = new MqttClientSubscriptionsManager("", new MqttServerOptions(), new MqttServer(new IMqttServerAdapter[0], new MqttNetLogger())); + var sm = new MqttClientSubscriptionsManager("", new MqttServerOptions(), new MqttServer(new IMqttServerAdapter[0], new MqttNetLogger().CreateChildLogger(""))); var sp = new MqttSubscribePacket(); sp.TopicFilters.Add(new TopicFilter("A/B/C", MqttQualityOfServiceLevel.AtMostOnce)); @@ -55,7 +55,7 @@ namespace MQTTnet.Core.Tests [TestMethod] public void MqttSubscriptionsManager_SubscribeTwoTimesSuccess() { - var sm = new MqttClientSubscriptionsManager("", new MqttServerOptions(), new MqttServer(new IMqttServerAdapter[0], new MqttNetLogger())); + var sm = new MqttClientSubscriptionsManager("", new MqttServerOptions(), new MqttServer(new IMqttServerAdapter[0], new MqttNetLogger().CreateChildLogger(""))); var sp = new MqttSubscribePacket(); sp.TopicFilters.Add(new TopicFilter("#", MqttQualityOfServiceLevel.AtMostOnce)); @@ -77,7 +77,7 @@ namespace MQTTnet.Core.Tests [TestMethod] public void MqttSubscriptionsManager_SubscribeSingleNoSuccess() { - var sm = new MqttClientSubscriptionsManager("", new MqttServerOptions(), new MqttServer(new IMqttServerAdapter[0], new MqttNetLogger())); + var sm = new MqttClientSubscriptionsManager("", new MqttServerOptions(), new MqttServer(new IMqttServerAdapter[0], new MqttNetLogger().CreateChildLogger(""))); var sp = new MqttSubscribePacket(); sp.TopicFilters.Add(new TopicFilterBuilder().WithTopic("A/B/C").Build()); @@ -96,7 +96,7 @@ namespace MQTTnet.Core.Tests [TestMethod] public void MqttSubscriptionsManager_SubscribeAndUnsubscribeSingle() { - var sm = new MqttClientSubscriptionsManager("", new MqttServerOptions(), new MqttServer(new IMqttServerAdapter[0], new MqttNetLogger())); + var sm = new MqttClientSubscriptionsManager("", new MqttServerOptions(), new MqttServer(new IMqttServerAdapter[0], new MqttNetLogger().CreateChildLogger(""))); var sp = new MqttSubscribePacket(); sp.TopicFilters.Add(new TopicFilterBuilder().WithTopic("A/B/C").Build()); diff --git a/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapterFactory.cs b/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapterFactory.cs index 4fe3537..6873c69 100644 --- a/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapterFactory.cs +++ b/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapterFactory.cs @@ -13,7 +13,7 @@ namespace MQTTnet.Core.Tests _adapter = adapter; } - public IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetLogger logger) + public IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetChildLogger logger) { return _adapter; }