Browse Source

Improve logger performance.

release/3.x.x
Christian Kratky 4 years ago
parent
commit
a58ab607ad
51 changed files with 290 additions and 300 deletions
  1. +2
    -0
      Build/MQTTnet.nuspec
  2. +1
    -2
      Source/MQTTnet.AspnetCore/Client/MqttClientConnectionContextFactory.cs
  3. +4
    -4
      Source/MQTTnet.AspnetCore/MqttConnectionContext.cs
  4. +1
    -1
      Source/MQTTnet.AspnetCore/MqttHostedServer.cs
  5. +4
    -6
      Source/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs
  6. +2
    -2
      Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs
  7. +1
    -1
      Source/MQTTnet.Extensions.WebSocket4Net/MqttFactoryExtensions.cs
  8. +10
    -4
      Source/MQTTnet.Extensions.WebSocket4Net/WebSocket4NetMqttClientAdapterFactory.cs
  9. +3
    -3
      Source/MQTTnet.Server/Logging/MqttNetLoggerWrapper.cs
  10. +1
    -2
      Source/MQTTnet/Adapter/IMqttClientAdapterFactory.cs
  11. +3
    -4
      Source/MQTTnet/Adapter/MqttChannelAdapter.cs
  12. +19
    -25
      Source/MQTTnet/Client/MqttClient.cs
  13. +0
    -2
      Source/MQTTnet/Client/Options/IMqttClientOptions.cs
  14. +0
    -1
      Source/MQTTnet/Client/Options/MqttClientOptions.cs
  15. +4
    -4
      Source/MQTTnet/Client/Options/MqttClientOptionsBuilder.cs
  16. +2
    -2
      Source/MQTTnet/Diagnostics/IMqttNetLogger.cs
  17. +11
    -0
      Source/MQTTnet/Diagnostics/IMqttNetScopedLogger.cs
  18. +4
    -29
      Source/MQTTnet/Diagnostics/MqttNetLogger.cs
  19. +0
    -35
      Source/MQTTnet/Diagnostics/MqttNetLoggerExtensions.cs
  20. +26
    -0
      Source/MQTTnet/Diagnostics/MqttNetScopedLogger.cs
  21. +27
    -0
      Source/MQTTnet/Diagnostics/MqttNetScopedLoggerExtensions.cs
  22. +5
    -8
      Source/MQTTnet/Formatter/MqttPacketFormatterAdapter.cs
  23. +9
    -9
      Source/MQTTnet/Formatter/MqttPacketWriter.cs
  24. +29
    -35
      Source/MQTTnet/Formatter/V3/MqttV310PacketFormatter.cs
  25. +0
    -5
      Source/MQTTnet/Formatter/V3/MqttV311PacketFormatter.cs
  26. +10
    -3
      Source/MQTTnet/Implementations/MqttClientAdapterFactory.cs
  27. +4
    -4
      Source/MQTTnet/Implementations/MqttTcpChannel.cs
  28. +5
    -5
      Source/MQTTnet/Implementations/MqttTcpServerAdapter.Uwp.cs
  29. +6
    -14
      Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs
  30. +5
    -3
      Source/MQTTnet/Implementations/MqttTcpServerListener.cs
  31. +1
    -1
      Source/MQTTnet/Internal/TaskExtensions.cs
  32. +3
    -3
      Source/MQTTnet/LowLevelClient/LowLevelMqttClient.cs
  33. +2
    -1
      Source/MQTTnet/MqttFactory.cs
  34. +9
    -6
      Source/MQTTnet/Protocol/MqttTopicValidator.cs
  35. +3
    -3
      Source/MQTTnet/Server/MqttClientConnection.cs
  36. +3
    -3
      Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs
  37. +9
    -3
      Source/MQTTnet/Server/MqttClientSession.cs
  38. +6
    -4
      Source/MQTTnet/Server/MqttClientSessionsManager.cs
  39. +8
    -6
      Source/MQTTnet/Server/MqttRetainedMessagesManager.cs
  40. +7
    -5
      Source/MQTTnet/Server/MqttServer.cs
  41. +5
    -3
      Source/MQTTnet/Server/MqttServerEventDispatcher.cs
  42. +1
    -1
      Tests/MQTTnet.Benchmarks/ChannelAdapterBenchmark.cs
  43. +4
    -4
      Tests/MQTTnet.Benchmarks/LoggerBenchmark.cs
  44. +1
    -1
      Tests/MQTTnet.Benchmarks/SerializerBenchmark.cs
  45. +7
    -22
      Tests/MQTTnet.Core.Tests/Mockups/TestLogger.cs
  46. +2
    -3
      Tests/MQTTnet.Core.Tests/Mockups/TestMqttCommunicationAdapterFactory.cs
  47. +1
    -1
      Tests/MQTTnet.Core.Tests/MqttFactory_Tests.cs
  48. +8
    -7
      Tests/MQTTnet.Core.Tests/MqttNetLogger_Tests.cs
  49. +3
    -3
      Tests/MQTTnet.Core.Tests/MqttPacketSerializer_Tests.cs
  50. +6
    -5
      Tests/MQTTnet.TestApp.NetCore/PerformanceTest.cs
  51. +3
    -2
      Tests/MQTTnet.TestApp.NetCore/Program.cs

+ 2
- 0
Build/MQTTnet.nuspec View File

@@ -14,12 +14,14 @@
<releaseNotes> <releaseNotes>
* [Core] Renamed some topic filter relevant classes (BREAKING CHANGE!). * [Core] Renamed some topic filter relevant classes (BREAKING CHANGE!).
* [Core] Improved task management for UWP connections (thanks to @xgstation). * [Core] Improved task management for UWP connections (thanks to @xgstation).
* [Core] Fixed broken logger which decreases overall performance.
* [Client] Added method to trigger PING/PONG manually (connection check etc.). * [Client] Added method to trigger PING/PONG manually (connection check etc.).
* [Client] Added support for certificate validation callback when using Web Sockets (requires netstandard2.1+). * [Client] Added support for certificate validation callback when using Web Sockets (requires netstandard2.1+).
* [Client] Fixed a memory leak when web socket based connections trying to reconnect with an offline server. * [Client] Fixed a memory leak when web socket based connections trying to reconnect with an offline server.
* [Client] Fixed a memory leak when TCP based connections trying to reconnect with an offline server. * [Client] Fixed a memory leak when TCP based connections trying to reconnect with an offline server.
* [Client] Fixed an issue when connecting to an invalid host (format). * [Client] Fixed an issue when connecting to an invalid host (format).
* [Client] Added support for user properties in CONNECT packet. * [Client] Added support for user properties in CONNECT packet.
* [Client] Removed _KeepAliveSendInterval_ and improved keep alive handling (BREAKING CHANGE!).
* [ManagedClient] Added method to trigger PING/PONG manually (connection check etc.). * [ManagedClient] Added method to trigger PING/PONG manually (connection check etc.).
* [MQTTnet.AspNetCore] improved compatibility with AspNetCore 3.1. * [MQTTnet.AspNetCore] improved compatibility with AspNetCore 3.1.
* [MQTTnet.AspNetCore] Fixed several packaging issues with the Nuget package. * [MQTTnet.AspNetCore] Fixed several packaging issues with the Nuget package.


+ 1
- 2
Source/MQTTnet.AspnetCore/Client/MqttClientConnectionContextFactory.cs View File

@@ -1,7 +1,6 @@
using MQTTnet.Adapter; using MQTTnet.Adapter;
using MQTTnet.AspNetCore.Client.Tcp; using MQTTnet.AspNetCore.Client.Tcp;
using MQTTnet.Client.Options; using MQTTnet.Client.Options;
using MQTTnet.Diagnostics;
using MQTTnet.Formatter; using MQTTnet.Formatter;
using System; using System;
using System.Net; using System.Net;
@@ -10,7 +9,7 @@ namespace MQTTnet.AspNetCore.Client
{ {
public class MqttClientConnectionContextFactory : IMqttClientAdapterFactory public class MqttClientConnectionContextFactory : IMqttClientAdapterFactory
{ {
public IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetLogger logger)
public IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options)
{ {
if (options == null) throw new ArgumentNullException(nameof(options)); if (options == null) throw new ArgumentNullException(nameof(options));




+ 4
- 4
Source/MQTTnet.AspnetCore/MqttConnectionContext.cs View File

@@ -160,19 +160,19 @@ namespace MQTTnet.AspNetCore
public async Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout, CancellationToken cancellationToken) public async Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout, CancellationToken cancellationToken)
{ {
var formatter = PacketFormatterAdapter; var formatter = PacketFormatterAdapter;

var buffer = formatter.Encode(packet);
var msg = buffer.AsMemory();
var output = _output;


await _writerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); await _writerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
try try
{ {
var buffer = formatter.Encode(packet);
var msg = buffer.AsMemory();
var output = _output;
var result = await output.WriteAsync(msg, cancellationToken).ConfigureAwait(false); var result = await output.WriteAsync(msg, cancellationToken).ConfigureAwait(false);
if (result.IsCompleted) if (result.IsCompleted)
{ {
BytesSent += msg.Length; BytesSent += msg.Length;
} }

PacketFormatterAdapter.FreeBuffer(); PacketFormatterAdapter.FreeBuffer();
} }
finally finally


+ 1
- 1
Source/MQTTnet.AspnetCore/MqttHostedServer.cs View File

@@ -14,7 +14,7 @@ namespace MQTTnet.AspNetCore
private readonly IMqttServerOptions _options; private readonly IMqttServerOptions _options;


public MqttHostedServer(IMqttServerOptions options, IEnumerable<IMqttServerAdapter> adapters, IMqttNetLogger logger) public MqttHostedServer(IMqttServerOptions options, IEnumerable<IMqttServerAdapter> adapters, IMqttNetLogger logger)
: base(adapters, logger.CreateChildLogger(nameof(MqttHostedServer)))
: base(adapters, logger)
{ {
_options = options ?? throw new ArgumentNullException(nameof(options)); _options = options ?? throw new ArgumentNullException(nameof(options));
} }


+ 4
- 6
Source/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs View File

@@ -10,15 +10,13 @@ using System.Threading.Tasks;


namespace MQTTnet.AspNetCore namespace MQTTnet.AspNetCore
{ {
public class MqttWebSocketServerAdapter : IMqttServerAdapter
public sealed class MqttWebSocketServerAdapter : IMqttServerAdapter
{ {
private readonly IMqttNetLogger _logger;
readonly IMqttNetLogger _rootLogger;


public MqttWebSocketServerAdapter(IMqttNetLogger logger) public MqttWebSocketServerAdapter(IMqttNetLogger logger)
{ {
if (logger == null) throw new ArgumentNullException(nameof(logger));

_logger = logger.CreateChildLogger(nameof(MqttTcpServerAdapter));
_rootLogger = logger ?? throw new ArgumentNullException(nameof(logger));
} }


public Func<IMqttChannelAdapter, Task> ClientHandler { get; set; } public Func<IMqttChannelAdapter, Task> ClientHandler { get; set; }
@@ -51,7 +49,7 @@ namespace MQTTnet.AspNetCore
var formatter = new MqttPacketFormatterAdapter(writer); var formatter = new MqttPacketFormatterAdapter(writer);
var channel = new MqttWebSocketChannel(webSocket, endpoint, isSecureConnection, clientCertificate); var channel = new MqttWebSocketChannel(webSocket, endpoint, isSecureConnection, clientCertificate);


using (var channelAdapter = new MqttChannelAdapter(channel, formatter, _logger.CreateChildLogger(nameof(MqttWebSocketServerAdapter))))
using (var channelAdapter = new MqttChannelAdapter(channel, formatter, _rootLogger))
{ {
await clientHandler(channelAdapter).ConfigureAwait(false); await clientHandler(channelAdapter).ConfigureAwait(false);
} }


+ 2
- 2
Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs View File

@@ -33,7 +33,7 @@ namespace MQTTnet.Extensions.ManagedClient
private readonly SemaphoreSlim _subscriptionsQueuedSignal = new SemaphoreSlim(0); private readonly SemaphoreSlim _subscriptionsQueuedSignal = new SemaphoreSlim(0);


private readonly IMqttClient _mqttClient; private readonly IMqttClient _mqttClient;
private readonly IMqttNetLogger _logger;
private readonly IMqttNetScopedLogger _logger;


private readonly AsyncLock _messageQueueLock = new AsyncLock(); private readonly AsyncLock _messageQueueLock = new AsyncLock();


@@ -48,7 +48,7 @@ namespace MQTTnet.Extensions.ManagedClient
_mqttClient = mqttClient ?? throw new ArgumentNullException(nameof(mqttClient)); _mqttClient = mqttClient ?? throw new ArgumentNullException(nameof(mqttClient));


if (logger == null) throw new ArgumentNullException(nameof(logger)); if (logger == null) throw new ArgumentNullException(nameof(logger));
_logger = logger.CreateChildLogger(nameof(ManagedMqttClient));
_logger = logger.CreateScopedLogger(nameof(ManagedMqttClient));
} }


public bool IsConnected => _mqttClient.IsConnected; public bool IsConnected => _mqttClient.IsConnected;


+ 1
- 1
Source/MQTTnet.Extensions.WebSocket4Net/MqttFactoryExtensions.cs View File

@@ -8,7 +8,7 @@ namespace MQTTnet.Extensions.WebSocket4Net
{ {
if (mqttFactory == null) throw new ArgumentNullException(nameof(mqttFactory)); if (mqttFactory == null) throw new ArgumentNullException(nameof(mqttFactory));


return mqttFactory.UseClientAdapterFactory(new WebSocket4NetMqttClientAdapterFactory());
return mqttFactory.UseClientAdapterFactory(new WebSocket4NetMqttClientAdapterFactory(mqttFactory.DefaultLogger));
} }
} }
} }

+ 10
- 4
Source/MQTTnet.Extensions.WebSocket4Net/WebSocket4NetMqttClientAdapterFactory.cs View File

@@ -9,21 +9,27 @@ namespace MQTTnet.Extensions.WebSocket4Net
{ {
public class WebSocket4NetMqttClientAdapterFactory : IMqttClientAdapterFactory public class WebSocket4NetMqttClientAdapterFactory : IMqttClientAdapterFactory
{ {
public IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetLogger logger)
readonly IMqttNetLogger _logger;

public WebSocket4NetMqttClientAdapterFactory(IMqttNetLogger logger)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}

public IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options)
{ {
if (options == null) throw new ArgumentNullException(nameof(options)); if (options == null) throw new ArgumentNullException(nameof(options));
if (logger == null) throw new ArgumentNullException(nameof(logger));


switch (options.ChannelOptions) switch (options.ChannelOptions)
{ {
case MqttClientTcpOptions _: case MqttClientTcpOptions _:
{ {
return new MqttChannelAdapter(new MqttTcpChannel(options), new MqttPacketFormatterAdapter(options.ProtocolVersion), logger);
return new MqttChannelAdapter(new MqttTcpChannel(options), new MqttPacketFormatterAdapter(options.ProtocolVersion), _logger);
} }


case MqttClientWebSocketOptions webSocketOptions: case MqttClientWebSocketOptions webSocketOptions:
{ {
return new MqttChannelAdapter(new WebSocket4NetMqttChannel(options, webSocketOptions), new MqttPacketFormatterAdapter(options.ProtocolVersion), logger);
return new MqttChannelAdapter(new WebSocket4NetMqttChannel(options, webSocketOptions), new MqttPacketFormatterAdapter(options.ProtocolVersion), _logger);
} }


default: default:


+ 3
- 3
Source/MQTTnet.Server/Logging/MqttNetLoggerWrapper.cs View File

@@ -5,7 +5,7 @@ using System.Threading;


namespace MQTTnet.Server.Logging namespace MQTTnet.Server.Logging
{ {
public class MqttNetLoggerWrapper : IMqttNetLogger
public sealed class MqttNetLoggerWrapper : IMqttNetLogger
{ {
readonly ILogger<MqttServer> _logger; readonly ILogger<MqttServer> _logger;


@@ -16,9 +16,9 @@ namespace MQTTnet.Server.Logging


public event EventHandler<MqttNetLogMessagePublishedEventArgs> LogMessagePublished; public event EventHandler<MqttNetLogMessagePublishedEventArgs> LogMessagePublished;


public IMqttNetLogger CreateChildLogger(string source)
public IMqttNetScopedLogger CreateScopedLogger(string source)
{ {
return new MqttNetLogger(source);
return new MqttNetScopedLogger(this, source);
} }


public void Publish(MqttNetLogLevel level, string source, string message, object[] parameters, Exception exception) public void Publish(MqttNetLogLevel level, string source, string message, object[] parameters, Exception exception)


+ 1
- 2
Source/MQTTnet/Adapter/IMqttClientAdapterFactory.cs View File

@@ -1,10 +1,9 @@
using MQTTnet.Client.Options; using MQTTnet.Client.Options;
using MQTTnet.Diagnostics;


namespace MQTTnet.Adapter namespace MQTTnet.Adapter
{ {
public interface IMqttClientAdapterFactory public interface IMqttClientAdapterFactory
{ {
IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetLogger logger);
IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options);
} }
} }

+ 3
- 4
Source/MQTTnet/Adapter/MqttChannelAdapter.cs View File

@@ -19,7 +19,7 @@ namespace MQTTnet.Adapter
const uint ErrorOperationAborted = 0x800703E3; const uint ErrorOperationAborted = 0x800703E3;
const int ReadBufferSize = 4096; // TODO: Move buffer size to config const int ReadBufferSize = 4096; // TODO: Move buffer size to config


readonly IMqttNetLogger _logger;
readonly IMqttNetScopedLogger _logger;
readonly IMqttChannel _channel; readonly IMqttChannel _channel;
readonly MqttPacketReader _packetReader; readonly MqttPacketReader _packetReader;


@@ -39,7 +39,7 @@ namespace MQTTnet.Adapter


_packetReader = new MqttPacketReader(_channel); _packetReader = new MqttPacketReader(_channel);


_logger = logger.CreateChildLogger(nameof(MqttChannelAdapter));
_logger = logger.CreateScopedLogger(nameof(MqttChannelAdapter));
} }


public string Endpoint => _channel.Endpoint; public string Endpoint => _channel.Endpoint;
@@ -128,8 +128,6 @@ namespace MQTTnet.Adapter


Interlocked.Add(ref _bytesReceived, packetData.Count); Interlocked.Add(ref _bytesReceived, packetData.Count);


PacketFormatterAdapter.FreeBuffer();

_logger.Verbose("TX ({0} bytes) >>> {1}", packetData.Count, packet); _logger.Verbose("TX ({0} bytes) >>> {1}", packetData.Count, packet);
} }
catch (Exception exception) catch (Exception exception)
@@ -143,6 +141,7 @@ namespace MQTTnet.Adapter
} }
finally finally
{ {
PacketFormatterAdapter.FreeBuffer();
_writerSemaphore?.Release(); _writerSemaphore?.Release();
} }
} }


+ 19
- 25
Source/MQTTnet/Client/MqttClient.cs View File

@@ -29,7 +29,7 @@ namespace MQTTnet.Client
readonly object _disconnectLock = new object(); readonly object _disconnectLock = new object();


readonly IMqttClientAdapterFactory _adapterFactory; readonly IMqttClientAdapterFactory _adapterFactory;
readonly IMqttNetLogger _logger;
readonly IMqttNetScopedLogger _logger;


CancellationTokenSource _backgroundCancellationTokenSource; CancellationTokenSource _backgroundCancellationTokenSource;
Task _packetReceiverTask; Task _packetReceiverTask;
@@ -48,7 +48,7 @@ namespace MQTTnet.Client
if (logger == null) throw new ArgumentNullException(nameof(logger)); if (logger == null) throw new ArgumentNullException(nameof(logger));


_adapterFactory = channelFactory ?? throw new ArgumentNullException(nameof(channelFactory)); _adapterFactory = channelFactory ?? throw new ArgumentNullException(nameof(channelFactory));
_logger = logger.CreateChildLogger(nameof(MqttClient));
_logger = logger.CreateScopedLogger(nameof(MqttClient));
} }


public IMqttClientConnectedHandler ConnectedHandler { get; set; } public IMqttClientConnectedHandler ConnectedHandler { get; set; }
@@ -89,7 +89,7 @@ namespace MQTTnet.Client
var backgroundCancellationToken = _backgroundCancellationTokenSource.Token; var backgroundCancellationToken = _backgroundCancellationTokenSource.Token;


_isDisconnectPending = 0; _isDisconnectPending = 0;
var adapter = _adapterFactory.CreateClientAdapter(options, _logger);
var adapter = _adapterFactory.CreateClientAdapter(options);
_adapter = adapter; _adapter = adapter;


using (var combined = CancellationTokenSource.CreateLinkedTokenSource(backgroundCancellationToken, cancellationToken)) using (var combined = CancellationTokenSource.CreateLinkedTokenSource(backgroundCancellationToken, cancellationToken))
@@ -295,7 +295,10 @@ namespace MQTTnet.Client


void ThrowIfNotConnected() void ThrowIfNotConnected()
{ {
if (!IsConnected || Interlocked.Read(ref _isDisconnectPending) == 1) throw new MqttCommunicationException("The client is not connected.");
if (!IsConnected || Interlocked.Read(ref _isDisconnectPending) == 1)
{
throw new MqttCommunicationException("The client is not connected.");
}
} }


void ThrowIfConnected(string message) void ThrowIfConnected(string message)
@@ -376,12 +379,9 @@ namespace MQTTnet.Client
} }
} }


private Task SendAsync(MqttBasePacket packet, CancellationToken cancellationToken)
Task SendAsync(MqttBasePacket packet, CancellationToken cancellationToken)
{ {
if (cancellationToken.IsCancellationRequested)
{
return Task.FromResult(0);
}
cancellationToken.ThrowIfCancellationRequested();


_sendTracker.Restart(); _sendTracker.Restart();


@@ -391,9 +391,7 @@ namespace MQTTnet.Client
async Task<TResponsePacket> SendAndReceiveAsync<TResponsePacket>(MqttBasePacket requestPacket, CancellationToken cancellationToken) where TResponsePacket : MqttBasePacket async Task<TResponsePacket> SendAndReceiveAsync<TResponsePacket>(MqttBasePacket requestPacket, CancellationToken cancellationToken) where TResponsePacket : MqttBasePacket
{ {
cancellationToken.ThrowIfCancellationRequested(); cancellationToken.ThrowIfCancellationRequested();

_sendTracker.Restart();

ushort identifier = 0; ushort identifier = 0;
if (requestPacket is IMqttPacketWithIdentifier packetWithIdentifier && packetWithIdentifier.PacketIdentifier.HasValue) if (requestPacket is IMqttPacketWithIdentifier packetWithIdentifier && packetWithIdentifier.PacketIdentifier.HasValue)
{ {
@@ -404,6 +402,8 @@ namespace MQTTnet.Client
{ {
try try
{ {
_sendTracker.Restart();

await _adapter.SendPacketAsync(requestPacket, Options.CommunicationTimeout, cancellationToken).ConfigureAwait(false); await _adapter.SendPacketAsync(requestPacket, Options.CommunicationTimeout, cancellationToken).ConfigureAwait(false);
} }
catch (Exception e) catch (Exception e)
@@ -414,9 +414,7 @@ namespace MQTTnet.Client


try try
{ {
var response = await packetAwaiter.WaitOneAsync(Options.CommunicationTimeout).ConfigureAwait(false);
_receiveTracker.Restart();
return response;
return await packetAwaiter.WaitOneAsync(Options.CommunicationTimeout).ConfigureAwait(false);
} }
catch (Exception exception) catch (Exception exception)
{ {
@@ -436,23 +434,19 @@ namespace MQTTnet.Client
{ {
_logger.Verbose("Start sending keep alive packets."); _logger.Verbose("Start sending keep alive packets.");


var keepAlivePeriod = Options.KeepAlivePeriod;

while (!cancellationToken.IsCancellationRequested) while (!cancellationToken.IsCancellationRequested)
{ {
// Values described here: [MQTT-3.1.2-24]. // Values described here: [MQTT-3.1.2-24].
var keepAliveSendInterval = TimeSpan.FromSeconds(Options.KeepAlivePeriod.TotalSeconds * 0.75);
if (Options.KeepAliveSendInterval.HasValue)
{
keepAliveSendInterval = Options.KeepAliveSendInterval.Value;
}

var waitTimeSend = keepAliveSendInterval - _sendTracker.Elapsed;
var waitTimeReceive = keepAliveSendInterval - _receiveTracker.Elapsed;
if (waitTimeSend <= TimeSpan.Zero || waitTimeReceive <= TimeSpan.Zero)
var waitTime = keepAlivePeriod - _sendTracker.Elapsed;
if (waitTime <= TimeSpan.Zero)
{ {
await SendAndReceiveAsync<MqttPingRespPacket>(new MqttPingReqPacket(), cancellationToken).ConfigureAwait(false); await SendAndReceiveAsync<MqttPingRespPacket>(new MqttPingReqPacket(), cancellationToken).ConfigureAwait(false);
} }


await Task.Delay(keepAliveSendInterval, cancellationToken).ConfigureAwait(false);
await Task.Delay(keepAlivePeriod, cancellationToken).ConfigureAwait(false);
} }
} }
catch (Exception exception) catch (Exception exception)


+ 0
- 2
Source/MQTTnet/Client/Options/IMqttClientOptions.cs View File

@@ -17,8 +17,6 @@ namespace MQTTnet.Client.Options


TimeSpan CommunicationTimeout { get; } TimeSpan CommunicationTimeout { get; }
TimeSpan KeepAlivePeriod { get; } TimeSpan KeepAlivePeriod { get; }
TimeSpan? KeepAliveSendInterval { get; }

MqttApplicationMessage WillMessage { get; } MqttApplicationMessage WillMessage { get; }
uint? WillDelayInterval { get; } uint? WillDelayInterval { get; }




+ 0
- 1
Source/MQTTnet/Client/Options/MqttClientOptions.cs View File

@@ -17,7 +17,6 @@ namespace MQTTnet.Client.Options
public IMqttClientChannelOptions ChannelOptions { get; set; } public IMqttClientChannelOptions ChannelOptions { get; set; }
public TimeSpan CommunicationTimeout { get; set; } = TimeSpan.FromSeconds(10); public TimeSpan CommunicationTimeout { get; set; } = TimeSpan.FromSeconds(10);
public TimeSpan KeepAlivePeriod { get; set; } = TimeSpan.FromSeconds(15); public TimeSpan KeepAlivePeriod { get; set; } = TimeSpan.FromSeconds(15);
public TimeSpan? KeepAliveSendInterval { get; set; }


public MqttApplicationMessage WillMessage { get; set; } public MqttApplicationMessage WillMessage { get; set; }
public uint? WillDelayInterval { get; set; } public uint? WillDelayInterval { get; set; }


+ 4
- 4
Source/MQTTnet/Client/Options/MqttClientOptionsBuilder.cs View File

@@ -45,15 +45,15 @@ namespace MQTTnet.Client.Options
return WithKeepAlivePeriod(TimeSpan.Zero); return WithKeepAlivePeriod(TimeSpan.Zero);
} }


public MqttClientOptionsBuilder WithKeepAlivePeriod(TimeSpan value)
public MqttClientOptionsBuilder WithoutKeepAlivePeriod()
{ {
_options.KeepAlivePeriod = value;
_options.KeepAlivePeriod = TimeSpan.Zero;
return this; return this;
} }


public MqttClientOptionsBuilder WithKeepAliveSendInterval(TimeSpan value)
public MqttClientOptionsBuilder WithKeepAlivePeriod(TimeSpan value)
{ {
_options.KeepAliveSendInterval = value;
_options.KeepAlivePeriod = value;
return this; return this;
} }




+ 2
- 2
Source/MQTTnet/Diagnostics/IMqttNetLogger.cs View File

@@ -6,8 +6,8 @@ namespace MQTTnet.Diagnostics
{ {
event EventHandler<MqttNetLogMessagePublishedEventArgs> LogMessagePublished; event EventHandler<MqttNetLogMessagePublishedEventArgs> LogMessagePublished;


IMqttNetLogger CreateChildLogger(string source);
IMqttNetScopedLogger CreateScopedLogger(string source);


void Publish(MqttNetLogLevel logLevel, string message, object[] parameters, Exception exception);
void Publish(MqttNetLogLevel logLevel, string source, string message, object[] parameters, Exception exception);
} }
} }

+ 11
- 0
Source/MQTTnet/Diagnostics/IMqttNetScopedLogger.cs View File

@@ -0,0 +1,11 @@
using System;

namespace MQTTnet.Diagnostics
{
public interface IMqttNetScopedLogger
{
IMqttNetScopedLogger CreateScopedLogger(string source);

void Publish(MqttNetLogLevel logLevel, string message, object[] parameters, Exception exception);
}
}

+ 4
- 29
Source/MQTTnet/Diagnostics/MqttNetLogger.cs View File

@@ -7,14 +7,6 @@ namespace MQTTnet.Diagnostics
readonly string _logId; readonly string _logId;
readonly string _source; readonly string _source;


readonly MqttNetLogger _parentLogger;

public MqttNetLogger(string source, string logId)
{
_source = source;
_logId = logId;
}

public MqttNetLogger() public MqttNetLogger()
{ {
} }
@@ -24,31 +16,23 @@ namespace MQTTnet.Diagnostics
_logId = logId; _logId = logId;
} }


MqttNetLogger(MqttNetLogger parentLogger, string logId, string source)
{
_parentLogger = parentLogger ?? throw new ArgumentNullException(nameof(parentLogger));
_source = source ?? throw new ArgumentNullException(nameof(source));

_logId = logId;
}

public event EventHandler<MqttNetLogMessagePublishedEventArgs> LogMessagePublished; public event EventHandler<MqttNetLogMessagePublishedEventArgs> LogMessagePublished;


// TODO: Consider creating a LoggerFactory which will allow creating loggers. The logger factory will // TODO: Consider creating a LoggerFactory which will allow creating loggers. The logger factory will
// be the only place which has the published event. // be the only place which has the published event.
public IMqttNetLogger CreateChildLogger(string source)
public IMqttNetScopedLogger CreateScopedLogger(string source)
{ {
if (source is null) throw new ArgumentNullException(nameof(source)); if (source is null) throw new ArgumentNullException(nameof(source));


return new MqttNetLogger(this, _logId, source);
return new MqttNetScopedLogger(this, source);
} }


public void Publish(MqttNetLogLevel level, string message, object[] parameters, Exception exception)
public void Publish(MqttNetLogLevel level, string source, string message, object[] parameters, Exception exception)
{ {
var hasLocalListeners = LogMessagePublished != null; var hasLocalListeners = LogMessagePublished != null;
var hasGlobalListeners = MqttNetGlobalLogger.HasListeners; var hasGlobalListeners = MqttNetGlobalLogger.HasListeners;


if (!hasLocalListeners && !hasGlobalListeners && _parentLogger == null)
if (!hasLocalListeners && !hasGlobalListeners)
{ {
return; return;
} }
@@ -85,15 +69,6 @@ namespace MQTTnet.Diagnostics
{ {
LogMessagePublished?.Invoke(this, new MqttNetLogMessagePublishedEventArgs(logMessage)); LogMessagePublished?.Invoke(this, new MqttNetLogMessagePublishedEventArgs(logMessage));
} }

_parentLogger?.Publish(logMessage);
}

void Publish(MqttNetLogMessage logMessage)
{
LogMessagePublished?.Invoke(this, new MqttNetLogMessagePublishedEventArgs(logMessage));

_parentLogger?.Publish(logMessage);
} }
} }
} }

+ 0
- 35
Source/MQTTnet/Diagnostics/MqttNetLoggerExtensions.cs View File

@@ -1,35 +0,0 @@
using System;

namespace MQTTnet.Diagnostics
{
public static class MqttNetLoggerExtensions
{
public static void Verbose(this IMqttNetLogger logger, string message, params object[] parameters)
{
if (logger is null) throw new ArgumentNullException(nameof(logger));

logger.Publish(MqttNetLogLevel.Verbose, message, parameters, null);
}

public static void Info(this IMqttNetLogger logger, string message, params object[] parameters)
{
if (logger is null) throw new ArgumentNullException(nameof(logger));

logger.Publish(MqttNetLogLevel.Info, message, parameters, null);
}

public static void Warning(this IMqttNetLogger logger, Exception exception, string message, params object[] parameters)
{
if (logger is null) throw new ArgumentNullException(nameof(logger));

logger.Publish(MqttNetLogLevel.Warning, message, parameters, exception);
}

public static void Error(this IMqttNetLogger logger, Exception exception, string message, params object[] parameters)
{
if (logger is null) throw new ArgumentNullException(nameof(logger));

logger.Publish(MqttNetLogLevel.Error, message, parameters, exception);
}
}
}

+ 26
- 0
Source/MQTTnet/Diagnostics/MqttNetScopedLogger.cs View File

@@ -0,0 +1,26 @@
using System;

namespace MQTTnet.Diagnostics
{
public sealed class MqttNetScopedLogger : IMqttNetScopedLogger
{
readonly IMqttNetLogger _logger;
readonly string _source;

public MqttNetScopedLogger(IMqttNetLogger logger, string source)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_source = source ?? throw new ArgumentNullException(nameof(source));
}

public IMqttNetScopedLogger CreateScopedLogger(string source)
{
return new MqttNetScopedLogger(_logger, source);
}

public void Publish(MqttNetLogLevel logLevel, string message, object[] parameters, Exception exception)
{
_logger.Publish(logLevel, _source, message, parameters, exception);
}
}
}

+ 27
- 0
Source/MQTTnet/Diagnostics/MqttNetScopedLoggerExtensions.cs View File

@@ -0,0 +1,27 @@
using System;

namespace MQTTnet.Diagnostics
{
public static class MqttNetScopedLoggerExtensions
{
public static void Verbose(this IMqttNetScopedLogger logger, string message, params object[] parameters)
{
logger.Publish(MqttNetLogLevel.Verbose, message, parameters, null);
}

public static void Info(this IMqttNetScopedLogger logger, string message, params object[] parameters)
{
logger.Publish(MqttNetLogLevel.Info, message, parameters, null);
}

public static void Warning(this IMqttNetScopedLogger logger, Exception exception, string message, params object[] parameters)
{
logger.Publish(MqttNetLogLevel.Warning, message, parameters, exception);
}

public static void Error(this IMqttNetScopedLogger logger, Exception exception, string message, params object[] parameters)
{
logger.Publish(MqttNetLogLevel.Error, message, parameters, exception);
}
}
}

+ 5
- 8
Source/MQTTnet/Formatter/MqttPacketFormatterAdapter.cs View File

@@ -1,4 +1,5 @@
using System; using System;
using System.Runtime.CompilerServices;
using MQTTnet.Adapter; using MQTTnet.Adapter;
using MQTTnet.Exceptions; using MQTTnet.Exceptions;
using MQTTnet.Formatter.V3; using MQTTnet.Formatter.V3;
@@ -9,12 +10,7 @@ namespace MQTTnet.Formatter
{ {
public class MqttPacketFormatterAdapter public class MqttPacketFormatterAdapter
{ {
private IMqttPacketFormatter _formatter;

public MqttPacketFormatterAdapter()
: this(new MqttPacketWriter())
{
}
IMqttPacketFormatter _formatter;
public MqttPacketFormatterAdapter(MqttProtocolVersion protocolVersion) public MqttPacketFormatterAdapter(MqttProtocolVersion protocolVersion)
: this(protocolVersion, new MqttPacketWriter()) : this(protocolVersion, new MqttPacketWriter())
@@ -120,7 +116,7 @@ namespace MQTTnet.Formatter
} }
} }


private MqttProtocolVersion ParseProtocolVersion(ReceivedMqttPacket receivedMqttPacket)
MqttProtocolVersion ParseProtocolVersion(ReceivedMqttPacket receivedMqttPacket)
{ {
if (receivedMqttPacket == null) throw new ArgumentNullException(nameof(receivedMqttPacket)); if (receivedMqttPacket == null) throw new ArgumentNullException(nameof(receivedMqttPacket));


@@ -155,7 +151,8 @@ namespace MQTTnet.Formatter
throw new MqttProtocolViolationException($"Protocol '{protocolName}' not supported."); throw new MqttProtocolViolationException($"Protocol '{protocolName}' not supported.");
} }


private void ThrowIfFormatterNotSet()
[MethodImpl(MethodImplOptions.AggressiveInlining)]
void ThrowIfFormatterNotSet()
{ {
if (_formatter == null) if (_formatter == null)
{ {


+ 9
- 9
Source/MQTTnet/Formatter/MqttPacketWriter.cs View File

@@ -11,17 +11,17 @@ namespace MQTTnet.Formatter
/// same as for the original MemoryStream in .net. Also this implementation allows accessing the internal /// same as for the original MemoryStream in .net. Also this implementation allows accessing the internal
/// buffer for all platforms and .net framework versions (which is not available at the regular MemoryStream). /// buffer for all platforms and .net framework versions (which is not available at the regular MemoryStream).
/// </summary> /// </summary>
public class MqttPacketWriter : IMqttPacketWriter
public sealed class MqttPacketWriter : IMqttPacketWriter
{ {
private static readonly ArraySegment<byte> ZeroVariableLengthIntegerArray = new ArraySegment<byte>(new byte[1], 0, 1);
private static readonly ArraySegment<byte> ZeroTwoByteIntegerArray = new ArraySegment<byte>(new byte[2], 0, 2);
static readonly ArraySegment<byte> ZeroVariableLengthIntegerArray = new ArraySegment<byte>(new byte[1], 0, 1);
static readonly ArraySegment<byte> ZeroTwoByteIntegerArray = new ArraySegment<byte>(new byte[2], 0, 2);


public static int InitialBufferSize = 128; public static int InitialBufferSize = 128;
public static int MaxBufferSize = 4096; public static int MaxBufferSize = 4096;


private byte[] _buffer = new byte[InitialBufferSize];
byte[] _buffer = new byte[InitialBufferSize];


private int _offset;
int _offset;


public int Length { get; private set; } public int Length { get; private set; }


@@ -191,13 +191,13 @@ namespace MQTTnet.Formatter
Array.Resize(ref _buffer, MaxBufferSize); Array.Resize(ref _buffer, MaxBufferSize);
} }


private void Write(ArraySegment<byte> buffer)
void Write(ArraySegment<byte> buffer)
{ {
Write(buffer.Array, buffer.Offset, buffer.Count); Write(buffer.Array, buffer.Offset, buffer.Count);
} }


[MethodImpl(MethodImplOptions.AggressiveInlining)] [MethodImpl(MethodImplOptions.AggressiveInlining)]
private void EnsureAdditionalCapacity(int additionalCapacity)
void EnsureAdditionalCapacity(int additionalCapacity)
{ {
var freeSpace = _buffer.Length - _offset; var freeSpace = _buffer.Length - _offset;
if (freeSpace >= additionalCapacity) if (freeSpace >= additionalCapacity)
@@ -209,7 +209,7 @@ namespace MQTTnet.Formatter
} }


[MethodImpl(MethodImplOptions.AggressiveInlining)] [MethodImpl(MethodImplOptions.AggressiveInlining)]
private void EnsureCapacity(int capacity)
void EnsureCapacity(int capacity)
{ {
var newBufferLength = _buffer.Length; var newBufferLength = _buffer.Length;


@@ -227,7 +227,7 @@ namespace MQTTnet.Formatter
} }


[MethodImpl(MethodImplOptions.AggressiveInlining)] [MethodImpl(MethodImplOptions.AggressiveInlining)]
private void IncreasePosition(int length)
void IncreasePosition(int length)
{ {
_offset += length; _offset += length;




+ 29
- 35
Source/MQTTnet/Formatter/V3/MqttV310PacketFormatter.cs View File

@@ -1,5 +1,6 @@
using System; using System;
using System.Linq; using System.Linq;
using System.Runtime.CompilerServices;
using MQTTnet.Adapter; using MQTTnet.Adapter;
using MQTTnet.Exceptions; using MQTTnet.Exceptions;
using MQTTnet.Packets; using MQTTnet.Packets;
@@ -9,26 +10,19 @@ namespace MQTTnet.Formatter.V3
{ {
public class MqttV310PacketFormatter : IMqttPacketFormatter public class MqttV310PacketFormatter : IMqttPacketFormatter
{ {
private const int FixedHeaderSize = 1;
const int FixedHeaderSize = 1;


private static readonly MqttPingReqPacket PingReqPacket = new MqttPingReqPacket();
private static readonly MqttPingRespPacket PingRespPacket = new MqttPingRespPacket();
private static readonly MqttDisconnectPacket DisconnectPacket = new MqttDisconnectPacket();
static readonly MqttPingReqPacket PingReqPacket = new MqttPingReqPacket();
static readonly MqttPingRespPacket PingRespPacket = new MqttPingRespPacket();
static readonly MqttDisconnectPacket DisconnectPacket = new MqttDisconnectPacket();


private readonly IMqttPacketWriter _packetWriter;

public MqttV310PacketFormatter()
: this(new MqttPacketWriter())
{

}
readonly IMqttPacketWriter _packetWriter;


public MqttV310PacketFormatter(IMqttPacketWriter packetWriter) public MqttV310PacketFormatter(IMqttPacketWriter packetWriter)
{ {
_packetWriter = packetWriter; _packetWriter = packetWriter;
} }



public IMqttDataConverter DataConverter { get; } = new MqttV310DataConverter(); public IMqttDataConverter DataConverter { get; } = new MqttV310DataConverter();


public ArraySegment<byte> Encode(MqttBasePacket packet) public ArraySegment<byte> Encode(MqttBasePacket packet)
@@ -93,7 +87,7 @@ namespace MQTTnet.Formatter.V3
_packetWriter.FreeBuffer(); _packetWriter.FreeBuffer();
} }


private byte EncodePacket(MqttBasePacket packet, IMqttPacketWriter packetWriter)
byte EncodePacket(MqttBasePacket packet, IMqttPacketWriter packetWriter)
{ {
switch (packet) switch (packet)
{ {
@@ -116,7 +110,7 @@ namespace MQTTnet.Formatter.V3
} }
} }


private static MqttBasePacket DecodeUnsubAckPacket(IMqttPacketBodyReader body)
static MqttBasePacket DecodeUnsubAckPacket(IMqttPacketBodyReader body)
{ {
ThrowIfBodyIsEmpty(body); ThrowIfBodyIsEmpty(body);


@@ -126,7 +120,7 @@ namespace MQTTnet.Formatter.V3
}; };
} }


private static MqttBasePacket DecodePubCompPacket(IMqttPacketBodyReader body)
static MqttBasePacket DecodePubCompPacket(IMqttPacketBodyReader body)
{ {
ThrowIfBodyIsEmpty(body); ThrowIfBodyIsEmpty(body);


@@ -136,7 +130,7 @@ namespace MQTTnet.Formatter.V3
}; };
} }


private static MqttBasePacket DecodePubRelPacket(IMqttPacketBodyReader body)
static MqttBasePacket DecodePubRelPacket(IMqttPacketBodyReader body)
{ {
ThrowIfBodyIsEmpty(body); ThrowIfBodyIsEmpty(body);


@@ -146,7 +140,7 @@ namespace MQTTnet.Formatter.V3
}; };
} }


private static MqttBasePacket DecodePubRecPacket(IMqttPacketBodyReader body)
static MqttBasePacket DecodePubRecPacket(IMqttPacketBodyReader body)
{ {
ThrowIfBodyIsEmpty(body); ThrowIfBodyIsEmpty(body);


@@ -156,7 +150,7 @@ namespace MQTTnet.Formatter.V3
}; };
} }


private static MqttBasePacket DecodePubAckPacket(IMqttPacketBodyReader body)
static MqttBasePacket DecodePubAckPacket(IMqttPacketBodyReader body)
{ {
ThrowIfBodyIsEmpty(body); ThrowIfBodyIsEmpty(body);


@@ -166,7 +160,7 @@ namespace MQTTnet.Formatter.V3
}; };
} }


private static MqttBasePacket DecodeUnsubscribePacket(IMqttPacketBodyReader body)
static MqttBasePacket DecodeUnsubscribePacket(IMqttPacketBodyReader body)
{ {
ThrowIfBodyIsEmpty(body); ThrowIfBodyIsEmpty(body);


@@ -183,7 +177,7 @@ namespace MQTTnet.Formatter.V3
return packet; return packet;
} }


private static MqttBasePacket DecodeSubscribePacket(IMqttPacketBodyReader body)
static MqttBasePacket DecodeSubscribePacket(IMqttPacketBodyReader body)
{ {
ThrowIfBodyIsEmpty(body); ThrowIfBodyIsEmpty(body);


@@ -206,7 +200,7 @@ namespace MQTTnet.Formatter.V3
return packet; return packet;
} }


private static MqttBasePacket DecodePublishPacket(ReceivedMqttPacket receivedMqttPacket)
static MqttBasePacket DecodePublishPacket(ReceivedMqttPacket receivedMqttPacket)
{ {
ThrowIfBodyIsEmpty(receivedMqttPacket.Body); ThrowIfBodyIsEmpty(receivedMqttPacket.Body);


@@ -239,7 +233,7 @@ namespace MQTTnet.Formatter.V3
return packet; return packet;
} }


private MqttBasePacket DecodeConnectPacket(IMqttPacketBodyReader body)
MqttBasePacket DecodeConnectPacket(IMqttPacketBodyReader body)
{ {
ThrowIfBodyIsEmpty(body); ThrowIfBodyIsEmpty(body);


@@ -300,7 +294,7 @@ namespace MQTTnet.Formatter.V3
return packet; return packet;
} }


private static MqttBasePacket DecodeSubAckPacket(IMqttPacketBodyReader body)
static MqttBasePacket DecodeSubAckPacket(IMqttPacketBodyReader body)
{ {
ThrowIfBodyIsEmpty(body); ThrowIfBodyIsEmpty(body);


@@ -340,7 +334,7 @@ namespace MQTTnet.Formatter.V3
} }


// ReSharper disable once ParameterOnlyUsedForPreconditionCheck.Local // ReSharper disable once ParameterOnlyUsedForPreconditionCheck.Local
private static void ValidatePublishPacket(MqttPublishPacket packet)
static void ValidatePublishPacket(MqttPublishPacket packet)
{ {
if (packet.QualityOfServiceLevel == 0 && packet.Dup) if (packet.QualityOfServiceLevel == 0 && packet.Dup)
{ {
@@ -418,7 +412,7 @@ namespace MQTTnet.Formatter.V3
return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.ConnAck); return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.ConnAck);
} }


private static byte EncodePubRelPacket(MqttPubRelPacket packet, IMqttPacketWriter packetWriter)
static byte EncodePubRelPacket(MqttPubRelPacket packet, IMqttPacketWriter packetWriter)
{ {
if (!packet.PacketIdentifier.HasValue) if (!packet.PacketIdentifier.HasValue)
{ {
@@ -430,7 +424,7 @@ namespace MQTTnet.Formatter.V3
return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubRel, 0x02); return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubRel, 0x02);
} }


private static byte EncodePublishPacket(MqttPublishPacket packet, IMqttPacketWriter packetWriter)
static byte EncodePublishPacket(MqttPublishPacket packet, IMqttPacketWriter packetWriter)
{ {
ValidatePublishPacket(packet); ValidatePublishPacket(packet);


@@ -475,7 +469,7 @@ namespace MQTTnet.Formatter.V3
return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.Publish, fixedHeader); return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.Publish, fixedHeader);
} }


private static byte EncodePubAckPacket(MqttPubAckPacket packet, IMqttPacketWriter packetWriter)
static byte EncodePubAckPacket(MqttPubAckPacket packet, IMqttPacketWriter packetWriter)
{ {
if (!packet.PacketIdentifier.HasValue) if (!packet.PacketIdentifier.HasValue)
{ {
@@ -487,7 +481,7 @@ namespace MQTTnet.Formatter.V3
return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubAck); return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubAck);
} }


private static byte EncodePubRecPacket(MqttPubRecPacket packet, IMqttPacketWriter packetWriter)
static byte EncodePubRecPacket(MqttPubRecPacket packet, IMqttPacketWriter packetWriter)
{ {
if (!packet.PacketIdentifier.HasValue) if (!packet.PacketIdentifier.HasValue)
{ {
@@ -499,7 +493,7 @@ namespace MQTTnet.Formatter.V3
return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubRec); return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubRec);
} }


private static byte EncodePubCompPacket(MqttPubCompPacket packet, IMqttPacketWriter packetWriter)
static byte EncodePubCompPacket(MqttPubCompPacket packet, IMqttPacketWriter packetWriter)
{ {
if (!packet.PacketIdentifier.HasValue) if (!packet.PacketIdentifier.HasValue)
{ {
@@ -511,7 +505,7 @@ namespace MQTTnet.Formatter.V3
return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubComp); return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubComp);
} }


private static byte EncodeSubscribePacket(MqttSubscribePacket packet, IMqttPacketWriter packetWriter)
static byte EncodeSubscribePacket(MqttSubscribePacket packet, IMqttPacketWriter packetWriter)
{ {
if (!packet.TopicFilters.Any()) throw new MqttProtocolViolationException("At least one topic filter must be set [MQTT-3.8.3-3]."); if (!packet.TopicFilters.Any()) throw new MqttProtocolViolationException("At least one topic filter must be set [MQTT-3.8.3-3].");


@@ -534,7 +528,7 @@ namespace MQTTnet.Formatter.V3
return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.Subscribe, 0x02); return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.Subscribe, 0x02);
} }


private static byte EncodeSubAckPacket(MqttSubAckPacket packet, IMqttPacketWriter packetWriter)
static byte EncodeSubAckPacket(MqttSubAckPacket packet, IMqttPacketWriter packetWriter)
{ {
if (!packet.PacketIdentifier.HasValue) if (!packet.PacketIdentifier.HasValue)
{ {
@@ -554,7 +548,7 @@ namespace MQTTnet.Formatter.V3
return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.SubAck); return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.SubAck);
} }


private static byte EncodeUnsubscribePacket(MqttUnsubscribePacket packet, IMqttPacketWriter packetWriter)
static byte EncodeUnsubscribePacket(MqttUnsubscribePacket packet, IMqttPacketWriter packetWriter)
{ {
if (!packet.TopicFilters.Any()) throw new MqttProtocolViolationException("At least one topic filter must be set [MQTT-3.10.3-2]."); if (!packet.TopicFilters.Any()) throw new MqttProtocolViolationException("At least one topic filter must be set [MQTT-3.10.3-2].");


@@ -576,7 +570,7 @@ namespace MQTTnet.Formatter.V3
return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.Unsubscibe, 0x02); return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.Unsubscibe, 0x02);
} }


private static byte EncodeUnsubAckPacket(MqttUnsubAckPacket packet, IMqttPacketWriter packetWriter)
static byte EncodeUnsubAckPacket(MqttUnsubAckPacket packet, IMqttPacketWriter packetWriter)
{ {
if (!packet.PacketIdentifier.HasValue) if (!packet.PacketIdentifier.HasValue)
{ {
@@ -587,12 +581,12 @@ namespace MQTTnet.Formatter.V3
return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.UnsubAck); return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.UnsubAck);
} }


private static byte EncodeEmptyPacket(MqttControlPacketType type)
static byte EncodeEmptyPacket(MqttControlPacketType type)
{ {
return MqttPacketWriter.BuildFixedHeader(type); return MqttPacketWriter.BuildFixedHeader(type);
} }


// ReSharper disable once ParameterOnlyUsedForPreconditionCheck.Local
[MethodImpl(MethodImplOptions.AggressiveInlining)]
protected static void ThrowIfBodyIsEmpty(IMqttPacketBodyReader body) protected static void ThrowIfBodyIsEmpty(IMqttPacketBodyReader body)
{ {
if (body == null || body.Length == 0) if (body == null || body.Length == 0)


+ 0
- 5
Source/MQTTnet/Formatter/V3/MqttV311PacketFormatter.cs View File

@@ -6,11 +6,6 @@ namespace MQTTnet.Formatter.V3
{ {
public class MqttV311PacketFormatter : MqttV310PacketFormatter public class MqttV311PacketFormatter : MqttV310PacketFormatter
{ {
public MqttV311PacketFormatter()
: base()
{
}

public MqttV311PacketFormatter(IMqttPacketWriter packetWriter) public MqttV311PacketFormatter(IMqttPacketWriter packetWriter)
: base(packetWriter) : base(packetWriter)
{ {


+ 10
- 3
Source/MQTTnet/Implementations/MqttClientAdapterFactory.cs View File

@@ -8,7 +8,14 @@ namespace MQTTnet.Implementations
{ {
public class MqttClientAdapterFactory : IMqttClientAdapterFactory public class MqttClientAdapterFactory : IMqttClientAdapterFactory
{ {
public IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetLogger logger)
readonly IMqttNetLogger _logger;

public MqttClientAdapterFactory(IMqttNetLogger logger)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}

public IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options)
{ {
if (options == null) throw new ArgumentNullException(nameof(options)); if (options == null) throw new ArgumentNullException(nameof(options));


@@ -16,12 +23,12 @@ namespace MQTTnet.Implementations
{ {
case MqttClientTcpOptions _: case MqttClientTcpOptions _:
{ {
return new MqttChannelAdapter(new MqttTcpChannel(options), new MqttPacketFormatterAdapter(options.ProtocolVersion), logger);
return new MqttChannelAdapter(new MqttTcpChannel(options), new MqttPacketFormatterAdapter(options.ProtocolVersion, new MqttPacketWriter()), _logger);
} }


case MqttClientWebSocketOptions webSocketOptions: case MqttClientWebSocketOptions webSocketOptions:
{ {
return new MqttChannelAdapter(new MqttWebSocketChannel(webSocketOptions), new MqttPacketFormatterAdapter(options.ProtocolVersion), logger);
return new MqttChannelAdapter(new MqttWebSocketChannel(webSocketOptions), new MqttPacketFormatterAdapter(options.ProtocolVersion, new MqttPacketWriter()), _logger);
} }


default: default:


+ 4
- 4
Source/MQTTnet/Implementations/MqttTcpChannel.cs View File

@@ -115,13 +115,13 @@ namespace MQTTnet.Implementations
{ {
if (buffer is null) throw new ArgumentNullException(nameof(buffer)); if (buffer is null) throw new ArgumentNullException(nameof(buffer));


cancellationToken.ThrowIfCancellationRequested();

try try
{ {
// Workaround for: https://github.com/dotnet/corefx/issues/24430 // Workaround for: https://github.com/dotnet/corefx/issues/24430
using (cancellationToken.Register(Dispose)) using (cancellationToken.Register(Dispose))
{ {
cancellationToken.ThrowIfCancellationRequested();

return await _stream.ReadAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false); return await _stream.ReadAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false);
} }
} }
@@ -144,13 +144,13 @@ namespace MQTTnet.Implementations
{ {
if (buffer is null) throw new ArgumentNullException(nameof(buffer)); if (buffer is null) throw new ArgumentNullException(nameof(buffer));


cancellationToken.ThrowIfCancellationRequested();

try try
{ {
// Workaround for: https://github.com/dotnet/corefx/issues/24430 // Workaround for: https://github.com/dotnet/corefx/issues/24430
using (cancellationToken.Register(Dispose)) using (cancellationToken.Register(Dispose))
{ {
cancellationToken.ThrowIfCancellationRequested();

await _stream.WriteAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false); await _stream.WriteAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false);
} }
} }


+ 5
- 5
Source/MQTTnet/Implementations/MqttTcpServerAdapter.Uwp.cs View File

@@ -13,16 +13,16 @@ namespace MQTTnet.Implementations
{ {
public sealed class MqttTcpServerAdapter : IMqttServerAdapter public sealed class MqttTcpServerAdapter : IMqttServerAdapter
{ {
readonly IMqttNetLogger _logger;
readonly IMqttNetScopedLogger _logger;
readonly IMqttNetLogger _rootLogger;


IMqttServerOptions _options; IMqttServerOptions _options;
StreamSocketListener _listener; StreamSocketListener _listener;


public MqttTcpServerAdapter(IMqttNetLogger logger) public MqttTcpServerAdapter(IMqttNetLogger logger)
{ {
if (logger == null) throw new ArgumentNullException(nameof(logger));

_logger = logger.CreateChildLogger(nameof(MqttTcpServerAdapter));
_rootLogger = logger ?? throw new ArgumentNullException(nameof(logger));
_logger = logger.CreateScopedLogger(nameof(MqttTcpServerAdapter));
} }


public Func<IMqttChannelAdapter, Task> ClientHandler { get; set; } public Func<IMqttChannelAdapter, Task> ClientHandler { get; set; }
@@ -89,7 +89,7 @@ namespace MQTTnet.Implementations
} }
} }
using (var clientAdapter = new MqttChannelAdapter(new MqttTcpChannel(args.Socket, clientCertificate, _options), new MqttPacketFormatterAdapter(), _logger))
using (var clientAdapter = new MqttChannelAdapter(new MqttTcpChannel(args.Socket, clientCertificate, _options), new MqttPacketFormatterAdapter(new MqttPacketWriter()), _rootLogger))
{ {
await clientHandler(clientAdapter).ConfigureAwait(false); await clientHandler(clientAdapter).ConfigureAwait(false);
} }


+ 6
- 14
Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs View File

@@ -16,15 +16,15 @@ namespace MQTTnet.Implementations
public sealed class MqttTcpServerAdapter : Disposable, IMqttServerAdapter public sealed class MqttTcpServerAdapter : Disposable, IMqttServerAdapter
{ {
readonly List<MqttTcpServerListener> _listeners = new List<MqttTcpServerListener>(); readonly List<MqttTcpServerListener> _listeners = new List<MqttTcpServerListener>();
readonly IMqttNetLogger _logger;
readonly IMqttNetScopedLogger _logger;
readonly IMqttNetLogger _rootLogger;


CancellationTokenSource _cancellationTokenSource; CancellationTokenSource _cancellationTokenSource;


public MqttTcpServerAdapter(IMqttNetLogger logger) public MqttTcpServerAdapter(IMqttNetLogger logger)
{ {
if (logger == null) throw new ArgumentNullException(nameof(logger));

_logger = logger.CreateChildLogger(nameof(MqttTcpServerAdapter));
_rootLogger = logger ?? throw new ArgumentNullException(nameof(logger));
_logger = logger.CreateScopedLogger(nameof(MqttTcpServerAdapter));
} }


public Func<IMqttChannelAdapter, Task> ClientHandler { get; set; } public Func<IMqttChannelAdapter, Task> ClientHandler { get; set; }
@@ -105,11 +105,7 @@ namespace MQTTnet.Implementations
{ {
if (!options.BoundInterNetworkAddress.Equals(IPAddress.None)) if (!options.BoundInterNetworkAddress.Equals(IPAddress.None))
{ {
var listenerV4 = new MqttTcpServerListener(
AddressFamily.InterNetwork,
options,
tlsCertificate,
_logger)
var listenerV4 = new MqttTcpServerListener(AddressFamily.InterNetwork, options, tlsCertificate, _rootLogger)
{ {
ClientHandler = OnClientAcceptedAsync ClientHandler = OnClientAcceptedAsync
}; };
@@ -122,11 +118,7 @@ namespace MQTTnet.Implementations


if (!options.BoundInterNetworkV6Address.Equals(IPAddress.None)) if (!options.BoundInterNetworkV6Address.Equals(IPAddress.None))
{ {
var listenerV6 = new MqttTcpServerListener(
AddressFamily.InterNetworkV6,
options,
tlsCertificate,
_logger)
var listenerV6 = new MqttTcpServerListener(AddressFamily.InterNetworkV6, options, tlsCertificate, _rootLogger)
{ {
ClientHandler = OnClientAcceptedAsync ClientHandler = OnClientAcceptedAsync
}; };


+ 5
- 3
Source/MQTTnet/Implementations/MqttTcpServerListener.cs View File

@@ -17,7 +17,8 @@ namespace MQTTnet.Implementations
{ {
public sealed class MqttTcpServerListener : IDisposable public sealed class MqttTcpServerListener : IDisposable
{ {
readonly IMqttNetLogger _logger;
readonly IMqttNetScopedLogger _logger;
readonly IMqttNetLogger _rootLogger;
readonly AddressFamily _addressFamily; readonly AddressFamily _addressFamily;
readonly MqttServerTcpEndpointBaseOptions _options; readonly MqttServerTcpEndpointBaseOptions _options;
readonly MqttServerTlsTcpEndpointOptions _tlsOptions; readonly MqttServerTlsTcpEndpointOptions _tlsOptions;
@@ -35,7 +36,8 @@ namespace MQTTnet.Implementations
_addressFamily = addressFamily; _addressFamily = addressFamily;
_options = options; _options = options;
_tlsCertificate = tlsCertificate; _tlsCertificate = tlsCertificate;
_logger = logger.CreateChildLogger(nameof(MqttTcpServerListener));
_rootLogger = logger;
_logger = logger.CreateScopedLogger(nameof(MqttTcpServerListener));


if (_options is MqttServerTlsTcpEndpointOptions tlsOptions) if (_options is MqttServerTlsTcpEndpointOptions tlsOptions)
{ {
@@ -178,7 +180,7 @@ namespace MQTTnet.Implementations
var clientHandler = ClientHandler; var clientHandler = ClientHandler;
if (clientHandler != null) if (clientHandler != null)
{ {
using (var clientAdapter = new MqttChannelAdapter(new MqttTcpChannel(stream, remoteEndPoint, clientCertificate), new MqttPacketFormatterAdapter(), _logger))
using (var clientAdapter = new MqttChannelAdapter(new MqttTcpChannel(stream, remoteEndPoint, clientCertificate), new MqttPacketFormatterAdapter(new MqttPacketWriter()), _rootLogger))
{ {
await clientHandler(clientAdapter).ConfigureAwait(false); await clientHandler(clientAdapter).ConfigureAwait(false);
} }


+ 1
- 1
Source/MQTTnet/Internal/TaskExtensions.cs View File

@@ -5,7 +5,7 @@ namespace MQTTnet.Internal
{ {
public static class TaskExtensions public static class TaskExtensions
{ {
public static void Forget(this Task task, IMqttNetLogger logger)
public static void Forget(this Task task, IMqttNetScopedLogger logger)
{ {
task?.ContinueWith(t => task?.ContinueWith(t =>
{ {


+ 3
- 3
Source/MQTTnet/LowLevelClient/LowLevelMqttClient.cs View File

@@ -10,7 +10,7 @@ namespace MQTTnet.LowLevelClient
{ {
public sealed class LowLevelMqttClient : ILowLevelMqttClient public sealed class LowLevelMqttClient : ILowLevelMqttClient
{ {
readonly IMqttNetLogger _logger;
readonly IMqttNetScopedLogger _logger;
readonly IMqttClientAdapterFactory _clientAdapterFactory; readonly IMqttClientAdapterFactory _clientAdapterFactory;


IMqttChannelAdapter _adapter; IMqttChannelAdapter _adapter;
@@ -22,7 +22,7 @@ namespace MQTTnet.LowLevelClient
if (logger is null) throw new ArgumentNullException(nameof(logger)); if (logger is null) throw new ArgumentNullException(nameof(logger));


_clientAdapterFactory = clientAdapterFactory; _clientAdapterFactory = clientAdapterFactory;
_logger = logger.CreateChildLogger(nameof(LowLevelMqttClient));
_logger = logger.CreateScopedLogger(nameof(LowLevelMqttClient));
} }


bool IsConnected => _adapter != null; bool IsConnected => _adapter != null;
@@ -36,7 +36,7 @@ namespace MQTTnet.LowLevelClient
throw new InvalidOperationException("Low level MQTT client is already connected. Disconnect first before connecting again."); throw new InvalidOperationException("Low level MQTT client is already connected. Disconnect first before connecting again.");
} }


var newAdapter = _clientAdapterFactory.CreateClientAdapter(options, _logger);
var newAdapter = _clientAdapterFactory.CreateClientAdapter(options);


try try
{ {


+ 2
- 1
Source/MQTTnet/MqttFactory.cs View File

@@ -11,7 +11,7 @@ namespace MQTTnet
{ {
public sealed class MqttFactory : IMqttFactory public sealed class MqttFactory : IMqttFactory
{ {
IMqttClientAdapterFactory _clientAdapterFactory = new MqttClientAdapterFactory();
IMqttClientAdapterFactory _clientAdapterFactory;


public MqttFactory() : this(new MqttNetLogger()) public MqttFactory() : this(new MqttNetLogger())
{ {
@@ -20,6 +20,7 @@ namespace MQTTnet
public MqttFactory(IMqttNetLogger logger) public MqttFactory(IMqttNetLogger logger)
{ {
DefaultLogger = logger ?? throw new ArgumentNullException(nameof(logger)); DefaultLogger = logger ?? throw new ArgumentNullException(nameof(logger));
_clientAdapterFactory = new MqttClientAdapterFactory(logger);
} }


public IMqttNetLogger DefaultLogger { get; } public IMqttNetLogger DefaultLogger { get; }


+ 9
- 6
Source/MQTTnet/Protocol/MqttTopicValidator.cs View File

@@ -11,14 +11,17 @@ namespace MQTTnet.Protocol
throw new MqttProtocolViolationException("Topic should not be empty."); throw new MqttProtocolViolationException("Topic should not be empty.");
} }


if (topic.Contains("+"))
foreach(var @char in topic)
{ {
throw new MqttProtocolViolationException("The character '+' is not allowed in topics.");
}
if (@char == '+')
{
throw new MqttProtocolViolationException("The character '+' is not allowed in topics.");
}


if (topic.Contains("#"))
{
throw new MqttProtocolViolationException("The character '#' is not allowed in topics.");
if (@char == '#')
{
throw new MqttProtocolViolationException("The character '#' is not allowed in topics.");
}
} }
} }
} }


+ 3
- 3
Source/MQTTnet/Server/MqttClientConnection.cs View File

@@ -28,7 +28,7 @@ namespace MQTTnet.Server
readonly MqttClientKeepAliveMonitor _keepAliveMonitor; readonly MqttClientKeepAliveMonitor _keepAliveMonitor;
readonly MqttClientSessionsManager _sessionsManager; readonly MqttClientSessionsManager _sessionsManager;


readonly IMqttNetLogger _logger;
readonly IMqttNetScopedLogger _logger;
readonly IMqttServerOptions _serverOptions; readonly IMqttServerOptions _serverOptions;


readonly IMqttChannelAdapter _channelAdapter; readonly IMqttChannelAdapter _channelAdapter;
@@ -71,9 +71,9 @@ namespace MQTTnet.Server
ConnectPacket = connectPacket ?? throw new ArgumentNullException(nameof(connectPacket)); ConnectPacket = connectPacket ?? throw new ArgumentNullException(nameof(connectPacket));


if (logger == null) throw new ArgumentNullException(nameof(logger)); if (logger == null) throw new ArgumentNullException(nameof(logger));
_logger = logger.CreateChildLogger(nameof(MqttClientConnection));
_logger = logger.CreateScopedLogger(nameof(MqttClientConnection));


_keepAliveMonitor = new MqttClientKeepAliveMonitor(ConnectPacket.ClientId, () => StopAsync(), _logger);
_keepAliveMonitor = new MqttClientKeepAliveMonitor(ConnectPacket.ClientId, () => StopAsync(), logger);


_connectedTimestamp = DateTime.UtcNow; _connectedTimestamp = DateTime.UtcNow;
_lastPacketReceivedTimestamp = _connectedTimestamp; _lastPacketReceivedTimestamp = _connectedTimestamp;


+ 3
- 3
Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs View File

@@ -7,13 +7,13 @@ using System.Threading.Tasks;


namespace MQTTnet.Server namespace MQTTnet.Server
{ {
public class MqttClientKeepAliveMonitor
public sealed class MqttClientKeepAliveMonitor
{ {
readonly Stopwatch _lastPacketReceivedTracker = new Stopwatch(); readonly Stopwatch _lastPacketReceivedTracker = new Stopwatch();


readonly string _clientId; readonly string _clientId;
readonly Func<Task> _keepAliveElapsedCallback; readonly Func<Task> _keepAliveElapsedCallback;
readonly IMqttNetLogger _logger;
readonly IMqttNetScopedLogger _logger;


bool _isPaused; bool _isPaused;


@@ -23,7 +23,7 @@ namespace MQTTnet.Server
_keepAliveElapsedCallback = keepAliveElapsedCallback ?? throw new ArgumentNullException(nameof(keepAliveElapsedCallback)); _keepAliveElapsedCallback = keepAliveElapsedCallback ?? throw new ArgumentNullException(nameof(keepAliveElapsedCallback));


if (logger == null) throw new ArgumentNullException(nameof(logger)); if (logger == null) throw new ArgumentNullException(nameof(logger));
_logger = logger.CreateChildLogger(nameof(MqttClientKeepAliveMonitor));
_logger = logger.CreateScopedLogger(nameof(MqttClientKeepAliveMonitor));
} }


public void Start(int keepAlivePeriod, CancellationToken cancellationToken) public void Start(int keepAlivePeriod, CancellationToken cancellationToken)


+ 9
- 3
Source/MQTTnet/Server/MqttClientSession.cs View File

@@ -8,12 +8,18 @@ namespace MQTTnet.Server
{ {
public class MqttClientSession public class MqttClientSession
{ {
readonly IMqttNetLogger _logger;
readonly IMqttNetScopedLogger _logger;


readonly DateTime _createdTimestamp = DateTime.UtcNow; readonly DateTime _createdTimestamp = DateTime.UtcNow;
readonly IMqttRetainedMessagesManager _retainedMessagesManager; readonly IMqttRetainedMessagesManager _retainedMessagesManager;


public MqttClientSession(string clientId, IDictionary<object, object> items, MqttServerEventDispatcher eventDispatcher, IMqttServerOptions serverOptions, IMqttRetainedMessagesManager retainedMessagesManager, IMqttNetLogger logger)
public MqttClientSession(
string clientId,
IDictionary<object, object> items,
MqttServerEventDispatcher eventDispatcher,
IMqttServerOptions serverOptions,
IMqttRetainedMessagesManager retainedMessagesManager,
IMqttNetLogger logger)
{ {
ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId)); ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId));
Items = items ?? throw new ArgumentNullException(nameof(items)); Items = items ?? throw new ArgumentNullException(nameof(items));
@@ -22,7 +28,7 @@ namespace MQTTnet.Server
ApplicationMessagesQueue = new MqttClientSessionApplicationMessagesQueue(serverOptions); ApplicationMessagesQueue = new MqttClientSessionApplicationMessagesQueue(serverOptions);


if (logger == null) throw new ArgumentNullException(nameof(logger)); if (logger == null) throw new ArgumentNullException(nameof(logger));
_logger = logger.CreateChildLogger(nameof(MqttClientSession));
_logger = logger.CreateScopedLogger(nameof(MqttClientSession));
} }


public string ClientId { get; } public string ClientId { get; }


+ 6
- 4
Source/MQTTnet/Server/MqttClientSessionsManager.cs View File

@@ -28,7 +28,8 @@ namespace MQTTnet.Server


readonly IMqttRetainedMessagesManager _retainedMessagesManager; readonly IMqttRetainedMessagesManager _retainedMessagesManager;
readonly IMqttServerOptions _options; readonly IMqttServerOptions _options;
readonly IMqttNetLogger _logger;
readonly IMqttNetScopedLogger _logger;
readonly IMqttNetLogger _rootLogger;


public MqttClientSessionsManager( public MqttClientSessionsManager(
IMqttServerOptions options, IMqttServerOptions options,
@@ -40,7 +41,8 @@ namespace MQTTnet.Server
_cancellationToken = cancellationToken; _cancellationToken = cancellationToken;


if (logger == null) throw new ArgumentNullException(nameof(logger)); if (logger == null) throw new ArgumentNullException(nameof(logger));
_logger = logger.CreateChildLogger(nameof(MqttClientSessionsManager));
_logger = logger.CreateScopedLogger(nameof(MqttClientSessionsManager));
_rootLogger = logger;


_eventDispatcher = eventDispatcher ?? throw new ArgumentNullException(nameof(eventDispatcher)); _eventDispatcher = eventDispatcher ?? throw new ArgumentNullException(nameof(eventDispatcher));
_options = options ?? throw new ArgumentNullException(nameof(options)); _options = options ?? throw new ArgumentNullException(nameof(options));
@@ -371,11 +373,11 @@ namespace MQTTnet.Server


if (session == null) if (session == null)
{ {
session = new MqttClientSession(connectPacket.ClientId, connectionValidatorContext.SessionItems, _eventDispatcher, _options, _retainedMessagesManager, _logger);
session = new MqttClientSession(connectPacket.ClientId, connectionValidatorContext.SessionItems, _eventDispatcher, _options, _retainedMessagesManager, _rootLogger);
_logger.Verbose("Created a new session for client '{0}'.", connectPacket.ClientId); _logger.Verbose("Created a new session for client '{0}'.", connectPacket.ClientId);
} }


var connection = new MqttClientConnection(connectPacket, channelAdapter, session, _options, this, _retainedMessagesManager, onStart, onStop, _logger);
var connection = new MqttClientConnection(connectPacket, channelAdapter, session, _options, this, _retainedMessagesManager, onStart, onStop, _rootLogger);


_connections[connection.ClientId] = connection; _connections[connection.ClientId] = connection;
_sessions[session.ClientId] = session; _sessions[session.ClientId] = session;


+ 8
- 6
Source/MQTTnet/Server/MqttRetainedMessagesManager.cs View File

@@ -10,17 +10,19 @@ namespace MQTTnet.Server
{ {
public class MqttRetainedMessagesManager : IMqttRetainedMessagesManager public class MqttRetainedMessagesManager : IMqttRetainedMessagesManager
{ {
private readonly byte[] _emptyArray = new byte[0];
private readonly AsyncLock _messagesLock = new AsyncLock();
private readonly Dictionary<string, MqttApplicationMessage> _messages = new Dictionary<string, MqttApplicationMessage>();
readonly byte[] _emptyArray = new byte[0];
readonly AsyncLock _messagesLock = new AsyncLock();
readonly Dictionary<string, MqttApplicationMessage> _messages = new Dictionary<string, MqttApplicationMessage>();


private IMqttNetLogger _logger;
private IMqttServerOptions _options;
IMqttNetScopedLogger _logger;
IMqttServerOptions _options;


// TODO: Get rid of the logger here!
public Task Start(IMqttServerOptions options, IMqttNetLogger logger) public Task Start(IMqttServerOptions options, IMqttNetLogger logger)
{ {
if (logger == null) throw new ArgumentNullException(nameof(logger)); if (logger == null) throw new ArgumentNullException(nameof(logger));
_logger = logger.CreateChildLogger(nameof(MqttRetainedMessagesManager));
_logger = logger.CreateScopedLogger(nameof(MqttRetainedMessagesManager));

_options = options ?? throw new ArgumentNullException(nameof(options)); _options = options ?? throw new ArgumentNullException(nameof(options));
return PlatformAbstractionLayer.CompletedTask; return PlatformAbstractionLayer.CompletedTask;
} }


+ 7
- 5
Source/MQTTnet/Server/MqttServer.cs View File

@@ -17,7 +17,8 @@ namespace MQTTnet.Server
{ {
readonly MqttServerEventDispatcher _eventDispatcher; readonly MqttServerEventDispatcher _eventDispatcher;
readonly ICollection<IMqttServerAdapter> _adapters; readonly ICollection<IMqttServerAdapter> _adapters;
readonly IMqttNetLogger _logger;
readonly IMqttNetLogger _rootLogger;
readonly IMqttNetScopedLogger _logger;


MqttClientSessionsManager _clientSessionsManager; MqttClientSessionsManager _clientSessionsManager;
IMqttRetainedMessagesManager _retainedMessagesManager; IMqttRetainedMessagesManager _retainedMessagesManager;
@@ -29,9 +30,10 @@ namespace MQTTnet.Server
_adapters = adapters.ToList(); _adapters = adapters.ToList();


if (logger == null) throw new ArgumentNullException(nameof(logger)); if (logger == null) throw new ArgumentNullException(nameof(logger));
_logger = logger.CreateChildLogger(nameof(MqttServer));
_logger = logger.CreateScopedLogger(nameof(MqttServer));
_rootLogger = logger;


_eventDispatcher = new MqttServerEventDispatcher(logger.CreateChildLogger(nameof(MqttServerEventDispatcher)));
_eventDispatcher = new MqttServerEventDispatcher(logger);
} }


public bool IsStarted => _cancellationTokenSource != null; public bool IsStarted => _cancellationTokenSource != null;
@@ -127,10 +129,10 @@ namespace MQTTnet.Server
_cancellationTokenSource = new CancellationTokenSource(); _cancellationTokenSource = new CancellationTokenSource();


_retainedMessagesManager = Options.RetainedMessagesManager; _retainedMessagesManager = Options.RetainedMessagesManager;
await _retainedMessagesManager.Start(Options, _logger);
await _retainedMessagesManager.Start(Options, _rootLogger);
await _retainedMessagesManager.LoadMessagesAsync().ConfigureAwait(false); await _retainedMessagesManager.LoadMessagesAsync().ConfigureAwait(false);


_clientSessionsManager = new MqttClientSessionsManager(Options, _retainedMessagesManager, _cancellationTokenSource.Token, _eventDispatcher, _logger);
_clientSessionsManager = new MqttClientSessionsManager(Options, _retainedMessagesManager, _cancellationTokenSource.Token, _eventDispatcher, _rootLogger);
_clientSessionsManager.Start(); _clientSessionsManager.Start();


foreach (var adapter in _adapters) foreach (var adapter in _adapters)


+ 5
- 3
Source/MQTTnet/Server/MqttServerEventDispatcher.cs View File

@@ -5,13 +5,15 @@ using System.Threading.Tasks;


namespace MQTTnet.Server namespace MQTTnet.Server
{ {
public class MqttServerEventDispatcher
public sealed class MqttServerEventDispatcher
{ {
readonly IMqttNetLogger _logger;
readonly IMqttNetScopedLogger _logger;


public MqttServerEventDispatcher(IMqttNetLogger logger) public MqttServerEventDispatcher(IMqttNetLogger logger)
{ {
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
if (logger is null) throw new ArgumentNullException(nameof(logger));

_logger = logger.CreateScopedLogger(nameof(MqttServerEventDispatcher));
} }


public IMqttServerClientConnectedHandler ClientConnectedHandler { get; set; } public IMqttServerClientConnectedHandler ClientConnectedHandler { get; set; }


+ 1
- 1
Tests/MQTTnet.Benchmarks/ChannelAdapterBenchmark.cs View File

@@ -43,7 +43,7 @@ namespace MQTTnet.Benchmarks


var channel = new TestMqttChannel(_stream); var channel = new TestMqttChannel(_stream);
_channelAdapter = new MqttChannelAdapter(channel, serializer, new MqttNetLogger().CreateChildLogger(nameof(MqttChannelAdapter)));
_channelAdapter = new MqttChannelAdapter(channel, serializer, new MqttNetLogger());
} }


[Benchmark] [Benchmark]


+ 4
- 4
Tests/MQTTnet.Benchmarks/LoggerBenchmark.cs View File

@@ -9,15 +9,15 @@ namespace MQTTnet.Benchmarks
[MemoryDiagnoser] [MemoryDiagnoser]
public class LoggerBenchmark public class LoggerBenchmark
{ {
private IMqttNetLogger _logger;
private IMqttNetLogger _childLogger;
private bool _useHandler;
IMqttNetLogger _logger;
IMqttNetScopedLogger _childLogger;
bool _useHandler;


[GlobalSetup] [GlobalSetup]
public void Setup() public void Setup()
{ {
_logger = new MqttNetLogger(); _logger = new MqttNetLogger();
_childLogger = _logger.CreateChildLogger("child");
_childLogger = _logger.CreateScopedLogger("child");


MqttNetGlobalLogger.LogMessagePublished += OnLogMessagePublished; MqttNetGlobalLogger.LogMessagePublished += OnLogMessagePublished;
} }


+ 1
- 1
Tests/MQTTnet.Benchmarks/SerializerBenchmark.cs View File

@@ -29,7 +29,7 @@ namespace MQTTnet.Benchmarks
Topic = "A" Topic = "A"
}; };


_serializer = new MqttV311PacketFormatter();
_serializer = new MqttV311PacketFormatter(new MqttPacketWriter());
_serializedPacket = _serializer.Encode(_packet); _serializedPacket = _serializer.Encode(_packet);
} }




+ 7
- 22
Tests/MQTTnet.Core.Tests/Mockups/TestLogger.cs View File

@@ -7,29 +7,9 @@ namespace MQTTnet.Tests.Mockups
{ {
public event EventHandler<MqttNetLogMessagePublishedEventArgs> LogMessagePublished; public event EventHandler<MqttNetLogMessagePublishedEventArgs> LogMessagePublished;


public IMqttNetLogger CreateChildLogger(string source)
{
return new TestLogger();
}

public void Verbose(string message, params object[] parameters)
{
}

public void Info(string message, params object[] parameters)
{
}

public void Warning(Exception exception, string message, params object[] parameters)
{
}

public void Error(Exception exception, string message, params object[] parameters)
{
}

public void Publish(MqttNetLogLevel logLevel, string source, string message, object[] parameters, Exception exception)
public IMqttNetScopedLogger CreateScopedLogger(string source)
{ {
return new MqttNetScopedLogger(this, source);
} }


public void Publish(MqttNetLogLevel logLevel, string message, object[] parameters, Exception exception) public void Publish(MqttNetLogLevel logLevel, string message, object[] parameters, Exception exception)
@@ -40,5 +20,10 @@ namespace MQTTnet.Tests.Mockups
Message = message Message = message
})); }));
} }

public void Publish(MqttNetLogLevel logLevel, string source, string message, object[] parameters, Exception exception)
{
throw new NotImplementedException();
}
} }
} }

+ 2
- 3
Tests/MQTTnet.Core.Tests/Mockups/TestMqttCommunicationAdapterFactory.cs View File

@@ -1,19 +1,18 @@
using MQTTnet.Adapter; using MQTTnet.Adapter;
using MQTTnet.Client.Options; using MQTTnet.Client.Options;
using MQTTnet.Diagnostics;


namespace MQTTnet.Tests.Mockups namespace MQTTnet.Tests.Mockups
{ {
public class TestMqttCommunicationAdapterFactory : IMqttClientAdapterFactory public class TestMqttCommunicationAdapterFactory : IMqttClientAdapterFactory
{ {
private readonly IMqttChannelAdapter _adapter;
readonly IMqttChannelAdapter _adapter;


public TestMqttCommunicationAdapterFactory(IMqttChannelAdapter adapter) public TestMqttCommunicationAdapterFactory(IMqttChannelAdapter adapter)
{ {
_adapter = adapter; _adapter = adapter;
} }


public IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetLogger logger)
public IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options)
{ {
return _adapter; return _adapter;
} }


+ 1
- 1
Tests/MQTTnet.Core.Tests/MqttFactory_Tests.cs View File

@@ -24,7 +24,7 @@ namespace MQTTnet.Tests
//var globalLogCount = 0; //var globalLogCount = 0;
var localLogCount = 0; var localLogCount = 0;


var logger = new MqttNetLogger(null, logId);
var logger = new MqttNetLogger(logId);


// TODO: This is commented out because it is affected by other tests. // TODO: This is commented out because it is affected by other tests.
//// we have a theoretical bug here if a concurrent test is also logging //// we have a theoretical bug here if a concurrent test is also logging


+ 8
- 7
Tests/MQTTnet.Core.Tests/MqttNetLogger_Tests.cs View File

@@ -10,6 +10,7 @@ namespace MQTTnet.Tests
public void Root_Log_Messages() public void Root_Log_Messages()
{ {
var logger = new MqttNetLogger(); var logger = new MqttNetLogger();
var childLogger = logger.CreateScopedLogger("Source1");


var logMessagesCount = 0; var logMessagesCount = 0;


@@ -18,10 +19,10 @@ namespace MQTTnet.Tests
logMessagesCount++; logMessagesCount++;
}; };


logger.Verbose("Verbose");
logger.Info("Info");
logger.Warning(null, "Warning");
logger.Error(null, "Error");
childLogger.Verbose("Verbose");
childLogger.Info("Info");
childLogger.Warning(null, "Warning");
childLogger.Error(null, "Error");


Assert.AreEqual(4, logMessagesCount); Assert.AreEqual(4, logMessagesCount);
} }
@@ -30,7 +31,7 @@ namespace MQTTnet.Tests
public void Bubbling_Log_Messages() public void Bubbling_Log_Messages()
{ {
var logger = new MqttNetLogger(); var logger = new MqttNetLogger();
var childLogger = logger.CreateChildLogger("Source1");
var childLogger = logger.CreateScopedLogger("Source1");


var logMessagesCount = 0; var logMessagesCount = 0;


@@ -50,8 +51,8 @@ namespace MQTTnet.Tests
[TestMethod] [TestMethod]
public void Set_Custom_Log_ID() public void Set_Custom_Log_ID()
{ {
var logger = new MqttNetLogger(null, "logId");
var childLogger = logger.CreateChildLogger("Source1");
var logger = new MqttNetLogger("logId");
var childLogger = logger.CreateScopedLogger("Source1");


logger.LogMessagePublished += (s, e) => logger.LogMessagePublished += (s, e) =>
{ {


+ 3
- 3
Tests/MQTTnet.Core.Tests/MqttPacketSerializer_Tests.cs View File

@@ -156,7 +156,7 @@ namespace MQTTnet.Tests
[TestMethod] [TestMethod]
public void Serialize_LargePacket() public void Serialize_LargePacket()
{ {
var serializer = new MqttV311PacketFormatter();
var serializer = new MqttV311PacketFormatter(new MqttPacketWriter());


const int payloadLength = 80000; const int payloadLength = 80000;


@@ -560,11 +560,11 @@ namespace MQTTnet.Tests
IMqttPacketFormatter serializer; IMqttPacketFormatter serializer;
if (protocolVersion == MqttProtocolVersion.V311) if (protocolVersion == MqttProtocolVersion.V311)
{ {
serializer = new MqttV311PacketFormatter();
serializer = new MqttV311PacketFormatter(new MqttPacketWriter());
} }
else if (protocolVersion == MqttProtocolVersion.V310) else if (protocolVersion == MqttProtocolVersion.V310)
{ {
serializer = new MqttV310PacketFormatter();
serializer = new MqttV310PacketFormatter(new MqttPacketWriter());
} }
else else
{ {


+ 6
- 5
Tests/MQTTnet.TestApp.NetCore/PerformanceTest.cs View File

@@ -54,12 +54,12 @@ namespace MQTTnet.TestApp.NetCore
} }
} }


public static void RunClientAndServer()
public static async Task RunClientAndServer()
{ {
try try
{ {
var mqttServer = new MqttFactory().CreateMqttServer(); var mqttServer = new MqttFactory().CreateMqttServer();
mqttServer.StartAsync(new MqttServerOptions()).GetAwaiter().GetResult();
await mqttServer.StartAsync(new MqttServerOptions()).ConfigureAwait(false);


var options = new MqttClientOptions var options = new MqttClientOptions
{ {
@@ -67,11 +67,12 @@ namespace MQTTnet.TestApp.NetCore
{ {
Server = "127.0.0.1" Server = "127.0.0.1"
}, },
CleanSession = true
CleanSession = true,
//KeepAlivePeriod = TimeSpan.FromSeconds(1)
}; };


var client = new MqttFactory().CreateMqttClient(); var client = new MqttFactory().CreateMqttClient();
client.ConnectAsync(options).GetAwaiter().GetResult();
await client.ConnectAsync(options).ConfigureAwait(false);


var message = CreateMessage(); var message = CreateMessage();
var stopwatch = new Stopwatch(); var stopwatch = new Stopwatch();
@@ -83,7 +84,7 @@ namespace MQTTnet.TestApp.NetCore
var sentMessagesCount = 0; var sentMessagesCount = 0;
while (stopwatch.ElapsedMilliseconds < 1000) while (stopwatch.ElapsedMilliseconds < 1000)
{ {
client.PublishAsync(message).GetAwaiter().GetResult();
await client.PublishAsync(message, CancellationToken.None).ConfigureAwait(false);
sentMessagesCount++; sentMessagesCount++;
} }




+ 3
- 2
Tests/MQTTnet.TestApp.NetCore/Program.cs View File

@@ -15,6 +15,8 @@ namespace MQTTnet.TestApp.NetCore
{ {
public static void Main() public static void Main()
{ {
//MqttNetConsoleLogger.ForwardToConsole();

Console.WriteLine($"MQTTnet - TestApp.{TargetFrameworkProvider.TargetFramework}"); Console.WriteLine($"MQTTnet - TestApp.{TargetFrameworkProvider.TargetFramework}");
Console.WriteLine("1 = Start client"); Console.WriteLine("1 = Start client");
Console.WriteLine("2 = Start server"); Console.WriteLine("2 = Start server");
@@ -40,8 +42,7 @@ namespace MQTTnet.TestApp.NetCore
} }
else if (pressedKey.KeyChar == '3') else if (pressedKey.KeyChar == '3')
{ {
PerformanceTest.RunClientAndServer();
return;
Task.Run(PerformanceTest.RunClientAndServer);
} }
else if (pressedKey.KeyChar == '4') else if (pressedKey.KeyChar == '4')
{ {


Loading…
Cancel
Save