Browse Source

Merge branch 'master' into feature-isconnected

release/3.x.x
Christian 4 years ago
committed by GitHub
parent
commit
b14d7affe5
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
86 changed files with 1647 additions and 814 deletions
  1. +5
    -5
      Build/MQTTnet.AspNetCore.nuspec
  2. +1
    -1
      Build/MQTTnet.Extensions.ManagedClient.nuspec
  3. +1
    -1
      Build/MQTTnet.Extensions.Rpc.nuspec
  4. +1
    -1
      Build/MQTTnet.Extensions.WebSocket4Net.nuspec
  5. +1
    -1
      Build/MQTTnet.NETStandard.nuspec
  6. +10
    -4
      Build/MQTTnet.nuspec
  7. +4
    -2
      MQTTnet.sln
  8. +4
    -5
      Source/MQTTnet.AspnetCore/Client/MqttClientConnectionContextFactory.cs
  9. +9
    -9
      Source/MQTTnet.AspnetCore/Extensions/ServiceCollectionExtensions.cs
  10. +4
    -4
      Source/MQTTnet.AspnetCore/MQTTnet.AspNetCore.csproj
  11. +7
    -7
      Source/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs
  12. +9
    -9
      Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs
  13. +4
    -4
      Source/MQTTnet.Extensions.ManagedClient/MqttFactoryExtensions.cs
  14. +12
    -12
      Source/MQTTnet.Extensions.WebSocket4Net/WebSocket4NetMqttClientAdapterFactory.cs
  15. +43
    -43
      Source/MQTTnet.Server/Logging/MqttNetChildLoggerWrapper.cs
  16. +25
    -11
      Source/MQTTnet.Server/Logging/MqttNetLoggerWrapper.cs
  17. +4
    -2
      Source/MQTTnet.Server/MQTTnet.Server.csproj
  18. +12
    -12
      Source/MQTTnet.Server/Mqtt/MqttServerService.cs
  19. +1
    -1
      Source/MQTTnet/Adapter/IMqttClientAdapterFactory.cs
  20. +3
    -3
      Source/MQTTnet/Adapter/MqttChannelAdapter.cs
  21. +5
    -5
      Source/MQTTnet/Client/MqttClient.cs
  22. +8
    -8
      Source/MQTTnet/Client/Options/MqttClientOptionsBuilder.cs
  23. +0
    -17
      Source/MQTTnet/Diagnostics/IMqttNetChildLogger.cs
  24. +2
    -2
      Source/MQTTnet/Diagnostics/IMqttNetLogger.cs
  25. +0
    -51
      Source/MQTTnet/Diagnostics/MqttNetChildLogger.cs
  26. +7
    -18
      Source/MQTTnet/Diagnostics/MqttNetLogMessage.cs
  27. +5
    -0
      Source/MQTTnet/Diagnostics/MqttNetLogMessagePublishedEventArgs.cs
  28. +52
    -9
      Source/MQTTnet/Diagnostics/MqttNetLogger.cs
  29. +35
    -0
      Source/MQTTnet/Diagnostics/MqttNetLoggerExtensions.cs
  30. +1
    -1
      Source/MQTTnet/Diagnostics/TargetFrameworkProvider.cs
  31. +48
    -0
      Source/MQTTnet/Extensions/MqttClientOptionsBuilderExtension.cs
  32. +16
    -0
      Source/MQTTnet/Extensions/UserPropertyExtension.cs
  33. +0
    -15
      Source/MQTTnet/Formatter/V3/MqttV310DataConverter.cs
  34. +229
    -0
      Source/MQTTnet/Implementations/CrossPlatformSocket.cs
  35. +4
    -4
      Source/MQTTnet/Implementations/MqttClientAdapterFactory.cs
  36. +18
    -20
      Source/MQTTnet/Implementations/MqttTcpChannel.cs
  37. +6
    -6
      Source/MQTTnet/Implementations/MqttTcpServerAdapter.Uwp.cs
  38. +22
    -21
      Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs
  39. +28
    -28
      Source/MQTTnet/Implementations/MqttTcpServerListener.cs
  40. +1
    -86
      Source/MQTTnet/Implementations/PlatformAbstractionLayer.cs
  41. +3
    -3
      Source/MQTTnet/Internal/TaskExtensions.cs
  42. +19
    -0
      Source/MQTTnet/LowLevelClient/ILowLevelMqttClient.cs
  43. +128
    -0
      Source/MQTTnet/LowLevelClient/LowLevelMqttClient.cs
  44. +1
    -1
      Source/MQTTnet/MQTTnet.csproj
  45. +37
    -9
      Source/MQTTnet/MqttFactory.cs
  46. +3
    -3
      Source/MQTTnet/Server/IMqttRetainedMessagesManager.cs
  47. +53
    -42
      Source/MQTTnet/Server/MqttClientConnection.cs
  48. +12
    -12
      Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs
  49. +7
    -6
      Source/MQTTnet/Server/MqttClientSession.cs
  50. +52
    -43
      Source/MQTTnet/Server/MqttClientSessionsManager.cs
  51. +9
    -9
      Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs
  52. +6
    -6
      Source/MQTTnet/Server/MqttRetainedMessagesManager.cs
  53. +9
    -9
      Source/MQTTnet/Server/MqttServer.cs
  54. +59
    -31
      Source/MQTTnet/Server/MqttServerEventDispatcher.cs
  55. +4
    -4
      Source/MQTTnet/Server/Status/IMqttClientStatus.cs
  56. +6
    -3
      Source/MQTTnet/Server/Status/IMqttSessionStatus.cs
  57. +4
    -4
      Source/MQTTnet/Server/Status/MqttClientStatus.cs
  58. +7
    -4
      Source/MQTTnet/Server/Status/MqttSessionStatus.cs
  59. +9
    -8
      Source/MQTTnet/TopicFilter.cs
  60. +3
    -3
      Tests/MQTTnet.AspNetCore.Tests/MQTTnet.AspNetCore.Tests.csproj
  61. +2
    -2
      Tests/MQTTnet.Benchmarks/LoggerBenchmark.cs
  62. +2
    -2
      Tests/MQTTnet.Benchmarks/MQTTnet.Benchmarks.csproj
  63. +3
    -3
      Tests/MQTTnet.Benchmarks/MqttTcpChannelBenchmark.cs
  64. +1
    -1
      Tests/MQTTnet.Benchmarks/Program.cs
  65. +74
    -0
      Tests/MQTTnet.Core.Tests/CrossPlatformSocket_Tests.cs
  66. +111
    -0
      Tests/MQTTnet.Core.Tests/LowLevelMqttClient_Tests.cs
  67. +3
    -3
      Tests/MQTTnet.Core.Tests/MQTTnet.Tests.csproj
  68. +4
    -4
      Tests/MQTTnet.Core.Tests/ManagedMqttClient_Tests.cs
  69. +31
    -29
      Tests/MQTTnet.Core.Tests/Mockups/TestClientWrapper.cs
  70. +30
    -21
      Tests/MQTTnet.Core.Tests/Mockups/TestEnvironment.cs
  71. +10
    -10
      Tests/MQTTnet.Core.Tests/Mockups/TestLogger.cs
  72. +2
    -2
      Tests/MQTTnet.Core.Tests/Mockups/TestMqttCommunicationAdapterFactory.cs
  73. +55
    -29
      Tests/MQTTnet.Core.Tests/Mockups/TestServerWrapper.cs
  74. +32
    -0
      Tests/MQTTnet.Core.Tests/MqttApplicationMessage_Tests.cs
  75. +22
    -0
      Tests/MQTTnet.Core.Tests/MqttClientOptionsBuilder_Tests.cs
  76. +15
    -13
      Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs
  77. +7
    -5
      Tests/MQTTnet.Core.Tests/MqttFactory_Tests.cs
  78. +7
    -7
      Tests/MQTTnet.Core.Tests/MqttKeepAliveMonitor_Tests.cs
  79. +67
    -0
      Tests/MQTTnet.Core.Tests/MqttNetLogger_Tests.cs
  80. +9
    -9
      Tests/MQTTnet.Core.Tests/MqttTcpChannel_Tests.cs
  81. +22
    -21
      Tests/MQTTnet.Core.Tests/Server_Tests.cs
  82. +29
    -4
      Tests/MQTTnet.Core.Tests/Session_Tests.cs
  83. +2
    -2
      Tests/MQTTnet.TestApp.AspNetCore2/MQTTnet.TestApp.AspNetCore2.csproj
  84. +1
    -1
      Tests/MQTTnet.TestApp.NetCore/Program.cs
  85. +1
    -1
      Tests/MQTTnet.TestApp.UniversalWindows/MQTTnet.TestApp.UniversalWindows.csproj
  86. +27
    -17
      Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs

+ 5
- 5
Build/MQTTnet.AspNetCore.nuspec View File

@@ -11,15 +11,15 @@
<requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>This is a support library to integrate MQTTnet into AspNetCore.</description>
<releaseNotes>For release notes please go to MQTTnet release notes (https://www.nuget.org/packages/MQTTnet/).</releaseNotes>
<copyright>Copyright Christian Kratky 2016-2019</copyright>
<copyright>Copyright Christian Kratky 2016-2020</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags>
<dependencies>
<dependency id="MQTTnet" version="$nugetVersion" />

<dependency id="Microsoft.AspNetCore.Connections.Abstractions" version="2.1.3" />
<dependency id="Microsoft.AspNetCore.Http.Connections" version="1.0.3" />
<dependency id="Microsoft.AspNetCore.WebSockets" version="2.1.1" />
<dependency id="Microsoft.Extensions.Hosting.Abstractions" version="2.1.1" />
<dependency id="Microsoft.AspNetCore.Connections.Abstractions" version="3.1.3" />
<dependency id="Microsoft.AspNetCore.Http.Connections" version="1.1.0" />
<dependency id="Microsoft.AspNetCore.WebSockets" version="2.2.1" />
<dependency id="Microsoft.Extensions.Hosting.Abstractions" version="3.1.3" />
</dependencies>
</metadata>



+ 1
- 1
Build/MQTTnet.Extensions.ManagedClient.nuspec View File

@@ -11,7 +11,7 @@
<requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>This is an extension library which provides a managed MQTT client with additional features using MQTTnet.</description>
<releaseNotes>For release notes please go to MQTTnet release notes (https://www.nuget.org/packages/MQTTnet/).</releaseNotes>
<copyright>Copyright Christian Kratky 2016-2019</copyright>
<copyright>Copyright Christian Kratky 2016-2020</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags>
<dependencies>
<dependency id="MQTTnet" version="$nugetVersion" />


+ 1
- 1
Build/MQTTnet.Extensions.Rpc.nuspec View File

@@ -11,7 +11,7 @@
<requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>This is an extension library which allows executing synchronous device calls including a response using MQTTnet.</description>
<releaseNotes>For release notes please go to MQTTnet release notes (https://www.nuget.org/packages/MQTTnet/).</releaseNotes>
<copyright>Copyright Christian Kratky 2016-2019</copyright>
<copyright>Copyright Christian Kratky 2016-2020</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags>
<dependencies>
<dependency id="MQTTnet" version="$nugetVersion" />


+ 1
- 1
Build/MQTTnet.Extensions.WebSocket4Net.nuspec View File

@@ -11,7 +11,7 @@
<requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>This is an extension library which allows using _WebSocket4Net_ as transport for MQTTnet clients.</description>
<releaseNotes>For release notes please go to MQTTnet release notes (https://www.nuget.org/packages/MQTTnet/).</releaseNotes>
<copyright>Copyright Christian Kratky 2016-2019</copyright>
<copyright>Copyright Christian Kratky 2016-2020</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags>
<dependencies>
<dependency id="MQTTnet" version="$nugetVersion" />


+ 1
- 1
Build/MQTTnet.NETStandard.nuspec View File

@@ -11,7 +11,7 @@
<requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>This package contains the .NET Standard version of MQTTnet only.</description>
<releaseNotes>For release notes please go to MQTTnet release notes (https://www.nuget.org/packages/MQTTnet/).</releaseNotes>
<copyright>Copyright Christian Kratky 2016-2019</copyright>
<copyright>Copyright Christian Kratky 2016-2020</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags>
<dependencies>
<group targetFramework="netstandard1.3">


+ 10
- 4
Build/MQTTnet.nuspec View File

@@ -12,17 +12,23 @@
<description>MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker) and supports v3.1.0, v3.1.1 and v5.0.0 of the MQTT protocol.</description>
<releaseNotes>
* [All] Due to a merge issue not all changes are included in 3.0.8. All these changes are now included in this version.
* [Core] Updated all nuget references.
* [Core] Added MqttApplicationMessage.GetUserProperty() convenience method (thanks to @PMExtra).
* [LowLevelMqttClient] Added low level MQTT client in order to provide more flexibility when using the MQTT protocol. This client requires detailed knowledge about the MQTT protocol.
* [Client] Improve connection stability (thanks to @jltjohanlindqvist).
* [Client] Support WithConnectionUri to configure client (thanks to @PMExtra).
* [ManagedClient] Added builder class for MqttClientUnsubscribeOptions (thanks to @dominikviererbe).
* [ManagedClient] Added support for persisted sessions (thansk to @PMExtra).
* [Client] Improve connection stability (thanks to @jltjohanlindqvist).
* [ManagedClient] Fixed a memory leak (thanks to @zawodskoj).
* [ManagedClient] Improved internal subscription management (#569, thanks to @cstichlberger).
* [ManagedClient] Refactored log messages (thanks to @cstichlberger).
* [Server] Added support for assigned client IDs (MQTTv5 only) (thanks to @bcrosnier).
* [Server] Added interceptor for unsubscriptions.
* [MQTTnet.Server] Added interceptor for unsubscriptions.
* [Server] Removed exceptions when user properties are set with MQTT protocol version 3.1
* [Server] Added custom session items to the client status.
* [Server] Added option to check whether the server is already started properly or not.
* [MQTTnet.AspNetCore] improved compatibility with AspNetCore 3.1
* [MQTTnet.Server] Added option to check whether the server is already started properly or not.
* [MQTTnet.Server] Added interceptor for unsubscriptions.
</releaseNotes>
<copyright>Copyright Christian Kratky 2016-2020</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags>
@@ -40,7 +46,7 @@
<dependency id="System.Net.WebSockets.Client" version="4.3.2" />
</group>
<group targetFramework="uap10.0">
<dependency id="Microsoft.NETCore.UniversalWindowsPlatform" version="6.2.8" />
<dependency id="Microsoft.NETCore.UniversalWindowsPlatform" version="6.2.10" />
</group>
</dependencies>
</metadata>


+ 4
- 2
MQTTnet.sln View File

@@ -53,6 +53,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.Server", "Source\MQ
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.Extensions.WebSocket4Net", "Source\MQTTnet.Extensions.WebSocket4Net\MQTTnet.Extensions.WebSocket4Net.csproj", "{2BD01D53-4CA5-4142-BE8D-313876395E3E}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Apps", "Apps", "{A56E3128-1639-4F31-873A-325E14BB6295}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -270,7 +272,7 @@ Global
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4}
{FF1F72D6-9524-4422-9497-3CC0002216ED} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4}
{FF1F72D6-9524-4422-9497-3CC0002216ED} = {A56E3128-1639-4F31-873A-325E14BB6295}
{3587E506-55A2-4EB3-99C7-DC01E42D25D2} = {32A630A7-2598-41D7-B625-204CD906F5FB}
{3D283AAD-AAA8-4339-8394-52F80B6304DB} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4}
{C6FF8AEA-0855-41EC-A1F3-AC262225BAB9} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4}
@@ -279,7 +281,7 @@ Global
{998D04DD-7CB0-45F5-A393-E2495C16399E} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4}
{C400533A-8EBA-4F0B-BF4D-295C3708604B} = {12816BCC-AF9E-44A9-9AE5-C246AF2A0587}
{61B62223-F5D0-48E4-BBD6-2CBA9353CB5E} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4}
{5699FB8C-838C-4AB0-80A5-9CA809F9B65B} = {32A630A7-2598-41D7-B625-204CD906F5FB}
{5699FB8C-838C-4AB0-80A5-9CA809F9B65B} = {A56E3128-1639-4F31-873A-325E14BB6295}
{2BD01D53-4CA5-4142-BE8D-313876395E3E} = {12816BCC-AF9E-44A9-9AE5-C246AF2A0587}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution


+ 4
- 5
Source/MQTTnet.AspnetCore/Client/MqttClientConnectionContextFactory.cs View File

@@ -1,17 +1,16 @@
using System;
using System.Net;
using MQTTnet.Adapter;
using MQTTnet.Adapter;
using MQTTnet.AspNetCore.Client.Tcp;
using MQTTnet.Client;
using MQTTnet.Client.Options;
using MQTTnet.Diagnostics;
using MQTTnet.Formatter;
using System;
using System.Net;

namespace MQTTnet.AspNetCore.Client
{
public class MqttClientConnectionContextFactory : IMqttClientAdapterFactory
{
public IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetChildLogger logger)
public IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetLogger logger)
{
if (options == null) throw new ArgumentNullException(nameof(options));



+ 9
- 9
Source/MQTTnet.AspnetCore/Extensions/ServiceCollectionExtensions.cs View File

@@ -1,10 +1,10 @@
using System;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using MQTTnet.Adapter;
using MQTTnet.Diagnostics;
using MQTTnet.Server;
using MQTTnet.Implementations;
using Microsoft.Extensions.DependencyInjection;
using MQTTnet.Server;
using System;

namespace MQTTnet.AspNetCore
{
@@ -13,7 +13,7 @@ namespace MQTTnet.AspNetCore
public static IServiceCollection AddHostedMqttServer(this IServiceCollection services, IMqttServerOptions options)
{
if (options == null) throw new ArgumentNullException(nameof(options));
services.AddSingleton(options);

services.AddHostedMqttServer();
@@ -23,7 +23,8 @@ namespace MQTTnet.AspNetCore

public static IServiceCollection AddHostedMqttServer(this IServiceCollection services, Action<MqttServerOptionsBuilder> configure)
{
services.AddSingleton<IMqttServerOptions>(s => {
services.AddSingleton<IMqttServerOptions>(s =>
{
var builder = new MqttServerOptionsBuilder();
configure(builder);
return builder.Build();
@@ -36,7 +37,8 @@ namespace MQTTnet.AspNetCore

public static IServiceCollection AddHostedMqttServerWithServices(this IServiceCollection services, Action<AspNetMqttServerOptionsBuilder> configure)
{
services.AddSingleton<IMqttServerOptions>(s => {
services.AddSingleton<IMqttServerOptions>(s =>
{
var builder = new AspNetMqttServerOptionsBuilder(s);
configure(builder);
return builder.Build();
@@ -60,14 +62,12 @@ namespace MQTTnet.AspNetCore
private static IServiceCollection AddHostedMqttServer(this IServiceCollection services)
{
var logger = new MqttNetLogger();
var childLogger = logger.CreateChildLogger();

services.AddSingleton<IMqttNetLogger>(logger);
services.AddSingleton(childLogger);
services.AddSingleton<MqttHostedServer>();
services.AddSingleton<IHostedService>(s => s.GetService<MqttHostedServer>());
services.AddSingleton<IMqttServer>(s => s.GetService<MqttHostedServer>());
return services;
}



+ 4
- 4
Source/MQTTnet.AspnetCore/MQTTnet.AspNetCore.csproj View File

@@ -21,10 +21,10 @@
</ItemGroup>
<ItemGroup Condition="'$(TargetFramework)' != 'netcoreapp3.1'">
<PackageReference Include="Microsoft.AspNetCore.Connections.Abstractions" Version="2.1.3" />
<PackageReference Include="Microsoft.AspNetCore.Http.Connections" Version="1.0.3" />
<PackageReference Include="Microsoft.AspNetCore.WebSockets" Version="2.1.1" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="2.1.1" />
<PackageReference Include="Microsoft.AspNetCore.Connections.Abstractions" Version="3.1.3" />
<PackageReference Include="Microsoft.AspNetCore.Http.Connections" Version="1.1.0" />
<PackageReference Include="Microsoft.AspNetCore.WebSockets" Version="2.2.1" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="3.1.3" />
</ItemGroup>

<ItemGroup>


+ 7
- 7
Source/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs View File

@@ -1,20 +1,20 @@
using System;
using System.Net.WebSockets;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Http;
using MQTTnet.Adapter;
using MQTTnet.Diagnostics;
using MQTTnet.Formatter;
using MQTTnet.Implementations;
using MQTTnet.Server;
using System;
using System.Net.WebSockets;
using System.Threading.Tasks;

namespace MQTTnet.AspNetCore
{
public class MqttWebSocketServerAdapter : IMqttServerAdapter
{
private readonly IMqttNetChildLogger _logger;
private readonly IMqttNetLogger _logger;

public MqttWebSocketServerAdapter(IMqttNetChildLogger logger)
public MqttWebSocketServerAdapter(IMqttNetLogger logger)
{
if (logger == null) throw new ArgumentNullException(nameof(logger));

@@ -38,7 +38,7 @@ namespace MQTTnet.AspNetCore
if (webSocket == null) throw new ArgumentNullException(nameof(webSocket));

var endpoint = $"{httpContext.Connection.RemoteIpAddress}:{httpContext.Connection.RemotePort}";
var clientCertificate = await httpContext.Connection.GetClientCertificateAsync().ConfigureAwait(false);
try
{


+ 9
- 9
Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs View File

@@ -1,9 +1,4 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Client;
using MQTTnet.Client;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Publishing;
@@ -13,6 +8,11 @@ using MQTTnet.Exceptions;
using MQTTnet.Internal;
using MQTTnet.Protocol;
using MQTTnet.Server;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace MQTTnet.Extensions.ManagedClient
{
@@ -33,7 +33,7 @@ namespace MQTTnet.Extensions.ManagedClient
private readonly SemaphoreSlim _subscriptionsQueuedSignal = new SemaphoreSlim(0);

private readonly IMqttClient _mqttClient;
private readonly IMqttNetChildLogger _logger;
private readonly IMqttNetLogger _logger;

private readonly AsyncLock _messageQueueLock = new AsyncLock();

@@ -42,8 +42,8 @@ namespace MQTTnet.Extensions.ManagedClient
private Task _maintainConnectionTask;

private ManagedMqttClientStorageManager _storageManager;
public ManagedMqttClient(IMqttClient mqttClient, IMqttNetChildLogger logger)
public ManagedMqttClient(IMqttClient mqttClient, IMqttNetLogger logger)
{
_mqttClient = mqttClient ?? throw new ArgumentNullException(nameof(mqttClient));



+ 4
- 4
Source/MQTTnet.Extensions.ManagedClient/MqttFactoryExtensions.cs View File

@@ -1,5 +1,5 @@
using System;
using MQTTnet.Diagnostics;
using MQTTnet.Diagnostics;
using System;

namespace MQTTnet.Extensions.ManagedClient
{
@@ -9,7 +9,7 @@ namespace MQTTnet.Extensions.ManagedClient
{
if (factory == null) throw new ArgumentNullException(nameof(factory));

return new ManagedMqttClient(factory.CreateMqttClient(), factory.DefaultLogger.CreateChildLogger());
return new ManagedMqttClient(factory.CreateMqttClient(), factory.DefaultLogger);
}

public static IManagedMqttClient CreateManagedMqttClient(this IMqttFactory factory, IMqttNetLogger logger)
@@ -17,7 +17,7 @@ namespace MQTTnet.Extensions.ManagedClient
if (factory == null) throw new ArgumentNullException(nameof(factory));
if (logger == null) throw new ArgumentNullException(nameof(logger));

return new ManagedMqttClient(factory.CreateMqttClient(logger), logger.CreateChildLogger());
return new ManagedMqttClient(factory.CreateMqttClient(logger), logger);
}
}
}

+ 12
- 12
Source/MQTTnet.Extensions.WebSocket4Net/WebSocket4NetMqttClientAdapterFactory.cs View File

@@ -1,15 +1,15 @@
using System;
using MQTTnet.Adapter;
using MQTTnet.Adapter;
using MQTTnet.Client.Options;
using MQTTnet.Diagnostics;
using MQTTnet.Formatter;
using MQTTnet.Implementations;
using System;

namespace MQTTnet.Extensions.WebSocket4Net
{
public class WebSocket4NetMqttClientAdapterFactory : IMqttClientAdapterFactory
{
public IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetChildLogger logger)
public IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetLogger logger)
{
if (options == null) throw new ArgumentNullException(nameof(options));
if (logger == null) throw new ArgumentNullException(nameof(logger));
@@ -17,19 +17,19 @@ namespace MQTTnet.Extensions.WebSocket4Net
switch (options.ChannelOptions)
{
case MqttClientTcpOptions _:
{
return new MqttChannelAdapter(new MqttTcpChannel(options), new MqttPacketFormatterAdapter(options.ProtocolVersion), logger);
}
{
return new MqttChannelAdapter(new MqttTcpChannel(options), new MqttPacketFormatterAdapter(options.ProtocolVersion), logger);
}

case MqttClientWebSocketOptions webSocketOptions:
{
return new MqttChannelAdapter(new WebSocket4NetMqttChannel(options, webSocketOptions), new MqttPacketFormatterAdapter(options.ProtocolVersion), logger);
}
{
return new MqttChannelAdapter(new WebSocket4NetMqttChannel(options, webSocketOptions), new MqttPacketFormatterAdapter(options.ProtocolVersion), logger);
}

default:
{
throw new NotSupportedException();
}
{
throw new NotSupportedException();
}
}
}
}


+ 43
- 43
Source/MQTTnet.Server/Logging/MqttNetChildLoggerWrapper.cs View File

@@ -1,43 +1,43 @@
using System;
using MQTTnet.Diagnostics;
namespace MQTTnet.Server.Logging
{
public class MqttNetChildLoggerWrapper : IMqttNetChildLogger
{
private readonly MqttNetLoggerWrapper _logger;
private readonly string _source;
public MqttNetChildLoggerWrapper(string source, MqttNetLoggerWrapper logger)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_source = source;
}
public IMqttNetChildLogger CreateChildLogger(string source = null)
{
return _logger.CreateChildLogger(source);
}
public void Verbose(string message, params object[] parameters)
{
_logger.Publish(MqttNetLogLevel.Verbose, _source, message, parameters, null);
}
public void Info(string message, params object[] parameters)
{
_logger.Publish(MqttNetLogLevel.Info, _source, message, parameters, null);
}
public void Warning(Exception exception, string message, params object[] parameters)
{
_logger.Publish(MqttNetLogLevel.Warning, _source, message, parameters, exception);
}
public void Error(Exception exception, string message, params object[] parameters)
{
_logger.Publish(MqttNetLogLevel.Error, _source, message, parameters, exception);
}
}
}
//using MQTTnet.Diagnostics;
//using System;
//namespace MQTTnet.Server.Logging
//{
// public class MqttNetChildLoggerWrapper : IMqttNetChildLogger
// {
// private readonly MqttNetLoggerWrapper _logger;
// private readonly string _source;
// public MqttNetChildLoggerWrapper(string source, MqttNetLoggerWrapper logger)
// {
// _logger = logger ?? throw new ArgumentNullException(nameof(logger));
// _source = source;
// }
// public IMqttNetLogger CreateChildLogger(string source = null)
// {
// return _logger.CreateChildLogger(source);
// }
// public void Verbose(string message, params object[] parameters)
// {
// _logger.Publish(MqttNetLogLevel.Verbose, _source, message, parameters, null);
// }
// public void Info(string message, params object[] parameters)
// {
// _logger.Publish(MqttNetLogLevel.Info, _source, message, parameters, null);
// }
// public void Warning(Exception exception, string message, params object[] parameters)
// {
// _logger.Publish(MqttNetLogLevel.Warning, _source, message, parameters, exception);
// }
// public void Error(Exception exception, string message, params object[] parameters)
// {
// _logger.Publish(MqttNetLogLevel.Error, _source, message, parameters, exception);
// }
// }
//}

+ 25
- 11
Source/MQTTnet.Server/Logging/MqttNetLoggerWrapper.cs View File

@@ -1,13 +1,13 @@
using System;
using System.Threading;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging;
using MQTTnet.Diagnostics;
using System;
using System.Threading;

namespace MQTTnet.Server.Logging
{
public class MqttNetLoggerWrapper : IMqttNetLogger
{
private readonly ILogger<MqttServer> _logger;
readonly ILogger<MqttServer> _logger;

public MqttNetLoggerWrapper(ILogger<MqttServer> logger)
{
@@ -16,25 +16,39 @@ namespace MQTTnet.Server.Logging

public event EventHandler<MqttNetLogMessagePublishedEventArgs> LogMessagePublished;

public IMqttNetChildLogger CreateChildLogger(string source = null)
public IMqttNetLogger CreateChildLogger(string source)
{
return new MqttNetChildLoggerWrapper(source, this);
return new MqttNetLogger(source);
}

public void Publish(MqttNetLogLevel logLevel, string source, string message, object[] parameters, Exception exception)
public void Publish(MqttNetLogLevel level, string source, string message, object[] parameters, Exception exception)
{
var convertedLogLevel = ConvertLogLevel(logLevel);
var convertedLogLevel = ConvertLogLevel(level);
_logger.Log(convertedLogLevel, exception, message, parameters);

var logMessagePublishedEvent = LogMessagePublished;
if (logMessagePublishedEvent != null)
{
var logMessage = new MqttNetLogMessage(null, DateTime.UtcNow, Thread.CurrentThread.ManagedThreadId, source, logLevel, message, exception);
var logMessage = new MqttNetLogMessage
{
Timestamp = DateTime.UtcNow,
ThreadId = Thread.CurrentThread.ManagedThreadId,
Source = source,
Level = level,
Message = message,
Exception = exception
};

logMessagePublishedEvent.Invoke(this, new MqttNetLogMessagePublishedEventArgs(logMessage));
}
}
private static LogLevel ConvertLogLevel(MqttNetLogLevel logLevel)

public void Publish(MqttNetLogLevel logLevel, string message, object[] parameters, Exception exception)
{
Publish(logLevel, null, message, parameters, exception);
}

static LogLevel ConvertLogLevel(MqttNetLogLevel logLevel)
{
switch (logLevel)
{


+ 4
- 2
Source/MQTTnet.Server/MQTTnet.Server.csproj View File

@@ -45,8 +45,10 @@
<ItemGroup>
<PackageReference Include="IronPython" Version="2.7.9" />
<PackageReference Include="IronPython.StdLib" Version="2.7.9" />
<PackageReference Include="Swashbuckle.AspNetCore.SwaggerGen" Version="5.0.0-rc2" />
<PackageReference Include="Swashbuckle.AspNetCore.SwaggerUI" Version="5.0.0-rc2" />
<PackageReference Include="Microsoft.AspNetCore.Mvc.NewtonsoftJson" Version="3.1.3" />
<PackageReference Include="MSTest.TestAdapter" Version="2.1.1" />
<PackageReference Include="Swashbuckle.AspNetCore.SwaggerGen" Version="5.3.1" />
<PackageReference Include="Swashbuckle.AspNetCore.SwaggerUI" Version="5.3.1" />
</ItemGroup>

<ItemGroup>


+ 12
- 12
Source/MQTTnet.Server/Mqtt/MqttServerService.cs View File

@@ -1,11 +1,4 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.WebSockets;
using System.Security.Authentication;
using System.Text;
using System.Threading.Tasks;
using IronPython.Runtime;
using IronPython.Runtime;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using MQTTnet.Adapter;
@@ -16,6 +9,13 @@ using MQTTnet.Protocol;
using MQTTnet.Server.Configuration;
using MQTTnet.Server.Scripting;
using MQTTnet.Server.Status;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.WebSockets;
using System.Security.Authentication;
using System.Text;
using System.Threading.Tasks;

namespace MQTTnet.Server.Mqtt
{
@@ -65,11 +65,11 @@ namespace MQTTnet.Server.Mqtt
_pythonScriptHostService = pythonScriptHostService ?? throw new ArgumentNullException(nameof(pythonScriptHostService));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));

_webSocketServerAdapter = new MqttWebSocketServerAdapter(mqttFactory.Logger.CreateChildLogger());
_webSocketServerAdapter = new MqttWebSocketServerAdapter(mqttFactory.Logger);

var adapters = new List<IMqttServerAdapter>
{
new MqttTcpServerAdapter(mqttFactory.Logger.CreateChildLogger())
new MqttTcpServerAdapter(mqttFactory.Logger)
{
TreatSocketOpeningErrorAsWarning = true // Opening other ports than for HTTP is not allows in Azure App Services.
},
@@ -215,7 +215,7 @@ namespace MQTTnet.Server.Mqtt
options
.WithEncryptedEndpoint()
.WithEncryptionSslProtocol(SslProtocols.Tls12);
if (!string.IsNullOrEmpty(_settings.EncryptedTcpEndPoint?.Certificate?.Path))
{
IMqttServerCertificateCredentials certificateCredentials = null;
@@ -230,7 +230,7 @@ namespace MQTTnet.Server.Mqtt

options.WithEncryptionCertificate(_settings.EncryptedTcpEndPoint.Certificate.ReadCertificate(), certificateCredentials);
}
if (_settings.EncryptedTcpEndPoint.TryReadIPv4(out var address4))
{
options.WithEncryptedEndpointBoundIPAddress(address4);


+ 1
- 1
Source/MQTTnet/Adapter/IMqttClientAdapterFactory.cs View File

@@ -5,6 +5,6 @@ namespace MQTTnet.Adapter
{
public interface IMqttClientAdapterFactory
{
IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetChildLogger logger);
IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetLogger logger);
}
}

+ 3
- 3
Source/MQTTnet/Adapter/MqttChannelAdapter.cs View File

@@ -14,12 +14,12 @@ using System.Threading.Tasks;

namespace MQTTnet.Adapter
{
public class MqttChannelAdapter : Disposable, IMqttChannelAdapter
public sealed class MqttChannelAdapter : Disposable, IMqttChannelAdapter
{
const uint ErrorOperationAborted = 0x800703E3;
const int ReadBufferSize = 4096; // TODO: Move buffer size to config

readonly IMqttNetChildLogger _logger;
readonly IMqttNetLogger _logger;
readonly IMqttChannel _channel;
readonly MqttPacketReader _packetReader;

@@ -30,7 +30,7 @@ namespace MQTTnet.Adapter
long _bytesReceived;
long _bytesSent;

public MqttChannelAdapter(IMqttChannel channel, MqttPacketFormatterAdapter packetFormatterAdapter, IMqttNetChildLogger logger)
public MqttChannelAdapter(IMqttChannel channel, MqttPacketFormatterAdapter packetFormatterAdapter, IMqttNetLogger logger)
{
if (logger == null) throw new ArgumentNullException(nameof(logger));



+ 5
- 5
Source/MQTTnet/Client/MqttClient.cs View File

@@ -1,7 +1,3 @@
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Adapter;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
@@ -17,6 +13,10 @@ using MQTTnet.Internal;
using MQTTnet.PacketDispatcher;
using MQTTnet.Packets;
using MQTTnet.Protocol;
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace MQTTnet.Client
{
@@ -29,7 +29,7 @@ namespace MQTTnet.Client
private readonly object _disconnectLock = new object();

private readonly IMqttClientAdapterFactory _adapterFactory;
private readonly IMqttNetChildLogger _logger;
private readonly IMqttNetLogger _logger;

private CancellationTokenSource _backgroundCancellationTokenSource;
private Task _packetReceiverTask;


+ 8
- 8
Source/MQTTnet/Client/Options/MqttClientOptionsBuilder.cs View File

@@ -1,19 +1,19 @@
using System;
using MQTTnet.Client.ExtendedAuthenticationExchange;
using MQTTnet.Formatter;
using System;
using System.Linq;
using System.Text;
using MQTTnet.Client.ExtendedAuthenticationExchange;
using MQTTnet.Formatter;

namespace MQTTnet.Client.Options
{
public class MqttClientOptionsBuilder
{
private readonly MqttClientOptions _options = new MqttClientOptions();
readonly MqttClientOptions _options = new MqttClientOptions();

private MqttClientTcpOptions _tcpOptions;
private MqttClientWebSocketOptions _webSocketOptions;
private MqttClientOptionsBuilderTlsParameters _tlsParameters;
private MqttClientWebSocketProxyOptions _proxyOptions;
MqttClientTcpOptions _tcpOptions;
MqttClientWebSocketOptions _webSocketOptions;
MqttClientOptionsBuilderTlsParameters _tlsParameters;
MqttClientWebSocketProxyOptions _proxyOptions;

public MqttClientOptionsBuilder WithProtocolVersion(MqttProtocolVersion value)
{


+ 0
- 17
Source/MQTTnet/Diagnostics/IMqttNetChildLogger.cs View File

@@ -1,17 +0,0 @@
using System;

namespace MQTTnet.Diagnostics
{
public interface IMqttNetChildLogger
{
IMqttNetChildLogger CreateChildLogger(string source = null);

void Verbose(string message, params object[] parameters);

void Info(string message, params object[] parameters);

void Warning(Exception exception, string message, params object[] parameters);

void Error(Exception exception, string message, params object[] parameters);
}
}

+ 2
- 2
Source/MQTTnet/Diagnostics/IMqttNetLogger.cs View File

@@ -6,8 +6,8 @@ namespace MQTTnet.Diagnostics
{
event EventHandler<MqttNetLogMessagePublishedEventArgs> LogMessagePublished;

IMqttNetChildLogger CreateChildLogger(string source = null);
IMqttNetLogger CreateChildLogger(string source);

void Publish(MqttNetLogLevel logLevel, string source, string message, object[] parameters, Exception exception);
void Publish(MqttNetLogLevel logLevel, string message, object[] parameters, Exception exception);
}
}

+ 0
- 51
Source/MQTTnet/Diagnostics/MqttNetChildLogger.cs View File

@@ -1,51 +0,0 @@
using System;

namespace MQTTnet.Diagnostics
{
public class MqttNetChildLogger : IMqttNetChildLogger
{
private readonly IMqttNetLogger _logger;
private readonly string _source;

public MqttNetChildLogger(IMqttNetLogger logger, string source)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_source = source;
}

public IMqttNetChildLogger CreateChildLogger(string source)
{
string childSource;
if (!string.IsNullOrEmpty(_source))
{
childSource = _source + "." + source;
}
else
{
childSource = source;
}

return new MqttNetChildLogger(_logger, childSource);
}

public void Verbose(string message, params object[] parameters)
{
_logger.Publish(MqttNetLogLevel.Verbose, _source, message, parameters, null);
}

public void Info(string message, params object[] parameters)
{
_logger.Publish(MqttNetLogLevel.Info, _source, message, parameters, null);
}

public void Warning(Exception exception, string message, params object[] parameters)
{
_logger.Publish(MqttNetLogLevel.Warning, _source, message, parameters, exception);
}

public void Error(Exception exception, string message, params object[] parameters)
{
_logger.Publish(MqttNetLogLevel.Error, _source, message, parameters, exception);
}
}
}

+ 7
- 18
Source/MQTTnet/Diagnostics/MqttNetLogMessage.cs View File

@@ -4,30 +4,19 @@ namespace MQTTnet.Diagnostics
{
public class MqttNetLogMessage
{
public MqttNetLogMessage(string logId, DateTime timestamp, int threadId, string source, MqttNetLogLevel level, string message, Exception exception)
{
LogId = logId;
Timestamp = timestamp;
ThreadId = threadId;
Source = source;
Level = level;
Message = message;
Exception = exception;
}

public string LogId { get; }
public string LogId { get; set; }

public DateTime Timestamp { get; }
public DateTime Timestamp { get; set; }

public int ThreadId { get; }
public int ThreadId { get; set; }

public string Source { get; }
public string Source { get; set; }

public MqttNetLogLevel Level { get; }
public MqttNetLogLevel Level { get; set; }

public string Message { get; }
public string Message { get; set; }

public Exception Exception { get; }
public Exception Exception { get; set; }

public override string ToString()
{


+ 5
- 0
Source/MQTTnet/Diagnostics/MqttNetLogMessagePublishedEventArgs.cs View File

@@ -6,9 +6,14 @@ namespace MQTTnet.Diagnostics
{
public MqttNetLogMessagePublishedEventArgs(MqttNetLogMessage logMessage)
{
LogMessage = logMessage ?? throw new ArgumentNullException(nameof(logMessage));

TraceMessage = logMessage ?? throw new ArgumentNullException(nameof(logMessage));
}

[Obsolete("Use new proeprty LogMessage instead.")]
public MqttNetLogMessage TraceMessage { get; }

public MqttNetLogMessage LogMessage { get; }
}
}

+ 52
- 9
Source/MQTTnet/Diagnostics/MqttNetLogger.cs View File

@@ -4,26 +4,51 @@ namespace MQTTnet.Diagnostics
{
public class MqttNetLogger : IMqttNetLogger
{
private readonly string _logId;
readonly string _logId;
readonly string _source;

public MqttNetLogger(string logId = null)
readonly MqttNetLogger _parentLogger;

public MqttNetLogger(string source, string logId)
{
_source = source;
_logId = logId;
}

public MqttNetLogger()
{
}

public MqttNetLogger(string logId)
{
_logId = logId;
}

MqttNetLogger(MqttNetLogger parentLogger, string logId, string source)
{
_parentLogger = parentLogger ?? throw new ArgumentNullException(nameof(parentLogger));
_source = source ?? throw new ArgumentNullException(nameof(source));

_logId = logId;
}

public event EventHandler<MqttNetLogMessagePublishedEventArgs> LogMessagePublished;

public IMqttNetChildLogger CreateChildLogger(string source = null)
// TODO: Consider creating a LoggerFactory which will allow creating loggers. The logger factory will
// be the only place which has the published event.
public IMqttNetLogger CreateChildLogger(string source)
{
return new MqttNetChildLogger(this, source);
if (source is null) throw new ArgumentNullException(nameof(source));

return new MqttNetLogger(this, _logId, source);
}

public void Publish(MqttNetLogLevel logLevel, string source, string message, object[] parameters, Exception exception)
public void Publish(MqttNetLogLevel level, string message, object[] parameters, Exception exception)
{
var hasLocalListeners = LogMessagePublished != null;
var hasGlobalListeners = MqttNetGlobalLogger.HasListeners;

if (!hasLocalListeners && !hasGlobalListeners)
if (!hasLocalListeners && !hasGlobalListeners && _parentLogger == null)
{
return;
}
@@ -40,17 +65,35 @@ namespace MQTTnet.Diagnostics
}
}

var traceMessage = new MqttNetLogMessage(_logId, DateTime.UtcNow, Environment.CurrentManagedThreadId, source, logLevel, message, exception);
var logMessage = new MqttNetLogMessage
{
LogId = _logId,
Timestamp = DateTime.UtcNow,
Source = _source,
ThreadId = Environment.CurrentManagedThreadId,
Level = level,
Message = message,
Exception = exception
};

if (hasGlobalListeners)
{
MqttNetGlobalLogger.Publish(traceMessage);
MqttNetGlobalLogger.Publish(logMessage);
}

if (hasLocalListeners)
{
LogMessagePublished?.Invoke(this, new MqttNetLogMessagePublishedEventArgs(traceMessage));
LogMessagePublished?.Invoke(this, new MqttNetLogMessagePublishedEventArgs(logMessage));
}

_parentLogger?.Publish(logMessage);
}

void Publish(MqttNetLogMessage logMessage)
{
LogMessagePublished?.Invoke(this, new MqttNetLogMessagePublishedEventArgs(logMessage));

_parentLogger?.Publish(logMessage);
}
}
}

+ 35
- 0
Source/MQTTnet/Diagnostics/MqttNetLoggerExtensions.cs View File

@@ -0,0 +1,35 @@
using System;

namespace MQTTnet.Diagnostics
{
public static class MqttNetLoggerExtensions
{
public static void Verbose(this IMqttNetLogger logger, string message, params object[] parameters)
{
if (logger is null) throw new ArgumentNullException(nameof(logger));

logger.Publish(MqttNetLogLevel.Verbose, message, parameters, null);
}

public static void Info(this IMqttNetLogger logger, string message, params object[] parameters)
{
if (logger is null) throw new ArgumentNullException(nameof(logger));

logger.Publish(MqttNetLogLevel.Info, message, parameters, null);
}

public static void Warning(this IMqttNetLogger logger, Exception exception, string message, params object[] parameters)
{
if (logger is null) throw new ArgumentNullException(nameof(logger));

logger.Publish(MqttNetLogLevel.Warning, message, parameters, exception);
}

public static void Error(this IMqttNetLogger logger, Exception exception, string message, params object[] parameters)
{
if (logger is null) throw new ArgumentNullException(nameof(logger));

logger.Publish(MqttNetLogLevel.Error, message, parameters, exception);
}
}
}

Source/MQTTnet/Diagnostics/TargetFrameworkInfoProvider.cs → Source/MQTTnet/Diagnostics/TargetFrameworkProvider.cs View File

@@ -1,6 +1,6 @@
namespace MQTTnet.Diagnostics
{
public static class TargetFrameworkInfoProvider
public static class TargetFrameworkProvider
{
public static string TargetFramework
{

+ 48
- 0
Source/MQTTnet/Extensions/MqttClientOptionsBuilderExtension.cs View File

@@ -0,0 +1,48 @@
using System;
using System.Linq;
using MQTTnet.Client.Options;

namespace MQTTnet.Extensions
{
public static class MqttClientOptionsBuilderExtension
{
public static MqttClientOptionsBuilder WithConnectionUri(this MqttClientOptionsBuilder builder, Uri uri)
{
var port = uri.IsDefaultPort ? null : (int?) uri.Port;
switch (uri.Scheme.ToLower())
{
case "tcp":
case "mqtt":
builder.WithTcpServer(uri.Host, port);
break;

case "mqtts":
builder.WithTcpServer(uri.Host, port).WithTls();
break;

case "ws":
case "wss":
builder.WithWebSocketServer(uri.ToString());
break;

default:
throw new ArgumentException("Unexpected scheme in uri.");
}
if (!string.IsNullOrEmpty(uri.UserInfo))
{
var userInfo = uri.UserInfo.Split(':');
var username = userInfo[0];
var password = userInfo.Length > 1 ? userInfo[1] : "";
builder.WithCredentials(username, password);
}

return builder;
}

public static MqttClientOptionsBuilder WithConnectionUri(this MqttClientOptionsBuilder builder, string uri)
{
return WithConnectionUri(builder, new Uri(uri, UriKind.Absolute));
}
}
}

+ 16
- 0
Source/MQTTnet/Extensions/UserPropertyExtension.cs View File

@@ -0,0 +1,16 @@
using System;
using System.Linq;

namespace MQTTnet.Extensions
{
public static class UserPropertyExtension
{
public static string GetUserProperty(this MqttApplicationMessage message, string propertyName, StringComparison comparisonType = StringComparison.Ordinal)
{
if (message == null) throw new ArgumentNullException(nameof(message));
if (propertyName == null) throw new ArgumentNullException(nameof(propertyName));

return message.UserProperties?.SingleOrDefault(up => up.Name.Equals(propertyName, comparisonType))?.Value;
}
}
}

+ 0
- 15
Source/MQTTnet/Formatter/V3/MqttV310DataConverter.cs View File

@@ -20,11 +20,6 @@ namespace MQTTnet.Formatter.V3
{
if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage));

if (applicationMessage.UserProperties?.Any() == true)
{
throw new MqttProtocolViolationException("User properties are not supported in MQTT version 3.");
}

return new MqttPublishPacket
{
Topic = applicationMessage.Topic,
@@ -171,11 +166,6 @@ namespace MQTTnet.Formatter.V3
{
if (options == null) throw new ArgumentNullException(nameof(options));

if (options.UserProperties?.Any() == true)
{
throw new MqttProtocolViolationException("User properties are not supported in MQTT version 3.");
}

var subscribePacket = new MqttSubscribePacket();
subscribePacket.TopicFilters.AddRange(options.TopicFilters);
@@ -186,11 +176,6 @@ namespace MQTTnet.Formatter.V3
{
if (options == null) throw new ArgumentNullException(nameof(options));

if (options.UserProperties?.Any() == true)
{
throw new MqttProtocolViolationException("User properties are not supported in MQTT version 3.");
}

var unsubscribePacket = new MqttUnsubscribePacket();
unsubscribePacket.TopicFilters.AddRange(options.TopicFilters);



+ 229
- 0
Source/MQTTnet/Implementations/CrossPlatformSocket.cs View File

@@ -0,0 +1,229 @@
using System;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;

namespace MQTTnet.Implementations
{
public sealed class CrossPlatformSocket : IDisposable
{
readonly Socket _socket;

public CrossPlatformSocket(AddressFamily addressFamily)
{
_socket = new Socket(addressFamily, SocketType.Stream, ProtocolType.Tcp);
}

public CrossPlatformSocket()
{
// Having this contructor is important because avoiding the address family as parameter
// will make use of dual mode in the .net framework.
_socket = new Socket(SocketType.Stream, ProtocolType.Tcp);
}

public CrossPlatformSocket(Socket socket)
{
_socket = socket ?? throw new ArgumentNullException(nameof(socket));
}

public bool NoDelay
{
get
{
return (int)_socket.GetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.NoDelay) > 0;
}

set
{
_socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.NoDelay, value ? 1 : 0);
}
}

public bool DualMode
{
get
{
return _socket.DualMode;
}

set
{
_socket.DualMode = value;
}
}

public int ReceiveBufferSize
{
get
{
return _socket.ReceiveBufferSize;
}

set
{
_socket.ReceiveBufferSize = value;
}
}

public int SendBufferSize
{
get
{
return _socket.SendBufferSize;
}

set
{
_socket.SendBufferSize = value;
}
}

public EndPoint RemoteEndPoint
{
get
{
return _socket.RemoteEndPoint;
}
}

public bool ReuseAddress
{
get
{
return (int)_socket.GetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress) != 0;
}

set
{
_socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, value ? 1 : 0);
}
}

public async Task<CrossPlatformSocket> AcceptAsync()
{
try
{
#if NET452 || NET461
var clientSocket = await Task.Factory.FromAsync(_socket.BeginAccept, _socket.EndAccept, null).ConfigureAwait(false);
return new CrossPlatformSocket(clientSocket);
#else
var clientSocket = await _socket.AcceptAsync().ConfigureAwait(false);
return new CrossPlatformSocket(clientSocket);
#endif
}
catch (ObjectDisposedException)
{
// This will happen when _socket.EndAccept gets called by Task library but the socket is already disposed.
return null;
}
}

public void Bind(EndPoint localEndPoint)
{
if (localEndPoint is null) throw new ArgumentNullException(nameof(localEndPoint));

_socket.Bind(localEndPoint);
}

public void Listen(int connectionBacklog)
{
_socket.Listen(connectionBacklog);
}

public async Task ConnectAsync(string host, int port, CancellationToken cancellationToken)
{
if (host is null) throw new ArgumentNullException(nameof(host));

try
{
// Workaround for: workaround for https://github.com/dotnet/corefx/issues/24430
using (cancellationToken.Register(() => _socket.Dispose()))
{
cancellationToken.ThrowIfCancellationRequested();

#if NET452 || NET461
await Task.Factory.FromAsync(_socket.BeginConnect, _socket.EndConnect, host, port, null).ConfigureAwait(false);
#else
await _socket.ConnectAsync(host, port).ConfigureAwait(false);
#endif
}
}
catch (ObjectDisposedException)
{
// This will happen when _socket.EndConnect gets called by Task library but the socket is already disposed.
}
}

public async Task SendAsync(ArraySegment<byte> buffer, SocketFlags socketFlags)
{
try
{
#if NET452 || NET461
await Task.Factory.FromAsync(SocketWrapper.BeginSend, _socket.EndSend, new SocketWrapper(_socket, buffer, socketFlags)).ConfigureAwait(false);
#else
await _socket.SendAsync(buffer, socketFlags).ConfigureAwait(false);
#endif
}
catch (ObjectDisposedException)
{
// This will happen when _socket.EndConnect gets called by Task library but the socket is already disposed.
}
}

public async Task<int> ReceiveAsync(ArraySegment<byte> buffer, SocketFlags socketFlags)
{
try
{
#if NET452 || NET461
return await Task.Factory.FromAsync(SocketWrapper.BeginReceive, _socket.EndReceive, new SocketWrapper(_socket, buffer, socketFlags)).ConfigureAwait(false);
#else
return await _socket.ReceiveAsync(buffer, socketFlags).ConfigureAwait(false);
#endif
}
catch (ObjectDisposedException)
{
// This will happen when _socket.EndReceive gets called by Task library but the socket is already disposed.
return -1;
}
}

public NetworkStream GetStream()
{
return new NetworkStream(_socket, true);
}

public void Dispose()
{
_socket?.Dispose();
}

#if NET452 || NET461
class SocketWrapper
{
readonly Socket _socket;
readonly ArraySegment<byte> _buffer;
readonly SocketFlags _socketFlags;

public SocketWrapper(Socket socket, ArraySegment<byte> buffer, SocketFlags socketFlags)
{
_socket = socket;
_buffer = buffer;
_socketFlags = socketFlags;
}

public static IAsyncResult BeginSend(AsyncCallback callback, object state)
{
var socketWrapper = (SocketWrapper)state;
return socketWrapper._socket.BeginSend(socketWrapper._buffer.Array, socketWrapper._buffer.Offset, socketWrapper._buffer.Count, socketWrapper._socketFlags, callback, state);
}

public static IAsyncResult BeginReceive(AsyncCallback callback, object state)
{
var socketWrapper = (SocketWrapper)state;
return socketWrapper._socket.BeginReceive(socketWrapper._buffer.Array, socketWrapper._buffer.Offset, socketWrapper._buffer.Count, socketWrapper._socketFlags, callback, state);
}
}
#endif
}
}

+ 4
- 4
Source/MQTTnet/Implementations/MqttClientAdapterFactory.cs View File

@@ -1,17 +1,17 @@
using System;
using MQTTnet.Adapter;
using MQTTnet.Adapter;
using MQTTnet.Client.Options;
using MQTTnet.Diagnostics;
using MQTTnet.Formatter;
using System;

namespace MQTTnet.Implementations
{
public class MqttClientAdapterFactory : IMqttClientAdapterFactory
{
public IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetChildLogger logger)
public IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetLogger logger)
{
if (options == null) throw new ArgumentNullException(nameof(options));
switch (options.ChannelOptions)
{
case MqttClientTcpOptions _:


+ 18
- 20
Source/MQTTnet/Implementations/MqttTcpChannel.cs View File

@@ -46,15 +46,15 @@ namespace MQTTnet.Implementations

public async Task ConnectAsync(CancellationToken cancellationToken)
{
Socket socket;
CrossPlatformSocket socket;

if (_options.AddressFamily == AddressFamily.Unspecified)
{
socket = new Socket(SocketType.Stream, ProtocolType.Tcp);
socket = new CrossPlatformSocket();
}
else
{
socket = new Socket(_options.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
socket = new CrossPlatformSocket(_options.AddressFamily);
}

socket.ReceiveBufferSize = _options.BufferSize;
@@ -69,20 +69,24 @@ namespace MQTTnet.Implementations
socket.DualMode = _options.DualMode.Value;
}

// Workaround for: workaround for https://github.com/dotnet/corefx/issues/24430
using (cancellationToken.Register(() => socket.Dispose()))
{
await PlatformAbstractionLayer.ConnectAsync(socket, _options.Server, _options.GetPort()).ConfigureAwait(false);
}
await socket.ConnectAsync(_options.Server, _options.GetPort(), cancellationToken).ConfigureAwait(false);

var networkStream = new NetworkStream(socket, true);
var networkStream = socket.GetStream();

if (_options.TlsOptions.UseTls)
{
var sslStream = new SslStream(networkStream, false, InternalUserCertificateValidationCallback);
_stream = sslStream;
try
{
await sslStream.AuthenticateAsClientAsync(_options.Server, LoadCertificates(), _options.TlsOptions.SslProtocol, !_options.TlsOptions.IgnoreCertificateRevocationErrors).ConfigureAwait(false);
}
catch
{
sslStream.Dispose();
throw;
}

await sslStream.AuthenticateAsClientAsync(_options.Server, LoadCertificates(), _options.TlsOptions.SslProtocol, !_options.TlsOptions.IgnoreCertificateRevocationErrors).ConfigureAwait(false);
_stream = sslStream;
}
else
{
@@ -107,17 +111,14 @@ namespace MQTTnet.Implementations
// Workaround for: https://github.com/dotnet/corefx/issues/24430
using (cancellationToken.Register(Dispose))
{
if (cancellationToken.IsCancellationRequested)
{
return 0;
}
cancellationToken.ThrowIfCancellationRequested();

return await _stream.ReadAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false);
}
}
catch (ObjectDisposedException)
{
return 0;
return -1;
}
catch (IOException exception)
{
@@ -139,10 +140,7 @@ namespace MQTTnet.Implementations
// Workaround for: https://github.com/dotnet/corefx/issues/24430
using (cancellationToken.Register(Dispose))
{
if (cancellationToken.IsCancellationRequested)
{
return;
}
cancellationToken.ThrowIfCancellationRequested();

await _stream.WriteAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false);
}


+ 6
- 6
Source/MQTTnet/Implementations/MqttTcpServerAdapter.Uwp.cs View File

@@ -11,14 +11,14 @@ using System.Threading.Tasks;

namespace MQTTnet.Implementations
{
public class MqttTcpServerAdapter : IMqttServerAdapter
public sealed class MqttTcpServerAdapter : IMqttServerAdapter
{
private readonly IMqttNetChildLogger _logger;
readonly IMqttNetLogger _logger;

private IMqttServerOptions _options;
private StreamSocketListener _listener;
IMqttServerOptions _options;
StreamSocketListener _listener;

public MqttTcpServerAdapter(IMqttNetChildLogger logger)
public MqttTcpServerAdapter(IMqttNetLogger logger)
{
if (logger == null) throw new ArgumentNullException(nameof(logger));

@@ -68,7 +68,7 @@ namespace MQTTnet.Implementations
_listener = null;
}

private async void OnConnectionReceivedAsync(StreamSocketListener sender, StreamSocketListenerConnectionReceivedEventArgs args)
async void OnConnectionReceivedAsync(StreamSocketListener sender, StreamSocketListenerConnectionReceivedEventArgs args)
{
try
{


+ 22
- 21
Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs View File

@@ -1,4 +1,8 @@
#if !WINDOWS_UWP
using MQTTnet.Adapter;
using MQTTnet.Diagnostics;
using MQTTnet.Internal;
using MQTTnet.Server;
using System;
using System.Collections.Generic;
using System.Net;
@@ -6,21 +10,17 @@ using System.Net.Sockets;
using System.Security.Cryptography.X509Certificates;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Adapter;
using MQTTnet.Diagnostics;
using MQTTnet.Internal;
using MQTTnet.Server;

namespace MQTTnet.Implementations
{
public class MqttTcpServerAdapter : Disposable, IMqttServerAdapter
public sealed class MqttTcpServerAdapter : Disposable, IMqttServerAdapter
{
private readonly List<MqttTcpServerListener> _listeners = new List<MqttTcpServerListener>();
private readonly IMqttNetChildLogger _logger;
readonly List<MqttTcpServerListener> _listeners = new List<MqttTcpServerListener>();
readonly IMqttNetLogger _logger;

private CancellationTokenSource _cancellationTokenSource;
CancellationTokenSource _cancellationTokenSource;

public MqttTcpServerAdapter(IMqttNetChildLogger logger)
public MqttTcpServerAdapter(IMqttNetLogger logger)
{
if (logger == null) throw new ArgumentNullException(nameof(logger));

@@ -59,7 +59,7 @@ namespace MQTTnet.Implementations
{
tlsCertificate = new X509Certificate2(options.TlsEndpointOptions.Certificate, options.TlsEndpointOptions.CertificateCredentials.Password);
}
if (!tlsCertificate.HasPrivateKey)
{
throw new InvalidOperationException("The certificate for TLS encryption must contain the private key.");
@@ -77,7 +77,17 @@ namespace MQTTnet.Implementations
return Task.FromResult(0);
}

private void Cleanup()
protected override void Dispose(bool disposing)
{
if (disposing)
{
Cleanup();
}

base.Dispose(disposing);
}

void Cleanup()
{
_cancellationTokenSource?.Cancel(false);
_cancellationTokenSource?.Dispose();
@@ -91,16 +101,7 @@ namespace MQTTnet.Implementations
_listeners.Clear();
}

protected override void Dispose(bool disposing)
{
if (disposing)
{
Cleanup();
}
base.Dispose(disposing);
}

private void RegisterListeners(MqttServerTcpEndpointBaseOptions options, X509Certificate2 tlsCertificate, CancellationToken cancellationToken)
void RegisterListeners(MqttServerTcpEndpointBaseOptions options, X509Certificate2 tlsCertificate, CancellationToken cancellationToken)
{
if (!options.BoundInterNetworkAddress.Equals(IPAddress.None))
{


+ 28
- 28
Source/MQTTnet/Implementations/MqttTcpServerListener.cs View File

@@ -1,4 +1,9 @@
#if !WINDOWS_UWP
using MQTTnet.Adapter;
using MQTTnet.Diagnostics;
using MQTTnet.Formatter;
using MQTTnet.Internal;
using MQTTnet.Server;
using System;
using System.IO;
using System.Net;
@@ -7,30 +12,25 @@ using System.Net.Sockets;
using System.Security.Cryptography.X509Certificates;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Adapter;
using MQTTnet.Diagnostics;
using MQTTnet.Formatter;
using MQTTnet.Internal;
using MQTTnet.Server;

namespace MQTTnet.Implementations
{
public class MqttTcpServerListener : IDisposable
public sealed class MqttTcpServerListener : IDisposable
{
private readonly IMqttNetChildLogger _logger;
private readonly AddressFamily _addressFamily;
private readonly MqttServerTcpEndpointBaseOptions _options;
private readonly MqttServerTlsTcpEndpointOptions _tlsOptions;
private readonly X509Certificate2 _tlsCertificate;
readonly IMqttNetLogger _logger;
readonly AddressFamily _addressFamily;
readonly MqttServerTcpEndpointBaseOptions _options;
readonly MqttServerTlsTcpEndpointOptions _tlsOptions;
readonly X509Certificate2 _tlsCertificate;

private Socket _socket;
private CrossPlatformSocket _socket;
private IPEndPoint _localEndPoint;

public MqttTcpServerListener(
AddressFamily addressFamily,
MqttServerTcpEndpointBaseOptions options,
X509Certificate2 tlsCertificate,
IMqttNetChildLogger logger)
IMqttNetLogger logger)
{
_addressFamily = addressFamily;
_options = options;
@@ -59,20 +59,20 @@ namespace MQTTnet.Implementations

_logger.Info($"Starting TCP listener for {_localEndPoint} TLS={_tlsCertificate != null}.");

_socket = new Socket(_addressFamily, SocketType.Stream, ProtocolType.Tcp);
_socket = new CrossPlatformSocket(_addressFamily);

// Usage of socket options is described here: https://docs.microsoft.com/en-us/dotnet/api/system.net.sockets.socket.setsocketoption?view=netcore-2.2

if (_options.ReuseAddress)
{
_socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);
_socket.ReuseAddress = true;
}
if (_options.NoDelay)
{
_socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.NoDelay, true);
_socket.NoDelay = true;
}
_socket.Bind(_localEndPoint);
_socket.Listen(_options.ConnectionBacklog);

@@ -87,7 +87,7 @@ namespace MQTTnet.Implementations
throw;
}

_logger.Warning(exception,"Error while creating listener socket for local end point '{0}'.", _localEndPoint);
_logger.Warning(exception, "Error while creating listener socket for local end point '{0}'.", _localEndPoint);
return false;
}
}
@@ -101,13 +101,13 @@ namespace MQTTnet.Implementations
#endif
}

private async Task AcceptClientConnectionsAsync(CancellationToken cancellationToken)
async Task AcceptClientConnectionsAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
try
{
var clientSocket = await PlatformAbstractionLayer.AcceptAsync(_socket).ConfigureAwait(false);
var clientSocket = await _socket.AcceptAsync().ConfigureAwait(false);
if (clientSocket == null)
{
continue;
@@ -116,7 +116,7 @@ namespace MQTTnet.Implementations
Task.Run(() => TryHandleClientConnectionAsync(clientSocket), cancellationToken).Forget(_logger);
}
catch (OperationCanceledException)
{
{
}
catch (Exception exception)
{
@@ -128,14 +128,14 @@ namespace MQTTnet.Implementations
continue;
}
}
_logger.Error(exception, $"Error while accepting connection at TCP listener {_localEndPoint} TLS={_tlsCertificate != null}.");
await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken).ConfigureAwait(false);
}
}
}

private async Task TryHandleClientConnectionAsync(Socket clientSocket)
async Task TryHandleClientConnectionAsync(CrossPlatformSocket clientSocket)
{
Stream stream = null;
string remoteEndPoint = null;
@@ -151,7 +151,7 @@ namespace MQTTnet.Implementations

clientSocket.NoDelay = _options.NoDelay;

stream = new NetworkStream(clientSocket, true);
stream = clientSocket.GetStream();

X509Certificate2 clientCertificate = null;

@@ -160,9 +160,9 @@ namespace MQTTnet.Implementations
var sslStream = new SslStream(stream, false, _tlsOptions.RemoteCertificateValidationCallback);

await sslStream.AuthenticateAsServerAsync(
_tlsCertificate,
_tlsOptions.ClientCertificateRequired,
_tlsOptions.SslProtocol,
_tlsCertificate,
_tlsOptions.ClientCertificateRequired,
_tlsOptions.SslProtocol,
_tlsOptions.CheckCertificateRevocation).ConfigureAwait(false);

stream = sslStream;


+ 1
- 86
Source/MQTTnet/Implementations/PlatformAbstractionLayer.cs View File

@@ -1,94 +1,9 @@
using System;
using System.Net;
using System.Net.Sockets;
using System.Threading.Tasks;
using System.Threading.Tasks;

namespace MQTTnet.Implementations
{
public static class PlatformAbstractionLayer
{
// TODO: Consider creating primitives like "MqttNetSocket" which will wrap all required methods and do the platform stuff.
public static async Task<Socket> AcceptAsync(Socket socket)
{
#if NET452 || NET461
try
{
return await Task.Factory.FromAsync(socket.BeginAccept, socket.EndAccept, null).ConfigureAwait(false);
}
catch (ObjectDisposedException)
{
return null;
}
#else
return await socket.AcceptAsync().ConfigureAwait(false);
#endif
}


public static Task ConnectAsync(Socket socket, IPAddress ip, int port)
{
#if NET452 || NET461
return Task.Factory.FromAsync(socket.BeginConnect, socket.EndConnect, ip, port, null);
#else
return socket.ConnectAsync(ip, port);
#endif
}

public static Task ConnectAsync(Socket socket, string host, int port)
{
#if NET452 || NET461
return Task.Factory.FromAsync(socket.BeginConnect, socket.EndConnect, host, port, null);
#else
return socket.ConnectAsync(host, port);
#endif
}

#if NET452 || NET461
public class SocketWrapper
{
private readonly Socket _socket;
private readonly ArraySegment<byte> _buffer;
private readonly SocketFlags _socketFlags;

public SocketWrapper(Socket socket, ArraySegment<byte> buffer, SocketFlags socketFlags)
{
_socket = socket;
_buffer = buffer;
_socketFlags = socketFlags;
}

public static IAsyncResult BeginSend(AsyncCallback callback, object state)
{
var real = (SocketWrapper)state;
return real._socket.BeginSend(real._buffer.Array, real._buffer.Offset, real._buffer.Count, real._socketFlags, callback, state);
}

public static IAsyncResult BeginReceive(AsyncCallback callback, object state)
{
var real = (SocketWrapper)state;
return real._socket.BeginReceive(real._buffer.Array, real._buffer.Offset, real._buffer.Count, real._socketFlags, callback, state);
}
}
#endif

public static Task SendAsync(Socket socket, ArraySegment<byte> buffer, SocketFlags socketFlags)
{
#if NET452 || NET461
return Task.Factory.FromAsync(SocketWrapper.BeginSend, socket.EndSend, new SocketWrapper(socket, buffer, socketFlags));
#else
return socket.SendAsync(buffer, socketFlags);
#endif
}

public static Task<int> ReceiveAsync(Socket socket, ArraySegment<byte> buffer, SocketFlags socketFlags)
{
#if NET452 || NET461
return Task.Factory.FromAsync(SocketWrapper.BeginReceive, socket.EndReceive, new SocketWrapper(socket, buffer, socketFlags));
#else
return socket.ReceiveAsync(buffer, socketFlags);
#endif
}

public static Task CompletedTask
{
get


+ 3
- 3
Source/MQTTnet/Internal/TaskExtensions.cs View File

@@ -1,11 +1,11 @@
using System.Threading.Tasks;
using MQTTnet.Diagnostics;
using MQTTnet.Diagnostics;
using System.Threading.Tasks;

namespace MQTTnet.Internal
{
public static class TaskExtensions
{
public static void Forget(this Task task, IMqttNetChildLogger logger)
public static void Forget(this Task task, IMqttNetLogger logger)
{
task?.ContinueWith(t =>
{


+ 19
- 0
Source/MQTTnet/LowLevelClient/ILowLevelMqttClient.cs View File

@@ -0,0 +1,19 @@
using MQTTnet.Client.Options;
using MQTTnet.Packets;
using System;
using System.Threading;
using System.Threading.Tasks;

namespace MQTTnet.LowLevelClient
{
public interface ILowLevelMqttClient : IDisposable
{
Task ConnectAsync(IMqttClientOptions options, CancellationToken cancellationToken);

Task DisconnectAsync(CancellationToken cancellationToken);

Task SendAsync(MqttBasePacket packet, CancellationToken cancellationToken);

Task<MqttBasePacket> ReceiveAsync(CancellationToken cancellationToken);
}
}

+ 128
- 0
Source/MQTTnet/LowLevelClient/LowLevelMqttClient.cs View File

@@ -0,0 +1,128 @@
using MQTTnet.Adapter;
using MQTTnet.Client.Options;
using MQTTnet.Diagnostics;
using MQTTnet.Packets;
using System;
using System.Threading;
using System.Threading.Tasks;

namespace MQTTnet.LowLevelClient
{
public sealed class LowLevelMqttClient : ILowLevelMqttClient
{
readonly IMqttNetLogger _logger;
readonly IMqttClientAdapterFactory _clientAdapterFactory;

IMqttChannelAdapter _adapter;
IMqttClientOptions _options;

public LowLevelMqttClient(IMqttClientAdapterFactory clientAdapterFactory, IMqttNetLogger logger)
{
if (clientAdapterFactory is null) throw new ArgumentNullException(nameof(clientAdapterFactory));
if (logger is null) throw new ArgumentNullException(nameof(logger));

_clientAdapterFactory = clientAdapterFactory;
_logger = logger.CreateChildLogger(nameof(LowLevelMqttClient));
}

bool IsConnected => _adapter != null;

public async Task ConnectAsync(IMqttClientOptions options, CancellationToken cancellationToken)
{
if (options is null) throw new ArgumentNullException(nameof(options));

if (_adapter != null)
{
throw new InvalidOperationException("Low level MQTT client is already connected. Disconnect first before connecting again.");
}

var newAdapter = _clientAdapterFactory.CreateClientAdapter(options, _logger);

try
{
_logger.Verbose($"Trying to connect with server '{options.ChannelOptions}' (Timeout={options.CommunicationTimeout}).");
await newAdapter.ConnectAsync(options.CommunicationTimeout, cancellationToken).ConfigureAwait(false);
_logger.Verbose("Connection with server established.");

_options = options;
}
catch (Exception)
{
_adapter.Dispose();
throw;
}

_adapter = newAdapter;
}

public async Task DisconnectAsync(CancellationToken cancellationToken)
{
if (_adapter == null)
{
return;
}

await SafeDisconnect(cancellationToken).ConfigureAwait(false);
_adapter = null;
}

public async Task SendAsync(MqttBasePacket packet, CancellationToken cancellationToken)
{
if (packet is null) throw new ArgumentNullException(nameof(packet));

if (_adapter == null)
{
throw new InvalidOperationException("Low level MQTT client is not connected.");
}

try
{
await _adapter.SendPacketAsync(packet, _options.CommunicationTimeout, cancellationToken).ConfigureAwait(false);
}
catch (Exception)
{
await SafeDisconnect(cancellationToken).ConfigureAwait(false);
throw;
}
}

public async Task<MqttBasePacket> ReceiveAsync(CancellationToken cancellationToken)
{
if (_adapter == null)
{
throw new InvalidOperationException("Low level MQTT client is not connected.");
}

try
{
return await _adapter.ReceivePacketAsync(_options.CommunicationTimeout, cancellationToken).ConfigureAwait(false);
}
catch (Exception)
{
await SafeDisconnect(cancellationToken).ConfigureAwait(false);
throw;
}
}

public void Dispose()
{
_adapter?.Dispose();
}

async Task SafeDisconnect(CancellationToken cancellationToken)
{
try
{
await _adapter.DisconnectAsync(_options.CommunicationTimeout, cancellationToken).ConfigureAwait(false);
}
catch (Exception exception)
{
_logger.Error(exception, "Error while disconnecting.");
}
finally
{
_adapter.Dispose();
}
}
}
}

+ 1
- 1
Source/MQTTnet/MQTTnet.csproj View File

@@ -64,7 +64,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.NETCore.UniversalWindowsPlatform" Version="6.2.9" />
<PackageReference Include="Microsoft.NETCore.UniversalWindowsPlatform" Version="6.2.10" />
</ItemGroup>
</Project>

+ 37
- 9
Source/MQTTnet/MqttFactory.cs View File

@@ -1,16 +1,17 @@
using System;
using System.Collections.Generic;
using MQTTnet.Adapter;
using MQTTnet.Adapter;
using MQTTnet.Client;
using MQTTnet.Diagnostics;
using MQTTnet.Implementations;
using MQTTnet.LowLevelClient;
using MQTTnet.Server;
using System;
using System.Collections.Generic;

namespace MQTTnet
{
public class MqttFactory : IMqttFactory
public sealed class MqttFactory : IMqttFactory
{
private IMqttClientAdapterFactory _clientAdapterFactory = new MqttClientAdapterFactory();
IMqttClientAdapterFactory _clientAdapterFactory = new MqttClientAdapterFactory();

public MqttFactory() : this(new MqttNetLogger())
{
@@ -29,6 +30,33 @@ namespace MQTTnet
return this;
}

public ILowLevelMqttClient CreateLowLevelMqttClient()
{
return CreateLowLevelMqttClient(DefaultLogger);
}

public ILowLevelMqttClient CreateLowLevelMqttClient(IMqttNetLogger logger)
{
if (logger == null) throw new ArgumentNullException(nameof(logger));

return new LowLevelMqttClient(_clientAdapterFactory, logger);
}

public ILowLevelMqttClient CreateLowLevelMqttClient(IMqttClientAdapterFactory clientAdapterFactory)
{
if (clientAdapterFactory == null) throw new ArgumentNullException(nameof(clientAdapterFactory));

return new LowLevelMqttClient(_clientAdapterFactory, DefaultLogger);
}

public ILowLevelMqttClient CreateLowLevelMqttClient(IMqttNetLogger logger, IMqttClientAdapterFactory clientAdapterFactoryy)
{
if (logger == null) throw new ArgumentNullException(nameof(logger));
if (clientAdapterFactoryy == null) throw new ArgumentNullException(nameof(clientAdapterFactoryy));

return new LowLevelMqttClient(_clientAdapterFactory, logger);
}

public IMqttClient CreateMqttClient()
{
return CreateMqttClient(DefaultLogger);
@@ -65,7 +93,7 @@ namespace MQTTnet
{
if (logger == null) throw new ArgumentNullException(nameof(logger));

return CreateMqttServer(new List<IMqttServerAdapter> { new MqttTcpServerAdapter(logger.CreateChildLogger()) }, logger);
return CreateMqttServer(new List<IMqttServerAdapter> { new MqttTcpServerAdapter(logger) }, logger);
}

public IMqttServer CreateMqttServer(IEnumerable<IMqttServerAdapter> serverAdapters, IMqttNetLogger logger)
@@ -73,14 +101,14 @@ namespace MQTTnet
if (serverAdapters == null) throw new ArgumentNullException(nameof(serverAdapters));
if (logger == null) throw new ArgumentNullException(nameof(logger));

return new MqttServer(serverAdapters, logger.CreateChildLogger());
return new MqttServer(serverAdapters, logger);
}

public IMqttServer CreateMqttServer(IEnumerable<IMqttServerAdapter> serverAdapters)
{
if (serverAdapters == null) throw new ArgumentNullException(nameof(serverAdapters));
return new MqttServer(serverAdapters, DefaultLogger.CreateChildLogger());
return new MqttServer(serverAdapters, DefaultLogger);
}
}
}

+ 3
- 3
Source/MQTTnet/Server/IMqttRetainedMessagesManager.cs View File

@@ -1,12 +1,12 @@
using System.Collections.Generic;
using MQTTnet.Diagnostics;
using System.Collections.Generic;
using System.Threading.Tasks;
using MQTTnet.Diagnostics;

namespace MQTTnet.Server
{
public interface IMqttRetainedMessagesManager
{
Task Start(IMqttServerOptions options, IMqttNetChildLogger logger);
Task Start(IMqttServerOptions options, IMqttNetLogger logger);

Task LoadMessagesAsync();



+ 53
- 42
Source/MQTTnet/Server/MqttClientConnection.cs View File

@@ -3,6 +3,7 @@ using MQTTnet.Client;
using MQTTnet.Diagnostics;
using MQTTnet.Exceptions;
using MQTTnet.Formatter;
using MQTTnet.Implementations;
using MQTTnet.Internal;
using MQTTnet.PacketDispatcher;
using MQTTnet.Packets;
@@ -15,32 +16,34 @@ using System.Threading.Tasks;

namespace MQTTnet.Server
{
public class MqttClientConnection : IDisposable
public sealed class MqttClientConnection : IDisposable
{
private readonly MqttPacketIdentifierProvider _packetIdentifierProvider = new MqttPacketIdentifierProvider();
private readonly MqttPacketDispatcher _packetDispatcher = new MqttPacketDispatcher();
private readonly CancellationTokenSource _cancellationToken = new CancellationTokenSource();
readonly MqttPacketIdentifierProvider _packetIdentifierProvider = new MqttPacketIdentifierProvider();
readonly MqttPacketDispatcher _packetDispatcher = new MqttPacketDispatcher();
readonly CancellationTokenSource _cancellationToken = new CancellationTokenSource();

private readonly IMqttRetainedMessagesManager _retainedMessagesManager;
private readonly MqttClientKeepAliveMonitor _keepAliveMonitor;
private readonly MqttClientSessionsManager _sessionsManager;
readonly IMqttRetainedMessagesManager _retainedMessagesManager;
readonly MqttClientKeepAliveMonitor _keepAliveMonitor;
readonly MqttClientSessionsManager _sessionsManager;

private readonly IMqttNetChildLogger _logger;
private readonly IMqttServerOptions _serverOptions;
readonly IMqttNetLogger _logger;
readonly IMqttServerOptions _serverOptions;

private readonly IMqttChannelAdapter _channelAdapter;
private readonly IMqttDataConverter _dataConverter;
private readonly string _endpoint;
private readonly DateTime _connectedTimestamp;
readonly IMqttChannelAdapter _channelAdapter;
readonly IMqttDataConverter _dataConverter;
readonly string _endpoint;
readonly DateTime _connectedTimestamp;

private Task<MqttClientDisconnectType> _packageReceiverTask;
private DateTime _lastPacketReceivedTimestamp;
private DateTime _lastNonKeepAlivePacketReceivedTimestamp;
Task<MqttClientDisconnectType> _packageReceiverTask;
DateTime _lastPacketReceivedTimestamp;
DateTime _lastNonKeepAlivePacketReceivedTimestamp;

private long _receivedPacketsCount;
private long _sentPacketsCount = 1; // Start with 1 because the CONNECT packet is not counted anywhere.
private long _receivedApplicationMessagesCount;
private long _sentApplicationMessagesCount;
long _receivedPacketsCount;
long _sentPacketsCount = 1; // Start with 1 because the CONNECT packet is not counted anywhere.
long _receivedApplicationMessagesCount;
long _sentApplicationMessagesCount;

bool _isTakeover;

public MqttClientConnection(
MqttConnectPacket connectPacket,
@@ -49,7 +52,7 @@ namespace MQTTnet.Server
IMqttServerOptions serverOptions,
MqttClientSessionsManager sessionsManager,
IMqttRetainedMessagesManager retainedMessagesManager,
IMqttNetChildLogger logger)
IMqttNetLogger logger)
{
Session = session ?? throw new ArgumentNullException(nameof(session));
_serverOptions = serverOptions ?? throw new ArgumentNullException(nameof(serverOptions));
@@ -64,7 +67,7 @@ namespace MQTTnet.Server
if (logger == null) throw new ArgumentNullException(nameof(logger));
_logger = logger.CreateChildLogger(nameof(MqttClientConnection));

_keepAliveMonitor = new MqttClientKeepAliveMonitor(ConnectPacket.ClientId, StopAsync, _logger);
_keepAliveMonitor = new MqttClientKeepAliveMonitor(ConnectPacket.ClientId, () => StopAsync(), _logger);

_connectedTimestamp = DateTime.UtcNow;
_lastPacketReceivedTimestamp = _connectedTimestamp;
@@ -77,15 +80,21 @@ namespace MQTTnet.Server

public MqttClientSession Session { get; }

public async Task StopAsync()
public bool IsFinalized { get; set; }

public Task StopAsync(bool isTakeover = false)
{
_isTakeover = isTakeover;

StopInternal();

var task = _packageReceiverTask;
if (task != null)
{
await task.ConfigureAwait(false);
return task;
}

return PlatformAbstractionLayer.CompletedTask;
}

public void ResetStatistics()
@@ -124,7 +133,7 @@ namespace MQTTnet.Server
return _packageReceiverTask;
}

private async Task<MqttClientDisconnectType> RunInternalAsync(MqttConnectionValidatorContext connectionValidatorContext)
async Task<MqttClientDisconnectType> RunInternalAsync(MqttConnectionValidatorContext connectionValidatorContext)
{
var disconnectType = MqttClientDisconnectType.NotClean;
try
@@ -243,20 +252,25 @@ namespace MQTTnet.Server
_channelAdapter.ReadingPacketStartedCallback = null;
_channelAdapter.ReadingPacketCompletedCallback = null;

_logger.Info("Client '{0}': Session stopped.", ClientId);
_logger.Info("Client '{0}': Connection stopped.", ClientId);

_packageReceiverTask = null;
}

if (_isTakeover)
{
return MqttClientDisconnectType.Takeover;
}

return disconnectType;
}

private void StopInternal()
void StopInternal()
{
_cancellationToken.Cancel(false);
}

private async Task EnqueueSubscribedRetainedMessagesAsync(ICollection<TopicFilter> topicFilters)
async Task EnqueueSubscribedRetainedMessagesAsync(ICollection<TopicFilter> topicFilters)
{
var retainedMessages = await _retainedMessagesManager.GetSubscribedMessagesAsync(topicFilters).ConfigureAwait(false);
foreach (var applicationMessage in retainedMessages)
@@ -265,7 +279,7 @@ namespace MQTTnet.Server
}
}

private async Task HandleIncomingSubscribePacketAsync(MqttSubscribePacket subscribePacket)
async Task HandleIncomingSubscribePacketAsync(MqttSubscribePacket subscribePacket)
{
// TODO: Let the channel adapter create the packet.
var subscribeResult = await Session.SubscriptionsManager.SubscribeAsync(subscribePacket, ConnectPacket).ConfigureAwait(false);
@@ -281,14 +295,14 @@ namespace MQTTnet.Server
await EnqueueSubscribedRetainedMessagesAsync(subscribePacket.TopicFilters).ConfigureAwait(false);
}

private async Task HandleIncomingUnsubscribePacketAsync(MqttUnsubscribePacket unsubscribePacket)
async Task HandleIncomingUnsubscribePacketAsync(MqttUnsubscribePacket unsubscribePacket)
{
// TODO: Let the channel adapter create the packet.
var unsubscribeResult = await Session.SubscriptionsManager.UnsubscribeAsync(unsubscribePacket).ConfigureAwait(false);
await SendAsync(unsubscribeResult).ConfigureAwait(false);
}

private Task HandleIncomingPublishPacketAsync(MqttPublishPacket publishPacket)
Task HandleIncomingPublishPacketAsync(MqttPublishPacket publishPacket)
{
Interlocked.Increment(ref _sentApplicationMessagesCount);

@@ -313,16 +327,16 @@ namespace MQTTnet.Server
}
}

private Task HandleIncomingPublishPacketWithQoS0Async(MqttPublishPacket publishPacket)
Task HandleIncomingPublishPacketWithQoS0Async(MqttPublishPacket publishPacket)
{
var applicationMessage = _dataConverter.CreateApplicationMessage(publishPacket);

_sessionsManager.DispatchApplicationMessage(applicationMessage, this);

return Task.FromResult(0);
return PlatformAbstractionLayer.CompletedTask;
}

private Task HandleIncomingPublishPacketWithQoS1Async(MqttPublishPacket publishPacket)
Task HandleIncomingPublishPacketWithQoS1Async(MqttPublishPacket publishPacket)
{
var applicationMessage = _dataConverter.CreateApplicationMessage(publishPacket);
_sessionsManager.DispatchApplicationMessage(applicationMessage, this);
@@ -331,7 +345,7 @@ namespace MQTTnet.Server
return SendAsync(pubAckPacket);
}

private Task HandleIncomingPublishPacketWithQoS2Async(MqttPublishPacket publishPacket)
Task HandleIncomingPublishPacketWithQoS2Async(MqttPublishPacket publishPacket)
{
var applicationMessage = _dataConverter.CreateApplicationMessage(publishPacket);
_sessionsManager.DispatchApplicationMessage(applicationMessage, this);
@@ -345,7 +359,7 @@ namespace MQTTnet.Server
return SendAsync(pubRecPacket);
}

private async Task SendPendingPacketsAsync(CancellationToken cancellationToken)
async Task SendPendingPacketsAsync(CancellationToken cancellationToken)
{
MqttQueuedApplicationMessage queuedApplicationMessage = null;
MqttPublishPacket publishPacket = null;
@@ -422,9 +436,6 @@ namespace MQTTnet.Server
}

_logger.Verbose("Queued application message sent (ClientId: {0}).", ClientId);

// TODO:
//Interlocked.Increment(ref _sentPacketsCount);
}
}
catch (Exception exception)
@@ -459,7 +470,7 @@ namespace MQTTnet.Server
}
}

private async Task SendAsync(MqttBasePacket packet)
async Task SendAsync(MqttBasePacket packet)
{
await _channelAdapter.SendPacketAsync(packet, _serverOptions.DefaultCommunicationTimeout, _cancellationToken.Token).ConfigureAwait(false);

@@ -471,12 +482,12 @@ namespace MQTTnet.Server
}
}

private void OnAdapterReadingPacketCompleted()
void OnAdapterReadingPacketCompleted()
{
_keepAliveMonitor?.Resume();
}

private void OnAdapterReadingPacketStarted()
void OnAdapterReadingPacketStarted()
{
_keepAliveMonitor?.Pause();
}


+ 12
- 12
Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs View File

@@ -1,27 +1,27 @@
using System;
using MQTTnet.Diagnostics;
using MQTTnet.Internal;
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Diagnostics;
using MQTTnet.Internal;

namespace MQTTnet.Server
{
public class MqttClientKeepAliveMonitor
{
private readonly Stopwatch _lastPacketReceivedTracker = new Stopwatch();
readonly Stopwatch _lastPacketReceivedTracker = new Stopwatch();

private readonly string _clientId;
private readonly Func<Task> _keepAliveElapsedCallback;
private readonly IMqttNetChildLogger _logger;
readonly string _clientId;
readonly Func<Task> _keepAliveElapsedCallback;
readonly IMqttNetLogger _logger;

private bool _isPaused;
bool _isPaused;

public MqttClientKeepAliveMonitor(string clientId, Func<Task> keepAliveElapsedCallback, IMqttNetChildLogger logger)
public MqttClientKeepAliveMonitor(string clientId, Func<Task> keepAliveElapsedCallback, IMqttNetLogger logger)
{
_clientId = clientId ?? throw new ArgumentNullException(nameof(clientId));
_keepAliveElapsedCallback = keepAliveElapsedCallback ?? throw new ArgumentNullException(nameof(keepAliveElapsedCallback));
if (logger == null) throw new ArgumentNullException(nameof(logger));
_logger = logger.CreateChildLogger(nameof(MqttClientKeepAliveMonitor));
}
@@ -32,7 +32,7 @@ namespace MQTTnet.Server
{
return;
}
Task.Run(() => RunAsync(keepAlivePeriod, cancellationToken), cancellationToken).Forget(_logger);
}

@@ -51,7 +51,7 @@ namespace MQTTnet.Server
_lastPacketReceivedTracker.Restart();
}

private async Task RunAsync(int keepAlivePeriod, CancellationToken cancellationToken)
async Task RunAsync(int keepAlivePeriod, CancellationToken cancellationToken)
{
try
{


+ 7
- 6
Source/MQTTnet/Server/MqttClientSession.cs View File

@@ -1,18 +1,18 @@
using System;
using MQTTnet.Diagnostics;
using MQTTnet.Server.Status;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using MQTTnet.Diagnostics;
using MQTTnet.Server.Status;

namespace MQTTnet.Server
{
public class MqttClientSession
{
private readonly IMqttNetChildLogger _logger;
readonly IMqttNetLogger _logger;

private readonly DateTime _createdTimestamp = DateTime.UtcNow;
readonly DateTime _createdTimestamp = DateTime.UtcNow;

public MqttClientSession(string clientId, IDictionary<object, object> items, MqttServerEventDispatcher eventDispatcher, IMqttServerOptions serverOptions, IMqttNetChildLogger logger)
public MqttClientSession(string clientId, IDictionary<object, object> items, MqttServerEventDispatcher eventDispatcher, IMqttServerOptions serverOptions, IMqttNetLogger logger)
{
ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId));
Items = items ?? throw new ArgumentNullException(nameof(items));
@@ -73,6 +73,7 @@ namespace MQTTnet.Server
status.ClientId = ClientId;
status.CreatedTimestamp = _createdTimestamp;
status.PendingApplicationMessagesCount = ApplicationMessagesQueue.Count;
status.Items = Items;
}
}
}

+ 52
- 43
Source/MQTTnet/Server/MqttClientSessionsManager.cs View File

@@ -16,26 +16,26 @@ namespace MQTTnet.Server
{
public class MqttClientSessionsManager : Disposable
{
private readonly AsyncQueue<MqttEnqueuedApplicationMessage> _messageQueue = new AsyncQueue<MqttEnqueuedApplicationMessage>();
readonly AsyncQueue<MqttEnqueuedApplicationMessage> _messageQueue = new AsyncQueue<MqttEnqueuedApplicationMessage>();

private readonly SemaphoreSlim _createConnectionGate = new SemaphoreSlim(1, 1);
private readonly ConcurrentDictionary<string, MqttClientConnection> _connections = new ConcurrentDictionary<string, MqttClientConnection>();
private readonly ConcurrentDictionary<string, MqttClientSession> _sessions = new ConcurrentDictionary<string, MqttClientSession>();
private readonly IDictionary<object, object> _serverSessionItems = new ConcurrentDictionary<object, object>();
readonly AsyncLock _createConnectionGate = new AsyncLock();
readonly ConcurrentDictionary<string, MqttClientConnection> _connections = new ConcurrentDictionary<string, MqttClientConnection>();
readonly ConcurrentDictionary<string, MqttClientSession> _sessions = new ConcurrentDictionary<string, MqttClientSession>();
readonly IDictionary<object, object> _serverSessionItems = new ConcurrentDictionary<object, object>();

private readonly CancellationToken _cancellationToken;
private readonly MqttServerEventDispatcher _eventDispatcher;
readonly CancellationToken _cancellationToken;
readonly MqttServerEventDispatcher _eventDispatcher;

private readonly IMqttRetainedMessagesManager _retainedMessagesManager;
private readonly IMqttServerOptions _options;
private readonly IMqttNetChildLogger _logger;
readonly IMqttRetainedMessagesManager _retainedMessagesManager;
readonly IMqttServerOptions _options;
readonly IMqttNetLogger _logger;

public MqttClientSessionsManager(
IMqttServerOptions options,
IMqttRetainedMessagesManager retainedMessagesManager,
CancellationToken cancellationToken,
MqttServerEventDispatcher eventDispatcher,
IMqttNetChildLogger logger)
IMqttNetLogger logger)
{
_cancellationToken = cancellationToken;

@@ -60,9 +60,11 @@ namespace MQTTnet.Server
}
}

public Task HandleClientAsync(IMqttChannelAdapter clientAdapter)
public Task HandleClientConnectionAsync(IMqttChannelAdapter clientAdapter)
{
return HandleClientAsync(clientAdapter, _cancellationToken);
if (clientAdapter is null) throw new ArgumentNullException(nameof(clientAdapter));

return HandleClientConnectionAsync(clientAdapter, _cancellationToken);
}

public Task<IList<IMqttClientStatus>> GetClientStatusAsync()
@@ -155,7 +157,7 @@ namespace MQTTnet.Server
base.Dispose(disposing);
}

private async Task TryProcessQueuedApplicationMessagesAsync(CancellationToken cancellationToken)
async Task TryProcessQueuedApplicationMessagesAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
@@ -173,7 +175,7 @@ namespace MQTTnet.Server
}
}

private async Task TryProcessNextQueuedApplicationMessageAsync(CancellationToken cancellationToken)
async Task TryProcessNextQueuedApplicationMessageAsync(CancellationToken cancellationToken)
{
try
{
@@ -207,7 +209,7 @@ namespace MQTTnet.Server
applicationMessage = interceptorContext.ApplicationMessage;
}

await _eventDispatcher.HandleApplicationMessageReceivedAsync(sender?.ClientId, applicationMessage).ConfigureAwait(false);
await _eventDispatcher.SafeNotifyApplicationMessageReceivedAsync(sender?.ClientId, applicationMessage).ConfigureAwait(false);

if (applicationMessage.Retain)
{
@@ -231,14 +233,14 @@ namespace MQTTnet.Server
}
}

private async Task HandleClientAsync(IMqttChannelAdapter channelAdapter, CancellationToken cancellationToken)
async Task HandleClientConnectionAsync(IMqttChannelAdapter channelAdapter, CancellationToken cancellationToken)
{
var disconnectType = MqttClientDisconnectType.NotClean;
string clientId = null;
var clientWasConnected = true;

MqttConnectPacket connectPacket = null;
var clientWasAuthorized = false;

MqttConnectPacket connectPacket;
MqttClientConnection clientConnection = null;
try
{
try
@@ -259,11 +261,8 @@ namespace MQTTnet.Server

var connectionValidatorContext = await ValidateConnectionAsync(connectPacket, channelAdapter).ConfigureAwait(false);

clientId = connectPacket.ClientId;

if (connectionValidatorContext.ReasonCode != MqttConnectReasonCode.Success)
{
clientWasConnected = false;
// Send failure response here without preparing a session. The result for a successful connect
// will be sent from the session itself.
var connAckPacket = channelAdapter.PacketFormatterAdapter.DataConverter.CreateConnAckPacket(connectionValidatorContext);
@@ -272,11 +271,13 @@ namespace MQTTnet.Server
return;
}

var connection = await CreateConnectionAsync(connectPacket, connectionValidatorContext, channelAdapter).ConfigureAwait(false);
clientWasAuthorized = true;
clientId = connectPacket.ClientId;
clientConnection = await CreateClientConnectionAsync(connectPacket, connectionValidatorContext, channelAdapter).ConfigureAwait(false);

await _eventDispatcher.HandleClientConnectedAsync(clientId).ConfigureAwait(false);
await _eventDispatcher.SafeNotifyClientConnectedAsync(clientId).ConfigureAwait(false);

disconnectType = await connection.RunAsync(connectionValidatorContext).ConfigureAwait(false);
disconnectType = await clientConnection.RunAsync(connectionValidatorContext).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
@@ -287,8 +288,10 @@ namespace MQTTnet.Server
}
finally
{
if (clientWasConnected)
if (clientWasAuthorized && disconnectType != MqttClientDisconnectType.Takeover)
{
// Only cleanup if the client was authorized. If not it will remove the existing connection, session etc.
// This allows to kill connections and sessions from known client IDs.
if (clientId != null)
{
_connections.TryRemove(clientId, out _);
@@ -298,18 +301,23 @@ namespace MQTTnet.Server
await DeleteSessionAsync(clientId).ConfigureAwait(false);
}
}
}

await TryCleanupChannelAsync(channelAdapter).ConfigureAwait(false);
await SafeCleanupChannelAsync(channelAdapter).ConfigureAwait(false);

if (clientId != null)
{
await _eventDispatcher.TryHandleClientDisconnectedAsync(clientId, disconnectType).ConfigureAwait(false);
}
if (clientWasAuthorized && clientId != null)
{
await _eventDispatcher.SafeNotifyClientDisconnectedAsync(clientId, disconnectType).ConfigureAwait(false);
}

if (clientConnection != null)
{
clientConnection.IsFinalized = true;
}
}
}

private async Task<MqttConnectionValidatorContext> ValidateConnectionAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter channelAdapter)
async Task<MqttConnectionValidatorContext> ValidateConnectionAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter channelAdapter)
{
var context = new MqttConnectionValidatorContext(connectPacket, channelAdapter, new ConcurrentDictionary<object, object>());

@@ -337,17 +345,22 @@ namespace MQTTnet.Server
return context;
}

private async Task<MqttClientConnection> CreateConnectionAsync(MqttConnectPacket connectPacket, MqttConnectionValidatorContext connectionValidatorContext, IMqttChannelAdapter channelAdapter)
async Task<MqttClientConnection> CreateClientConnectionAsync(MqttConnectPacket connectPacket, MqttConnectionValidatorContext connectionValidatorContext, IMqttChannelAdapter channelAdapter)
{
await _createConnectionGate.WaitAsync(_cancellationToken).ConfigureAwait(false);
try
using (await _createConnectionGate.WaitAsync(_cancellationToken).ConfigureAwait(false))
{
var isSessionPresent = _sessions.TryGetValue(connectPacket.ClientId, out var session);

var isConnectionPresent = _connections.TryGetValue(connectPacket.ClientId, out var existingConnection);
if (isConnectionPresent)
{
await existingConnection.StopAsync().ConfigureAwait(false);
await existingConnection.StopAsync(true);

// TODO: This fixes a race condition with unit test Same_Client_Id_Connect_Disconnect_Event_Order.
// It is not clear where the issue is coming from. The connected event is fired BEFORE the disconnected
// event. This is wrong. It seems that the finally block in HandleClientAsync must be finished before we
// can continue here. Maybe there is a better way to do this.
SpinWait.SpinUntil(() => existingConnection.IsFinalized, TimeSpan.FromSeconds(10));
}

if (isSessionPresent)
@@ -377,13 +390,9 @@ namespace MQTTnet.Server

return connection;
}
finally
{
_createConnectionGate.Release();
}
}

private async Task<MqttApplicationMessageInterceptorContext> InterceptApplicationMessageAsync(MqttClientConnection senderConnection, MqttApplicationMessage applicationMessage)
async Task<MqttApplicationMessageInterceptorContext> InterceptApplicationMessageAsync(MqttClientConnection senderConnection, MqttApplicationMessage applicationMessage)
{
var interceptor = _options.ApplicationMessageInterceptor;
if (interceptor == null)
@@ -411,7 +420,7 @@ namespace MQTTnet.Server
return interceptorContext;
}

private async Task TryCleanupChannelAsync(IMqttChannelAdapter channelAdapter)
async Task SafeCleanupChannelAsync(IMqttChannelAdapter channelAdapter)
{
try
{


+ 9
- 9
Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs View File

@@ -1,9 +1,9 @@
using System;
using MQTTnet.Packets;
using MQTTnet.Protocol;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using MQTTnet.Packets;
using MQTTnet.Protocol;

namespace MQTTnet.Server
{
@@ -67,7 +67,7 @@ namespace MQTTnet.Server
_subscriptions[finalTopicFilter.Topic] = finalTopicFilter;
}

await _eventDispatcher.HandleClientSubscribedTopicAsync(_clientSession.ClientId, finalTopicFilter).ConfigureAwait(false);
await _eventDispatcher.SafeNotifyClientSubscribedTopicAsync(_clientSession.ClientId, finalTopicFilter).ConfigureAwait(false);
}
}

@@ -83,7 +83,7 @@ namespace MQTTnet.Server
var interceptorContext = await InterceptSubscribeAsync(topicFilter).ConfigureAwait(false);
if (!interceptorContext.AcceptSubscription)
{
continue;
continue;
}

if (interceptorContext.AcceptSubscription)
@@ -93,7 +93,7 @@ namespace MQTTnet.Server
_subscriptions[topicFilter.Topic] = topicFilter;
}

await _eventDispatcher.HandleClientSubscribedTopicAsync(_clientSession.ClientId, topicFilter).ConfigureAwait(false);
await _eventDispatcher.SafeNotifyClientSubscribedTopicAsync(_clientSession.ClientId, topicFilter).ConfigureAwait(false);
}
}
}
@@ -131,9 +131,9 @@ namespace MQTTnet.Server

foreach (var topicFilter in unsubscribePacket.TopicFilters)
{
await _eventDispatcher.HandleClientUnsubscribedTopicAsync(_clientSession.ClientId, topicFilter).ConfigureAwait(false);
await _eventDispatcher.SafeNotifyClientUnsubscribedTopicAsync(_clientSession.ClientId, topicFilter).ConfigureAwait(false);
}
return unsubAckPacket;
}

@@ -152,7 +152,7 @@ namespace MQTTnet.Server
lock (_subscriptions)
{
_subscriptions.Remove(topicFilter);
}
}
}
}



+ 6
- 6
Source/MQTTnet/Server/MqttRetainedMessagesManager.cs View File

@@ -1,10 +1,10 @@
using System;
using MQTTnet.Diagnostics;
using MQTTnet.Implementations;
using MQTTnet.Internal;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using MQTTnet.Diagnostics;
using MQTTnet.Implementations;
using MQTTnet.Internal;

namespace MQTTnet.Server
{
@@ -14,10 +14,10 @@ namespace MQTTnet.Server
private readonly AsyncLock _messagesLock = new AsyncLock();
private readonly Dictionary<string, MqttApplicationMessage> _messages = new Dictionary<string, MqttApplicationMessage>();

private IMqttNetChildLogger _logger;
private IMqttNetLogger _logger;
private IMqttServerOptions _options;

public Task Start(IMqttServerOptions options, IMqttNetChildLogger logger)
public Task Start(IMqttServerOptions options, IMqttNetLogger logger)
{
if (logger == null) throw new ArgumentNullException(nameof(logger));
_logger = logger.CreateChildLogger(nameof(MqttRetainedMessagesManager));


+ 9
- 9
Source/MQTTnet/Server/MqttServer.cs View File

@@ -1,15 +1,15 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Adapter;
using MQTTnet.Adapter;
using MQTTnet.Client.Publishing;
using MQTTnet.Client.Receiving;
using MQTTnet.Diagnostics;
using MQTTnet.Exceptions;
using MQTTnet.Protocol;
using MQTTnet.Server.Status;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace MQTTnet.Server
{
@@ -17,13 +17,13 @@ namespace MQTTnet.Server
{
private readonly MqttServerEventDispatcher _eventDispatcher;
private readonly ICollection<IMqttServerAdapter> _adapters;
private readonly IMqttNetChildLogger _logger;
private readonly IMqttNetLogger _logger;

private MqttClientSessionsManager _clientSessionsManager;
private IMqttRetainedMessagesManager _retainedMessagesManager;
private CancellationTokenSource _cancellationTokenSource;

public MqttServer(IEnumerable<IMqttServerAdapter> adapters, IMqttNetChildLogger logger)
public MqttServer(IEnumerable<IMqttServerAdapter> adapters, IMqttNetLogger logger)
{
if (adapters == null) throw new ArgumentNullException(nameof(adapters));
_adapters = adapters.ToList();
@@ -194,7 +194,7 @@ namespace MQTTnet.Server

private Task OnHandleClient(IMqttChannelAdapter channelAdapter)
{
return _clientSessionsManager.HandleClientAsync(channelAdapter);
return _clientSessionsManager.HandleClientConnectionAsync(channelAdapter);
}
}
}

+ 59
- 31
Source/MQTTnet/Server/MqttServerEventDispatcher.cs View File

@@ -1,15 +1,15 @@
using System;
using System.Threading.Tasks;
using MQTTnet.Client.Receiving;
using MQTTnet.Client.Receiving;
using MQTTnet.Diagnostics;
using System;
using System.Threading.Tasks;

namespace MQTTnet.Server
{
public class MqttServerEventDispatcher
{
private readonly IMqttNetChildLogger _logger;
readonly IMqttNetLogger _logger;

public MqttServerEventDispatcher(IMqttNetChildLogger logger)
public MqttServerEventDispatcher(IMqttNetLogger logger)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
@@ -24,18 +24,25 @@ namespace MQTTnet.Server

public IMqttApplicationMessageReceivedHandler ApplicationMessageReceivedHandler { get; set; }

public Task HandleClientConnectedAsync(string clientId)
public async Task SafeNotifyClientConnectedAsync(string clientId)
{
var handler = ClientConnectedHandler;
if (handler == null)
try
{
return Task.FromResult(0);
}
var handler = ClientConnectedHandler;
if (handler == null)
{
return;
}

return handler.HandleClientConnectedAsync(new MqttServerClientConnectedEventArgs(clientId));
await handler.HandleClientConnectedAsync(new MqttServerClientConnectedEventArgs(clientId)).ConfigureAwait(false);
}
catch (Exception exception)
{
_logger.Error(exception, "Error while handling custom 'ClientConnected' event.");
}
}

public async Task TryHandleClientDisconnectedAsync(string clientId, MqttClientDisconnectType disconnectType)
public async Task SafeNotifyClientDisconnectedAsync(string clientId, MqttClientDisconnectType disconnectType)
{
try
{
@@ -49,41 +56,62 @@ namespace MQTTnet.Server
}
catch (Exception exception)
{
_logger.Error(exception, "Error while handling 'ClientDisconnected' event.");
_logger.Error(exception, "Error while handling custom 'ClientDisconnected' event.");
}
}

public Task HandleClientSubscribedTopicAsync(string clientId, TopicFilter topicFilter)
public async Task SafeNotifyClientSubscribedTopicAsync(string clientId, TopicFilter topicFilter)
{
var handler = ClientSubscribedTopicHandler;
if (handler == null)
try
{
return Task.FromResult(0);
}
var handler = ClientSubscribedTopicHandler;
if (handler == null)
{
return;
}

return handler.HandleClientSubscribedTopicAsync(new MqttServerClientSubscribedTopicEventArgs(clientId, topicFilter));
await handler.HandleClientSubscribedTopicAsync(new MqttServerClientSubscribedTopicEventArgs(clientId, topicFilter)).ConfigureAwait(false);
}
catch (Exception exception)
{
_logger.Error(exception, "Error while handling custom 'ClientSubscribedTopic' event.");
}
}

public Task HandleClientUnsubscribedTopicAsync(string clientId, string topicFilter)
public async Task SafeNotifyClientUnsubscribedTopicAsync(string clientId, string topicFilter)
{
var handler = ClientUnsubscribedTopicHandler;
if (handler == null)
try
{
return Task.FromResult(0);
}
var handler = ClientUnsubscribedTopicHandler;
if (handler == null)
{
return;
}

return handler.HandleClientUnsubscribedTopicAsync(new MqttServerClientUnsubscribedTopicEventArgs(clientId, topicFilter));
await handler.HandleClientUnsubscribedTopicAsync(new MqttServerClientUnsubscribedTopicEventArgs(clientId, topicFilter)).ConfigureAwait(false);
}
catch (Exception exception)
{
_logger.Error(exception, "Error while handling custom 'ClientUnsubscribedTopic' event.");
}
}

public Task HandleApplicationMessageReceivedAsync(string senderClientId, MqttApplicationMessage applicationMessage)
public async Task SafeNotifyApplicationMessageReceivedAsync(string senderClientId, MqttApplicationMessage applicationMessage)
{
var handler = ApplicationMessageReceivedHandler;
if (handler == null)
try
{
return Task.FromResult(0);
}
var handler = ApplicationMessageReceivedHandler;
if (handler == null)
{
return;
}

return handler.HandleApplicationMessageReceivedAsync(new MqttApplicationMessageReceivedEventArgs(senderClientId, applicationMessage));
await handler.HandleApplicationMessageReceivedAsync(new MqttApplicationMessageReceivedEventArgs(senderClientId, applicationMessage)).ConfigureAwait(false); ;
}
catch (Exception exception)
{
_logger.Error(exception, "Error while handling custom 'ApplicationMessageReceived' event.");
}
}
}
}

+ 4
- 4
Source/MQTTnet/Server/Status/IMqttClientStatus.cs View File

@@ -1,6 +1,6 @@
using System;
using MQTTnet.Formatter;
using System;
using System.Threading.Tasks;
using MQTTnet.Formatter;

namespace MQTTnet.Server.Status
{
@@ -9,7 +9,7 @@ namespace MQTTnet.Server.Status
string ClientId { get; }

string Endpoint { get; }
MqttProtocolVersion ProtocolVersion { get; }

DateTime LastPacketReceivedTimestamp { get; }
@@ -29,7 +29,7 @@ namespace MQTTnet.Server.Status
long BytesSent { get; }

long BytesReceived { get; }
Task DisconnectAsync();

void ResetStatistics();


+ 6
- 3
Source/MQTTnet/Server/Status/IMqttSessionStatus.cs View File

@@ -1,12 +1,15 @@
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace MQTTnet.Server.Status
{
public interface IMqttSessionStatus
{
string ClientId { get; set; }
string ClientId { get; }

long PendingApplicationMessagesCount { get; set; }
long PendingApplicationMessagesCount { get; }

IDictionary<object, object> Items { get; }

Task ClearPendingApplicationMessagesAsync();



+ 4
- 4
Source/MQTTnet/Server/Status/MqttClientStatus.cs View File

@@ -1,16 +1,16 @@
using System;
using MQTTnet.Formatter;
using System;
using System.Threading.Tasks;
using MQTTnet.Formatter;

namespace MQTTnet.Server.Status
{
public class MqttClientStatus : IMqttClientStatus
{
private readonly MqttClientConnection _connection;
readonly MqttClientConnection _connection;

public MqttClientStatus(MqttClientConnection connection)
{
_connection = connection;
_connection = connection ?? throw new ArgumentNullException(nameof(connection));
}

public string ClientId { get; set; }


+ 7
- 4
Source/MQTTnet/Server/Status/MqttSessionStatus.cs View File

@@ -1,12 +1,13 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace MQTTnet.Server.Status
{
public class MqttSessionStatus : IMqttSessionStatus
{
private readonly MqttClientSession _session;
private readonly MqttClientSessionsManager _sessionsManager;
readonly MqttClientSession _session;
readonly MqttClientSessionsManager _sessionsManager;

public MqttSessionStatus(MqttClientSession session, MqttClientSessionsManager sessionsManager)
{
@@ -17,14 +18,16 @@ namespace MQTTnet.Server.Status
public string ClientId { get; set; }

public long PendingApplicationMessagesCount { get; set; }
public DateTime CreatedTimestamp { get; set; }

public IDictionary<object, object> Items { get; set; }

public Task DeleteAsync()
{
return _sessionsManager.DeleteSessionAsync(ClientId);
}
public Task ClearPendingApplicationMessagesAsync()
{
_session.ApplicationMessagesQueue.Clear();


+ 9
- 8
Source/MQTTnet/TopicFilter.cs View File

@@ -2,6 +2,7 @@

namespace MQTTnet
{
// TODO: Consider renaming to "MqttTopicFilter"
public class TopicFilter
{
public string Topic { get; set; }
@@ -26,16 +27,16 @@ namespace MQTTnet
public override string ToString()
{
return string.Concat(
"TopicFilter: [Topic=",
"TopicFilter: [Topic=",
Topic,
"] [QualityOfServiceLevel=",
"] [QualityOfServiceLevel=",
QualityOfServiceLevel,
"] [NoLocal=",
NoLocal,
"] [RetainAsPublished=",
RetainAsPublished,
"] [RetainHandling=",
RetainHandling,
"] [NoLocal=",
NoLocal,
"] [RetainAsPublished=",
RetainAsPublished,
"] [RetainHandling=",
RetainHandling,
"]");
}
}

+ 3
- 3
Tests/MQTTnet.AspNetCore.Tests/MQTTnet.AspNetCore.Tests.csproj View File

@@ -6,9 +6,9 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.4.0" />
<PackageReference Include="MSTest.TestAdapter" Version="2.0.0" />
<PackageReference Include="MSTest.TestFramework" Version="2.0.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.5.0" />
<PackageReference Include="MSTest.TestAdapter" Version="2.1.1" />
<PackageReference Include="MSTest.TestFramework" Version="2.1.1" />
</ItemGroup>

<ItemGroup>


+ 2
- 2
Tests/MQTTnet.Benchmarks/LoggerBenchmark.cs View File

@@ -10,13 +10,13 @@ namespace MQTTnet.Benchmarks
public class LoggerBenchmark
{
private IMqttNetLogger _logger;
private IMqttNetChildLogger _childLogger;
private IMqttNetLogger _childLogger;
private bool _useHandler;

[GlobalSetup]
public void Setup()
{
_logger = new MqttNetLogger("1");
_logger = new MqttNetLogger();
_childLogger = _logger.CreateChildLogger("child");

MqttNetGlobalLogger.LogMessagePublished += OnLogMessagePublished;


+ 2
- 2
Tests/MQTTnet.Benchmarks/MQTTnet.Benchmarks.csproj View File

@@ -10,8 +10,8 @@

<ItemGroup>
<PackageReference Include="BenchmarkDotNet" Version="0.12.0" />
<PackageReference Include="System.IO.Pipelines" Version="4.5.2" />
<PackageReference Include="Microsoft.AspNetCore" Version="2.1.6" />
<PackageReference Include="System.IO.Pipelines" Version="4.7.1" />
<PackageReference Include="Microsoft.AspNetCore" Version="2.2.0" />
</ItemGroup>

<ItemGroup>


+ 3
- 3
Tests/MQTTnet.Benchmarks/MqttTcpChannelBenchmark.cs View File

@@ -1,11 +1,11 @@
using BenchmarkDotNet.Attributes;
using MQTTnet.Channel;
using MQTTnet.Client.Options;
using MQTTnet.Diagnostics;
using MQTTnet.Implementations;
using MQTTnet.Server;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Client.Options;

namespace MQTTnet.Benchmarks
{
@@ -20,7 +20,7 @@ namespace MQTTnet.Benchmarks
public void Setup()
{
var factory = new MqttFactory();
var tcpServer = new MqttTcpServerAdapter(new MqttNetLogger().CreateChildLogger());
var tcpServer = new MqttTcpServerAdapter(new MqttNetLogger());
tcpServer.ClientHandler += args =>
{
_serverChannel =
@@ -30,7 +30,7 @@ namespace MQTTnet.Benchmarks

return Task.CompletedTask;
};
_mqttServer = factory.CreateMqttServer(new[] { tcpServer }, new MqttNetLogger());

var serverOptions = new MqttServerOptionsBuilder().Build();


+ 1
- 1
Tests/MQTTnet.Benchmarks/Program.cs View File

@@ -9,7 +9,7 @@ namespace MQTTnet.Benchmarks
{
public static void Main(string[] args)
{
Console.WriteLine($"MQTTnet - BenchmarkApp.{TargetFrameworkInfoProvider.TargetFramework}");
Console.WriteLine($"MQTTnet - BenchmarkApp.{TargetFrameworkProvider.TargetFramework}");
Console.WriteLine("1 = MessageProcessingBenchmark");
Console.WriteLine("2 = SerializerBenchmark");
Console.WriteLine("3 = LoggerBenchmark");


+ 74
- 0
Tests/MQTTnet.Core.Tests/CrossPlatformSocket_Tests.cs View File

@@ -0,0 +1,74 @@
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Implementations;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace MQTTnet.Tests
{
[TestClass]
public class CrossPlatformSocket_Tests
{
[TestMethod]
public async Task Connect_Send_Receive()
{
var crossPlatformSocket = new CrossPlatformSocket();
await crossPlatformSocket.ConnectAsync("www.google.de", 80, CancellationToken.None);

var requestBuffer = Encoding.UTF8.GetBytes("GET / HTTP/1.1\r\nHost: www.google.de\r\n\r\n");
await crossPlatformSocket.SendAsync(new ArraySegment<byte>(requestBuffer), System.Net.Sockets.SocketFlags.None);

var buffer = new byte[1024];
var length = await crossPlatformSocket.ReceiveAsync(new ArraySegment<byte>(buffer), System.Net.Sockets.SocketFlags.None);
crossPlatformSocket.Dispose();

var responseText = Encoding.UTF8.GetString(buffer, 0, length);

Assert.IsTrue(responseText.Contains("HTTP/1.1 200 OK"));
}

[TestMethod]
public async Task Try_Connect_Invalid_Host()
{
var crossPlatformSocket = new CrossPlatformSocket();

var cancellationToken = new CancellationTokenSource(TimeSpan.FromSeconds(3));
cancellationToken.Token.Register(() => crossPlatformSocket.Dispose());

await crossPlatformSocket.ConnectAsync("www.google.de", 1234, CancellationToken.None);
}

//[TestMethod]
//public async Task Use_Disconnected_Socket()
//{
// var crossPlatformSocket = new CrossPlatformSocket();

// await crossPlatformSocket.ConnectAsync("www.google.de", 80);

// var requestBuffer = Encoding.UTF8.GetBytes("GET /wrong_uri HTTP/1.1\r\nConnection: close\r\n\r\n");
// await crossPlatformSocket.SendAsync(new ArraySegment<byte>(requestBuffer), System.Net.Sockets.SocketFlags.None);

// var buffer = new byte[64000];
// var length = await crossPlatformSocket.ReceiveAsync(new ArraySegment<byte>(buffer), System.Net.Sockets.SocketFlags.None);

// await Task.Delay(500);

// await crossPlatformSocket.SendAsync(new ArraySegment<byte>(requestBuffer), System.Net.Sockets.SocketFlags.None);
//}

[TestMethod]
public async Task Set_Options()
{
var crossPlatformSocket = new CrossPlatformSocket();

Assert.IsFalse(crossPlatformSocket.ReuseAddress);
crossPlatformSocket.ReuseAddress = true;
Assert.IsTrue(crossPlatformSocket.ReuseAddress);

Assert.IsFalse(crossPlatformSocket.NoDelay);
crossPlatformSocket.NoDelay = true;
Assert.IsTrue(crossPlatformSocket.NoDelay);
}
}
}

+ 111
- 0
Tests/MQTTnet.Core.Tests/LowLevelMqttClient_Tests.cs View File

@@ -0,0 +1,111 @@
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Client.Options;
using MQTTnet.LowLevelClient;
using MQTTnet.Packets;
using MQTTnet.Protocol;
using MQTTnet.Tests.Mockups;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace MQTTnet.Tests
{
[TestClass]
public class LowLevelMqttClient_Tests
{
public TestContext TestContext { get; set; }

[TestMethod]
public async Task Connect_And_Disconnect()
{
using (var testEnvironment = new TestEnvironment(TestContext))
{
var server = await testEnvironment.StartServerAsync();

var factory = new MqttFactory();
var lowLevelClient = factory.CreateLowLevelMqttClient();

await lowLevelClient.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("127.0.0.1", testEnvironment.ServerPort).Build(), CancellationToken.None);

await lowLevelClient.DisconnectAsync(CancellationToken.None);
}
}

[TestMethod]
public async Task Authenticate()
{
using (var testEnvironment = new TestEnvironment(TestContext))
{
var server = await testEnvironment.StartServerAsync();

var factory = new MqttFactory();
var lowLevelClient = factory.CreateLowLevelMqttClient();

await lowLevelClient.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("127.0.0.1", testEnvironment.ServerPort).Build(), CancellationToken.None);

var receivedPacket = await Authenticate(lowLevelClient).ConfigureAwait(false);

await lowLevelClient.DisconnectAsync(CancellationToken.None).ConfigureAwait(false);

Assert.IsNotNull(receivedPacket);
Assert.AreEqual(MqttConnectReturnCode.ConnectionAccepted, receivedPacket.ReturnCode);
}
}

[TestMethod]
public async Task Subscribe()
{
using (var testEnvironment = new TestEnvironment(TestContext))
{
var server = await testEnvironment.StartServerAsync();

var factory = new MqttFactory();
var lowLevelClient = factory.CreateLowLevelMqttClient();

await lowLevelClient.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("127.0.0.1", testEnvironment.ServerPort).Build(), CancellationToken.None);

await Authenticate(lowLevelClient).ConfigureAwait(false);

var receivedPacket = await Subscribe(lowLevelClient, "a").ConfigureAwait(false);

await lowLevelClient.DisconnectAsync(CancellationToken.None).ConfigureAwait(false);

Assert.IsNotNull(receivedPacket);
Assert.AreEqual(MqttSubscribeReturnCode.SuccessMaximumQoS0, receivedPacket.ReturnCodes[0]);
}
}

async Task<MqttConnAckPacket> Authenticate(ILowLevelMqttClient client)
{
await client.SendAsync(new MqttConnectPacket()
{
CleanSession = true,
ClientId = TestContext.TestName,
Username = "user",
Password = Encoding.UTF8.GetBytes("pass")
},
CancellationToken.None).ConfigureAwait(false);

return await client.ReceiveAsync(CancellationToken.None).ConfigureAwait(false) as MqttConnAckPacket;
}

async Task<MqttSubAckPacket> Subscribe(ILowLevelMqttClient client, string topic)
{
await client.SendAsync(new MqttSubscribePacket
{
PacketIdentifier = 1,
TopicFilters = new List<TopicFilter>
{
new TopicFilter
{
Topic = topic
}
}
},
CancellationToken.None).ConfigureAwait(false);

return await client.ReceiveAsync(CancellationToken.None).ConfigureAwait(false) as MqttSubAckPacket;
}
}
}

+ 3
- 3
Tests/MQTTnet.Core.Tests/MQTTnet.Tests.csproj View File

@@ -6,9 +6,9 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="MSTest.TestAdapter" Version="2.0.0" />
<PackageReference Include="MSTest.TestFramework" Version="2.0.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.4.0" />
<PackageReference Include="MSTest.TestAdapter" Version="2.1.1" />
<PackageReference Include="MSTest.TestFramework" Version="2.1.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.5.0" />
</ItemGroup>

<ItemGroup>


+ 4
- 4
Tests/MQTTnet.Core.Tests/ManagedMqttClient_Tests.cs View File

@@ -69,7 +69,7 @@ namespace MQTTnet.Tests
.WithTcpServer("localhost", testEnvironment.ServerPort)
.WithWillMessage(willMessage);
var dyingClient = testEnvironment.CreateClient();
var dyingManagedClient = new ManagedMqttClient(dyingClient, testEnvironment.ClientLogger.CreateChildLogger());
var dyingManagedClient = new ManagedMqttClient(dyingClient, testEnvironment.ClientLogger);
await dyingManagedClient.StartAsync(new ManagedMqttClientOptionsBuilder()
.WithClientOptions(clientOptions)
.Build());
@@ -96,7 +96,7 @@ namespace MQTTnet.Tests

var server = await testEnvironment.StartServerAsync();

var managedClient = new ManagedMqttClient(testEnvironment.CreateClient(), new MqttNetLogger().CreateChildLogger());
var managedClient = new ManagedMqttClient(testEnvironment.CreateClient(), new MqttNetLogger());
var clientOptions = new MqttClientOptionsBuilder()
.WithTcpServer("localhost", testEnvironment.ServerPort);

@@ -128,7 +128,7 @@ namespace MQTTnet.Tests

var server = await testEnvironment.StartServerAsync();

var managedClient = new ManagedMqttClient(testEnvironment.CreateClient(), new MqttNetLogger().CreateChildLogger());
var managedClient = new ManagedMqttClient(testEnvironment.CreateClient(), new MqttNetLogger());
var clientOptions = new MqttClientOptionsBuilder()
.WithTcpServer("localhost", testEnvironment.ServerPort);
var storage = new ManagedMqttClientTestStorage();
@@ -351,7 +351,7 @@ namespace MQTTnet.Tests
managedOptions.ConnectionCheckInterval = connectionCheckInterval ?? TimeSpan.FromSeconds(0.1);

var managedClient =
new ManagedMqttClient(underlyingClient ?? testEnvironment.CreateClient(), new MqttNetLogger().CreateChildLogger());
new ManagedMqttClient(underlyingClient ?? testEnvironment.CreateClient(), new MqttNetLogger());

var connected = GetConnectedTask(managedClient);



+ 31
- 29
Tests/MQTTnet.Core.Tests/Mockups/TestClientWrapper.cs View File

@@ -1,6 +1,4 @@
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Client;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
@@ -10,10 +8,12 @@ using MQTTnet.Client.Publishing;
using MQTTnet.Client.Receiving;
using MQTTnet.Client.Subscribing;
using MQTTnet.Client.Unsubscribing;
using System.Threading;
using System.Threading.Tasks;

namespace MQTTnet.Tests.Mockups
{
public class TestClientWrapper : IMqttClient
public sealed class TestClientWrapper : IMqttClient
{
public TestClientWrapper(IMqttClient implementation, TestContext testContext)
{
@@ -22,40 +22,42 @@ namespace MQTTnet.Tests.Mockups
}

public IMqttClient Implementation { get; }

public TestContext TestContext { get; }

public bool IsConnected => Implementation.IsConnected;

public IMqttClientOptions Options => Implementation.Options;

public IMqttClientConnectedHandler ConnectedHandler { get => Implementation.ConnectedHandler; set => Implementation.ConnectedHandler = value; }
public IMqttClientDisconnectedHandler DisconnectedHandler { get => Implementation.DisconnectedHandler; set => Implementation.DisconnectedHandler = value; }
public IMqttApplicationMessageReceivedHandler ApplicationMessageReceivedHandler { get => Implementation.ApplicationMessageReceivedHandler; set => Implementation.ApplicationMessageReceivedHandler = value; }
public IMqttClientConnectedHandler ConnectedHandler
{
get => Implementation.ConnectedHandler;
set => Implementation.ConnectedHandler = value;
}

public IMqttClientDisconnectedHandler DisconnectedHandler
{
get => Implementation.DisconnectedHandler;
set => Implementation.DisconnectedHandler = value;
}

public IMqttApplicationMessageReceivedHandler ApplicationMessageReceivedHandler
{
get => Implementation.ApplicationMessageReceivedHandler;
set => Implementation.ApplicationMessageReceivedHandler = value;
}

public Task<MqttClientAuthenticateResult> ConnectAsync(IMqttClientOptions options, CancellationToken cancellationToken)
{
switch (options)
if (TestContext != null)
{
case MqttClientOptionsBuilder builder:
{
var existingClientId = builder.Build().ClientId;
if (existingClientId != null && !existingClientId.StartsWith(TestContext.TestName))
{
builder.WithClientId(TestContext.TestName + existingClientId);
}
}
break;
case MqttClientOptions op:
{
var existingClientId = op.ClientId;
if (existingClientId != null && !existingClientId.StartsWith(TestContext.TestName))
{
op.ClientId = TestContext.TestName + existingClientId;
}
}
break;
default:
break;
var clientOptions = (MqttClientOptions)options;

var existingClientId = clientOptions.ClientId;
if (existingClientId != null && !existingClientId.StartsWith(TestContext.TestName))
{
clientOptions.ClientId = TestContext.TestName + existingClientId;
}
}

return Implementation.ConnectAsync(options, cancellationToken);
@@ -81,7 +83,7 @@ namespace MQTTnet.Tests.Mockups
return Implementation.SendExtendedAuthenticationExchangeDataAsync(data, cancellationToken);
}

public Task<Client.Subscribing.MqttClientSubscribeResult> SubscribeAsync(MqttClientSubscribeOptions options, CancellationToken cancellationToken)
public Task<MqttClientSubscribeResult> SubscribeAsync(MqttClientSubscribeOptions options, CancellationToken cancellationToken)
{
return Implementation.SubscribeAsync(options, cancellationToken);
}


+ 30
- 21
Tests/MQTTnet.Core.Tests/Mockups/TestEnvironment.cs View File

@@ -1,27 +1,27 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Client;
using MQTTnet.Client.Options;
using MQTTnet.Diagnostics;
using MQTTnet.Internal;
using MQTTnet.Server;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace MQTTnet.Tests.Mockups
{
public class TestEnvironment : Disposable
public sealed class TestEnvironment : Disposable
{
private readonly MqttFactory _mqttFactory = new MqttFactory();
private readonly List<IMqttClient> _clients = new List<IMqttClient>();
private readonly IMqttNetLogger _serverLogger = new MqttNetLogger("server");
private readonly IMqttNetLogger _clientLogger = new MqttNetLogger("client");
readonly MqttFactory _mqttFactory = new MqttFactory();
readonly List<IMqttClient> _clients = new List<IMqttClient>();
readonly IMqttNetLogger _serverLogger = new MqttNetLogger("server");
readonly IMqttNetLogger _clientLogger = new MqttNetLogger("client");

private readonly List<string> _serverErrors = new List<string>();
private readonly List<string> _clientErrors = new List<string>();
readonly List<string> _serverErrors = new List<string>();
readonly List<string> _clientErrors = new List<string>();

private readonly List<Exception> _exceptions = new List<Exception>();
readonly List<Exception> _exceptions = new List<Exception>();

public IMqttServer Server { get; private set; }

@@ -37,36 +37,42 @@ namespace MQTTnet.Tests.Mockups

public TestContext TestContext { get; }

public TestEnvironment() : this(null)
{
}

public TestEnvironment(TestContext testContext)
{
TestContext = testContext;

_serverLogger.LogMessagePublished += (s, e) =>
{
if (e.TraceMessage.Level == MqttNetLogLevel.Error)
if (e.LogMessage.Level == MqttNetLogLevel.Error)
{
lock (_serverErrors)
{
_serverErrors.Add(e.TraceMessage.ToString());
_serverErrors.Add(e.LogMessage.ToString());
}
}
};

_clientLogger.LogMessagePublished += (s, e) =>
{
lock (_clientErrors)
if (e.LogMessage.Level == MqttNetLogLevel.Error)
{
if (e.TraceMessage.Level == MqttNetLogLevel.Error)
lock (_clientErrors)
{
_clientErrors.Add(e.TraceMessage.ToString());
_clientErrors.Add(e.LogMessage.ToString());
}
}
};
TestContext = testContext;
}

public IMqttClient CreateClient()
{
var client = _mqttFactory.CreateMqttClient(_clientLogger);
_clients.Add(client);

return new TestClientWrapper(client, TestContext);
}

@@ -90,15 +96,17 @@ namespace MQTTnet.Tests.Mockups

public Task<IMqttClient> ConnectClientAsync()
{
return ConnectClientAsync(new MqttClientOptionsBuilder() );
return ConnectClientAsync(new MqttClientOptionsBuilder());
}

public async Task<IMqttClient> ConnectClientAsync(MqttClientOptionsBuilder options)
{
if (options == null) throw new ArgumentNullException(nameof(options));

options = options.WithTcpServer("localhost", ServerPort);

var client = CreateClient();
await client.ConnectAsync(options.WithTcpServer("localhost", ServerPort).Build());
await client.ConnectAsync(options.Build());

return client;
}
@@ -150,6 +158,7 @@ namespace MQTTnet.Tests.Mockups
throw new Exception($"{_exceptions.Count} exceptions tracked.\r\n" + string.Join(Environment.NewLine, _exceptions));
}
}

base.Dispose(disposing);
}



+ 10
- 10
Tests/MQTTnet.Core.Tests/Mockups/TestLogger.cs View File

@@ -1,20 +1,15 @@
using System;
using MQTTnet.Diagnostics;
using MQTTnet.Diagnostics;
using System;

namespace MQTTnet.Tests.Mockups
{
public class TestLogger : IMqttNetLogger, IMqttNetChildLogger
public class TestLogger : IMqttNetLogger
{
public event EventHandler<MqttNetLogMessagePublishedEventArgs> LogMessagePublished;

IMqttNetChildLogger IMqttNetLogger.CreateChildLogger(string source)
public IMqttNetLogger CreateChildLogger(string source)
{
return new MqttNetChildLogger(this, source);
}

IMqttNetChildLogger IMqttNetChildLogger.CreateChildLogger(string source)
{
return new MqttNetChildLogger(this, source);
return new TestLogger();
}

public void Verbose(string message, params object[] parameters)
@@ -36,5 +31,10 @@ namespace MQTTnet.Tests.Mockups
public void Publish(MqttNetLogLevel logLevel, string source, string message, object[] parameters, Exception exception)
{
}

public void Publish(MqttNetLogLevel logLevel, string message, object[] parameters, Exception exception)
{
throw new NotImplementedException();
}
}
}

+ 2
- 2
Tests/MQTTnet.Core.Tests/Mockups/TestMqttCommunicationAdapterFactory.cs View File

@@ -12,8 +12,8 @@ namespace MQTTnet.Tests.Mockups
{
_adapter = adapter;
}
public IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetChildLogger logger)
public IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetLogger logger)
{
return _adapter;
}


+ 55
- 29
Tests/MQTTnet.Core.Tests/Mockups/TestServerWrapper.cs View File

@@ -1,16 +1,16 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Client.Publishing;
using MQTTnet.Client.Receiving;
using MQTTnet.Server;
using MQTTnet.Server.Status;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace MQTTnet.Tests.Mockups
{
public class TestServerWrapper : IMqttServer
public sealed class TestServerWrapper : IMqttServer
{
public TestServerWrapper(IMqttServer implementation, TestContext testContext, TestEnvironment testEnvironment)
{
@@ -22,16 +22,50 @@ namespace MQTTnet.Tests.Mockups
public IMqttServer Implementation { get; }
public TestContext TestContext { get; }
public TestEnvironment TestEnvironment { get; }
public IMqttServerStartedHandler StartedHandler { get => Implementation.StartedHandler; set => Implementation.StartedHandler = value; }
public IMqttServerStoppedHandler StoppedHandler { get => Implementation.StoppedHandler; set => Implementation.StoppedHandler = value; }
public IMqttServerClientConnectedHandler ClientConnectedHandler { get => Implementation.ClientConnectedHandler; set => Implementation.ClientConnectedHandler = value; }
public IMqttServerClientDisconnectedHandler ClientDisconnectedHandler { get => Implementation.ClientDisconnectedHandler; set => Implementation.ClientDisconnectedHandler = value; }
public IMqttServerClientSubscribedTopicHandler ClientSubscribedTopicHandler { get => Implementation.ClientSubscribedTopicHandler; set => Implementation.ClientSubscribedTopicHandler = value; }
public IMqttServerClientUnsubscribedTopicHandler ClientUnsubscribedTopicHandler { get => Implementation.ClientUnsubscribedTopicHandler; set => Implementation.ClientUnsubscribedTopicHandler = value; }

public IMqttServerStartedHandler StartedHandler
{
get => Implementation.StartedHandler;
set => Implementation.StartedHandler = value;
}

public IMqttServerStoppedHandler StoppedHandler
{
get => Implementation.StoppedHandler;
set => Implementation.StoppedHandler = value;
}

public IMqttServerClientConnectedHandler ClientConnectedHandler
{
get => Implementation.ClientConnectedHandler;
set => Implementation.ClientConnectedHandler = value;
}

public IMqttServerClientDisconnectedHandler ClientDisconnectedHandler
{
get => Implementation.ClientDisconnectedHandler;
set => Implementation.ClientDisconnectedHandler = value;
}

public IMqttServerClientSubscribedTopicHandler ClientSubscribedTopicHandler
{
get => Implementation.ClientSubscribedTopicHandler;
set => Implementation.ClientSubscribedTopicHandler = value;
}

public IMqttServerClientUnsubscribedTopicHandler ClientUnsubscribedTopicHandler
{
get => Implementation.ClientUnsubscribedTopicHandler;
set => Implementation.ClientUnsubscribedTopicHandler = value;
}

public IMqttServerOptions Options => Implementation.Options;

public IMqttApplicationMessageReceivedHandler ApplicationMessageReceivedHandler { get => Implementation.ApplicationMessageReceivedHandler; set => Implementation.ApplicationMessageReceivedHandler = value; }
public IMqttApplicationMessageReceivedHandler ApplicationMessageReceivedHandler
{
get => Implementation.ApplicationMessageReceivedHandler;
set => Implementation.ApplicationMessageReceivedHandler = value;
}

public Task ClearRetainedApplicationMessagesAsync()
{
@@ -60,22 +94,14 @@ namespace MQTTnet.Tests.Mockups

public Task StartAsync(IMqttServerOptions options)
{
switch (options)
if (TestContext != null)
{
case MqttServerOptionsBuilder builder:
if (builder.Build().ConnectionValidator == null)
{
builder.WithConnectionValidator(ConnectionValidator);
}
break;
case MqttServerOptions op:
if (op.ConnectionValidator == null)
{
op.ConnectionValidator = new MqttServerConnectionValidatorDelegate(ConnectionValidator);
}
break;
default:
break;
var serverOptions = (MqttServerOptions)options;

if (serverOptions.ConnectionValidator == null)
{
serverOptions.ConnectionValidator = new MqttServerConnectionValidatorDelegate(ConnectionValidator);
}
}

return Implementation.StartAsync(options);
@@ -85,7 +111,7 @@ namespace MQTTnet.Tests.Mockups
{
if (!ctx.ClientId.StartsWith(TestContext.TestName))
{
TestEnvironment.TrackException(new InvalidOperationException($"invalid client connected '{ctx.ClientId}'"));
TestEnvironment.TrackException(new InvalidOperationException($"Invalid client ID used ({ctx.ClientId}). It must start with UnitTest name."));
ctx.ReasonCode = Protocol.MqttConnectReasonCode.ClientIdentifierNotValid;
}
}


+ 32
- 0
Tests/MQTTnet.Core.Tests/MqttApplicationMessage_Tests.cs View File

@@ -0,0 +1,32 @@
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Extensions;
using MQTTnet.Packets;
using System.Collections.Generic;

namespace MQTTnet.Tests
{
[TestClass]
public class MqttApplicationMessage_Tests
{
[TestMethod]
public void GetUserProperty_Test()
{
var message = new MqttApplicationMessage
{
UserProperties = new List<MqttUserProperty>
{
new MqttUserProperty("foo", "bar"),
new MqttUserProperty("value", "1011"),
new MqttUserProperty("CASE", "insensitive")
}
};

Assert.AreEqual("bar", message.GetUserProperty("foo"));
//Assert.AreEqual(1011, message.GetUserProperty<int>("value"));
Assert.AreEqual(null, message.GetUserProperty("case"));
Assert.AreEqual(null, message.GetUserProperty("nonExists"));
//Assert.AreEqual(null, message.GetUserProperty<int?>("nonExists"));
//Assert.ThrowsException<InvalidOperationException>(() => message.GetUserProperty<int>("nonExists"));
}
}
}

+ 22
- 0
Tests/MQTTnet.Core.Tests/MqttClientOptionsBuilder_Tests.cs View File

@@ -0,0 +1,22 @@
using System.Linq;
using System.Text;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Client.Options;
using MQTTnet.Extensions;

namespace MQTTnet.Tests
{
[TestClass]
public class MqttClientOptionsBuilder_Tests
{
[TestMethod]
public void WithConnectionUri_Credential_Test()
{
var options = new MqttClientOptionsBuilder()
.WithConnectionUri("mqtt://user:password@127.0.0.1")
.Build();
Assert.AreEqual("user", options.Credentials.Username);
Assert.IsTrue(Encoding.UTF8.GetBytes("password").SequenceEqual(options.Credentials.Password));
}
}
}

+ 15
- 13
Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs View File

@@ -1,9 +1,3 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Client;
using MQTTnet.Client.Connecting;
@@ -14,6 +8,12 @@ using MQTTnet.Exceptions;
using MQTTnet.Protocol;
using MQTTnet.Server;
using MQTTnet.Tests.Mockups;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;

namespace MQTTnet.Tests
{
@@ -29,9 +29,9 @@ namespace MQTTnet.Tests
{
await testEnvironment.StartServerAsync();
var client = await testEnvironment.ConnectClientAsync();
await client.SubscribeAsync("#");
var replyReceived = false;

client.UseApplicationMessageReceivedHandler(c =>
@@ -78,7 +78,7 @@ namespace MQTTnet.Tests
}
});

client2.UseApplicationMessageReceivedHandler(async c =>{ await client2.PublishAsync("reply", null, MqttQualityOfServiceLevel.AtLeastOnce); });
client2.UseApplicationMessageReceivedHandler(async c => { await client2.PublishAsync("reply", null, MqttQualityOfServiceLevel.AtLeastOnce); });

await client1.PublishAsync("request", null, MqttQualityOfServiceLevel.AtLeastOnce);

@@ -181,7 +181,7 @@ namespace MQTTnet.Tests
catch
{
}
SpinWait.SpinUntil(() => tries >= maxTries, 10000);

Assert.AreEqual(maxTries, tries);
@@ -215,7 +215,7 @@ namespace MQTTnet.Tests
Assert.AreEqual((ushort)4, result.PacketIdentifier);
}
}
[TestMethod]
public async Task Invalid_Connect_Throws_Exception()
{
@@ -558,6 +558,8 @@ namespace MQTTnet.Tests
clients.Add(await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("a")));
}

await Task.Delay(500);

var clientStatus = await testEnvironment.Server.GetClientStatusAsync();
var sessionStatus = await testEnvironment.Server.GetSessionStatusAsync();

@@ -565,7 +567,7 @@ namespace MQTTnet.Tests
{
Assert.IsFalse(clients[i].IsConnected);
}
Assert.IsTrue(clients[99].IsConnected);

Assert.AreEqual(1, clientStatus.Count);
@@ -583,7 +585,7 @@ namespace MQTTnet.Tests
var sendClient = await testEnvironment.ConnectClientAsync();
await sendClient.PublishAsync("x", "1");

await Task.Delay(100);
await Task.Delay(250);

Assert.AreEqual("1", receivedPayload);
}


+ 7
- 5
Tests/MQTTnet.Core.Tests/MqttFactory_Tests.cs View File

@@ -18,13 +18,13 @@ namespace MQTTnet.Tests
// This test compares
// 1. correct logID
var logId = "logId";
string invalidLogId = null;
var hasInvalidLogId = false;

// 2. if the total log calls are the same for global and local
//var globalLogCount = 0;
var localLogCount = 0;

var logger = new MqttNetLogger(logId);
var logger = new MqttNetLogger(null, logId);

// TODO: This is commented out because it is affected by other tests.
//// we have a theoretical bug here if a concurrent test is also logging
@@ -42,9 +42,9 @@ namespace MQTTnet.Tests

logger.LogMessagePublished += (s, e) =>
{
if (e.TraceMessage.LogId != logId)
if (e.LogMessage.LogId != logId)
{
invalidLogId = e.TraceMessage.LogId;
hasInvalidLogId = true;
}

Interlocked.Increment(ref localLogCount);
@@ -72,7 +72,9 @@ namespace MQTTnet.Tests
//MqttNetGlobalLogger.LogMessagePublished -= globalLog;
}

Assert.IsNull(invalidLogId);
await Task.Delay(500);

Assert.IsFalse(hasInvalidLogId);
Assert.AreNotEqual(0, localLogCount);
}
}

+ 7
- 7
Tests/MQTTnet.Core.Tests/MqttKeepAliveMonitor_Tests.cs View File

@@ -1,12 +1,12 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Adapter;
using MQTTnet.Diagnostics;
using MQTTnet.Server;
using MQTTnet.Server.Status;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace MQTTnet.Tests
{
@@ -23,7 +23,7 @@ namespace MQTTnet.Tests
counter++;
return Task.CompletedTask;
},
new MqttNetLogger().CreateChildLogger());
new MqttNetLogger());

Assert.AreEqual(0, counter);

@@ -46,7 +46,7 @@ namespace MQTTnet.Tests
counter++;
return Task.CompletedTask;
},
new MqttNetLogger().CreateChildLogger());
new MqttNetLogger());

Assert.AreEqual(0, counter);



+ 67
- 0
Tests/MQTTnet.Core.Tests/MqttNetLogger_Tests.cs View File

@@ -0,0 +1,67 @@
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Diagnostics;

namespace MQTTnet.Tests
{
[TestClass]
public class MqttNetLogger_Tests
{
[TestMethod]
public void Root_Log_Messages()
{
var logger = new MqttNetLogger();

var logMessagesCount = 0;

logger.LogMessagePublished += (s, e) =>
{
logMessagesCount++;
};

logger.Verbose("Verbose");
logger.Info("Info");
logger.Warning(null, "Warning");
logger.Error(null, "Error");

Assert.AreEqual(4, logMessagesCount);
}

[TestMethod]
public void Bubbling_Log_Messages()
{
var logger = new MqttNetLogger();
var childLogger = logger.CreateChildLogger("Source1");

var logMessagesCount = 0;

logger.LogMessagePublished += (s, e) =>
{
logMessagesCount++;
};

childLogger.Verbose("Verbose");
childLogger.Info("Info");
childLogger.Warning(null, "Warning");
childLogger.Error(null, "Error");

Assert.AreEqual(4, logMessagesCount);
}

[TestMethod]
public void Set_Custom_Log_ID()
{
var logger = new MqttNetLogger(null, "logId");
var childLogger = logger.CreateChildLogger("Source1");

logger.LogMessagePublished += (s, e) =>
{
Assert.AreEqual("logId", e.LogMessage.LogId);
};

childLogger.Verbose("Verbose");
childLogger.Info("Info");
childLogger.Warning(null, "Warning");
childLogger.Error(null, "Error");
}
}
}

+ 9
- 9
Tests/MQTTnet.Core.Tests/MqttTcpChannel_Tests.cs View File

@@ -1,10 +1,10 @@
using System;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Implementations;
using System;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Implementations;

namespace MQTTnet.Tests
{
@@ -15,7 +15,7 @@ namespace MQTTnet.Tests
public async Task Dispose_Channel_While_Used()
{
var ct = new CancellationTokenSource();
var serverSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
var serverSocket = new CrossPlatformSocket(AddressFamily.InterNetwork);

try
{
@@ -28,18 +28,18 @@ namespace MQTTnet.Tests
{
while (!ct.IsCancellationRequested)
{
var client = await PlatformAbstractionLayer.AcceptAsync(serverSocket);
var client = await serverSocket.AcceptAsync();
var data = new byte[] { 128 };
await PlatformAbstractionLayer.SendAsync(client, new ArraySegment<byte>(data), SocketFlags.None);
await client.SendAsync(new ArraySegment<byte>(data), SocketFlags.None);
}
}, ct.Token);

var clientSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await PlatformAbstractionLayer.ConnectAsync(clientSocket, IPAddress.Loopback, 50001);
var clientSocket = new CrossPlatformSocket(AddressFamily.InterNetwork);
await clientSocket.ConnectAsync("localhost", 50001, CancellationToken.None);

await Task.Delay(100, ct.Token);

var tcpChannel = new MqttTcpChannel(new NetworkStream(clientSocket, true), "test", null);
var tcpChannel = new MqttTcpChannel(clientSocket.GetStream(), "test", null);

var buffer = new byte[1];
await tcpChannel.ReadAsync(buffer, 0, 1, ct.Token);


+ 22
- 21
Tests/MQTTnet.Core.Tests/Server_Tests.cs View File

@@ -904,14 +904,13 @@ namespace MQTTnet.Tests

await testEnvironment.StartServerAsync(serverOptions);


var connectingFailedException = await Assert.ThrowsExceptionAsync<MqttConnectingFailedException>(() => testEnvironment.ConnectClientAsync());
Assert.AreEqual(MqttClientConnectResultCode.NotAuthorized, connectingFailedException.ResultCode);
}
}

Dictionary<string, bool> _connected;

private Dictionary<string, bool> _connected;
private void ConnectionValidationHandler(MqttConnectionValidatorContext eventArgs)
{
if (_connected.ContainsKey(eventArgs.ClientId))
@@ -919,6 +918,7 @@ namespace MQTTnet.Tests
eventArgs.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
return;
}

_connected[eventArgs.ClientId] = true;
eventArgs.ReasonCode = MqttConnectReasonCode.Success;
return;
@@ -1016,7 +1016,7 @@ namespace MQTTnet.Tests
[TestMethod]
public async Task Same_Client_Id_Connect_Disconnect_Event_Order()
{
using (var testEnvironment = new TestEnvironment(TestContext))
using (var testEnvironment = new TestEnvironment())
{
var server = await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder());

@@ -1038,11 +1038,11 @@ namespace MQTTnet.Tests
}
});

var clientOptions = new MqttClientOptionsBuilder()
.WithClientId("same_id");
var clientOptionsBuilder = new MqttClientOptionsBuilder()
.WithClientId(Guid.NewGuid().ToString());

// c
var c1 = await testEnvironment.ConnectClientAsync(clientOptions);
var c1 = await testEnvironment.ConnectClientAsync(clientOptionsBuilder);

await Task.Delay(500);

@@ -1051,7 +1051,13 @@ namespace MQTTnet.Tests

// dc
// Connect client with same client ID. Should disconnect existing client.
var c2 = await testEnvironment.ConnectClientAsync(clientOptions);
var c2 = await testEnvironment.ConnectClientAsync(clientOptionsBuilder);

await Task.Delay(500);

flow = string.Join(string.Empty, events);

Assert.AreEqual("cdc", flow);

c2.UseApplicationMessageReceivedHandler(_ =>
{
@@ -1061,15 +1067,10 @@ namespace MQTTnet.Tests
}
});

c2.SubscribeAsync("topic").Wait();

await Task.Delay(500);

flow = string.Join(string.Empty, events);
Assert.AreEqual("cdc", flow);
await c2.SubscribeAsync("topic");

// r
c2.PublishAsync("topic").Wait();
await c2.PublishAsync("topic");

await Task.Delay(500);

@@ -1149,15 +1150,15 @@ namespace MQTTnet.Tests
{
await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithDefaultCommunicationTimeout(TimeSpan.FromSeconds(1)));

var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await PlatformAbstractionLayer.ConnectAsync(client, "localhost", testEnvironment.ServerPort);
var client = new CrossPlatformSocket(AddressFamily.InterNetwork);
await client.ConnectAsync("localhost", testEnvironment.ServerPort, CancellationToken.None);

// Don't send anything. The server should close the connection.
await Task.Delay(TimeSpan.FromSeconds(3));

try
{
var receivedBytes = await PlatformAbstractionLayer.ReceiveAsync(client, new ArraySegment<byte>(new byte[10]), SocketFlags.Partial);
var receivedBytes = await client.ReceiveAsync(new ArraySegment<byte>(new byte[10]), SocketFlags.Partial);
if (receivedBytes == 0)
{
return;
@@ -1180,17 +1181,17 @@ namespace MQTTnet.Tests

// Send an invalid packet and ensure that the server will close the connection and stay in a waiting state
// forever. This is security related.
var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await PlatformAbstractionLayer.ConnectAsync(client, "localhost", testEnvironment.ServerPort);
var client = new CrossPlatformSocket(AddressFamily.InterNetwork);
await client.ConnectAsync("localhost", testEnvironment.ServerPort, CancellationToken.None);

var buffer = Encoding.UTF8.GetBytes("Garbage");
client.Send(buffer, buffer.Length, SocketFlags.None);
await client.SendAsync(new ArraySegment<byte>(buffer), SocketFlags.None);

await Task.Delay(TimeSpan.FromSeconds(3));

try
{
var receivedBytes = await PlatformAbstractionLayer.ReceiveAsync(client, new ArraySegment<byte>(new byte[10]), SocketFlags.Partial);
var receivedBytes = await client.ReceiveAsync(new ArraySegment<byte>(new byte[10]), SocketFlags.Partial);
if (receivedBytes == 0)
{
return;


+ 29
- 4
Tests/MQTTnet.Core.Tests/Session_Tests.cs View File

@@ -1,10 +1,11 @@
using System.Text;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Client;
using MQTTnet.Client.Subscribing;
using MQTTnet.Server;
using MQTTnet.Tests.Mockups;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace MQTTnet.Tests
{
@@ -42,7 +43,7 @@ namespace MQTTnet.Tests
string receivedPayload = null;

var client = await testEnvironment.ConnectClientAsync();
client.UseApplicationMessageReceivedHandler(delegate(MqttApplicationMessageReceivedEventArgs args)
client.UseApplicationMessageReceivedHandler(delegate (MqttApplicationMessageReceivedEventArgs args)
{
receivedPayload = args.ApplicationMessage.ConvertPayloadToString();
});
@@ -59,5 +60,29 @@ namespace MQTTnet.Tests
Assert.AreEqual("Hello World", receivedPayload);
}
}

[TestMethod]
public async Task Get_Session_Items_In_Status()
{
using (var testEnvironment = new TestEnvironment(TestContext))
{
var serverOptions = new MqttServerOptionsBuilder()
.WithConnectionValidator(delegate (MqttConnectionValidatorContext context)
{
// Don't validate anything. Just set some session items.
context.SessionItems["can_subscribe_x"] = true;
context.SessionItems["default_payload"] = "Hello World";
});

await testEnvironment.StartServerAsync(serverOptions);

var client = await testEnvironment.ConnectClientAsync();

var sessionStatus = await testEnvironment.Server.GetSessionStatusAsync();
var session = sessionStatus.First();

Assert.AreEqual(true, session.Items["can_subscribe_x"]);
}
}
}
}

+ 2
- 2
Tests/MQTTnet.TestApp.AspNetCore2/MQTTnet.TestApp.AspNetCore2.csproj View File

@@ -10,8 +10,8 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore" Version="2.1.6" />
<PackageReference Include="Microsoft.AspNetCore.StaticFiles" Version="2.1.1" />
<PackageReference Include="Microsoft.AspNetCore" Version="2.2.0" />
<PackageReference Include="Microsoft.AspNetCore.StaticFiles" Version="2.2.0" />
</ItemGroup>

<ItemGroup>


+ 1
- 1
Tests/MQTTnet.TestApp.NetCore/Program.cs View File

@@ -17,7 +17,7 @@ namespace MQTTnet.TestApp.NetCore
{
public static void Main()
{
Console.WriteLine($"MQTTnet - TestApp.{TargetFrameworkInfoProvider.TargetFramework}");
Console.WriteLine($"MQTTnet - TestApp.{TargetFrameworkProvider.TargetFramework}");
Console.WriteLine("1 = Start client");
Console.WriteLine("2 = Start server");
Console.WriteLine("3 = Start performance test");


+ 1
- 1
Tests/MQTTnet.TestApp.UniversalWindows/MQTTnet.TestApp.UniversalWindows.csproj View File

@@ -147,7 +147,7 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.NETCore.UniversalWindowsPlatform">
<Version>6.2.8</Version>
<Version>6.2.10</Version>
</PackageReference>
<PackageReference Include="Microsoft.Toolkit.Uwp.UI.Controls">
<Version>4.0.0</Version>


+ 27
- 17
Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs View File

@@ -1,12 +1,4 @@
using System;
using System.Collections.Concurrent;
using System.Collections.ObjectModel;
using System.Text;
using System.Threading.Tasks;
using Windows.Security.Cryptography.Certificates;
using Windows.UI.Core;
using Windows.UI.Xaml;
using MQTTnet.Client;
using MQTTnet.Client;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options;
@@ -14,14 +6,22 @@ using MQTTnet.Diagnostics;
using MQTTnet.Exceptions;
using MQTTnet.Extensions.ManagedClient;
using MQTTnet.Extensions.Rpc;
using MQTTnet.Extensions.WebSocket4Net;
using MQTTnet.Formatter;
using MQTTnet.Implementations;
using MQTTnet.Protocol;
using MQTTnet.Server;
using MQTTnet.Server.Status;
using System;
using System.Collections.Concurrent;
using System.Collections.ObjectModel;
using System.Text;
using System.Threading.Tasks;
using Windows.Security.Cryptography.Certificates;
using Windows.UI.Core;
using Windows.UI.Xaml;
using MqttClientConnectedEventArgs = MQTTnet.Client.Connecting.MqttClientConnectedEventArgs;
using MqttClientDisconnectedEventArgs = MQTTnet.Client.Disconnecting.MqttClientDisconnectedEventArgs;
using MQTTnet.Extensions.WebSocket4Net;

namespace MQTTnet.TestApp.UniversalWindows
{
@@ -141,7 +141,7 @@ namespace MQTTnet.TestApp.UniversalWindows
Password = Encoding.UTF8.GetBytes(Password.Text)
};
}
options.CleanSession = CleanSession.IsChecked == true;
options.KeepAlivePeriod = TimeSpan.FromSeconds(double.Parse(KeepAliveInterval.Text));

@@ -198,16 +198,26 @@ namespace MQTTnet.TestApp.UniversalWindows

private void OnDisconnected(MqttClientDisconnectedEventArgs e)
{
_traceMessages.Enqueue(new MqttNetLogMessage("", DateTime.Now, -1,
"", MqttNetLogLevel.Info, "! DISCONNECTED EVENT FIRED", null));
_traceMessages.Enqueue(new MqttNetLogMessage
{
Timestamp = DateTime.UtcNow,
ThreadId = -1,
Level = MqttNetLogLevel.Info,
Message = "! DISCONNECTED EVENT FIRED",
});

Task.Run(UpdateLogAsync);
}

private void OnConnected(MqttClientConnectedEventArgs e)
{
_traceMessages.Enqueue(new MqttNetLogMessage("", DateTime.Now, -1,
"", MqttNetLogLevel.Info, "! CONNECTED EVENT FIRED", null));
_traceMessages.Enqueue(new MqttNetLogMessage
{
Timestamp = DateTime.UtcNow,
ThreadId = -1,
Level = MqttNetLogLevel.Info,
Message = "! CONNECTED EVENT FIRED",
});

Task.Run(UpdateLogAsync);
}
@@ -538,7 +548,7 @@ namespace MQTTnet.TestApp.UniversalWindows
{
//...
}
client.UseApplicationMessageReceivedHandler(e => Handler(e));

// Subscribe after connect
@@ -614,7 +624,7 @@ namespace MQTTnet.TestApp.UniversalWindows
};
}
}
// ----------------------------------
{
var options = new MqttServerOptions();


Loading…
Cancel
Save