From 77b80dc53ea8c24ca2754e90c72f7160a94383a4 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Wed, 22 Nov 2017 22:30:55 +0100 Subject: [PATCH] Removed 3rd-Party libraries. --- Build/MQTTnet.nuspec | 27 +--- .../MQTTnet.AspnetCore/MqttHostedServer.cs | 16 +- .../MqttWebSocketServerAdapter.cs | 11 +- .../MqttWebSocketServerChannel.cs | 2 +- .../ServiceCollectionExtensions.cs | 2 - .../Diagnostics/MqttNetConsoleTrace.cs | 50 ++++++ .../MqttClientAdapterFactory.cs | 26 ++++ .../Implementations/MqttServerAdapter.Uwp.cs | 13 +- .../Implementations/MqttServerAdapter.cs | 17 +-- .../Implementations/MqttTcpChannel.Uwp.cs | 2 +- .../Implementations/MqttTcpChannel.cs | 2 +- .../Implementations/MqttWebSocketChannel.cs | 13 +- .../MQTTnet.Netstandard.csproj | 25 +-- Frameworks/MQTTnet.NetStandard/MqttFactory.cs | 116 +++----------- .../ServiceCollectionExtensions.cs | 70 --------- ...ationAdapter.cs => IMqttChannelAdapter.cs} | 2 +- .../IMqttCommunicationAdapterFactory.cs | 12 -- ...cationAdapter.cs => MqttChannelAdapter.cs} | 18 +-- ...ons.cs => MqttChannelAdapterExtensions.cs} | 4 +- ...qttServerAdapterClientAcceptedEventArgs.cs | 4 +- ...ommunicationChannel.cs => IMqttChannel.cs} | 2 +- .../Client/IMqttClientAdapterFactory.cs | 10 ++ MQTTnet.Core/Client/IMqttClientFactory.cs | 7 +- MQTTnet.Core/Client/IMqttClientOptions.cs | 4 - MQTTnet.Core/Client/MqttClient.cs | 60 ++++---- MQTTnet.Core/Client/MqttClientOptions.cs | 3 - .../Client/MqttClientOptionsBuilder.cs | 6 - MQTTnet.Core/Client/MqttPacketDispatcher.cs | 8 +- MQTTnet.Core/Diagnostics/IMqttNetLogger.cs | 21 +++ MQTTnet.Core/Diagnostics/MqttNetGlobalLog.cs | 18 +++ MQTTnet.Core/Diagnostics/MqttNetLogLevel.cs | 13 ++ ...ttTraceMessage.cs => MqttNetLogMessage.cs} | 10 +- .../MqttNetLogMessagePublishedEventArgs.cs | 14 ++ MQTTnet.Core/Diagnostics/MqttNetLogger.cs | 75 +++++---- MQTTnet.Core/Diagnostics/MqttNetTrace.cs | 35 ----- .../MqttNetTraceMessagePublishedEventArgs.cs | 14 -- MQTTnet.Core/MQTTnet.Core.csproj | 10 +- .../ManagedClient/ManagedMqttClient.cs | 38 ++--- .../IMqttClientRetainedMessageManager.cs | 15 -- .../Server/IMqttClientSesssionFactory.cs | 7 - MQTTnet.Core/Server/IMqttServer.cs | 2 +- MQTTnet.Core/Server/IMqttServerFactory.cs | 6 +- .../Server/MqttClientPendingMessagesQueue.cs | 24 +-- MQTTnet.Core/Server/MqttClientSession.cs | 44 +++--- .../Server/MqttClientSessionsManager.cs | 44 +++--- .../Server/MqttClientSubscriptionsManager.cs | 5 +- ...ager.cs => MqttRetainedMessagesManager.cs} | 21 ++- MQTTnet.Core/Server/MqttServer.cs | 105 +++++++------ .../MQTTnet.Core.Tests.csproj | 2 +- .../MqttLoggerProviderTest.cs | 51 ------- Tests/MQTTnet.Core.Tests/MqttServerTests.cs | 143 +++++------------- .../MqttSubscriptionsManagerTests.cs | 9 +- .../ServiceCollectionTest.cs | 30 ---- .../TestClientSessionFactory.cs | 14 -- Tests/MQTTnet.Core.Tests/TestLogger.cs | 26 ---- .../TestMqttCommunicationAdapter.cs | 2 +- .../TestMqttCommunicationAdapterFactory.cs | 17 +-- .../TestMqttServerAdapter.cs | 6 +- Tests/MQTTnet.TestApp.NetCore/ClientTest.cs | 4 +- .../MQTTnet.TestApp.NetCore.csproj | 6 - .../ManagedClientTest.cs | 12 +- .../PerformanceTest.cs | 57 ++----- Tests/MQTTnet.TestApp.NetCore/Program.cs | 3 - Tests/MQTTnet.TestApp.NetCore/ServerTest.cs | 47 +++--- .../MQTTnet.TestApp.UniversalWindows.csproj | 6 - .../MainPage.xaml.cs | 119 ++++++--------- 66 files changed, 614 insertions(+), 993 deletions(-) create mode 100644 Frameworks/MQTTnet.NetStandard/Diagnostics/MqttNetConsoleTrace.cs create mode 100644 Frameworks/MQTTnet.NetStandard/Implementations/MqttClientAdapterFactory.cs delete mode 100644 Frameworks/MQTTnet.NetStandard/ServiceCollectionExtensions.cs rename MQTTnet.Core/Adapter/{IMqttCommunicationAdapter.cs => IMqttChannelAdapter.cs} (92%) delete mode 100644 MQTTnet.Core/Adapter/IMqttCommunicationAdapterFactory.cs rename MQTTnet.Core/Adapter/{MqttChannelCommunicationAdapter.cs => MqttChannelAdapter.cs} (89%) rename MQTTnet.Core/Adapter/{MqttCommunicationAdapterExtensions.cs => MqttChannelAdapterExtensions.cs} (58%) rename MQTTnet.Core/Channel/{IMqttCommunicationChannel.cs => IMqttChannel.cs} (83%) create mode 100644 MQTTnet.Core/Client/IMqttClientAdapterFactory.cs create mode 100644 MQTTnet.Core/Diagnostics/IMqttNetLogger.cs create mode 100644 MQTTnet.Core/Diagnostics/MqttNetGlobalLog.cs create mode 100644 MQTTnet.Core/Diagnostics/MqttNetLogLevel.cs rename MQTTnet.Core/Diagnostics/{MqttTraceMessage.cs => MqttNetLogMessage.cs} (61%) create mode 100644 MQTTnet.Core/Diagnostics/MqttNetLogMessagePublishedEventArgs.cs delete mode 100644 MQTTnet.Core/Diagnostics/MqttNetTrace.cs delete mode 100644 MQTTnet.Core/Diagnostics/MqttNetTraceMessagePublishedEventArgs.cs delete mode 100644 MQTTnet.Core/Server/IMqttClientRetainedMessageManager.cs delete mode 100644 MQTTnet.Core/Server/IMqttClientSesssionFactory.cs rename MQTTnet.Core/Server/{MqttClientRetainedMessagesManager.cs => MqttRetainedMessagesManager.cs} (80%) delete mode 100644 Tests/MQTTnet.Core.Tests/MqttLoggerProviderTest.cs delete mode 100644 Tests/MQTTnet.Core.Tests/ServiceCollectionTest.cs delete mode 100644 Tests/MQTTnet.Core.Tests/TestClientSessionFactory.cs delete mode 100644 Tests/MQTTnet.Core.Tests/TestLogger.cs diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index faccaef..8dd284e 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -2,7 +2,7 @@ MQTTnet - 2.5.2 + 2.5.3 Christian Kratky Christian Kratky https://github.com/chkr1011/MQTTnet/blob/master/LICENSE @@ -10,7 +10,9 @@ https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png false MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker). - * [Core] Refactored trace messages. + * [Core] Removed all dependencies to other libraries (BREAKING CHANGE!). +* [Core] Updated SDK libraries. +* [Client] Fixed broken support for WebSocketSecure connections (Thanks to @StAI). Copyright Christian Kratky 2016-2017 MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin @@ -18,43 +20,26 @@ - - - - - + - - - - - - - - + - - - - - - diff --git a/Frameworks/MQTTnet.AspnetCore/MqttHostedServer.cs b/Frameworks/MQTTnet.AspnetCore/MqttHostedServer.cs index 8d00518..e21b8fe 100644 --- a/Frameworks/MQTTnet.AspnetCore/MqttHostedServer.cs +++ b/Frameworks/MQTTnet.AspnetCore/MqttHostedServer.cs @@ -2,29 +2,27 @@ using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Hosting; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; using MQTTnet.Core.Adapter; +using MQTTnet.Core.Diagnostics; using MQTTnet.Core.Server; namespace MQTTnet.AspNetCore { public class MqttHostedServer : MqttServer, IHostedService { + private readonly MqttServerOptions _options; + public MqttHostedServer( - IOptions options, + MqttServerOptions options, IEnumerable adapters, - ILogger logger, - MqttClientSessionsManager clientSessionsManager, - IMqttClientRetainedMessageManager clientRetainedMessageManager - ) - : base(options, adapters, logger, clientSessionsManager, clientRetainedMessageManager) + IMqttNetLogger logger) : base(adapters, logger) { + _options = options; } public Task StartAsync(CancellationToken cancellationToken) { - return StartAsync(); + return StartAsync(_options); } public Task StopAsync(CancellationToken cancellationToken) diff --git a/Frameworks/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs b/Frameworks/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs index 0e3a3d5..2781092 100644 --- a/Frameworks/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs +++ b/Frameworks/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs @@ -2,19 +2,14 @@ using System.Net.WebSockets; using System.Threading.Tasks; using MQTTnet.Core.Adapter; +using MQTTnet.Core.Diagnostics; +using MQTTnet.Core.Serializer; using MQTTnet.Core.Server; namespace MQTTnet.AspNetCore { public class MqttWebSocketServerAdapter : IMqttServerAdapter, IDisposable { - private readonly IMqttCommunicationAdapterFactory _mqttCommunicationAdapterFactory; - - public MqttWebSocketServerAdapter(IMqttCommunicationAdapterFactory mqttCommunicationAdapterFactory) - { - _mqttCommunicationAdapterFactory = mqttCommunicationAdapterFactory ?? throw new ArgumentNullException(nameof(mqttCommunicationAdapterFactory)); - } - public event EventHandler ClientAccepted; public Task StartAsync(MqttServerOptions options) @@ -32,7 +27,7 @@ namespace MQTTnet.AspNetCore if (webSocket == null) throw new ArgumentNullException(nameof(webSocket)); var channel = new MqttWebSocketServerChannel(webSocket); - var clientAdapter = _mqttCommunicationAdapterFactory.CreateServerCommunicationAdapter(channel); + var clientAdapter = new MqttChannelAdapter(channel, new MqttPacketSerializer(), new MqttNetLogger()); var eventArgs = new MqttServerAdapterClientAcceptedEventArgs(clientAdapter); ClientAccepted?.Invoke(this, eventArgs); diff --git a/Frameworks/MQTTnet.AspnetCore/MqttWebSocketServerChannel.cs b/Frameworks/MQTTnet.AspnetCore/MqttWebSocketServerChannel.cs index 54cfb0b..55b4940 100644 --- a/Frameworks/MQTTnet.AspnetCore/MqttWebSocketServerChannel.cs +++ b/Frameworks/MQTTnet.AspnetCore/MqttWebSocketServerChannel.cs @@ -8,7 +8,7 @@ using MQTTnet.Implementations; namespace MQTTnet.AspNetCore { - public class MqttWebSocketServerChannel : IMqttCommunicationChannel, IDisposable + public class MqttWebSocketServerChannel : IMqttChannel, IDisposable { private WebSocket _webSocket; diff --git a/Frameworks/MQTTnet.AspnetCore/ServiceCollectionExtensions.cs b/Frameworks/MQTTnet.AspnetCore/ServiceCollectionExtensions.cs index f915ab4..f39b1cf 100644 --- a/Frameworks/MQTTnet.AspnetCore/ServiceCollectionExtensions.cs +++ b/Frameworks/MQTTnet.AspnetCore/ServiceCollectionExtensions.cs @@ -9,8 +9,6 @@ namespace MQTTnet.AspNetCore { public static IServiceCollection AddHostedMqttServer(this IServiceCollection services) { - services.AddMqttServerServices(); - services.AddSingleton(s => s.GetService()); services.AddSingleton(s => s.GetService()); services.AddSingleton(); diff --git a/Frameworks/MQTTnet.NetStandard/Diagnostics/MqttNetConsoleTrace.cs b/Frameworks/MQTTnet.NetStandard/Diagnostics/MqttNetConsoleTrace.cs new file mode 100644 index 0000000..d660b17 --- /dev/null +++ b/Frameworks/MQTTnet.NetStandard/Diagnostics/MqttNetConsoleTrace.cs @@ -0,0 +1,50 @@ +using System; +using System.Text; +using MQTTnet.Core.Diagnostics; + +namespace MQTTnet.Diagnostics +{ + public static class MqttNetConsoleTrace + { + private static readonly object Lock = new object(); + + public static void ForwardToConsole() + { + MqttNetGlobalLog.LogMessagePublished -= PrintToConsole; + MqttNetGlobalLog.LogMessagePublished += PrintToConsole; + } + + private static void PrintToConsole(object sender, MqttNetLogMessagePublishedEventArgs e) + { + var output = new StringBuilder(); + output.AppendLine($">> [{e.TraceMessage.Timestamp:O}] [{e.TraceMessage.ThreadId}] [{e.TraceMessage.Source}] [{e.TraceMessage.Level}]: {e.TraceMessage.Message}"); + if (e.TraceMessage.Exception != null) + { + output.AppendLine(e.TraceMessage.Exception.ToString()); + } + + lock (Lock) + { + var backupColor = Console.ForegroundColor; + switch (e.TraceMessage.Level) + { + case MqttNetLogLevel.Error: + Console.ForegroundColor = ConsoleColor.Red; + break; + case MqttNetLogLevel.Warning: + Console.ForegroundColor = ConsoleColor.Yellow; + break; + case MqttNetLogLevel.Info: + Console.ForegroundColor = ConsoleColor.Green; + break; + case MqttNetLogLevel.Verbose: + Console.ForegroundColor = ConsoleColor.Gray; + break; + } + + Console.Write(output); + Console.ForegroundColor = backupColor; + } + } + } +} \ No newline at end of file diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttClientAdapterFactory.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttClientAdapterFactory.cs new file mode 100644 index 0000000..9884c86 --- /dev/null +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttClientAdapterFactory.cs @@ -0,0 +1,26 @@ +using System; +using MQTTnet.Core.Adapter; +using MQTTnet.Core.Client; +using MQTTnet.Core.Diagnostics; +using MQTTnet.Core.Serializer; + +namespace MQTTnet.Implementations +{ + public class MqttClientAdapterFactory : IMqttClientAdapterFactory + { + public IMqttChannelAdapter CreateClientAdapter(IMqttClientChannelOptions options, IMqttNetLogger logger) + { + if (options == null) throw new ArgumentNullException(nameof(options)); + + switch (options) + { + case MqttClientTcpOptions tcpOptions: + return new MqttChannelAdapter(new MqttTcpChannel(tcpOptions), new MqttPacketSerializer(), logger); + case MqttClientWebSocketOptions webSocketOptions: + return new MqttChannelAdapter(new MqttWebSocketChannel(webSocketOptions), new MqttPacketSerializer(), logger); + default: + throw new NotSupportedException(); + } + } + } +} diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.Uwp.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.Uwp.cs index 658cfe8..1b8a43c 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.Uwp.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.Uwp.cs @@ -4,20 +4,19 @@ using System.Threading.Tasks; using MQTTnet.Core.Adapter; using MQTTnet.Core.Server; using Windows.Networking.Sockets; -using Microsoft.Extensions.Logging; +using MQTTnet.Core.Diagnostics; +using MQTTnet.Core.Serializer; namespace MQTTnet.Implementations { public class MqttServerAdapter : IMqttServerAdapter, IDisposable { - private readonly ILogger _logger; - private readonly IMqttCommunicationAdapterFactory _communicationAdapterFactory; + private readonly IMqttNetLogger _logger; private StreamSocketListener _defaultEndpointSocket; - public MqttServerAdapter(ILogger logger, IMqttCommunicationAdapterFactory communicationAdapterFactory) + public MqttServerAdapter(IMqttNetLogger logger) { _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - _communicationAdapterFactory = communicationAdapterFactory ?? throw new ArgumentNullException(nameof(communicationAdapterFactory)); } public event EventHandler ClientAccepted; @@ -63,12 +62,12 @@ namespace MQTTnet.Implementations { try { - var clientAdapter = _communicationAdapterFactory.CreateServerCommunicationAdapter(new MqttTcpChannel(args.Socket)); + var clientAdapter = new MqttChannelAdapter(new MqttTcpChannel(args.Socket), new MqttPacketSerializer(), _logger); ClientAccepted?.Invoke(this, new MqttServerAdapterClientAcceptedEventArgs(clientAdapter)); } catch (Exception exception) { - _logger.LogError(new EventId(), exception, "Error while accepting connection at default endpoint."); + _logger.Error(exception, "Error while accepting connection at default endpoint."); } } } diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs index e3f048d..3b8515c 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs @@ -9,24 +9,23 @@ using System.Threading; using System.Threading.Tasks; using MQTTnet.Core.Adapter; using MQTTnet.Core.Server; -using Microsoft.Extensions.Logging; +using MQTTnet.Core.Diagnostics; +using MQTTnet.Core.Serializer; namespace MQTTnet.Implementations { public class MqttServerAdapter : IMqttServerAdapter, IDisposable { - private readonly ILogger _logger; - private readonly IMqttCommunicationAdapterFactory _communicationAdapterFactory; + private readonly IMqttNetLogger _logger; private CancellationTokenSource _cancellationTokenSource; private Socket _defaultEndpointSocket; private Socket _tlsEndpointSocket; private X509Certificate2 _tlsCertificate; - public MqttServerAdapter(ILogger logger, IMqttCommunicationAdapterFactory communicationAdapterFactory) + public MqttServerAdapter(IMqttNetLogger logger) { _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - _communicationAdapterFactory = communicationAdapterFactory ?? throw new ArgumentNullException(nameof(communicationAdapterFactory)); } public event EventHandler ClientAccepted; @@ -103,12 +102,12 @@ namespace MQTTnet.Implementations #else var clientSocket = await _defaultEndpointSocket.AcceptAsync().ConfigureAwait(false); #endif - var clientAdapter = _communicationAdapterFactory.CreateServerCommunicationAdapter(new MqttTcpChannel(clientSocket, null)); + var clientAdapter = new MqttChannelAdapter(new MqttTcpChannel(clientSocket, null), new MqttPacketSerializer(), _logger); ClientAccepted?.Invoke(this, new MqttServerAdapterClientAcceptedEventArgs(clientAdapter)); } catch (Exception exception) { - _logger.LogError(new EventId(), exception, "Error while accepting connection at default endpoint."); + _logger.Error(exception, "Error while accepting connection at default endpoint."); //excessive CPU consumed if in endless loop of socket errors await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken).ConfigureAwait(false); @@ -131,12 +130,12 @@ namespace MQTTnet.Implementations var sslStream = new SslStream(new NetworkStream(clientSocket)); await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false).ConfigureAwait(false); - var clientAdapter = _communicationAdapterFactory.CreateServerCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream)); + var clientAdapter = new MqttChannelAdapter(new MqttTcpChannel(clientSocket, sslStream), new MqttPacketSerializer(), _logger); ClientAccepted?.Invoke(this, new MqttServerAdapterClientAcceptedEventArgs(clientAdapter)); } catch (Exception exception) { - _logger.LogError(new EventId(), exception, "Error while accepting connection at TLS endpoint."); + _logger.Error(exception, "Error while accepting connection at TLS endpoint."); //excessive CPU consumed if in endless loop of socket errors await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken).ConfigureAwait(false); diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.Uwp.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.Uwp.cs index 06fa137..964bfbf 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.Uwp.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.Uwp.cs @@ -13,7 +13,7 @@ using MQTTnet.Core.Client; namespace MQTTnet.Implementations { - public sealed class MqttTcpChannel : IMqttCommunicationChannel, IDisposable + public sealed class MqttTcpChannel : IMqttChannel, IDisposable { private readonly MqttClientTcpOptions _options; private StreamSocket _socket; diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs index a7e4026..ce9dcf6 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs @@ -12,7 +12,7 @@ using System.Linq; namespace MQTTnet.Implementations { - public sealed class MqttTcpChannel : IMqttCommunicationChannel, IDisposable + public sealed class MqttTcpChannel : IMqttChannel, IDisposable { private readonly MqttClientTcpOptions _options; diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs index 8f56c94..9046bb9 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs @@ -9,7 +9,7 @@ using System.Threading.Tasks; namespace MQTTnet.Implementations { - public sealed class MqttWebSocketChannel : IMqttCommunicationChannel, IDisposable + public sealed class MqttWebSocketChannel : IMqttChannel, IDisposable { // ReSharper disable once MemberCanBePrivate.Global // ReSharper disable once AutoPropertyCanBeMadeGetOnly.Global @@ -29,9 +29,16 @@ namespace MQTTnet.Implementations public async Task ConnectAsync() { var uri = _options.Uri; - if (!uri.StartsWith("ws://", StringComparison.OrdinalIgnoreCase)) + if (!uri.StartsWith("ws://", StringComparison.OrdinalIgnoreCase) && !uri.StartsWith("wss://", StringComparison.OrdinalIgnoreCase)) { - uri = "ws://" + uri; + if (!_options.TlsOptions.UseTls) + { + uri = "ws://" + uri; + } + else + { + uri = "wss://" + uri; + } } _webSocket = new ClientWebSocket(); diff --git a/Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj b/Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj index 6349d3e..9b6211d 100644 --- a/Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj +++ b/Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj @@ -4,8 +4,10 @@ netstandard1.3;netstandard2.0;net452;net461;uap10.0 MQTTnet MQTTnet - 2.5.2.0 - 2.5.2.0 + False + Full + 2.5.3.0 + 2.5.3.0 0.0.0.0 @@ -31,9 +33,6 @@ - - - @@ -42,36 +41,22 @@ - - - - - - - - - - + - - - - - \ No newline at end of file diff --git a/Frameworks/MQTTnet.NetStandard/MqttFactory.cs b/Frameworks/MQTTnet.NetStandard/MqttFactory.cs index 6037929..99e57af 100644 --- a/Frameworks/MQTTnet.NetStandard/MqttFactory.cs +++ b/Frameworks/MQTTnet.NetStandard/MqttFactory.cs @@ -1,130 +1,52 @@ using System; +using System.Collections.Generic; using MQTTnet.Core.Adapter; using MQTTnet.Core.Client; -using MQTTnet.Core.Serializer; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Options; using MQTTnet.Implementations; using MQTTnet.Core.ManagedClient; using MQTTnet.Core.Server; -using MQTTnet.Core.Channel; +using MQTTnet.Core.Diagnostics; namespace MQTTnet { - public class MqttFactory : IMqttCommunicationAdapterFactory, IMqttClientSesssionFactory, IMqttClientFactory, IMqttServerFactory + public class MqttFactory : IMqttClientFactory, IMqttServerFactory { - private readonly IServiceProvider _serviceProvider; - - public MqttFactory() - : this(BuildServiceProvider()) - { - } - - public MqttFactory(IServiceProvider serviceProvider) - { - _serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider)); - } - - public ILoggerFactory GetLoggerFactory() - { - return _serviceProvider.GetRequiredService(); - } - - public IMqttCommunicationAdapter CreateClientCommunicationAdapter(IMqttClientOptions options) - { - var logger = _serviceProvider.GetRequiredService>(); - return new MqttChannelCommunicationAdapter(CreateCommunicationChannel(options.ChannelOptions), CreateSerializer(options.ProtocolVersion), logger); - } - - public IMqttCommunicationAdapter CreateServerCommunicationAdapter(IMqttCommunicationChannel channel) - { - var serializer = _serviceProvider.GetRequiredService(); - var logger = _serviceProvider.GetRequiredService>(); - return new MqttChannelCommunicationAdapter(channel, serializer, logger); - } - - public IMqttCommunicationChannel CreateCommunicationChannel(IMqttClientChannelOptions options) - { - if (options == null) throw new ArgumentNullException(nameof(options)); - - switch (options) - { - case MqttClientTcpOptions tcpOptions: - return CreateTcpChannel(tcpOptions); - case MqttClientWebSocketOptions webSocketOptions: - return CreateWebSocketChannel(webSocketOptions); - default: - throw new NotSupportedException(); - } - } - - public MqttTcpChannel CreateTcpChannel(MqttClientTcpOptions tcpOptions) + public IMqttClient CreateMqttClient() { - return new MqttTcpChannel(tcpOptions); + return CreateMqttClient(new MqttNetLogger()); } - public MqttWebSocketChannel CreateWebSocketChannel(MqttClientWebSocketOptions webSocketOptions) + public IMqttClient CreateMqttClient(IMqttNetLogger logger) { - return new MqttWebSocketChannel(webSocketOptions); - } + if (logger == null) throw new ArgumentNullException(nameof(logger)); - public MqttPacketSerializer CreateSerializer(MqttProtocolVersion protocolVersion) - { - return new MqttPacketSerializer - { - ProtocolVersion = protocolVersion - }; + return new MqttClient(new MqttClientAdapterFactory(), logger); } - public MqttClientSession CreateClientSession(string clientId, MqttClientSessionsManager clientSessionsManager) + public IManagedMqttClient CreateManagedMqttClient() { - return new MqttClientSession( - clientId, - _serviceProvider.GetRequiredService>(), - clientSessionsManager, - _serviceProvider.GetRequiredService(), - _serviceProvider.GetRequiredService>(), - _serviceProvider.GetRequiredService>()); + return new ManagedMqttClient(CreateMqttClient(), new MqttNetLogger()); } - public IMqttClient CreateMqttClient() + public IManagedMqttClient CreateManagedMqttClient(IMqttNetLogger logger) { - return _serviceProvider.GetRequiredService(); - } + if (logger == null) throw new ArgumentNullException(nameof(logger)); - public IManagedMqttClient CreateManagedMqttClient() - { - return _serviceProvider.GetRequiredService(); + return new ManagedMqttClient(CreateMqttClient(), logger); } public IMqttServer CreateMqttServer() { - return _serviceProvider.GetRequiredService(); - } - - public IMqttServer CreateMqttServer(Action configure) - { - if (configure == null) throw new ArgumentNullException(nameof(configure)); - - var options = _serviceProvider.GetRequiredService>(); - configure(options.Value); - - return _serviceProvider.GetRequiredService(); + var logger = new MqttNetLogger(); + return CreateMqttServer(new List { new MqttServerAdapter(logger) }, logger); } - private static IServiceProvider BuildServiceProvider() + public IMqttServer CreateMqttServer(IEnumerable adapters, IMqttNetLogger logger) { - var serviceProvider = new ServiceCollection() - .AddMqttClient() - .AddMqttServer() - .AddLogging() - .BuildServiceProvider(); - - serviceProvider.GetRequiredService() - .AddMqttTrace(); + if (adapters == null) throw new ArgumentNullException(nameof(adapters)); + if (logger == null) throw new ArgumentNullException(nameof(logger)); - return serviceProvider; + return new MqttServer(adapters, logger); } } } \ No newline at end of file diff --git a/Frameworks/MQTTnet.NetStandard/ServiceCollectionExtensions.cs b/Frameworks/MQTTnet.NetStandard/ServiceCollectionExtensions.cs deleted file mode 100644 index 4e6639a..0000000 --- a/Frameworks/MQTTnet.NetStandard/ServiceCollectionExtensions.cs +++ /dev/null @@ -1,70 +0,0 @@ -using Microsoft.Extensions.DependencyInjection; -using MQTTnet.Core.Adapter; -using MQTTnet.Core.Client; -using MQTTnet.Core.ManagedClient; -using MQTTnet.Core.Serializer; -using MQTTnet.Core.Server; -using MQTTnet.Implementations; -using System; -using Microsoft.Extensions.Logging; -using MQTTnet.Core.Diagnostics; - -namespace MQTTnet -{ - public static class ServiceCollectionExtensions - { - public static IServiceCollection AddMqttServer(this IServiceCollection services) - { - services.AddMqttServerServices(); - - services.AddSingleton(s => s.GetService()); - services.AddSingleton(); - - return services; - } - - public static IServiceCollection AddMqttServerServices(this IServiceCollection services) - { - services.AddOptions(); - services.AddSingleton(); - services.AddSingleton(s => s.GetService()); - services.AddSingleton(s => s.GetService()); - - services.AddTransient(); - services.AddTransient(); - - services.AddSingleton(); - services.AddTransient(); - services.AddSingleton(); - return services; - } - - public static IServiceCollection AddMqttServer(this IServiceCollection services, Action configureOptions) - { - return services - .AddMqttServer() - .Configure(configureOptions); - } - - public static IServiceCollection AddMqttClient(this IServiceCollection services) - { - services.AddSingleton(); - services.AddSingleton(s => s.GetService()); - - services.AddTransient(); - services.AddTransient(); - services.AddTransient(); - services.AddTransient(); - services.AddTransient(); - services.AddTransient(); - - return services; - } - - public static ILoggerFactory AddMqttTrace(this ILoggerFactory factory) - { - factory.AddProvider(new MqttNetTrace()); - return factory; - } - } -} diff --git a/MQTTnet.Core/Adapter/IMqttCommunicationAdapter.cs b/MQTTnet.Core/Adapter/IMqttChannelAdapter.cs similarity index 92% rename from MQTTnet.Core/Adapter/IMqttCommunicationAdapter.cs rename to MQTTnet.Core/Adapter/IMqttChannelAdapter.cs index 78586f6..13908f6 100644 --- a/MQTTnet.Core/Adapter/IMqttCommunicationAdapter.cs +++ b/MQTTnet.Core/Adapter/IMqttChannelAdapter.cs @@ -7,7 +7,7 @@ using MQTTnet.Core.Serializer; namespace MQTTnet.Core.Adapter { - public interface IMqttCommunicationAdapter + public interface IMqttChannelAdapter { IMqttPacketSerializer PacketSerializer { get; } diff --git a/MQTTnet.Core/Adapter/IMqttCommunicationAdapterFactory.cs b/MQTTnet.Core/Adapter/IMqttCommunicationAdapterFactory.cs deleted file mode 100644 index 9609881..0000000 --- a/MQTTnet.Core/Adapter/IMqttCommunicationAdapterFactory.cs +++ /dev/null @@ -1,12 +0,0 @@ -using MQTTnet.Core.Client; -using MQTTnet.Core.Channel; - -namespace MQTTnet.Core.Adapter -{ - public interface IMqttCommunicationAdapterFactory - { - IMqttCommunicationAdapter CreateClientCommunicationAdapter(IMqttClientOptions options); - - IMqttCommunicationAdapter CreateServerCommunicationAdapter(IMqttCommunicationChannel channel); - } -} \ No newline at end of file diff --git a/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs b/MQTTnet.Core/Adapter/MqttChannelAdapter.cs similarity index 89% rename from MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs rename to MQTTnet.Core/Adapter/MqttChannelAdapter.cs index 5b0038d..5dfcfc0 100644 --- a/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs +++ b/MQTTnet.Core/Adapter/MqttChannelAdapter.cs @@ -5,23 +5,23 @@ using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; using MQTTnet.Core.Channel; +using MQTTnet.Core.Diagnostics; using MQTTnet.Core.Exceptions; using MQTTnet.Core.Internal; using MQTTnet.Core.Packets; using MQTTnet.Core.Serializer; -using Microsoft.Extensions.Logging; namespace MQTTnet.Core.Adapter { - public class MqttChannelCommunicationAdapter : IMqttCommunicationAdapter + public class MqttChannelAdapter : IMqttChannelAdapter { private const uint ErrorOperationAborted = 0x800703E3; private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1); - private readonly ILogger _logger; - private readonly IMqttCommunicationChannel _channel; + private readonly IMqttNetLogger _logger; + private readonly IMqttChannel _channel; - public MqttChannelCommunicationAdapter(IMqttCommunicationChannel channel, IMqttPacketSerializer serializer, ILogger logger) + public MqttChannelAdapter(IMqttChannel channel, IMqttPacketSerializer serializer, IMqttNetLogger logger) { _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _channel = channel ?? throw new ArgumentNullException(nameof(channel)); @@ -32,14 +32,14 @@ namespace MQTTnet.Core.Adapter public async Task ConnectAsync(TimeSpan timeout) { - _logger.LogInformation("Connecting [Timeout={0}]", timeout); + _logger.Info("Connecting [Timeout={0}]", timeout); await ExecuteAndWrapExceptionAsync(() => _channel.ConnectAsync().TimeoutAfter(timeout)); } public async Task DisconnectAsync(TimeSpan timeout) { - _logger.LogInformation("Disconnecting [Timeout={0}]", timeout); + _logger.Info("Disconnecting [Timeout={0}]", timeout); await ExecuteAndWrapExceptionAsync(() => _channel.DisconnectAsync().TimeoutAfter(timeout)); } @@ -58,7 +58,7 @@ namespace MQTTnet.Core.Adapter continue; } - _logger.LogInformation("TX >>> {0} [Timeout={1}]", packet, timeout); + _logger.Trace("TX >>> {0} [Timeout={1}]", packet, timeout); var writeBuffer = PacketSerializer.Serialize(packet); await _channel.SendStream.WriteAsync(writeBuffer, 0, writeBuffer.Length, cancellationToken).ConfigureAwait(false); @@ -108,7 +108,7 @@ namespace MQTTnet.Core.Adapter throw new MqttProtocolViolationException("Received malformed packet."); } - _logger.LogInformation("RX <<< {0}", packet); + _logger.Trace("RX <<< {0}", packet); } finally { diff --git a/MQTTnet.Core/Adapter/MqttCommunicationAdapterExtensions.cs b/MQTTnet.Core/Adapter/MqttChannelAdapterExtensions.cs similarity index 58% rename from MQTTnet.Core/Adapter/MqttCommunicationAdapterExtensions.cs rename to MQTTnet.Core/Adapter/MqttChannelAdapterExtensions.cs index 2142fd0..f271248 100644 --- a/MQTTnet.Core/Adapter/MqttCommunicationAdapterExtensions.cs +++ b/MQTTnet.Core/Adapter/MqttChannelAdapterExtensions.cs @@ -5,9 +5,9 @@ using MQTTnet.Core.Packets; namespace MQTTnet.Core.Adapter { - public static class MqttCommunicationAdapterExtensions + public static class MqttChannelAdapterExtensions { - public static Task SendPacketsAsync(this IMqttCommunicationAdapter adapter, TimeSpan timeout, CancellationToken cancellationToken, params MqttBasePacket[] packets) + public static Task SendPacketsAsync(this IMqttChannelAdapter adapter, TimeSpan timeout, CancellationToken cancellationToken, params MqttBasePacket[] packets) { if (adapter == null) throw new ArgumentNullException(nameof(adapter)); diff --git a/MQTTnet.Core/Adapter/MqttServerAdapterClientAcceptedEventArgs.cs b/MQTTnet.Core/Adapter/MqttServerAdapterClientAcceptedEventArgs.cs index 6caca0e..341f632 100644 --- a/MQTTnet.Core/Adapter/MqttServerAdapterClientAcceptedEventArgs.cs +++ b/MQTTnet.Core/Adapter/MqttServerAdapterClientAcceptedEventArgs.cs @@ -5,12 +5,12 @@ namespace MQTTnet.Core.Adapter { public class MqttServerAdapterClientAcceptedEventArgs : EventArgs { - public MqttServerAdapterClientAcceptedEventArgs(IMqttCommunicationAdapter client) + public MqttServerAdapterClientAcceptedEventArgs(IMqttChannelAdapter client) { Client = client ?? throw new ArgumentNullException(nameof(client)); } - public IMqttCommunicationAdapter Client { get; } + public IMqttChannelAdapter Client { get; } public Task SessionTask { get; set; } } diff --git a/MQTTnet.Core/Channel/IMqttCommunicationChannel.cs b/MQTTnet.Core/Channel/IMqttChannel.cs similarity index 83% rename from MQTTnet.Core/Channel/IMqttCommunicationChannel.cs rename to MQTTnet.Core/Channel/IMqttChannel.cs index e7a74c1..92c0681 100644 --- a/MQTTnet.Core/Channel/IMqttCommunicationChannel.cs +++ b/MQTTnet.Core/Channel/IMqttChannel.cs @@ -3,7 +3,7 @@ using System.IO; namespace MQTTnet.Core.Channel { - public interface IMqttCommunicationChannel + public interface IMqttChannel { Stream SendStream { get; } Stream ReceiveStream { get; } diff --git a/MQTTnet.Core/Client/IMqttClientAdapterFactory.cs b/MQTTnet.Core/Client/IMqttClientAdapterFactory.cs new file mode 100644 index 0000000..3bd6143 --- /dev/null +++ b/MQTTnet.Core/Client/IMqttClientAdapterFactory.cs @@ -0,0 +1,10 @@ +using MQTTnet.Core.Adapter; +using MQTTnet.Core.Diagnostics; + +namespace MQTTnet.Core.Client +{ + public interface IMqttClientAdapterFactory + { + IMqttChannelAdapter CreateClientAdapter(IMqttClientChannelOptions options, IMqttNetLogger logger); + } +} diff --git a/MQTTnet.Core/Client/IMqttClientFactory.cs b/MQTTnet.Core/Client/IMqttClientFactory.cs index c8b5ef3..8acdb91 100644 --- a/MQTTnet.Core/Client/IMqttClientFactory.cs +++ b/MQTTnet.Core/Client/IMqttClientFactory.cs @@ -1,4 +1,5 @@ -using MQTTnet.Core.ManagedClient; +using MQTTnet.Core.Diagnostics; +using MQTTnet.Core.ManagedClient; namespace MQTTnet.Core.Client { @@ -6,6 +7,10 @@ namespace MQTTnet.Core.Client { IMqttClient CreateMqttClient(); + IMqttClient CreateMqttClient(IMqttNetLogger logger); + IManagedMqttClient CreateManagedMqttClient(); + + IManagedMqttClient CreateManagedMqttClient(IMqttNetLogger logger); } } \ No newline at end of file diff --git a/MQTTnet.Core/Client/IMqttClientOptions.cs b/MQTTnet.Core/Client/IMqttClientOptions.cs index ae77192..9139d5c 100644 --- a/MQTTnet.Core/Client/IMqttClientOptions.cs +++ b/MQTTnet.Core/Client/IMqttClientOptions.cs @@ -7,10 +7,6 @@ namespace MQTTnet.Core.Client { string ClientId { get; } - /// - /// The LogId is used to create a scope to correlate logging. If no value is provided the ClientId is used instead - /// - string LogId { get; } IMqttClientCredentials Credentials { get; } bool CleanSession { get; } MqttApplicationMessage WillMessage { get; } diff --git a/MQTTnet.Core/Client/MqttClient.cs b/MQTTnet.Core/Client/MqttClient.cs index fcab3fb..f270a38 100644 --- a/MQTTnet.Core/Client/MqttClient.cs +++ b/MQTTnet.Core/Client/MqttClient.cs @@ -4,33 +4,34 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; using MQTTnet.Core.Adapter; +using MQTTnet.Core.Diagnostics; using MQTTnet.Core.Exceptions; using MQTTnet.Core.Internal; using MQTTnet.Core.Packets; using MQTTnet.Core.Protocol; -using Microsoft.Extensions.Logging; namespace MQTTnet.Core.Client { public class MqttClient : IMqttClient { private readonly HashSet _unacknowledgedPublishPackets = new HashSet(); + + private readonly IMqttClientAdapterFactory _adapterFactory; private readonly MqttPacketDispatcher _packetDispatcher; - private readonly IMqttCommunicationAdapterFactory _communicationAdapterFactory; - private readonly ILogger _logger; + private readonly IMqttNetLogger _logger; private IMqttClientOptions _options; private bool _isReceivingPackets; private int _latestPacketIdentifier; private CancellationTokenSource _cancellationTokenSource; - private IMqttCommunicationAdapter _adapter; - private IDisposable _scopeHandle; + private IMqttChannelAdapter _adapter; - public MqttClient(IMqttCommunicationAdapterFactory communicationAdapterFactory, ILogger logger, MqttPacketDispatcher packetDispatcher) + public MqttClient(IMqttClientAdapterFactory channelFactory, IMqttNetLogger logger) { - _communicationAdapterFactory = communicationAdapterFactory ?? throw new ArgumentNullException(nameof(communicationAdapterFactory)); + _adapterFactory = channelFactory ?? throw new ArgumentNullException(nameof(channelFactory)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - _packetDispatcher = packetDispatcher ?? throw new ArgumentNullException(nameof(packetDispatcher)); + + _packetDispatcher = new MqttPacketDispatcher(logger); } public event EventHandler Connected; @@ -53,17 +54,16 @@ namespace MQTTnet.Core.Client _latestPacketIdentifier = 0; _packetDispatcher.Reset(); - _adapter = _communicationAdapterFactory.CreateClientCommunicationAdapter(options); - - _scopeHandle = _logger.BeginScope(options.LogId ?? options.ClientId); - _logger.LogTrace("Trying to connect with server."); + _adapter = _adapterFactory.CreateClientAdapter(options.ChannelOptions, _logger); + + _logger.Trace("Trying to connect with server."); await _adapter.ConnectAsync(_options.CommunicationTimeout).ConfigureAwait(false); - _logger.LogTrace("Connection with server established."); + _logger.Trace("Connection with server established."); await StartReceivingPacketsAsync(_cancellationTokenSource.Token).ConfigureAwait(false); var connectResponse = await AuthenticateAsync(options.WillMessage).ConfigureAwait(false); - _logger.LogTrace("MQTT connection with server established."); + _logger.Trace("MQTT connection with server established."); if (_options.KeepAlivePeriod != TimeSpan.Zero) { @@ -76,7 +76,7 @@ namespace MQTTnet.Core.Client } catch (Exception exception) { - _logger.LogError(new EventId(), exception, "Error while connecting with server."); + _logger.Error(exception, "Error while connecting with server."); await DisconnectInternalAsync().ConfigureAwait(false); throw; } @@ -215,8 +215,6 @@ namespace MQTTnet.Core.Client private async Task DisconnectInternalAsync() { - _scopeHandle?.Dispose(); - var clientWasConnected = IsConnected; IsConnected = false; @@ -233,15 +231,15 @@ namespace MQTTnet.Core.Client try { await _adapter.DisconnectAsync(_options.CommunicationTimeout).ConfigureAwait(false); - _logger.LogInformation("Disconnected from adapter."); + _logger.Info("Disconnected from adapter."); } catch (Exception exception) { - _logger.LogWarning(new EventId(), exception, "Error while disconnecting from adapter."); + _logger.Warning(exception, "Error while disconnecting from adapter."); } finally { - _logger.LogInformation("Disconnected."); + _logger.Info("Disconnected."); Disconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(clientWasConnected)); } } @@ -250,7 +248,7 @@ namespace MQTTnet.Core.Client { try { - _logger.LogInformation("Received <<< {0}", packet); + _logger.Info("Received <<< {0}", packet); if (packet is MqttPingReqPacket) { @@ -280,7 +278,7 @@ namespace MQTTnet.Core.Client } catch (Exception exception) { - _logger.LogError(new EventId(), exception, "Unhandled exception while processing received packet."); + _logger.Error(exception, "Unhandled exception while processing received packet."); } } @@ -293,7 +291,7 @@ namespace MQTTnet.Core.Client } catch (Exception exception) { - _logger.LogError(new EventId(), exception, "Unhandled exception while handling application message."); + _logger.Error(exception, "Unhandled exception while handling application message."); } } @@ -357,7 +355,7 @@ namespace MQTTnet.Core.Client private async Task SendKeepAliveMessagesAsync(CancellationToken cancellationToken) { - _logger.LogInformation("Start sending keep alive packets."); + _logger.Info("Start sending keep alive packets."); try { @@ -388,23 +386,23 @@ namespace MQTTnet.Core.Client return; } - _logger.LogWarning(new EventId(), exception, "MQTT communication exception while sending/receiving keep alive packets."); + _logger.Warning(exception, "MQTT communication exception while sending/receiving keep alive packets."); await DisconnectInternalAsync().ConfigureAwait(false); } catch (Exception exception) { - _logger.LogWarning(new EventId(), exception, "Unhandled exception while sending/receiving keep alive packets."); + _logger.Warning(exception, "Unhandled exception while sending/receiving keep alive packets."); await DisconnectInternalAsync().ConfigureAwait(false); } finally { - _logger.LogInformation("Stopped sending keep alive packets."); + _logger.Info("Stopped sending keep alive packets."); } } private async Task ReceivePacketsAsync(CancellationToken cancellationToken) { - _logger.LogInformation("Start receiving packets."); + _logger.Info("Start receiving packets."); try { @@ -438,17 +436,17 @@ namespace MQTTnet.Core.Client return; } - _logger.LogWarning(new EventId(), exception, "MQTT communication exception while receiving packets."); + _logger.Warning(exception, "MQTT communication exception while receiving packets."); await DisconnectInternalAsync().ConfigureAwait(false); } catch (Exception exception) { - _logger.LogError(new EventId(), exception, "Unhandled exception while receiving packets."); + _logger.Error(exception, "Unhandled exception while receiving packets."); await DisconnectInternalAsync().ConfigureAwait(false); } finally { - _logger.LogInformation(nameof(MqttClient), "Stopped receiving packets."); + _logger.Info("Stopped receiving packets."); } } diff --git a/MQTTnet.Core/Client/MqttClientOptions.cs b/MQTTnet.Core/Client/MqttClientOptions.cs index 1b1c1bb..04cf8bd 100644 --- a/MQTTnet.Core/Client/MqttClientOptions.cs +++ b/MQTTnet.Core/Client/MqttClientOptions.cs @@ -9,9 +9,6 @@ namespace MQTTnet.Core.Client public string ClientId { get; set; } = Guid.NewGuid().ToString("N"); - /// - public string LogId { get; set; } - public bool CleanSession { get; set; } = true; public IMqttClientCredentials Credentials { get; set; } = new MqttClientCredentials(); diff --git a/MQTTnet.Core/Client/MqttClientOptionsBuilder.cs b/MQTTnet.Core/Client/MqttClientOptionsBuilder.cs index fb18cc0..429a3fa 100644 --- a/MQTTnet.Core/Client/MqttClientOptionsBuilder.cs +++ b/MQTTnet.Core/Client/MqttClientOptionsBuilder.cs @@ -12,12 +12,6 @@ namespace MQTTnet.Core.Client private MqttClientTlsOptions _tlsOptions; - public MqttClientOptionsBuilder WithLogId(string value) - { - _options.LogId = value; - return this; - } - public MqttClientOptionsBuilder WithProtocolVersion(MqttProtocolVersion value) { _options.ProtocolVersion = value; diff --git a/MQTTnet.Core/Client/MqttPacketDispatcher.cs b/MQTTnet.Core/Client/MqttPacketDispatcher.cs index a999d69..dd9c714 100644 --- a/MQTTnet.Core/Client/MqttPacketDispatcher.cs +++ b/MQTTnet.Core/Client/MqttPacketDispatcher.cs @@ -1,10 +1,10 @@ using System; using System.Collections.Concurrent; using System.Threading.Tasks; +using MQTTnet.Core.Diagnostics; using MQTTnet.Core.Exceptions; using MQTTnet.Core.Internal; using MQTTnet.Core.Packets; -using Microsoft.Extensions.Logging; namespace MQTTnet.Core.Client { @@ -12,9 +12,9 @@ namespace MQTTnet.Core.Client { private readonly ConcurrentDictionary> _packetByResponseType = new ConcurrentDictionary>(); private readonly ConcurrentDictionary>> _packetByResponseTypeAndIdentifier = new ConcurrentDictionary>>(); - private readonly ILogger _logger; + private readonly IMqttNetLogger _logger; - public MqttPacketDispatcher(ILogger logger) + public MqttPacketDispatcher(IMqttNetLogger logger) { _logger = logger ?? throw new ArgumentNullException(nameof(logger)); } @@ -30,7 +30,7 @@ namespace MQTTnet.Core.Client } catch (MqttCommunicationTimedOutException) { - _logger.LogWarning("Timeout while waiting for packet of type '{0}'.", responseType.Name); + _logger.Warning("Timeout while waiting for packet of type '{0}'.", responseType.Name); throw; } finally diff --git a/MQTTnet.Core/Diagnostics/IMqttNetLogger.cs b/MQTTnet.Core/Diagnostics/IMqttNetLogger.cs new file mode 100644 index 0000000..99f7b19 --- /dev/null +++ b/MQTTnet.Core/Diagnostics/IMqttNetLogger.cs @@ -0,0 +1,21 @@ +using System; + +namespace MQTTnet.Core.Diagnostics +{ + public interface IMqttNetLogger + { + event EventHandler LogMessagePublished; + + void Trace(string message, params object[] parameters); + + void Info(string message, params object[] parameters); + + void Warning(Exception exception, string message, params object[] parameters); + + void Warning(string message, params object[] parameters); + + void Error(Exception exception, string message, params object[] parameters); + + void Error(string message, params object[] parameters); + } +} diff --git a/MQTTnet.Core/Diagnostics/MqttNetGlobalLog.cs b/MQTTnet.Core/Diagnostics/MqttNetGlobalLog.cs new file mode 100644 index 0000000..6432682 --- /dev/null +++ b/MQTTnet.Core/Diagnostics/MqttNetGlobalLog.cs @@ -0,0 +1,18 @@ +using System; + +namespace MQTTnet.Core.Diagnostics +{ + public static class MqttNetGlobalLog + { + public static event EventHandler LogMessagePublished; + + public static bool HasListeners => LogMessagePublished != null; + + public static void Publish(MqttNetLogMessage logMessage) + { + if (logMessage == null) throw new ArgumentNullException(nameof(logMessage)); + + LogMessagePublished?.Invoke(null, new MqttNetLogMessagePublishedEventArgs(logMessage)); + } + } +} diff --git a/MQTTnet.Core/Diagnostics/MqttNetLogLevel.cs b/MQTTnet.Core/Diagnostics/MqttNetLogLevel.cs new file mode 100644 index 0000000..32e5de1 --- /dev/null +++ b/MQTTnet.Core/Diagnostics/MqttNetLogLevel.cs @@ -0,0 +1,13 @@ +namespace MQTTnet.Core.Diagnostics +{ + public enum MqttNetLogLevel + { + Verbose, + + Info, + + Warning, + + Error + } +} diff --git a/MQTTnet.Core/Diagnostics/MqttTraceMessage.cs b/MQTTnet.Core/Diagnostics/MqttNetLogMessage.cs similarity index 61% rename from MQTTnet.Core/Diagnostics/MqttTraceMessage.cs rename to MQTTnet.Core/Diagnostics/MqttNetLogMessage.cs index 852aae4..e1598eb 100644 --- a/MQTTnet.Core/Diagnostics/MqttTraceMessage.cs +++ b/MQTTnet.Core/Diagnostics/MqttNetLogMessage.cs @@ -1,12 +1,12 @@ using System; -using Microsoft.Extensions.Logging; namespace MQTTnet.Core.Diagnostics { - public sealed class MqttNetTraceMessage + public sealed class MqttNetLogMessage { - public MqttNetTraceMessage(DateTime timestamp, int threadId, string source, LogLevel level, string message, Exception exception) + public MqttNetLogMessage(string logId, DateTime timestamp, int threadId, string source, MqttNetLogLevel level, string message, Exception exception) { + LogId = logId; Timestamp = timestamp; ThreadId = threadId; Source = source; @@ -15,13 +15,15 @@ namespace MQTTnet.Core.Diagnostics Exception = exception; } + public string LogId { get; } + public DateTime Timestamp { get; } public int ThreadId { get; } public string Source { get; } - public LogLevel Level { get; } + public MqttNetLogLevel Level { get; } public string Message { get; } diff --git a/MQTTnet.Core/Diagnostics/MqttNetLogMessagePublishedEventArgs.cs b/MQTTnet.Core/Diagnostics/MqttNetLogMessagePublishedEventArgs.cs new file mode 100644 index 0000000..46c5468 --- /dev/null +++ b/MQTTnet.Core/Diagnostics/MqttNetLogMessagePublishedEventArgs.cs @@ -0,0 +1,14 @@ +using System; + +namespace MQTTnet.Core.Diagnostics +{ + public sealed class MqttNetLogMessagePublishedEventArgs : EventArgs + { + public MqttNetLogMessagePublishedEventArgs(MqttNetLogMessage logMessage) + { + TraceMessage = logMessage ?? throw new ArgumentNullException(nameof(logMessage)); + } + + public MqttNetLogMessage TraceMessage { get; } + } +} diff --git a/MQTTnet.Core/Diagnostics/MqttNetLogger.cs b/MQTTnet.Core/Diagnostics/MqttNetLogger.cs index 32d9609..a879af1 100644 --- a/MQTTnet.Core/Diagnostics/MqttNetLogger.cs +++ b/MQTTnet.Core/Diagnostics/MqttNetLogger.cs @@ -1,52 +1,73 @@ using System; -using Microsoft.Extensions.Logging; namespace MQTTnet.Core.Diagnostics { - public class MqttNetLogger : ILogger + public class MqttNetLogger : IMqttNetLogger { - private readonly string _categoryName; - private readonly MqttNetTrace _mqttNetTrace; + private readonly string _logId; - public MqttNetLogger(string categoryName, MqttNetTrace mqttNetTrace) + public MqttNetLogger(string logId = null) { - _categoryName = categoryName; - _mqttNetTrace = mqttNetTrace; + _logId = logId; } + + public event EventHandler LogMessagePublished; - public void Log(LogLevel logLevel, EventId eventId, TState state, Exception exception, Func formatter) + public void Trace(string message, params object[] parameters) { - if (formatter == null) - { - throw new ArgumentNullException(nameof(formatter)); - } + Publish(MqttNetLogLevel.Verbose, null, message, parameters); + } - if (!MqttNetTrace.HasListeners) - { - return; - } + public void Info(string message, params object[] parameters) + { + Publish(MqttNetLogLevel.Info, null, message, parameters); + } + + public void Warning(Exception exception, string message, params object[] parameters) + { + Publish(MqttNetLogLevel.Warning, exception, message, parameters); + } - var message = formatter(state, exception); - var traceMessage = new MqttNetTraceMessage(DateTime.Now, Environment.CurrentManagedThreadId, _categoryName, logLevel, message, exception); - _mqttNetTrace.Publish(traceMessage); + public void Warning(string message, params object[] parameters) + { + Warning(null, message, parameters); } - public bool IsEnabled(LogLevel logLevel) + public void Error(Exception exception, string message, params object[] parameters) { - return MqttNetTrace.HasListeners; + Publish(MqttNetLogLevel.Error, exception, message, parameters); } - //not supported: async local requires netstandard1.3 - //for implementation see https://github.com/aspnet/Logging/blob/dev/src/Microsoft.Extensions.Logging.Console/ConsoleLogScope.cs - public IDisposable BeginScope(TState state) + public void Error(string message, params object[] parameters) { - return new DisposableScope(); + Warning(null, message, parameters); } - private class DisposableScope : IDisposable + private void Publish(MqttNetLogLevel logLevel, Exception exception, string message, object[] parameters) { - public void Dispose() + var hasLocalListeners = LogMessagePublished != null; + var hasGlobalListeners = MqttNetGlobalLog.HasListeners; + + if (!hasLocalListeners && !hasGlobalListeners) + { + return; + } + + if (parameters.Length > 0) + { + message = string.Format(message, parameters); + } + + var traceMessage = new MqttNetLogMessage(_logId, DateTime.Now, Environment.CurrentManagedThreadId, typeof(TSource).Name, logLevel, message, exception); + + if (hasGlobalListeners) + { + MqttNetGlobalLog.Publish(traceMessage); + } + + if (hasLocalListeners) { + LogMessagePublished?.Invoke(this, new MqttNetLogMessagePublishedEventArgs(traceMessage)); } } } diff --git a/MQTTnet.Core/Diagnostics/MqttNetTrace.cs b/MQTTnet.Core/Diagnostics/MqttNetTrace.cs deleted file mode 100644 index f75c7a9..0000000 --- a/MQTTnet.Core/Diagnostics/MqttNetTrace.cs +++ /dev/null @@ -1,35 +0,0 @@ -using System; -using System.Collections.Concurrent; -using Microsoft.Extensions.Logging; - -namespace MQTTnet.Core.Diagnostics -{ - public class MqttNetTrace : ILoggerProvider - { - private readonly ConcurrentDictionary _loggers = new ConcurrentDictionary(); - - public static event EventHandler TraceMessagePublished; - - public static bool HasListeners => TraceMessagePublished != null; - - public void Publish(MqttNetTraceMessage traceMessage) - { - TraceMessagePublished?.Invoke(this, new MqttNetTraceMessagePublishedEventArgs(traceMessage)); - } - - public void Dispose() - { - TraceMessagePublished = null; - } - - public ILogger CreateLogger(string categoryName) - { - return _loggers.GetOrAdd(categoryName, CreateLoggerImplementation); - } - - private MqttNetLogger CreateLoggerImplementation(string categoryName) - { - return new MqttNetLogger(categoryName, this); - } - } -} diff --git a/MQTTnet.Core/Diagnostics/MqttNetTraceMessagePublishedEventArgs.cs b/MQTTnet.Core/Diagnostics/MqttNetTraceMessagePublishedEventArgs.cs deleted file mode 100644 index 5b8f796..0000000 --- a/MQTTnet.Core/Diagnostics/MqttNetTraceMessagePublishedEventArgs.cs +++ /dev/null @@ -1,14 +0,0 @@ -using System; - -namespace MQTTnet.Core.Diagnostics -{ - public sealed class MqttNetTraceMessagePublishedEventArgs : EventArgs - { - public MqttNetTraceMessagePublishedEventArgs(MqttNetTraceMessage traceMessage) - { - TraceMessage = traceMessage ?? throw new ArgumentNullException(nameof(traceMessage)); - } - - public MqttNetTraceMessage TraceMessage { get; } - } -} diff --git a/MQTTnet.Core/MQTTnet.Core.csproj b/MQTTnet.Core/MQTTnet.Core.csproj index d50466f..60afc30 100644 --- a/MQTTnet.Core/MQTTnet.Core.csproj +++ b/MQTTnet.Core/MQTTnet.Core.csproj @@ -17,15 +17,9 @@ - 2.5.2.0 - 2.5.2.0 + 2.5.3.0 + 2.5.3.0 - - - - - - \ No newline at end of file diff --git a/MQTTnet.Core/ManagedClient/ManagedMqttClient.cs b/MQTTnet.Core/ManagedClient/ManagedMqttClient.cs index 4eb40b2..c2e4077 100644 --- a/MQTTnet.Core/ManagedClient/ManagedMqttClient.cs +++ b/MQTTnet.Core/ManagedClient/ManagedMqttClient.cs @@ -5,9 +5,9 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; using MQTTnet.Core.Client; +using MQTTnet.Core.Diagnostics; using MQTTnet.Core.Exceptions; using MQTTnet.Core.Protocol; -using Microsoft.Extensions.Logging; namespace MQTTnet.Core.ManagedClient { @@ -18,7 +18,7 @@ namespace MQTTnet.Core.ManagedClient private readonly HashSet _subscriptions = new HashSet(); private readonly IMqttClient _mqttClient; - private readonly ILogger _logger; + private readonly IMqttNetLogger _logger; private CancellationTokenSource _connectionCancellationToken; private CancellationTokenSource _publishingCancellationToken; @@ -26,7 +26,7 @@ namespace MQTTnet.Core.ManagedClient private IManagedMqttClientOptions _options; private bool _subscriptionsNotPushed; - public ManagedMqttClient(ILogger logger, IMqttClient mqttClient) + public ManagedMqttClient(IMqttClient mqttClient, IMqttNetLogger logger) { _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _mqttClient = mqttClient ?? throw new ArgumentNullException(nameof(mqttClient)); @@ -52,10 +52,7 @@ namespace MQTTnet.Core.ManagedClient throw new NotSupportedException("The managed client does not support existing sessions."); } - if (_connectionCancellationToken != null) - { - throw new InvalidOperationException("The managed client is already started."); - } + if (_connectionCancellationToken != null) throw new InvalidOperationException("The managed client is already started."); _options = options; await _storageManager.SetStorageAsync(_options.Storage).ConfigureAwait(false); @@ -72,10 +69,10 @@ namespace MQTTnet.Core.ManagedClient _connectionCancellationToken = new CancellationTokenSource(); #pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - Task.Factory.StartNew(() => MaintainConnectionAsync(_connectionCancellationToken.Token), _connectionCancellationToken.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default).ConfigureAwait(false); + Task.Run(async () => await MaintainConnectionAsync(_connectionCancellationToken.Token), _connectionCancellationToken.Token).ConfigureAwait(false); #pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - _logger.LogInformation("Started"); + _logger.Info("Started"); } public Task StopAsync() @@ -83,6 +80,9 @@ namespace MQTTnet.Core.ManagedClient _connectionCancellationToken?.Cancel(false); _connectionCancellationToken = null; + _publishingCancellationToken?.Cancel(false); + _publishingCancellationToken = null; + while (_messageQueue.Any()) { _messageQueue.Take(); @@ -159,7 +159,7 @@ namespace MQTTnet.Core.ManagedClient _publishingCancellationToken = new CancellationTokenSource(); #pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - Task.Factory.StartNew(() => PublishQueuedMessagesAsync(_publishingCancellationToken.Token), _publishingCancellationToken.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default).ConfigureAwait(false); + Task.Run(async () => await PublishQueuedMessagesAsync(_publishingCancellationToken.Token), _publishingCancellationToken.Token).ConfigureAwait(false); #pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed continue; @@ -167,7 +167,7 @@ namespace MQTTnet.Core.ManagedClient if (connectionState == ReconnectionResult.StillConnected) { - await Task.Delay(100, _connectionCancellationToken.Token).ConfigureAwait(false); // Consider using the _Disconnected_ event here. (TaskCompletionSource) + await Task.Delay(TimeSpan.FromSeconds(1), _connectionCancellationToken.Token).ConfigureAwait(false); } } } @@ -176,16 +176,16 @@ namespace MQTTnet.Core.ManagedClient } catch (MqttCommunicationException exception) { - _logger.LogWarning(new EventId(), exception, "Communication exception while maintaining connection."); + _logger.Warning(exception, "Communication exception while maintaining connection."); } catch (Exception exception) { - _logger.LogError(new EventId(), exception, "Unhandled exception while maintaining connection."); + _logger.Error(exception, "Unhandled exception while maintaining connection."); } finally { await _mqttClient.DisconnectAsync().ConfigureAwait(false); - _logger.LogInformation("Stopped"); + _logger.Info("Stopped"); } } @@ -215,7 +215,7 @@ namespace MQTTnet.Core.ManagedClient } finally { - _logger.LogInformation("Stopped publishing messages"); + _logger.Info("Stopped publishing messages"); } } @@ -227,7 +227,7 @@ namespace MQTTnet.Core.ManagedClient } catch (MqttCommunicationException exception) { - _logger.LogWarning(new EventId(), exception, "Publishing application message failed."); + _logger.Warning(exception, "Publishing application message failed."); if (message.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce) { @@ -236,13 +236,13 @@ namespace MQTTnet.Core.ManagedClient } catch (Exception exception) { - _logger.LogError(new EventId(), exception, "Unhandled exception while publishing queued application message."); + _logger.Error(exception, "Unhandled exception while publishing queued application message."); } } private async Task PushSubscriptionsAsync() { - _logger.LogInformation(nameof(ManagedMqttClient), "Synchronizing subscriptions"); + _logger.Info(nameof(ManagedMqttClient), "Synchronizing subscriptions"); List subscriptions; lock (_subscriptions) @@ -262,7 +262,7 @@ namespace MQTTnet.Core.ManagedClient } catch (Exception exception) { - _logger.LogWarning(new EventId(), exception, "Synchronizing subscriptions failed"); + _logger.Warning(exception, "Synchronizing subscriptions failed"); _subscriptionsNotPushed = true; } } diff --git a/MQTTnet.Core/Server/IMqttClientRetainedMessageManager.cs b/MQTTnet.Core/Server/IMqttClientRetainedMessageManager.cs deleted file mode 100644 index f400899..0000000 --- a/MQTTnet.Core/Server/IMqttClientRetainedMessageManager.cs +++ /dev/null @@ -1,15 +0,0 @@ -using System.Collections.Generic; -using System.Threading.Tasks; -using MQTTnet.Core.Packets; - -namespace MQTTnet.Core.Server -{ - public interface IMqttClientRetainedMessageManager - { - Task LoadMessagesAsync(); - - Task HandleMessageAsync(string clientId, MqttApplicationMessage applicationMessage); - - Task> GetSubscribedMessagesAsync(MqttSubscribePacket subscribePacket); - } -} diff --git a/MQTTnet.Core/Server/IMqttClientSesssionFactory.cs b/MQTTnet.Core/Server/IMqttClientSesssionFactory.cs deleted file mode 100644 index f58dd5e..0000000 --- a/MQTTnet.Core/Server/IMqttClientSesssionFactory.cs +++ /dev/null @@ -1,7 +0,0 @@ -namespace MQTTnet.Core.Server -{ - public interface IMqttClientSesssionFactory - { - MqttClientSession CreateClientSession(string sessionId, MqttClientSessionsManager mqttClientSessionsManager); - } -} diff --git a/MQTTnet.Core/Server/IMqttServer.cs b/MQTTnet.Core/Server/IMqttServer.cs index 8d86077..936e897 100644 --- a/MQTTnet.Core/Server/IMqttServer.cs +++ b/MQTTnet.Core/Server/IMqttServer.cs @@ -12,7 +12,7 @@ namespace MQTTnet.Core.Server Task> GetConnectedClientsAsync(); - Task StartAsync(); + Task StartAsync(MqttServerOptions options); Task StopAsync(); } } \ No newline at end of file diff --git a/MQTTnet.Core/Server/IMqttServerFactory.cs b/MQTTnet.Core/Server/IMqttServerFactory.cs index 500614b..378dfb0 100644 --- a/MQTTnet.Core/Server/IMqttServerFactory.cs +++ b/MQTTnet.Core/Server/IMqttServerFactory.cs @@ -1,4 +1,6 @@ -using System; +using System.Collections.Generic; +using MQTTnet.Core.Adapter; +using MQTTnet.Core.Diagnostics; namespace MQTTnet.Core.Server { @@ -6,6 +8,6 @@ namespace MQTTnet.Core.Server { IMqttServer CreateMqttServer(); - IMqttServer CreateMqttServer(Action configure); + IMqttServer CreateMqttServer(IEnumerable adapters, IMqttNetLogger logger); } } \ No newline at end of file diff --git a/MQTTnet.Core/Server/MqttClientPendingMessagesQueue.cs b/MQTTnet.Core/Server/MqttClientPendingMessagesQueue.cs index 16c9094..bea6039 100644 --- a/MQTTnet.Core/Server/MqttClientPendingMessagesQueue.cs +++ b/MQTTnet.Core/Server/MqttClientPendingMessagesQueue.cs @@ -3,10 +3,10 @@ using System.Collections.Concurrent; using System.Threading; using System.Threading.Tasks; using MQTTnet.Core.Adapter; +using MQTTnet.Core.Diagnostics; using MQTTnet.Core.Exceptions; using MQTTnet.Core.Packets; using MQTTnet.Core.Protocol; -using Microsoft.Extensions.Logging; namespace MQTTnet.Core.Server { @@ -15,16 +15,16 @@ namespace MQTTnet.Core.Server private readonly BlockingCollection _pendingPublishPackets = new BlockingCollection(); private readonly MqttServerOptions _options; private readonly MqttClientSession _session; - private readonly ILogger _logger; + private readonly IMqttNetLogger _logger; - public MqttClientPendingMessagesQueue(MqttServerOptions options, MqttClientSession session, ILogger logger) + public MqttClientPendingMessagesQueue(MqttServerOptions options, MqttClientSession session, IMqttNetLogger logger) { _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _session = session ?? throw new ArgumentNullException(nameof(session)); _options = options ?? throw new ArgumentNullException(nameof(options)); } - public void Start(IMqttCommunicationAdapter adapter, CancellationToken cancellationToken) + public void Start(IMqttChannelAdapter adapter, CancellationToken cancellationToken) { if (adapter == null) throw new ArgumentNullException(nameof(adapter)); @@ -41,10 +41,10 @@ namespace MQTTnet.Core.Server if (publishPacket == null) throw new ArgumentNullException(nameof(publishPacket)); _pendingPublishPackets.Add(publishPacket); - _logger.LogTrace("Enqueued packet (ClientId: {0}).", _session.ClientId); + _logger.Trace("Enqueued packet (ClientId: {0}).", _session.ClientId); } - private async Task SendPendingPublishPacketsAsync(IMqttCommunicationAdapter adapter, CancellationToken cancellationToken) + private async Task SendPendingPublishPacketsAsync(IMqttChannelAdapter adapter, CancellationToken cancellationToken) { try { @@ -58,11 +58,11 @@ namespace MQTTnet.Core.Server } catch (Exception exception) { - _logger.LogError(new EventId(), exception, "Unhandled exception while sending enqueued packet (ClientId: {0}).", _session.ClientId); + _logger.Error(exception, "Unhandled exception while sending enqueued packet (ClientId: {0}).", _session.ClientId); } } - private async Task SendPendingPublishPacketAsync(IMqttCommunicationAdapter adapter, CancellationToken cancellationToken) + private async Task SendPendingPublishPacketAsync(IMqttChannelAdapter adapter, CancellationToken cancellationToken) { MqttPublishPacket packet = null; try @@ -70,24 +70,24 @@ namespace MQTTnet.Core.Server packet = _pendingPublishPackets.Take(cancellationToken); await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, packet).ConfigureAwait(false); - _logger.LogTrace("Enqueued packet sent (ClientId: {0}).", _session.ClientId); + _logger.Trace("Enqueued packet sent (ClientId: {0}).", _session.ClientId); } catch (Exception exception) { if (exception is MqttCommunicationTimedOutException) { - _logger.LogWarning(new EventId(), exception, "Sending publish packet failed due to timeout (ClientId: {0}).", _session.ClientId); + _logger.Warning(exception, "Sending publish packet failed due to timeout (ClientId: {0}).", _session.ClientId); } else if (exception is MqttCommunicationException) { - _logger.LogWarning(new EventId(), exception, "Sending publish packet failed due to communication exception (ClientId: {0}).", _session.ClientId); + _logger.Warning(exception, "Sending publish packet failed due to communication exception (ClientId: {0}).", _session.ClientId); } else if (exception is OperationCanceledException) { } else { - _logger.LogError(new EventId(), exception, "Sending publish packet failed (ClientId: {0}).", _session.ClientId); + _logger.Error(exception, "Sending publish packet failed (ClientId: {0}).", _session.ClientId); } if (packet != null && packet.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce) diff --git a/MQTTnet.Core/Server/MqttClientSession.cs b/MQTTnet.Core/Server/MqttClientSession.cs index ad1416e..3646c40 100644 --- a/MQTTnet.Core/Server/MqttClientSession.cs +++ b/MQTTnet.Core/Server/MqttClientSession.cs @@ -3,13 +3,12 @@ using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using MQTTnet.Core.Adapter; +using MQTTnet.Core.Diagnostics; using MQTTnet.Core.Exceptions; using MQTTnet.Core.Internal; using MQTTnet.Core.Packets; using MQTTnet.Core.Protocol; using MQTTnet.Core.Serializer; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; namespace MQTTnet.Core.Server { @@ -21,28 +20,27 @@ namespace MQTTnet.Core.Server private readonly MqttClientSessionsManager _sessionsManager; private readonly MqttClientPendingMessagesQueue _pendingMessagesQueue; private readonly MqttServerOptions _options; - private readonly ILogger _logger; + private readonly IMqttNetLogger _logger; - private IMqttCommunicationAdapter _adapter; + private IMqttChannelAdapter _adapter; private CancellationTokenSource _cancellationTokenSource; private MqttApplicationMessage _willMessage; public MqttClientSession( string clientId, - IOptions options, + MqttServerOptions options, MqttClientSessionsManager sessionsManager, - MqttClientSubscriptionsManager subscriptionsManager, - ILogger logger, - ILogger messageQueueLogger) + IMqttNetLogger logger) { _sessionsManager = sessionsManager ?? throw new ArgumentNullException(nameof(sessionsManager)); - _subscriptionsManager = subscriptionsManager ?? throw new ArgumentNullException(nameof(subscriptionsManager)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); ClientId = clientId; - _options = options.Value; - _pendingMessagesQueue = new MqttClientPendingMessagesQueue(_options, this, messageQueueLogger); + _options = options; + + _subscriptionsManager = new MqttClientSubscriptionsManager(_options); + _pendingMessagesQueue = new MqttClientPendingMessagesQueue(_options, this, _logger); } public string ClientId { get; } @@ -51,7 +49,7 @@ namespace MQTTnet.Core.Server public bool IsConnected => _adapter != null; - public async Task RunAsync(MqttApplicationMessage willMessage, IMqttCommunicationAdapter adapter) + public async Task RunAsync(MqttApplicationMessage willMessage, IMqttChannelAdapter adapter) { if (adapter == null) throw new ArgumentNullException(nameof(adapter)); @@ -71,11 +69,11 @@ namespace MQTTnet.Core.Server } catch (MqttCommunicationException exception) { - _logger.LogWarning(new EventId(), exception, "Client '{0}': Communication exception while processing client packets.", ClientId); + _logger.Warning(exception, "Client '{0}': Communication exception while processing client packets.", ClientId); } catch (Exception exception) { - _logger.LogError(new EventId(), exception, "Client '{0}': Unhandled exception while processing client packets.", ClientId); + _logger.Error(exception, "Client '{0}': Unhandled exception while processing client packets.", ClientId); } } @@ -93,7 +91,7 @@ namespace MQTTnet.Core.Server _adapter = null; } - _logger.LogInformation("Client '{0}': Session stopped.", ClientId); + _logger.Info("Client '{0}': Session stopped.", ClientId); } finally { @@ -120,7 +118,7 @@ namespace MQTTnet.Core.Server _pendingMessagesQueue.Enqueue(publishPacket); } - private async Task ReceivePacketsAsync(IMqttCommunicationAdapter adapter, CancellationToken cancellationToken) + private async Task ReceivePacketsAsync(IMqttChannelAdapter adapter, CancellationToken cancellationToken) { try { @@ -135,17 +133,17 @@ namespace MQTTnet.Core.Server } catch (MqttCommunicationException exception) { - _logger.LogWarning(new EventId(), exception, "Client '{0}': Communication exception while processing client packets.", ClientId); + _logger.Warning(exception, "Client '{0}': Communication exception while processing client packets.", ClientId); await StopAsync(); } catch (Exception exception) { - _logger.LogError(new EventId(), exception, "Client '{0}': Unhandled exception while processing client packets.", ClientId); + _logger.Error(exception, "Client '{0}': Unhandled exception while processing client packets.", ClientId); await StopAsync(); } } - private Task ProcessReceivedPacketAsync(IMqttCommunicationAdapter adapter, MqttBasePacket packet, CancellationToken cancellationToken) + private Task ProcessReceivedPacketAsync(IMqttChannelAdapter adapter, MqttBasePacket packet, CancellationToken cancellationToken) { if (packet is MqttPingReqPacket) { @@ -188,11 +186,11 @@ namespace MQTTnet.Core.Server return StopAsync(); } - _logger.LogWarning("Client '{0}': Received not supported packet ({1}). Closing connection.", ClientId, packet); + _logger.Warning("Client '{0}': Received not supported packet ({1}). Closing connection.", ClientId, packet); return StopAsync(); } - private async Task HandleIncomingSubscribePacketAsync(IMqttCommunicationAdapter adapter, MqttSubscribePacket subscribePacket, CancellationToken cancellationToken) + private async Task HandleIncomingSubscribePacketAsync(IMqttChannelAdapter adapter, MqttSubscribePacket subscribePacket, CancellationToken cancellationToken) { var subscribeResult = _subscriptionsManager.Subscribe(subscribePacket, ClientId); @@ -215,7 +213,7 @@ namespace MQTTnet.Core.Server } } - private async Task HandleIncomingPublishPacketAsync(IMqttCommunicationAdapter adapter, MqttPublishPacket publishPacket, CancellationToken cancellationToken) + private async Task HandleIncomingPublishPacketAsync(IMqttChannelAdapter adapter, MqttPublishPacket publishPacket, CancellationToken cancellationToken) { var applicationMessage = publishPacket.ToApplicationMessage(); @@ -255,7 +253,7 @@ namespace MQTTnet.Core.Server } } - private Task HandleIncomingPubRelPacketAsync(IMqttCommunicationAdapter adapter, MqttPubRelPacket pubRelPacket, CancellationToken cancellationToken) + private Task HandleIncomingPubRelPacketAsync(IMqttChannelAdapter adapter, MqttPubRelPacket pubRelPacket, CancellationToken cancellationToken) { lock (_unacknowledgedPublishPackets) { diff --git a/MQTTnet.Core/Server/MqttClientSessionsManager.cs b/MQTTnet.Core/Server/MqttClientSessionsManager.cs index 80722ab..6323522 100644 --- a/MQTTnet.Core/Server/MqttClientSessionsManager.cs +++ b/MQTTnet.Core/Server/MqttClientSessionsManager.cs @@ -4,13 +4,12 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; using MQTTnet.Core.Adapter; +using MQTTnet.Core.Diagnostics; using MQTTnet.Core.Exceptions; using MQTTnet.Core.Internal; using MQTTnet.Core.Packets; using MQTTnet.Core.Protocol; using MQTTnet.Core.Serializer; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; namespace MQTTnet.Core.Server { @@ -20,27 +19,21 @@ namespace MQTTnet.Core.Server private readonly SemaphoreSlim _sessionsSemaphore = new SemaphoreSlim(1, 1); private readonly MqttServerOptions _options; - private readonly ILogger _logger; - private readonly IMqttClientSesssionFactory _clientSesssionFactory; - private readonly IMqttClientRetainedMessageManager _clientRetainedMessageManager; - - public MqttClientSessionsManager( - IOptions options, - ILogger logger, - IMqttClientSesssionFactory clientSesssionFactory, - IMqttClientRetainedMessageManager clientRetainedMessageManager) + private readonly MqttRetainedMessagesManager _retainedMessagesManager; + private readonly IMqttNetLogger _logger; + + public MqttClientSessionsManager(MqttServerOptions options, MqttRetainedMessagesManager retainedMessagesManager, IMqttNetLogger logger) { _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - _options = options.Value ?? throw new ArgumentNullException(nameof(options)); - _clientSesssionFactory = clientSesssionFactory ?? throw new ArgumentNullException(nameof(clientSesssionFactory)); - _clientRetainedMessageManager = clientRetainedMessageManager ?? throw new ArgumentNullException(nameof(clientRetainedMessageManager)); + _options = options ?? throw new ArgumentNullException(nameof(options)); + _retainedMessagesManager = retainedMessagesManager ?? throw new ArgumentNullException(nameof(retainedMessagesManager)); } public event EventHandler ClientConnected; public event EventHandler ClientDisconnected; public event EventHandler ApplicationMessageReceived; - public async Task RunClientSessionAsync(IMqttCommunicationAdapter clientAdapter, CancellationToken cancellationToken) + public async Task RunClientSessionAsync(IMqttChannelAdapter clientAdapter, CancellationToken cancellationToken) { var clientId = string.Empty; try @@ -80,14 +73,11 @@ namespace MQTTnet.Core.Server ProtocolVersion = clientAdapter.PacketSerializer.ProtocolVersion })); - using (_logger.BeginScope(clientId)) - { - await clientSession.Session.RunAsync(connectPacket.WillMessage, clientAdapter).ConfigureAwait(false); - } + await clientSession.Session.RunAsync(connectPacket.WillMessage, clientAdapter).ConfigureAwait(false); } catch (Exception exception) { - _logger.LogError(new EventId(), exception, exception.Message); + _logger.Error(exception, exception.Message); } finally { @@ -157,7 +147,7 @@ namespace MQTTnet.Core.Server if (applicationMessage.Retain) { - await _clientRetainedMessageManager.HandleMessageAsync(senderClientSession?.ClientId, applicationMessage).ConfigureAwait(false); + await _retainedMessagesManager.HandleMessageAsync(senderClientSession?.ClientId, applicationMessage).ConfigureAwait(false); } var eventArgs = new MqttApplicationMessageReceivedEventArgs(senderClientSession?.ClientId, applicationMessage); @@ -165,7 +155,7 @@ namespace MQTTnet.Core.Server } catch (Exception exception) { - _logger.LogError(new EventId(), exception, "Error while processing application message"); + _logger.Error(exception, "Error while processing application message"); } lock (_sessions) @@ -179,7 +169,7 @@ namespace MQTTnet.Core.Server public Task> GetRetainedMessagesAsync(MqttSubscribePacket subscribePacket) { - return _clientRetainedMessageManager.GetSubscribedMessagesAsync(subscribePacket); + return _retainedMessagesManager.GetSubscribedMessagesAsync(subscribePacket); } private MqttConnectReturnCode ValidateConnection(MqttConnectPacket connectPacket) @@ -206,11 +196,11 @@ namespace MQTTnet.Core.Server await clientSession.StopAsync(); clientSession = null; - _logger.LogTrace("Stopped existing session of client '{0}'.", connectPacket.ClientId); + _logger.Trace("Stopped existing session of client '{0}'.", connectPacket.ClientId); } else { - _logger.LogTrace("Reusing existing session of client '{0}'.", connectPacket.ClientId); + _logger.Trace("Reusing existing session of client '{0}'.", connectPacket.ClientId); } } @@ -219,10 +209,10 @@ namespace MQTTnet.Core.Server { isExistingSession = false; - clientSession = _clientSesssionFactory.CreateClientSession(connectPacket.ClientId, this); + clientSession = new MqttClientSession(connectPacket.ClientId, _options, this, _logger); _sessions[connectPacket.ClientId] = clientSession; - _logger.LogTrace("Created a new session for client '{0}'.", connectPacket.ClientId); + _logger.Trace("Created a new session for client '{0}'.", connectPacket.ClientId); } return new GetOrCreateClientSessionResult { IsExistingSession = isExistingSession, Session = clientSession }; diff --git a/MQTTnet.Core/Server/MqttClientSubscriptionsManager.cs b/MQTTnet.Core/Server/MqttClientSubscriptionsManager.cs index 5af3a2d..8801c01 100644 --- a/MQTTnet.Core/Server/MqttClientSubscriptionsManager.cs +++ b/MQTTnet.Core/Server/MqttClientSubscriptionsManager.cs @@ -1,6 +1,5 @@ using System; using System.Collections.Generic; -using Microsoft.Extensions.Options; using MQTTnet.Core.Packets; using MQTTnet.Core.Protocol; @@ -11,9 +10,9 @@ namespace MQTTnet.Core.Server private readonly Dictionary _subscriptions = new Dictionary(); private readonly MqttServerOptions _options; - public MqttClientSubscriptionsManager(IOptions options) + public MqttClientSubscriptionsManager(MqttServerOptions options) { - _options = options?.Value ?? throw new ArgumentNullException(nameof(options)); + _options = options ?? throw new ArgumentNullException(nameof(options)); } public MqttClientSubscribeResult Subscribe(MqttSubscribePacket subscribePacket, string clientId) diff --git a/MQTTnet.Core/Server/MqttClientRetainedMessagesManager.cs b/MQTTnet.Core/Server/MqttRetainedMessagesManager.cs similarity index 80% rename from MQTTnet.Core/Server/MqttClientRetainedMessagesManager.cs rename to MQTTnet.Core/Server/MqttRetainedMessagesManager.cs index 5fe8522..cf66723 100644 --- a/MQTTnet.Core/Server/MqttClientRetainedMessagesManager.cs +++ b/MQTTnet.Core/Server/MqttRetainedMessagesManager.cs @@ -4,22 +4,21 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; using MQTTnet.Core.Packets; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; +using MQTTnet.Core.Diagnostics; namespace MQTTnet.Core.Server { - public sealed class MqttClientRetainedMessagesManager : IMqttClientRetainedMessageManager + public sealed class MqttRetainedMessagesManager { private readonly Dictionary _retainedMessages = new Dictionary(); private readonly SemaphoreSlim _gate = new SemaphoreSlim(1, 1); - private readonly ILogger _logger; + private readonly IMqttNetLogger _logger; private readonly MqttServerOptions _options; - public MqttClientRetainedMessagesManager(IOptions options, ILogger logger) + public MqttRetainedMessagesManager(MqttServerOptions options, IMqttNetLogger logger) { _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - _options = options.Value ?? throw new ArgumentNullException(nameof(options)); + _options = options ?? throw new ArgumentNullException(nameof(options)); } public async Task LoadMessagesAsync() @@ -42,7 +41,7 @@ namespace MQTTnet.Core.Server } catch (Exception exception) { - _logger.LogError(new EventId(), exception, "Unhandled exception while loading retained messages."); + _logger.Error(exception, "Unhandled exception while loading retained messages."); } finally { @@ -61,7 +60,7 @@ namespace MQTTnet.Core.Server } catch (Exception exception) { - _logger.LogError(new EventId(), exception, "Unhandled exception while handling retained messages."); + _logger.Error(exception, "Unhandled exception while handling retained messages."); } finally { @@ -110,7 +109,7 @@ namespace MQTTnet.Core.Server if (applicationMessage.Payload?.Any() == false) { saveIsRequired = _retainedMessages.Remove(applicationMessage.Topic); - _logger.LogInformation("Client '{0}' cleared retained message for topic '{1}'.", clientId, applicationMessage.Topic); + _logger.Info("Client '{0}' cleared retained message for topic '{1}'.", clientId, applicationMessage.Topic); } else { @@ -129,12 +128,12 @@ namespace MQTTnet.Core.Server } } - _logger.LogInformation("Client '{0}' set retained message for topic '{1}'.", clientId, applicationMessage.Topic); + _logger.Info("Client '{0}' set retained message for topic '{1}'.", clientId, applicationMessage.Topic); } if (!saveIsRequired) { - _logger.LogTrace("Skipped saving retained messages because no changes were detected."); + _logger.Trace("Skipped saving retained messages because no changes were detected."); } if (saveIsRequired && _options.Storage != null) diff --git a/MQTTnet.Core/Server/MqttServer.cs b/MQTTnet.Core/Server/MqttServer.cs index c671907..f934928 100644 --- a/MQTTnet.Core/Server/MqttServer.cs +++ b/MQTTnet.Core/Server/MqttServer.cs @@ -3,33 +3,24 @@ using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using MQTTnet.Core.Adapter; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; using System.Linq; +using MQTTnet.Core.Diagnostics; namespace MQTTnet.Core.Server { public class MqttServer : IMqttServer { - private readonly IMqttClientRetainedMessageManager _clientRetainedMessageManager; - private readonly ILogger _logger; - private readonly MqttClientSessionsManager _clientSessionsManager; private readonly ICollection _adapters; - private readonly MqttServerOptions _options; + private readonly IMqttNetLogger _logger; + private MqttClientSessionsManager _clientSessionsManager; + private MqttRetainedMessagesManager _retainedMessagesManager; private CancellationTokenSource _cancellationTokenSource; + private MqttServerOptions _options; - public MqttServer( - IOptions options, - IEnumerable adapters, - ILogger logger, - MqttClientSessionsManager clientSessionsManager, - IMqttClientRetainedMessageManager clientRetainedMessageManager) + public MqttServer(IEnumerable adapters, IMqttNetLogger logger) { - _options = options.Value ?? throw new ArgumentNullException(nameof(options)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - _clientSessionsManager = clientSessionsManager ?? throw new ArgumentNullException(nameof(clientSessionsManager)); - _clientRetainedMessageManager = clientRetainedMessageManager ?? throw new ArgumentNullException(nameof(clientRetainedMessageManager)); if (adapters == null) { @@ -37,15 +28,6 @@ namespace MQTTnet.Core.Server } _adapters = adapters.ToList(); - - _clientSessionsManager.ApplicationMessageReceived += (s, e) => ApplicationMessageReceived?.Invoke(s, e); - _clientSessionsManager.ClientConnected += OnClientConnected; - _clientSessionsManager.ClientDisconnected += OnClientDisconnected; - } - - public Task> GetConnectedClientsAsync() - { - return _clientSessionsManager.GetConnectedClientsAsync(); } public event EventHandler Started; @@ -53,14 +35,16 @@ namespace MQTTnet.Core.Server public event EventHandler ClientDisconnected; public event EventHandler ApplicationMessageReceived; + public Task> GetConnectedClientsAsync() + { + return _clientSessionsManager.GetConnectedClientsAsync(); + } + public async Task PublishAsync(IEnumerable applicationMessages) { if (applicationMessages == null) throw new ArgumentNullException(nameof(applicationMessages)); - if (_cancellationTokenSource == null) - { - throw new InvalidOperationException("The server is not started."); - } + if (_cancellationTokenSource == null) throw new InvalidOperationException("The server is not started."); foreach (var applicationMessage in applicationMessages) { @@ -68,13 +52,21 @@ namespace MQTTnet.Core.Server } } - public async Task StartAsync() + public async Task StartAsync(MqttServerOptions options) { - if (_cancellationTokenSource != null) throw new InvalidOperationException("The MQTT server is already started."); + _options = options ?? throw new ArgumentNullException(nameof(options)); + + if (_cancellationTokenSource != null) throw new InvalidOperationException("The server is already started."); _cancellationTokenSource = new CancellationTokenSource(); + _retainedMessagesManager = new MqttRetainedMessagesManager(_options, _logger); - await _clientRetainedMessageManager.LoadMessagesAsync(); + _clientSessionsManager = new MqttClientSessionsManager(_options, _retainedMessagesManager, _logger); + _clientSessionsManager.ApplicationMessageReceived += OnApplicationMessageReceived; + _clientSessionsManager.ClientConnected += OnClientConnected; + _clientSessionsManager.ClientDisconnected += OnClientDisconnected; + + await _retainedMessagesManager.LoadMessagesAsync(); foreach (var adapter in _adapters) { @@ -82,26 +74,48 @@ namespace MQTTnet.Core.Server await adapter.StartAsync(_options); } - _logger.LogInformation("Started."); + _logger.Info("Started."); Started?.Invoke(this, new MqttServerStartedEventArgs()); } public async Task StopAsync() { - _cancellationTokenSource?.Cancel(false); - _cancellationTokenSource?.Dispose(); - _cancellationTokenSource = null; - - foreach (var adapter in _adapters) + try { - adapter.ClientAccepted -= OnClientAccepted; - await adapter.StopAsync(); + if (_cancellationTokenSource == null) + { + return; + } + + _cancellationTokenSource.Cancel(false); + _cancellationTokenSource.Dispose(); + + foreach (var adapter in _adapters) + { + adapter.ClientAccepted -= OnClientAccepted; + await adapter.StopAsync(); + } + + await _clientSessionsManager.StopAsync(); + + _logger.Info("Stopped."); } + finally + { + _cancellationTokenSource = null; - await _clientSessionsManager.StopAsync(); + _retainedMessagesManager = null; - _logger.LogInformation("Stopped."); + if (_clientSessionsManager != null) + { + _clientSessionsManager.ApplicationMessageReceived -= OnApplicationMessageReceived; + _clientSessionsManager.ClientConnected -= OnClientConnected; + _clientSessionsManager.ClientDisconnected -= OnClientDisconnected; + } + + _clientSessionsManager = null; + } } private void OnClientAccepted(object sender, MqttServerAdapterClientAcceptedEventArgs eventArgs) @@ -111,14 +125,19 @@ namespace MQTTnet.Core.Server private void OnClientConnected(object sender, MqttClientConnectedEventArgs eventArgs) { - _logger.LogInformation("Client '{0}': Connected.", eventArgs.Client.ClientId); + _logger.Info("Client '{0}': Connected.", eventArgs.Client.ClientId); ClientConnected?.Invoke(this, eventArgs); } private void OnClientDisconnected(object sender, MqttClientDisconnectedEventArgs eventArgs) { - _logger.LogInformation("Client '{0}': Disconnected.", eventArgs.Client.ClientId); + _logger.Info("Client '{0}': Disconnected.", eventArgs.Client.ClientId); ClientDisconnected?.Invoke(this, eventArgs); } + + private void OnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e) + { + ApplicationMessageReceived?.Invoke(this, e); + } } } diff --git a/Tests/MQTTnet.Core.Tests/MQTTnet.Core.Tests.csproj b/Tests/MQTTnet.Core.Tests/MQTTnet.Core.Tests.csproj index 88ffbe4..b03d0d1 100644 --- a/Tests/MQTTnet.Core.Tests/MQTTnet.Core.Tests.csproj +++ b/Tests/MQTTnet.Core.Tests/MQTTnet.Core.Tests.csproj @@ -9,7 +9,7 @@ - + diff --git a/Tests/MQTTnet.Core.Tests/MqttLoggerProviderTest.cs b/Tests/MQTTnet.Core.Tests/MqttLoggerProviderTest.cs deleted file mode 100644 index bda3195..0000000 --- a/Tests/MQTTnet.Core.Tests/MqttLoggerProviderTest.cs +++ /dev/null @@ -1,51 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading.Tasks; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Logging; -using Microsoft.VisualStudio.TestTools.UnitTesting; -using MQTTnet.Core.Diagnostics; - -namespace MQTTnet.Core.Tests -{ - [TestClass] - public class MqttLoggerProviderTest - { - [TestMethod] - public void TestLoggerCallback() - { - var serviceCollection = new ServiceCollection(); - serviceCollection.AddLogging(); - - var serviceProvider = serviceCollection.BuildServiceProvider(); - using ((IDisposable)serviceProvider) - { - var loggerFactory = serviceProvider.GetRequiredService(); - - loggerFactory.AddMqttTrace(); - - var expectedMsg = "Hello World!"; - MqttNetTraceMessage msg = null; - - MqttNetTrace.TraceMessagePublished += (sender, args) => - { - msg = args.TraceMessage; - }; - - var logger = loggerFactory.CreateLogger(); - - logger.LogInformation(expectedMsg); - - Assert.AreEqual(expectedMsg, msg.Message); - - var expectedException = new Exception("bad stuff"); - - logger.LogError(new EventId(), expectedException, expectedException.Message); - - Assert.AreEqual(expectedException, msg.Exception); - Assert.AreEqual(expectedException.Message, msg.Message); - } - } - } -} diff --git a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs index faad01c..89cae3b 100644 --- a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs @@ -1,16 +1,12 @@ using System; using System.Collections.Generic; -using System.Linq; using System.Text; using System.Threading.Tasks; using Microsoft.VisualStudio.TestTools.UnitTesting; -using MQTTnet.Core.Adapter; using MQTTnet.Core.Client; +using MQTTnet.Core.Diagnostics; using MQTTnet.Core.Protocol; using MQTTnet.Core.Server; -using Microsoft.Extensions.DependencyInjection; -using MQTTnet.Core.Internal; -using MQTTnet.Core.Packets; namespace MQTTnet.Core.Tests { @@ -54,18 +50,12 @@ namespace MQTTnet.Core.Tests public async Task MqttServer_WillMessage() { var serverAdapter = new TestMqttServerAdapter(); + var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger()); - var services = new ServiceCollection() - .AddLogging() - .AddMqttServer() - .AddSingleton(serverAdapter) - .BuildServiceProvider(); - - var s = new MqttFactory(services).CreateMqttServer(); var receivedMessagesCount = 0; try { - await s.StartAsync(); + await s.StartAsync(new MqttServerOptions()); var willMessage = new MqttApplicationMessageBuilder().WithTopic("My/last/will").WithAtMostOnceQoS().Build(); var c1 = await serverAdapter.ConnectTestClient(s, "c1"); @@ -92,19 +82,13 @@ namespace MQTTnet.Core.Tests public async Task MqttServer_Unsubscribe() { var serverAdapter = new TestMqttServerAdapter(); - var services = new ServiceCollection() - .AddLogging() - .AddMqttServer() - .AddSingleton(serverAdapter) - .BuildServiceProvider(); - - var s = new MqttFactory(services).CreateMqttServer(); + var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger()); var receivedMessagesCount = 0; try { - await s.StartAsync(); + await s.StartAsync(new MqttServerOptions()); var c1 = await serverAdapter.ConnectTestClient(s, "c1"); var c2 = await serverAdapter.ConnectTestClient(s, "c2"); @@ -140,18 +124,12 @@ namespace MQTTnet.Core.Tests public async Task MqttServer_Publish() { var serverAdapter = new TestMqttServerAdapter(); - var services = new ServiceCollection() - .AddLogging() - .AddMqttServer() - .AddSingleton(serverAdapter) - .BuildServiceProvider(); - - var s = new MqttFactory(services).CreateMqttServer(); + var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger()); var receivedMessagesCount = 0; try { - await s.StartAsync(); + await s.StartAsync(new MqttServerOptions()); var c1 = await serverAdapter.ConnectTestClient(s, "c1"); @@ -167,7 +145,7 @@ namespace MQTTnet.Core.Tests { await s.StopAsync(); } - + Assert.AreEqual(1, receivedMessagesCount); } @@ -175,18 +153,13 @@ namespace MQTTnet.Core.Tests public async Task MqttServer_NoRetainedMessage() { var serverAdapter = new TestMqttServerAdapter(); - var services = new ServiceCollection() - .AddLogging() - .AddMqttServer() - .AddSingleton(serverAdapter) - .BuildServiceProvider(); + var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger()); - var s = new MqttFactory(services).CreateMqttServer(); var receivedMessagesCount = 0; try { - await s.StartAsync(); + await s.StartAsync(new MqttServerOptions()); var c1 = await serverAdapter.ConnectTestClient(s, "c1"); await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).Build()); @@ -194,7 +167,7 @@ namespace MQTTnet.Core.Tests var c2 = await serverAdapter.ConnectTestClient(s, "c2"); c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; - await c2.SubscribeAsync(new TopicFilter("retained", MqttQualityOfServiceLevel.AtMostOnce)); + await c2.SubscribeAsync(new TopicFilter("retained")); await Task.Delay(500); } @@ -210,28 +183,23 @@ namespace MQTTnet.Core.Tests public async Task MqttServer_RetainedMessage() { var serverAdapter = new TestMqttServerAdapter(); - var services = new ServiceCollection() - .AddLogging() - .AddMqttServer() - .AddSingleton(serverAdapter) - .BuildServiceProvider(); - - var s = new MqttFactory(services).CreateMqttServer(); + var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger()); var receivedMessagesCount = 0; try { - await s.StartAsync(); + await s.StartAsync(new MqttServerOptions()); var c1 = await serverAdapter.ConnectTestClient(s, "c1"); await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build()); await c1.DisconnectAsync(); - await services.WaitForRetainedMessage("retained").TimeoutAfter(TimeSpan.FromSeconds(5)); + await Task.Delay(TimeSpan.FromSeconds(2)); + // TODO: Find another way to wait for the retained components. var c2 = await serverAdapter.ConnectTestClient(s, "c2"); c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; - await c2.SubscribeAsync(new TopicFilter("retained", MqttQualityOfServiceLevel.AtMostOnce)); + await c2.SubscribeAsync(new TopicFilter("retained")); await Task.Delay(500); } @@ -239,7 +207,7 @@ namespace MQTTnet.Core.Tests { await s.StopAsync(); } - + Assert.AreEqual(1, receivedMessagesCount); } @@ -247,17 +215,12 @@ namespace MQTTnet.Core.Tests public async Task MqttServer_ClearRetainedMessage() { var serverAdapter = new TestMqttServerAdapter(); - var services = new ServiceCollection() - .AddLogging() - .AddMqttServer() - .AddSingleton(serverAdapter) - .BuildServiceProvider(); + var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger()); - var s = new MqttFactory(services).CreateMqttServer(); var receivedMessagesCount = 0; try { - await s.StartAsync(); + await s.StartAsync(new MqttServerOptions()); var c1 = await serverAdapter.ConnectTestClient(s, "c1"); await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build()); @@ -284,18 +247,13 @@ namespace MQTTnet.Core.Tests { var storage = new TestStorage(); var serverAdapter = new TestMqttServerAdapter(); - - var services = new ServiceCollection() - .AddLogging() - .AddMqttServer() - .AddSingleton(serverAdapter) - .BuildServiceProvider(); - - var s = new MqttFactory(services).CreateMqttServer(options => options.Storage = storage); + var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger()); try { - await s.StartAsync(); + var options = new MqttServerOptions { Storage = storage }; + + await s.StartAsync(options); var c1 = await serverAdapter.ConnectTestClient(s, "c1"); await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build()); @@ -306,18 +264,20 @@ namespace MQTTnet.Core.Tests await s.StopAsync(); } - await services.WaitForRetainedMessage("retained").TimeoutAfter(TimeSpan.FromSeconds(5)); + await Task.Delay(TimeSpan.FromSeconds(2)); + // TODO: Find another way to wait for the retained components. - s = new MqttFactory(services).CreateMqttServer(options => options.Storage = storage); + s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger()); var receivedMessagesCount = 0; try { - await s.StartAsync(); + var options = new MqttServerOptions { Storage = storage }; + await s.StartAsync(options); var c2 = await serverAdapter.ConnectTestClient(s, "c2"); c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; - await c2.SubscribeAsync(new TopicFilter("retained", MqttQualityOfServiceLevel.AtMostOnce)); + await c2.SubscribeAsync(new TopicFilter("retained")); await Task.Delay(500); } @@ -338,16 +298,13 @@ namespace MQTTnet.Core.Tests } var serverAdapter = new TestMqttServerAdapter(); - var services = new ServiceCollection() - .AddLogging() - .AddMqttServer() - .AddSingleton(serverAdapter) - .BuildServiceProvider(); - - var s = new MqttFactory(services).CreateMqttServer(options => options.ApplicationMessageInterceptor = Interceptor); + var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger()); + try { - await s.StartAsync(); + var options = new MqttServerOptions { ApplicationMessageInterceptor = Interceptor }; + + await s.StartAsync(options); var c1 = await serverAdapter.ConnectTestClient(s, "c1"); var c2 = await serverAdapter.ConnectTestClient(s, "c2"); @@ -397,17 +354,12 @@ namespace MQTTnet.Core.Tests int expectedReceivedMessagesCount) { var serverAdapter = new TestMqttServerAdapter(); - var services = new ServiceCollection() - .AddMqttServer() - .AddLogging() - .AddSingleton(serverAdapter) - .BuildServiceProvider(); + var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger()); - var s = services.GetRequiredService(); var receivedMessagesCount = 0; try { - await s.StartAsync(); + await s.StartAsync(new MqttServerOptions()); var c1 = await serverAdapter.ConnectTestClient(s, "c1"); var c2 = await serverAdapter.ConnectTestClient(s, "c2"); @@ -426,29 +378,8 @@ namespace MQTTnet.Core.Tests { await s.StopAsync(); } - - Assert.AreEqual(expectedReceivedMessagesCount, receivedMessagesCount); - } - } - public static class TestExtensions - { - public static async Task WaitForRetainedMessage(this IServiceProvider services, string topic) - { - var retainMessagemanager = services.GetRequiredService(); - - var subscribe = new MqttSubscribePacket() - { - TopicFilters = new List() - { - new TopicFilter(topic, MqttQualityOfServiceLevel.AtMostOnce) - } - }; - - while (!(await retainMessagemanager.GetSubscribedMessagesAsync(subscribe)).Any()) - { - await Task.Delay(TimeSpan.FromMilliseconds(10)); - } + Assert.AreEqual(expectedReceivedMessagesCount, receivedMessagesCount); } } } diff --git a/Tests/MQTTnet.Core.Tests/MqttSubscriptionsManagerTests.cs b/Tests/MQTTnet.Core.Tests/MqttSubscriptionsManagerTests.cs index 44bad99..f488543 100644 --- a/Tests/MQTTnet.Core.Tests/MqttSubscriptionsManagerTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttSubscriptionsManagerTests.cs @@ -1,5 +1,4 @@ -using Microsoft.Extensions.Options; -using Microsoft.VisualStudio.TestTools.UnitTesting; +using Microsoft.VisualStudio.TestTools.UnitTesting; using MQTTnet.Core.Packets; using MQTTnet.Core.Protocol; using MQTTnet.Core.Server; @@ -12,7 +11,7 @@ namespace MQTTnet.Core.Tests [TestMethod] public void MqttSubscriptionsManager_SubscribeSingleSuccess() { - var sm = new MqttClientSubscriptionsManager(new OptionsWrapper(new MqttServerOptions())); + var sm = new MqttClientSubscriptionsManager(new MqttServerOptions()); var sp = new MqttSubscribePacket(); sp.TopicFilters.Add(new TopicFilter("A/B/C")); @@ -31,7 +30,7 @@ namespace MQTTnet.Core.Tests [TestMethod] public void MqttSubscriptionsManager_SubscribeSingleNoSuccess() { - var sm = new MqttClientSubscriptionsManager(new OptionsWrapper(new MqttServerOptions())); + var sm = new MqttClientSubscriptionsManager(new MqttServerOptions()); var sp = new MqttSubscribePacket(); sp.TopicFilters.Add(new TopicFilter("A/B/C")); @@ -50,7 +49,7 @@ namespace MQTTnet.Core.Tests [TestMethod] public void MqttSubscriptionsManager_SubscribeAndUnsubscribeSingle() { - var sm = new MqttClientSubscriptionsManager(new OptionsWrapper(new MqttServerOptions())); + var sm = new MqttClientSubscriptionsManager(new MqttServerOptions()); var sp = new MqttSubscribePacket(); sp.TopicFilters.Add(new TopicFilter("A/B/C")); diff --git a/Tests/MQTTnet.Core.Tests/ServiceCollectionTest.cs b/Tests/MQTTnet.Core.Tests/ServiceCollectionTest.cs deleted file mode 100644 index 0117ed0..0000000 --- a/Tests/MQTTnet.Core.Tests/ServiceCollectionTest.cs +++ /dev/null @@ -1,30 +0,0 @@ -using Microsoft.Extensions.DependencyInjection; -using Microsoft.VisualStudio.TestTools.UnitTesting; - -namespace MQTTnet.Core.Tests -{ - [TestClass] - public class ServiceCollectionTest - { - [TestMethod] - public void TestCanConstructAllServices() - { - var services = new ServiceCollection() - .AddLogging() - .AddMqttServer() - .AddMqttClient(); - - var serviceProvider = services - .BuildServiceProvider(); - - foreach (var service in services) - { - if (service.ServiceType.IsGenericType) - { - continue; - } - serviceProvider.GetRequiredService(service.ServiceType); - } - } - } -} diff --git a/Tests/MQTTnet.Core.Tests/TestClientSessionFactory.cs b/Tests/MQTTnet.Core.Tests/TestClientSessionFactory.cs deleted file mode 100644 index 638df85..0000000 --- a/Tests/MQTTnet.Core.Tests/TestClientSessionFactory.cs +++ /dev/null @@ -1,14 +0,0 @@ -using System; -using MQTTnet.Core.Server; - -namespace MQTTnet.Core.Tests -{ - public class TestClientSessionFactory : IMqttClientSesssionFactory - { - public MqttClientSession CreateClientSession(string clientId, MqttClientSessionsManager mqttClientSessionsManager) - { - throw new NotImplementedException(); - //return new MqttClientSession(clientId, mqttClientSessionsManager, new TestLogger(), new TestLogger()); - } - } -} diff --git a/Tests/MQTTnet.Core.Tests/TestLogger.cs b/Tests/MQTTnet.Core.Tests/TestLogger.cs deleted file mode 100644 index f57c877..0000000 --- a/Tests/MQTTnet.Core.Tests/TestLogger.cs +++ /dev/null @@ -1,26 +0,0 @@ -using Microsoft.Extensions.Logging; -using System; - -namespace MQTTnet.Core.Tests -{ - public class TestLogger : IDisposable, ILogger - { - public IDisposable BeginScope(TState state) - { - return this; - } - - public bool IsEnabled(LogLevel logLevel) - { - return true; - } - - public void Log(LogLevel logLevel, EventId eventId, TState state, Exception exception, Func formatter) - { - } - - public void Dispose() - { - } - } -} diff --git a/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs b/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs index ecde0f1..53a6f0f 100644 --- a/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs +++ b/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs @@ -9,7 +9,7 @@ using MQTTnet.Core.Serializer; namespace MQTTnet.Core.Tests { - public class TestMqttCommunicationAdapter : IMqttCommunicationAdapter + public class TestMqttCommunicationAdapter : IMqttChannelAdapter { private readonly BlockingCollection _incomingPackets = new BlockingCollection(); diff --git a/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapterFactory.cs b/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapterFactory.cs index 4a0acad..7da1e16 100644 --- a/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapterFactory.cs +++ b/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapterFactory.cs @@ -1,24 +1,19 @@ using MQTTnet.Core.Adapter; -using MQTTnet.Core.Channel; using MQTTnet.Core.Client; +using MQTTnet.Core.Diagnostics; namespace MQTTnet.Core.Tests { - public class TestMqttCommunicationAdapterFactory : IMqttCommunicationAdapterFactory + public class TestMqttCommunicationAdapterFactory : IMqttClientAdapterFactory { - private readonly IMqttCommunicationAdapter _adapter; + private readonly IMqttChannelAdapter _adapter; - public TestMqttCommunicationAdapterFactory(IMqttCommunicationAdapter adapter) + public TestMqttCommunicationAdapterFactory(IMqttChannelAdapter adapter) { _adapter = adapter; } - - public IMqttCommunicationAdapter CreateClientCommunicationAdapter(IMqttClientOptions options) - { - return _adapter; - } - - public IMqttCommunicationAdapter CreateServerCommunicationAdapter(IMqttCommunicationChannel channel) + + public IMqttChannelAdapter CreateClientAdapter(IMqttClientChannelOptions options, IMqttNetLogger logger) { return _adapter; } diff --git a/Tests/MQTTnet.Core.Tests/TestMqttServerAdapter.cs b/Tests/MQTTnet.Core.Tests/TestMqttServerAdapter.cs index cac3b1c..e65d9f8 100644 --- a/Tests/MQTTnet.Core.Tests/TestMqttServerAdapter.cs +++ b/Tests/MQTTnet.Core.Tests/TestMqttServerAdapter.cs @@ -3,6 +3,7 @@ using System.Threading.Tasks; using MQTTnet.Core.Adapter; using MQTTnet.Core.Server; using MQTTnet.Core.Client; +using MQTTnet.Core.Diagnostics; namespace MQTTnet.Core.Tests { @@ -19,8 +20,7 @@ namespace MQTTnet.Core.Tests var client = new MqttClient( new TestMqttCommunicationAdapterFactory(adapterA), - new TestLogger(), - new MqttPacketDispatcher(new TestLogger())); + new MqttNetLogger()); var connected = WaitForClientToConnect(server, clientId); @@ -57,7 +57,7 @@ namespace MQTTnet.Core.Tests return tcs.Task; } - private void FireClientAcceptedEvent(IMqttCommunicationAdapter adapter) + private void FireClientAcceptedEvent(IMqttChannelAdapter adapter) { ClientAccepted?.Invoke(this, new MqttServerAdapterClientAcceptedEventArgs(adapter)); } diff --git a/Tests/MQTTnet.TestApp.NetCore/ClientTest.cs b/Tests/MQTTnet.TestApp.NetCore/ClientTest.cs index bc644e5..dfd7d0d 100644 --- a/Tests/MQTTnet.TestApp.NetCore/ClientTest.cs +++ b/Tests/MQTTnet.TestApp.NetCore/ClientTest.cs @@ -1,7 +1,6 @@ using System; using System.Text; using System.Threading.Tasks; -using Microsoft.Extensions.Logging; using MQTTnet.Core; using MQTTnet.Core.Client; @@ -28,8 +27,7 @@ namespace MQTTnet.TestApp.NetCore }; var factory = new MqttFactory(); - factory.GetLoggerFactory().AddConsole(); - + var client = factory.CreateMqttClient(); client.ApplicationMessageReceived += (s, e) => diff --git a/Tests/MQTTnet.TestApp.NetCore/MQTTnet.TestApp.NetCore.csproj b/Tests/MQTTnet.TestApp.NetCore/MQTTnet.TestApp.NetCore.csproj index d89bdd3..8f25448 100644 --- a/Tests/MQTTnet.TestApp.NetCore/MQTTnet.TestApp.NetCore.csproj +++ b/Tests/MQTTnet.TestApp.NetCore/MQTTnet.TestApp.NetCore.csproj @@ -14,10 +14,4 @@ - - - 1.1.2 - - - diff --git a/Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs b/Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs index 7f6d430..7f76b65 100644 --- a/Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs +++ b/Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs @@ -4,8 +4,6 @@ using MQTTnet.Core; using MQTTnet.Core.Client; using MQTTnet.Core.ManagedClient; using MQTTnet.Core.Protocol; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Logging; using System.IO; using Newtonsoft.Json; using System.Collections.Generic; @@ -16,14 +14,6 @@ namespace MQTTnet.TestApp.NetCore { public static async Task RunAsync() { - var services = new ServiceCollection() - .AddMqttClient() - .AddLogging() - .BuildServiceProvider(); - - services.GetService() - .AddConsole(); - var ms = new ClientRetainedMessageHandler(); var options = new ManagedMqttClientOptions @@ -44,7 +34,7 @@ namespace MQTTnet.TestApp.NetCore try { - var managedClient = services.GetRequiredService(); + var managedClient = new MqttFactory().CreateManagedMqttClient(); managedClient.ApplicationMessageReceived += (s, e) => { Console.WriteLine(">> RECEIVED: " + e.ApplicationMessage.Topic); diff --git a/Tests/MQTTnet.TestApp.NetCore/PerformanceTest.cs b/Tests/MQTTnet.TestApp.NetCore/PerformanceTest.cs index e21a7ea..c6bbe04 100644 --- a/Tests/MQTTnet.TestApp.NetCore/PerformanceTest.cs +++ b/Tests/MQTTnet.TestApp.NetCore/PerformanceTest.cs @@ -1,9 +1,6 @@ -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Logging; -using MQTTnet.Core; +using MQTTnet.Core; using MQTTnet.Core.Client; using MQTTnet.Core.Protocol; -using MQTTnet.Core.Server; using System; using System.Collections.Generic; using System.Diagnostics; @@ -11,6 +8,7 @@ using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; +using MQTTnet.Core.Server; namespace MQTTnet.TestApp.NetCore { @@ -18,46 +16,21 @@ namespace MQTTnet.TestApp.NetCore { public static async Task RunAsync() { - var services = new ServiceCollection() - .AddMqttServer(options => - { - - options.ConnectionValidator = p => - { - if (p.ClientId == "SpecialClient") - { - if (p.Username != "USER" || p.Password != "PASS") - { - return MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword; - } - } - - return MqttConnectReturnCode.ConnectionAccepted; - }; - - options.DefaultCommunicationTimeout = TimeSpan.FromMinutes(10); - }) - .AddMqttClient() - .AddLogging() - .BuildServiceProvider(); - - //services.GetService().AddConsole(LogLevel.Warning, true); - Console.WriteLine("Press 'c' for concurrent sends. Otherwise in one batch."); var concurrent = Console.ReadKey(true).KeyChar == 'c'; - var server = Task.Factory.StartNew(() => RunServerAsync(services), TaskCreationOptions.LongRunning); - var client = Task.Factory.StartNew(() => RunClientAsync(2000, TimeSpan.FromMilliseconds(10), services, concurrent), TaskCreationOptions.LongRunning); + var server = Task.Factory.StartNew(async () => await RunServerAsync(), TaskCreationOptions.LongRunning); + var client = Task.Factory.StartNew(async () => await RunClientAsync(2000, TimeSpan.FromMilliseconds(10), concurrent), TaskCreationOptions.LongRunning); await Task.WhenAll(server, client).ConfigureAwait(false); } - private static Task RunClientsAsync(int msgChunkSize, TimeSpan interval, IServiceProvider serviceProvider, bool concurrent) + private static Task RunClientsAsync(int msgChunkSize, TimeSpan interval, bool concurrent) { - return Task.WhenAll(Enumerable.Range(0, 3).Select(i => Task.Run(() => RunClientAsync(msgChunkSize, interval, serviceProvider, concurrent)))); + return Task.WhenAll(Enumerable.Range(0, 3).Select(i => Task.Run(() => RunClientAsync(msgChunkSize, interval, concurrent)))); } - private static async Task RunClientAsync(int msgChunkSize, TimeSpan interval, IServiceProvider serviceProvider, bool concurrent) + private static async Task RunClientAsync(int msgChunkSize, TimeSpan interval, bool concurrent) { try { @@ -69,7 +42,7 @@ namespace MQTTnet.TestApp.NetCore CommunicationTimeout = TimeSpan.FromMinutes(10) }; - var client = serviceProvider.GetRequiredService(); + var client = new MqttFactory().CreateMqttClient(); client.Connected += async (s, e) => { @@ -77,7 +50,7 @@ namespace MQTTnet.TestApp.NetCore await client.SubscribeAsync(new List { - new TopicFilter("#", MqttQualityOfServiceLevel.AtMostOnce) + new TopicFilter("#") }); Console.WriteLine("### SUBSCRIBED ###"); @@ -120,7 +93,6 @@ namespace MQTTnet.TestApp.NetCore stopwatch.Stop(); Console.WriteLine($"Sent 10.000 messages within {stopwatch.ElapsedMilliseconds} ms ({stopwatch.ElapsedMilliseconds / (float)testMessageCount} ms / message)."); - var messages = new[] { message }; var sentMessagesCount = 0; @@ -158,9 +130,7 @@ namespace MQTTnet.TestApp.NetCore msgCount += msgs.Count; //send multiple } - - - + var now = DateTime.Now; if (last < now - TimeSpan.FromSeconds(1)) { @@ -194,11 +164,12 @@ namespace MQTTnet.TestApp.NetCore return Task.Run(() => client.PublishAsync(applicationMessage)); } - private static async Task RunServerAsync(IServiceProvider serviceProvider) + private static async Task RunServerAsync() { try { - var mqttServer = serviceProvider.GetRequiredService(); + var mqttServer = new MqttFactory().CreateMqttServer(); + var msgs = 0; var stopwatch = Stopwatch.StartNew(); mqttServer.ApplicationMessageReceived += (sender, args) => @@ -211,7 +182,7 @@ namespace MQTTnet.TestApp.NetCore stopwatch.Restart(); } }; - await mqttServer.StartAsync(); + await mqttServer.StartAsync(new MqttServerOptions()); Console.WriteLine("Press any key to exit."); Console.ReadLine(); diff --git a/Tests/MQTTnet.TestApp.NetCore/Program.cs b/Tests/MQTTnet.TestApp.NetCore/Program.cs index d1c010f..fb02de8 100644 --- a/Tests/MQTTnet.TestApp.NetCore/Program.cs +++ b/Tests/MQTTnet.TestApp.NetCore/Program.cs @@ -7,7 +7,6 @@ using System.IO; using System.Threading; using System.Threading.Tasks; using Newtonsoft.Json; -using Microsoft.Extensions.Logging; namespace MQTTnet.TestApp.NetCore { @@ -70,8 +69,6 @@ namespace MQTTnet.TestApp.NetCore { var factory = new MqttFactory(); - factory.GetLoggerFactory().AddConsole(); - var client = factory.CreateMqttClient(); } } diff --git a/Tests/MQTTnet.TestApp.NetCore/ServerTest.cs b/Tests/MQTTnet.TestApp.NetCore/ServerTest.cs index f4a18cd..a0b1626 100644 --- a/Tests/MQTTnet.TestApp.NetCore/ServerTest.cs +++ b/Tests/MQTTnet.TestApp.NetCore/ServerTest.cs @@ -3,24 +3,21 @@ using System.Text; using System.Threading.Tasks; using MQTTnet.Core.Protocol; using MQTTnet.Core.Server; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Logging; +using MQTTnet.Diagnostics; namespace MQTTnet.TestApp.NetCore { public static class ServerTest { - public static Task RunAsync() + public static async Task RunAsync() { try { - var services = new ServiceCollection() - .AddMqttServer() - .AddLogging(); + MqttNetConsoleTrace.ForwardToConsole(); - services.Configure(options => + var options = new MqttServerOptions { - options.ConnectionValidator = p => + ConnectionValidator = p => { if (p.ClientId == "SpecialClient") { @@ -31,22 +28,19 @@ namespace MQTTnet.TestApp.NetCore } return MqttConnectReturnCode.ConnectionAccepted; - }; + }, - options.Storage = new RetainedMessageHandler(); - - // Extend the timestamp for all messages from clients. - options.ApplicationMessageInterceptor = context => + Storage = new RetainedMessageHandler(), + ApplicationMessageInterceptor = context => { if (MqttTopicFilterComparer.IsMatch(context.ApplicationMessage.Topic, "/myTopic/WithTimestamp/#")) { - // Replace the payload with the timestamp. But also extending a JSON - // based payload with the timestamp is a suitable use case. - context.ApplicationMessage.Payload = Encoding.UTF8.GetBytes(DateTime.Now.ToString("O")); + // Replace the payload with the timestamp. But also extending a JSON + // based payload with the timestamp is a suitable use case. + context.ApplicationMessage.Payload = Encoding.UTF8.GetBytes(DateTime.Now.ToString("O")); } - }; - // Protect several topics from being subscribed from every client. - options.SubscriptionsInterceptor = context => + }, + SubscriptionsInterceptor = context => { if (context.TopicFilter.Topic.StartsWith("admin/foo/bar") && context.ClientId != "theAdmin") { @@ -58,11 +52,11 @@ namespace MQTTnet.TestApp.NetCore context.AcceptSubscription = false; context.CloseConnection = true; } - }; - }); + } + }; - var serviceProvider = services.BuildServiceProvider(); - serviceProvider.GetRequiredService().AddConsole(); + // Extend the timestamp for all messages from clients. + // Protect several topics from being subscribed from every client. //var certificate = new X509Certificate(@"C:\certs\test\test.cer", ""); //options.TlsEndpointOptions.Certificate = certificate.Export(X509ContentType.Cert); @@ -70,18 +64,18 @@ namespace MQTTnet.TestApp.NetCore //options.DefaultEndpointOptions.IsEnabled = true; //options.TlsEndpointOptions.IsEnabled = false; - var mqttServer = new MqttFactory(serviceProvider).CreateMqttServer(); + var mqttServer = new MqttFactory().CreateMqttServer(); mqttServer.ClientDisconnected += (s, e) => { Console.Write("Client disconnected event fired."); }; - mqttServer.StartAsync(); + await mqttServer.StartAsync(options); Console.WriteLine("Press any key to exit."); Console.ReadLine(); - mqttServer.StopAsync(); + await mqttServer.StopAsync(); } catch (Exception e) { @@ -89,7 +83,6 @@ namespace MQTTnet.TestApp.NetCore } Console.ReadLine(); - return Task.FromResult(0); } } } diff --git a/Tests/MQTTnet.TestApp.UniversalWindows/MQTTnet.TestApp.UniversalWindows.csproj b/Tests/MQTTnet.TestApp.UniversalWindows/MQTTnet.TestApp.UniversalWindows.csproj index f8c1ce9..fa0763c 100644 --- a/Tests/MQTTnet.TestApp.UniversalWindows/MQTTnet.TestApp.UniversalWindows.csproj +++ b/Tests/MQTTnet.TestApp.UniversalWindows/MQTTnet.TestApp.UniversalWindows.csproj @@ -137,12 +137,6 @@ - - 1.1.2 - - - 1.1.2 - 5.4.0 diff --git a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs index fcf6d09..6c7258b 100644 --- a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs +++ b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs @@ -5,8 +5,6 @@ using System.Threading.Tasks; using Windows.Security.Cryptography.Certificates; using Windows.UI.Core; using Windows.UI.Xaml; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Logging; using MQTTnet.Core; using MQTTnet.Core.Client; using MQTTnet.Core.Diagnostics; @@ -19,7 +17,7 @@ namespace MQTTnet.TestApp.UniversalWindows { public sealed partial class MainPage { - private readonly ConcurrentQueue _traceMessages = new ConcurrentQueue(); + private readonly ConcurrentQueue _traceMessages = new ConcurrentQueue(); private IMqttClient _mqttClient; private IMqttServer _mqttServer; @@ -28,10 +26,10 @@ namespace MQTTnet.TestApp.UniversalWindows { InitializeComponent(); - MqttNetTrace.TraceMessagePublished += OnTraceMessagePublished; + MqttNetGlobalLog.LogMessagePublished += OnTraceMessagePublished; } - private async void OnTraceMessagePublished(object sender, MqttNetTraceMessagePublishedEventArgs e) + private async void OnTraceMessagePublished(object sender, MqttNetLogMessagePublishedEventArgs e) { _traceMessages.Enqueue(e.TraceMessage); while (_traceMessages.Count > 100) @@ -252,7 +250,7 @@ namespace MQTTnet.TestApp.UniversalWindows { { // Write all trace messages to the console window. - MqttNetTrace.TraceMessagePublished += (s, e) => + MqttNetGlobalLog.LogMessagePublished += (s, e) => { Console.WriteLine($">> [{e.TraceMessage.Timestamp:O}] [{e.TraceMessage.ThreadId}] [{e.TraceMessage.Source}] [{e.TraceMessage.Level}]: {e.TraceMessage.Message}"); if (e.TraceMessage.Exception != null) @@ -262,32 +260,12 @@ namespace MQTTnet.TestApp.UniversalWindows }; } - { - // Add the console logger to the logger factory. - var services = new ServiceCollection() - .AddMqttClient() - .BuildServiceProvider(); - - services.GetRequiredService() - .AddConsole(); - } - { // Use a custom identifier for the trace messages. var clientOptions = new MqttClientOptionsBuilder() - .WithLogId("ClientX") .Build(); } - { - // Create a client from the service provider manually. - var serviceProvider = new ServiceCollection() - .AddMqttClient() - .BuildServiceProvider(); - - var mqttClient = serviceProvider.GetRequiredService(); - } - { // Create a new MQTT client. var factory = new MqttFactory(); @@ -371,34 +349,31 @@ namespace MQTTnet.TestApp.UniversalWindows // ---------------------------------- { - var services = new ServiceCollection() - .AddMqttServer(options => + var options = new MqttServerOptions(); + + options.ConnectionValidator = c => + { + if (c.ClientId.Length < 10) { - options.ConnectionValidator = c => - { - if (c.ClientId.Length < 10) - { - return MqttConnectReturnCode.ConnectionRefusedIdentifierRejected; - } + return MqttConnectReturnCode.ConnectionRefusedIdentifierRejected; + } - if (c.Username != "mySecretUser") - { - return MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword; - } + if (c.Username != "mySecretUser") + { + return MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword; + } - if (c.Password != "mySecretPassword") - { - return MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword; - } + if (c.Password != "mySecretPassword") + { + return MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword; + } - return MqttConnectReturnCode.ConnectionAccepted; - }; - }) - .BuildServiceProvider(); + return MqttConnectReturnCode.ConnectionAccepted; + }; - var factory = new MqttFactory(services); + var factory = new MqttFactory(); var mqttServer = factory.CreateMqttServer(); - await mqttServer.StartAsync(); + await mqttServer.StartAsync(options); Console.WriteLine("Press any key to exit."); Console.ReadLine(); @@ -422,7 +397,7 @@ namespace MQTTnet.TestApp.UniversalWindows { // Start a MQTT server. var mqttServer = new MqttFactory().CreateMqttServer(); - await mqttServer.StartAsync(); + await mqttServer.StartAsync(new MqttServerOptions()); Console.WriteLine("Press any key to exit."); Console.ReadLine(); await mqttServer.StopAsync(); @@ -430,27 +405,31 @@ namespace MQTTnet.TestApp.UniversalWindows { // Configure MQTT server. - var mqttServer = new MqttFactory().CreateMqttServer(options => + var options = new MqttServerOptions + { + ConnectionBacklog = 100 + }; + + options.DefaultEndpointOptions.Port = 1884; + options.ConnectionValidator = packet => { - options.ConnectionBacklog = 100; - options.DefaultEndpointOptions.Port = 1884; - options.ConnectionValidator = packet => + if (packet.ClientId != "Highlander") { - if (packet.ClientId != "Highlander") - { - return MqttConnectReturnCode.ConnectionRefusedIdentifierRejected; - } + return MqttConnectReturnCode.ConnectionRefusedIdentifierRejected; + } - return MqttConnectReturnCode.ConnectionAccepted; - }; - }); + return MqttConnectReturnCode.ConnectionAccepted; + }; + + var mqttServer = new MqttFactory().CreateMqttServer(); + await mqttServer.StartAsync(options); } { // Setup client validator. - var mqttServer = new MqttFactory().CreateMqttServer(options => + var options = new MqttServerOptions { - options.ConnectionValidator = c => + ConnectionValidator = c => { if (c.ClientId.Length < 10) { @@ -468,8 +447,8 @@ namespace MQTTnet.TestApp.UniversalWindows } return MqttConnectReturnCode.ConnectionAccepted; - }; - }); + } + }; } { @@ -512,13 +491,13 @@ namespace MQTTnet.TestApp.UniversalWindows } } - _mqttServer = new MqttFactory().CreateMqttServer(o => - { - o.DefaultEndpointOptions.Port = int.Parse(ServerPort.Text); - o.Storage = storage; - }); + _mqttServer = new MqttFactory().CreateMqttServer(); + + var options = new MqttServerOptions(); + options.DefaultEndpointOptions.Port = int.Parse(ServerPort.Text); + options.Storage = storage; - await _mqttServer.StartAsync(); + await _mqttServer.StartAsync(options); } private async void StopServer(object sender, RoutedEventArgs e)