diff --git a/Build/MQTTnet.AspNetCore.nuspec b/Build/MQTTnet.AspNetCore.nuspec index a46cd81..61fc121 100644 --- a/Build/MQTTnet.AspNetCore.nuspec +++ b/Build/MQTTnet.AspNetCore.nuspec @@ -11,15 +11,15 @@ false This is a support library to integrate MQTTnet into AspNetCore. For release notes please go to MQTTnet release notes (https://www.nuget.org/packages/MQTTnet/). - Copyright Christian Kratky 2016-2019 + Copyright Christian Kratky 2016-2020 MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin - - - - + + + + diff --git a/Build/MQTTnet.Extensions.ManagedClient.nuspec b/Build/MQTTnet.Extensions.ManagedClient.nuspec index edec769..8ae373f 100644 --- a/Build/MQTTnet.Extensions.ManagedClient.nuspec +++ b/Build/MQTTnet.Extensions.ManagedClient.nuspec @@ -11,7 +11,7 @@ false This is an extension library which provides a managed MQTT client with additional features using MQTTnet. For release notes please go to MQTTnet release notes (https://www.nuget.org/packages/MQTTnet/). - Copyright Christian Kratky 2016-2019 + Copyright Christian Kratky 2016-2020 MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin diff --git a/Build/MQTTnet.Extensions.Rpc.nuspec b/Build/MQTTnet.Extensions.Rpc.nuspec index ce81cb6..4089ae4 100644 --- a/Build/MQTTnet.Extensions.Rpc.nuspec +++ b/Build/MQTTnet.Extensions.Rpc.nuspec @@ -11,7 +11,7 @@ false This is an extension library which allows executing synchronous device calls including a response using MQTTnet. For release notes please go to MQTTnet release notes (https://www.nuget.org/packages/MQTTnet/). - Copyright Christian Kratky 2016-2019 + Copyright Christian Kratky 2016-2020 MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin diff --git a/Build/MQTTnet.Extensions.WebSocket4Net.nuspec b/Build/MQTTnet.Extensions.WebSocket4Net.nuspec index 054933e..0baacdd 100644 --- a/Build/MQTTnet.Extensions.WebSocket4Net.nuspec +++ b/Build/MQTTnet.Extensions.WebSocket4Net.nuspec @@ -11,7 +11,7 @@ false This is an extension library which allows using _WebSocket4Net_ as transport for MQTTnet clients. For release notes please go to MQTTnet release notes (https://www.nuget.org/packages/MQTTnet/). - Copyright Christian Kratky 2016-2019 + Copyright Christian Kratky 2016-2020 MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin diff --git a/Build/MQTTnet.NETStandard.nuspec b/Build/MQTTnet.NETStandard.nuspec index c505dd1..ef1b3fa 100644 --- a/Build/MQTTnet.NETStandard.nuspec +++ b/Build/MQTTnet.NETStandard.nuspec @@ -11,7 +11,7 @@ false This package contains the .NET Standard version of MQTTnet only. For release notes please go to MQTTnet release notes (https://www.nuget.org/packages/MQTTnet/). - Copyright Christian Kratky 2016-2019 + Copyright Christian Kratky 2016-2020 MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index 10c7280..9e25687 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -12,17 +12,23 @@ MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker) and supports v3.1.0, v3.1.1 and v5.0.0 of the MQTT protocol. * [All] Due to a merge issue not all changes are included in 3.0.8. All these changes are now included in this version. +* [Core] Updated all nuget references. +* [Core] Added MqttApplicationMessage.GetUserProperty() convenience method (thanks to @PMExtra). +* [LowLevelMqttClient] Added low level MQTT client in order to provide more flexibility when using the MQTT protocol. This client requires detailed knowledge about the MQTT protocol. +* [Client] Improve connection stability (thanks to @jltjohanlindqvist). +* [Client] Support WithConnectionUri to configure client (thanks to @PMExtra). * [ManagedClient] Added builder class for MqttClientUnsubscribeOptions (thanks to @dominikviererbe). * [ManagedClient] Added support for persisted sessions (thansk to @PMExtra). -* [Client] Improve connection stability (thanks to @jltjohanlindqvist). * [ManagedClient] Fixed a memory leak (thanks to @zawodskoj). * [ManagedClient] Improved internal subscription management (#569, thanks to @cstichlberger). * [ManagedClient] Refactored log messages (thanks to @cstichlberger). * [Server] Added support for assigned client IDs (MQTTv5 only) (thanks to @bcrosnier). * [Server] Added interceptor for unsubscriptions. -* [MQTTnet.Server] Added interceptor for unsubscriptions. +* [Server] Removed exceptions when user properties are set with MQTT protocol version 3.1 +* [Server] Added custom session items to the client status. +* [Server] Added option to check whether the server is already started properly or not. * [MQTTnet.AspNetCore] improved compatibility with AspNetCore 3.1 -* [MQTTnet.Server] Added option to check whether the server is already started properly or not. +* [MQTTnet.Server] Added interceptor for unsubscriptions. Copyright Christian Kratky 2016-2020 MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin @@ -40,7 +46,7 @@ - + diff --git a/MQTTnet.sln b/MQTTnet.sln index 22fd0ad..ebbe8cd 100644 --- a/MQTTnet.sln +++ b/MQTTnet.sln @@ -53,6 +53,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.Server", "Source\MQ EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.Extensions.WebSocket4Net", "Source\MQTTnet.Extensions.WebSocket4Net\MQTTnet.Extensions.WebSocket4Net.csproj", "{2BD01D53-4CA5-4142-BE8D-313876395E3E}" EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Apps", "Apps", "{A56E3128-1639-4F31-873A-325E14BB6295}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -270,7 +272,7 @@ Global EndGlobalSection GlobalSection(NestedProjects) = preSolution {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4} - {FF1F72D6-9524-4422-9497-3CC0002216ED} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4} + {FF1F72D6-9524-4422-9497-3CC0002216ED} = {A56E3128-1639-4F31-873A-325E14BB6295} {3587E506-55A2-4EB3-99C7-DC01E42D25D2} = {32A630A7-2598-41D7-B625-204CD906F5FB} {3D283AAD-AAA8-4339-8394-52F80B6304DB} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4} {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4} @@ -279,7 +281,7 @@ Global {998D04DD-7CB0-45F5-A393-E2495C16399E} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4} {C400533A-8EBA-4F0B-BF4D-295C3708604B} = {12816BCC-AF9E-44A9-9AE5-C246AF2A0587} {61B62223-F5D0-48E4-BBD6-2CBA9353CB5E} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4} - {5699FB8C-838C-4AB0-80A5-9CA809F9B65B} = {32A630A7-2598-41D7-B625-204CD906F5FB} + {5699FB8C-838C-4AB0-80A5-9CA809F9B65B} = {A56E3128-1639-4F31-873A-325E14BB6295} {2BD01D53-4CA5-4142-BE8D-313876395E3E} = {12816BCC-AF9E-44A9-9AE5-C246AF2A0587} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution diff --git a/Source/MQTTnet.AspnetCore/Client/MqttClientConnectionContextFactory.cs b/Source/MQTTnet.AspnetCore/Client/MqttClientConnectionContextFactory.cs index e8f570c..44acb77 100644 --- a/Source/MQTTnet.AspnetCore/Client/MqttClientConnectionContextFactory.cs +++ b/Source/MQTTnet.AspnetCore/Client/MqttClientConnectionContextFactory.cs @@ -1,17 +1,16 @@ -using System; -using System.Net; -using MQTTnet.Adapter; +using MQTTnet.Adapter; using MQTTnet.AspNetCore.Client.Tcp; -using MQTTnet.Client; using MQTTnet.Client.Options; using MQTTnet.Diagnostics; using MQTTnet.Formatter; +using System; +using System.Net; namespace MQTTnet.AspNetCore.Client { public class MqttClientConnectionContextFactory : IMqttClientAdapterFactory { - public IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetChildLogger logger) + public IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetLogger logger) { if (options == null) throw new ArgumentNullException(nameof(options)); diff --git a/Source/MQTTnet.AspnetCore/Extensions/ServiceCollectionExtensions.cs b/Source/MQTTnet.AspnetCore/Extensions/ServiceCollectionExtensions.cs index 8606142..84a10d4 100644 --- a/Source/MQTTnet.AspnetCore/Extensions/ServiceCollectionExtensions.cs +++ b/Source/MQTTnet.AspnetCore/Extensions/ServiceCollectionExtensions.cs @@ -1,10 +1,10 @@ -using System; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using MQTTnet.Adapter; using MQTTnet.Diagnostics; -using MQTTnet.Server; using MQTTnet.Implementations; -using Microsoft.Extensions.DependencyInjection; +using MQTTnet.Server; +using System; namespace MQTTnet.AspNetCore { @@ -13,7 +13,7 @@ namespace MQTTnet.AspNetCore public static IServiceCollection AddHostedMqttServer(this IServiceCollection services, IMqttServerOptions options) { if (options == null) throw new ArgumentNullException(nameof(options)); - + services.AddSingleton(options); services.AddHostedMqttServer(); @@ -23,7 +23,8 @@ namespace MQTTnet.AspNetCore public static IServiceCollection AddHostedMqttServer(this IServiceCollection services, Action configure) { - services.AddSingleton(s => { + services.AddSingleton(s => + { var builder = new MqttServerOptionsBuilder(); configure(builder); return builder.Build(); @@ -36,7 +37,8 @@ namespace MQTTnet.AspNetCore public static IServiceCollection AddHostedMqttServerWithServices(this IServiceCollection services, Action configure) { - services.AddSingleton(s => { + services.AddSingleton(s => + { var builder = new AspNetMqttServerOptionsBuilder(s); configure(builder); return builder.Build(); @@ -60,14 +62,12 @@ namespace MQTTnet.AspNetCore private static IServiceCollection AddHostedMqttServer(this IServiceCollection services) { var logger = new MqttNetLogger(); - var childLogger = logger.CreateChildLogger(); services.AddSingleton(logger); - services.AddSingleton(childLogger); services.AddSingleton(); services.AddSingleton(s => s.GetService()); services.AddSingleton(s => s.GetService()); - + return services; } diff --git a/Source/MQTTnet.AspnetCore/MQTTnet.AspNetCore.csproj b/Source/MQTTnet.AspnetCore/MQTTnet.AspNetCore.csproj index e6f0582..ff0fbf9 100644 --- a/Source/MQTTnet.AspnetCore/MQTTnet.AspNetCore.csproj +++ b/Source/MQTTnet.AspnetCore/MQTTnet.AspNetCore.csproj @@ -21,10 +21,10 @@ - - - - + + + + diff --git a/Source/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs b/Source/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs index a052dae..8e0134c 100644 --- a/Source/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs +++ b/Source/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs @@ -1,20 +1,20 @@ -using System; -using System.Net.WebSockets; -using System.Threading.Tasks; -using Microsoft.AspNetCore.Http; +using Microsoft.AspNetCore.Http; using MQTTnet.Adapter; using MQTTnet.Diagnostics; using MQTTnet.Formatter; using MQTTnet.Implementations; using MQTTnet.Server; +using System; +using System.Net.WebSockets; +using System.Threading.Tasks; namespace MQTTnet.AspNetCore { public class MqttWebSocketServerAdapter : IMqttServerAdapter { - private readonly IMqttNetChildLogger _logger; + private readonly IMqttNetLogger _logger; - public MqttWebSocketServerAdapter(IMqttNetChildLogger logger) + public MqttWebSocketServerAdapter(IMqttNetLogger logger) { if (logger == null) throw new ArgumentNullException(nameof(logger)); @@ -38,7 +38,7 @@ namespace MQTTnet.AspNetCore if (webSocket == null) throw new ArgumentNullException(nameof(webSocket)); var endpoint = $"{httpContext.Connection.RemoteIpAddress}:{httpContext.Connection.RemotePort}"; - + var clientCertificate = await httpContext.Connection.GetClientCertificateAsync().ConfigureAwait(false); try { diff --git a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs index 8b5a48f..10d7d77 100644 --- a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs +++ b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs @@ -1,9 +1,4 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; -using MQTTnet.Client; +using MQTTnet.Client; using MQTTnet.Client.Connecting; using MQTTnet.Client.Disconnecting; using MQTTnet.Client.Publishing; @@ -13,6 +8,11 @@ using MQTTnet.Exceptions; using MQTTnet.Internal; using MQTTnet.Protocol; using MQTTnet.Server; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; namespace MQTTnet.Extensions.ManagedClient { @@ -33,7 +33,7 @@ namespace MQTTnet.Extensions.ManagedClient private readonly SemaphoreSlim _subscriptionsQueuedSignal = new SemaphoreSlim(0); private readonly IMqttClient _mqttClient; - private readonly IMqttNetChildLogger _logger; + private readonly IMqttNetLogger _logger; private readonly AsyncLock _messageQueueLock = new AsyncLock(); @@ -42,8 +42,8 @@ namespace MQTTnet.Extensions.ManagedClient private Task _maintainConnectionTask; private ManagedMqttClientStorageManager _storageManager; - - public ManagedMqttClient(IMqttClient mqttClient, IMqttNetChildLogger logger) + + public ManagedMqttClient(IMqttClient mqttClient, IMqttNetLogger logger) { _mqttClient = mqttClient ?? throw new ArgumentNullException(nameof(mqttClient)); diff --git a/Source/MQTTnet.Extensions.ManagedClient/MqttFactoryExtensions.cs b/Source/MQTTnet.Extensions.ManagedClient/MqttFactoryExtensions.cs index 37393ac..b0fd882 100644 --- a/Source/MQTTnet.Extensions.ManagedClient/MqttFactoryExtensions.cs +++ b/Source/MQTTnet.Extensions.ManagedClient/MqttFactoryExtensions.cs @@ -1,5 +1,5 @@ -using System; -using MQTTnet.Diagnostics; +using MQTTnet.Diagnostics; +using System; namespace MQTTnet.Extensions.ManagedClient { @@ -9,7 +9,7 @@ namespace MQTTnet.Extensions.ManagedClient { if (factory == null) throw new ArgumentNullException(nameof(factory)); - return new ManagedMqttClient(factory.CreateMqttClient(), factory.DefaultLogger.CreateChildLogger()); + return new ManagedMqttClient(factory.CreateMqttClient(), factory.DefaultLogger); } public static IManagedMqttClient CreateManagedMqttClient(this IMqttFactory factory, IMqttNetLogger logger) @@ -17,7 +17,7 @@ namespace MQTTnet.Extensions.ManagedClient if (factory == null) throw new ArgumentNullException(nameof(factory)); if (logger == null) throw new ArgumentNullException(nameof(logger)); - return new ManagedMqttClient(factory.CreateMqttClient(logger), logger.CreateChildLogger()); + return new ManagedMqttClient(factory.CreateMqttClient(logger), logger); } } } \ No newline at end of file diff --git a/Source/MQTTnet.Extensions.WebSocket4Net/WebSocket4NetMqttClientAdapterFactory.cs b/Source/MQTTnet.Extensions.WebSocket4Net/WebSocket4NetMqttClientAdapterFactory.cs index 7625fc9..1c3a9c6 100644 --- a/Source/MQTTnet.Extensions.WebSocket4Net/WebSocket4NetMqttClientAdapterFactory.cs +++ b/Source/MQTTnet.Extensions.WebSocket4Net/WebSocket4NetMqttClientAdapterFactory.cs @@ -1,15 +1,15 @@ -using System; -using MQTTnet.Adapter; +using MQTTnet.Adapter; using MQTTnet.Client.Options; using MQTTnet.Diagnostics; using MQTTnet.Formatter; using MQTTnet.Implementations; +using System; namespace MQTTnet.Extensions.WebSocket4Net { public class WebSocket4NetMqttClientAdapterFactory : IMqttClientAdapterFactory { - public IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetChildLogger logger) + public IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetLogger logger) { if (options == null) throw new ArgumentNullException(nameof(options)); if (logger == null) throw new ArgumentNullException(nameof(logger)); @@ -17,19 +17,19 @@ namespace MQTTnet.Extensions.WebSocket4Net switch (options.ChannelOptions) { case MqttClientTcpOptions _: - { - return new MqttChannelAdapter(new MqttTcpChannel(options), new MqttPacketFormatterAdapter(options.ProtocolVersion), logger); - } + { + return new MqttChannelAdapter(new MqttTcpChannel(options), new MqttPacketFormatterAdapter(options.ProtocolVersion), logger); + } case MqttClientWebSocketOptions webSocketOptions: - { - return new MqttChannelAdapter(new WebSocket4NetMqttChannel(options, webSocketOptions), new MqttPacketFormatterAdapter(options.ProtocolVersion), logger); - } + { + return new MqttChannelAdapter(new WebSocket4NetMqttChannel(options, webSocketOptions), new MqttPacketFormatterAdapter(options.ProtocolVersion), logger); + } default: - { - throw new NotSupportedException(); - } + { + throw new NotSupportedException(); + } } } } diff --git a/Source/MQTTnet.Server/Logging/MqttNetChildLoggerWrapper.cs b/Source/MQTTnet.Server/Logging/MqttNetChildLoggerWrapper.cs index 7f35117..656239f 100644 --- a/Source/MQTTnet.Server/Logging/MqttNetChildLoggerWrapper.cs +++ b/Source/MQTTnet.Server/Logging/MqttNetChildLoggerWrapper.cs @@ -1,43 +1,43 @@ -using System; -using MQTTnet.Diagnostics; - -namespace MQTTnet.Server.Logging -{ - public class MqttNetChildLoggerWrapper : IMqttNetChildLogger - { - private readonly MqttNetLoggerWrapper _logger; - private readonly string _source; - - public MqttNetChildLoggerWrapper(string source, MqttNetLoggerWrapper logger) - { - _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - - _source = source; - } - - public IMqttNetChildLogger CreateChildLogger(string source = null) - { - return _logger.CreateChildLogger(source); - } - - public void Verbose(string message, params object[] parameters) - { - _logger.Publish(MqttNetLogLevel.Verbose, _source, message, parameters, null); - } - - public void Info(string message, params object[] parameters) - { - _logger.Publish(MqttNetLogLevel.Info, _source, message, parameters, null); - } - - public void Warning(Exception exception, string message, params object[] parameters) - { - _logger.Publish(MqttNetLogLevel.Warning, _source, message, parameters, exception); - } - - public void Error(Exception exception, string message, params object[] parameters) - { - _logger.Publish(MqttNetLogLevel.Error, _source, message, parameters, exception); - } - } -} +//using MQTTnet.Diagnostics; +//using System; + +//namespace MQTTnet.Server.Logging +//{ +// public class MqttNetChildLoggerWrapper : IMqttNetChildLogger +// { +// private readonly MqttNetLoggerWrapper _logger; +// private readonly string _source; + +// public MqttNetChildLoggerWrapper(string source, MqttNetLoggerWrapper logger) +// { +// _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + +// _source = source; +// } + +// public IMqttNetLogger CreateChildLogger(string source = null) +// { +// return _logger.CreateChildLogger(source); +// } + +// public void Verbose(string message, params object[] parameters) +// { +// _logger.Publish(MqttNetLogLevel.Verbose, _source, message, parameters, null); +// } + +// public void Info(string message, params object[] parameters) +// { +// _logger.Publish(MqttNetLogLevel.Info, _source, message, parameters, null); +// } + +// public void Warning(Exception exception, string message, params object[] parameters) +// { +// _logger.Publish(MqttNetLogLevel.Warning, _source, message, parameters, exception); +// } + +// public void Error(Exception exception, string message, params object[] parameters) +// { +// _logger.Publish(MqttNetLogLevel.Error, _source, message, parameters, exception); +// } +// } +//} diff --git a/Source/MQTTnet.Server/Logging/MqttNetLoggerWrapper.cs b/Source/MQTTnet.Server/Logging/MqttNetLoggerWrapper.cs index 3f52109..9c71e28 100644 --- a/Source/MQTTnet.Server/Logging/MqttNetLoggerWrapper.cs +++ b/Source/MQTTnet.Server/Logging/MqttNetLoggerWrapper.cs @@ -1,13 +1,13 @@ -using System; -using System.Threading; -using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging; using MQTTnet.Diagnostics; +using System; +using System.Threading; namespace MQTTnet.Server.Logging { public class MqttNetLoggerWrapper : IMqttNetLogger { - private readonly ILogger _logger; + readonly ILogger _logger; public MqttNetLoggerWrapper(ILogger logger) { @@ -16,25 +16,39 @@ namespace MQTTnet.Server.Logging public event EventHandler LogMessagePublished; - public IMqttNetChildLogger CreateChildLogger(string source = null) + public IMqttNetLogger CreateChildLogger(string source) { - return new MqttNetChildLoggerWrapper(source, this); + return new MqttNetLogger(source); } - public void Publish(MqttNetLogLevel logLevel, string source, string message, object[] parameters, Exception exception) + public void Publish(MqttNetLogLevel level, string source, string message, object[] parameters, Exception exception) { - var convertedLogLevel = ConvertLogLevel(logLevel); + var convertedLogLevel = ConvertLogLevel(level); _logger.Log(convertedLogLevel, exception, message, parameters); var logMessagePublishedEvent = LogMessagePublished; if (logMessagePublishedEvent != null) { - var logMessage = new MqttNetLogMessage(null, DateTime.UtcNow, Thread.CurrentThread.ManagedThreadId, source, logLevel, message, exception); + var logMessage = new MqttNetLogMessage + { + Timestamp = DateTime.UtcNow, + ThreadId = Thread.CurrentThread.ManagedThreadId, + Source = source, + Level = level, + Message = message, + Exception = exception + }; + logMessagePublishedEvent.Invoke(this, new MqttNetLogMessagePublishedEventArgs(logMessage)); } } - - private static LogLevel ConvertLogLevel(MqttNetLogLevel logLevel) + + public void Publish(MqttNetLogLevel logLevel, string message, object[] parameters, Exception exception) + { + Publish(logLevel, null, message, parameters, exception); + } + + static LogLevel ConvertLogLevel(MqttNetLogLevel logLevel) { switch (logLevel) { diff --git a/Source/MQTTnet.Server/MQTTnet.Server.csproj b/Source/MQTTnet.Server/MQTTnet.Server.csproj index 3f35d97..885ec2f 100644 --- a/Source/MQTTnet.Server/MQTTnet.Server.csproj +++ b/Source/MQTTnet.Server/MQTTnet.Server.csproj @@ -45,8 +45,10 @@ - - + + + + diff --git a/Source/MQTTnet.Server/Mqtt/MqttServerService.cs b/Source/MQTTnet.Server/Mqtt/MqttServerService.cs index 85c4176..9bcb42c 100644 --- a/Source/MQTTnet.Server/Mqtt/MqttServerService.cs +++ b/Source/MQTTnet.Server/Mqtt/MqttServerService.cs @@ -1,11 +1,4 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Net.WebSockets; -using System.Security.Authentication; -using System.Text; -using System.Threading.Tasks; -using IronPython.Runtime; +using IronPython.Runtime; using Microsoft.AspNetCore.Http; using Microsoft.Extensions.Logging; using MQTTnet.Adapter; @@ -16,6 +9,13 @@ using MQTTnet.Protocol; using MQTTnet.Server.Configuration; using MQTTnet.Server.Scripting; using MQTTnet.Server.Status; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net.WebSockets; +using System.Security.Authentication; +using System.Text; +using System.Threading.Tasks; namespace MQTTnet.Server.Mqtt { @@ -65,11 +65,11 @@ namespace MQTTnet.Server.Mqtt _pythonScriptHostService = pythonScriptHostService ?? throw new ArgumentNullException(nameof(pythonScriptHostService)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - _webSocketServerAdapter = new MqttWebSocketServerAdapter(mqttFactory.Logger.CreateChildLogger()); + _webSocketServerAdapter = new MqttWebSocketServerAdapter(mqttFactory.Logger); var adapters = new List { - new MqttTcpServerAdapter(mqttFactory.Logger.CreateChildLogger()) + new MqttTcpServerAdapter(mqttFactory.Logger) { TreatSocketOpeningErrorAsWarning = true // Opening other ports than for HTTP is not allows in Azure App Services. }, @@ -215,7 +215,7 @@ namespace MQTTnet.Server.Mqtt options .WithEncryptedEndpoint() .WithEncryptionSslProtocol(SslProtocols.Tls12); - + if (!string.IsNullOrEmpty(_settings.EncryptedTcpEndPoint?.Certificate?.Path)) { IMqttServerCertificateCredentials certificateCredentials = null; @@ -230,7 +230,7 @@ namespace MQTTnet.Server.Mqtt options.WithEncryptionCertificate(_settings.EncryptedTcpEndPoint.Certificate.ReadCertificate(), certificateCredentials); } - + if (_settings.EncryptedTcpEndPoint.TryReadIPv4(out var address4)) { options.WithEncryptedEndpointBoundIPAddress(address4); diff --git a/Source/MQTTnet/Adapter/IMqttClientAdapterFactory.cs b/Source/MQTTnet/Adapter/IMqttClientAdapterFactory.cs index cb0988d..c9aba6f 100644 --- a/Source/MQTTnet/Adapter/IMqttClientAdapterFactory.cs +++ b/Source/MQTTnet/Adapter/IMqttClientAdapterFactory.cs @@ -5,6 +5,6 @@ namespace MQTTnet.Adapter { public interface IMqttClientAdapterFactory { - IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetChildLogger logger); + IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetLogger logger); } } diff --git a/Source/MQTTnet/Adapter/MqttChannelAdapter.cs b/Source/MQTTnet/Adapter/MqttChannelAdapter.cs index ccf5a33..e8bed00 100644 --- a/Source/MQTTnet/Adapter/MqttChannelAdapter.cs +++ b/Source/MQTTnet/Adapter/MqttChannelAdapter.cs @@ -14,12 +14,12 @@ using System.Threading.Tasks; namespace MQTTnet.Adapter { - public class MqttChannelAdapter : Disposable, IMqttChannelAdapter + public sealed class MqttChannelAdapter : Disposable, IMqttChannelAdapter { const uint ErrorOperationAborted = 0x800703E3; const int ReadBufferSize = 4096; // TODO: Move buffer size to config - readonly IMqttNetChildLogger _logger; + readonly IMqttNetLogger _logger; readonly IMqttChannel _channel; readonly MqttPacketReader _packetReader; @@ -30,7 +30,7 @@ namespace MQTTnet.Adapter long _bytesReceived; long _bytesSent; - public MqttChannelAdapter(IMqttChannel channel, MqttPacketFormatterAdapter packetFormatterAdapter, IMqttNetChildLogger logger) + public MqttChannelAdapter(IMqttChannel channel, MqttPacketFormatterAdapter packetFormatterAdapter, IMqttNetLogger logger) { if (logger == null) throw new ArgumentNullException(nameof(logger)); diff --git a/Source/MQTTnet/Client/MqttClient.cs b/Source/MQTTnet/Client/MqttClient.cs index 29687fc..2546fc1 100644 --- a/Source/MQTTnet/Client/MqttClient.cs +++ b/Source/MQTTnet/Client/MqttClient.cs @@ -1,7 +1,3 @@ -using System; -using System.Diagnostics; -using System.Threading; -using System.Threading.Tasks; using MQTTnet.Adapter; using MQTTnet.Client.Connecting; using MQTTnet.Client.Disconnecting; @@ -17,6 +13,10 @@ using MQTTnet.Internal; using MQTTnet.PacketDispatcher; using MQTTnet.Packets; using MQTTnet.Protocol; +using System; +using System.Diagnostics; +using System.Threading; +using System.Threading.Tasks; namespace MQTTnet.Client { @@ -29,7 +29,7 @@ namespace MQTTnet.Client private readonly object _disconnectLock = new object(); private readonly IMqttClientAdapterFactory _adapterFactory; - private readonly IMqttNetChildLogger _logger; + private readonly IMqttNetLogger _logger; private CancellationTokenSource _backgroundCancellationTokenSource; private Task _packetReceiverTask; diff --git a/Source/MQTTnet/Client/Options/MqttClientOptionsBuilder.cs b/Source/MQTTnet/Client/Options/MqttClientOptionsBuilder.cs index 4fd0ccf..b301819 100644 --- a/Source/MQTTnet/Client/Options/MqttClientOptionsBuilder.cs +++ b/Source/MQTTnet/Client/Options/MqttClientOptionsBuilder.cs @@ -1,19 +1,19 @@ -using System; +using MQTTnet.Client.ExtendedAuthenticationExchange; +using MQTTnet.Formatter; +using System; using System.Linq; using System.Text; -using MQTTnet.Client.ExtendedAuthenticationExchange; -using MQTTnet.Formatter; namespace MQTTnet.Client.Options { public class MqttClientOptionsBuilder { - private readonly MqttClientOptions _options = new MqttClientOptions(); + readonly MqttClientOptions _options = new MqttClientOptions(); - private MqttClientTcpOptions _tcpOptions; - private MqttClientWebSocketOptions _webSocketOptions; - private MqttClientOptionsBuilderTlsParameters _tlsParameters; - private MqttClientWebSocketProxyOptions _proxyOptions; + MqttClientTcpOptions _tcpOptions; + MqttClientWebSocketOptions _webSocketOptions; + MqttClientOptionsBuilderTlsParameters _tlsParameters; + MqttClientWebSocketProxyOptions _proxyOptions; public MqttClientOptionsBuilder WithProtocolVersion(MqttProtocolVersion value) { diff --git a/Source/MQTTnet/Diagnostics/IMqttNetChildLogger.cs b/Source/MQTTnet/Diagnostics/IMqttNetChildLogger.cs deleted file mode 100644 index df99f44..0000000 --- a/Source/MQTTnet/Diagnostics/IMqttNetChildLogger.cs +++ /dev/null @@ -1,17 +0,0 @@ -using System; - -namespace MQTTnet.Diagnostics -{ - public interface IMqttNetChildLogger - { - IMqttNetChildLogger CreateChildLogger(string source = null); - - void Verbose(string message, params object[] parameters); - - void Info(string message, params object[] parameters); - - void Warning(Exception exception, string message, params object[] parameters); - - void Error(Exception exception, string message, params object[] parameters); - } -} diff --git a/Source/MQTTnet/Diagnostics/IMqttNetLogger.cs b/Source/MQTTnet/Diagnostics/IMqttNetLogger.cs index 73b6fcb..b952cf5 100644 --- a/Source/MQTTnet/Diagnostics/IMqttNetLogger.cs +++ b/Source/MQTTnet/Diagnostics/IMqttNetLogger.cs @@ -6,8 +6,8 @@ namespace MQTTnet.Diagnostics { event EventHandler LogMessagePublished; - IMqttNetChildLogger CreateChildLogger(string source = null); + IMqttNetLogger CreateChildLogger(string source); - void Publish(MqttNetLogLevel logLevel, string source, string message, object[] parameters, Exception exception); + void Publish(MqttNetLogLevel logLevel, string message, object[] parameters, Exception exception); } } diff --git a/Source/MQTTnet/Diagnostics/MqttNetChildLogger.cs b/Source/MQTTnet/Diagnostics/MqttNetChildLogger.cs deleted file mode 100644 index 3733454..0000000 --- a/Source/MQTTnet/Diagnostics/MqttNetChildLogger.cs +++ /dev/null @@ -1,51 +0,0 @@ -using System; - -namespace MQTTnet.Diagnostics -{ - public class MqttNetChildLogger : IMqttNetChildLogger - { - private readonly IMqttNetLogger _logger; - private readonly string _source; - - public MqttNetChildLogger(IMqttNetLogger logger, string source) - { - _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - _source = source; - } - - public IMqttNetChildLogger CreateChildLogger(string source) - { - string childSource; - if (!string.IsNullOrEmpty(_source)) - { - childSource = _source + "." + source; - } - else - { - childSource = source; - } - - return new MqttNetChildLogger(_logger, childSource); - } - - public void Verbose(string message, params object[] parameters) - { - _logger.Publish(MqttNetLogLevel.Verbose, _source, message, parameters, null); - } - - public void Info(string message, params object[] parameters) - { - _logger.Publish(MqttNetLogLevel.Info, _source, message, parameters, null); - } - - public void Warning(Exception exception, string message, params object[] parameters) - { - _logger.Publish(MqttNetLogLevel.Warning, _source, message, parameters, exception); - } - - public void Error(Exception exception, string message, params object[] parameters) - { - _logger.Publish(MqttNetLogLevel.Error, _source, message, parameters, exception); - } - } -} diff --git a/Source/MQTTnet/Diagnostics/MqttNetLogMessage.cs b/Source/MQTTnet/Diagnostics/MqttNetLogMessage.cs index 107d8f3..1ab1b4c 100644 --- a/Source/MQTTnet/Diagnostics/MqttNetLogMessage.cs +++ b/Source/MQTTnet/Diagnostics/MqttNetLogMessage.cs @@ -4,30 +4,19 @@ namespace MQTTnet.Diagnostics { public class MqttNetLogMessage { - public MqttNetLogMessage(string logId, DateTime timestamp, int threadId, string source, MqttNetLogLevel level, string message, Exception exception) - { - LogId = logId; - Timestamp = timestamp; - ThreadId = threadId; - Source = source; - Level = level; - Message = message; - Exception = exception; - } - - public string LogId { get; } + public string LogId { get; set; } - public DateTime Timestamp { get; } + public DateTime Timestamp { get; set; } - public int ThreadId { get; } + public int ThreadId { get; set; } - public string Source { get; } + public string Source { get; set; } - public MqttNetLogLevel Level { get; } + public MqttNetLogLevel Level { get; set; } - public string Message { get; } + public string Message { get; set; } - public Exception Exception { get; } + public Exception Exception { get; set; } public override string ToString() { diff --git a/Source/MQTTnet/Diagnostics/MqttNetLogMessagePublishedEventArgs.cs b/Source/MQTTnet/Diagnostics/MqttNetLogMessagePublishedEventArgs.cs index 8a0c0fc..e64a40d 100644 --- a/Source/MQTTnet/Diagnostics/MqttNetLogMessagePublishedEventArgs.cs +++ b/Source/MQTTnet/Diagnostics/MqttNetLogMessagePublishedEventArgs.cs @@ -6,9 +6,14 @@ namespace MQTTnet.Diagnostics { public MqttNetLogMessagePublishedEventArgs(MqttNetLogMessage logMessage) { + LogMessage = logMessage ?? throw new ArgumentNullException(nameof(logMessage)); + TraceMessage = logMessage ?? throw new ArgumentNullException(nameof(logMessage)); } + [Obsolete("Use new proeprty LogMessage instead.")] public MqttNetLogMessage TraceMessage { get; } + + public MqttNetLogMessage LogMessage { get; } } } diff --git a/Source/MQTTnet/Diagnostics/MqttNetLogger.cs b/Source/MQTTnet/Diagnostics/MqttNetLogger.cs index 56a664e..bfd4c42 100644 --- a/Source/MQTTnet/Diagnostics/MqttNetLogger.cs +++ b/Source/MQTTnet/Diagnostics/MqttNetLogger.cs @@ -4,26 +4,51 @@ namespace MQTTnet.Diagnostics { public class MqttNetLogger : IMqttNetLogger { - private readonly string _logId; + readonly string _logId; + readonly string _source; - public MqttNetLogger(string logId = null) + readonly MqttNetLogger _parentLogger; + + public MqttNetLogger(string source, string logId) + { + _source = source; + _logId = logId; + } + + public MqttNetLogger() + { + } + + public MqttNetLogger(string logId) + { + _logId = logId; + } + + MqttNetLogger(MqttNetLogger parentLogger, string logId, string source) { + _parentLogger = parentLogger ?? throw new ArgumentNullException(nameof(parentLogger)); + _source = source ?? throw new ArgumentNullException(nameof(source)); + _logId = logId; } public event EventHandler LogMessagePublished; - public IMqttNetChildLogger CreateChildLogger(string source = null) + // TODO: Consider creating a LoggerFactory which will allow creating loggers. The logger factory will + // be the only place which has the published event. + public IMqttNetLogger CreateChildLogger(string source) { - return new MqttNetChildLogger(this, source); + if (source is null) throw new ArgumentNullException(nameof(source)); + + return new MqttNetLogger(this, _logId, source); } - public void Publish(MqttNetLogLevel logLevel, string source, string message, object[] parameters, Exception exception) + public void Publish(MqttNetLogLevel level, string message, object[] parameters, Exception exception) { var hasLocalListeners = LogMessagePublished != null; var hasGlobalListeners = MqttNetGlobalLogger.HasListeners; - if (!hasLocalListeners && !hasGlobalListeners) + if (!hasLocalListeners && !hasGlobalListeners && _parentLogger == null) { return; } @@ -40,17 +65,35 @@ namespace MQTTnet.Diagnostics } } - var traceMessage = new MqttNetLogMessage(_logId, DateTime.UtcNow, Environment.CurrentManagedThreadId, source, logLevel, message, exception); + var logMessage = new MqttNetLogMessage + { + LogId = _logId, + Timestamp = DateTime.UtcNow, + Source = _source, + ThreadId = Environment.CurrentManagedThreadId, + Level = level, + Message = message, + Exception = exception + }; if (hasGlobalListeners) { - MqttNetGlobalLogger.Publish(traceMessage); + MqttNetGlobalLogger.Publish(logMessage); } if (hasLocalListeners) { - LogMessagePublished?.Invoke(this, new MqttNetLogMessagePublishedEventArgs(traceMessage)); + LogMessagePublished?.Invoke(this, new MqttNetLogMessagePublishedEventArgs(logMessage)); } + + _parentLogger?.Publish(logMessage); + } + + void Publish(MqttNetLogMessage logMessage) + { + LogMessagePublished?.Invoke(this, new MqttNetLogMessagePublishedEventArgs(logMessage)); + + _parentLogger?.Publish(logMessage); } } } \ No newline at end of file diff --git a/Source/MQTTnet/Diagnostics/MqttNetLoggerExtensions.cs b/Source/MQTTnet/Diagnostics/MqttNetLoggerExtensions.cs new file mode 100644 index 0000000..bc8a97b --- /dev/null +++ b/Source/MQTTnet/Diagnostics/MqttNetLoggerExtensions.cs @@ -0,0 +1,35 @@ +using System; + +namespace MQTTnet.Diagnostics +{ + public static class MqttNetLoggerExtensions + { + public static void Verbose(this IMqttNetLogger logger, string message, params object[] parameters) + { + if (logger is null) throw new ArgumentNullException(nameof(logger)); + + logger.Publish(MqttNetLogLevel.Verbose, message, parameters, null); + } + + public static void Info(this IMqttNetLogger logger, string message, params object[] parameters) + { + if (logger is null) throw new ArgumentNullException(nameof(logger)); + + logger.Publish(MqttNetLogLevel.Info, message, parameters, null); + } + + public static void Warning(this IMqttNetLogger logger, Exception exception, string message, params object[] parameters) + { + if (logger is null) throw new ArgumentNullException(nameof(logger)); + + logger.Publish(MqttNetLogLevel.Warning, message, parameters, exception); + } + + public static void Error(this IMqttNetLogger logger, Exception exception, string message, params object[] parameters) + { + if (logger is null) throw new ArgumentNullException(nameof(logger)); + + logger.Publish(MqttNetLogLevel.Error, message, parameters, exception); + } + } +} diff --git a/Source/MQTTnet/Diagnostics/TargetFrameworkInfoProvider.cs b/Source/MQTTnet/Diagnostics/TargetFrameworkProvider.cs similarity index 89% rename from Source/MQTTnet/Diagnostics/TargetFrameworkInfoProvider.cs rename to Source/MQTTnet/Diagnostics/TargetFrameworkProvider.cs index efbf08b..7135af6 100644 --- a/Source/MQTTnet/Diagnostics/TargetFrameworkInfoProvider.cs +++ b/Source/MQTTnet/Diagnostics/TargetFrameworkProvider.cs @@ -1,6 +1,6 @@ namespace MQTTnet.Diagnostics { - public static class TargetFrameworkInfoProvider + public static class TargetFrameworkProvider { public static string TargetFramework { diff --git a/Source/MQTTnet/Extensions/MqttClientOptionsBuilderExtension.cs b/Source/MQTTnet/Extensions/MqttClientOptionsBuilderExtension.cs new file mode 100644 index 0000000..fc9da23 --- /dev/null +++ b/Source/MQTTnet/Extensions/MqttClientOptionsBuilderExtension.cs @@ -0,0 +1,48 @@ +using System; +using System.Linq; +using MQTTnet.Client.Options; + +namespace MQTTnet.Extensions +{ + public static class MqttClientOptionsBuilderExtension + { + public static MqttClientOptionsBuilder WithConnectionUri(this MqttClientOptionsBuilder builder, Uri uri) + { + var port = uri.IsDefaultPort ? null : (int?) uri.Port; + switch (uri.Scheme.ToLower()) + { + case "tcp": + case "mqtt": + builder.WithTcpServer(uri.Host, port); + break; + + case "mqtts": + builder.WithTcpServer(uri.Host, port).WithTls(); + break; + + case "ws": + case "wss": + builder.WithWebSocketServer(uri.ToString()); + break; + + default: + throw new ArgumentException("Unexpected scheme in uri."); + } + + if (!string.IsNullOrEmpty(uri.UserInfo)) + { + var userInfo = uri.UserInfo.Split(':'); + var username = userInfo[0]; + var password = userInfo.Length > 1 ? userInfo[1] : ""; + builder.WithCredentials(username, password); + } + + return builder; + } + + public static MqttClientOptionsBuilder WithConnectionUri(this MqttClientOptionsBuilder builder, string uri) + { + return WithConnectionUri(builder, new Uri(uri, UriKind.Absolute)); + } + } +} diff --git a/Source/MQTTnet/Extensions/UserPropertyExtension.cs b/Source/MQTTnet/Extensions/UserPropertyExtension.cs new file mode 100644 index 0000000..f861c32 --- /dev/null +++ b/Source/MQTTnet/Extensions/UserPropertyExtension.cs @@ -0,0 +1,16 @@ +using System; +using System.Linq; + +namespace MQTTnet.Extensions +{ + public static class UserPropertyExtension + { + public static string GetUserProperty(this MqttApplicationMessage message, string propertyName, StringComparison comparisonType = StringComparison.Ordinal) + { + if (message == null) throw new ArgumentNullException(nameof(message)); + if (propertyName == null) throw new ArgumentNullException(nameof(propertyName)); + + return message.UserProperties?.SingleOrDefault(up => up.Name.Equals(propertyName, comparisonType))?.Value; + } + } +} diff --git a/Source/MQTTnet/Formatter/V3/MqttV310DataConverter.cs b/Source/MQTTnet/Formatter/V3/MqttV310DataConverter.cs index 48d42cd..4bf4944 100644 --- a/Source/MQTTnet/Formatter/V3/MqttV310DataConverter.cs +++ b/Source/MQTTnet/Formatter/V3/MqttV310DataConverter.cs @@ -20,11 +20,6 @@ namespace MQTTnet.Formatter.V3 { if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage)); - if (applicationMessage.UserProperties?.Any() == true) - { - throw new MqttProtocolViolationException("User properties are not supported in MQTT version 3."); - } - return new MqttPublishPacket { Topic = applicationMessage.Topic, @@ -171,11 +166,6 @@ namespace MQTTnet.Formatter.V3 { if (options == null) throw new ArgumentNullException(nameof(options)); - if (options.UserProperties?.Any() == true) - { - throw new MqttProtocolViolationException("User properties are not supported in MQTT version 3."); - } - var subscribePacket = new MqttSubscribePacket(); subscribePacket.TopicFilters.AddRange(options.TopicFilters); @@ -186,11 +176,6 @@ namespace MQTTnet.Formatter.V3 { if (options == null) throw new ArgumentNullException(nameof(options)); - if (options.UserProperties?.Any() == true) - { - throw new MqttProtocolViolationException("User properties are not supported in MQTT version 3."); - } - var unsubscribePacket = new MqttUnsubscribePacket(); unsubscribePacket.TopicFilters.AddRange(options.TopicFilters); diff --git a/Source/MQTTnet/Implementations/CrossPlatformSocket.cs b/Source/MQTTnet/Implementations/CrossPlatformSocket.cs new file mode 100644 index 0000000..4d7f6c9 --- /dev/null +++ b/Source/MQTTnet/Implementations/CrossPlatformSocket.cs @@ -0,0 +1,229 @@ +using System; +using System.Net; +using System.Net.Sockets; +using System.Threading; +using System.Threading.Tasks; + +namespace MQTTnet.Implementations +{ + public sealed class CrossPlatformSocket : IDisposable + { + readonly Socket _socket; + + public CrossPlatformSocket(AddressFamily addressFamily) + { + _socket = new Socket(addressFamily, SocketType.Stream, ProtocolType.Tcp); + } + + public CrossPlatformSocket() + { + // Having this contructor is important because avoiding the address family as parameter + // will make use of dual mode in the .net framework. + _socket = new Socket(SocketType.Stream, ProtocolType.Tcp); + } + + public CrossPlatformSocket(Socket socket) + { + _socket = socket ?? throw new ArgumentNullException(nameof(socket)); + } + + public bool NoDelay + { + get + { + return (int)_socket.GetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.NoDelay) > 0; + } + + set + { + _socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.NoDelay, value ? 1 : 0); + } + } + + public bool DualMode + { + get + { + return _socket.DualMode; + } + + set + { + _socket.DualMode = value; + } + } + + public int ReceiveBufferSize + { + get + { + return _socket.ReceiveBufferSize; + } + + set + { + _socket.ReceiveBufferSize = value; + } + } + + public int SendBufferSize + { + get + { + return _socket.SendBufferSize; + } + + set + { + _socket.SendBufferSize = value; + } + } + + public EndPoint RemoteEndPoint + { + get + { + return _socket.RemoteEndPoint; + } + } + + public bool ReuseAddress + { + get + { + return (int)_socket.GetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress) != 0; + } + + set + { + _socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, value ? 1 : 0); + } + } + + public async Task AcceptAsync() + { + try + { +#if NET452 || NET461 + var clientSocket = await Task.Factory.FromAsync(_socket.BeginAccept, _socket.EndAccept, null).ConfigureAwait(false); + return new CrossPlatformSocket(clientSocket); +#else + var clientSocket = await _socket.AcceptAsync().ConfigureAwait(false); + return new CrossPlatformSocket(clientSocket); +#endif + } + catch (ObjectDisposedException) + { + // This will happen when _socket.EndAccept gets called by Task library but the socket is already disposed. + return null; + } + } + + public void Bind(EndPoint localEndPoint) + { + if (localEndPoint is null) throw new ArgumentNullException(nameof(localEndPoint)); + + _socket.Bind(localEndPoint); + } + + public void Listen(int connectionBacklog) + { + _socket.Listen(connectionBacklog); + } + + public async Task ConnectAsync(string host, int port, CancellationToken cancellationToken) + { + if (host is null) throw new ArgumentNullException(nameof(host)); + + try + { + // Workaround for: workaround for https://github.com/dotnet/corefx/issues/24430 + using (cancellationToken.Register(() => _socket.Dispose())) + { + cancellationToken.ThrowIfCancellationRequested(); + +#if NET452 || NET461 + await Task.Factory.FromAsync(_socket.BeginConnect, _socket.EndConnect, host, port, null).ConfigureAwait(false); +#else + await _socket.ConnectAsync(host, port).ConfigureAwait(false); +#endif + } + } + catch (ObjectDisposedException) + { + // This will happen when _socket.EndConnect gets called by Task library but the socket is already disposed. + } + } + + public async Task SendAsync(ArraySegment buffer, SocketFlags socketFlags) + { + try + { +#if NET452 || NET461 + await Task.Factory.FromAsync(SocketWrapper.BeginSend, _socket.EndSend, new SocketWrapper(_socket, buffer, socketFlags)).ConfigureAwait(false); +#else + await _socket.SendAsync(buffer, socketFlags).ConfigureAwait(false); +#endif + } + catch (ObjectDisposedException) + { + // This will happen when _socket.EndConnect gets called by Task library but the socket is already disposed. + } + } + + public async Task ReceiveAsync(ArraySegment buffer, SocketFlags socketFlags) + { + try + { +#if NET452 || NET461 + return await Task.Factory.FromAsync(SocketWrapper.BeginReceive, _socket.EndReceive, new SocketWrapper(_socket, buffer, socketFlags)).ConfigureAwait(false); +#else + return await _socket.ReceiveAsync(buffer, socketFlags).ConfigureAwait(false); +#endif + } + catch (ObjectDisposedException) + { + // This will happen when _socket.EndReceive gets called by Task library but the socket is already disposed. + return -1; + } + } + + public NetworkStream GetStream() + { + return new NetworkStream(_socket, true); + } + + public void Dispose() + { + _socket?.Dispose(); + } + +#if NET452 || NET461 + class SocketWrapper + { + readonly Socket _socket; + readonly ArraySegment _buffer; + readonly SocketFlags _socketFlags; + + public SocketWrapper(Socket socket, ArraySegment buffer, SocketFlags socketFlags) + { + _socket = socket; + _buffer = buffer; + _socketFlags = socketFlags; + } + + public static IAsyncResult BeginSend(AsyncCallback callback, object state) + { + var socketWrapper = (SocketWrapper)state; + return socketWrapper._socket.BeginSend(socketWrapper._buffer.Array, socketWrapper._buffer.Offset, socketWrapper._buffer.Count, socketWrapper._socketFlags, callback, state); + } + + public static IAsyncResult BeginReceive(AsyncCallback callback, object state) + { + var socketWrapper = (SocketWrapper)state; + return socketWrapper._socket.BeginReceive(socketWrapper._buffer.Array, socketWrapper._buffer.Offset, socketWrapper._buffer.Count, socketWrapper._socketFlags, callback, state); + } + } +#endif + } +} diff --git a/Source/MQTTnet/Implementations/MqttClientAdapterFactory.cs b/Source/MQTTnet/Implementations/MqttClientAdapterFactory.cs index 2377cde..148fe04 100644 --- a/Source/MQTTnet/Implementations/MqttClientAdapterFactory.cs +++ b/Source/MQTTnet/Implementations/MqttClientAdapterFactory.cs @@ -1,17 +1,17 @@ -using System; -using MQTTnet.Adapter; +using MQTTnet.Adapter; using MQTTnet.Client.Options; using MQTTnet.Diagnostics; using MQTTnet.Formatter; +using System; namespace MQTTnet.Implementations { public class MqttClientAdapterFactory : IMqttClientAdapterFactory { - public IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetChildLogger logger) + public IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetLogger logger) { if (options == null) throw new ArgumentNullException(nameof(options)); - + switch (options.ChannelOptions) { case MqttClientTcpOptions _: diff --git a/Source/MQTTnet/Implementations/MqttTcpChannel.cs b/Source/MQTTnet/Implementations/MqttTcpChannel.cs index 71050e1..021a7d4 100644 --- a/Source/MQTTnet/Implementations/MqttTcpChannel.cs +++ b/Source/MQTTnet/Implementations/MqttTcpChannel.cs @@ -46,15 +46,15 @@ namespace MQTTnet.Implementations public async Task ConnectAsync(CancellationToken cancellationToken) { - Socket socket; + CrossPlatformSocket socket; if (_options.AddressFamily == AddressFamily.Unspecified) { - socket = new Socket(SocketType.Stream, ProtocolType.Tcp); + socket = new CrossPlatformSocket(); } else { - socket = new Socket(_options.AddressFamily, SocketType.Stream, ProtocolType.Tcp); + socket = new CrossPlatformSocket(_options.AddressFamily); } socket.ReceiveBufferSize = _options.BufferSize; @@ -69,20 +69,24 @@ namespace MQTTnet.Implementations socket.DualMode = _options.DualMode.Value; } - // Workaround for: workaround for https://github.com/dotnet/corefx/issues/24430 - using (cancellationToken.Register(() => socket.Dispose())) - { - await PlatformAbstractionLayer.ConnectAsync(socket, _options.Server, _options.GetPort()).ConfigureAwait(false); - } + await socket.ConnectAsync(_options.Server, _options.GetPort(), cancellationToken).ConfigureAwait(false); - var networkStream = new NetworkStream(socket, true); + var networkStream = socket.GetStream(); if (_options.TlsOptions.UseTls) { var sslStream = new SslStream(networkStream, false, InternalUserCertificateValidationCallback); - _stream = sslStream; + try + { + await sslStream.AuthenticateAsClientAsync(_options.Server, LoadCertificates(), _options.TlsOptions.SslProtocol, !_options.TlsOptions.IgnoreCertificateRevocationErrors).ConfigureAwait(false); + } + catch + { + sslStream.Dispose(); + throw; + } - await sslStream.AuthenticateAsClientAsync(_options.Server, LoadCertificates(), _options.TlsOptions.SslProtocol, !_options.TlsOptions.IgnoreCertificateRevocationErrors).ConfigureAwait(false); + _stream = sslStream; } else { @@ -107,17 +111,14 @@ namespace MQTTnet.Implementations // Workaround for: https://github.com/dotnet/corefx/issues/24430 using (cancellationToken.Register(Dispose)) { - if (cancellationToken.IsCancellationRequested) - { - return 0; - } + cancellationToken.ThrowIfCancellationRequested(); return await _stream.ReadAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false); } } catch (ObjectDisposedException) { - return 0; + return -1; } catch (IOException exception) { @@ -139,10 +140,7 @@ namespace MQTTnet.Implementations // Workaround for: https://github.com/dotnet/corefx/issues/24430 using (cancellationToken.Register(Dispose)) { - if (cancellationToken.IsCancellationRequested) - { - return; - } + cancellationToken.ThrowIfCancellationRequested(); await _stream.WriteAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false); } diff --git a/Source/MQTTnet/Implementations/MqttTcpServerAdapter.Uwp.cs b/Source/MQTTnet/Implementations/MqttTcpServerAdapter.Uwp.cs index 3b24bd1..4d26da7 100644 --- a/Source/MQTTnet/Implementations/MqttTcpServerAdapter.Uwp.cs +++ b/Source/MQTTnet/Implementations/MqttTcpServerAdapter.Uwp.cs @@ -11,14 +11,14 @@ using System.Threading.Tasks; namespace MQTTnet.Implementations { - public class MqttTcpServerAdapter : IMqttServerAdapter + public sealed class MqttTcpServerAdapter : IMqttServerAdapter { - private readonly IMqttNetChildLogger _logger; + readonly IMqttNetLogger _logger; - private IMqttServerOptions _options; - private StreamSocketListener _listener; + IMqttServerOptions _options; + StreamSocketListener _listener; - public MqttTcpServerAdapter(IMqttNetChildLogger logger) + public MqttTcpServerAdapter(IMqttNetLogger logger) { if (logger == null) throw new ArgumentNullException(nameof(logger)); @@ -68,7 +68,7 @@ namespace MQTTnet.Implementations _listener = null; } - private async void OnConnectionReceivedAsync(StreamSocketListener sender, StreamSocketListenerConnectionReceivedEventArgs args) + async void OnConnectionReceivedAsync(StreamSocketListener sender, StreamSocketListenerConnectionReceivedEventArgs args) { try { diff --git a/Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs b/Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs index 501c4da..22a5824 100644 --- a/Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs +++ b/Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs @@ -1,4 +1,8 @@ #if !WINDOWS_UWP +using MQTTnet.Adapter; +using MQTTnet.Diagnostics; +using MQTTnet.Internal; +using MQTTnet.Server; using System; using System.Collections.Generic; using System.Net; @@ -6,21 +10,17 @@ using System.Net.Sockets; using System.Security.Cryptography.X509Certificates; using System.Threading; using System.Threading.Tasks; -using MQTTnet.Adapter; -using MQTTnet.Diagnostics; -using MQTTnet.Internal; -using MQTTnet.Server; namespace MQTTnet.Implementations { - public class MqttTcpServerAdapter : Disposable, IMqttServerAdapter + public sealed class MqttTcpServerAdapter : Disposable, IMqttServerAdapter { - private readonly List _listeners = new List(); - private readonly IMqttNetChildLogger _logger; + readonly List _listeners = new List(); + readonly IMqttNetLogger _logger; - private CancellationTokenSource _cancellationTokenSource; + CancellationTokenSource _cancellationTokenSource; - public MqttTcpServerAdapter(IMqttNetChildLogger logger) + public MqttTcpServerAdapter(IMqttNetLogger logger) { if (logger == null) throw new ArgumentNullException(nameof(logger)); @@ -59,7 +59,7 @@ namespace MQTTnet.Implementations { tlsCertificate = new X509Certificate2(options.TlsEndpointOptions.Certificate, options.TlsEndpointOptions.CertificateCredentials.Password); } - + if (!tlsCertificate.HasPrivateKey) { throw new InvalidOperationException("The certificate for TLS encryption must contain the private key."); @@ -77,7 +77,17 @@ namespace MQTTnet.Implementations return Task.FromResult(0); } - private void Cleanup() + protected override void Dispose(bool disposing) + { + if (disposing) + { + Cleanup(); + } + + base.Dispose(disposing); + } + + void Cleanup() { _cancellationTokenSource?.Cancel(false); _cancellationTokenSource?.Dispose(); @@ -91,16 +101,7 @@ namespace MQTTnet.Implementations _listeners.Clear(); } - protected override void Dispose(bool disposing) - { - if (disposing) - { - Cleanup(); - } - base.Dispose(disposing); - } - - private void RegisterListeners(MqttServerTcpEndpointBaseOptions options, X509Certificate2 tlsCertificate, CancellationToken cancellationToken) + void RegisterListeners(MqttServerTcpEndpointBaseOptions options, X509Certificate2 tlsCertificate, CancellationToken cancellationToken) { if (!options.BoundInterNetworkAddress.Equals(IPAddress.None)) { diff --git a/Source/MQTTnet/Implementations/MqttTcpServerListener.cs b/Source/MQTTnet/Implementations/MqttTcpServerListener.cs index f2f439e..83ef80f 100644 --- a/Source/MQTTnet/Implementations/MqttTcpServerListener.cs +++ b/Source/MQTTnet/Implementations/MqttTcpServerListener.cs @@ -1,4 +1,9 @@ #if !WINDOWS_UWP +using MQTTnet.Adapter; +using MQTTnet.Diagnostics; +using MQTTnet.Formatter; +using MQTTnet.Internal; +using MQTTnet.Server; using System; using System.IO; using System.Net; @@ -7,30 +12,25 @@ using System.Net.Sockets; using System.Security.Cryptography.X509Certificates; using System.Threading; using System.Threading.Tasks; -using MQTTnet.Adapter; -using MQTTnet.Diagnostics; -using MQTTnet.Formatter; -using MQTTnet.Internal; -using MQTTnet.Server; namespace MQTTnet.Implementations { - public class MqttTcpServerListener : IDisposable + public sealed class MqttTcpServerListener : IDisposable { - private readonly IMqttNetChildLogger _logger; - private readonly AddressFamily _addressFamily; - private readonly MqttServerTcpEndpointBaseOptions _options; - private readonly MqttServerTlsTcpEndpointOptions _tlsOptions; - private readonly X509Certificate2 _tlsCertificate; + readonly IMqttNetLogger _logger; + readonly AddressFamily _addressFamily; + readonly MqttServerTcpEndpointBaseOptions _options; + readonly MqttServerTlsTcpEndpointOptions _tlsOptions; + readonly X509Certificate2 _tlsCertificate; - private Socket _socket; + private CrossPlatformSocket _socket; private IPEndPoint _localEndPoint; public MqttTcpServerListener( AddressFamily addressFamily, MqttServerTcpEndpointBaseOptions options, X509Certificate2 tlsCertificate, - IMqttNetChildLogger logger) + IMqttNetLogger logger) { _addressFamily = addressFamily; _options = options; @@ -59,20 +59,20 @@ namespace MQTTnet.Implementations _logger.Info($"Starting TCP listener for {_localEndPoint} TLS={_tlsCertificate != null}."); - _socket = new Socket(_addressFamily, SocketType.Stream, ProtocolType.Tcp); + _socket = new CrossPlatformSocket(_addressFamily); // Usage of socket options is described here: https://docs.microsoft.com/en-us/dotnet/api/system.net.sockets.socket.setsocketoption?view=netcore-2.2 if (_options.ReuseAddress) { - _socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true); + _socket.ReuseAddress = true; } - + if (_options.NoDelay) { - _socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.NoDelay, true); + _socket.NoDelay = true; } - + _socket.Bind(_localEndPoint); _socket.Listen(_options.ConnectionBacklog); @@ -87,7 +87,7 @@ namespace MQTTnet.Implementations throw; } - _logger.Warning(exception,"Error while creating listener socket for local end point '{0}'.", _localEndPoint); + _logger.Warning(exception, "Error while creating listener socket for local end point '{0}'.", _localEndPoint); return false; } } @@ -101,13 +101,13 @@ namespace MQTTnet.Implementations #endif } - private async Task AcceptClientConnectionsAsync(CancellationToken cancellationToken) + async Task AcceptClientConnectionsAsync(CancellationToken cancellationToken) { while (!cancellationToken.IsCancellationRequested) { try { - var clientSocket = await PlatformAbstractionLayer.AcceptAsync(_socket).ConfigureAwait(false); + var clientSocket = await _socket.AcceptAsync().ConfigureAwait(false); if (clientSocket == null) { continue; @@ -116,7 +116,7 @@ namespace MQTTnet.Implementations Task.Run(() => TryHandleClientConnectionAsync(clientSocket), cancellationToken).Forget(_logger); } catch (OperationCanceledException) - { + { } catch (Exception exception) { @@ -128,14 +128,14 @@ namespace MQTTnet.Implementations continue; } } - + _logger.Error(exception, $"Error while accepting connection at TCP listener {_localEndPoint} TLS={_tlsCertificate != null}."); await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken).ConfigureAwait(false); } } } - private async Task TryHandleClientConnectionAsync(Socket clientSocket) + async Task TryHandleClientConnectionAsync(CrossPlatformSocket clientSocket) { Stream stream = null; string remoteEndPoint = null; @@ -151,7 +151,7 @@ namespace MQTTnet.Implementations clientSocket.NoDelay = _options.NoDelay; - stream = new NetworkStream(clientSocket, true); + stream = clientSocket.GetStream(); X509Certificate2 clientCertificate = null; @@ -160,9 +160,9 @@ namespace MQTTnet.Implementations var sslStream = new SslStream(stream, false, _tlsOptions.RemoteCertificateValidationCallback); await sslStream.AuthenticateAsServerAsync( - _tlsCertificate, - _tlsOptions.ClientCertificateRequired, - _tlsOptions.SslProtocol, + _tlsCertificate, + _tlsOptions.ClientCertificateRequired, + _tlsOptions.SslProtocol, _tlsOptions.CheckCertificateRevocation).ConfigureAwait(false); stream = sslStream; diff --git a/Source/MQTTnet/Implementations/PlatformAbstractionLayer.cs b/Source/MQTTnet/Implementations/PlatformAbstractionLayer.cs index 80c0890..0b683dc 100644 --- a/Source/MQTTnet/Implementations/PlatformAbstractionLayer.cs +++ b/Source/MQTTnet/Implementations/PlatformAbstractionLayer.cs @@ -1,94 +1,9 @@ -using System; -using System.Net; -using System.Net.Sockets; -using System.Threading.Tasks; +using System.Threading.Tasks; namespace MQTTnet.Implementations { public static class PlatformAbstractionLayer { - // TODO: Consider creating primitives like "MqttNetSocket" which will wrap all required methods and do the platform stuff. - public static async Task AcceptAsync(Socket socket) - { -#if NET452 || NET461 - try - { - return await Task.Factory.FromAsync(socket.BeginAccept, socket.EndAccept, null).ConfigureAwait(false); - } - catch (ObjectDisposedException) - { - return null; - } -#else - return await socket.AcceptAsync().ConfigureAwait(false); -#endif - } - - - public static Task ConnectAsync(Socket socket, IPAddress ip, int port) - { -#if NET452 || NET461 - return Task.Factory.FromAsync(socket.BeginConnect, socket.EndConnect, ip, port, null); -#else - return socket.ConnectAsync(ip, port); -#endif - } - - public static Task ConnectAsync(Socket socket, string host, int port) - { -#if NET452 || NET461 - return Task.Factory.FromAsync(socket.BeginConnect, socket.EndConnect, host, port, null); -#else - return socket.ConnectAsync(host, port); -#endif - } - -#if NET452 || NET461 - public class SocketWrapper - { - private readonly Socket _socket; - private readonly ArraySegment _buffer; - private readonly SocketFlags _socketFlags; - - public SocketWrapper(Socket socket, ArraySegment buffer, SocketFlags socketFlags) - { - _socket = socket; - _buffer = buffer; - _socketFlags = socketFlags; - } - - public static IAsyncResult BeginSend(AsyncCallback callback, object state) - { - var real = (SocketWrapper)state; - return real._socket.BeginSend(real._buffer.Array, real._buffer.Offset, real._buffer.Count, real._socketFlags, callback, state); - } - - public static IAsyncResult BeginReceive(AsyncCallback callback, object state) - { - var real = (SocketWrapper)state; - return real._socket.BeginReceive(real._buffer.Array, real._buffer.Offset, real._buffer.Count, real._socketFlags, callback, state); - } - } -#endif - - public static Task SendAsync(Socket socket, ArraySegment buffer, SocketFlags socketFlags) - { -#if NET452 || NET461 - return Task.Factory.FromAsync(SocketWrapper.BeginSend, socket.EndSend, new SocketWrapper(socket, buffer, socketFlags)); -#else - return socket.SendAsync(buffer, socketFlags); -#endif - } - - public static Task ReceiveAsync(Socket socket, ArraySegment buffer, SocketFlags socketFlags) - { -#if NET452 || NET461 - return Task.Factory.FromAsync(SocketWrapper.BeginReceive, socket.EndReceive, new SocketWrapper(socket, buffer, socketFlags)); -#else - return socket.ReceiveAsync(buffer, socketFlags); -#endif - } - public static Task CompletedTask { get diff --git a/Source/MQTTnet/Internal/TaskExtensions.cs b/Source/MQTTnet/Internal/TaskExtensions.cs index 733631c..12a8c07 100644 --- a/Source/MQTTnet/Internal/TaskExtensions.cs +++ b/Source/MQTTnet/Internal/TaskExtensions.cs @@ -1,11 +1,11 @@ -using System.Threading.Tasks; -using MQTTnet.Diagnostics; +using MQTTnet.Diagnostics; +using System.Threading.Tasks; namespace MQTTnet.Internal { public static class TaskExtensions { - public static void Forget(this Task task, IMqttNetChildLogger logger) + public static void Forget(this Task task, IMqttNetLogger logger) { task?.ContinueWith(t => { diff --git a/Source/MQTTnet/LowLevelClient/ILowLevelMqttClient.cs b/Source/MQTTnet/LowLevelClient/ILowLevelMqttClient.cs new file mode 100644 index 0000000..1734bf4 --- /dev/null +++ b/Source/MQTTnet/LowLevelClient/ILowLevelMqttClient.cs @@ -0,0 +1,19 @@ +using MQTTnet.Client.Options; +using MQTTnet.Packets; +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace MQTTnet.LowLevelClient +{ + public interface ILowLevelMqttClient : IDisposable + { + Task ConnectAsync(IMqttClientOptions options, CancellationToken cancellationToken); + + Task DisconnectAsync(CancellationToken cancellationToken); + + Task SendAsync(MqttBasePacket packet, CancellationToken cancellationToken); + + Task ReceiveAsync(CancellationToken cancellationToken); + } +} diff --git a/Source/MQTTnet/LowLevelClient/LowLevelMqttClient.cs b/Source/MQTTnet/LowLevelClient/LowLevelMqttClient.cs new file mode 100644 index 0000000..045100d --- /dev/null +++ b/Source/MQTTnet/LowLevelClient/LowLevelMqttClient.cs @@ -0,0 +1,128 @@ +using MQTTnet.Adapter; +using MQTTnet.Client.Options; +using MQTTnet.Diagnostics; +using MQTTnet.Packets; +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace MQTTnet.LowLevelClient +{ + public sealed class LowLevelMqttClient : ILowLevelMqttClient + { + readonly IMqttNetLogger _logger; + readonly IMqttClientAdapterFactory _clientAdapterFactory; + + IMqttChannelAdapter _adapter; + IMqttClientOptions _options; + + public LowLevelMqttClient(IMqttClientAdapterFactory clientAdapterFactory, IMqttNetLogger logger) + { + if (clientAdapterFactory is null) throw new ArgumentNullException(nameof(clientAdapterFactory)); + if (logger is null) throw new ArgumentNullException(nameof(logger)); + + _clientAdapterFactory = clientAdapterFactory; + _logger = logger.CreateChildLogger(nameof(LowLevelMqttClient)); + } + + bool IsConnected => _adapter != null; + + public async Task ConnectAsync(IMqttClientOptions options, CancellationToken cancellationToken) + { + if (options is null) throw new ArgumentNullException(nameof(options)); + + if (_adapter != null) + { + throw new InvalidOperationException("Low level MQTT client is already connected. Disconnect first before connecting again."); + } + + var newAdapter = _clientAdapterFactory.CreateClientAdapter(options, _logger); + + try + { + _logger.Verbose($"Trying to connect with server '{options.ChannelOptions}' (Timeout={options.CommunicationTimeout})."); + await newAdapter.ConnectAsync(options.CommunicationTimeout, cancellationToken).ConfigureAwait(false); + _logger.Verbose("Connection with server established."); + + _options = options; + } + catch (Exception) + { + _adapter.Dispose(); + throw; + } + + _adapter = newAdapter; + } + + public async Task DisconnectAsync(CancellationToken cancellationToken) + { + if (_adapter == null) + { + return; + } + + await SafeDisconnect(cancellationToken).ConfigureAwait(false); + _adapter = null; + } + + public async Task SendAsync(MqttBasePacket packet, CancellationToken cancellationToken) + { + if (packet is null) throw new ArgumentNullException(nameof(packet)); + + if (_adapter == null) + { + throw new InvalidOperationException("Low level MQTT client is not connected."); + } + + try + { + await _adapter.SendPacketAsync(packet, _options.CommunicationTimeout, cancellationToken).ConfigureAwait(false); + } + catch (Exception) + { + await SafeDisconnect(cancellationToken).ConfigureAwait(false); + throw; + } + } + + public async Task ReceiveAsync(CancellationToken cancellationToken) + { + if (_adapter == null) + { + throw new InvalidOperationException("Low level MQTT client is not connected."); + } + + try + { + return await _adapter.ReceivePacketAsync(_options.CommunicationTimeout, cancellationToken).ConfigureAwait(false); + } + catch (Exception) + { + await SafeDisconnect(cancellationToken).ConfigureAwait(false); + throw; + } + } + + public void Dispose() + { + _adapter?.Dispose(); + } + + async Task SafeDisconnect(CancellationToken cancellationToken) + { + try + { + await _adapter.DisconnectAsync(_options.CommunicationTimeout, cancellationToken).ConfigureAwait(false); + } + catch (Exception exception) + { + _logger.Error(exception, "Error while disconnecting."); + } + finally + { + _adapter.Dispose(); + } + } + } +} diff --git a/Source/MQTTnet/MQTTnet.csproj b/Source/MQTTnet/MQTTnet.csproj index 28e3f3d..a08f843 100644 --- a/Source/MQTTnet/MQTTnet.csproj +++ b/Source/MQTTnet/MQTTnet.csproj @@ -64,7 +64,7 @@ - + \ No newline at end of file diff --git a/Source/MQTTnet/MqttFactory.cs b/Source/MQTTnet/MqttFactory.cs index cb835f6..ffc8f90 100644 --- a/Source/MQTTnet/MqttFactory.cs +++ b/Source/MQTTnet/MqttFactory.cs @@ -1,16 +1,17 @@ -using System; -using System.Collections.Generic; -using MQTTnet.Adapter; +using MQTTnet.Adapter; using MQTTnet.Client; using MQTTnet.Diagnostics; using MQTTnet.Implementations; +using MQTTnet.LowLevelClient; using MQTTnet.Server; +using System; +using System.Collections.Generic; namespace MQTTnet { - public class MqttFactory : IMqttFactory + public sealed class MqttFactory : IMqttFactory { - private IMqttClientAdapterFactory _clientAdapterFactory = new MqttClientAdapterFactory(); + IMqttClientAdapterFactory _clientAdapterFactory = new MqttClientAdapterFactory(); public MqttFactory() : this(new MqttNetLogger()) { @@ -29,6 +30,33 @@ namespace MQTTnet return this; } + public ILowLevelMqttClient CreateLowLevelMqttClient() + { + return CreateLowLevelMqttClient(DefaultLogger); + } + + public ILowLevelMqttClient CreateLowLevelMqttClient(IMqttNetLogger logger) + { + if (logger == null) throw new ArgumentNullException(nameof(logger)); + + return new LowLevelMqttClient(_clientAdapterFactory, logger); + } + + public ILowLevelMqttClient CreateLowLevelMqttClient(IMqttClientAdapterFactory clientAdapterFactory) + { + if (clientAdapterFactory == null) throw new ArgumentNullException(nameof(clientAdapterFactory)); + + return new LowLevelMqttClient(_clientAdapterFactory, DefaultLogger); + } + + public ILowLevelMqttClient CreateLowLevelMqttClient(IMqttNetLogger logger, IMqttClientAdapterFactory clientAdapterFactoryy) + { + if (logger == null) throw new ArgumentNullException(nameof(logger)); + if (clientAdapterFactoryy == null) throw new ArgumentNullException(nameof(clientAdapterFactoryy)); + + return new LowLevelMqttClient(_clientAdapterFactory, logger); + } + public IMqttClient CreateMqttClient() { return CreateMqttClient(DefaultLogger); @@ -65,7 +93,7 @@ namespace MQTTnet { if (logger == null) throw new ArgumentNullException(nameof(logger)); - return CreateMqttServer(new List { new MqttTcpServerAdapter(logger.CreateChildLogger()) }, logger); + return CreateMqttServer(new List { new MqttTcpServerAdapter(logger) }, logger); } public IMqttServer CreateMqttServer(IEnumerable serverAdapters, IMqttNetLogger logger) @@ -73,14 +101,14 @@ namespace MQTTnet if (serverAdapters == null) throw new ArgumentNullException(nameof(serverAdapters)); if (logger == null) throw new ArgumentNullException(nameof(logger)); - return new MqttServer(serverAdapters, logger.CreateChildLogger()); + return new MqttServer(serverAdapters, logger); } public IMqttServer CreateMqttServer(IEnumerable serverAdapters) { if (serverAdapters == null) throw new ArgumentNullException(nameof(serverAdapters)); - - return new MqttServer(serverAdapters, DefaultLogger.CreateChildLogger()); + + return new MqttServer(serverAdapters, DefaultLogger); } } } \ No newline at end of file diff --git a/Source/MQTTnet/Server/IMqttRetainedMessagesManager.cs b/Source/MQTTnet/Server/IMqttRetainedMessagesManager.cs index 6ffdd2b..ab1d38c 100644 --- a/Source/MQTTnet/Server/IMqttRetainedMessagesManager.cs +++ b/Source/MQTTnet/Server/IMqttRetainedMessagesManager.cs @@ -1,12 +1,12 @@ -using System.Collections.Generic; +using MQTTnet.Diagnostics; +using System.Collections.Generic; using System.Threading.Tasks; -using MQTTnet.Diagnostics; namespace MQTTnet.Server { public interface IMqttRetainedMessagesManager { - Task Start(IMqttServerOptions options, IMqttNetChildLogger logger); + Task Start(IMqttServerOptions options, IMqttNetLogger logger); Task LoadMessagesAsync(); diff --git a/Source/MQTTnet/Server/MqttClientConnection.cs b/Source/MQTTnet/Server/MqttClientConnection.cs index 5528969..a163b81 100644 --- a/Source/MQTTnet/Server/MqttClientConnection.cs +++ b/Source/MQTTnet/Server/MqttClientConnection.cs @@ -3,6 +3,7 @@ using MQTTnet.Client; using MQTTnet.Diagnostics; using MQTTnet.Exceptions; using MQTTnet.Formatter; +using MQTTnet.Implementations; using MQTTnet.Internal; using MQTTnet.PacketDispatcher; using MQTTnet.Packets; @@ -15,32 +16,34 @@ using System.Threading.Tasks; namespace MQTTnet.Server { - public class MqttClientConnection : IDisposable + public sealed class MqttClientConnection : IDisposable { - private readonly MqttPacketIdentifierProvider _packetIdentifierProvider = new MqttPacketIdentifierProvider(); - private readonly MqttPacketDispatcher _packetDispatcher = new MqttPacketDispatcher(); - private readonly CancellationTokenSource _cancellationToken = new CancellationTokenSource(); + readonly MqttPacketIdentifierProvider _packetIdentifierProvider = new MqttPacketIdentifierProvider(); + readonly MqttPacketDispatcher _packetDispatcher = new MqttPacketDispatcher(); + readonly CancellationTokenSource _cancellationToken = new CancellationTokenSource(); - private readonly IMqttRetainedMessagesManager _retainedMessagesManager; - private readonly MqttClientKeepAliveMonitor _keepAliveMonitor; - private readonly MqttClientSessionsManager _sessionsManager; + readonly IMqttRetainedMessagesManager _retainedMessagesManager; + readonly MqttClientKeepAliveMonitor _keepAliveMonitor; + readonly MqttClientSessionsManager _sessionsManager; - private readonly IMqttNetChildLogger _logger; - private readonly IMqttServerOptions _serverOptions; + readonly IMqttNetLogger _logger; + readonly IMqttServerOptions _serverOptions; - private readonly IMqttChannelAdapter _channelAdapter; - private readonly IMqttDataConverter _dataConverter; - private readonly string _endpoint; - private readonly DateTime _connectedTimestamp; + readonly IMqttChannelAdapter _channelAdapter; + readonly IMqttDataConverter _dataConverter; + readonly string _endpoint; + readonly DateTime _connectedTimestamp; - private Task _packageReceiverTask; - private DateTime _lastPacketReceivedTimestamp; - private DateTime _lastNonKeepAlivePacketReceivedTimestamp; + Task _packageReceiverTask; + DateTime _lastPacketReceivedTimestamp; + DateTime _lastNonKeepAlivePacketReceivedTimestamp; - private long _receivedPacketsCount; - private long _sentPacketsCount = 1; // Start with 1 because the CONNECT packet is not counted anywhere. - private long _receivedApplicationMessagesCount; - private long _sentApplicationMessagesCount; + long _receivedPacketsCount; + long _sentPacketsCount = 1; // Start with 1 because the CONNECT packet is not counted anywhere. + long _receivedApplicationMessagesCount; + long _sentApplicationMessagesCount; + + bool _isTakeover; public MqttClientConnection( MqttConnectPacket connectPacket, @@ -49,7 +52,7 @@ namespace MQTTnet.Server IMqttServerOptions serverOptions, MqttClientSessionsManager sessionsManager, IMqttRetainedMessagesManager retainedMessagesManager, - IMqttNetChildLogger logger) + IMqttNetLogger logger) { Session = session ?? throw new ArgumentNullException(nameof(session)); _serverOptions = serverOptions ?? throw new ArgumentNullException(nameof(serverOptions)); @@ -64,7 +67,7 @@ namespace MQTTnet.Server if (logger == null) throw new ArgumentNullException(nameof(logger)); _logger = logger.CreateChildLogger(nameof(MqttClientConnection)); - _keepAliveMonitor = new MqttClientKeepAliveMonitor(ConnectPacket.ClientId, StopAsync, _logger); + _keepAliveMonitor = new MqttClientKeepAliveMonitor(ConnectPacket.ClientId, () => StopAsync(), _logger); _connectedTimestamp = DateTime.UtcNow; _lastPacketReceivedTimestamp = _connectedTimestamp; @@ -77,15 +80,21 @@ namespace MQTTnet.Server public MqttClientSession Session { get; } - public async Task StopAsync() + public bool IsFinalized { get; set; } + + public Task StopAsync(bool isTakeover = false) { + _isTakeover = isTakeover; + StopInternal(); var task = _packageReceiverTask; if (task != null) { - await task.ConfigureAwait(false); + return task; } + + return PlatformAbstractionLayer.CompletedTask; } public void ResetStatistics() @@ -124,7 +133,7 @@ namespace MQTTnet.Server return _packageReceiverTask; } - private async Task RunInternalAsync(MqttConnectionValidatorContext connectionValidatorContext) + async Task RunInternalAsync(MqttConnectionValidatorContext connectionValidatorContext) { var disconnectType = MqttClientDisconnectType.NotClean; try @@ -243,20 +252,25 @@ namespace MQTTnet.Server _channelAdapter.ReadingPacketStartedCallback = null; _channelAdapter.ReadingPacketCompletedCallback = null; - _logger.Info("Client '{0}': Session stopped.", ClientId); + _logger.Info("Client '{0}': Connection stopped.", ClientId); _packageReceiverTask = null; } + if (_isTakeover) + { + return MqttClientDisconnectType.Takeover; + } + return disconnectType; } - private void StopInternal() + void StopInternal() { _cancellationToken.Cancel(false); } - private async Task EnqueueSubscribedRetainedMessagesAsync(ICollection topicFilters) + async Task EnqueueSubscribedRetainedMessagesAsync(ICollection topicFilters) { var retainedMessages = await _retainedMessagesManager.GetSubscribedMessagesAsync(topicFilters).ConfigureAwait(false); foreach (var applicationMessage in retainedMessages) @@ -265,7 +279,7 @@ namespace MQTTnet.Server } } - private async Task HandleIncomingSubscribePacketAsync(MqttSubscribePacket subscribePacket) + async Task HandleIncomingSubscribePacketAsync(MqttSubscribePacket subscribePacket) { // TODO: Let the channel adapter create the packet. var subscribeResult = await Session.SubscriptionsManager.SubscribeAsync(subscribePacket, ConnectPacket).ConfigureAwait(false); @@ -281,14 +295,14 @@ namespace MQTTnet.Server await EnqueueSubscribedRetainedMessagesAsync(subscribePacket.TopicFilters).ConfigureAwait(false); } - private async Task HandleIncomingUnsubscribePacketAsync(MqttUnsubscribePacket unsubscribePacket) + async Task HandleIncomingUnsubscribePacketAsync(MqttUnsubscribePacket unsubscribePacket) { // TODO: Let the channel adapter create the packet. var unsubscribeResult = await Session.SubscriptionsManager.UnsubscribeAsync(unsubscribePacket).ConfigureAwait(false); await SendAsync(unsubscribeResult).ConfigureAwait(false); } - private Task HandleIncomingPublishPacketAsync(MqttPublishPacket publishPacket) + Task HandleIncomingPublishPacketAsync(MqttPublishPacket publishPacket) { Interlocked.Increment(ref _sentApplicationMessagesCount); @@ -313,16 +327,16 @@ namespace MQTTnet.Server } } - private Task HandleIncomingPublishPacketWithQoS0Async(MqttPublishPacket publishPacket) + Task HandleIncomingPublishPacketWithQoS0Async(MqttPublishPacket publishPacket) { var applicationMessage = _dataConverter.CreateApplicationMessage(publishPacket); _sessionsManager.DispatchApplicationMessage(applicationMessage, this); - return Task.FromResult(0); + return PlatformAbstractionLayer.CompletedTask; } - private Task HandleIncomingPublishPacketWithQoS1Async(MqttPublishPacket publishPacket) + Task HandleIncomingPublishPacketWithQoS1Async(MqttPublishPacket publishPacket) { var applicationMessage = _dataConverter.CreateApplicationMessage(publishPacket); _sessionsManager.DispatchApplicationMessage(applicationMessage, this); @@ -331,7 +345,7 @@ namespace MQTTnet.Server return SendAsync(pubAckPacket); } - private Task HandleIncomingPublishPacketWithQoS2Async(MqttPublishPacket publishPacket) + Task HandleIncomingPublishPacketWithQoS2Async(MqttPublishPacket publishPacket) { var applicationMessage = _dataConverter.CreateApplicationMessage(publishPacket); _sessionsManager.DispatchApplicationMessage(applicationMessage, this); @@ -345,7 +359,7 @@ namespace MQTTnet.Server return SendAsync(pubRecPacket); } - private async Task SendPendingPacketsAsync(CancellationToken cancellationToken) + async Task SendPendingPacketsAsync(CancellationToken cancellationToken) { MqttQueuedApplicationMessage queuedApplicationMessage = null; MqttPublishPacket publishPacket = null; @@ -422,9 +436,6 @@ namespace MQTTnet.Server } _logger.Verbose("Queued application message sent (ClientId: {0}).", ClientId); - - // TODO: - //Interlocked.Increment(ref _sentPacketsCount); } } catch (Exception exception) @@ -459,7 +470,7 @@ namespace MQTTnet.Server } } - private async Task SendAsync(MqttBasePacket packet) + async Task SendAsync(MqttBasePacket packet) { await _channelAdapter.SendPacketAsync(packet, _serverOptions.DefaultCommunicationTimeout, _cancellationToken.Token).ConfigureAwait(false); @@ -471,12 +482,12 @@ namespace MQTTnet.Server } } - private void OnAdapterReadingPacketCompleted() + void OnAdapterReadingPacketCompleted() { _keepAliveMonitor?.Resume(); } - private void OnAdapterReadingPacketStarted() + void OnAdapterReadingPacketStarted() { _keepAliveMonitor?.Pause(); } diff --git a/Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs b/Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs index deefe4c..6b28825 100644 --- a/Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs +++ b/Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs @@ -1,27 +1,27 @@ -using System; +using MQTTnet.Diagnostics; +using MQTTnet.Internal; +using System; using System.Diagnostics; using System.Threading; using System.Threading.Tasks; -using MQTTnet.Diagnostics; -using MQTTnet.Internal; namespace MQTTnet.Server { public class MqttClientKeepAliveMonitor { - private readonly Stopwatch _lastPacketReceivedTracker = new Stopwatch(); + readonly Stopwatch _lastPacketReceivedTracker = new Stopwatch(); - private readonly string _clientId; - private readonly Func _keepAliveElapsedCallback; - private readonly IMqttNetChildLogger _logger; + readonly string _clientId; + readonly Func _keepAliveElapsedCallback; + readonly IMqttNetLogger _logger; - private bool _isPaused; + bool _isPaused; - public MqttClientKeepAliveMonitor(string clientId, Func keepAliveElapsedCallback, IMqttNetChildLogger logger) + public MqttClientKeepAliveMonitor(string clientId, Func keepAliveElapsedCallback, IMqttNetLogger logger) { _clientId = clientId ?? throw new ArgumentNullException(nameof(clientId)); _keepAliveElapsedCallback = keepAliveElapsedCallback ?? throw new ArgumentNullException(nameof(keepAliveElapsedCallback)); - + if (logger == null) throw new ArgumentNullException(nameof(logger)); _logger = logger.CreateChildLogger(nameof(MqttClientKeepAliveMonitor)); } @@ -32,7 +32,7 @@ namespace MQTTnet.Server { return; } - + Task.Run(() => RunAsync(keepAlivePeriod, cancellationToken), cancellationToken).Forget(_logger); } @@ -51,7 +51,7 @@ namespace MQTTnet.Server _lastPacketReceivedTracker.Restart(); } - private async Task RunAsync(int keepAlivePeriod, CancellationToken cancellationToken) + async Task RunAsync(int keepAlivePeriod, CancellationToken cancellationToken) { try { diff --git a/Source/MQTTnet/Server/MqttClientSession.cs b/Source/MQTTnet/Server/MqttClientSession.cs index d097b9f..70b04d9 100644 --- a/Source/MQTTnet/Server/MqttClientSession.cs +++ b/Source/MQTTnet/Server/MqttClientSession.cs @@ -1,18 +1,18 @@ -using System; +using MQTTnet.Diagnostics; +using MQTTnet.Server.Status; +using System; using System.Collections.Generic; using System.Threading.Tasks; -using MQTTnet.Diagnostics; -using MQTTnet.Server.Status; namespace MQTTnet.Server { public class MqttClientSession { - private readonly IMqttNetChildLogger _logger; + readonly IMqttNetLogger _logger; - private readonly DateTime _createdTimestamp = DateTime.UtcNow; + readonly DateTime _createdTimestamp = DateTime.UtcNow; - public MqttClientSession(string clientId, IDictionary items, MqttServerEventDispatcher eventDispatcher, IMqttServerOptions serverOptions, IMqttNetChildLogger logger) + public MqttClientSession(string clientId, IDictionary items, MqttServerEventDispatcher eventDispatcher, IMqttServerOptions serverOptions, IMqttNetLogger logger) { ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId)); Items = items ?? throw new ArgumentNullException(nameof(items)); @@ -73,6 +73,7 @@ namespace MQTTnet.Server status.ClientId = ClientId; status.CreatedTimestamp = _createdTimestamp; status.PendingApplicationMessagesCount = ApplicationMessagesQueue.Count; + status.Items = Items; } } } \ No newline at end of file diff --git a/Source/MQTTnet/Server/MqttClientSessionsManager.cs b/Source/MQTTnet/Server/MqttClientSessionsManager.cs index 6f8ea20..1e2bcef 100644 --- a/Source/MQTTnet/Server/MqttClientSessionsManager.cs +++ b/Source/MQTTnet/Server/MqttClientSessionsManager.cs @@ -16,26 +16,26 @@ namespace MQTTnet.Server { public class MqttClientSessionsManager : Disposable { - private readonly AsyncQueue _messageQueue = new AsyncQueue(); + readonly AsyncQueue _messageQueue = new AsyncQueue(); - private readonly SemaphoreSlim _createConnectionGate = new SemaphoreSlim(1, 1); - private readonly ConcurrentDictionary _connections = new ConcurrentDictionary(); - private readonly ConcurrentDictionary _sessions = new ConcurrentDictionary(); - private readonly IDictionary _serverSessionItems = new ConcurrentDictionary(); + readonly AsyncLock _createConnectionGate = new AsyncLock(); + readonly ConcurrentDictionary _connections = new ConcurrentDictionary(); + readonly ConcurrentDictionary _sessions = new ConcurrentDictionary(); + readonly IDictionary _serverSessionItems = new ConcurrentDictionary(); - private readonly CancellationToken _cancellationToken; - private readonly MqttServerEventDispatcher _eventDispatcher; + readonly CancellationToken _cancellationToken; + readonly MqttServerEventDispatcher _eventDispatcher; - private readonly IMqttRetainedMessagesManager _retainedMessagesManager; - private readonly IMqttServerOptions _options; - private readonly IMqttNetChildLogger _logger; + readonly IMqttRetainedMessagesManager _retainedMessagesManager; + readonly IMqttServerOptions _options; + readonly IMqttNetLogger _logger; public MqttClientSessionsManager( IMqttServerOptions options, IMqttRetainedMessagesManager retainedMessagesManager, CancellationToken cancellationToken, MqttServerEventDispatcher eventDispatcher, - IMqttNetChildLogger logger) + IMqttNetLogger logger) { _cancellationToken = cancellationToken; @@ -60,9 +60,11 @@ namespace MQTTnet.Server } } - public Task HandleClientAsync(IMqttChannelAdapter clientAdapter) + public Task HandleClientConnectionAsync(IMqttChannelAdapter clientAdapter) { - return HandleClientAsync(clientAdapter, _cancellationToken); + if (clientAdapter is null) throw new ArgumentNullException(nameof(clientAdapter)); + + return HandleClientConnectionAsync(clientAdapter, _cancellationToken); } public Task> GetClientStatusAsync() @@ -155,7 +157,7 @@ namespace MQTTnet.Server base.Dispose(disposing); } - private async Task TryProcessQueuedApplicationMessagesAsync(CancellationToken cancellationToken) + async Task TryProcessQueuedApplicationMessagesAsync(CancellationToken cancellationToken) { while (!cancellationToken.IsCancellationRequested) { @@ -173,7 +175,7 @@ namespace MQTTnet.Server } } - private async Task TryProcessNextQueuedApplicationMessageAsync(CancellationToken cancellationToken) + async Task TryProcessNextQueuedApplicationMessageAsync(CancellationToken cancellationToken) { try { @@ -207,7 +209,7 @@ namespace MQTTnet.Server applicationMessage = interceptorContext.ApplicationMessage; } - await _eventDispatcher.HandleApplicationMessageReceivedAsync(sender?.ClientId, applicationMessage).ConfigureAwait(false); + await _eventDispatcher.SafeNotifyApplicationMessageReceivedAsync(sender?.ClientId, applicationMessage).ConfigureAwait(false); if (applicationMessage.Retain) { @@ -231,14 +233,14 @@ namespace MQTTnet.Server } } - private async Task HandleClientAsync(IMqttChannelAdapter channelAdapter, CancellationToken cancellationToken) + async Task HandleClientConnectionAsync(IMqttChannelAdapter channelAdapter, CancellationToken cancellationToken) { var disconnectType = MqttClientDisconnectType.NotClean; string clientId = null; - var clientWasConnected = true; - - MqttConnectPacket connectPacket = null; + var clientWasAuthorized = false; + MqttConnectPacket connectPacket; + MqttClientConnection clientConnection = null; try { try @@ -259,11 +261,8 @@ namespace MQTTnet.Server var connectionValidatorContext = await ValidateConnectionAsync(connectPacket, channelAdapter).ConfigureAwait(false); - clientId = connectPacket.ClientId; - if (connectionValidatorContext.ReasonCode != MqttConnectReasonCode.Success) { - clientWasConnected = false; // Send failure response here without preparing a session. The result for a successful connect // will be sent from the session itself. var connAckPacket = channelAdapter.PacketFormatterAdapter.DataConverter.CreateConnAckPacket(connectionValidatorContext); @@ -272,11 +271,13 @@ namespace MQTTnet.Server return; } - var connection = await CreateConnectionAsync(connectPacket, connectionValidatorContext, channelAdapter).ConfigureAwait(false); + clientWasAuthorized = true; + clientId = connectPacket.ClientId; + clientConnection = await CreateClientConnectionAsync(connectPacket, connectionValidatorContext, channelAdapter).ConfigureAwait(false); - await _eventDispatcher.HandleClientConnectedAsync(clientId).ConfigureAwait(false); + await _eventDispatcher.SafeNotifyClientConnectedAsync(clientId).ConfigureAwait(false); - disconnectType = await connection.RunAsync(connectionValidatorContext).ConfigureAwait(false); + disconnectType = await clientConnection.RunAsync(connectionValidatorContext).ConfigureAwait(false); } catch (OperationCanceledException) { @@ -287,8 +288,10 @@ namespace MQTTnet.Server } finally { - if (clientWasConnected) + if (clientWasAuthorized && disconnectType != MqttClientDisconnectType.Takeover) { + // Only cleanup if the client was authorized. If not it will remove the existing connection, session etc. + // This allows to kill connections and sessions from known client IDs. if (clientId != null) { _connections.TryRemove(clientId, out _); @@ -298,18 +301,23 @@ namespace MQTTnet.Server await DeleteSessionAsync(clientId).ConfigureAwait(false); } } + } - await TryCleanupChannelAsync(channelAdapter).ConfigureAwait(false); + await SafeCleanupChannelAsync(channelAdapter).ConfigureAwait(false); - if (clientId != null) - { - await _eventDispatcher.TryHandleClientDisconnectedAsync(clientId, disconnectType).ConfigureAwait(false); - } + if (clientWasAuthorized && clientId != null) + { + await _eventDispatcher.SafeNotifyClientDisconnectedAsync(clientId, disconnectType).ConfigureAwait(false); + } + + if (clientConnection != null) + { + clientConnection.IsFinalized = true; } } } - private async Task ValidateConnectionAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter channelAdapter) + async Task ValidateConnectionAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter channelAdapter) { var context = new MqttConnectionValidatorContext(connectPacket, channelAdapter, new ConcurrentDictionary()); @@ -337,17 +345,22 @@ namespace MQTTnet.Server return context; } - private async Task CreateConnectionAsync(MqttConnectPacket connectPacket, MqttConnectionValidatorContext connectionValidatorContext, IMqttChannelAdapter channelAdapter) + async Task CreateClientConnectionAsync(MqttConnectPacket connectPacket, MqttConnectionValidatorContext connectionValidatorContext, IMqttChannelAdapter channelAdapter) { - await _createConnectionGate.WaitAsync(_cancellationToken).ConfigureAwait(false); - try + using (await _createConnectionGate.WaitAsync(_cancellationToken).ConfigureAwait(false)) { var isSessionPresent = _sessions.TryGetValue(connectPacket.ClientId, out var session); var isConnectionPresent = _connections.TryGetValue(connectPacket.ClientId, out var existingConnection); if (isConnectionPresent) { - await existingConnection.StopAsync().ConfigureAwait(false); + await existingConnection.StopAsync(true); + + // TODO: This fixes a race condition with unit test Same_Client_Id_Connect_Disconnect_Event_Order. + // It is not clear where the issue is coming from. The connected event is fired BEFORE the disconnected + // event. This is wrong. It seems that the finally block in HandleClientAsync must be finished before we + // can continue here. Maybe there is a better way to do this. + SpinWait.SpinUntil(() => existingConnection.IsFinalized, TimeSpan.FromSeconds(10)); } if (isSessionPresent) @@ -377,13 +390,9 @@ namespace MQTTnet.Server return connection; } - finally - { - _createConnectionGate.Release(); - } } - private async Task InterceptApplicationMessageAsync(MqttClientConnection senderConnection, MqttApplicationMessage applicationMessage) + async Task InterceptApplicationMessageAsync(MqttClientConnection senderConnection, MqttApplicationMessage applicationMessage) { var interceptor = _options.ApplicationMessageInterceptor; if (interceptor == null) @@ -411,7 +420,7 @@ namespace MQTTnet.Server return interceptorContext; } - private async Task TryCleanupChannelAsync(IMqttChannelAdapter channelAdapter) + async Task SafeCleanupChannelAsync(IMqttChannelAdapter channelAdapter) { try { diff --git a/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs b/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs index deeadf4..59e9f34 100644 --- a/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs +++ b/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs @@ -1,9 +1,9 @@ -using System; +using MQTTnet.Packets; +using MQTTnet.Protocol; +using System; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; -using MQTTnet.Packets; -using MQTTnet.Protocol; namespace MQTTnet.Server { @@ -67,7 +67,7 @@ namespace MQTTnet.Server _subscriptions[finalTopicFilter.Topic] = finalTopicFilter; } - await _eventDispatcher.HandleClientSubscribedTopicAsync(_clientSession.ClientId, finalTopicFilter).ConfigureAwait(false); + await _eventDispatcher.SafeNotifyClientSubscribedTopicAsync(_clientSession.ClientId, finalTopicFilter).ConfigureAwait(false); } } @@ -83,7 +83,7 @@ namespace MQTTnet.Server var interceptorContext = await InterceptSubscribeAsync(topicFilter).ConfigureAwait(false); if (!interceptorContext.AcceptSubscription) { - continue; + continue; } if (interceptorContext.AcceptSubscription) @@ -93,7 +93,7 @@ namespace MQTTnet.Server _subscriptions[topicFilter.Topic] = topicFilter; } - await _eventDispatcher.HandleClientSubscribedTopicAsync(_clientSession.ClientId, topicFilter).ConfigureAwait(false); + await _eventDispatcher.SafeNotifyClientSubscribedTopicAsync(_clientSession.ClientId, topicFilter).ConfigureAwait(false); } } } @@ -131,9 +131,9 @@ namespace MQTTnet.Server foreach (var topicFilter in unsubscribePacket.TopicFilters) { - await _eventDispatcher.HandleClientUnsubscribedTopicAsync(_clientSession.ClientId, topicFilter).ConfigureAwait(false); + await _eventDispatcher.SafeNotifyClientUnsubscribedTopicAsync(_clientSession.ClientId, topicFilter).ConfigureAwait(false); } - + return unsubAckPacket; } @@ -152,7 +152,7 @@ namespace MQTTnet.Server lock (_subscriptions) { _subscriptions.Remove(topicFilter); - } + } } } diff --git a/Source/MQTTnet/Server/MqttRetainedMessagesManager.cs b/Source/MQTTnet/Server/MqttRetainedMessagesManager.cs index f4ebe48..f6a994b 100644 --- a/Source/MQTTnet/Server/MqttRetainedMessagesManager.cs +++ b/Source/MQTTnet/Server/MqttRetainedMessagesManager.cs @@ -1,10 +1,10 @@ -using System; +using MQTTnet.Diagnostics; +using MQTTnet.Implementations; +using MQTTnet.Internal; +using System; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; -using MQTTnet.Diagnostics; -using MQTTnet.Implementations; -using MQTTnet.Internal; namespace MQTTnet.Server { @@ -14,10 +14,10 @@ namespace MQTTnet.Server private readonly AsyncLock _messagesLock = new AsyncLock(); private readonly Dictionary _messages = new Dictionary(); - private IMqttNetChildLogger _logger; + private IMqttNetLogger _logger; private IMqttServerOptions _options; - public Task Start(IMqttServerOptions options, IMqttNetChildLogger logger) + public Task Start(IMqttServerOptions options, IMqttNetLogger logger) { if (logger == null) throw new ArgumentNullException(nameof(logger)); _logger = logger.CreateChildLogger(nameof(MqttRetainedMessagesManager)); diff --git a/Source/MQTTnet/Server/MqttServer.cs b/Source/MQTTnet/Server/MqttServer.cs index a76e658..a8c4369 100644 --- a/Source/MQTTnet/Server/MqttServer.cs +++ b/Source/MQTTnet/Server/MqttServer.cs @@ -1,15 +1,15 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; -using MQTTnet.Adapter; +using MQTTnet.Adapter; using MQTTnet.Client.Publishing; using MQTTnet.Client.Receiving; using MQTTnet.Diagnostics; using MQTTnet.Exceptions; using MQTTnet.Protocol; using MQTTnet.Server.Status; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; namespace MQTTnet.Server { @@ -17,13 +17,13 @@ namespace MQTTnet.Server { private readonly MqttServerEventDispatcher _eventDispatcher; private readonly ICollection _adapters; - private readonly IMqttNetChildLogger _logger; + private readonly IMqttNetLogger _logger; private MqttClientSessionsManager _clientSessionsManager; private IMqttRetainedMessagesManager _retainedMessagesManager; private CancellationTokenSource _cancellationTokenSource; - public MqttServer(IEnumerable adapters, IMqttNetChildLogger logger) + public MqttServer(IEnumerable adapters, IMqttNetLogger logger) { if (adapters == null) throw new ArgumentNullException(nameof(adapters)); _adapters = adapters.ToList(); @@ -194,7 +194,7 @@ namespace MQTTnet.Server private Task OnHandleClient(IMqttChannelAdapter channelAdapter) { - return _clientSessionsManager.HandleClientAsync(channelAdapter); + return _clientSessionsManager.HandleClientConnectionAsync(channelAdapter); } } } diff --git a/Source/MQTTnet/Server/MqttServerEventDispatcher.cs b/Source/MQTTnet/Server/MqttServerEventDispatcher.cs index e6e608a..3bb3768 100644 --- a/Source/MQTTnet/Server/MqttServerEventDispatcher.cs +++ b/Source/MQTTnet/Server/MqttServerEventDispatcher.cs @@ -1,15 +1,15 @@ -using System; -using System.Threading.Tasks; -using MQTTnet.Client.Receiving; +using MQTTnet.Client.Receiving; using MQTTnet.Diagnostics; +using System; +using System.Threading.Tasks; namespace MQTTnet.Server { public class MqttServerEventDispatcher { - private readonly IMqttNetChildLogger _logger; + readonly IMqttNetLogger _logger; - public MqttServerEventDispatcher(IMqttNetChildLogger logger) + public MqttServerEventDispatcher(IMqttNetLogger logger) { _logger = logger ?? throw new ArgumentNullException(nameof(logger)); } @@ -24,18 +24,25 @@ namespace MQTTnet.Server public IMqttApplicationMessageReceivedHandler ApplicationMessageReceivedHandler { get; set; } - public Task HandleClientConnectedAsync(string clientId) + public async Task SafeNotifyClientConnectedAsync(string clientId) { - var handler = ClientConnectedHandler; - if (handler == null) + try { - return Task.FromResult(0); - } + var handler = ClientConnectedHandler; + if (handler == null) + { + return; + } - return handler.HandleClientConnectedAsync(new MqttServerClientConnectedEventArgs(clientId)); + await handler.HandleClientConnectedAsync(new MqttServerClientConnectedEventArgs(clientId)).ConfigureAwait(false); + } + catch (Exception exception) + { + _logger.Error(exception, "Error while handling custom 'ClientConnected' event."); + } } - public async Task TryHandleClientDisconnectedAsync(string clientId, MqttClientDisconnectType disconnectType) + public async Task SafeNotifyClientDisconnectedAsync(string clientId, MqttClientDisconnectType disconnectType) { try { @@ -49,41 +56,62 @@ namespace MQTTnet.Server } catch (Exception exception) { - _logger.Error(exception, "Error while handling 'ClientDisconnected' event."); + _logger.Error(exception, "Error while handling custom 'ClientDisconnected' event."); } } - public Task HandleClientSubscribedTopicAsync(string clientId, TopicFilter topicFilter) + public async Task SafeNotifyClientSubscribedTopicAsync(string clientId, TopicFilter topicFilter) { - var handler = ClientSubscribedTopicHandler; - if (handler == null) + try { - return Task.FromResult(0); - } + var handler = ClientSubscribedTopicHandler; + if (handler == null) + { + return; + } - return handler.HandleClientSubscribedTopicAsync(new MqttServerClientSubscribedTopicEventArgs(clientId, topicFilter)); + await handler.HandleClientSubscribedTopicAsync(new MqttServerClientSubscribedTopicEventArgs(clientId, topicFilter)).ConfigureAwait(false); + } + catch (Exception exception) + { + _logger.Error(exception, "Error while handling custom 'ClientSubscribedTopic' event."); + } } - public Task HandleClientUnsubscribedTopicAsync(string clientId, string topicFilter) + public async Task SafeNotifyClientUnsubscribedTopicAsync(string clientId, string topicFilter) { - var handler = ClientUnsubscribedTopicHandler; - if (handler == null) + try { - return Task.FromResult(0); - } + var handler = ClientUnsubscribedTopicHandler; + if (handler == null) + { + return; + } - return handler.HandleClientUnsubscribedTopicAsync(new MqttServerClientUnsubscribedTopicEventArgs(clientId, topicFilter)); + await handler.HandleClientUnsubscribedTopicAsync(new MqttServerClientUnsubscribedTopicEventArgs(clientId, topicFilter)).ConfigureAwait(false); + } + catch (Exception exception) + { + _logger.Error(exception, "Error while handling custom 'ClientUnsubscribedTopic' event."); + } } - public Task HandleApplicationMessageReceivedAsync(string senderClientId, MqttApplicationMessage applicationMessage) + public async Task SafeNotifyApplicationMessageReceivedAsync(string senderClientId, MqttApplicationMessage applicationMessage) { - var handler = ApplicationMessageReceivedHandler; - if (handler == null) + try { - return Task.FromResult(0); - } + var handler = ApplicationMessageReceivedHandler; + if (handler == null) + { + return; + } - return handler.HandleApplicationMessageReceivedAsync(new MqttApplicationMessageReceivedEventArgs(senderClientId, applicationMessage)); + await handler.HandleApplicationMessageReceivedAsync(new MqttApplicationMessageReceivedEventArgs(senderClientId, applicationMessage)).ConfigureAwait(false); ; + } + catch (Exception exception) + { + _logger.Error(exception, "Error while handling custom 'ApplicationMessageReceived' event."); + } } } } diff --git a/Source/MQTTnet/Server/Status/IMqttClientStatus.cs b/Source/MQTTnet/Server/Status/IMqttClientStatus.cs index bbf4799..d7debdc 100644 --- a/Source/MQTTnet/Server/Status/IMqttClientStatus.cs +++ b/Source/MQTTnet/Server/Status/IMqttClientStatus.cs @@ -1,6 +1,6 @@ -using System; +using MQTTnet.Formatter; +using System; using System.Threading.Tasks; -using MQTTnet.Formatter; namespace MQTTnet.Server.Status { @@ -9,7 +9,7 @@ namespace MQTTnet.Server.Status string ClientId { get; } string Endpoint { get; } - + MqttProtocolVersion ProtocolVersion { get; } DateTime LastPacketReceivedTimestamp { get; } @@ -29,7 +29,7 @@ namespace MQTTnet.Server.Status long BytesSent { get; } long BytesReceived { get; } - + Task DisconnectAsync(); void ResetStatistics(); diff --git a/Source/MQTTnet/Server/Status/IMqttSessionStatus.cs b/Source/MQTTnet/Server/Status/IMqttSessionStatus.cs index 349f715..2ff996b 100644 --- a/Source/MQTTnet/Server/Status/IMqttSessionStatus.cs +++ b/Source/MQTTnet/Server/Status/IMqttSessionStatus.cs @@ -1,12 +1,15 @@ -using System.Threading.Tasks; +using System.Collections.Generic; +using System.Threading.Tasks; namespace MQTTnet.Server.Status { public interface IMqttSessionStatus { - string ClientId { get; set; } + string ClientId { get; } - long PendingApplicationMessagesCount { get; set; } + long PendingApplicationMessagesCount { get; } + + IDictionary Items { get; } Task ClearPendingApplicationMessagesAsync(); diff --git a/Source/MQTTnet/Server/Status/MqttClientStatus.cs b/Source/MQTTnet/Server/Status/MqttClientStatus.cs index 553795f..7f6448f 100644 --- a/Source/MQTTnet/Server/Status/MqttClientStatus.cs +++ b/Source/MQTTnet/Server/Status/MqttClientStatus.cs @@ -1,16 +1,16 @@ -using System; +using MQTTnet.Formatter; +using System; using System.Threading.Tasks; -using MQTTnet.Formatter; namespace MQTTnet.Server.Status { public class MqttClientStatus : IMqttClientStatus { - private readonly MqttClientConnection _connection; + readonly MqttClientConnection _connection; public MqttClientStatus(MqttClientConnection connection) { - _connection = connection; + _connection = connection ?? throw new ArgumentNullException(nameof(connection)); } public string ClientId { get; set; } diff --git a/Source/MQTTnet/Server/Status/MqttSessionStatus.cs b/Source/MQTTnet/Server/Status/MqttSessionStatus.cs index 451644c..2dfe22f 100644 --- a/Source/MQTTnet/Server/Status/MqttSessionStatus.cs +++ b/Source/MQTTnet/Server/Status/MqttSessionStatus.cs @@ -1,12 +1,13 @@ using System; +using System.Collections.Generic; using System.Threading.Tasks; namespace MQTTnet.Server.Status { public class MqttSessionStatus : IMqttSessionStatus { - private readonly MqttClientSession _session; - private readonly MqttClientSessionsManager _sessionsManager; + readonly MqttClientSession _session; + readonly MqttClientSessionsManager _sessionsManager; public MqttSessionStatus(MqttClientSession session, MqttClientSessionsManager sessionsManager) { @@ -17,14 +18,16 @@ namespace MQTTnet.Server.Status public string ClientId { get; set; } public long PendingApplicationMessagesCount { get; set; } - + public DateTime CreatedTimestamp { get; set; } + public IDictionary Items { get; set; } + public Task DeleteAsync() { return _sessionsManager.DeleteSessionAsync(ClientId); } - + public Task ClearPendingApplicationMessagesAsync() { _session.ApplicationMessagesQueue.Clear(); diff --git a/Source/MQTTnet/TopicFilter.cs b/Source/MQTTnet/TopicFilter.cs index 3b62e7e..b086b48 100644 --- a/Source/MQTTnet/TopicFilter.cs +++ b/Source/MQTTnet/TopicFilter.cs @@ -2,6 +2,7 @@ namespace MQTTnet { + // TODO: Consider renaming to "MqttTopicFilter" public class TopicFilter { public string Topic { get; set; } @@ -26,16 +27,16 @@ namespace MQTTnet public override string ToString() { return string.Concat( - "TopicFilter: [Topic=", + "TopicFilter: [Topic=", Topic, - "] [QualityOfServiceLevel=", + "] [QualityOfServiceLevel=", QualityOfServiceLevel, - "] [NoLocal=", - NoLocal, - "] [RetainAsPublished=", - RetainAsPublished, - "] [RetainHandling=", - RetainHandling, + "] [NoLocal=", + NoLocal, + "] [RetainAsPublished=", + RetainAsPublished, + "] [RetainHandling=", + RetainHandling, "]"); } } diff --git a/Tests/MQTTnet.AspNetCore.Tests/MQTTnet.AspNetCore.Tests.csproj b/Tests/MQTTnet.AspNetCore.Tests/MQTTnet.AspNetCore.Tests.csproj index 06b5976..3c3f016 100644 --- a/Tests/MQTTnet.AspNetCore.Tests/MQTTnet.AspNetCore.Tests.csproj +++ b/Tests/MQTTnet.AspNetCore.Tests/MQTTnet.AspNetCore.Tests.csproj @@ -6,9 +6,9 @@ - - - + + + diff --git a/Tests/MQTTnet.Benchmarks/LoggerBenchmark.cs b/Tests/MQTTnet.Benchmarks/LoggerBenchmark.cs index cfc88d4..da77299 100644 --- a/Tests/MQTTnet.Benchmarks/LoggerBenchmark.cs +++ b/Tests/MQTTnet.Benchmarks/LoggerBenchmark.cs @@ -10,13 +10,13 @@ namespace MQTTnet.Benchmarks public class LoggerBenchmark { private IMqttNetLogger _logger; - private IMqttNetChildLogger _childLogger; + private IMqttNetLogger _childLogger; private bool _useHandler; [GlobalSetup] public void Setup() { - _logger = new MqttNetLogger("1"); + _logger = new MqttNetLogger(); _childLogger = _logger.CreateChildLogger("child"); MqttNetGlobalLogger.LogMessagePublished += OnLogMessagePublished; diff --git a/Tests/MQTTnet.Benchmarks/MQTTnet.Benchmarks.csproj b/Tests/MQTTnet.Benchmarks/MQTTnet.Benchmarks.csproj index 5d11fac..7212b47 100644 --- a/Tests/MQTTnet.Benchmarks/MQTTnet.Benchmarks.csproj +++ b/Tests/MQTTnet.Benchmarks/MQTTnet.Benchmarks.csproj @@ -10,8 +10,8 @@ - - + + diff --git a/Tests/MQTTnet.Benchmarks/MqttTcpChannelBenchmark.cs b/Tests/MQTTnet.Benchmarks/MqttTcpChannelBenchmark.cs index 04761e7..ecfa76e 100644 --- a/Tests/MQTTnet.Benchmarks/MqttTcpChannelBenchmark.cs +++ b/Tests/MQTTnet.Benchmarks/MqttTcpChannelBenchmark.cs @@ -1,11 +1,11 @@ using BenchmarkDotNet.Attributes; using MQTTnet.Channel; +using MQTTnet.Client.Options; using MQTTnet.Diagnostics; using MQTTnet.Implementations; using MQTTnet.Server; using System.Threading; using System.Threading.Tasks; -using MQTTnet.Client.Options; namespace MQTTnet.Benchmarks { @@ -20,7 +20,7 @@ namespace MQTTnet.Benchmarks public void Setup() { var factory = new MqttFactory(); - var tcpServer = new MqttTcpServerAdapter(new MqttNetLogger().CreateChildLogger()); + var tcpServer = new MqttTcpServerAdapter(new MqttNetLogger()); tcpServer.ClientHandler += args => { _serverChannel = @@ -30,7 +30,7 @@ namespace MQTTnet.Benchmarks return Task.CompletedTask; }; - + _mqttServer = factory.CreateMqttServer(new[] { tcpServer }, new MqttNetLogger()); var serverOptions = new MqttServerOptionsBuilder().Build(); diff --git a/Tests/MQTTnet.Benchmarks/Program.cs b/Tests/MQTTnet.Benchmarks/Program.cs index 280416d..760cb52 100644 --- a/Tests/MQTTnet.Benchmarks/Program.cs +++ b/Tests/MQTTnet.Benchmarks/Program.cs @@ -9,7 +9,7 @@ namespace MQTTnet.Benchmarks { public static void Main(string[] args) { - Console.WriteLine($"MQTTnet - BenchmarkApp.{TargetFrameworkInfoProvider.TargetFramework}"); + Console.WriteLine($"MQTTnet - BenchmarkApp.{TargetFrameworkProvider.TargetFramework}"); Console.WriteLine("1 = MessageProcessingBenchmark"); Console.WriteLine("2 = SerializerBenchmark"); Console.WriteLine("3 = LoggerBenchmark"); diff --git a/Tests/MQTTnet.Core.Tests/CrossPlatformSocket_Tests.cs b/Tests/MQTTnet.Core.Tests/CrossPlatformSocket_Tests.cs new file mode 100644 index 0000000..3e3455e --- /dev/null +++ b/Tests/MQTTnet.Core.Tests/CrossPlatformSocket_Tests.cs @@ -0,0 +1,74 @@ +using Microsoft.VisualStudio.TestTools.UnitTesting; +using MQTTnet.Implementations; +using System; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace MQTTnet.Tests +{ + [TestClass] + public class CrossPlatformSocket_Tests + { + [TestMethod] + public async Task Connect_Send_Receive() + { + var crossPlatformSocket = new CrossPlatformSocket(); + await crossPlatformSocket.ConnectAsync("www.google.de", 80, CancellationToken.None); + + var requestBuffer = Encoding.UTF8.GetBytes("GET / HTTP/1.1\r\nHost: www.google.de\r\n\r\n"); + await crossPlatformSocket.SendAsync(new ArraySegment(requestBuffer), System.Net.Sockets.SocketFlags.None); + + var buffer = new byte[1024]; + var length = await crossPlatformSocket.ReceiveAsync(new ArraySegment(buffer), System.Net.Sockets.SocketFlags.None); + crossPlatformSocket.Dispose(); + + var responseText = Encoding.UTF8.GetString(buffer, 0, length); + + Assert.IsTrue(responseText.Contains("HTTP/1.1 200 OK")); + } + + [TestMethod] + public async Task Try_Connect_Invalid_Host() + { + var crossPlatformSocket = new CrossPlatformSocket(); + + var cancellationToken = new CancellationTokenSource(TimeSpan.FromSeconds(3)); + cancellationToken.Token.Register(() => crossPlatformSocket.Dispose()); + + await crossPlatformSocket.ConnectAsync("www.google.de", 1234, CancellationToken.None); + } + + //[TestMethod] + //public async Task Use_Disconnected_Socket() + //{ + // var crossPlatformSocket = new CrossPlatformSocket(); + + // await crossPlatformSocket.ConnectAsync("www.google.de", 80); + + // var requestBuffer = Encoding.UTF8.GetBytes("GET /wrong_uri HTTP/1.1\r\nConnection: close\r\n\r\n"); + // await crossPlatformSocket.SendAsync(new ArraySegment(requestBuffer), System.Net.Sockets.SocketFlags.None); + + // var buffer = new byte[64000]; + // var length = await crossPlatformSocket.ReceiveAsync(new ArraySegment(buffer), System.Net.Sockets.SocketFlags.None); + + // await Task.Delay(500); + + // await crossPlatformSocket.SendAsync(new ArraySegment(requestBuffer), System.Net.Sockets.SocketFlags.None); + //} + + [TestMethod] + public async Task Set_Options() + { + var crossPlatformSocket = new CrossPlatformSocket(); + + Assert.IsFalse(crossPlatformSocket.ReuseAddress); + crossPlatformSocket.ReuseAddress = true; + Assert.IsTrue(crossPlatformSocket.ReuseAddress); + + Assert.IsFalse(crossPlatformSocket.NoDelay); + crossPlatformSocket.NoDelay = true; + Assert.IsTrue(crossPlatformSocket.NoDelay); + } + } +} diff --git a/Tests/MQTTnet.Core.Tests/LowLevelMqttClient_Tests.cs b/Tests/MQTTnet.Core.Tests/LowLevelMqttClient_Tests.cs new file mode 100644 index 0000000..05440de --- /dev/null +++ b/Tests/MQTTnet.Core.Tests/LowLevelMqttClient_Tests.cs @@ -0,0 +1,111 @@ +using Microsoft.VisualStudio.TestTools.UnitTesting; +using MQTTnet.Client.Options; +using MQTTnet.LowLevelClient; +using MQTTnet.Packets; +using MQTTnet.Protocol; +using MQTTnet.Tests.Mockups; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace MQTTnet.Tests +{ + [TestClass] + public class LowLevelMqttClient_Tests + { + public TestContext TestContext { get; set; } + + [TestMethod] + public async Task Connect_And_Disconnect() + { + using (var testEnvironment = new TestEnvironment(TestContext)) + { + var server = await testEnvironment.StartServerAsync(); + + var factory = new MqttFactory(); + var lowLevelClient = factory.CreateLowLevelMqttClient(); + + await lowLevelClient.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("127.0.0.1", testEnvironment.ServerPort).Build(), CancellationToken.None); + + await lowLevelClient.DisconnectAsync(CancellationToken.None); + } + } + + [TestMethod] + public async Task Authenticate() + { + using (var testEnvironment = new TestEnvironment(TestContext)) + { + var server = await testEnvironment.StartServerAsync(); + + var factory = new MqttFactory(); + var lowLevelClient = factory.CreateLowLevelMqttClient(); + + await lowLevelClient.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("127.0.0.1", testEnvironment.ServerPort).Build(), CancellationToken.None); + + var receivedPacket = await Authenticate(lowLevelClient).ConfigureAwait(false); + + await lowLevelClient.DisconnectAsync(CancellationToken.None).ConfigureAwait(false); + + Assert.IsNotNull(receivedPacket); + Assert.AreEqual(MqttConnectReturnCode.ConnectionAccepted, receivedPacket.ReturnCode); + } + } + + [TestMethod] + public async Task Subscribe() + { + using (var testEnvironment = new TestEnvironment(TestContext)) + { + var server = await testEnvironment.StartServerAsync(); + + var factory = new MqttFactory(); + var lowLevelClient = factory.CreateLowLevelMqttClient(); + + await lowLevelClient.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("127.0.0.1", testEnvironment.ServerPort).Build(), CancellationToken.None); + + await Authenticate(lowLevelClient).ConfigureAwait(false); + + var receivedPacket = await Subscribe(lowLevelClient, "a").ConfigureAwait(false); + + await lowLevelClient.DisconnectAsync(CancellationToken.None).ConfigureAwait(false); + + Assert.IsNotNull(receivedPacket); + Assert.AreEqual(MqttSubscribeReturnCode.SuccessMaximumQoS0, receivedPacket.ReturnCodes[0]); + } + } + + async Task Authenticate(ILowLevelMqttClient client) + { + await client.SendAsync(new MqttConnectPacket() + { + CleanSession = true, + ClientId = TestContext.TestName, + Username = "user", + Password = Encoding.UTF8.GetBytes("pass") + }, + CancellationToken.None).ConfigureAwait(false); + + return await client.ReceiveAsync(CancellationToken.None).ConfigureAwait(false) as MqttConnAckPacket; + } + + async Task Subscribe(ILowLevelMqttClient client, string topic) + { + await client.SendAsync(new MqttSubscribePacket + { + PacketIdentifier = 1, + TopicFilters = new List + { + new TopicFilter + { + Topic = topic + } + } + }, + CancellationToken.None).ConfigureAwait(false); + + return await client.ReceiveAsync(CancellationToken.None).ConfigureAwait(false) as MqttSubAckPacket; + } + } +} diff --git a/Tests/MQTTnet.Core.Tests/MQTTnet.Tests.csproj b/Tests/MQTTnet.Core.Tests/MQTTnet.Tests.csproj index 0305b16..45ea7b3 100644 --- a/Tests/MQTTnet.Core.Tests/MQTTnet.Tests.csproj +++ b/Tests/MQTTnet.Core.Tests/MQTTnet.Tests.csproj @@ -6,9 +6,9 @@ - - - + + + diff --git a/Tests/MQTTnet.Core.Tests/ManagedMqttClient_Tests.cs b/Tests/MQTTnet.Core.Tests/ManagedMqttClient_Tests.cs index 52ec583..f8052fb 100644 --- a/Tests/MQTTnet.Core.Tests/ManagedMqttClient_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/ManagedMqttClient_Tests.cs @@ -69,7 +69,7 @@ namespace MQTTnet.Tests .WithTcpServer("localhost", testEnvironment.ServerPort) .WithWillMessage(willMessage); var dyingClient = testEnvironment.CreateClient(); - var dyingManagedClient = new ManagedMqttClient(dyingClient, testEnvironment.ClientLogger.CreateChildLogger()); + var dyingManagedClient = new ManagedMqttClient(dyingClient, testEnvironment.ClientLogger); await dyingManagedClient.StartAsync(new ManagedMqttClientOptionsBuilder() .WithClientOptions(clientOptions) .Build()); @@ -96,7 +96,7 @@ namespace MQTTnet.Tests var server = await testEnvironment.StartServerAsync(); - var managedClient = new ManagedMqttClient(testEnvironment.CreateClient(), new MqttNetLogger().CreateChildLogger()); + var managedClient = new ManagedMqttClient(testEnvironment.CreateClient(), new MqttNetLogger()); var clientOptions = new MqttClientOptionsBuilder() .WithTcpServer("localhost", testEnvironment.ServerPort); @@ -128,7 +128,7 @@ namespace MQTTnet.Tests var server = await testEnvironment.StartServerAsync(); - var managedClient = new ManagedMqttClient(testEnvironment.CreateClient(), new MqttNetLogger().CreateChildLogger()); + var managedClient = new ManagedMqttClient(testEnvironment.CreateClient(), new MqttNetLogger()); var clientOptions = new MqttClientOptionsBuilder() .WithTcpServer("localhost", testEnvironment.ServerPort); var storage = new ManagedMqttClientTestStorage(); @@ -351,7 +351,7 @@ namespace MQTTnet.Tests managedOptions.ConnectionCheckInterval = connectionCheckInterval ?? TimeSpan.FromSeconds(0.1); var managedClient = - new ManagedMqttClient(underlyingClient ?? testEnvironment.CreateClient(), new MqttNetLogger().CreateChildLogger()); + new ManagedMqttClient(underlyingClient ?? testEnvironment.CreateClient(), new MqttNetLogger()); var connected = GetConnectedTask(managedClient); diff --git a/Tests/MQTTnet.Core.Tests/Mockups/TestClientWrapper.cs b/Tests/MQTTnet.Core.Tests/Mockups/TestClientWrapper.cs index 2500a6f..e52133c 100644 --- a/Tests/MQTTnet.Core.Tests/Mockups/TestClientWrapper.cs +++ b/Tests/MQTTnet.Core.Tests/Mockups/TestClientWrapper.cs @@ -1,6 +1,4 @@ -using System.Threading; -using System.Threading.Tasks; -using Microsoft.VisualStudio.TestTools.UnitTesting; +using Microsoft.VisualStudio.TestTools.UnitTesting; using MQTTnet.Client; using MQTTnet.Client.Connecting; using MQTTnet.Client.Disconnecting; @@ -10,10 +8,12 @@ using MQTTnet.Client.Publishing; using MQTTnet.Client.Receiving; using MQTTnet.Client.Subscribing; using MQTTnet.Client.Unsubscribing; +using System.Threading; +using System.Threading.Tasks; namespace MQTTnet.Tests.Mockups { - public class TestClientWrapper : IMqttClient + public sealed class TestClientWrapper : IMqttClient { public TestClientWrapper(IMqttClient implementation, TestContext testContext) { @@ -22,40 +22,42 @@ namespace MQTTnet.Tests.Mockups } public IMqttClient Implementation { get; } + public TestContext TestContext { get; } public bool IsConnected => Implementation.IsConnected; public IMqttClientOptions Options => Implementation.Options; - public IMqttClientConnectedHandler ConnectedHandler { get => Implementation.ConnectedHandler; set => Implementation.ConnectedHandler = value; } - public IMqttClientDisconnectedHandler DisconnectedHandler { get => Implementation.DisconnectedHandler; set => Implementation.DisconnectedHandler = value; } - public IMqttApplicationMessageReceivedHandler ApplicationMessageReceivedHandler { get => Implementation.ApplicationMessageReceivedHandler; set => Implementation.ApplicationMessageReceivedHandler = value; } + public IMqttClientConnectedHandler ConnectedHandler + { + get => Implementation.ConnectedHandler; + set => Implementation.ConnectedHandler = value; + } + + public IMqttClientDisconnectedHandler DisconnectedHandler + { + get => Implementation.DisconnectedHandler; + set => Implementation.DisconnectedHandler = value; + } + + public IMqttApplicationMessageReceivedHandler ApplicationMessageReceivedHandler + { + get => Implementation.ApplicationMessageReceivedHandler; + set => Implementation.ApplicationMessageReceivedHandler = value; + } public Task ConnectAsync(IMqttClientOptions options, CancellationToken cancellationToken) { - switch (options) + if (TestContext != null) { - case MqttClientOptionsBuilder builder: - { - var existingClientId = builder.Build().ClientId; - if (existingClientId != null && !existingClientId.StartsWith(TestContext.TestName)) - { - builder.WithClientId(TestContext.TestName + existingClientId); - } - } - break; - case MqttClientOptions op: - { - var existingClientId = op.ClientId; - if (existingClientId != null && !existingClientId.StartsWith(TestContext.TestName)) - { - op.ClientId = TestContext.TestName + existingClientId; - } - } - break; - default: - break; + var clientOptions = (MqttClientOptions)options; + + var existingClientId = clientOptions.ClientId; + if (existingClientId != null && !existingClientId.StartsWith(TestContext.TestName)) + { + clientOptions.ClientId = TestContext.TestName + existingClientId; + } } return Implementation.ConnectAsync(options, cancellationToken); @@ -81,7 +83,7 @@ namespace MQTTnet.Tests.Mockups return Implementation.SendExtendedAuthenticationExchangeDataAsync(data, cancellationToken); } - public Task SubscribeAsync(MqttClientSubscribeOptions options, CancellationToken cancellationToken) + public Task SubscribeAsync(MqttClientSubscribeOptions options, CancellationToken cancellationToken) { return Implementation.SubscribeAsync(options, cancellationToken); } diff --git a/Tests/MQTTnet.Core.Tests/Mockups/TestEnvironment.cs b/Tests/MQTTnet.Core.Tests/Mockups/TestEnvironment.cs index 7f2dcac..be99534 100644 --- a/Tests/MQTTnet.Core.Tests/Mockups/TestEnvironment.cs +++ b/Tests/MQTTnet.Core.Tests/Mockups/TestEnvironment.cs @@ -1,27 +1,27 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading.Tasks; -using Microsoft.VisualStudio.TestTools.UnitTesting; +using Microsoft.VisualStudio.TestTools.UnitTesting; using MQTTnet.Client; using MQTTnet.Client.Options; using MQTTnet.Diagnostics; using MQTTnet.Internal; using MQTTnet.Server; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; namespace MQTTnet.Tests.Mockups { - public class TestEnvironment : Disposable + public sealed class TestEnvironment : Disposable { - private readonly MqttFactory _mqttFactory = new MqttFactory(); - private readonly List _clients = new List(); - private readonly IMqttNetLogger _serverLogger = new MqttNetLogger("server"); - private readonly IMqttNetLogger _clientLogger = new MqttNetLogger("client"); + readonly MqttFactory _mqttFactory = new MqttFactory(); + readonly List _clients = new List(); + readonly IMqttNetLogger _serverLogger = new MqttNetLogger("server"); + readonly IMqttNetLogger _clientLogger = new MqttNetLogger("client"); - private readonly List _serverErrors = new List(); - private readonly List _clientErrors = new List(); + readonly List _serverErrors = new List(); + readonly List _clientErrors = new List(); - private readonly List _exceptions = new List(); + readonly List _exceptions = new List(); public IMqttServer Server { get; private set; } @@ -37,36 +37,42 @@ namespace MQTTnet.Tests.Mockups public TestContext TestContext { get; } + public TestEnvironment() : this(null) + { + } + public TestEnvironment(TestContext testContext) { + TestContext = testContext; + _serverLogger.LogMessagePublished += (s, e) => { - if (e.TraceMessage.Level == MqttNetLogLevel.Error) + if (e.LogMessage.Level == MqttNetLogLevel.Error) { lock (_serverErrors) { - _serverErrors.Add(e.TraceMessage.ToString()); + _serverErrors.Add(e.LogMessage.ToString()); } } }; _clientLogger.LogMessagePublished += (s, e) => { - lock (_clientErrors) + if (e.LogMessage.Level == MqttNetLogLevel.Error) { - if (e.TraceMessage.Level == MqttNetLogLevel.Error) + lock (_clientErrors) { - _clientErrors.Add(e.TraceMessage.ToString()); + _clientErrors.Add(e.LogMessage.ToString()); } } }; - TestContext = testContext; } public IMqttClient CreateClient() { var client = _mqttFactory.CreateMqttClient(_clientLogger); _clients.Add(client); + return new TestClientWrapper(client, TestContext); } @@ -90,15 +96,17 @@ namespace MQTTnet.Tests.Mockups public Task ConnectClientAsync() { - return ConnectClientAsync(new MqttClientOptionsBuilder() ); + return ConnectClientAsync(new MqttClientOptionsBuilder()); } public async Task ConnectClientAsync(MqttClientOptionsBuilder options) { if (options == null) throw new ArgumentNullException(nameof(options)); + options = options.WithTcpServer("localhost", ServerPort); + var client = CreateClient(); - await client.ConnectAsync(options.WithTcpServer("localhost", ServerPort).Build()); + await client.ConnectAsync(options.Build()); return client; } @@ -150,6 +158,7 @@ namespace MQTTnet.Tests.Mockups throw new Exception($"{_exceptions.Count} exceptions tracked.\r\n" + string.Join(Environment.NewLine, _exceptions)); } } + base.Dispose(disposing); } diff --git a/Tests/MQTTnet.Core.Tests/Mockups/TestLogger.cs b/Tests/MQTTnet.Core.Tests/Mockups/TestLogger.cs index 79607d9..e1878ed 100644 --- a/Tests/MQTTnet.Core.Tests/Mockups/TestLogger.cs +++ b/Tests/MQTTnet.Core.Tests/Mockups/TestLogger.cs @@ -1,20 +1,15 @@ -using System; -using MQTTnet.Diagnostics; +using MQTTnet.Diagnostics; +using System; namespace MQTTnet.Tests.Mockups { - public class TestLogger : IMqttNetLogger, IMqttNetChildLogger + public class TestLogger : IMqttNetLogger { public event EventHandler LogMessagePublished; - IMqttNetChildLogger IMqttNetLogger.CreateChildLogger(string source) + public IMqttNetLogger CreateChildLogger(string source) { - return new MqttNetChildLogger(this, source); - } - - IMqttNetChildLogger IMqttNetChildLogger.CreateChildLogger(string source) - { - return new MqttNetChildLogger(this, source); + return new TestLogger(); } public void Verbose(string message, params object[] parameters) @@ -36,5 +31,10 @@ namespace MQTTnet.Tests.Mockups public void Publish(MqttNetLogLevel logLevel, string source, string message, object[] parameters, Exception exception) { } + + public void Publish(MqttNetLogLevel logLevel, string message, object[] parameters, Exception exception) + { + throw new NotImplementedException(); + } } } diff --git a/Tests/MQTTnet.Core.Tests/Mockups/TestMqttCommunicationAdapterFactory.cs b/Tests/MQTTnet.Core.Tests/Mockups/TestMqttCommunicationAdapterFactory.cs index ff47f71..95fbdfe 100644 --- a/Tests/MQTTnet.Core.Tests/Mockups/TestMqttCommunicationAdapterFactory.cs +++ b/Tests/MQTTnet.Core.Tests/Mockups/TestMqttCommunicationAdapterFactory.cs @@ -12,8 +12,8 @@ namespace MQTTnet.Tests.Mockups { _adapter = adapter; } - - public IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetChildLogger logger) + + public IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetLogger logger) { return _adapter; } diff --git a/Tests/MQTTnet.Core.Tests/Mockups/TestServerWrapper.cs b/Tests/MQTTnet.Core.Tests/Mockups/TestServerWrapper.cs index f990197..86030e8 100644 --- a/Tests/MQTTnet.Core.Tests/Mockups/TestServerWrapper.cs +++ b/Tests/MQTTnet.Core.Tests/Mockups/TestServerWrapper.cs @@ -1,16 +1,16 @@ -using System; -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; -using Microsoft.VisualStudio.TestTools.UnitTesting; +using Microsoft.VisualStudio.TestTools.UnitTesting; using MQTTnet.Client.Publishing; using MQTTnet.Client.Receiving; using MQTTnet.Server; using MQTTnet.Server.Status; +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; namespace MQTTnet.Tests.Mockups { - public class TestServerWrapper : IMqttServer + public sealed class TestServerWrapper : IMqttServer { public TestServerWrapper(IMqttServer implementation, TestContext testContext, TestEnvironment testEnvironment) { @@ -22,16 +22,50 @@ namespace MQTTnet.Tests.Mockups public IMqttServer Implementation { get; } public TestContext TestContext { get; } public TestEnvironment TestEnvironment { get; } - public IMqttServerStartedHandler StartedHandler { get => Implementation.StartedHandler; set => Implementation.StartedHandler = value; } - public IMqttServerStoppedHandler StoppedHandler { get => Implementation.StoppedHandler; set => Implementation.StoppedHandler = value; } - public IMqttServerClientConnectedHandler ClientConnectedHandler { get => Implementation.ClientConnectedHandler; set => Implementation.ClientConnectedHandler = value; } - public IMqttServerClientDisconnectedHandler ClientDisconnectedHandler { get => Implementation.ClientDisconnectedHandler; set => Implementation.ClientDisconnectedHandler = value; } - public IMqttServerClientSubscribedTopicHandler ClientSubscribedTopicHandler { get => Implementation.ClientSubscribedTopicHandler; set => Implementation.ClientSubscribedTopicHandler = value; } - public IMqttServerClientUnsubscribedTopicHandler ClientUnsubscribedTopicHandler { get => Implementation.ClientUnsubscribedTopicHandler; set => Implementation.ClientUnsubscribedTopicHandler = value; } + + public IMqttServerStartedHandler StartedHandler + { + get => Implementation.StartedHandler; + set => Implementation.StartedHandler = value; + } + + public IMqttServerStoppedHandler StoppedHandler + { + get => Implementation.StoppedHandler; + set => Implementation.StoppedHandler = value; + } + + public IMqttServerClientConnectedHandler ClientConnectedHandler + { + get => Implementation.ClientConnectedHandler; + set => Implementation.ClientConnectedHandler = value; + } + + public IMqttServerClientDisconnectedHandler ClientDisconnectedHandler + { + get => Implementation.ClientDisconnectedHandler; + set => Implementation.ClientDisconnectedHandler = value; + } + + public IMqttServerClientSubscribedTopicHandler ClientSubscribedTopicHandler + { + get => Implementation.ClientSubscribedTopicHandler; + set => Implementation.ClientSubscribedTopicHandler = value; + } + + public IMqttServerClientUnsubscribedTopicHandler ClientUnsubscribedTopicHandler + { + get => Implementation.ClientUnsubscribedTopicHandler; + set => Implementation.ClientUnsubscribedTopicHandler = value; + } public IMqttServerOptions Options => Implementation.Options; - public IMqttApplicationMessageReceivedHandler ApplicationMessageReceivedHandler { get => Implementation.ApplicationMessageReceivedHandler; set => Implementation.ApplicationMessageReceivedHandler = value; } + public IMqttApplicationMessageReceivedHandler ApplicationMessageReceivedHandler + { + get => Implementation.ApplicationMessageReceivedHandler; + set => Implementation.ApplicationMessageReceivedHandler = value; + } public Task ClearRetainedApplicationMessagesAsync() { @@ -60,22 +94,14 @@ namespace MQTTnet.Tests.Mockups public Task StartAsync(IMqttServerOptions options) { - switch (options) + if (TestContext != null) { - case MqttServerOptionsBuilder builder: - if (builder.Build().ConnectionValidator == null) - { - builder.WithConnectionValidator(ConnectionValidator); - } - break; - case MqttServerOptions op: - if (op.ConnectionValidator == null) - { - op.ConnectionValidator = new MqttServerConnectionValidatorDelegate(ConnectionValidator); - } - break; - default: - break; + var serverOptions = (MqttServerOptions)options; + + if (serverOptions.ConnectionValidator == null) + { + serverOptions.ConnectionValidator = new MqttServerConnectionValidatorDelegate(ConnectionValidator); + } } return Implementation.StartAsync(options); @@ -85,7 +111,7 @@ namespace MQTTnet.Tests.Mockups { if (!ctx.ClientId.StartsWith(TestContext.TestName)) { - TestEnvironment.TrackException(new InvalidOperationException($"invalid client connected '{ctx.ClientId}'")); + TestEnvironment.TrackException(new InvalidOperationException($"Invalid client ID used ({ctx.ClientId}). It must start with UnitTest name.")); ctx.ReasonCode = Protocol.MqttConnectReasonCode.ClientIdentifierNotValid; } } diff --git a/Tests/MQTTnet.Core.Tests/MqttApplicationMessage_Tests.cs b/Tests/MQTTnet.Core.Tests/MqttApplicationMessage_Tests.cs new file mode 100644 index 0000000..05d0907 --- /dev/null +++ b/Tests/MQTTnet.Core.Tests/MqttApplicationMessage_Tests.cs @@ -0,0 +1,32 @@ +using Microsoft.VisualStudio.TestTools.UnitTesting; +using MQTTnet.Extensions; +using MQTTnet.Packets; +using System.Collections.Generic; + +namespace MQTTnet.Tests +{ + [TestClass] + public class MqttApplicationMessage_Tests + { + [TestMethod] + public void GetUserProperty_Test() + { + var message = new MqttApplicationMessage + { + UserProperties = new List + { + new MqttUserProperty("foo", "bar"), + new MqttUserProperty("value", "1011"), + new MqttUserProperty("CASE", "insensitive") + } + }; + + Assert.AreEqual("bar", message.GetUserProperty("foo")); + //Assert.AreEqual(1011, message.GetUserProperty("value")); + Assert.AreEqual(null, message.GetUserProperty("case")); + Assert.AreEqual(null, message.GetUserProperty("nonExists")); + //Assert.AreEqual(null, message.GetUserProperty("nonExists")); + //Assert.ThrowsException(() => message.GetUserProperty("nonExists")); + } + } +} diff --git a/Tests/MQTTnet.Core.Tests/MqttClientOptionsBuilder_Tests.cs b/Tests/MQTTnet.Core.Tests/MqttClientOptionsBuilder_Tests.cs new file mode 100644 index 0000000..a482081 --- /dev/null +++ b/Tests/MQTTnet.Core.Tests/MqttClientOptionsBuilder_Tests.cs @@ -0,0 +1,22 @@ +using System.Linq; +using System.Text; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using MQTTnet.Client.Options; +using MQTTnet.Extensions; + +namespace MQTTnet.Tests +{ + [TestClass] + public class MqttClientOptionsBuilder_Tests + { + [TestMethod] + public void WithConnectionUri_Credential_Test() + { + var options = new MqttClientOptionsBuilder() + .WithConnectionUri("mqtt://user:password@127.0.0.1") + .Build(); + Assert.AreEqual("user", options.Credentials.Username); + Assert.IsTrue(Encoding.UTF8.GetBytes("password").SequenceEqual(options.Credentials.Password)); + } + } +} diff --git a/Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs b/Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs index b8f4fbc..2b86955 100644 --- a/Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs @@ -1,9 +1,3 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Net.Sockets; -using System.Threading; -using System.Threading.Tasks; using Microsoft.VisualStudio.TestTools.UnitTesting; using MQTTnet.Client; using MQTTnet.Client.Connecting; @@ -14,6 +8,12 @@ using MQTTnet.Exceptions; using MQTTnet.Protocol; using MQTTnet.Server; using MQTTnet.Tests.Mockups; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net.Sockets; +using System.Threading; +using System.Threading.Tasks; namespace MQTTnet.Tests { @@ -29,9 +29,9 @@ namespace MQTTnet.Tests { await testEnvironment.StartServerAsync(); var client = await testEnvironment.ConnectClientAsync(); - + await client.SubscribeAsync("#"); - + var replyReceived = false; client.UseApplicationMessageReceivedHandler(c => @@ -78,7 +78,7 @@ namespace MQTTnet.Tests } }); - client2.UseApplicationMessageReceivedHandler(async c =>{ await client2.PublishAsync("reply", null, MqttQualityOfServiceLevel.AtLeastOnce); }); + client2.UseApplicationMessageReceivedHandler(async c => { await client2.PublishAsync("reply", null, MqttQualityOfServiceLevel.AtLeastOnce); }); await client1.PublishAsync("request", null, MqttQualityOfServiceLevel.AtLeastOnce); @@ -181,7 +181,7 @@ namespace MQTTnet.Tests catch { } - + SpinWait.SpinUntil(() => tries >= maxTries, 10000); Assert.AreEqual(maxTries, tries); @@ -215,7 +215,7 @@ namespace MQTTnet.Tests Assert.AreEqual((ushort)4, result.PacketIdentifier); } } - + [TestMethod] public async Task Invalid_Connect_Throws_Exception() { @@ -558,6 +558,8 @@ namespace MQTTnet.Tests clients.Add(await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("a"))); } + await Task.Delay(500); + var clientStatus = await testEnvironment.Server.GetClientStatusAsync(); var sessionStatus = await testEnvironment.Server.GetSessionStatusAsync(); @@ -565,7 +567,7 @@ namespace MQTTnet.Tests { Assert.IsFalse(clients[i].IsConnected); } - + Assert.IsTrue(clients[99].IsConnected); Assert.AreEqual(1, clientStatus.Count); @@ -583,7 +585,7 @@ namespace MQTTnet.Tests var sendClient = await testEnvironment.ConnectClientAsync(); await sendClient.PublishAsync("x", "1"); - await Task.Delay(100); + await Task.Delay(250); Assert.AreEqual("1", receivedPayload); } diff --git a/Tests/MQTTnet.Core.Tests/MqttFactory_Tests.cs b/Tests/MQTTnet.Core.Tests/MqttFactory_Tests.cs index 740980d..5ec2f9f 100644 --- a/Tests/MQTTnet.Core.Tests/MqttFactory_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttFactory_Tests.cs @@ -18,13 +18,13 @@ namespace MQTTnet.Tests // This test compares // 1. correct logID var logId = "logId"; - string invalidLogId = null; + var hasInvalidLogId = false; // 2. if the total log calls are the same for global and local //var globalLogCount = 0; var localLogCount = 0; - var logger = new MqttNetLogger(logId); + var logger = new MqttNetLogger(null, logId); // TODO: This is commented out because it is affected by other tests. //// we have a theoretical bug here if a concurrent test is also logging @@ -42,9 +42,9 @@ namespace MQTTnet.Tests logger.LogMessagePublished += (s, e) => { - if (e.TraceMessage.LogId != logId) + if (e.LogMessage.LogId != logId) { - invalidLogId = e.TraceMessage.LogId; + hasInvalidLogId = true; } Interlocked.Increment(ref localLogCount); @@ -72,7 +72,9 @@ namespace MQTTnet.Tests //MqttNetGlobalLogger.LogMessagePublished -= globalLog; } - Assert.IsNull(invalidLogId); + await Task.Delay(500); + + Assert.IsFalse(hasInvalidLogId); Assert.AreNotEqual(0, localLogCount); } } diff --git a/Tests/MQTTnet.Core.Tests/MqttKeepAliveMonitor_Tests.cs b/Tests/MQTTnet.Core.Tests/MqttKeepAliveMonitor_Tests.cs index fe1a0c5..73ed846 100644 --- a/Tests/MQTTnet.Core.Tests/MqttKeepAliveMonitor_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttKeepAliveMonitor_Tests.cs @@ -1,12 +1,12 @@ -using System; -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; -using Microsoft.VisualStudio.TestTools.UnitTesting; +using Microsoft.VisualStudio.TestTools.UnitTesting; using MQTTnet.Adapter; using MQTTnet.Diagnostics; using MQTTnet.Server; using MQTTnet.Server.Status; +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; namespace MQTTnet.Tests { @@ -23,7 +23,7 @@ namespace MQTTnet.Tests counter++; return Task.CompletedTask; }, - new MqttNetLogger().CreateChildLogger()); + new MqttNetLogger()); Assert.AreEqual(0, counter); @@ -46,7 +46,7 @@ namespace MQTTnet.Tests counter++; return Task.CompletedTask; }, - new MqttNetLogger().CreateChildLogger()); + new MqttNetLogger()); Assert.AreEqual(0, counter); diff --git a/Tests/MQTTnet.Core.Tests/MqttNetLogger_Tests.cs b/Tests/MQTTnet.Core.Tests/MqttNetLogger_Tests.cs new file mode 100644 index 0000000..8a36ff3 --- /dev/null +++ b/Tests/MQTTnet.Core.Tests/MqttNetLogger_Tests.cs @@ -0,0 +1,67 @@ +using Microsoft.VisualStudio.TestTools.UnitTesting; +using MQTTnet.Diagnostics; + +namespace MQTTnet.Tests +{ + [TestClass] + public class MqttNetLogger_Tests + { + [TestMethod] + public void Root_Log_Messages() + { + var logger = new MqttNetLogger(); + + var logMessagesCount = 0; + + logger.LogMessagePublished += (s, e) => + { + logMessagesCount++; + }; + + logger.Verbose("Verbose"); + logger.Info("Info"); + logger.Warning(null, "Warning"); + logger.Error(null, "Error"); + + Assert.AreEqual(4, logMessagesCount); + } + + [TestMethod] + public void Bubbling_Log_Messages() + { + var logger = new MqttNetLogger(); + var childLogger = logger.CreateChildLogger("Source1"); + + var logMessagesCount = 0; + + logger.LogMessagePublished += (s, e) => + { + logMessagesCount++; + }; + + childLogger.Verbose("Verbose"); + childLogger.Info("Info"); + childLogger.Warning(null, "Warning"); + childLogger.Error(null, "Error"); + + Assert.AreEqual(4, logMessagesCount); + } + + [TestMethod] + public void Set_Custom_Log_ID() + { + var logger = new MqttNetLogger(null, "logId"); + var childLogger = logger.CreateChildLogger("Source1"); + + logger.LogMessagePublished += (s, e) => + { + Assert.AreEqual("logId", e.LogMessage.LogId); + }; + + childLogger.Verbose("Verbose"); + childLogger.Info("Info"); + childLogger.Warning(null, "Warning"); + childLogger.Error(null, "Error"); + } + } +} diff --git a/Tests/MQTTnet.Core.Tests/MqttTcpChannel_Tests.cs b/Tests/MQTTnet.Core.Tests/MqttTcpChannel_Tests.cs index a4b0ca7..6e78dec 100644 --- a/Tests/MQTTnet.Core.Tests/MqttTcpChannel_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttTcpChannel_Tests.cs @@ -1,10 +1,10 @@ -using System; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using MQTTnet.Implementations; +using System; using System.Net; using System.Net.Sockets; using System.Threading; using System.Threading.Tasks; -using Microsoft.VisualStudio.TestTools.UnitTesting; -using MQTTnet.Implementations; namespace MQTTnet.Tests { @@ -15,7 +15,7 @@ namespace MQTTnet.Tests public async Task Dispose_Channel_While_Used() { var ct = new CancellationTokenSource(); - var serverSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + var serverSocket = new CrossPlatformSocket(AddressFamily.InterNetwork); try { @@ -28,18 +28,18 @@ namespace MQTTnet.Tests { while (!ct.IsCancellationRequested) { - var client = await PlatformAbstractionLayer.AcceptAsync(serverSocket); + var client = await serverSocket.AcceptAsync(); var data = new byte[] { 128 }; - await PlatformAbstractionLayer.SendAsync(client, new ArraySegment(data), SocketFlags.None); + await client.SendAsync(new ArraySegment(data), SocketFlags.None); } }, ct.Token); - var clientSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); - await PlatformAbstractionLayer.ConnectAsync(clientSocket, IPAddress.Loopback, 50001); + var clientSocket = new CrossPlatformSocket(AddressFamily.InterNetwork); + await clientSocket.ConnectAsync("localhost", 50001, CancellationToken.None); await Task.Delay(100, ct.Token); - var tcpChannel = new MqttTcpChannel(new NetworkStream(clientSocket, true), "test", null); + var tcpChannel = new MqttTcpChannel(clientSocket.GetStream(), "test", null); var buffer = new byte[1]; await tcpChannel.ReadAsync(buffer, 0, 1, ct.Token); diff --git a/Tests/MQTTnet.Core.Tests/Server_Tests.cs b/Tests/MQTTnet.Core.Tests/Server_Tests.cs index 6923c69..e40eba0 100644 --- a/Tests/MQTTnet.Core.Tests/Server_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/Server_Tests.cs @@ -904,14 +904,13 @@ namespace MQTTnet.Tests await testEnvironment.StartServerAsync(serverOptions); - var connectingFailedException = await Assert.ThrowsExceptionAsync(() => testEnvironment.ConnectClientAsync()); Assert.AreEqual(MqttClientConnectResultCode.NotAuthorized, connectingFailedException.ResultCode); } } + Dictionary _connected; - private Dictionary _connected; private void ConnectionValidationHandler(MqttConnectionValidatorContext eventArgs) { if (_connected.ContainsKey(eventArgs.ClientId)) @@ -919,6 +918,7 @@ namespace MQTTnet.Tests eventArgs.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword; return; } + _connected[eventArgs.ClientId] = true; eventArgs.ReasonCode = MqttConnectReasonCode.Success; return; @@ -1016,7 +1016,7 @@ namespace MQTTnet.Tests [TestMethod] public async Task Same_Client_Id_Connect_Disconnect_Event_Order() { - using (var testEnvironment = new TestEnvironment(TestContext)) + using (var testEnvironment = new TestEnvironment()) { var server = await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder()); @@ -1038,11 +1038,11 @@ namespace MQTTnet.Tests } }); - var clientOptions = new MqttClientOptionsBuilder() - .WithClientId("same_id"); + var clientOptionsBuilder = new MqttClientOptionsBuilder() + .WithClientId(Guid.NewGuid().ToString()); // c - var c1 = await testEnvironment.ConnectClientAsync(clientOptions); + var c1 = await testEnvironment.ConnectClientAsync(clientOptionsBuilder); await Task.Delay(500); @@ -1051,7 +1051,13 @@ namespace MQTTnet.Tests // dc // Connect client with same client ID. Should disconnect existing client. - var c2 = await testEnvironment.ConnectClientAsync(clientOptions); + var c2 = await testEnvironment.ConnectClientAsync(clientOptionsBuilder); + + await Task.Delay(500); + + flow = string.Join(string.Empty, events); + + Assert.AreEqual("cdc", flow); c2.UseApplicationMessageReceivedHandler(_ => { @@ -1061,15 +1067,10 @@ namespace MQTTnet.Tests } }); - c2.SubscribeAsync("topic").Wait(); - - await Task.Delay(500); - - flow = string.Join(string.Empty, events); - Assert.AreEqual("cdc", flow); + await c2.SubscribeAsync("topic"); // r - c2.PublishAsync("topic").Wait(); + await c2.PublishAsync("topic"); await Task.Delay(500); @@ -1149,15 +1150,15 @@ namespace MQTTnet.Tests { await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithDefaultCommunicationTimeout(TimeSpan.FromSeconds(1))); - var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); - await PlatformAbstractionLayer.ConnectAsync(client, "localhost", testEnvironment.ServerPort); + var client = new CrossPlatformSocket(AddressFamily.InterNetwork); + await client.ConnectAsync("localhost", testEnvironment.ServerPort, CancellationToken.None); // Don't send anything. The server should close the connection. await Task.Delay(TimeSpan.FromSeconds(3)); try { - var receivedBytes = await PlatformAbstractionLayer.ReceiveAsync(client, new ArraySegment(new byte[10]), SocketFlags.Partial); + var receivedBytes = await client.ReceiveAsync(new ArraySegment(new byte[10]), SocketFlags.Partial); if (receivedBytes == 0) { return; @@ -1180,17 +1181,17 @@ namespace MQTTnet.Tests // Send an invalid packet and ensure that the server will close the connection and stay in a waiting state // forever. This is security related. - var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); - await PlatformAbstractionLayer.ConnectAsync(client, "localhost", testEnvironment.ServerPort); + var client = new CrossPlatformSocket(AddressFamily.InterNetwork); + await client.ConnectAsync("localhost", testEnvironment.ServerPort, CancellationToken.None); var buffer = Encoding.UTF8.GetBytes("Garbage"); - client.Send(buffer, buffer.Length, SocketFlags.None); + await client.SendAsync(new ArraySegment(buffer), SocketFlags.None); await Task.Delay(TimeSpan.FromSeconds(3)); try { - var receivedBytes = await PlatformAbstractionLayer.ReceiveAsync(client, new ArraySegment(new byte[10]), SocketFlags.Partial); + var receivedBytes = await client.ReceiveAsync(new ArraySegment(new byte[10]), SocketFlags.Partial); if (receivedBytes == 0) { return; diff --git a/Tests/MQTTnet.Core.Tests/Session_Tests.cs b/Tests/MQTTnet.Core.Tests/Session_Tests.cs index 073f272..973fbf3 100644 --- a/Tests/MQTTnet.Core.Tests/Session_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/Session_Tests.cs @@ -1,10 +1,11 @@ -using System.Text; -using System.Threading.Tasks; -using Microsoft.VisualStudio.TestTools.UnitTesting; +using Microsoft.VisualStudio.TestTools.UnitTesting; using MQTTnet.Client; using MQTTnet.Client.Subscribing; using MQTTnet.Server; using MQTTnet.Tests.Mockups; +using System.Linq; +using System.Text; +using System.Threading.Tasks; namespace MQTTnet.Tests { @@ -42,7 +43,7 @@ namespace MQTTnet.Tests string receivedPayload = null; var client = await testEnvironment.ConnectClientAsync(); - client.UseApplicationMessageReceivedHandler(delegate(MqttApplicationMessageReceivedEventArgs args) + client.UseApplicationMessageReceivedHandler(delegate (MqttApplicationMessageReceivedEventArgs args) { receivedPayload = args.ApplicationMessage.ConvertPayloadToString(); }); @@ -59,5 +60,29 @@ namespace MQTTnet.Tests Assert.AreEqual("Hello World", receivedPayload); } } + + [TestMethod] + public async Task Get_Session_Items_In_Status() + { + using (var testEnvironment = new TestEnvironment(TestContext)) + { + var serverOptions = new MqttServerOptionsBuilder() + .WithConnectionValidator(delegate (MqttConnectionValidatorContext context) + { + // Don't validate anything. Just set some session items. + context.SessionItems["can_subscribe_x"] = true; + context.SessionItems["default_payload"] = "Hello World"; + }); + + await testEnvironment.StartServerAsync(serverOptions); + + var client = await testEnvironment.ConnectClientAsync(); + + var sessionStatus = await testEnvironment.Server.GetSessionStatusAsync(); + var session = sessionStatus.First(); + + Assert.AreEqual(true, session.Items["can_subscribe_x"]); + } + } } } diff --git a/Tests/MQTTnet.TestApp.AspNetCore2/MQTTnet.TestApp.AspNetCore2.csproj b/Tests/MQTTnet.TestApp.AspNetCore2/MQTTnet.TestApp.AspNetCore2.csproj index b5208f6..69e9f4f 100644 --- a/Tests/MQTTnet.TestApp.AspNetCore2/MQTTnet.TestApp.AspNetCore2.csproj +++ b/Tests/MQTTnet.TestApp.AspNetCore2/MQTTnet.TestApp.AspNetCore2.csproj @@ -10,8 +10,8 @@ - - + + diff --git a/Tests/MQTTnet.TestApp.NetCore/Program.cs b/Tests/MQTTnet.TestApp.NetCore/Program.cs index 777c6c0..9b2cbeb 100644 --- a/Tests/MQTTnet.TestApp.NetCore/Program.cs +++ b/Tests/MQTTnet.TestApp.NetCore/Program.cs @@ -17,7 +17,7 @@ namespace MQTTnet.TestApp.NetCore { public static void Main() { - Console.WriteLine($"MQTTnet - TestApp.{TargetFrameworkInfoProvider.TargetFramework}"); + Console.WriteLine($"MQTTnet - TestApp.{TargetFrameworkProvider.TargetFramework}"); Console.WriteLine("1 = Start client"); Console.WriteLine("2 = Start server"); Console.WriteLine("3 = Start performance test"); diff --git a/Tests/MQTTnet.TestApp.UniversalWindows/MQTTnet.TestApp.UniversalWindows.csproj b/Tests/MQTTnet.TestApp.UniversalWindows/MQTTnet.TestApp.UniversalWindows.csproj index cbb25f3..f7e4223 100644 --- a/Tests/MQTTnet.TestApp.UniversalWindows/MQTTnet.TestApp.UniversalWindows.csproj +++ b/Tests/MQTTnet.TestApp.UniversalWindows/MQTTnet.TestApp.UniversalWindows.csproj @@ -147,7 +147,7 @@ - 6.2.8 + 6.2.10 4.0.0 diff --git a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs index 94abc41..beb07ff 100644 --- a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs +++ b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs @@ -1,12 +1,4 @@ -using System; -using System.Collections.Concurrent; -using System.Collections.ObjectModel; -using System.Text; -using System.Threading.Tasks; -using Windows.Security.Cryptography.Certificates; -using Windows.UI.Core; -using Windows.UI.Xaml; -using MQTTnet.Client; +using MQTTnet.Client; using MQTTnet.Client.Connecting; using MQTTnet.Client.Disconnecting; using MQTTnet.Client.Options; @@ -14,14 +6,22 @@ using MQTTnet.Diagnostics; using MQTTnet.Exceptions; using MQTTnet.Extensions.ManagedClient; using MQTTnet.Extensions.Rpc; +using MQTTnet.Extensions.WebSocket4Net; using MQTTnet.Formatter; using MQTTnet.Implementations; using MQTTnet.Protocol; using MQTTnet.Server; using MQTTnet.Server.Status; +using System; +using System.Collections.Concurrent; +using System.Collections.ObjectModel; +using System.Text; +using System.Threading.Tasks; +using Windows.Security.Cryptography.Certificates; +using Windows.UI.Core; +using Windows.UI.Xaml; using MqttClientConnectedEventArgs = MQTTnet.Client.Connecting.MqttClientConnectedEventArgs; using MqttClientDisconnectedEventArgs = MQTTnet.Client.Disconnecting.MqttClientDisconnectedEventArgs; -using MQTTnet.Extensions.WebSocket4Net; namespace MQTTnet.TestApp.UniversalWindows { @@ -141,7 +141,7 @@ namespace MQTTnet.TestApp.UniversalWindows Password = Encoding.UTF8.GetBytes(Password.Text) }; } - + options.CleanSession = CleanSession.IsChecked == true; options.KeepAlivePeriod = TimeSpan.FromSeconds(double.Parse(KeepAliveInterval.Text)); @@ -198,16 +198,26 @@ namespace MQTTnet.TestApp.UniversalWindows private void OnDisconnected(MqttClientDisconnectedEventArgs e) { - _traceMessages.Enqueue(new MqttNetLogMessage("", DateTime.Now, -1, - "", MqttNetLogLevel.Info, "! DISCONNECTED EVENT FIRED", null)); + _traceMessages.Enqueue(new MqttNetLogMessage + { + Timestamp = DateTime.UtcNow, + ThreadId = -1, + Level = MqttNetLogLevel.Info, + Message = "! DISCONNECTED EVENT FIRED", + }); Task.Run(UpdateLogAsync); } private void OnConnected(MqttClientConnectedEventArgs e) { - _traceMessages.Enqueue(new MqttNetLogMessage("", DateTime.Now, -1, - "", MqttNetLogLevel.Info, "! CONNECTED EVENT FIRED", null)); + _traceMessages.Enqueue(new MqttNetLogMessage + { + Timestamp = DateTime.UtcNow, + ThreadId = -1, + Level = MqttNetLogLevel.Info, + Message = "! CONNECTED EVENT FIRED", + }); Task.Run(UpdateLogAsync); } @@ -538,7 +548,7 @@ namespace MQTTnet.TestApp.UniversalWindows { //... } - + client.UseApplicationMessageReceivedHandler(e => Handler(e)); // Subscribe after connect @@ -614,7 +624,7 @@ namespace MQTTnet.TestApp.UniversalWindows }; } } - + // ---------------------------------- { var options = new MqttServerOptions();