From d10f29bbc6c406e61c7f0856879fee2da042b296 Mon Sep 17 00:00:00 2001 From: JanEggers Date: Sun, 22 Oct 2017 19:00:50 +0200 Subject: [PATCH] initial version --- .../MqttCommunicationAdapterFactory.cs | 30 ----- .../Implementations/MqttServerAdapter.cs | 23 ++-- .../Implementations/MqttTcpChannel.cs | 2 +- .../MQTTnet.Netstandard.csproj | 7 +- .../MQTTnet.NetStandard/MqttClientFactory.cs | 20 ---- Frameworks/MQTTnet.NetStandard/MqttFactory.cs | 104 ++++++++++++++++++ .../MQTTnet.NetStandard/MqttServerFactory.cs | 20 ---- .../ServiceCollectionExtensions.cs | 55 +++++++++ .../IMqttCommunicationAdapterFactory.cs | 12 ++ .../MqttChannelCommunicationAdapter.cs | 12 +- MQTTnet.Core/Client/IMqttClientFactory.cs | 7 +- .../IMqttCommunicationAdapterFactory.cs | 9 -- MQTTnet.Core/Client/MqttClient.cs | 52 ++++----- MQTTnet.Core/Client/MqttPacketDispatcher.cs | 12 +- .../Diagnostics/IMqttNetTraceHandler.cs | 9 -- MQTTnet.Core/Diagnostics/MqttNetTrace.cs | 77 ------------- MQTTnet.Core/Diagnostics/MqttNetTraceLevel.cs | 10 -- .../Diagnostics/MqttNetTraceMessage.cs | 29 ----- .../MqttNetTraceMessagePublishedEventArgs.cs | 14 --- MQTTnet.Core/MQTTnet.Core.csproj | 5 + .../ManagedClient/ManagedMqttClient.cs | 29 +++-- .../Server/IMqttClientSesssionFactory.cs | 7 ++ MQTTnet.Core/Server/IMqttServerFactory.cs | 6 +- .../Server/MqttClientPendingMessagesQueue.cs | 16 +-- .../MqttClientRetainedMessagesManager.cs | 19 ++-- MQTTnet.Core/Server/MqttClientSession.cs | 27 ++--- .../Server/MqttClientSessionsManager.cs | 46 ++++---- MQTTnet.Core/Server/MqttServer.cs | 29 +++-- .../MQTTnet.Core.Tests.csproj | 1 + .../MqttCommunicationAdapterFactory.cs | 8 +- Tests/MQTTnet.Core.Tests/MqttServerTests.cs | 67 +++++++++-- .../ServiceCollectionTest.cs | 30 +++++ .../TestClientSessionFactory.cs | 12 ++ Tests/MQTTnet.Core.Tests/TestLogger.cs | 22 ++++ .../TestMqttServerAdapter.cs | 3 +- .../MqttWebSocketServerAdapter.cs | 12 +- Tests/MQTTnet.TestApp.AspNetCore2/Startup.cs | 20 +--- .../MQTTnet.TestApp.NetCore.csproj | 9 +- .../ManagedClientTest.cs | 21 ++-- .../PerformanceTest.cs | 73 ++++++------ Tests/MQTTnet.TestApp.NetCore/Program.cs | 53 +++------ Tests/MQTTnet.TestApp.NetCore/ServerTest.cs | 48 ++++---- 42 files changed, 580 insertions(+), 487 deletions(-) delete mode 100644 Frameworks/MQTTnet.NetStandard/Implementations/MqttCommunicationAdapterFactory.cs delete mode 100644 Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs create mode 100644 Frameworks/MQTTnet.NetStandard/MqttFactory.cs delete mode 100644 Frameworks/MQTTnet.NetStandard/MqttServerFactory.cs create mode 100644 Frameworks/MQTTnet.NetStandard/ServiceCollectionExtensions.cs create mode 100644 MQTTnet.Core/Adapter/IMqttCommunicationAdapterFactory.cs delete mode 100644 MQTTnet.Core/Client/IMqttCommunicationAdapterFactory.cs delete mode 100644 MQTTnet.Core/Diagnostics/IMqttNetTraceHandler.cs delete mode 100644 MQTTnet.Core/Diagnostics/MqttNetTrace.cs delete mode 100644 MQTTnet.Core/Diagnostics/MqttNetTraceLevel.cs delete mode 100644 MQTTnet.Core/Diagnostics/MqttNetTraceMessage.cs delete mode 100644 MQTTnet.Core/Diagnostics/MqttNetTraceMessagePublishedEventArgs.cs create mode 100644 MQTTnet.Core/Server/IMqttClientSesssionFactory.cs create mode 100644 Tests/MQTTnet.Core.Tests/ServiceCollectionTest.cs create mode 100644 Tests/MQTTnet.Core.Tests/TestClientSessionFactory.cs create mode 100644 Tests/MQTTnet.Core.Tests/TestLogger.cs diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttCommunicationAdapterFactory.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttCommunicationAdapterFactory.cs deleted file mode 100644 index 0654e27..0000000 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttCommunicationAdapterFactory.cs +++ /dev/null @@ -1,30 +0,0 @@ -using System; -using MQTTnet.Core.Adapter; -using MQTTnet.Core.Client; -using MQTTnet.Core.Diagnostics; -using MQTTnet.Core.Serializer; - -namespace MQTTnet.Implementations -{ - public class MqttCommunicationAdapterFactory : IMqttCommunicationAdapterFactory - { - public IMqttCommunicationAdapter CreateMqttCommunicationAdapter(IMqttClientOptions options) - { - if (options == null) throw new ArgumentNullException(nameof(options)); - - if (options is MqttClientTcpOptions tcpOptions) - { - var trace = new MqttNetTrace(); - return new MqttChannelCommunicationAdapter(new MqttTcpChannel(tcpOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion }, trace); - } - - if (options is MqttClientWebSocketOptions webSocketOptions) - { - var trace = new MqttNetTrace(); - return new MqttChannelCommunicationAdapter(new MqttWebSocketChannel(webSocketOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion }, trace); - } - - throw new NotSupportedException(); - } - } -} \ No newline at end of file diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs index 4b0c178..2d3db15 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs @@ -7,23 +7,26 @@ using System.Security.Cryptography.X509Certificates; using System.Threading; using System.Threading.Tasks; using MQTTnet.Core.Adapter; -using MQTTnet.Core.Diagnostics; using MQTTnet.Core.Serializer; using MQTTnet.Core.Server; +using Microsoft.Extensions.Logging; +using MQTTnet.Core.Client; namespace MQTTnet.Implementations { public class MqttServerAdapter : IMqttServerAdapter, IDisposable { - private readonly MqttNetTrace _trace; + private readonly ILogger _logger; + private readonly IMqttCommunicationAdapterFactory _mqttCommunicationAdapterFactory; private CancellationTokenSource _cancellationTokenSource; private Socket _defaultEndpointSocket; private Socket _tlsEndpointSocket; private X509Certificate2 _tlsCertificate; - public MqttServerAdapter(MqttNetTrace trace) + public MqttServerAdapter(ILogger logger, IMqttCommunicationAdapterFactory mqttCommunicationAdapterFactory) { - _trace = trace ?? throw new ArgumentNullException(nameof(trace)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + _mqttCommunicationAdapterFactory = mqttCommunicationAdapterFactory ?? throw new ArgumentNullException(nameof(mqttCommunicationAdapterFactory)); } public event EventHandler ClientAccepted; @@ -95,17 +98,17 @@ namespace MQTTnet.Implementations try { //todo: else branch can be used with min dependency NET46 -#if NET45 +#if NET451 var clientSocket = await Task.Factory.FromAsync(_defaultEndpointSocket.BeginAccept, _defaultEndpointSocket.EndAccept, null).ConfigureAwait(false); #else var clientSocket = await _defaultEndpointSocket.AcceptAsync().ConfigureAwait(false); #endif - var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new MqttPacketSerializer(), _trace); + var clientAdapter = _mqttCommunicationAdapterFactory.CreateServerMqttCommunicationAdapter(new MqttTcpChannel(clientSocket, null)); ClientAccepted?.Invoke(this, new MqttServerAdapterClientAcceptedEventArgs(clientAdapter)); } catch (Exception exception) { - _trace.Error(nameof(MqttServerAdapter), exception, "Error while accepting connection at default endpoint."); + _logger.LogError(new EventId(), exception, "Error while accepting connection at default endpoint."); //excessive CPU consumed if in endless loop of socket errors await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken).ConfigureAwait(false); @@ -119,7 +122,7 @@ namespace MQTTnet.Implementations { try { -#if NET45 +#if NET451 var clientSocket = await Task.Factory.FromAsync(_tlsEndpointSocket.BeginAccept, _tlsEndpointSocket.EndAccept, null).ConfigureAwait(false); #else var clientSocket = await _tlsEndpointSocket.AcceptAsync().ConfigureAwait(false); @@ -128,12 +131,12 @@ namespace MQTTnet.Implementations var sslStream = new SslStream(new NetworkStream(clientSocket)); await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false).ConfigureAwait(false); - var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new MqttPacketSerializer(), _trace); + var clientAdapter = _mqttCommunicationAdapterFactory.CreateServerMqttCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream)); ClientAccepted?.Invoke(this, new MqttServerAdapterClientAcceptedEventArgs(clientAdapter)); } catch (Exception exception) { - _trace.Error(nameof(MqttServerAdapter), exception, "Error while accepting connection at TLS endpoint."); + _logger.LogError(new EventId(), exception, "Error while accepting connection at TLS endpoint."); //excessive CPU consumed if in endless loop of socket errors await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken).ConfigureAwait(false); diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs index cd9151e..7c051bf 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs @@ -59,7 +59,7 @@ namespace MQTTnet.Implementations } //todo: else brach can be used with min dependency NET46 -#if NET45 +#if NET451 await Task.Factory.FromAsync(_socket.BeginConnect, _socket.EndConnect, _options.Server, _options.GetPort(), null).ConfigureAwait(false); #else await _socket.ConnectAsync(_options.Server, _options.GetPort()).ConfigureAwait(false); diff --git a/Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj b/Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj index 88f2514..f8dbcc9 100644 --- a/Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj +++ b/Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj @@ -1,7 +1,7 @@  - netstandard1.3;net45 + netstandard1.3;net451 MQTTnet MQTTnet 2.5.0.0 @@ -18,6 +18,11 @@ + + + + + diff --git a/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs b/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs deleted file mode 100644 index fd5da68..0000000 --- a/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs +++ /dev/null @@ -1,20 +0,0 @@ -using MQTTnet.Core.Client; -using MQTTnet.Core.Diagnostics; -using MQTTnet.Core.ManagedClient; -using MQTTnet.Implementations; - -namespace MQTTnet -{ - public class MqttClientFactory : IMqttClientFactory - { - public IMqttClient CreateMqttClient(IMqttNetTraceHandler traceHandler = null) - { - return new MqttClient(new MqttCommunicationAdapterFactory(), new MqttNetTrace(traceHandler)); - } - - public ManagedMqttClient CreateManagedMqttClient(IMqttNetTraceHandler traceHandler = null) - { - return new ManagedMqttClient(new MqttCommunicationAdapterFactory(), new MqttNetTrace()); - } - } -} \ No newline at end of file diff --git a/Frameworks/MQTTnet.NetStandard/MqttFactory.cs b/Frameworks/MQTTnet.NetStandard/MqttFactory.cs new file mode 100644 index 0000000..3ac03d2 --- /dev/null +++ b/Frameworks/MQTTnet.NetStandard/MqttFactory.cs @@ -0,0 +1,104 @@ +using System; +using MQTTnet.Core.Adapter; +using MQTTnet.Core.Client; +using MQTTnet.Core.Serializer; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.DependencyInjection; +using MQTTnet.Implementations; +using MQTTnet.Core.ManagedClient; +using MQTTnet.Core.Server; +using System.Linq; +using MQTTnet.Core.Channel; + +namespace MQTTnet +{ + public class MqttFactory : IMqttCommunicationAdapterFactory, IMqttClientSesssionFactory, IMqttClientFactory, IMqttServerFactory + { + private readonly IServiceProvider _serviceProvider; + + private static IServiceProvider BuildServiceProvider() + { + return new ServiceCollection() + .AddMqttClient() + .AddMqttServer() + .AddLogging() + .BuildServiceProvider(); + } + + public MqttFactory() + : this(BuildServiceProvider()) + { + } + + public MqttFactory(IServiceProvider serviceProvider) + { + _serviceProvider = serviceProvider; + } + + public IMqttCommunicationAdapter CreateClientMqttCommunicationAdapter(IMqttClientOptions options) + { + var logger = _serviceProvider.GetRequiredService>(); + return new MqttChannelCommunicationAdapter(CreateMqttCommunicationChannel(options), CreateSerializer(options.ProtocolVersion), logger); + } + + public IMqttCommunicationAdapter CreateServerMqttCommunicationAdapter(IMqttCommunicationChannel channel) + { + var serializer = _serviceProvider.GetRequiredService(); + var logger = _serviceProvider.GetRequiredService>(); + return new MqttChannelCommunicationAdapter(channel, serializer, logger); + } + + public IMqttCommunicationChannel CreateMqttCommunicationChannel(IMqttClientOptions options) + { + if (options == null) throw new ArgumentNullException(nameof(options)); + + switch (options) + { + case MqttClientTcpOptions tcpOptions: + return CreateTcpChannel(tcpOptions); + case MqttClientWebSocketOptions webSocketOptions: + return CreateWebSocketChannel(webSocketOptions); + default: + throw new NotSupportedException(); + } + } + + public MqttTcpChannel CreateTcpChannel(MqttClientTcpOptions tcpOptions) + { + return new MqttTcpChannel(tcpOptions); + } + + public MqttWebSocketChannel CreateWebSocketChannel(MqttClientWebSocketOptions webSocketOptions) + { + return new MqttWebSocketChannel(webSocketOptions); + } + + public MqttPacketSerializer CreateSerializer(MqttProtocolVersion protocolVersion) + { + return new MqttPacketSerializer() + { + ProtocolVersion = protocolVersion + }; + } + + public MqttClientSession CreateClientSession(string sessionId, MqttClientSessionsManager mqttClientSessionsManager) + { + return new MqttClientSession(sessionId, mqttClientSessionsManager, _serviceProvider.GetRequiredService>(), _serviceProvider.GetRequiredService>()); + } + + public IMqttClient CreateMqttClient() + { + return _serviceProvider.GetRequiredService(); + } + + public ManagedMqttClient CreateManagedMqttClient() + { + return _serviceProvider.GetRequiredService(); + } + + public IMqttServer CreateMqttServer() + { + return _serviceProvider.GetRequiredService(); + } + } +} \ No newline at end of file diff --git a/Frameworks/MQTTnet.NetStandard/MqttServerFactory.cs b/Frameworks/MQTTnet.NetStandard/MqttServerFactory.cs deleted file mode 100644 index 468b8e0..0000000 --- a/Frameworks/MQTTnet.NetStandard/MqttServerFactory.cs +++ /dev/null @@ -1,20 +0,0 @@ -using System; -using System.Collections.Generic; -using MQTTnet.Core.Adapter; -using MQTTnet.Core.Diagnostics; -using MQTTnet.Core.Server; -using MQTTnet.Implementations; - -namespace MQTTnet -{ - public class MqttServerFactory : IMqttServerFactory - { - public IMqttServer CreateMqttServer(MqttServerOptions options, IMqttNetTraceHandler traceHandler = null) - { - if (options == null) throw new ArgumentNullException(nameof(options)); - - var trace = new MqttNetTrace(traceHandler); - return new MqttServer(options, new List { new MqttServerAdapter(trace) }, trace); - } - } -} diff --git a/Frameworks/MQTTnet.NetStandard/ServiceCollectionExtensions.cs b/Frameworks/MQTTnet.NetStandard/ServiceCollectionExtensions.cs new file mode 100644 index 0000000..c057882 --- /dev/null +++ b/Frameworks/MQTTnet.NetStandard/ServiceCollectionExtensions.cs @@ -0,0 +1,55 @@ +using Microsoft.Extensions.DependencyInjection; +using MQTTnet.Core.Adapter; +using MQTTnet.Core.Client; +using MQTTnet.Core.ManagedClient; +using MQTTnet.Core.Serializer; +using MQTTnet.Core.Server; +using MQTTnet.Implementations; +using System; + +namespace MQTTnet +{ + public static class ServiceCollectionExtensions + { + public static IServiceCollection AddMqttServer(this IServiceCollection services) + { + services.AddOptions(); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + + + services.AddSingleton(); + services.AddSingleton(); + + services.AddTransient(); + services.AddTransient(); + + services.AddTransient(); + services.AddTransient(); + + return services; + } + + public static IServiceCollection AddMqttServer(this IServiceCollection services, Action configureOptions) + { + return services + .AddMqttServer() + .Configure(configureOptions); + } + + public static IServiceCollection AddMqttClient(this IServiceCollection services) + { + services.AddSingleton(); + services.AddSingleton(); + + services.AddTransient(); + services.AddTransient(); + services.AddTransient(); + services.AddTransient(); + services.AddTransient(); + + return services; + } + } +} diff --git a/MQTTnet.Core/Adapter/IMqttCommunicationAdapterFactory.cs b/MQTTnet.Core/Adapter/IMqttCommunicationAdapterFactory.cs new file mode 100644 index 0000000..ffcfd2c --- /dev/null +++ b/MQTTnet.Core/Adapter/IMqttCommunicationAdapterFactory.cs @@ -0,0 +1,12 @@ +using MQTTnet.Core.Client; +using MQTTnet.Core.Channel; + +namespace MQTTnet.Core.Adapter +{ + public interface IMqttCommunicationAdapterFactory + { + IMqttCommunicationAdapter CreateClientMqttCommunicationAdapter(IMqttClientOptions options); + + IMqttCommunicationAdapter CreateServerMqttCommunicationAdapter(IMqttCommunicationChannel channel); + } +} \ No newline at end of file diff --git a/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs b/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs index 2bed87d..d47d4ce 100644 --- a/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs +++ b/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs @@ -4,23 +4,23 @@ using System.IO; using System.Threading; using System.Threading.Tasks; using MQTTnet.Core.Channel; -using MQTTnet.Core.Diagnostics; using MQTTnet.Core.Exceptions; using MQTTnet.Core.Internal; using MQTTnet.Core.Packets; using MQTTnet.Core.Serializer; +using Microsoft.Extensions.Logging; namespace MQTTnet.Core.Adapter { public class MqttChannelCommunicationAdapter : IMqttCommunicationAdapter { - private readonly MqttNetTrace _trace; + private readonly ILogger _logger; private readonly IMqttCommunicationChannel _channel; private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1); - public MqttChannelCommunicationAdapter(IMqttCommunicationChannel channel, IMqttPacketSerializer serializer, MqttNetTrace trace) + public MqttChannelCommunicationAdapter(IMqttCommunicationChannel channel, IMqttPacketSerializer serializer, ILogger logger) { - _trace = trace ?? throw new ArgumentNullException(nameof(trace)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _channel = channel ?? throw new ArgumentNullException(nameof(channel)); PacketSerializer = serializer ?? throw new ArgumentNullException(nameof(serializer)); } @@ -96,7 +96,7 @@ namespace MQTTnet.Core.Adapter continue; } - _trace.Information(nameof(MqttChannelCommunicationAdapter), "TX >>> {0} [Timeout={1}]", packet, timeout); + _logger.LogInformation("TX >>> {0} [Timeout={1}]", packet, timeout); var writeBuffer = PacketSerializer.Serialize(packet); await _channel.SendStream.WriteAsync(writeBuffer, 0, writeBuffer.Length, cancellationToken).ConfigureAwait(false); @@ -162,7 +162,7 @@ namespace MQTTnet.Core.Adapter throw new MqttProtocolViolationException("Received malformed packet."); } - _trace.Information(nameof(MqttChannelCommunicationAdapter), "RX <<< {0}", packet); + _logger.LogInformation("RX <<< {0}", packet); return packet; } catch (TaskCanceledException) diff --git a/MQTTnet.Core/Client/IMqttClientFactory.cs b/MQTTnet.Core/Client/IMqttClientFactory.cs index e63047a..7547906 100644 --- a/MQTTnet.Core/Client/IMqttClientFactory.cs +++ b/MQTTnet.Core/Client/IMqttClientFactory.cs @@ -1,12 +1,11 @@ -using MQTTnet.Core.Diagnostics; -using MQTTnet.Core.ManagedClient; +using MQTTnet.Core.ManagedClient; namespace MQTTnet.Core.Client { public interface IMqttClientFactory { - IMqttClient CreateMqttClient(IMqttNetTraceHandler traceHandler = null); + IMqttClient CreateMqttClient(); - ManagedMqttClient CreateManagedMqttClient(IMqttNetTraceHandler traceHandler = null); + ManagedMqttClient CreateManagedMqttClient(); } } \ No newline at end of file diff --git a/MQTTnet.Core/Client/IMqttCommunicationAdapterFactory.cs b/MQTTnet.Core/Client/IMqttCommunicationAdapterFactory.cs deleted file mode 100644 index 0e29590..0000000 --- a/MQTTnet.Core/Client/IMqttCommunicationAdapterFactory.cs +++ /dev/null @@ -1,9 +0,0 @@ -using MQTTnet.Core.Adapter; - -namespace MQTTnet.Core.Client -{ - public interface IMqttCommunicationAdapterFactory - { - IMqttCommunicationAdapter CreateMqttCommunicationAdapter(IMqttClientOptions options); - } -} \ No newline at end of file diff --git a/MQTTnet.Core/Client/MqttClient.cs b/MQTTnet.Core/Client/MqttClient.cs index fe1e4ab..67ffb12 100644 --- a/MQTTnet.Core/Client/MqttClient.cs +++ b/MQTTnet.Core/Client/MqttClient.cs @@ -4,11 +4,11 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; using MQTTnet.Core.Adapter; -using MQTTnet.Core.Diagnostics; using MQTTnet.Core.Exceptions; using MQTTnet.Core.Internal; using MQTTnet.Core.Packets; using MQTTnet.Core.Protocol; +using Microsoft.Extensions.Logging; namespace MQTTnet.Core.Client { @@ -16,21 +16,21 @@ namespace MQTTnet.Core.Client { private readonly HashSet _unacknowledgedPublishPackets = new HashSet(); private readonly MqttPacketDispatcher _packetDispatcher; - private readonly IMqttCommunicationAdapterFactory _communicationChannelFactory; - private readonly MqttNetTrace _trace; + private readonly IMqttCommunicationAdapterFactory _communicationAdapterFactory; + private readonly ILogger _logger; private IMqttClientOptions _options; private bool _isReceivingPackets; private int _latestPacketIdentifier; private CancellationTokenSource _cancellationTokenSource; private IMqttCommunicationAdapter _adapter; + private IDisposable _scopeHandle; - public MqttClient(IMqttCommunicationAdapterFactory communicationChannelFactory, MqttNetTrace trace) + public MqttClient(IMqttCommunicationAdapterFactory communicationAdapterFactory, ILogger logger, MqttPacketDispatcher packetDispatcher) { - _trace = trace ?? throw new ArgumentNullException(nameof(trace)); - _communicationChannelFactory = communicationChannelFactory ?? throw new ArgumentNullException(nameof(communicationChannelFactory)); - - _packetDispatcher = new MqttPacketDispatcher(trace); + _communicationAdapterFactory = communicationAdapterFactory ?? throw new ArgumentNullException(nameof(communicationAdapterFactory)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + _packetDispatcher = packetDispatcher ?? throw new ArgumentNullException(nameof(packetDispatcher)); } public event EventHandler Connected; @@ -52,16 +52,17 @@ namespace MQTTnet.Core.Client _latestPacketIdentifier = 0; _packetDispatcher.Reset(); - _adapter = _communicationChannelFactory.CreateMqttCommunicationAdapter(options); + _adapter = _communicationAdapterFactory.CreateClientMqttCommunicationAdapter(options); - _trace.Verbose(nameof(MqttClient), "Trying to connect with server."); + _scopeHandle = _logger.BeginScope(options.ClientId); + _logger.LogTrace("Trying to connect with server."); await _adapter.ConnectAsync(_options.DefaultCommunicationTimeout).ConfigureAwait(false); - _trace.Verbose(nameof(MqttClient), "Connection with server established."); + _logger.LogTrace("Connection with server established."); await SetupIncomingPacketProcessingAsync(); var connectResponse = await AuthenticateAsync(options.WillMessage); - _trace.Verbose(nameof(MqttClient), "MQTT connection with server established."); + _logger.LogTrace("MQTT connection with server established."); if (_options.KeepAlivePeriod != TimeSpan.Zero) { @@ -93,6 +94,7 @@ namespace MQTTnet.Core.Client finally { await DisconnectInternalAsync().ConfigureAwait(false); + _scopeHandle.Dispose(); } } @@ -246,11 +248,11 @@ namespace MQTTnet.Core.Client try { await _adapter.DisconnectAsync(_options.DefaultCommunicationTimeout).ConfigureAwait(false); - _trace.Information(nameof(MqttClient), "Disconnected from adapter."); + _logger.LogInformation("Disconnected from adapter."); } catch (Exception exception) { - _trace.Warning(nameof(MqttClient), exception, "Error while disconnecting from adapter."); + _logger.LogWarning(new EventId(), exception, "Error while disconnecting from adapter."); } finally { @@ -262,7 +264,7 @@ namespace MQTTnet.Core.Client { try { - _trace.Information(nameof(MqttClient), "Received <<< {0}", packet); + _logger.LogInformation("Received <<< {0}", packet); if (packet is MqttPingReqPacket) { @@ -292,7 +294,7 @@ namespace MQTTnet.Core.Client } catch (Exception exception) { - _trace.Error(nameof(MqttClient), exception, "Unhandled exception while processing received packet."); + _logger.LogError(new EventId(), exception, "Unhandled exception while processing received packet."); } } @@ -305,7 +307,7 @@ namespace MQTTnet.Core.Client } catch (Exception exception) { - _trace.Error(nameof(MqttClient), exception, "Unhandled exception while handling application message."); + _logger.LogError(new EventId(), exception, "Unhandled exception while handling application message."); } } @@ -369,7 +371,7 @@ namespace MQTTnet.Core.Client private async Task SendKeepAliveMessagesAsync(CancellationToken cancellationToken) { - _trace.Information(nameof(MqttClient), "Start sending keep alive packets."); + _logger.LogInformation("Start sending keep alive packets."); try { @@ -394,23 +396,23 @@ namespace MQTTnet.Core.Client return; } - _trace.Warning(nameof(MqttClient), exception, "MQTT communication exception while sending/receiving keep alive packets."); + _logger.LogWarning(new EventId(), exception, "MQTT communication exception while sending/receiving keep alive packets."); await DisconnectInternalAsync().ConfigureAwait(false); } catch (Exception exception) { - _trace.Warning(nameof(MqttClient), exception, "Unhandled exception while sending/receiving keep alive packets."); + _logger.LogWarning(new EventId(), exception, "Unhandled exception while sending/receiving keep alive packets."); await DisconnectInternalAsync().ConfigureAwait(false); } finally { - _trace.Information(nameof(MqttClient), "Stopped sending keep alive packets."); + _logger.LogInformation("Stopped sending keep alive packets."); } } private async Task ReceivePackets(CancellationToken cancellationToken) { - _trace.Information(nameof(MqttClient), "Start receiving packets."); + _logger.LogInformation("Start receiving packets."); try { @@ -437,17 +439,17 @@ namespace MQTTnet.Core.Client return; } - _trace.Warning(nameof(MqttClient), exception, "MQTT communication exception while receiving packets."); + _logger.LogWarning(new EventId(), exception, "MQTT communication exception while receiving packets."); await DisconnectInternalAsync().ConfigureAwait(false); } catch (Exception exception) { - _trace.Error(nameof(MqttClient), exception, "Unhandled exception while receiving packets."); + _logger.LogError(new EventId(), exception, "Unhandled exception while receiving packets."); await DisconnectInternalAsync().ConfigureAwait(false); } finally { - _trace.Information(nameof(MqttClient), "Stopped receiving packets."); + _logger.LogInformation(nameof(MqttClient), "Stopped receiving packets."); } } diff --git a/MQTTnet.Core/Client/MqttPacketDispatcher.cs b/MQTTnet.Core/Client/MqttPacketDispatcher.cs index 40902db..a999d69 100644 --- a/MQTTnet.Core/Client/MqttPacketDispatcher.cs +++ b/MQTTnet.Core/Client/MqttPacketDispatcher.cs @@ -1,10 +1,10 @@ using System; +using System.Collections.Concurrent; using System.Threading.Tasks; -using MQTTnet.Core.Diagnostics; using MQTTnet.Core.Exceptions; using MQTTnet.Core.Internal; using MQTTnet.Core.Packets; -using System.Collections.Concurrent; +using Microsoft.Extensions.Logging; namespace MQTTnet.Core.Client { @@ -12,11 +12,11 @@ namespace MQTTnet.Core.Client { private readonly ConcurrentDictionary> _packetByResponseType = new ConcurrentDictionary>(); private readonly ConcurrentDictionary>> _packetByResponseTypeAndIdentifier = new ConcurrentDictionary>>(); - private readonly MqttNetTrace _trace; + private readonly ILogger _logger; - public MqttPacketDispatcher(MqttNetTrace trace) + public MqttPacketDispatcher(ILogger logger) { - _trace = trace ?? throw new ArgumentNullException(nameof(trace)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); } public async Task WaitForPacketAsync(MqttBasePacket request, Type responseType, TimeSpan timeout) @@ -30,7 +30,7 @@ namespace MQTTnet.Core.Client } catch (MqttCommunicationTimedOutException) { - _trace.Warning(nameof(MqttPacketDispatcher), "Timeout while waiting for packet of type '{0}'.", responseType.Name); + _logger.LogWarning("Timeout while waiting for packet of type '{0}'.", responseType.Name); throw; } finally diff --git a/MQTTnet.Core/Diagnostics/IMqttNetTraceHandler.cs b/MQTTnet.Core/Diagnostics/IMqttNetTraceHandler.cs deleted file mode 100644 index 969cce0..0000000 --- a/MQTTnet.Core/Diagnostics/IMqttNetTraceHandler.cs +++ /dev/null @@ -1,9 +0,0 @@ -namespace MQTTnet.Core.Diagnostics -{ - public interface IMqttNetTraceHandler - { - bool IsEnabled { get; } - - void HandleTraceMessage(MqttNetTraceMessage traceMessage); - } -} diff --git a/MQTTnet.Core/Diagnostics/MqttNetTrace.cs b/MQTTnet.Core/Diagnostics/MqttNetTrace.cs deleted file mode 100644 index ece6cf7..0000000 --- a/MQTTnet.Core/Diagnostics/MqttNetTrace.cs +++ /dev/null @@ -1,77 +0,0 @@ -using System; - -namespace MQTTnet.Core.Diagnostics -{ - public sealed class MqttNetTrace : IMqttNetTraceHandler - { - private readonly IMqttNetTraceHandler _traceHandler; - - public MqttNetTrace(IMqttNetTraceHandler customTraceHandler = null) - { - _traceHandler = customTraceHandler ?? this; - } - - public static event EventHandler TraceMessagePublished; - - public bool IsEnabled => TraceMessagePublished != null; - - public void Verbose(string source, string message, params object[] parameters) - { - Publish(source, MqttNetTraceLevel.Verbose, null, message, parameters); - } - - public void Information(string source, string message, params object[] parameters) - { - Publish(source, MqttNetTraceLevel.Information, null, message, parameters); - } - - public void Warning(string source, string message, params object[] parameters) - { - Publish(source, MqttNetTraceLevel.Warning, null, message, parameters); - } - - public void Warning(string source, Exception exception, string message, params object[] parameters) - { - Publish(source, MqttNetTraceLevel.Warning, exception, message, parameters); - } - - public void Error(string source, string message, params object[] parameters) - { - Publish(source, MqttNetTraceLevel.Error, null, message, parameters); - } - - public void Error(string source, Exception exception, string message, params object[] parameters) - { - Publish(source, MqttNetTraceLevel.Error, exception, message, parameters); - } - - public void HandleTraceMessage(MqttNetTraceMessage mqttNetTraceMessage) - { - TraceMessagePublished?.Invoke(this, new MqttNetTraceMessagePublishedEventArgs(mqttNetTraceMessage)); - } - - private void Publish(string source, MqttNetTraceLevel traceLevel, Exception exception, string message, params object[] parameters) - { - if (!_traceHandler.IsEnabled) - { - return; - } - - var now = DateTime.Now; - if (parameters?.Length > 0) - { - try - { - message = string.Format(message, parameters); - } - catch (Exception formatException) - { - Error(nameof(MqttNetTrace), formatException, "Error while tracing message: " + message); - return; - } - } - - _traceHandler.HandleTraceMessage(new MqttNetTraceMessage(now, Environment.CurrentManagedThreadId, source, traceLevel, message, exception)); - } - } -} diff --git a/MQTTnet.Core/Diagnostics/MqttNetTraceLevel.cs b/MQTTnet.Core/Diagnostics/MqttNetTraceLevel.cs deleted file mode 100644 index 5f3a13f..0000000 --- a/MQTTnet.Core/Diagnostics/MqttNetTraceLevel.cs +++ /dev/null @@ -1,10 +0,0 @@ -namespace MQTTnet.Core.Diagnostics -{ - public enum MqttNetTraceLevel - { - Verbose, - Information, - Warning, - Error - } -} diff --git a/MQTTnet.Core/Diagnostics/MqttNetTraceMessage.cs b/MQTTnet.Core/Diagnostics/MqttNetTraceMessage.cs deleted file mode 100644 index 3eb1b56..0000000 --- a/MQTTnet.Core/Diagnostics/MqttNetTraceMessage.cs +++ /dev/null @@ -1,29 +0,0 @@ -using System; - -namespace MQTTnet.Core.Diagnostics -{ - public sealed class MqttNetTraceMessage - { - public MqttNetTraceMessage(DateTime timestamp, int threadId, string source, MqttNetTraceLevel level, string message, Exception exception) - { - Timestamp = timestamp; - ThreadId = threadId; - Source = source; - Level = level; - Message = message; - Exception = exception; - } - - public DateTime Timestamp { get; } - - public int ThreadId { get; } - - public string Source { get; } - - public MqttNetTraceLevel Level { get; } - - public string Message { get; } - - public Exception Exception { get; } - } -} diff --git a/MQTTnet.Core/Diagnostics/MqttNetTraceMessagePublishedEventArgs.cs b/MQTTnet.Core/Diagnostics/MqttNetTraceMessagePublishedEventArgs.cs deleted file mode 100644 index 5b8f796..0000000 --- a/MQTTnet.Core/Diagnostics/MqttNetTraceMessagePublishedEventArgs.cs +++ /dev/null @@ -1,14 +0,0 @@ -using System; - -namespace MQTTnet.Core.Diagnostics -{ - public sealed class MqttNetTraceMessagePublishedEventArgs : EventArgs - { - public MqttNetTraceMessagePublishedEventArgs(MqttNetTraceMessage traceMessage) - { - TraceMessage = traceMessage ?? throw new ArgumentNullException(nameof(traceMessage)); - } - - public MqttNetTraceMessage TraceMessage { get; } - } -} diff --git a/MQTTnet.Core/MQTTnet.Core.csproj b/MQTTnet.Core/MQTTnet.Core.csproj index 2f55ab5..3e87742 100644 --- a/MQTTnet.Core/MQTTnet.Core.csproj +++ b/MQTTnet.Core/MQTTnet.Core.csproj @@ -22,4 +22,9 @@ + + + + + \ No newline at end of file diff --git a/MQTTnet.Core/ManagedClient/ManagedMqttClient.cs b/MQTTnet.Core/ManagedClient/ManagedMqttClient.cs index 0612078..6b2e1dd 100644 --- a/MQTTnet.Core/ManagedClient/ManagedMqttClient.cs +++ b/MQTTnet.Core/ManagedClient/ManagedMqttClient.cs @@ -5,10 +5,10 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; using MQTTnet.Core.Client; -using MQTTnet.Core.Diagnostics; using MQTTnet.Core.Exceptions; using MQTTnet.Core.Packets; using MQTTnet.Core.Protocol; +using Microsoft.Extensions.Logging; namespace MQTTnet.Core.ManagedClient { @@ -19,7 +19,7 @@ namespace MQTTnet.Core.ManagedClient private readonly HashSet _subscriptions = new HashSet(); private readonly IMqttClient _mqttClient; - private readonly MqttNetTrace _trace; + private readonly ILogger _logger; private CancellationTokenSource _connectionCancellationToken; private CancellationTokenSource _publishingCancellationToken; @@ -27,12 +27,11 @@ namespace MQTTnet.Core.ManagedClient private IManagedMqttClientOptions _options; private bool _subscriptionsNotPushed; - public ManagedMqttClient(IMqttCommunicationAdapterFactory communicationChannelFactory, MqttNetTrace trace) + public ManagedMqttClient(ILogger logger, IMqttClient mqttClient) { - if (communicationChannelFactory == null) throw new ArgumentNullException(nameof(communicationChannelFactory)); - _trace = trace ?? throw new ArgumentNullException(nameof(trace)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + _mqttClient = mqttClient ?? throw new ArgumentNullException(nameof(mqttClient)); - _mqttClient = new MqttClient(communicationChannelFactory, _trace); _mqttClient.Connected += OnConnected; _mqttClient.Disconnected += OnDisconnected; _mqttClient.ApplicationMessageReceived += OnApplicationMessageReceived; @@ -77,7 +76,7 @@ namespace MQTTnet.Core.ManagedClient Task.Factory.StartNew(() => MaintainConnectionAsync(_connectionCancellationToken.Token), _connectionCancellationToken.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default).ConfigureAwait(false); #pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - _trace.Information(nameof(ManagedMqttClient), "Started"); + _logger.LogInformation("Started"); } public Task StopAsync() @@ -179,16 +178,16 @@ namespace MQTTnet.Core.ManagedClient } catch (MqttCommunicationException exception) { - _trace.Warning(nameof(ManagedMqttClient), exception, "Communication exception while maintaining connection."); + _logger.LogWarning(new EventId(), exception, "Communication exception while maintaining connection."); } catch (Exception exception) { - _trace.Error(nameof(ManagedMqttClient), exception, "Unhandled exception while maintaining connection."); + _logger.LogError(new EventId(), exception, "Unhandled exception while maintaining connection."); } finally { await _mqttClient.DisconnectAsync().ConfigureAwait(false); - _trace.Information(nameof(ManagedMqttClient), "Stopped"); + _logger.LogInformation("Stopped"); } } @@ -217,7 +216,7 @@ namespace MQTTnet.Core.ManagedClient } finally { - _trace.Information(nameof(ManagedMqttClient), "Stopped publishing messages"); + _logger.LogInformation("Stopped publishing messages"); } } @@ -229,7 +228,7 @@ namespace MQTTnet.Core.ManagedClient } catch (MqttCommunicationException exception) { - _trace.Warning(nameof(ManagedMqttClient), exception, "Publishing application message failed."); + _logger.LogWarning(new EventId(), exception, "Publishing application message failed."); if (message.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce) { @@ -238,13 +237,13 @@ namespace MQTTnet.Core.ManagedClient } catch (Exception exception) { - _trace.Error(nameof(ManagedMqttClient), exception, "Unhandled exception while publishing queued application message."); + _logger.LogError(new EventId(), exception, "Unhandled exception while publishing queued application message."); } } private async Task PushSubscriptionsAsync() { - _trace.Information(nameof(ManagedMqttClient), "Synchronizing subscriptions"); + _logger.LogInformation(nameof(ManagedMqttClient), "Synchronizing subscriptions"); List subscriptions; lock (_subscriptions) @@ -264,7 +263,7 @@ namespace MQTTnet.Core.ManagedClient } catch (Exception exception) { - _trace.Warning(nameof(ManagedMqttClient), exception, "Synchronizing subscriptions failed"); + _logger.LogWarning(new EventId(), exception, "Synchronizing subscriptions failed"); _subscriptionsNotPushed = true; } } diff --git a/MQTTnet.Core/Server/IMqttClientSesssionFactory.cs b/MQTTnet.Core/Server/IMqttClientSesssionFactory.cs new file mode 100644 index 0000000..f58dd5e --- /dev/null +++ b/MQTTnet.Core/Server/IMqttClientSesssionFactory.cs @@ -0,0 +1,7 @@ +namespace MQTTnet.Core.Server +{ + public interface IMqttClientSesssionFactory + { + MqttClientSession CreateClientSession(string sessionId, MqttClientSessionsManager mqttClientSessionsManager); + } +} diff --git a/MQTTnet.Core/Server/IMqttServerFactory.cs b/MQTTnet.Core/Server/IMqttServerFactory.cs index 865080b..61d022f 100644 --- a/MQTTnet.Core/Server/IMqttServerFactory.cs +++ b/MQTTnet.Core/Server/IMqttServerFactory.cs @@ -1,9 +1,7 @@ -using MQTTnet.Core.Diagnostics; - -namespace MQTTnet.Core.Server +namespace MQTTnet.Core.Server { public interface IMqttServerFactory { - IMqttServer CreateMqttServer(MqttServerOptions options, IMqttNetTraceHandler traceHandler = null); + IMqttServer CreateMqttServer(); } } \ No newline at end of file diff --git a/MQTTnet.Core/Server/MqttClientPendingMessagesQueue.cs b/MQTTnet.Core/Server/MqttClientPendingMessagesQueue.cs index 551dc8e..6b3f023 100644 --- a/MQTTnet.Core/Server/MqttClientPendingMessagesQueue.cs +++ b/MQTTnet.Core/Server/MqttClientPendingMessagesQueue.cs @@ -3,10 +3,10 @@ using System.Collections.Concurrent; using System.Threading; using System.Threading.Tasks; using MQTTnet.Core.Adapter; -using MQTTnet.Core.Diagnostics; using MQTTnet.Core.Exceptions; using MQTTnet.Core.Packets; using MQTTnet.Core.Protocol; +using Microsoft.Extensions.Logging; namespace MQTTnet.Core.Server { @@ -15,11 +15,11 @@ namespace MQTTnet.Core.Server private readonly BlockingCollection _pendingPublishPackets = new BlockingCollection(); private readonly MqttClientSession _session; private readonly MqttServerOptions _options; - private readonly MqttNetTrace _trace; + private readonly ILogger _logger; - public MqttClientPendingMessagesQueue(MqttServerOptions options, MqttClientSession session, MqttNetTrace trace) + public MqttClientPendingMessagesQueue(MqttServerOptions options, MqttClientSession session, ILogger logger) { - _trace = trace ?? throw new ArgumentNullException(nameof(trace)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _session = session ?? throw new ArgumentNullException(nameof(session)); _options = options ?? throw new ArgumentNullException(nameof(options)); } @@ -52,7 +52,7 @@ namespace MQTTnet.Core.Server } catch (Exception exception) { - _trace.Error(nameof(MqttClientPendingMessagesQueue), exception, "Unhandled exception while sending pending publish packets."); + _logger.LogError(new EventId(), exception, "Unhandled exception while sending pending publish packets."); } } @@ -68,18 +68,18 @@ namespace MQTTnet.Core.Server { if (exception is MqttCommunicationTimedOutException) { - _trace.Warning(nameof(MqttClientPendingMessagesQueue), exception, "Sending publish packet failed due to timeout."); + _logger.LogWarning(new EventId(), exception, "Sending publish packet failed due to timeout."); } else if (exception is MqttCommunicationException) { - _trace.Warning(nameof(MqttClientPendingMessagesQueue), exception, "Sending publish packet failed due to communication exception."); + _logger.LogWarning(new EventId(), exception, "Sending publish packet failed due to communication exception."); } if (exception is OperationCanceledException) { } else { - _trace.Error(nameof(MqttClientPendingMessagesQueue), exception, "Sending publish packet failed."); + _logger.LogError(new EventId(), exception, "Sending publish packet failed."); } if (packet.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce) diff --git a/MQTTnet.Core/Server/MqttClientRetainedMessagesManager.cs b/MQTTnet.Core/Server/MqttClientRetainedMessagesManager.cs index 22de305..68a7b99 100644 --- a/MQTTnet.Core/Server/MqttClientRetainedMessagesManager.cs +++ b/MQTTnet.Core/Server/MqttClientRetainedMessagesManager.cs @@ -2,21 +2,22 @@ using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; -using MQTTnet.Core.Diagnostics; using MQTTnet.Core.Packets; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; namespace MQTTnet.Core.Server { public sealed class MqttClientRetainedMessagesManager { private readonly Dictionary _retainedMessages = new Dictionary(); - private readonly MqttNetTrace _trace; + private readonly ILogger _logger; private readonly MqttServerOptions _options; - public MqttClientRetainedMessagesManager(MqttServerOptions options, MqttNetTrace trace) + public MqttClientRetainedMessagesManager(IOptions options, ILogger logger) { - _trace = trace ?? throw new ArgumentNullException(nameof(trace)); - _options = options ?? throw new ArgumentNullException(nameof(options)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + _options = options.Value ?? throw new ArgumentNullException(nameof(options)); } public async Task LoadMessagesAsync() @@ -40,7 +41,7 @@ namespace MQTTnet.Core.Server } catch (Exception exception) { - _trace.Error(nameof(MqttClientRetainedMessagesManager), exception, "Unhandled exception while loading retained messages."); + _logger.LogError(new EventId(), exception, "Unhandled exception while loading retained messages."); } } @@ -54,12 +55,12 @@ namespace MQTTnet.Core.Server if (applicationMessage.Payload?.Any() == false) { _retainedMessages.Remove(applicationMessage.Topic); - _trace.Information(nameof(MqttClientRetainedMessagesManager), "Client '{0}' cleared retained message for topic '{1}'.", clientId, applicationMessage.Topic); + _logger.LogInformation("Client '{0}' cleared retained message for topic '{1}'.", clientId, applicationMessage.Topic); } else { _retainedMessages[applicationMessage.Topic] = applicationMessage; - _trace.Information(nameof(MqttClientRetainedMessagesManager), "Client '{0}' updated retained message for topic '{1}'.", clientId, applicationMessage.Topic); + _logger.LogInformation("Client '{0}' updated retained message for topic '{1}'.", clientId, applicationMessage.Topic); } allRetainedMessages = new List(_retainedMessages.Values); @@ -75,7 +76,7 @@ namespace MQTTnet.Core.Server } catch (Exception exception) { - _trace.Error(nameof(MqttClientRetainedMessagesManager), exception, "Unhandled exception while saving retained messages."); + _logger.LogError(new EventId(), exception, "Unhandled exception while saving retained messages."); } } diff --git a/MQTTnet.Core/Server/MqttClientSession.cs b/MQTTnet.Core/Server/MqttClientSession.cs index f913901..b519ef2 100644 --- a/MQTTnet.Core/Server/MqttClientSession.cs +++ b/MQTTnet.Core/Server/MqttClientSession.cs @@ -3,12 +3,12 @@ using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using MQTTnet.Core.Adapter; -using MQTTnet.Core.Diagnostics; using MQTTnet.Core.Exceptions; using MQTTnet.Core.Internal; using MQTTnet.Core.Packets; using MQTTnet.Core.Protocol; using MQTTnet.Core.Serializer; +using Microsoft.Extensions.Logging; namespace MQTTnet.Core.Server { @@ -20,19 +20,20 @@ namespace MQTTnet.Core.Server private readonly MqttClientSessionsManager _mqttClientSessionsManager; private readonly MqttClientPendingMessagesQueue _pendingMessagesQueue; private readonly MqttServerOptions _options; - private readonly MqttNetTrace _trace; + private readonly ILogger _logger; private IMqttCommunicationAdapter _adapter; private CancellationTokenSource _cancellationTokenSource; private MqttApplicationMessage _willMessage; - public MqttClientSession(string clientId, MqttServerOptions options, MqttClientSessionsManager mqttClientSessionsManager, MqttNetTrace trace) + public MqttClientSession(string clientId, MqttClientSessionsManager mqttClientSessionsManager, ILogger logger, ILogger msgQueueLogger) { - _trace = trace ?? throw new ArgumentNullException(nameof(trace)); ClientId = clientId; - _options = options ?? throw new ArgumentNullException(nameof(options)); + _options = mqttClientSessionsManager.Options; _mqttClientSessionsManager = mqttClientSessionsManager ?? throw new ArgumentNullException(nameof(mqttClientSessionsManager)); - _pendingMessagesQueue = new MqttClientPendingMessagesQueue(options, this, trace); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + + _pendingMessagesQueue = new MqttClientPendingMessagesQueue(mqttClientSessionsManager.Options, this, msgQueueLogger); } public string ClientId { get; } @@ -60,11 +61,11 @@ namespace MQTTnet.Core.Server } catch (MqttCommunicationException exception) { - _trace.Warning(nameof(MqttClientSession), exception, "Client '{0}': Communication exception while processing client packets.", ClientId); + _logger.LogWarning(new EventId(), exception, "Client '{0}': Communication exception while processing client packets.", ClientId); } catch (Exception exception) { - _trace.Error(nameof(MqttClientSession), exception, "Client '{0}': Unhandled exception while processing client packets.", ClientId); + _logger.LogError(new EventId(), exception, "Client '{0}': Unhandled exception while processing client packets.", ClientId); } } @@ -81,7 +82,7 @@ namespace MQTTnet.Core.Server _adapter = null; - _trace.Information(nameof(MqttClientSession), "Client '{0}': Disconnected.", ClientId); + _logger.LogInformation("Client '{0}': Disconnected.", ClientId); } public void EnqueuePublishPacket(MqttPublishPacket publishPacket) @@ -94,7 +95,7 @@ namespace MQTTnet.Core.Server } _pendingMessagesQueue.Enqueue(publishPacket); - _trace.Verbose(nameof(MqttClientSession), "Client '{0}': Enqueued pending publish packet.", ClientId); + _logger.LogTrace("Client '{0}': Enqueued pending publish packet.", ClientId); } public void Dispose() @@ -118,12 +119,12 @@ namespace MQTTnet.Core.Server } catch (MqttCommunicationException exception) { - _trace.Warning(nameof(MqttClientSession), exception, "Client '{0}': Communication exception while processing client packets.", ClientId); + _logger.LogWarning(new EventId(), exception, "Client '{0}': Communication exception while processing client packets.", ClientId); Stop(); } catch (Exception exception) { - _trace.Error(nameof(MqttClientSession), exception, "Client '{0}': Unhandled exception while processing client packets.", ClientId); + _logger.LogError(new EventId(), exception, "Client '{0}': Unhandled exception while processing client packets.", ClientId); Stop(); } } @@ -165,7 +166,7 @@ namespace MQTTnet.Core.Server } else { - _trace.Warning(nameof(MqttClientSession), "Client '{0}': Received not supported packet ({1}). Closing connection.", ClientId, packet); + _logger.LogWarning("Client '{0}': Received not supported packet ({1}). Closing connection.", ClientId, packet); Stop(); } } diff --git a/MQTTnet.Core/Server/MqttClientSessionsManager.cs b/MQTTnet.Core/Server/MqttClientSessionsManager.cs index aedd3d7..f1840eb 100644 --- a/MQTTnet.Core/Server/MqttClientSessionsManager.cs +++ b/MQTTnet.Core/Server/MqttClientSessionsManager.cs @@ -4,26 +4,28 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; using MQTTnet.Core.Adapter; -using MQTTnet.Core.Diagnostics; using MQTTnet.Core.Exceptions; using MQTTnet.Core.Internal; using MQTTnet.Core.Packets; using MQTTnet.Core.Protocol; using MQTTnet.Core.Serializer; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; namespace MQTTnet.Core.Server { public sealed class MqttClientSessionsManager { private readonly Dictionary _clientSessions = new Dictionary(); - private readonly MqttServerOptions _options; - private readonly MqttNetTrace _trace; + private readonly ILogger _logger; + private readonly IMqttClientSesssionFactory _mqttClientSesssionFactory; - public MqttClientSessionsManager(MqttServerOptions options, MqttNetTrace trace) + public MqttClientSessionsManager(IOptions options, ILogger logger, MqttClientRetainedMessagesManager retainedMessagesManager, IMqttClientSesssionFactory mqttClientSesssionFactory) { - _trace = trace ?? throw new ArgumentNullException(nameof(trace)); - _options = options ?? throw new ArgumentNullException(nameof(options)); - RetainedMessagesManager = new MqttClientRetainedMessagesManager(options, trace); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + Options = options.Value ?? throw new ArgumentNullException(nameof(options)); + RetainedMessagesManager = retainedMessagesManager ?? throw new ArgumentNullException(nameof(options)); + _mqttClientSesssionFactory = mqttClientSesssionFactory ?? throw new ArgumentNullException(nameof(mqttClientSesssionFactory)); } public event EventHandler ApplicationMessageReceived; @@ -31,13 +33,14 @@ namespace MQTTnet.Core.Server public event EventHandler ClientDisconnected; public MqttClientRetainedMessagesManager RetainedMessagesManager { get; } + public MqttServerOptions Options { get; } public async Task RunClientSessionAsync(IMqttCommunicationAdapter clientAdapter) { var clientId = string.Empty; try { - if (!(await clientAdapter.ReceivePacketAsync(_options.DefaultCommunicationTimeout, CancellationToken.None).ConfigureAwait(false) is MqttConnectPacket connectPacket)) + if (!(await clientAdapter.ReceivePacketAsync(Options.DefaultCommunicationTimeout, CancellationToken.None).ConfigureAwait(false) is MqttConnectPacket connectPacket)) { throw new MqttProtocolViolationException("The first packet from a client must be a 'CONNECT' packet [MQTT-3.1.0-1]."); } @@ -50,7 +53,7 @@ namespace MQTTnet.Core.Server var connectReturnCode = ValidateConnection(connectPacket); if (connectReturnCode != MqttConnectReturnCode.ConnectionAccepted) { - await clientAdapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, CancellationToken.None, new MqttConnAckPacket + await clientAdapter.SendPacketsAsync(Options.DefaultCommunicationTimeout, CancellationToken.None, new MqttConnAckPacket { ConnectReturnCode = connectReturnCode }).ConfigureAwait(false); @@ -60,7 +63,7 @@ namespace MQTTnet.Core.Server var clientSession = GetOrCreateClientSession(connectPacket); - await clientAdapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, CancellationToken.None, new MqttConnAckPacket + await clientAdapter.SendPacketsAsync(Options.DefaultCommunicationTimeout, CancellationToken.None, new MqttConnAckPacket { ConnectReturnCode = connectReturnCode, IsSessionPresent = clientSession.IsExistingSession @@ -72,17 +75,20 @@ namespace MQTTnet.Core.Server ProtocolVersion = clientAdapter.PacketSerializer.ProtocolVersion })); - await clientSession.Session.RunAsync(connectPacket.WillMessage, clientAdapter).ConfigureAwait(false); + using (_logger.BeginScope(clientId)) + { + await clientSession.Session.RunAsync(connectPacket.WillMessage, clientAdapter).ConfigureAwait(false); + } } catch (Exception exception) { - _trace.Error(nameof(MqttServer), exception, exception.Message); + _logger.LogError(new EventId(), exception, exception.Message); } finally { try { - await clientAdapter.DisconnectAsync(_options.DefaultCommunicationTimeout).ConfigureAwait(false); + await clientAdapter.DisconnectAsync(Options.DefaultCommunicationTimeout).ConfigureAwait(false); } catch (Exception) { @@ -126,7 +132,7 @@ namespace MQTTnet.Core.Server } catch (Exception exception) { - _trace.Error(nameof(MqttClientSessionsManager), exception, "Error while processing application message"); + _logger.LogError(new EventId(), exception, "Error while processing application message"); } lock (_clientSessions) @@ -140,9 +146,9 @@ namespace MQTTnet.Core.Server private MqttConnectReturnCode ValidateConnection(MqttConnectPacket connectPacket) { - if (_options.ConnectionValidator != null) + if (Options.ConnectionValidator != null) { - return _options.ConnectionValidator(connectPacket); + return Options.ConnectionValidator(connectPacket); } return MqttConnectReturnCode.ConnectionAccepted; @@ -160,11 +166,11 @@ namespace MQTTnet.Core.Server _clientSessions.Remove(connectPacket.ClientId); clientSession.Dispose(); clientSession = null; - _trace.Verbose(nameof(MqttClientSessionsManager), "Disposed existing session of client '{0}'.", connectPacket.ClientId); + _logger.LogTrace("Disposed existing session of client '{0}'.", connectPacket.ClientId); } else { - _trace.Verbose(nameof(MqttClientSessionsManager), "Reusing existing session of client '{0}'.", connectPacket.ClientId); + _logger.LogTrace("Reusing existing session of client '{0}'.", connectPacket.ClientId); } } @@ -173,10 +179,10 @@ namespace MQTTnet.Core.Server { isExistingSession = false; - clientSession = new MqttClientSession(connectPacket.ClientId, _options, this, _trace); + clientSession = _mqttClientSesssionFactory.CreateClientSession(connectPacket.ClientId, this); _clientSessions[connectPacket.ClientId] = clientSession; - _trace.Verbose(nameof(MqttClientSessionsManager), "Created a new session for client '{0}'.", connectPacket.ClientId); + _logger.LogTrace("Created a new session for client '{0}'.", connectPacket.ClientId); } return new GetOrCreateClientSessionResult { IsExistingSession = isExistingSession, Session = clientSession }; diff --git a/MQTTnet.Core/Server/MqttServer.cs b/MQTTnet.Core/Server/MqttServer.cs index 341b370..b5c25de 100644 --- a/MQTTnet.Core/Server/MqttServer.cs +++ b/MQTTnet.Core/Server/MqttServer.cs @@ -3,26 +3,33 @@ using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using MQTTnet.Core.Adapter; -using MQTTnet.Core.Diagnostics; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using System.Linq; namespace MQTTnet.Core.Server { public sealed class MqttServer : IMqttServer { - private readonly MqttNetTrace _trace; + private readonly ILogger _logger; private readonly MqttClientSessionsManager _clientSessionsManager; private readonly ICollection _adapters; private readonly MqttServerOptions _options; private CancellationTokenSource _cancellationTokenSource; - public MqttServer(MqttServerOptions options, ICollection adapters, MqttNetTrace trace) + public MqttServer(IOptions options, IEnumerable adapters, ILogger logger, MqttClientSessionsManager clientSessionsManager) { - _trace = trace ?? throw new ArgumentNullException(nameof(trace)); - _options = options ?? throw new ArgumentNullException(nameof(options)); - _adapters = adapters ?? throw new ArgumentNullException(nameof(adapters)); + _options = options.Value ?? throw new ArgumentNullException(nameof(options)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + _clientSessionsManager = clientSessionsManager ?? throw new ArgumentNullException(nameof(clientSessionsManager)); + + if (adapters == null) + { + throw new ArgumentNullException(nameof(adapters)); + } + _adapters = adapters.ToList(); - _clientSessionsManager = new MqttClientSessionsManager(options, trace); _clientSessionsManager.ApplicationMessageReceived += (s, e) => ApplicationMessageReceived?.Invoke(s, e); _clientSessionsManager.ClientConnected += OnClientConnected; _clientSessionsManager.ClientDisconnected += OnClientDisconnected; @@ -59,7 +66,7 @@ namespace MQTTnet.Core.Server await adapter.StartAsync(_options); } - _trace.Information(nameof(MqttServer), "Started."); + _logger.LogInformation("Started."); } public async Task StopAsync() @@ -76,7 +83,7 @@ namespace MQTTnet.Core.Server _clientSessionsManager.Clear(); - _trace.Information(nameof(MqttServer), "Stopped."); + _logger.LogInformation("Stopped."); } private void OnClientAccepted(object sender, MqttServerAdapterClientAcceptedEventArgs eventArgs) @@ -86,13 +93,13 @@ namespace MQTTnet.Core.Server private void OnClientConnected(object sender, MqttClientConnectedEventArgs eventArgs) { - _trace.Information(nameof(MqttServer), "Client '{0}': Connected.", eventArgs.Client.ClientId); + _logger.LogInformation("Client '{0}': Connected.", eventArgs.Client.ClientId); ClientConnected?.Invoke(this, eventArgs); } private void OnClientDisconnected(object sender, MqttClientDisconnectedEventArgs eventArgs) { - _trace.Information(nameof(MqttServer), "Client '{0}': Disconnected.", eventArgs.Client.ClientId); + _logger.LogInformation("Client '{0}': Disconnected.", eventArgs.Client.ClientId); ClientDisconnected?.Invoke(this, eventArgs); } } diff --git a/Tests/MQTTnet.Core.Tests/MQTTnet.Core.Tests.csproj b/Tests/MQTTnet.Core.Tests/MQTTnet.Core.Tests.csproj index 2a03359..7ba4895 100644 --- a/Tests/MQTTnet.Core.Tests/MQTTnet.Core.Tests.csproj +++ b/Tests/MQTTnet.Core.Tests/MQTTnet.Core.Tests.csproj @@ -7,6 +7,7 @@ + diff --git a/Tests/MQTTnet.Core.Tests/MqttCommunicationAdapterFactory.cs b/Tests/MQTTnet.Core.Tests/MqttCommunicationAdapterFactory.cs index bed3218..7e3a7dc 100644 --- a/Tests/MQTTnet.Core.Tests/MqttCommunicationAdapterFactory.cs +++ b/Tests/MQTTnet.Core.Tests/MqttCommunicationAdapterFactory.cs @@ -1,4 +1,5 @@ using MQTTnet.Core.Adapter; +using MQTTnet.Core.Channel; using MQTTnet.Core.Client; namespace MQTTnet.Core.Tests @@ -12,7 +13,12 @@ namespace MQTTnet.Core.Tests _adapter = adapter; } - public IMqttCommunicationAdapter CreateMqttCommunicationAdapter(IMqttClientOptions options) + public IMqttCommunicationAdapter CreateClientMqttCommunicationAdapter(IMqttClientOptions options) + { + return _adapter; + } + + public IMqttCommunicationAdapter CreateServerMqttCommunicationAdapter(IMqttCommunicationChannel channel) { return _adapter; } diff --git a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs index 948497e..e866ca8 100644 --- a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs @@ -5,10 +5,10 @@ using System.Threading.Tasks; using Microsoft.VisualStudio.TestTools.UnitTesting; using MQTTnet.Core.Adapter; using MQTTnet.Core.Client; -using MQTTnet.Core.Diagnostics; using MQTTnet.Core.Packets; using MQTTnet.Core.Protocol; using MQTTnet.Core.Server; +using Microsoft.Extensions.DependencyInjection; namespace MQTTnet.Core.Tests { @@ -52,7 +52,12 @@ namespace MQTTnet.Core.Tests public async Task MqttServer_WillMessage() { var serverAdapter = new TestMqttServerAdapter(); - var s = new MqttServer(new MqttServerOptions(), new List { serverAdapter }, new MqttNetTrace()); + var services = new ServiceCollection() + .AddMqttServer() + .AddSingleton(serverAdapter) + .BuildServiceProvider(); + + var s = services.GetRequiredService(); await s.StartAsync(); var willMessage = new MqttApplicationMessageBuilder().WithTopic("My/last/will").WithAtMostOnceQoS().Build(); @@ -76,7 +81,12 @@ namespace MQTTnet.Core.Tests public async Task MqttServer_Unsubscribe() { var serverAdapter = new TestMqttServerAdapter(); - var s = new MqttServer(new MqttServerOptions(), new List { serverAdapter }, new MqttNetTrace()); + var services = new ServiceCollection() + .AddMqttServer() + .AddSingleton(serverAdapter) + .BuildServiceProvider(); + + var s = services.GetRequiredService(); await s.StartAsync(); var c1 = await serverAdapter.ConnectTestClient(s, "c1"); @@ -112,7 +122,12 @@ namespace MQTTnet.Core.Tests public async Task MqttServer_Publish() { var serverAdapter = new TestMqttServerAdapter(); - var s = new MqttServer(new MqttServerOptions(), new List { serverAdapter }, new MqttNetTrace()); + var services = new ServiceCollection() + .AddMqttServer() + .AddSingleton(serverAdapter) + .BuildServiceProvider(); + + var s = services.GetRequiredService(); await s.StartAsync(); var c1 = await serverAdapter.ConnectTestClient(s, "c1"); @@ -135,7 +150,12 @@ namespace MQTTnet.Core.Tests public async Task MqttServer_NoRetainedMessage() { var serverAdapter = new TestMqttServerAdapter(); - var s = new MqttServer(new MqttServerOptions(), new List { serverAdapter }, new MqttNetTrace()); + var services = new ServiceCollection() + .AddMqttServer() + .AddSingleton(serverAdapter) + .BuildServiceProvider(); + + var s = services.GetRequiredService(); await s.StartAsync(); var c1 = await serverAdapter.ConnectTestClient(s, "c1"); @@ -158,7 +178,12 @@ namespace MQTTnet.Core.Tests public async Task MqttServer_RetainedMessage() { var serverAdapter = new TestMqttServerAdapter(); - var s = new MqttServer(new MqttServerOptions(), new List { serverAdapter }, new MqttNetTrace()); + var services = new ServiceCollection() + .AddMqttServer() + .AddSingleton(serverAdapter) + .BuildServiceProvider(); + + var s = services.GetRequiredService(); await s.StartAsync(); var c1 = await serverAdapter.ConnectTestClient(s, "c1"); @@ -181,7 +206,12 @@ namespace MQTTnet.Core.Tests public async Task MqttServer_ClearRetainedMessage() { var serverAdapter = new TestMqttServerAdapter(); - var s = new MqttServer(new MqttServerOptions(), new List { serverAdapter }, new MqttNetTrace()); + var services = new ServiceCollection() + .AddMqttServer() + .AddSingleton(serverAdapter) + .BuildServiceProvider(); + + var s = services.GetRequiredService(); await s.StartAsync(); var c1 = await serverAdapter.ConnectTestClient(s, "c1"); @@ -207,7 +237,12 @@ namespace MQTTnet.Core.Tests var storage = new TestStorage(); var serverAdapter = new TestMqttServerAdapter(); - var s = new MqttServer(new MqttServerOptions { Storage = storage }, new List { serverAdapter }, new MqttNetTrace()); + var services = new ServiceCollection() + .AddMqttServer() + .AddSingleton(serverAdapter) + .BuildServiceProvider(); + + var s = services.GetRequiredService(); await s.StartAsync(); var c1 = await serverAdapter.ConnectTestClient(s, "c1"); @@ -216,7 +251,7 @@ namespace MQTTnet.Core.Tests await s.StopAsync(); - s = new MqttServer(new MqttServerOptions { Storage = storage }, new List { serverAdapter }, new MqttNetTrace()); + s = services.GetRequiredService(); await s.StartAsync(); var c2 = await serverAdapter.ConnectTestClient(s, "c2"); @@ -244,7 +279,12 @@ namespace MQTTnet.Core.Tests }; var serverAdapter = new TestMqttServerAdapter(); - var s = new MqttServer(options, new List { serverAdapter }, new MqttNetTrace()); + var services = new ServiceCollection() + .AddMqttServer() + .AddSingleton(serverAdapter) + .BuildServiceProvider(); + + var s = services.GetRequiredService(); await s.StartAsync(); var c1 = await serverAdapter.ConnectTestClient(s, "c1"); @@ -290,7 +330,12 @@ namespace MQTTnet.Core.Tests int expectedReceivedMessagesCount) { var serverAdapter = new TestMqttServerAdapter(); - var s = new MqttServer(new MqttServerOptions(), new List { serverAdapter }, new MqttNetTrace()); + var services = new ServiceCollection() + .AddMqttServer() + .AddSingleton(serverAdapter) + .BuildServiceProvider(); + + var s = services.GetRequiredService(); await s.StartAsync(); var c1 = await serverAdapter.ConnectTestClient(s, "c1"); diff --git a/Tests/MQTTnet.Core.Tests/ServiceCollectionTest.cs b/Tests/MQTTnet.Core.Tests/ServiceCollectionTest.cs new file mode 100644 index 0000000..0117ed0 --- /dev/null +++ b/Tests/MQTTnet.Core.Tests/ServiceCollectionTest.cs @@ -0,0 +1,30 @@ +using Microsoft.Extensions.DependencyInjection; +using Microsoft.VisualStudio.TestTools.UnitTesting; + +namespace MQTTnet.Core.Tests +{ + [TestClass] + public class ServiceCollectionTest + { + [TestMethod] + public void TestCanConstructAllServices() + { + var services = new ServiceCollection() + .AddLogging() + .AddMqttServer() + .AddMqttClient(); + + var serviceProvider = services + .BuildServiceProvider(); + + foreach (var service in services) + { + if (service.ServiceType.IsGenericType) + { + continue; + } + serviceProvider.GetRequiredService(service.ServiceType); + } + } + } +} diff --git a/Tests/MQTTnet.Core.Tests/TestClientSessionFactory.cs b/Tests/MQTTnet.Core.Tests/TestClientSessionFactory.cs new file mode 100644 index 0000000..519ef50 --- /dev/null +++ b/Tests/MQTTnet.Core.Tests/TestClientSessionFactory.cs @@ -0,0 +1,12 @@ +using MQTTnet.Core.Server; + +namespace MQTTnet.Core.Tests +{ + public class TestClientSessionFactory : IMqttClientSesssionFactory + { + public MqttClientSession CreateClientSession(string sessionId, MqttClientSessionsManager mqttClientSessionsManager) + { + return new MqttClientSession(sessionId, mqttClientSessionsManager, new TestLogger(), new TestLogger()); + } + } +} diff --git a/Tests/MQTTnet.Core.Tests/TestLogger.cs b/Tests/MQTTnet.Core.Tests/TestLogger.cs new file mode 100644 index 0000000..ea597a6 --- /dev/null +++ b/Tests/MQTTnet.Core.Tests/TestLogger.cs @@ -0,0 +1,22 @@ +using Microsoft.Extensions.Logging; +using System; + +namespace MQTTnet.Core.Tests +{ + public class TestLogger : ILogger + { + public IDisposable BeginScope(TState state) + { + throw new NotImplementedException(); + } + + public bool IsEnabled(LogLevel logLevel) + { + return true; + } + + public void Log(LogLevel logLevel, EventId eventId, TState state, Exception exception, Func formatter) + { + } + } +} diff --git a/Tests/MQTTnet.Core.Tests/TestMqttServerAdapter.cs b/Tests/MQTTnet.Core.Tests/TestMqttServerAdapter.cs index ae15cc2..0655a09 100644 --- a/Tests/MQTTnet.Core.Tests/TestMqttServerAdapter.cs +++ b/Tests/MQTTnet.Core.Tests/TestMqttServerAdapter.cs @@ -3,7 +3,6 @@ using System.Threading.Tasks; using MQTTnet.Core.Adapter; using MQTTnet.Core.Server; using MQTTnet.Core.Client; -using MQTTnet.Core.Diagnostics; namespace MQTTnet.Core.Tests { @@ -18,7 +17,7 @@ namespace MQTTnet.Core.Tests adapterA.Partner = adapterB; adapterB.Partner = adapterA; - var client = new MqttClient(new MqttCommunicationAdapterFactory(adapterA), new MqttNetTrace()); + var client = new MqttClient(new MqttCommunicationAdapterFactory(adapterA), new TestLogger(), new MqttPacketDispatcher(new TestLogger())); var connected = WaitForClientToConnect(server, clientId); FireClientAcceptedEvent(adapterB); diff --git a/Tests/MQTTnet.TestApp.AspNetCore2/MqttWebSocketServerAdapter.cs b/Tests/MQTTnet.TestApp.AspNetCore2/MqttWebSocketServerAdapter.cs index c9f5293..90ad49f 100644 --- a/Tests/MQTTnet.TestApp.AspNetCore2/MqttWebSocketServerAdapter.cs +++ b/Tests/MQTTnet.TestApp.AspNetCore2/MqttWebSocketServerAdapter.cs @@ -5,20 +5,22 @@ using System.Threading; using System.Threading.Tasks; using MQTTnet.Core.Adapter; using MQTTnet.Core.Channel; -using MQTTnet.Core.Diagnostics; using MQTTnet.Core.Serializer; using MQTTnet.Core.Server; using MQTTnet.Implementations; +using Microsoft.Extensions.Logging; namespace MQTTnet.TestApp.AspNetCore2 { public class MqttWebSocketServerAdapter : IMqttServerAdapter, IDisposable { - private readonly MqttNetTrace _trace; + private readonly ILogger _logger; + private readonly IMqttCommunicationAdapterFactory _mqttCommunicationAdapterFactory; - public MqttWebSocketServerAdapter(MqttNetTrace trace) + public MqttWebSocketServerAdapter(ILogger logger, IMqttCommunicationAdapterFactory mqttCommunicationAdapterFactory) { - _trace = trace ?? throw new ArgumentNullException(nameof(trace)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + _mqttCommunicationAdapterFactory = mqttCommunicationAdapterFactory ?? throw new ArgumentNullException(nameof(mqttCommunicationAdapterFactory)); } public event EventHandler ClientAccepted; @@ -38,7 +40,7 @@ namespace MQTTnet.TestApp.AspNetCore2 if (webSocket == null) throw new ArgumentNullException(nameof(webSocket)); var channel = new MqttWebSocketServerChannel(webSocket); - var clientAdapter = new MqttChannelCommunicationAdapter(channel, new MqttPacketSerializer(), _trace); + var clientAdapter = _mqttCommunicationAdapterFactory.CreateServerMqttCommunicationAdapter(channel); var eventArgs = new MqttServerAdapterClientAcceptedEventArgs(clientAdapter); ClientAccepted?.Invoke(this, eventArgs); diff --git a/Tests/MQTTnet.TestApp.AspNetCore2/Startup.cs b/Tests/MQTTnet.TestApp.AspNetCore2/Startup.cs index d730d2c..a4f5298 100644 --- a/Tests/MQTTnet.TestApp.AspNetCore2/Startup.cs +++ b/Tests/MQTTnet.TestApp.AspNetCore2/Startup.cs @@ -5,7 +5,6 @@ using Microsoft.AspNetCore.Hosting; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using MQTTnet.Core.Adapter; -using MQTTnet.Core.Diagnostics; using MQTTnet.Core.Server; namespace MQTTnet.TestApp.AspNetCore2 @@ -14,25 +13,18 @@ namespace MQTTnet.TestApp.AspNetCore2 { public void ConfigureServices(IServiceCollection services) { + services.AddMqttServer(); + services.AddSingleton(); + services.AddSingleton(); } public async void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory) { loggerFactory.AddConsole(LogLevel.Debug); + loggerFactory.AddDebug(); - MqttNetTrace.TraceMessagePublished += (s, e) => - { - Debug.WriteLine($">> [{e.TraceMessage.Timestamp}] [{e.TraceMessage.ThreadId}] [{e.TraceMessage.Source}] [{e.TraceMessage.Level}]: {e.TraceMessage.Message}"); - if (e.TraceMessage.Exception != null) - { - Debug.WriteLine(e.TraceMessage.Exception.Message); - } - }; - - var trace = new MqttNetTrace(); - var adapter = new MqttWebSocketServerAdapter(trace); - var options = new MqttServerOptions(); - var mqttServer = new MqttServer(options, new List { adapter }, new MqttNetTrace()); + var adapter = app.ApplicationServices.GetService(); + var mqttServer = app.ApplicationServices.GetService(); await mqttServer.StartAsync(); app.UseWebSockets(); diff --git a/Tests/MQTTnet.TestApp.NetCore/MQTTnet.TestApp.NetCore.csproj b/Tests/MQTTnet.TestApp.NetCore/MQTTnet.TestApp.NetCore.csproj index 41ece1b..4c0f99d 100644 --- a/Tests/MQTTnet.TestApp.NetCore/MQTTnet.TestApp.NetCore.csproj +++ b/Tests/MQTTnet.TestApp.NetCore/MQTTnet.TestApp.NetCore.csproj @@ -3,10 +3,11 @@ Exe Full - netcoreapp2.0;net45 + netcoreapp2.0;net451 + @@ -15,4 +16,10 @@ + + + 1.1.2 + + + diff --git a/Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs b/Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs index 4959f37..b54de06 100644 --- a/Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs +++ b/Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs @@ -2,10 +2,11 @@ using System.Threading.Tasks; using MQTTnet.Core; using MQTTnet.Core.Client; -using MQTTnet.Core.Diagnostics; using MQTTnet.Core.ManagedClient; using MQTTnet.Core.Packets; using MQTTnet.Core.Protocol; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; namespace MQTTnet.TestApp.NetCore { @@ -13,14 +14,14 @@ namespace MQTTnet.TestApp.NetCore { public static async Task RunAsync() { - MqttNetTrace.TraceMessagePublished += (s, e) => - { - Console.WriteLine($">> [{e.TraceMessage.Timestamp:O}] [{e.TraceMessage.ThreadId}] [{e.TraceMessage.Source}] [{e.TraceMessage.Level}]: {e.TraceMessage.Message}"); - if (e.TraceMessage.Exception != null) - { - Console.WriteLine(e.TraceMessage.Exception); - } - }; + var services = new ServiceCollection() + .AddMqttClient() + .AddLogging() + .BuildServiceProvider(); + + services.GetService() + .AddConsole(); + var options = new ManagedMqttClientOptions { @@ -35,7 +36,7 @@ namespace MQTTnet.TestApp.NetCore try { - var managedClient = new MqttClientFactory().CreateManagedMqttClient(); + var managedClient = services.GetRequiredService(); managedClient.ApplicationMessageReceived += (s, e) => { Console.WriteLine(">> RECEIVED: " + e.ApplicationMessage.Topic); diff --git a/Tests/MQTTnet.TestApp.NetCore/PerformanceTest.cs b/Tests/MQTTnet.TestApp.NetCore/PerformanceTest.cs index 638bdc0..746aa74 100644 --- a/Tests/MQTTnet.TestApp.NetCore/PerformanceTest.cs +++ b/Tests/MQTTnet.TestApp.NetCore/PerformanceTest.cs @@ -1,4 +1,6 @@ -using MQTTnet.Core; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using MQTTnet.Core; using MQTTnet.Core.Client; using MQTTnet.Core.Packets; using MQTTnet.Core.Protocol; @@ -17,33 +19,58 @@ namespace MQTTnet.TestApp.NetCore { public static async Task RunAsync() { - var server = Task.Factory.StartNew(RunServerAsync, TaskCreationOptions.LongRunning); - var client = Task.Factory.StartNew(() => RunClientAsync(2000, TimeSpan.FromMilliseconds(10)), TaskCreationOptions.LongRunning); + var services = new ServiceCollection() + .AddMqttServer(options => { + + options.ConnectionValidator = p => + { + if (p.ClientId == "SpecialClient") + { + if (p.Username != "USER" || p.Password != "PASS") + { + return MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword; + } + } + + return MqttConnectReturnCode.ConnectionAccepted; + }; + + options.DefaultCommunicationTimeout = TimeSpan.FromMinutes(10); + }) + .AddMqttClient() + .AddLogging() + .BuildServiceProvider(); + + services.GetService() + .AddConsole(minLevel: LogLevel.Warning, includeScopes: true); + + Console.WriteLine("Press 'c' for concurrent sends. Otherwise in one batch."); + var concurrent = Console.ReadKey(intercept: true).KeyChar == 'c'; + + var server = Task.Factory.StartNew(() => RunServerAsync(services), TaskCreationOptions.LongRunning); + var client = Task.Factory.StartNew(() => RunClientAsync(2000, TimeSpan.FromMilliseconds(10), services, concurrent), TaskCreationOptions.LongRunning); await Task.WhenAll(server, client).ConfigureAwait(false); } - private static Task RunClientsAsync(int msgChunkSize, TimeSpan interval) + private static Task RunClientsAsync(int msgChunkSize, TimeSpan interval, IServiceProvider serviceProvider, bool concurrent) { - return Task.WhenAll(Enumerable.Range(0, 3).Select(i => Task.Run(() => RunClientAsync(msgChunkSize, interval)))); + return Task.WhenAll(Enumerable.Range(0, 3).Select(i => Task.Run(() => RunClientAsync(msgChunkSize, interval, serviceProvider, concurrent)))); } - private static async Task RunClientAsync(int msgChunkSize, TimeSpan interval) + private static async Task RunClientAsync(int msgChunkSize, TimeSpan interval, IServiceProvider serviceProvider, bool concurrent) { try { var options = new MqttClientTcpOptions { Server = "localhost", - ClientId = "XYZ", + ClientId = "Client1", CleanSession = true, DefaultCommunicationTimeout = TimeSpan.FromMinutes(10) }; - var client = new MqttClientFactory().CreateMqttClient(); - client.ApplicationMessageReceived += (s, e) => - { - }; + var client = serviceProvider.GetRequiredService(); client.Connected += async (s, e) => { @@ -113,8 +140,7 @@ namespace MQTTnet.TestApp.NetCore .Select(i => CreateMessage()) .ToList(); - Console.WriteLine("Press 'c' for concurrent sends. Otherwise in one batch."); - if (Console.ReadKey().KeyChar == 'c') + if (concurrent) { //send concurrent (test for raceconditions) var sendTasks = msgs @@ -165,28 +191,11 @@ namespace MQTTnet.TestApp.NetCore return Task.Run(() => client.PublishAsync(applicationMessage)); } - private static async Task RunServerAsync() + private static async Task RunServerAsync(IServiceProvider serviceProvider) { try { - var options = new MqttServerOptions - { - ConnectionValidator = p => - { - if (p.ClientId == "SpecialClient") - { - if (p.Username != "USER" || p.Password != "PASS") - { - return MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword; - } - } - - return MqttConnectReturnCode.ConnectionAccepted; - }, - DefaultCommunicationTimeout = TimeSpan.FromMinutes(10) - }; - - var mqttServer = new MqttServerFactory().CreateMqttServer(options); + var mqttServer = serviceProvider.GetRequiredService(); var msgs = 0; var stopwatch = Stopwatch.StartNew(); mqttServer.ApplicationMessageReceived += (sender, args) => diff --git a/Tests/MQTTnet.TestApp.NetCore/Program.cs b/Tests/MQTTnet.TestApp.NetCore/Program.cs index 63c4ee2..f0b28c8 100644 --- a/Tests/MQTTnet.TestApp.NetCore/Program.cs +++ b/Tests/MQTTnet.TestApp.NetCore/Program.cs @@ -1,6 +1,5 @@ using MQTTnet.Core; using MQTTnet.Core.Client; -using MQTTnet.Core.Diagnostics; using MQTTnet.Core.Packets; using MQTTnet.Core.Protocol; using MQTTnet.Core.Server; @@ -12,6 +11,8 @@ using System.Text; using System.Threading; using System.Threading.Tasks; using Newtonsoft.Json; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; namespace MQTTnet.TestApp.NetCore { @@ -24,7 +25,7 @@ namespace MQTTnet.TestApp.NetCore Console.WriteLine("2 = Start server"); Console.WriteLine("3 = Start performance test"); Console.WriteLine("4 = Start managed client"); - + var pressedKey = Console.ReadKey(true); if (pressedKey.KeyChar == '1') { @@ -48,17 +49,16 @@ namespace MQTTnet.TestApp.NetCore private static async Task RunClientAsync() { - MqttNetTrace.TraceMessagePublished += (s, e) => - { - Console.WriteLine($">> [{e.TraceMessage.Timestamp:O}] [{e.TraceMessage.ThreadId}] [{e.TraceMessage.Source}] [{e.TraceMessage.Level}]: {e.TraceMessage.Message}"); - if (e.TraceMessage.Exception != null) - { - Console.WriteLine(e.TraceMessage.Exception); - } - }; - try { + var services = new ServiceCollection() + .AddMqttServer() + .AddLogging() + .BuildServiceProvider(); + + services.GetService() + .AddConsole(); + var options = new MqttClientWebSocketOptions { Uri = "localhost", @@ -66,7 +66,7 @@ namespace MQTTnet.TestApp.NetCore CleanSession = true }; - var client = new MqttClientFactory().CreateMqttClient(); + var client = services.GetRequiredService(); client.ApplicationMessageReceived += (s, e) => { Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###"); @@ -139,7 +139,12 @@ namespace MQTTnet.TestApp.NetCore private static async void WikiCode() { { - var client = new MqttClientFactory().CreateMqttClient(new CustomTraceHandler("Client 1")); + var serviceProvider = new ServiceCollection() + .AddMqttServer() + .AddLogging() + .BuildServiceProvider(); + + var client = serviceProvider.GetRequiredService(); var message = new MqttApplicationMessageBuilder() .WithTopic("MyTopic") @@ -159,28 +164,6 @@ namespace MQTTnet.TestApp.NetCore } } - public class CustomTraceHandler : IMqttNetTraceHandler - { - private readonly string _clientId; - - public CustomTraceHandler(string clientId) - { - _clientId = clientId; - } - - public bool IsEnabled { get; } = true; - - public void HandleTraceMessage(MqttNetTraceMessage traceMessage) - { - // Client ID is added to the trace message. - Console.WriteLine($">> [{_clientId}] [{traceMessage.Timestamp:O}] [{traceMessage.ThreadId}] [{traceMessage.Source}] [{traceMessage.Level}]: {traceMessage.Message}"); - if (traceMessage.Exception != null) - { - Console.WriteLine(traceMessage.Exception); - } - } - } - public class RetainedMessageHandler : IMqttServerStorage { private const string Filename = "C:\\MQTT\\RetainedMessages.json"; diff --git a/Tests/MQTTnet.TestApp.NetCore/ServerTest.cs b/Tests/MQTTnet.TestApp.NetCore/ServerTest.cs index 30d9596..0432a67 100644 --- a/Tests/MQTTnet.TestApp.NetCore/ServerTest.cs +++ b/Tests/MQTTnet.TestApp.NetCore/ServerTest.cs @@ -1,9 +1,10 @@ using System; using System.Text; using System.Threading.Tasks; -using MQTTnet.Core.Diagnostics; using MQTTnet.Core.Protocol; using MQTTnet.Core.Server; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; namespace MQTTnet.TestApp.NetCore { @@ -11,20 +12,15 @@ namespace MQTTnet.TestApp.NetCore { public static Task RunAsync() { - MqttNetTrace.TraceMessagePublished += (s, e) => - { - Console.WriteLine($">> [{e.TraceMessage.Timestamp:O}] [{e.TraceMessage.ThreadId}] [{e.TraceMessage.Source}] [{e.TraceMessage.Level}]: {e.TraceMessage.Message}"); - if (e.TraceMessage.Exception != null) - { - Console.WriteLine(e.TraceMessage.Exception); - } - }; - try { - var options = new MqttServerOptions + var services = new ServiceCollection() + .AddMqttServer() + .AddLogging(); + + services.Configure(options => { - ConnectionValidator = p => + options.ConnectionValidator = p => { if (p.ClientId == "SpecialClient") { @@ -35,22 +31,24 @@ namespace MQTTnet.TestApp.NetCore } return MqttConnectReturnCode.ConnectionAccepted; - } - }; + }; - options.Storage = new RetainedMessageHandler(); + options.Storage = new RetainedMessageHandler(); - options.ApplicationMessageInterceptor = message => - { - if (MqttTopicFilterComparer.IsMatch(message.Topic, "/myTopic/WithTimestamp/#")) + options.ApplicationMessageInterceptor = message => { - // Replace the payload with the timestamp. But also extending a JSON - // based payload with the timestamp is a suitable use case. - message.Payload = Encoding.UTF8.GetBytes(DateTime.Now.ToString("O")); - } + if (MqttTopicFilterComparer.IsMatch(message.Topic, "/myTopic/WithTimestamp/#")) + { + // Replace the payload with the timestamp. But also extending a JSON + // based payload with the timestamp is a suitable use case. + message.Payload = Encoding.UTF8.GetBytes(DateTime.Now.ToString("O")); + } - return message; - }; + return message; + }; + }); + + var serviceProvider = services.BuildServiceProvider(); //var certificate = new X509Certificate(@"C:\certs\test\test.cer", ""); //options.TlsEndpointOptions.Certificate = certificate.Export(X509ContentType.Cert); @@ -58,7 +56,7 @@ namespace MQTTnet.TestApp.NetCore //options.DefaultEndpointOptions.IsEnabled = true; //options.TlsEndpointOptions.IsEnabled = false; - var mqttServer = new MqttServerFactory().CreateMqttServer(options); + var mqttServer = serviceProvider.GetRequiredService(); mqttServer.ClientDisconnected += (s, e) => { Console.Write("Client disconnected event fired.");