Browse Source

Removed 3rd-Party libraries.

release/3.x.x
Christian Kratky 7 years ago
parent
commit
77b80dc53e
66 changed files with 614 additions and 993 deletions
  1. +6
    -21
      Build/MQTTnet.nuspec
  2. +7
    -9
      Frameworks/MQTTnet.AspnetCore/MqttHostedServer.cs
  3. +3
    -8
      Frameworks/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs
  4. +1
    -1
      Frameworks/MQTTnet.AspnetCore/MqttWebSocketServerChannel.cs
  5. +0
    -2
      Frameworks/MQTTnet.AspnetCore/ServiceCollectionExtensions.cs
  6. +50
    -0
      Frameworks/MQTTnet.NetStandard/Diagnostics/MqttNetConsoleTrace.cs
  7. +26
    -0
      Frameworks/MQTTnet.NetStandard/Implementations/MqttClientAdapterFactory.cs
  8. +6
    -7
      Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.Uwp.cs
  9. +8
    -9
      Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs
  10. +1
    -1
      Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.Uwp.cs
  11. +1
    -1
      Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs
  12. +10
    -3
      Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs
  13. +5
    -20
      Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj
  14. +19
    -97
      Frameworks/MQTTnet.NetStandard/MqttFactory.cs
  15. +0
    -70
      Frameworks/MQTTnet.NetStandard/ServiceCollectionExtensions.cs
  16. +1
    -1
      MQTTnet.Core/Adapter/IMqttChannelAdapter.cs
  17. +0
    -12
      MQTTnet.Core/Adapter/IMqttCommunicationAdapterFactory.cs
  18. +9
    -9
      MQTTnet.Core/Adapter/MqttChannelAdapter.cs
  19. +2
    -2
      MQTTnet.Core/Adapter/MqttChannelAdapterExtensions.cs
  20. +2
    -2
      MQTTnet.Core/Adapter/MqttServerAdapterClientAcceptedEventArgs.cs
  21. +1
    -1
      MQTTnet.Core/Channel/IMqttChannel.cs
  22. +10
    -0
      MQTTnet.Core/Client/IMqttClientAdapterFactory.cs
  23. +6
    -1
      MQTTnet.Core/Client/IMqttClientFactory.cs
  24. +0
    -4
      MQTTnet.Core/Client/IMqttClientOptions.cs
  25. +29
    -31
      MQTTnet.Core/Client/MqttClient.cs
  26. +0
    -3
      MQTTnet.Core/Client/MqttClientOptions.cs
  27. +0
    -6
      MQTTnet.Core/Client/MqttClientOptionsBuilder.cs
  28. +4
    -4
      MQTTnet.Core/Client/MqttPacketDispatcher.cs
  29. +21
    -0
      MQTTnet.Core/Diagnostics/IMqttNetLogger.cs
  30. +18
    -0
      MQTTnet.Core/Diagnostics/MqttNetGlobalLog.cs
  31. +13
    -0
      MQTTnet.Core/Diagnostics/MqttNetLogLevel.cs
  32. +6
    -4
      MQTTnet.Core/Diagnostics/MqttNetLogMessage.cs
  33. +14
    -0
      MQTTnet.Core/Diagnostics/MqttNetLogMessagePublishedEventArgs.cs
  34. +48
    -27
      MQTTnet.Core/Diagnostics/MqttNetLogger.cs
  35. +0
    -35
      MQTTnet.Core/Diagnostics/MqttNetTrace.cs
  36. +0
    -14
      MQTTnet.Core/Diagnostics/MqttNetTraceMessagePublishedEventArgs.cs
  37. +2
    -8
      MQTTnet.Core/MQTTnet.Core.csproj
  38. +19
    -19
      MQTTnet.Core/ManagedClient/ManagedMqttClient.cs
  39. +0
    -15
      MQTTnet.Core/Server/IMqttClientRetainedMessageManager.cs
  40. +0
    -7
      MQTTnet.Core/Server/IMqttClientSesssionFactory.cs
  41. +1
    -1
      MQTTnet.Core/Server/IMqttServer.cs
  42. +4
    -2
      MQTTnet.Core/Server/IMqttServerFactory.cs
  43. +12
    -12
      MQTTnet.Core/Server/MqttClientPendingMessagesQueue.cs
  44. +21
    -23
      MQTTnet.Core/Server/MqttClientSession.cs
  45. +17
    -27
      MQTTnet.Core/Server/MqttClientSessionsManager.cs
  46. +2
    -3
      MQTTnet.Core/Server/MqttClientSubscriptionsManager.cs
  47. +10
    -11
      MQTTnet.Core/Server/MqttRetainedMessagesManager.cs
  48. +62
    -43
      MQTTnet.Core/Server/MqttServer.cs
  49. +1
    -1
      Tests/MQTTnet.Core.Tests/MQTTnet.Core.Tests.csproj
  50. +0
    -51
      Tests/MQTTnet.Core.Tests/MqttLoggerProviderTest.cs
  51. +37
    -106
      Tests/MQTTnet.Core.Tests/MqttServerTests.cs
  52. +4
    -5
      Tests/MQTTnet.Core.Tests/MqttSubscriptionsManagerTests.cs
  53. +0
    -30
      Tests/MQTTnet.Core.Tests/ServiceCollectionTest.cs
  54. +0
    -14
      Tests/MQTTnet.Core.Tests/TestClientSessionFactory.cs
  55. +0
    -26
      Tests/MQTTnet.Core.Tests/TestLogger.cs
  56. +1
    -1
      Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs
  57. +6
    -11
      Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapterFactory.cs
  58. +3
    -3
      Tests/MQTTnet.Core.Tests/TestMqttServerAdapter.cs
  59. +1
    -3
      Tests/MQTTnet.TestApp.NetCore/ClientTest.cs
  60. +0
    -6
      Tests/MQTTnet.TestApp.NetCore/MQTTnet.TestApp.NetCore.csproj
  61. +1
    -11
      Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs
  62. +14
    -43
      Tests/MQTTnet.TestApp.NetCore/PerformanceTest.cs
  63. +0
    -3
      Tests/MQTTnet.TestApp.NetCore/Program.cs
  64. +20
    -27
      Tests/MQTTnet.TestApp.NetCore/ServerTest.cs
  65. +0
    -6
      Tests/MQTTnet.TestApp.UniversalWindows/MQTTnet.TestApp.UniversalWindows.csproj
  66. +49
    -70
      Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs

+ 6
- 21
Build/MQTTnet.nuspec View File

@@ -2,7 +2,7 @@
<package > <package >
<metadata> <metadata>
<id>MQTTnet</id> <id>MQTTnet</id>
<version>2.5.2</version>
<version>2.5.3</version>
<authors>Christian Kratky</authors> <authors>Christian Kratky</authors>
<owners>Christian Kratky</owners> <owners>Christian Kratky</owners>
<licenseUrl>https://github.com/chkr1011/MQTTnet/blob/master/LICENSE</licenseUrl> <licenseUrl>https://github.com/chkr1011/MQTTnet/blob/master/LICENSE</licenseUrl>
@@ -10,7 +10,9 @@
<iconUrl>https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png</iconUrl> <iconUrl>https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png</iconUrl>
<requireLicenseAcceptance>false</requireLicenseAcceptance> <requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker).</description> <description>MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker).</description>
<releaseNotes>* [Core] Refactored trace messages.
<releaseNotes>* [Core] Removed all dependencies to other libraries (BREAKING CHANGE!).
* [Core] Updated SDK libraries.
* [Client] Fixed broken support for WebSocketSecure connections (Thanks to @StAI).
</releaseNotes> </releaseNotes>
<copyright>Copyright Christian Kratky 2016-2017</copyright> <copyright>Copyright Christian Kratky 2016-2017</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags> <tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags>
@@ -18,43 +20,26 @@


<group targetFramework="netstandard1.3"> <group targetFramework="netstandard1.3">
<dependency id="NETStandard.Library" version="1.3.0" /> <dependency id="NETStandard.Library" version="1.3.0" />
<dependency id="Microsoft.Extensions.DependencyInjection" version="1.0.2" />
<dependency id="Microsoft.Extensions.Logging" version="1.0.2" />
<dependency id="Microsoft.Extensions.Options" version="1.0.2" />
<dependency id="System.Net.Security" version="4.3.2" /> <dependency id="System.Net.Security" version="4.3.2" />
<dependency id="System.Net.WebSockets" version="4.3.0" /> <dependency id="System.Net.WebSockets" version="4.3.0" />
<dependency id="System.Net.WebSockets.Client" version="4.3.1" />
<dependency id="System.Threading.Thread" version="4.3.0" />
<dependency id="System.Net.WebSockets.Client" version="4.3.1" />
</group> </group>


<group targetFramework="netstandard2.0"> <group targetFramework="netstandard2.0">
<dependency id="NETStandard.Library" version="2.0.0" /> <dependency id="NETStandard.Library" version="2.0.0" />
<dependency id="Microsoft.Extensions.DependencyInjection" version="2.0.0" />
<dependency id="Microsoft.Extensions.Logging" version="2.0.0" />
<dependency id="Microsoft.Extensions.Options" version="2.0.0" />
<dependency id="System.Net.Security" version="4.3.2" /> <dependency id="System.Net.Security" version="4.3.2" />
<dependency id="System.Net.WebSockets" version="4.3.0" /> <dependency id="System.Net.WebSockets" version="4.3.0" />
<dependency id="System.Net.WebSockets.Client" version="4.3.1" /> <dependency id="System.Net.WebSockets.Client" version="4.3.1" />
<dependency id="System.Threading.Thread" version="4.3.0" />
</group> </group>
<group targetFramework="uap10.0"> <group targetFramework="uap10.0">
<dependency id="Microsoft.NETCore.UniversalWindowsPlatform" version="5.4.0" />
<dependency id="Microsoft.Extensions.DependencyInjection" version="1.1.1" />
<dependency id="Microsoft.Extensions.Logging" version="1.1.2" />
<dependency id="Microsoft.Extensions.Options" version="1.1.2" />
<dependency id="Microsoft.NETCore.UniversalWindowsPlatform" version="5.4.1" />
</group> </group>
<group targetFramework="net452"> <group targetFramework="net452">
<dependency id="Microsoft.Extensions.DependencyInjection" version="1.0.2" />
<dependency id="Microsoft.Extensions.Logging" version="1.0.2" />
<dependency id="Microsoft.Extensions.Options" version="1.0.2" />
</group> </group>


<group targetFramework="net461"> <group targetFramework="net461">
<dependency id="Microsoft.Extensions.DependencyInjection" version="2.0.0" />
<dependency id="Microsoft.Extensions.Logging" version="2.0.0" />
<dependency id="Microsoft.Extensions.Options" version="2.0.0" />
</group> </group>


</dependencies> </dependencies>


+ 7
- 9
Frameworks/MQTTnet.AspnetCore/MqttHostedServer.cs View File

@@ -2,29 +2,27 @@
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MQTTnet.Core.Adapter; using MQTTnet.Core.Adapter;
using MQTTnet.Core.Diagnostics;
using MQTTnet.Core.Server; using MQTTnet.Core.Server;


namespace MQTTnet.AspNetCore namespace MQTTnet.AspNetCore
{ {
public class MqttHostedServer : MqttServer, IHostedService public class MqttHostedServer : MqttServer, IHostedService
{ {
private readonly MqttServerOptions _options;

public MqttHostedServer( public MqttHostedServer(
IOptions<MqttServerOptions> options,
MqttServerOptions options,
IEnumerable<IMqttServerAdapter> adapters, IEnumerable<IMqttServerAdapter> adapters,
ILogger<MqttServer> logger,
MqttClientSessionsManager clientSessionsManager,
IMqttClientRetainedMessageManager clientRetainedMessageManager
)
: base(options, adapters, logger, clientSessionsManager, clientRetainedMessageManager)
IMqttNetLogger logger) : base(adapters, logger)
{ {
_options = options;
} }


public Task StartAsync(CancellationToken cancellationToken) public Task StartAsync(CancellationToken cancellationToken)
{ {
return StartAsync();
return StartAsync(_options);
} }


public Task StopAsync(CancellationToken cancellationToken) public Task StopAsync(CancellationToken cancellationToken)


+ 3
- 8
Frameworks/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs View File

@@ -2,19 +2,14 @@
using System.Net.WebSockets; using System.Net.WebSockets;
using System.Threading.Tasks; using System.Threading.Tasks;
using MQTTnet.Core.Adapter; using MQTTnet.Core.Adapter;
using MQTTnet.Core.Diagnostics;
using MQTTnet.Core.Serializer;
using MQTTnet.Core.Server; using MQTTnet.Core.Server;


namespace MQTTnet.AspNetCore namespace MQTTnet.AspNetCore
{ {
public class MqttWebSocketServerAdapter : IMqttServerAdapter, IDisposable public class MqttWebSocketServerAdapter : IMqttServerAdapter, IDisposable
{ {
private readonly IMqttCommunicationAdapterFactory _mqttCommunicationAdapterFactory;

public MqttWebSocketServerAdapter(IMqttCommunicationAdapterFactory mqttCommunicationAdapterFactory)
{
_mqttCommunicationAdapterFactory = mqttCommunicationAdapterFactory ?? throw new ArgumentNullException(nameof(mqttCommunicationAdapterFactory));
}

public event EventHandler<MqttServerAdapterClientAcceptedEventArgs> ClientAccepted; public event EventHandler<MqttServerAdapterClientAcceptedEventArgs> ClientAccepted;


public Task StartAsync(MqttServerOptions options) public Task StartAsync(MqttServerOptions options)
@@ -32,7 +27,7 @@ namespace MQTTnet.AspNetCore
if (webSocket == null) throw new ArgumentNullException(nameof(webSocket)); if (webSocket == null) throw new ArgumentNullException(nameof(webSocket));


var channel = new MqttWebSocketServerChannel(webSocket); var channel = new MqttWebSocketServerChannel(webSocket);
var clientAdapter = _mqttCommunicationAdapterFactory.CreateServerCommunicationAdapter(channel);
var clientAdapter = new MqttChannelAdapter(channel, new MqttPacketSerializer(), new MqttNetLogger());


var eventArgs = new MqttServerAdapterClientAcceptedEventArgs(clientAdapter); var eventArgs = new MqttServerAdapterClientAcceptedEventArgs(clientAdapter);
ClientAccepted?.Invoke(this, eventArgs); ClientAccepted?.Invoke(this, eventArgs);


+ 1
- 1
Frameworks/MQTTnet.AspnetCore/MqttWebSocketServerChannel.cs View File

@@ -8,7 +8,7 @@ using MQTTnet.Implementations;


namespace MQTTnet.AspNetCore namespace MQTTnet.AspNetCore
{ {
public class MqttWebSocketServerChannel : IMqttCommunicationChannel, IDisposable
public class MqttWebSocketServerChannel : IMqttChannel, IDisposable
{ {
private WebSocket _webSocket; private WebSocket _webSocket;




+ 0
- 2
Frameworks/MQTTnet.AspnetCore/ServiceCollectionExtensions.cs View File

@@ -9,8 +9,6 @@ namespace MQTTnet.AspNetCore
{ {
public static IServiceCollection AddHostedMqttServer(this IServiceCollection services) public static IServiceCollection AddHostedMqttServer(this IServiceCollection services)
{ {
services.AddMqttServerServices();

services.AddSingleton<IHostedService>(s => s.GetService<MqttHostedServer>()); services.AddSingleton<IHostedService>(s => s.GetService<MqttHostedServer>());
services.AddSingleton<IMqttServer>(s => s.GetService<MqttHostedServer>()); services.AddSingleton<IMqttServer>(s => s.GetService<MqttHostedServer>());
services.AddSingleton<MqttHostedServer>(); services.AddSingleton<MqttHostedServer>();


+ 50
- 0
Frameworks/MQTTnet.NetStandard/Diagnostics/MqttNetConsoleTrace.cs View File

@@ -0,0 +1,50 @@
using System;
using System.Text;
using MQTTnet.Core.Diagnostics;

namespace MQTTnet.Diagnostics
{
public static class MqttNetConsoleTrace
{
private static readonly object Lock = new object();

public static void ForwardToConsole()
{
MqttNetGlobalLog.LogMessagePublished -= PrintToConsole;
MqttNetGlobalLog.LogMessagePublished += PrintToConsole;
}

private static void PrintToConsole(object sender, MqttNetLogMessagePublishedEventArgs e)
{
var output = new StringBuilder();
output.AppendLine($">> [{e.TraceMessage.Timestamp:O}] [{e.TraceMessage.ThreadId}] [{e.TraceMessage.Source}] [{e.TraceMessage.Level}]: {e.TraceMessage.Message}");
if (e.TraceMessage.Exception != null)
{
output.AppendLine(e.TraceMessage.Exception.ToString());
}

lock (Lock)
{
var backupColor = Console.ForegroundColor;
switch (e.TraceMessage.Level)
{
case MqttNetLogLevel.Error:
Console.ForegroundColor = ConsoleColor.Red;
break;
case MqttNetLogLevel.Warning:
Console.ForegroundColor = ConsoleColor.Yellow;
break;
case MqttNetLogLevel.Info:
Console.ForegroundColor = ConsoleColor.Green;
break;
case MqttNetLogLevel.Verbose:
Console.ForegroundColor = ConsoleColor.Gray;
break;
}

Console.Write(output);
Console.ForegroundColor = backupColor;
}
}
}
}

+ 26
- 0
Frameworks/MQTTnet.NetStandard/Implementations/MqttClientAdapterFactory.cs View File

@@ -0,0 +1,26 @@
using System;
using MQTTnet.Core.Adapter;
using MQTTnet.Core.Client;
using MQTTnet.Core.Diagnostics;
using MQTTnet.Core.Serializer;

namespace MQTTnet.Implementations
{
public class MqttClientAdapterFactory : IMqttClientAdapterFactory
{
public IMqttChannelAdapter CreateClientAdapter(IMqttClientChannelOptions options, IMqttNetLogger logger)
{
if (options == null) throw new ArgumentNullException(nameof(options));

switch (options)
{
case MqttClientTcpOptions tcpOptions:
return new MqttChannelAdapter(new MqttTcpChannel(tcpOptions), new MqttPacketSerializer(), logger);
case MqttClientWebSocketOptions webSocketOptions:
return new MqttChannelAdapter(new MqttWebSocketChannel(webSocketOptions), new MqttPacketSerializer(), logger);
default:
throw new NotSupportedException();
}
}
}
}

+ 6
- 7
Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.Uwp.cs View File

@@ -4,20 +4,19 @@ using System.Threading.Tasks;
using MQTTnet.Core.Adapter; using MQTTnet.Core.Adapter;
using MQTTnet.Core.Server; using MQTTnet.Core.Server;
using Windows.Networking.Sockets; using Windows.Networking.Sockets;
using Microsoft.Extensions.Logging;
using MQTTnet.Core.Diagnostics;
using MQTTnet.Core.Serializer;


namespace MQTTnet.Implementations namespace MQTTnet.Implementations
{ {
public class MqttServerAdapter : IMqttServerAdapter, IDisposable public class MqttServerAdapter : IMqttServerAdapter, IDisposable
{ {
private readonly ILogger<MqttServerAdapter> _logger;
private readonly IMqttCommunicationAdapterFactory _communicationAdapterFactory;
private readonly IMqttNetLogger _logger;
private StreamSocketListener _defaultEndpointSocket; private StreamSocketListener _defaultEndpointSocket;


public MqttServerAdapter(ILogger<MqttServerAdapter> logger, IMqttCommunicationAdapterFactory communicationAdapterFactory)
public MqttServerAdapter(IMqttNetLogger logger)
{ {
_logger = logger ?? throw new ArgumentNullException(nameof(logger)); _logger = logger ?? throw new ArgumentNullException(nameof(logger));
_communicationAdapterFactory = communicationAdapterFactory ?? throw new ArgumentNullException(nameof(communicationAdapterFactory));
} }


public event EventHandler<MqttServerAdapterClientAcceptedEventArgs> ClientAccepted; public event EventHandler<MqttServerAdapterClientAcceptedEventArgs> ClientAccepted;
@@ -63,12 +62,12 @@ namespace MQTTnet.Implementations
{ {
try try
{ {
var clientAdapter = _communicationAdapterFactory.CreateServerCommunicationAdapter(new MqttTcpChannel(args.Socket));
var clientAdapter = new MqttChannelAdapter(new MqttTcpChannel(args.Socket), new MqttPacketSerializer(), _logger);
ClientAccepted?.Invoke(this, new MqttServerAdapterClientAcceptedEventArgs(clientAdapter)); ClientAccepted?.Invoke(this, new MqttServerAdapterClientAcceptedEventArgs(clientAdapter));
} }
catch (Exception exception) catch (Exception exception)
{ {
_logger.LogError(new EventId(), exception, "Error while accepting connection at default endpoint.");
_logger.Error<MqttServerAdapter>(exception, "Error while accepting connection at default endpoint.");
} }
} }
} }


+ 8
- 9
Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs View File

@@ -9,24 +9,23 @@ using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using MQTTnet.Core.Adapter; using MQTTnet.Core.Adapter;
using MQTTnet.Core.Server; using MQTTnet.Core.Server;
using Microsoft.Extensions.Logging;
using MQTTnet.Core.Diagnostics;
using MQTTnet.Core.Serializer;


namespace MQTTnet.Implementations namespace MQTTnet.Implementations
{ {
public class MqttServerAdapter : IMqttServerAdapter, IDisposable public class MqttServerAdapter : IMqttServerAdapter, IDisposable
{ {
private readonly ILogger<MqttServerAdapter> _logger;
private readonly IMqttCommunicationAdapterFactory _communicationAdapterFactory;
private readonly IMqttNetLogger _logger;


private CancellationTokenSource _cancellationTokenSource; private CancellationTokenSource _cancellationTokenSource;
private Socket _defaultEndpointSocket; private Socket _defaultEndpointSocket;
private Socket _tlsEndpointSocket; private Socket _tlsEndpointSocket;
private X509Certificate2 _tlsCertificate; private X509Certificate2 _tlsCertificate;


public MqttServerAdapter(ILogger<MqttServerAdapter> logger, IMqttCommunicationAdapterFactory communicationAdapterFactory)
public MqttServerAdapter(IMqttNetLogger logger)
{ {
_logger = logger ?? throw new ArgumentNullException(nameof(logger)); _logger = logger ?? throw new ArgumentNullException(nameof(logger));
_communicationAdapterFactory = communicationAdapterFactory ?? throw new ArgumentNullException(nameof(communicationAdapterFactory));
} }


public event EventHandler<MqttServerAdapterClientAcceptedEventArgs> ClientAccepted; public event EventHandler<MqttServerAdapterClientAcceptedEventArgs> ClientAccepted;
@@ -103,12 +102,12 @@ namespace MQTTnet.Implementations
#else #else
var clientSocket = await _defaultEndpointSocket.AcceptAsync().ConfigureAwait(false); var clientSocket = await _defaultEndpointSocket.AcceptAsync().ConfigureAwait(false);
#endif #endif
var clientAdapter = _communicationAdapterFactory.CreateServerCommunicationAdapter(new MqttTcpChannel(clientSocket, null));
var clientAdapter = new MqttChannelAdapter(new MqttTcpChannel(clientSocket, null), new MqttPacketSerializer(), _logger);
ClientAccepted?.Invoke(this, new MqttServerAdapterClientAcceptedEventArgs(clientAdapter)); ClientAccepted?.Invoke(this, new MqttServerAdapterClientAcceptedEventArgs(clientAdapter));
} }
catch (Exception exception) catch (Exception exception)
{ {
_logger.LogError(new EventId(), exception, "Error while accepting connection at default endpoint.");
_logger.Error<MqttServerAdapter>(exception, "Error while accepting connection at default endpoint.");


//excessive CPU consumed if in endless loop of socket errors //excessive CPU consumed if in endless loop of socket errors
await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken).ConfigureAwait(false); await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken).ConfigureAwait(false);
@@ -131,12 +130,12 @@ namespace MQTTnet.Implementations
var sslStream = new SslStream(new NetworkStream(clientSocket)); var sslStream = new SslStream(new NetworkStream(clientSocket));
await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false).ConfigureAwait(false); await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false).ConfigureAwait(false);


var clientAdapter = _communicationAdapterFactory.CreateServerCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream));
var clientAdapter = new MqttChannelAdapter(new MqttTcpChannel(clientSocket, sslStream), new MqttPacketSerializer(), _logger);
ClientAccepted?.Invoke(this, new MqttServerAdapterClientAcceptedEventArgs(clientAdapter)); ClientAccepted?.Invoke(this, new MqttServerAdapterClientAcceptedEventArgs(clientAdapter));
} }
catch (Exception exception) catch (Exception exception)
{ {
_logger.LogError(new EventId(), exception, "Error while accepting connection at TLS endpoint.");
_logger.Error<MqttServerAdapter>(exception, "Error while accepting connection at TLS endpoint.");


//excessive CPU consumed if in endless loop of socket errors //excessive CPU consumed if in endless loop of socket errors
await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken).ConfigureAwait(false); await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken).ConfigureAwait(false);


+ 1
- 1
Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.Uwp.cs View File

@@ -13,7 +13,7 @@ using MQTTnet.Core.Client;


namespace MQTTnet.Implementations namespace MQTTnet.Implementations
{ {
public sealed class MqttTcpChannel : IMqttCommunicationChannel, IDisposable
public sealed class MqttTcpChannel : IMqttChannel, IDisposable
{ {
private readonly MqttClientTcpOptions _options; private readonly MqttClientTcpOptions _options;
private StreamSocket _socket; private StreamSocket _socket;


+ 1
- 1
Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs View File

@@ -12,7 +12,7 @@ using System.Linq;


namespace MQTTnet.Implementations namespace MQTTnet.Implementations
{ {
public sealed class MqttTcpChannel : IMqttCommunicationChannel, IDisposable
public sealed class MqttTcpChannel : IMqttChannel, IDisposable
{ {
private readonly MqttClientTcpOptions _options; private readonly MqttClientTcpOptions _options;




+ 10
- 3
Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs View File

@@ -9,7 +9,7 @@ using System.Threading.Tasks;


namespace MQTTnet.Implementations namespace MQTTnet.Implementations
{ {
public sealed class MqttWebSocketChannel : IMqttCommunicationChannel, IDisposable
public sealed class MqttWebSocketChannel : IMqttChannel, IDisposable
{ {
// ReSharper disable once MemberCanBePrivate.Global // ReSharper disable once MemberCanBePrivate.Global
// ReSharper disable once AutoPropertyCanBeMadeGetOnly.Global // ReSharper disable once AutoPropertyCanBeMadeGetOnly.Global
@@ -29,9 +29,16 @@ namespace MQTTnet.Implementations
public async Task ConnectAsync() public async Task ConnectAsync()
{ {
var uri = _options.Uri; var uri = _options.Uri;
if (!uri.StartsWith("ws://", StringComparison.OrdinalIgnoreCase))
if (!uri.StartsWith("ws://", StringComparison.OrdinalIgnoreCase) && !uri.StartsWith("wss://", StringComparison.OrdinalIgnoreCase))
{ {
uri = "ws://" + uri;
if (!_options.TlsOptions.UseTls)
{
uri = "ws://" + uri;
}
else
{
uri = "wss://" + uri;
}
} }


_webSocket = new ClientWebSocket(); _webSocket = new ClientWebSocket();


+ 5
- 20
Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj View File

@@ -4,8 +4,10 @@
<TargetFrameworks>netstandard1.3;netstandard2.0;net452;net461;uap10.0</TargetFrameworks> <TargetFrameworks>netstandard1.3;netstandard2.0;net452;net461;uap10.0</TargetFrameworks>
<AssemblyName>MQTTnet</AssemblyName> <AssemblyName>MQTTnet</AssemblyName>
<RootNamespace>MQTTnet</RootNamespace> <RootNamespace>MQTTnet</RootNamespace>
<AssemblyVersion>2.5.2.0</AssemblyVersion>
<FileVersion>2.5.2.0</FileVersion>
<GeneratePackageOnBuild>False</GeneratePackageOnBuild>
<DebugType>Full</DebugType>
<AssemblyVersion>2.5.3.0</AssemblyVersion>
<FileVersion>2.5.3.0</FileVersion>
<Version>0.0.0.0</Version> <Version>0.0.0.0</Version>
<Company /> <Company />
<Product /> <Product />
@@ -31,9 +33,6 @@


<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'" /> <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'" />


<ItemGroup>
</ItemGroup>

<ItemGroup> <ItemGroup>
<ProjectReference Include="..\..\MQTTnet.Core\MQTTnet.Core.csproj" /> <ProjectReference Include="..\..\MQTTnet.Core\MQTTnet.Core.csproj" />
</ItemGroup> </ItemGroup>
@@ -42,36 +41,22 @@
<PackageReference Include="System.Net.Security" Version="4.3.2" /> <PackageReference Include="System.Net.Security" Version="4.3.2" />
<PackageReference Include="System.Net.WebSockets" Version="4.3.0" /> <PackageReference Include="System.Net.WebSockets" Version="4.3.0" />
<PackageReference Include="System.Net.WebSockets.Client" Version="4.3.1" /> <PackageReference Include="System.Net.WebSockets.Client" Version="4.3.1" />
<PackageReference Include="System.Threading.Thread" Version="4.3.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="2.0.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="2.0.0" />
</ItemGroup> </ItemGroup>


<ItemGroup Condition="'$(TargetFramework)'=='netstandard1.3'"> <ItemGroup Condition="'$(TargetFramework)'=='netstandard1.3'">
<PackageReference Include="System.Net.Security" Version="4.3.2" /> <PackageReference Include="System.Net.Security" Version="4.3.2" />
<PackageReference Include="System.Net.WebSockets" Version="4.3.0" /> <PackageReference Include="System.Net.WebSockets" Version="4.3.0" />
<PackageReference Include="System.Net.WebSockets.Client" Version="4.3.1" /> <PackageReference Include="System.Net.WebSockets.Client" Version="4.3.1" />
<PackageReference Include="System.Threading.Thread" Version="4.3.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="1.0.2" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="1.0.2" />
</ItemGroup> </ItemGroup>


<ItemGroup Condition="'$(TargetFramework)'=='uap10.0'"> <ItemGroup Condition="'$(TargetFramework)'=='uap10.0'">
<PackageReference Include="Microsoft.NETCore.UniversalWindowsPlatform" Version="5.4.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="1.1.1" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="1.1.2" />
<PackageReference Include="Microsoft.NETCore.UniversalWindowsPlatform" Version="5.4.1" />
</ItemGroup> </ItemGroup>


<ItemGroup Condition="'$(TargetFramework)'=='net452'"> <ItemGroup Condition="'$(TargetFramework)'=='net452'">
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="1.0.2" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="1.0.2" />
</ItemGroup> </ItemGroup>


<ItemGroup Condition="'$(TargetFramework)'=='net461'"> <ItemGroup Condition="'$(TargetFramework)'=='net461'">
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="2.0.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="2.0.0" />
</ItemGroup> </ItemGroup>


</Project> </Project>

+ 19
- 97
Frameworks/MQTTnet.NetStandard/MqttFactory.cs View File

@@ -1,130 +1,52 @@
using System; using System;
using System.Collections.Generic;
using MQTTnet.Core.Adapter; using MQTTnet.Core.Adapter;
using MQTTnet.Core.Client; using MQTTnet.Core.Client;
using MQTTnet.Core.Serializer;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using MQTTnet.Implementations; using MQTTnet.Implementations;
using MQTTnet.Core.ManagedClient; using MQTTnet.Core.ManagedClient;
using MQTTnet.Core.Server; using MQTTnet.Core.Server;
using MQTTnet.Core.Channel;
using MQTTnet.Core.Diagnostics;


namespace MQTTnet namespace MQTTnet
{ {
public class MqttFactory : IMqttCommunicationAdapterFactory, IMqttClientSesssionFactory, IMqttClientFactory, IMqttServerFactory
public class MqttFactory : IMqttClientFactory, IMqttServerFactory
{ {
private readonly IServiceProvider _serviceProvider;

public MqttFactory()
: this(BuildServiceProvider())
{
}

public MqttFactory(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
}

public ILoggerFactory GetLoggerFactory()
{
return _serviceProvider.GetRequiredService<ILoggerFactory>();
}

public IMqttCommunicationAdapter CreateClientCommunicationAdapter(IMqttClientOptions options)
{
var logger = _serviceProvider.GetRequiredService<ILogger<MqttChannelCommunicationAdapter>>();
return new MqttChannelCommunicationAdapter(CreateCommunicationChannel(options.ChannelOptions), CreateSerializer(options.ProtocolVersion), logger);
}

public IMqttCommunicationAdapter CreateServerCommunicationAdapter(IMqttCommunicationChannel channel)
{
var serializer = _serviceProvider.GetRequiredService<IMqttPacketSerializer>();
var logger = _serviceProvider.GetRequiredService<ILogger<MqttChannelCommunicationAdapter>>();
return new MqttChannelCommunicationAdapter(channel, serializer, logger);
}

public IMqttCommunicationChannel CreateCommunicationChannel(IMqttClientChannelOptions options)
{
if (options == null) throw new ArgumentNullException(nameof(options));

switch (options)
{
case MqttClientTcpOptions tcpOptions:
return CreateTcpChannel(tcpOptions);
case MqttClientWebSocketOptions webSocketOptions:
return CreateWebSocketChannel(webSocketOptions);
default:
throw new NotSupportedException();
}
}

public MqttTcpChannel CreateTcpChannel(MqttClientTcpOptions tcpOptions)
public IMqttClient CreateMqttClient()
{ {
return new MqttTcpChannel(tcpOptions);
return CreateMqttClient(new MqttNetLogger());
} }


public MqttWebSocketChannel CreateWebSocketChannel(MqttClientWebSocketOptions webSocketOptions)
public IMqttClient CreateMqttClient(IMqttNetLogger logger)
{ {
return new MqttWebSocketChannel(webSocketOptions);
}
if (logger == null) throw new ArgumentNullException(nameof(logger));


public MqttPacketSerializer CreateSerializer(MqttProtocolVersion protocolVersion)
{
return new MqttPacketSerializer
{
ProtocolVersion = protocolVersion
};
return new MqttClient(new MqttClientAdapterFactory(), logger);
} }


public MqttClientSession CreateClientSession(string clientId, MqttClientSessionsManager clientSessionsManager)
public IManagedMqttClient CreateManagedMqttClient()
{ {
return new MqttClientSession(
clientId,
_serviceProvider.GetRequiredService<IOptions<MqttServerOptions>>(),
clientSessionsManager,
_serviceProvider.GetRequiredService<MqttClientSubscriptionsManager>(),
_serviceProvider.GetRequiredService<ILogger<MqttClientSession>>(),
_serviceProvider.GetRequiredService<ILogger<MqttClientPendingMessagesQueue>>());
return new ManagedMqttClient(CreateMqttClient(), new MqttNetLogger());
} }


public IMqttClient CreateMqttClient()
public IManagedMqttClient CreateManagedMqttClient(IMqttNetLogger logger)
{ {
return _serviceProvider.GetRequiredService<IMqttClient>();
}
if (logger == null) throw new ArgumentNullException(nameof(logger));


public IManagedMqttClient CreateManagedMqttClient()
{
return _serviceProvider.GetRequiredService<IManagedMqttClient>();
return new ManagedMqttClient(CreateMqttClient(), logger);
} }


public IMqttServer CreateMqttServer() public IMqttServer CreateMqttServer()
{ {
return _serviceProvider.GetRequiredService<IMqttServer>();
}

public IMqttServer CreateMqttServer(Action<MqttServerOptions> configure)
{
if (configure == null) throw new ArgumentNullException(nameof(configure));

var options = _serviceProvider.GetRequiredService<IOptions<MqttServerOptions>>();
configure(options.Value);

return _serviceProvider.GetRequiredService<IMqttServer>();
var logger = new MqttNetLogger();
return CreateMqttServer(new List<IMqttServerAdapter> { new MqttServerAdapter(logger) }, logger);
} }


private static IServiceProvider BuildServiceProvider()
public IMqttServer CreateMqttServer(IEnumerable<IMqttServerAdapter> adapters, IMqttNetLogger logger)
{ {
var serviceProvider = new ServiceCollection()
.AddMqttClient()
.AddMqttServer()
.AddLogging()
.BuildServiceProvider();

serviceProvider.GetRequiredService<ILoggerFactory>()
.AddMqttTrace();
if (adapters == null) throw new ArgumentNullException(nameof(adapters));
if (logger == null) throw new ArgumentNullException(nameof(logger));


return serviceProvider;
return new MqttServer(adapters, logger);
} }
} }
} }

+ 0
- 70
Frameworks/MQTTnet.NetStandard/ServiceCollectionExtensions.cs View File

@@ -1,70 +0,0 @@
using Microsoft.Extensions.DependencyInjection;
using MQTTnet.Core.Adapter;
using MQTTnet.Core.Client;
using MQTTnet.Core.ManagedClient;
using MQTTnet.Core.Serializer;
using MQTTnet.Core.Server;
using MQTTnet.Implementations;
using System;
using Microsoft.Extensions.Logging;
using MQTTnet.Core.Diagnostics;

namespace MQTTnet
{
public static class ServiceCollectionExtensions
{
public static IServiceCollection AddMqttServer(this IServiceCollection services)
{
services.AddMqttServerServices();
services.AddSingleton<IMqttServer>(s => s.GetService<MqttServer>());
services.AddSingleton<MqttServer>();

return services;
}

public static IServiceCollection AddMqttServerServices(this IServiceCollection services)
{
services.AddOptions();
services.AddSingleton<MqttFactory>();
services.AddSingleton<IMqttCommunicationAdapterFactory>(s => s.GetService<MqttFactory>());
services.AddSingleton<IMqttClientSesssionFactory>(s => s.GetService<MqttFactory>());

services.AddTransient<IMqttServerAdapter, MqttServerAdapter>();
services.AddTransient<IMqttPacketSerializer, MqttPacketSerializer>();

services.AddSingleton<MqttClientSessionsManager>();
services.AddTransient<MqttClientSubscriptionsManager>();
services.AddSingleton<IMqttClientRetainedMessageManager, MqttClientRetainedMessagesManager>();
return services;
}

public static IServiceCollection AddMqttServer(this IServiceCollection services, Action<MqttServerOptions> configureOptions)
{
return services
.AddMqttServer()
.Configure(configureOptions);
}

public static IServiceCollection AddMqttClient(this IServiceCollection services)
{
services.AddSingleton<MqttFactory>();
services.AddSingleton<IMqttCommunicationAdapterFactory>(s => s.GetService<MqttFactory>());

services.AddTransient<IMqttClient, MqttClient>();
services.AddTransient<MqttClient>();
services.AddTransient<IManagedMqttClient, ManagedMqttClient>();
services.AddTransient<ManagedMqttClient>();
services.AddTransient<IMqttPacketSerializer, MqttPacketSerializer>();
services.AddTransient<MqttPacketDispatcher>();

return services;
}

public static ILoggerFactory AddMqttTrace(this ILoggerFactory factory)
{
factory.AddProvider(new MqttNetTrace());
return factory;
}
}
}

MQTTnet.Core/Adapter/IMqttCommunicationAdapter.cs → MQTTnet.Core/Adapter/IMqttChannelAdapter.cs View File

@@ -7,7 +7,7 @@ using MQTTnet.Core.Serializer;


namespace MQTTnet.Core.Adapter namespace MQTTnet.Core.Adapter
{ {
public interface IMqttCommunicationAdapter
public interface IMqttChannelAdapter
{ {
IMqttPacketSerializer PacketSerializer { get; } IMqttPacketSerializer PacketSerializer { get; }



+ 0
- 12
MQTTnet.Core/Adapter/IMqttCommunicationAdapterFactory.cs View File

@@ -1,12 +0,0 @@
using MQTTnet.Core.Client;
using MQTTnet.Core.Channel;

namespace MQTTnet.Core.Adapter
{
public interface IMqttCommunicationAdapterFactory
{
IMqttCommunicationAdapter CreateClientCommunicationAdapter(IMqttClientOptions options);

IMqttCommunicationAdapter CreateServerCommunicationAdapter(IMqttCommunicationChannel channel);
}
}

MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs → MQTTnet.Core/Adapter/MqttChannelAdapter.cs View File

@@ -5,23 +5,23 @@ using System.Runtime.InteropServices;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using MQTTnet.Core.Channel; using MQTTnet.Core.Channel;
using MQTTnet.Core.Diagnostics;
using MQTTnet.Core.Exceptions; using MQTTnet.Core.Exceptions;
using MQTTnet.Core.Internal; using MQTTnet.Core.Internal;
using MQTTnet.Core.Packets; using MQTTnet.Core.Packets;
using MQTTnet.Core.Serializer; using MQTTnet.Core.Serializer;
using Microsoft.Extensions.Logging;


namespace MQTTnet.Core.Adapter namespace MQTTnet.Core.Adapter
{ {
public class MqttChannelCommunicationAdapter : IMqttCommunicationAdapter
public class MqttChannelAdapter : IMqttChannelAdapter
{ {
private const uint ErrorOperationAborted = 0x800703E3; private const uint ErrorOperationAborted = 0x800703E3;


private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1); private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
private readonly ILogger<MqttChannelCommunicationAdapter> _logger;
private readonly IMqttCommunicationChannel _channel;
private readonly IMqttNetLogger _logger;
private readonly IMqttChannel _channel;


public MqttChannelCommunicationAdapter(IMqttCommunicationChannel channel, IMqttPacketSerializer serializer, ILogger<MqttChannelCommunicationAdapter> logger)
public MqttChannelAdapter(IMqttChannel channel, IMqttPacketSerializer serializer, IMqttNetLogger logger)
{ {
_logger = logger ?? throw new ArgumentNullException(nameof(logger)); _logger = logger ?? throw new ArgumentNullException(nameof(logger));
_channel = channel ?? throw new ArgumentNullException(nameof(channel)); _channel = channel ?? throw new ArgumentNullException(nameof(channel));
@@ -32,14 +32,14 @@ namespace MQTTnet.Core.Adapter


public async Task ConnectAsync(TimeSpan timeout) public async Task ConnectAsync(TimeSpan timeout)
{ {
_logger.LogInformation("Connecting [Timeout={0}]", timeout);
_logger.Info<MqttChannelAdapter>("Connecting [Timeout={0}]", timeout);


await ExecuteAndWrapExceptionAsync(() => _channel.ConnectAsync().TimeoutAfter(timeout)); await ExecuteAndWrapExceptionAsync(() => _channel.ConnectAsync().TimeoutAfter(timeout));
} }


public async Task DisconnectAsync(TimeSpan timeout) public async Task DisconnectAsync(TimeSpan timeout)
{ {
_logger.LogInformation("Disconnecting [Timeout={0}]", timeout);
_logger.Info<MqttChannelAdapter>("Disconnecting [Timeout={0}]", timeout);


await ExecuteAndWrapExceptionAsync(() => _channel.DisconnectAsync().TimeoutAfter(timeout)); await ExecuteAndWrapExceptionAsync(() => _channel.DisconnectAsync().TimeoutAfter(timeout));
} }
@@ -58,7 +58,7 @@ namespace MQTTnet.Core.Adapter
continue; continue;
} }


_logger.LogInformation("TX >>> {0} [Timeout={1}]", packet, timeout);
_logger.Trace<MqttChannelAdapter>("TX >>> {0} [Timeout={1}]", packet, timeout);


var writeBuffer = PacketSerializer.Serialize(packet); var writeBuffer = PacketSerializer.Serialize(packet);
await _channel.SendStream.WriteAsync(writeBuffer, 0, writeBuffer.Length, cancellationToken).ConfigureAwait(false); await _channel.SendStream.WriteAsync(writeBuffer, 0, writeBuffer.Length, cancellationToken).ConfigureAwait(false);
@@ -108,7 +108,7 @@ namespace MQTTnet.Core.Adapter
throw new MqttProtocolViolationException("Received malformed packet."); throw new MqttProtocolViolationException("Received malformed packet.");
} }


_logger.LogInformation("RX <<< {0}", packet);
_logger.Trace<MqttChannelAdapter>("RX <<< {0}", packet);
} }
finally finally
{ {

MQTTnet.Core/Adapter/MqttCommunicationAdapterExtensions.cs → MQTTnet.Core/Adapter/MqttChannelAdapterExtensions.cs View File

@@ -5,9 +5,9 @@ using MQTTnet.Core.Packets;


namespace MQTTnet.Core.Adapter namespace MQTTnet.Core.Adapter
{ {
public static class MqttCommunicationAdapterExtensions
public static class MqttChannelAdapterExtensions
{ {
public static Task SendPacketsAsync(this IMqttCommunicationAdapter adapter, TimeSpan timeout, CancellationToken cancellationToken, params MqttBasePacket[] packets)
public static Task SendPacketsAsync(this IMqttChannelAdapter adapter, TimeSpan timeout, CancellationToken cancellationToken, params MqttBasePacket[] packets)
{ {
if (adapter == null) throw new ArgumentNullException(nameof(adapter)); if (adapter == null) throw new ArgumentNullException(nameof(adapter));



+ 2
- 2
MQTTnet.Core/Adapter/MqttServerAdapterClientAcceptedEventArgs.cs View File

@@ -5,12 +5,12 @@ namespace MQTTnet.Core.Adapter
{ {
public class MqttServerAdapterClientAcceptedEventArgs : EventArgs public class MqttServerAdapterClientAcceptedEventArgs : EventArgs
{ {
public MqttServerAdapterClientAcceptedEventArgs(IMqttCommunicationAdapter client)
public MqttServerAdapterClientAcceptedEventArgs(IMqttChannelAdapter client)
{ {
Client = client ?? throw new ArgumentNullException(nameof(client)); Client = client ?? throw new ArgumentNullException(nameof(client));
} }


public IMqttCommunicationAdapter Client { get; }
public IMqttChannelAdapter Client { get; }


public Task SessionTask { get; set; } public Task SessionTask { get; set; }
} }


MQTTnet.Core/Channel/IMqttCommunicationChannel.cs → MQTTnet.Core/Channel/IMqttChannel.cs View File

@@ -3,7 +3,7 @@ using System.IO;


namespace MQTTnet.Core.Channel namespace MQTTnet.Core.Channel
{ {
public interface IMqttCommunicationChannel
public interface IMqttChannel
{ {
Stream SendStream { get; } Stream SendStream { get; }
Stream ReceiveStream { get; } Stream ReceiveStream { get; }

+ 10
- 0
MQTTnet.Core/Client/IMqttClientAdapterFactory.cs View File

@@ -0,0 +1,10 @@
using MQTTnet.Core.Adapter;
using MQTTnet.Core.Diagnostics;

namespace MQTTnet.Core.Client
{
public interface IMqttClientAdapterFactory
{
IMqttChannelAdapter CreateClientAdapter(IMqttClientChannelOptions options, IMqttNetLogger logger);
}
}

+ 6
- 1
MQTTnet.Core/Client/IMqttClientFactory.cs View File

@@ -1,4 +1,5 @@
using MQTTnet.Core.ManagedClient;
using MQTTnet.Core.Diagnostics;
using MQTTnet.Core.ManagedClient;


namespace MQTTnet.Core.Client namespace MQTTnet.Core.Client
{ {
@@ -6,6 +7,10 @@ namespace MQTTnet.Core.Client
{ {
IMqttClient CreateMqttClient(); IMqttClient CreateMqttClient();


IMqttClient CreateMqttClient(IMqttNetLogger logger);
IManagedMqttClient CreateManagedMqttClient(); IManagedMqttClient CreateManagedMqttClient();

IManagedMqttClient CreateManagedMqttClient(IMqttNetLogger logger);
} }
} }

+ 0
- 4
MQTTnet.Core/Client/IMqttClientOptions.cs View File

@@ -7,10 +7,6 @@ namespace MQTTnet.Core.Client
{ {
string ClientId { get; } string ClientId { get; }


/// <summary>
/// The LogId is used to create a scope to correlate logging. If no value is provided the ClientId is used instead
/// </summary>
string LogId { get; }
IMqttClientCredentials Credentials { get; } IMqttClientCredentials Credentials { get; }
bool CleanSession { get; } bool CleanSession { get; }
MqttApplicationMessage WillMessage { get; } MqttApplicationMessage WillMessage { get; }


+ 29
- 31
MQTTnet.Core/Client/MqttClient.cs View File

@@ -4,33 +4,34 @@ using System.Linq;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using MQTTnet.Core.Adapter; using MQTTnet.Core.Adapter;
using MQTTnet.Core.Diagnostics;
using MQTTnet.Core.Exceptions; using MQTTnet.Core.Exceptions;
using MQTTnet.Core.Internal; using MQTTnet.Core.Internal;
using MQTTnet.Core.Packets; using MQTTnet.Core.Packets;
using MQTTnet.Core.Protocol; using MQTTnet.Core.Protocol;
using Microsoft.Extensions.Logging;


namespace MQTTnet.Core.Client namespace MQTTnet.Core.Client
{ {
public class MqttClient : IMqttClient public class MqttClient : IMqttClient
{ {
private readonly HashSet<ushort> _unacknowledgedPublishPackets = new HashSet<ushort>(); private readonly HashSet<ushort> _unacknowledgedPublishPackets = new HashSet<ushort>();

private readonly IMqttClientAdapterFactory _adapterFactory;
private readonly MqttPacketDispatcher _packetDispatcher; private readonly MqttPacketDispatcher _packetDispatcher;
private readonly IMqttCommunicationAdapterFactory _communicationAdapterFactory;
private readonly ILogger<MqttClient> _logger;
private readonly IMqttNetLogger _logger;


private IMqttClientOptions _options; private IMqttClientOptions _options;
private bool _isReceivingPackets; private bool _isReceivingPackets;
private int _latestPacketIdentifier; private int _latestPacketIdentifier;
private CancellationTokenSource _cancellationTokenSource; private CancellationTokenSource _cancellationTokenSource;
private IMqttCommunicationAdapter _adapter;
private IDisposable _scopeHandle;
private IMqttChannelAdapter _adapter;


public MqttClient(IMqttCommunicationAdapterFactory communicationAdapterFactory, ILogger<MqttClient> logger, MqttPacketDispatcher packetDispatcher)
public MqttClient(IMqttClientAdapterFactory channelFactory, IMqttNetLogger logger)
{ {
_communicationAdapterFactory = communicationAdapterFactory ?? throw new ArgumentNullException(nameof(communicationAdapterFactory));
_adapterFactory = channelFactory ?? throw new ArgumentNullException(nameof(channelFactory));
_logger = logger ?? throw new ArgumentNullException(nameof(logger)); _logger = logger ?? throw new ArgumentNullException(nameof(logger));
_packetDispatcher = packetDispatcher ?? throw new ArgumentNullException(nameof(packetDispatcher));
_packetDispatcher = new MqttPacketDispatcher(logger);
} }


public event EventHandler<MqttClientConnectedEventArgs> Connected; public event EventHandler<MqttClientConnectedEventArgs> Connected;
@@ -53,17 +54,16 @@ namespace MQTTnet.Core.Client
_latestPacketIdentifier = 0; _latestPacketIdentifier = 0;
_packetDispatcher.Reset(); _packetDispatcher.Reset();


_adapter = _communicationAdapterFactory.CreateClientCommunicationAdapter(options);

_scopeHandle = _logger.BeginScope(options.LogId ?? options.ClientId);
_logger.LogTrace("Trying to connect with server.");
_adapter = _adapterFactory.CreateClientAdapter(options.ChannelOptions, _logger);
_logger.Trace<MqttClient>("Trying to connect with server.");
await _adapter.ConnectAsync(_options.CommunicationTimeout).ConfigureAwait(false); await _adapter.ConnectAsync(_options.CommunicationTimeout).ConfigureAwait(false);
_logger.LogTrace("Connection with server established.");
_logger.Trace<MqttClient>("Connection with server established.");


await StartReceivingPacketsAsync(_cancellationTokenSource.Token).ConfigureAwait(false); await StartReceivingPacketsAsync(_cancellationTokenSource.Token).ConfigureAwait(false);
var connectResponse = await AuthenticateAsync(options.WillMessage).ConfigureAwait(false); var connectResponse = await AuthenticateAsync(options.WillMessage).ConfigureAwait(false);


_logger.LogTrace("MQTT connection with server established.");
_logger.Trace<MqttClient>("MQTT connection with server established.");


if (_options.KeepAlivePeriod != TimeSpan.Zero) if (_options.KeepAlivePeriod != TimeSpan.Zero)
{ {
@@ -76,7 +76,7 @@ namespace MQTTnet.Core.Client
} }
catch (Exception exception) catch (Exception exception)
{ {
_logger.LogError(new EventId(), exception, "Error while connecting with server.");
_logger.Error<MqttClient>(exception, "Error while connecting with server.");
await DisconnectInternalAsync().ConfigureAwait(false); await DisconnectInternalAsync().ConfigureAwait(false);
throw; throw;
} }
@@ -215,8 +215,6 @@ namespace MQTTnet.Core.Client


private async Task DisconnectInternalAsync() private async Task DisconnectInternalAsync()
{ {
_scopeHandle?.Dispose();

var clientWasConnected = IsConnected; var clientWasConnected = IsConnected;
IsConnected = false; IsConnected = false;


@@ -233,15 +231,15 @@ namespace MQTTnet.Core.Client
try try
{ {
await _adapter.DisconnectAsync(_options.CommunicationTimeout).ConfigureAwait(false); await _adapter.DisconnectAsync(_options.CommunicationTimeout).ConfigureAwait(false);
_logger.LogInformation("Disconnected from adapter.");
_logger.Info<MqttClient>("Disconnected from adapter.");
} }
catch (Exception exception) catch (Exception exception)
{ {
_logger.LogWarning(new EventId(), exception, "Error while disconnecting from adapter.");
_logger.Warning<MqttClient>(exception, "Error while disconnecting from adapter.");
} }
finally finally
{ {
_logger.LogInformation("Disconnected.");
_logger.Info<MqttClient>("Disconnected.");
Disconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(clientWasConnected)); Disconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(clientWasConnected));
} }
} }
@@ -250,7 +248,7 @@ namespace MQTTnet.Core.Client
{ {
try try
{ {
_logger.LogInformation("Received <<< {0}", packet);
_logger.Info<MqttClient>("Received <<< {0}", packet);


if (packet is MqttPingReqPacket) if (packet is MqttPingReqPacket)
{ {
@@ -280,7 +278,7 @@ namespace MQTTnet.Core.Client
} }
catch (Exception exception) catch (Exception exception)
{ {
_logger.LogError(new EventId(), exception, "Unhandled exception while processing received packet.");
_logger.Error<MqttClient>(exception, "Unhandled exception while processing received packet.");
} }
} }


@@ -293,7 +291,7 @@ namespace MQTTnet.Core.Client
} }
catch (Exception exception) catch (Exception exception)
{ {
_logger.LogError(new EventId(), exception, "Unhandled exception while handling application message.");
_logger.Error<MqttClient>(exception, "Unhandled exception while handling application message.");
} }
} }


@@ -357,7 +355,7 @@ namespace MQTTnet.Core.Client


private async Task SendKeepAliveMessagesAsync(CancellationToken cancellationToken) private async Task SendKeepAliveMessagesAsync(CancellationToken cancellationToken)
{ {
_logger.LogInformation("Start sending keep alive packets.");
_logger.Info<MqttClient>("Start sending keep alive packets.");


try try
{ {
@@ -388,23 +386,23 @@ namespace MQTTnet.Core.Client
return; return;
} }


_logger.LogWarning(new EventId(), exception, "MQTT communication exception while sending/receiving keep alive packets.");
_logger.Warning<MqttClient>(exception, "MQTT communication exception while sending/receiving keep alive packets.");
await DisconnectInternalAsync().ConfigureAwait(false); await DisconnectInternalAsync().ConfigureAwait(false);
} }
catch (Exception exception) catch (Exception exception)
{ {
_logger.LogWarning(new EventId(), exception, "Unhandled exception while sending/receiving keep alive packets.");
_logger.Warning<MqttClient>(exception, "Unhandled exception while sending/receiving keep alive packets.");
await DisconnectInternalAsync().ConfigureAwait(false); await DisconnectInternalAsync().ConfigureAwait(false);
} }
finally finally
{ {
_logger.LogInformation("Stopped sending keep alive packets.");
_logger.Info<MqttClient>("Stopped sending keep alive packets.");
} }
} }


private async Task ReceivePacketsAsync(CancellationToken cancellationToken) private async Task ReceivePacketsAsync(CancellationToken cancellationToken)
{ {
_logger.LogInformation("Start receiving packets.");
_logger.Info<MqttClient>("Start receiving packets.");


try try
{ {
@@ -438,17 +436,17 @@ namespace MQTTnet.Core.Client
return; return;
} }


_logger.LogWarning(new EventId(), exception, "MQTT communication exception while receiving packets.");
_logger.Warning<MqttClient>(exception, "MQTT communication exception while receiving packets.");
await DisconnectInternalAsync().ConfigureAwait(false); await DisconnectInternalAsync().ConfigureAwait(false);
} }
catch (Exception exception) catch (Exception exception)
{ {
_logger.LogError(new EventId(), exception, "Unhandled exception while receiving packets.");
_logger.Error<MqttClient>(exception, "Unhandled exception while receiving packets.");
await DisconnectInternalAsync().ConfigureAwait(false); await DisconnectInternalAsync().ConfigureAwait(false);
} }
finally finally
{ {
_logger.LogInformation(nameof(MqttClient), "Stopped receiving packets.");
_logger.Info<MqttClient>("Stopped receiving packets.");
} }
} }




+ 0
- 3
MQTTnet.Core/Client/MqttClientOptions.cs View File

@@ -9,9 +9,6 @@ namespace MQTTnet.Core.Client


public string ClientId { get; set; } = Guid.NewGuid().ToString("N"); public string ClientId { get; set; } = Guid.NewGuid().ToString("N");


/// <inheritdoc />
public string LogId { get; set; }

public bool CleanSession { get; set; } = true; public bool CleanSession { get; set; } = true;


public IMqttClientCredentials Credentials { get; set; } = new MqttClientCredentials(); public IMqttClientCredentials Credentials { get; set; } = new MqttClientCredentials();


+ 0
- 6
MQTTnet.Core/Client/MqttClientOptionsBuilder.cs View File

@@ -12,12 +12,6 @@ namespace MQTTnet.Core.Client


private MqttClientTlsOptions _tlsOptions; private MqttClientTlsOptions _tlsOptions;


public MqttClientOptionsBuilder WithLogId(string value)
{
_options.LogId = value;
return this;
}

public MqttClientOptionsBuilder WithProtocolVersion(MqttProtocolVersion value) public MqttClientOptionsBuilder WithProtocolVersion(MqttProtocolVersion value)
{ {
_options.ProtocolVersion = value; _options.ProtocolVersion = value;


+ 4
- 4
MQTTnet.Core/Client/MqttPacketDispatcher.cs View File

@@ -1,10 +1,10 @@
using System; using System;
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.Threading.Tasks; using System.Threading.Tasks;
using MQTTnet.Core.Diagnostics;
using MQTTnet.Core.Exceptions; using MQTTnet.Core.Exceptions;
using MQTTnet.Core.Internal; using MQTTnet.Core.Internal;
using MQTTnet.Core.Packets; using MQTTnet.Core.Packets;
using Microsoft.Extensions.Logging;


namespace MQTTnet.Core.Client namespace MQTTnet.Core.Client
{ {
@@ -12,9 +12,9 @@ namespace MQTTnet.Core.Client
{ {
private readonly ConcurrentDictionary<Type, TaskCompletionSource<MqttBasePacket>> _packetByResponseType = new ConcurrentDictionary<Type, TaskCompletionSource<MqttBasePacket>>(); private readonly ConcurrentDictionary<Type, TaskCompletionSource<MqttBasePacket>> _packetByResponseType = new ConcurrentDictionary<Type, TaskCompletionSource<MqttBasePacket>>();
private readonly ConcurrentDictionary<Type, ConcurrentDictionary<ushort, TaskCompletionSource<MqttBasePacket>>> _packetByResponseTypeAndIdentifier = new ConcurrentDictionary<Type, ConcurrentDictionary<ushort, TaskCompletionSource<MqttBasePacket>>>(); private readonly ConcurrentDictionary<Type, ConcurrentDictionary<ushort, TaskCompletionSource<MqttBasePacket>>> _packetByResponseTypeAndIdentifier = new ConcurrentDictionary<Type, ConcurrentDictionary<ushort, TaskCompletionSource<MqttBasePacket>>>();
private readonly ILogger<MqttPacketDispatcher> _logger;
private readonly IMqttNetLogger _logger;


public MqttPacketDispatcher(ILogger<MqttPacketDispatcher> logger)
public MqttPacketDispatcher(IMqttNetLogger logger)
{ {
_logger = logger ?? throw new ArgumentNullException(nameof(logger)); _logger = logger ?? throw new ArgumentNullException(nameof(logger));
} }
@@ -30,7 +30,7 @@ namespace MQTTnet.Core.Client
} }
catch (MqttCommunicationTimedOutException) catch (MqttCommunicationTimedOutException)
{ {
_logger.LogWarning("Timeout while waiting for packet of type '{0}'.", responseType.Name);
_logger.Warning<MqttPacketDispatcher>("Timeout while waiting for packet of type '{0}'.", responseType.Name);
throw; throw;
} }
finally finally


+ 21
- 0
MQTTnet.Core/Diagnostics/IMqttNetLogger.cs View File

@@ -0,0 +1,21 @@
using System;

namespace MQTTnet.Core.Diagnostics
{
public interface IMqttNetLogger
{
event EventHandler<MqttNetLogMessagePublishedEventArgs> LogMessagePublished;

void Trace<TSource>(string message, params object[] parameters);

void Info<TSource>(string message, params object[] parameters);

void Warning<TSource>(Exception exception, string message, params object[] parameters);

void Warning<TSource>(string message, params object[] parameters);

void Error<TSource>(Exception exception, string message, params object[] parameters);

void Error<TSource>(string message, params object[] parameters);
}
}

+ 18
- 0
MQTTnet.Core/Diagnostics/MqttNetGlobalLog.cs View File

@@ -0,0 +1,18 @@
using System;

namespace MQTTnet.Core.Diagnostics
{
public static class MqttNetGlobalLog
{
public static event EventHandler<MqttNetLogMessagePublishedEventArgs> LogMessagePublished;

public static bool HasListeners => LogMessagePublished != null;

public static void Publish(MqttNetLogMessage logMessage)
{
if (logMessage == null) throw new ArgumentNullException(nameof(logMessage));

LogMessagePublished?.Invoke(null, new MqttNetLogMessagePublishedEventArgs(logMessage));
}
}
}

+ 13
- 0
MQTTnet.Core/Diagnostics/MqttNetLogLevel.cs View File

@@ -0,0 +1,13 @@
namespace MQTTnet.Core.Diagnostics
{
public enum MqttNetLogLevel
{
Verbose,

Info,

Warning,

Error
}
}

MQTTnet.Core/Diagnostics/MqttTraceMessage.cs → MQTTnet.Core/Diagnostics/MqttNetLogMessage.cs View File

@@ -1,12 +1,12 @@
using System; using System;
using Microsoft.Extensions.Logging;


namespace MQTTnet.Core.Diagnostics namespace MQTTnet.Core.Diagnostics
{ {
public sealed class MqttNetTraceMessage
public sealed class MqttNetLogMessage
{ {
public MqttNetTraceMessage(DateTime timestamp, int threadId, string source, LogLevel level, string message, Exception exception)
public MqttNetLogMessage(string logId, DateTime timestamp, int threadId, string source, MqttNetLogLevel level, string message, Exception exception)
{ {
LogId = logId;
Timestamp = timestamp; Timestamp = timestamp;
ThreadId = threadId; ThreadId = threadId;
Source = source; Source = source;
@@ -15,13 +15,15 @@ namespace MQTTnet.Core.Diagnostics
Exception = exception; Exception = exception;
} }


public string LogId { get; }

public DateTime Timestamp { get; } public DateTime Timestamp { get; }


public int ThreadId { get; } public int ThreadId { get; }


public string Source { get; } public string Source { get; }


public LogLevel Level { get; }
public MqttNetLogLevel Level { get; }


public string Message { get; } public string Message { get; }



+ 14
- 0
MQTTnet.Core/Diagnostics/MqttNetLogMessagePublishedEventArgs.cs View File

@@ -0,0 +1,14 @@
using System;

namespace MQTTnet.Core.Diagnostics
{
public sealed class MqttNetLogMessagePublishedEventArgs : EventArgs
{
public MqttNetLogMessagePublishedEventArgs(MqttNetLogMessage logMessage)
{
TraceMessage = logMessage ?? throw new ArgumentNullException(nameof(logMessage));
}

public MqttNetLogMessage TraceMessage { get; }
}
}

+ 48
- 27
MQTTnet.Core/Diagnostics/MqttNetLogger.cs View File

@@ -1,52 +1,73 @@
using System; using System;
using Microsoft.Extensions.Logging;


namespace MQTTnet.Core.Diagnostics namespace MQTTnet.Core.Diagnostics
{ {
public class MqttNetLogger : ILogger
public class MqttNetLogger : IMqttNetLogger
{ {
private readonly string _categoryName;
private readonly MqttNetTrace _mqttNetTrace;
private readonly string _logId;


public MqttNetLogger(string categoryName, MqttNetTrace mqttNetTrace)
public MqttNetLogger(string logId = null)
{ {
_categoryName = categoryName;
_mqttNetTrace = mqttNetTrace;
_logId = logId;
} }
public event EventHandler<MqttNetLogMessagePublishedEventArgs> LogMessagePublished;


public void Log<TState>(LogLevel logLevel, EventId eventId, TState state, Exception exception, Func<TState, Exception, string> formatter)
public void Trace<TSource>(string message, params object[] parameters)
{ {
if (formatter == null)
{
throw new ArgumentNullException(nameof(formatter));
}
Publish<TSource>(MqttNetLogLevel.Verbose, null, message, parameters);
}


if (!MqttNetTrace.HasListeners)
{
return;
}
public void Info<TSource>(string message, params object[] parameters)
{
Publish<TSource>(MqttNetLogLevel.Info, null, message, parameters);
}

public void Warning<TSource>(Exception exception, string message, params object[] parameters)
{
Publish<TSource>(MqttNetLogLevel.Warning, exception, message, parameters);
}


var message = formatter(state, exception);
var traceMessage = new MqttNetTraceMessage(DateTime.Now, Environment.CurrentManagedThreadId, _categoryName, logLevel, message, exception);
_mqttNetTrace.Publish(traceMessage);
public void Warning<TSource>(string message, params object[] parameters)
{
Warning<TSource>(null, message, parameters);
} }


public bool IsEnabled(LogLevel logLevel)
public void Error<TSource>(Exception exception, string message, params object[] parameters)
{ {
return MqttNetTrace.HasListeners;
Publish<TSource>(MqttNetLogLevel.Error, exception, message, parameters);
} }


//not supported: async local requires netstandard1.3
//for implementation see https://github.com/aspnet/Logging/blob/dev/src/Microsoft.Extensions.Logging.Console/ConsoleLogScope.cs
public IDisposable BeginScope<TState>(TState state)
public void Error<TSource>(string message, params object[] parameters)
{ {
return new DisposableScope();
Warning<TSource>(null, message, parameters);
} }


private class DisposableScope : IDisposable
private void Publish<TSource>(MqttNetLogLevel logLevel, Exception exception, string message, object[] parameters)
{ {
public void Dispose()
var hasLocalListeners = LogMessagePublished != null;
var hasGlobalListeners = MqttNetGlobalLog.HasListeners;

if (!hasLocalListeners && !hasGlobalListeners)
{
return;
}

if (parameters.Length > 0)
{
message = string.Format(message, parameters);
}

var traceMessage = new MqttNetLogMessage(_logId, DateTime.Now, Environment.CurrentManagedThreadId, typeof(TSource).Name, logLevel, message, exception);

if (hasGlobalListeners)
{
MqttNetGlobalLog.Publish(traceMessage);
}

if (hasLocalListeners)
{ {
LogMessagePublished?.Invoke(this, new MqttNetLogMessagePublishedEventArgs(traceMessage));
} }
} }
} }

+ 0
- 35
MQTTnet.Core/Diagnostics/MqttNetTrace.cs View File

@@ -1,35 +0,0 @@
using System;
using System.Collections.Concurrent;
using Microsoft.Extensions.Logging;

namespace MQTTnet.Core.Diagnostics
{
public class MqttNetTrace : ILoggerProvider
{
private readonly ConcurrentDictionary<string, MqttNetLogger> _loggers = new ConcurrentDictionary<string, MqttNetLogger>();

public static event EventHandler<MqttNetTraceMessagePublishedEventArgs> TraceMessagePublished;

public static bool HasListeners => TraceMessagePublished != null;

public void Publish(MqttNetTraceMessage traceMessage)
{
TraceMessagePublished?.Invoke(this, new MqttNetTraceMessagePublishedEventArgs(traceMessage));
}

public void Dispose()
{
TraceMessagePublished = null;
}

public ILogger CreateLogger(string categoryName)
{
return _loggers.GetOrAdd(categoryName, CreateLoggerImplementation);
}

private MqttNetLogger CreateLoggerImplementation(string categoryName)
{
return new MqttNetLogger(categoryName, this);
}
}
}

+ 0
- 14
MQTTnet.Core/Diagnostics/MqttNetTraceMessagePublishedEventArgs.cs View File

@@ -1,14 +0,0 @@
using System;

namespace MQTTnet.Core.Diagnostics
{
public sealed class MqttNetTraceMessagePublishedEventArgs : EventArgs
{
public MqttNetTraceMessagePublishedEventArgs(MqttNetTraceMessage traceMessage)
{
TraceMessage = traceMessage ?? throw new ArgumentNullException(nameof(traceMessage));
}

public MqttNetTraceMessage TraceMessage { get; }
}
}

+ 2
- 8
MQTTnet.Core/MQTTnet.Core.csproj View File

@@ -17,15 +17,9 @@
<PackageIconUrl></PackageIconUrl> <PackageIconUrl></PackageIconUrl>
<RepositoryUrl></RepositoryUrl> <RepositoryUrl></RepositoryUrl>
<PackageTags></PackageTags> <PackageTags></PackageTags>
<FileVersion>2.5.2.0</FileVersion>
<AssemblyVersion>2.5.2.0</AssemblyVersion>
<FileVersion>2.5.3.0</FileVersion>
<AssemblyVersion>2.5.3.0</AssemblyVersion>
<PackageLicenseUrl></PackageLicenseUrl> <PackageLicenseUrl></PackageLicenseUrl>
</PropertyGroup> </PropertyGroup>


<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="1.0.2" />
<PackageReference Include="Microsoft.Extensions.Options" Version="1.0.2" />
<PackageReference Include="System.Threading" Version="4.3.0" />
</ItemGroup>

</Project> </Project>

+ 19
- 19
MQTTnet.Core/ManagedClient/ManagedMqttClient.cs View File

@@ -5,9 +5,9 @@ using System.Linq;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using MQTTnet.Core.Client; using MQTTnet.Core.Client;
using MQTTnet.Core.Diagnostics;
using MQTTnet.Core.Exceptions; using MQTTnet.Core.Exceptions;
using MQTTnet.Core.Protocol; using MQTTnet.Core.Protocol;
using Microsoft.Extensions.Logging;


namespace MQTTnet.Core.ManagedClient namespace MQTTnet.Core.ManagedClient
{ {
@@ -18,7 +18,7 @@ namespace MQTTnet.Core.ManagedClient
private readonly HashSet<TopicFilter> _subscriptions = new HashSet<TopicFilter>(); private readonly HashSet<TopicFilter> _subscriptions = new HashSet<TopicFilter>();


private readonly IMqttClient _mqttClient; private readonly IMqttClient _mqttClient;
private readonly ILogger<ManagedMqttClient> _logger;
private readonly IMqttNetLogger _logger;


private CancellationTokenSource _connectionCancellationToken; private CancellationTokenSource _connectionCancellationToken;
private CancellationTokenSource _publishingCancellationToken; private CancellationTokenSource _publishingCancellationToken;
@@ -26,7 +26,7 @@ namespace MQTTnet.Core.ManagedClient
private IManagedMqttClientOptions _options; private IManagedMqttClientOptions _options;
private bool _subscriptionsNotPushed; private bool _subscriptionsNotPushed;
public ManagedMqttClient(ILogger<ManagedMqttClient> logger, IMqttClient mqttClient)
public ManagedMqttClient(IMqttClient mqttClient, IMqttNetLogger logger)
{ {
_logger = logger ?? throw new ArgumentNullException(nameof(logger)); _logger = logger ?? throw new ArgumentNullException(nameof(logger));
_mqttClient = mqttClient ?? throw new ArgumentNullException(nameof(mqttClient)); _mqttClient = mqttClient ?? throw new ArgumentNullException(nameof(mqttClient));
@@ -52,10 +52,7 @@ namespace MQTTnet.Core.ManagedClient
throw new NotSupportedException("The managed client does not support existing sessions."); throw new NotSupportedException("The managed client does not support existing sessions.");
} }


if (_connectionCancellationToken != null)
{
throw new InvalidOperationException("The managed client is already started.");
}
if (_connectionCancellationToken != null) throw new InvalidOperationException("The managed client is already started.");


_options = options; _options = options;
await _storageManager.SetStorageAsync(_options.Storage).ConfigureAwait(false); await _storageManager.SetStorageAsync(_options.Storage).ConfigureAwait(false);
@@ -72,10 +69,10 @@ namespace MQTTnet.Core.ManagedClient
_connectionCancellationToken = new CancellationTokenSource(); _connectionCancellationToken = new CancellationTokenSource();
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed #pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
Task.Factory.StartNew(() => MaintainConnectionAsync(_connectionCancellationToken.Token), _connectionCancellationToken.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default).ConfigureAwait(false);
Task.Run(async () => await MaintainConnectionAsync(_connectionCancellationToken.Token), _connectionCancellationToken.Token).ConfigureAwait(false);
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed #pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed


_logger.LogInformation("Started");
_logger.Info<ManagedMqttClient>("Started");
} }


public Task StopAsync() public Task StopAsync()
@@ -83,6 +80,9 @@ namespace MQTTnet.Core.ManagedClient
_connectionCancellationToken?.Cancel(false); _connectionCancellationToken?.Cancel(false);
_connectionCancellationToken = null; _connectionCancellationToken = null;


_publishingCancellationToken?.Cancel(false);
_publishingCancellationToken = null;

while (_messageQueue.Any()) while (_messageQueue.Any())
{ {
_messageQueue.Take(); _messageQueue.Take();
@@ -159,7 +159,7 @@ namespace MQTTnet.Core.ManagedClient
_publishingCancellationToken = new CancellationTokenSource(); _publishingCancellationToken = new CancellationTokenSource();


#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed #pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
Task.Factory.StartNew(() => PublishQueuedMessagesAsync(_publishingCancellationToken.Token), _publishingCancellationToken.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default).ConfigureAwait(false);
Task.Run(async () => await PublishQueuedMessagesAsync(_publishingCancellationToken.Token), _publishingCancellationToken.Token).ConfigureAwait(false);
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed #pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed


continue; continue;
@@ -167,7 +167,7 @@ namespace MQTTnet.Core.ManagedClient


if (connectionState == ReconnectionResult.StillConnected) if (connectionState == ReconnectionResult.StillConnected)
{ {
await Task.Delay(100, _connectionCancellationToken.Token).ConfigureAwait(false); // Consider using the _Disconnected_ event here. (TaskCompletionSource)
await Task.Delay(TimeSpan.FromSeconds(1), _connectionCancellationToken.Token).ConfigureAwait(false);
} }
} }
} }
@@ -176,16 +176,16 @@ namespace MQTTnet.Core.ManagedClient
} }
catch (MqttCommunicationException exception) catch (MqttCommunicationException exception)
{ {
_logger.LogWarning(new EventId(), exception, "Communication exception while maintaining connection.");
_logger.Warning<ManagedMqttClient>(exception, "Communication exception while maintaining connection.");
} }
catch (Exception exception) catch (Exception exception)
{ {
_logger.LogError(new EventId(), exception, "Unhandled exception while maintaining connection.");
_logger.Error<ManagedMqttClient>(exception, "Unhandled exception while maintaining connection.");
} }
finally finally
{ {
await _mqttClient.DisconnectAsync().ConfigureAwait(false); await _mqttClient.DisconnectAsync().ConfigureAwait(false);
_logger.LogInformation("Stopped");
_logger.Info<ManagedMqttClient>("Stopped");
} }
} }


@@ -215,7 +215,7 @@ namespace MQTTnet.Core.ManagedClient
} }
finally finally
{ {
_logger.LogInformation("Stopped publishing messages");
_logger.Info<ManagedMqttClient>("Stopped publishing messages");
} }
} }


@@ -227,7 +227,7 @@ namespace MQTTnet.Core.ManagedClient
} }
catch (MqttCommunicationException exception) catch (MqttCommunicationException exception)
{ {
_logger.LogWarning(new EventId(), exception, "Publishing application message failed.");
_logger.Warning<ManagedMqttClient>(exception, "Publishing application message failed.");


if (message.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce) if (message.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce)
{ {
@@ -236,13 +236,13 @@ namespace MQTTnet.Core.ManagedClient
} }
catch (Exception exception) catch (Exception exception)
{ {
_logger.LogError(new EventId(), exception, "Unhandled exception while publishing queued application message.");
_logger.Error<ManagedMqttClient>(exception, "Unhandled exception while publishing queued application message.");
} }
} }


private async Task PushSubscriptionsAsync() private async Task PushSubscriptionsAsync()
{ {
_logger.LogInformation(nameof(ManagedMqttClient), "Synchronizing subscriptions");
_logger.Info<ManagedMqttClient>(nameof(ManagedMqttClient), "Synchronizing subscriptions");


List<TopicFilter> subscriptions; List<TopicFilter> subscriptions;
lock (_subscriptions) lock (_subscriptions)
@@ -262,7 +262,7 @@ namespace MQTTnet.Core.ManagedClient
} }
catch (Exception exception) catch (Exception exception)
{ {
_logger.LogWarning(new EventId(), exception, "Synchronizing subscriptions failed");
_logger.Warning<ManagedMqttClient>(exception, "Synchronizing subscriptions failed");
_subscriptionsNotPushed = true; _subscriptionsNotPushed = true;
} }
} }


+ 0
- 15
MQTTnet.Core/Server/IMqttClientRetainedMessageManager.cs View File

@@ -1,15 +0,0 @@
using System.Collections.Generic;
using System.Threading.Tasks;
using MQTTnet.Core.Packets;

namespace MQTTnet.Core.Server
{
public interface IMqttClientRetainedMessageManager
{
Task LoadMessagesAsync();

Task HandleMessageAsync(string clientId, MqttApplicationMessage applicationMessage);

Task<List<MqttApplicationMessage>> GetSubscribedMessagesAsync(MqttSubscribePacket subscribePacket);
}
}

+ 0
- 7
MQTTnet.Core/Server/IMqttClientSesssionFactory.cs View File

@@ -1,7 +0,0 @@
namespace MQTTnet.Core.Server
{
public interface IMqttClientSesssionFactory
{
MqttClientSession CreateClientSession(string sessionId, MqttClientSessionsManager mqttClientSessionsManager);
}
}

+ 1
- 1
MQTTnet.Core/Server/IMqttServer.cs View File

@@ -12,7 +12,7 @@ namespace MQTTnet.Core.Server


Task<IList<ConnectedMqttClient>> GetConnectedClientsAsync(); Task<IList<ConnectedMqttClient>> GetConnectedClientsAsync();


Task StartAsync();
Task StartAsync(MqttServerOptions options);
Task StopAsync(); Task StopAsync();
} }
} }

+ 4
- 2
MQTTnet.Core/Server/IMqttServerFactory.cs View File

@@ -1,4 +1,6 @@
using System;
using System.Collections.Generic;
using MQTTnet.Core.Adapter;
using MQTTnet.Core.Diagnostics;


namespace MQTTnet.Core.Server namespace MQTTnet.Core.Server
{ {
@@ -6,6 +8,6 @@ namespace MQTTnet.Core.Server
{ {
IMqttServer CreateMqttServer(); IMqttServer CreateMqttServer();


IMqttServer CreateMqttServer(Action<MqttServerOptions> configure);
IMqttServer CreateMqttServer(IEnumerable<IMqttServerAdapter> adapters, IMqttNetLogger logger);
} }
} }

+ 12
- 12
MQTTnet.Core/Server/MqttClientPendingMessagesQueue.cs View File

@@ -3,10 +3,10 @@ using System.Collections.Concurrent;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using MQTTnet.Core.Adapter; using MQTTnet.Core.Adapter;
using MQTTnet.Core.Diagnostics;
using MQTTnet.Core.Exceptions; using MQTTnet.Core.Exceptions;
using MQTTnet.Core.Packets; using MQTTnet.Core.Packets;
using MQTTnet.Core.Protocol; using MQTTnet.Core.Protocol;
using Microsoft.Extensions.Logging;


namespace MQTTnet.Core.Server namespace MQTTnet.Core.Server
{ {
@@ -15,16 +15,16 @@ namespace MQTTnet.Core.Server
private readonly BlockingCollection<MqttPublishPacket> _pendingPublishPackets = new BlockingCollection<MqttPublishPacket>(); private readonly BlockingCollection<MqttPublishPacket> _pendingPublishPackets = new BlockingCollection<MqttPublishPacket>();
private readonly MqttServerOptions _options; private readonly MqttServerOptions _options;
private readonly MqttClientSession _session; private readonly MqttClientSession _session;
private readonly ILogger<MqttClientPendingMessagesQueue> _logger;
private readonly IMqttNetLogger _logger;


public MqttClientPendingMessagesQueue(MqttServerOptions options, MqttClientSession session, ILogger<MqttClientPendingMessagesQueue> logger)
public MqttClientPendingMessagesQueue(MqttServerOptions options, MqttClientSession session, IMqttNetLogger logger)
{ {
_logger = logger ?? throw new ArgumentNullException(nameof(logger)); _logger = logger ?? throw new ArgumentNullException(nameof(logger));
_session = session ?? throw new ArgumentNullException(nameof(session)); _session = session ?? throw new ArgumentNullException(nameof(session));
_options = options ?? throw new ArgumentNullException(nameof(options)); _options = options ?? throw new ArgumentNullException(nameof(options));
} }


public void Start(IMqttCommunicationAdapter adapter, CancellationToken cancellationToken)
public void Start(IMqttChannelAdapter adapter, CancellationToken cancellationToken)
{ {
if (adapter == null) throw new ArgumentNullException(nameof(adapter)); if (adapter == null) throw new ArgumentNullException(nameof(adapter));


@@ -41,10 +41,10 @@ namespace MQTTnet.Core.Server
if (publishPacket == null) throw new ArgumentNullException(nameof(publishPacket)); if (publishPacket == null) throw new ArgumentNullException(nameof(publishPacket));


_pendingPublishPackets.Add(publishPacket); _pendingPublishPackets.Add(publishPacket);
_logger.LogTrace("Enqueued packet (ClientId: {0}).", _session.ClientId);
_logger.Trace<MqttClientPendingMessagesQueue>("Enqueued packet (ClientId: {0}).", _session.ClientId);
} }


private async Task SendPendingPublishPacketsAsync(IMqttCommunicationAdapter adapter, CancellationToken cancellationToken)
private async Task SendPendingPublishPacketsAsync(IMqttChannelAdapter adapter, CancellationToken cancellationToken)
{ {
try try
{ {
@@ -58,11 +58,11 @@ namespace MQTTnet.Core.Server
} }
catch (Exception exception) catch (Exception exception)
{ {
_logger.LogError(new EventId(), exception, "Unhandled exception while sending enqueued packet (ClientId: {0}).", _session.ClientId);
_logger.Error<MqttClientPendingMessagesQueue>(exception, "Unhandled exception while sending enqueued packet (ClientId: {0}).", _session.ClientId);
} }
} }


private async Task SendPendingPublishPacketAsync(IMqttCommunicationAdapter adapter, CancellationToken cancellationToken)
private async Task SendPendingPublishPacketAsync(IMqttChannelAdapter adapter, CancellationToken cancellationToken)
{ {
MqttPublishPacket packet = null; MqttPublishPacket packet = null;
try try
@@ -70,24 +70,24 @@ namespace MQTTnet.Core.Server
packet = _pendingPublishPackets.Take(cancellationToken); packet = _pendingPublishPackets.Take(cancellationToken);
await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, packet).ConfigureAwait(false); await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, packet).ConfigureAwait(false);


_logger.LogTrace("Enqueued packet sent (ClientId: {0}).", _session.ClientId);
_logger.Trace<MqttClientPendingMessagesQueue>("Enqueued packet sent (ClientId: {0}).", _session.ClientId);
} }
catch (Exception exception) catch (Exception exception)
{ {
if (exception is MqttCommunicationTimedOutException) if (exception is MqttCommunicationTimedOutException)
{ {
_logger.LogWarning(new EventId(), exception, "Sending publish packet failed due to timeout (ClientId: {0}).", _session.ClientId);
_logger.Warning<MqttClientPendingMessagesQueue>(exception, "Sending publish packet failed due to timeout (ClientId: {0}).", _session.ClientId);
} }
else if (exception is MqttCommunicationException) else if (exception is MqttCommunicationException)
{ {
_logger.LogWarning(new EventId(), exception, "Sending publish packet failed due to communication exception (ClientId: {0}).", _session.ClientId);
_logger.Warning<MqttClientPendingMessagesQueue>(exception, "Sending publish packet failed due to communication exception (ClientId: {0}).", _session.ClientId);
} }
else if (exception is OperationCanceledException) else if (exception is OperationCanceledException)
{ {
} }
else else
{ {
_logger.LogError(new EventId(), exception, "Sending publish packet failed (ClientId: {0}).", _session.ClientId);
_logger.Error<MqttClientPendingMessagesQueue>(exception, "Sending publish packet failed (ClientId: {0}).", _session.ClientId);
} }


if (packet != null && packet.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce) if (packet != null && packet.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce)


+ 21
- 23
MQTTnet.Core/Server/MqttClientSession.cs View File

@@ -3,13 +3,12 @@ using System.Collections.Generic;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using MQTTnet.Core.Adapter; using MQTTnet.Core.Adapter;
using MQTTnet.Core.Diagnostics;
using MQTTnet.Core.Exceptions; using MQTTnet.Core.Exceptions;
using MQTTnet.Core.Internal; using MQTTnet.Core.Internal;
using MQTTnet.Core.Packets; using MQTTnet.Core.Packets;
using MQTTnet.Core.Protocol; using MQTTnet.Core.Protocol;
using MQTTnet.Core.Serializer; using MQTTnet.Core.Serializer;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;


namespace MQTTnet.Core.Server namespace MQTTnet.Core.Server
{ {
@@ -21,28 +20,27 @@ namespace MQTTnet.Core.Server
private readonly MqttClientSessionsManager _sessionsManager; private readonly MqttClientSessionsManager _sessionsManager;
private readonly MqttClientPendingMessagesQueue _pendingMessagesQueue; private readonly MqttClientPendingMessagesQueue _pendingMessagesQueue;
private readonly MqttServerOptions _options; private readonly MqttServerOptions _options;
private readonly ILogger<MqttClientSession> _logger;
private readonly IMqttNetLogger _logger;


private IMqttCommunicationAdapter _adapter;
private IMqttChannelAdapter _adapter;
private CancellationTokenSource _cancellationTokenSource; private CancellationTokenSource _cancellationTokenSource;
private MqttApplicationMessage _willMessage; private MqttApplicationMessage _willMessage;


public MqttClientSession( public MqttClientSession(
string clientId, string clientId,
IOptions<MqttServerOptions> options,
MqttServerOptions options,
MqttClientSessionsManager sessionsManager, MqttClientSessionsManager sessionsManager,
MqttClientSubscriptionsManager subscriptionsManager,
ILogger<MqttClientSession> logger,
ILogger<MqttClientPendingMessagesQueue> messageQueueLogger)
IMqttNetLogger logger)
{ {
_sessionsManager = sessionsManager ?? throw new ArgumentNullException(nameof(sessionsManager)); _sessionsManager = sessionsManager ?? throw new ArgumentNullException(nameof(sessionsManager));
_subscriptionsManager = subscriptionsManager ?? throw new ArgumentNullException(nameof(subscriptionsManager));
_logger = logger ?? throw new ArgumentNullException(nameof(logger)); _logger = logger ?? throw new ArgumentNullException(nameof(logger));


ClientId = clientId; ClientId = clientId;


_options = options.Value;
_pendingMessagesQueue = new MqttClientPendingMessagesQueue(_options, this, messageQueueLogger);
_options = options;

_subscriptionsManager = new MqttClientSubscriptionsManager(_options);
_pendingMessagesQueue = new MqttClientPendingMessagesQueue(_options, this, _logger);
} }


public string ClientId { get; } public string ClientId { get; }
@@ -51,7 +49,7 @@ namespace MQTTnet.Core.Server


public bool IsConnected => _adapter != null; public bool IsConnected => _adapter != null;


public async Task RunAsync(MqttApplicationMessage willMessage, IMqttCommunicationAdapter adapter)
public async Task RunAsync(MqttApplicationMessage willMessage, IMqttChannelAdapter adapter)
{ {
if (adapter == null) throw new ArgumentNullException(nameof(adapter)); if (adapter == null) throw new ArgumentNullException(nameof(adapter));


@@ -71,11 +69,11 @@ namespace MQTTnet.Core.Server
} }
catch (MqttCommunicationException exception) catch (MqttCommunicationException exception)
{ {
_logger.LogWarning(new EventId(), exception, "Client '{0}': Communication exception while processing client packets.", ClientId);
_logger.Warning<MqttClientSession>(exception, "Client '{0}': Communication exception while processing client packets.", ClientId);
} }
catch (Exception exception) catch (Exception exception)
{ {
_logger.LogError(new EventId(), exception, "Client '{0}': Unhandled exception while processing client packets.", ClientId);
_logger.Error<MqttClientSession>(exception, "Client '{0}': Unhandled exception while processing client packets.", ClientId);
} }
} }


@@ -93,7 +91,7 @@ namespace MQTTnet.Core.Server
_adapter = null; _adapter = null;
} }


_logger.LogInformation("Client '{0}': Session stopped.", ClientId);
_logger.Info<MqttClientSession>("Client '{0}': Session stopped.", ClientId);
} }
finally finally
{ {
@@ -120,7 +118,7 @@ namespace MQTTnet.Core.Server
_pendingMessagesQueue.Enqueue(publishPacket); _pendingMessagesQueue.Enqueue(publishPacket);
} }


private async Task ReceivePacketsAsync(IMqttCommunicationAdapter adapter, CancellationToken cancellationToken)
private async Task ReceivePacketsAsync(IMqttChannelAdapter adapter, CancellationToken cancellationToken)
{ {
try try
{ {
@@ -135,17 +133,17 @@ namespace MQTTnet.Core.Server
} }
catch (MqttCommunicationException exception) catch (MqttCommunicationException exception)
{ {
_logger.LogWarning(new EventId(), exception, "Client '{0}': Communication exception while processing client packets.", ClientId);
_logger.Warning<MqttClientSession>(exception, "Client '{0}': Communication exception while processing client packets.", ClientId);
await StopAsync(); await StopAsync();
} }
catch (Exception exception) catch (Exception exception)
{ {
_logger.LogError(new EventId(), exception, "Client '{0}': Unhandled exception while processing client packets.", ClientId);
_logger.Error<MqttClientSession>(exception, "Client '{0}': Unhandled exception while processing client packets.", ClientId);
await StopAsync(); await StopAsync();
} }
} }


private Task ProcessReceivedPacketAsync(IMqttCommunicationAdapter adapter, MqttBasePacket packet, CancellationToken cancellationToken)
private Task ProcessReceivedPacketAsync(IMqttChannelAdapter adapter, MqttBasePacket packet, CancellationToken cancellationToken)
{ {
if (packet is MqttPingReqPacket) if (packet is MqttPingReqPacket)
{ {
@@ -188,11 +186,11 @@ namespace MQTTnet.Core.Server
return StopAsync(); return StopAsync();
} }


_logger.LogWarning("Client '{0}': Received not supported packet ({1}). Closing connection.", ClientId, packet);
_logger.Warning<MqttClientSession>("Client '{0}': Received not supported packet ({1}). Closing connection.", ClientId, packet);
return StopAsync(); return StopAsync();
} }


private async Task HandleIncomingSubscribePacketAsync(IMqttCommunicationAdapter adapter, MqttSubscribePacket subscribePacket, CancellationToken cancellationToken)
private async Task HandleIncomingSubscribePacketAsync(IMqttChannelAdapter adapter, MqttSubscribePacket subscribePacket, CancellationToken cancellationToken)
{ {
var subscribeResult = _subscriptionsManager.Subscribe(subscribePacket, ClientId); var subscribeResult = _subscriptionsManager.Subscribe(subscribePacket, ClientId);


@@ -215,7 +213,7 @@ namespace MQTTnet.Core.Server
} }
} }


private async Task HandleIncomingPublishPacketAsync(IMqttCommunicationAdapter adapter, MqttPublishPacket publishPacket, CancellationToken cancellationToken)
private async Task HandleIncomingPublishPacketAsync(IMqttChannelAdapter adapter, MqttPublishPacket publishPacket, CancellationToken cancellationToken)
{ {
var applicationMessage = publishPacket.ToApplicationMessage(); var applicationMessage = publishPacket.ToApplicationMessage();


@@ -255,7 +253,7 @@ namespace MQTTnet.Core.Server
} }
} }


private Task HandleIncomingPubRelPacketAsync(IMqttCommunicationAdapter adapter, MqttPubRelPacket pubRelPacket, CancellationToken cancellationToken)
private Task HandleIncomingPubRelPacketAsync(IMqttChannelAdapter adapter, MqttPubRelPacket pubRelPacket, CancellationToken cancellationToken)
{ {
lock (_unacknowledgedPublishPackets) lock (_unacknowledgedPublishPackets)
{ {


+ 17
- 27
MQTTnet.Core/Server/MqttClientSessionsManager.cs View File

@@ -4,13 +4,12 @@ using System.Linq;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using MQTTnet.Core.Adapter; using MQTTnet.Core.Adapter;
using MQTTnet.Core.Diagnostics;
using MQTTnet.Core.Exceptions; using MQTTnet.Core.Exceptions;
using MQTTnet.Core.Internal; using MQTTnet.Core.Internal;
using MQTTnet.Core.Packets; using MQTTnet.Core.Packets;
using MQTTnet.Core.Protocol; using MQTTnet.Core.Protocol;
using MQTTnet.Core.Serializer; using MQTTnet.Core.Serializer;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;


namespace MQTTnet.Core.Server namespace MQTTnet.Core.Server
{ {
@@ -20,27 +19,21 @@ namespace MQTTnet.Core.Server
private readonly SemaphoreSlim _sessionsSemaphore = new SemaphoreSlim(1, 1); private readonly SemaphoreSlim _sessionsSemaphore = new SemaphoreSlim(1, 1);


private readonly MqttServerOptions _options; private readonly MqttServerOptions _options;
private readonly ILogger<MqttClientSessionsManager> _logger;
private readonly IMqttClientSesssionFactory _clientSesssionFactory;
private readonly IMqttClientRetainedMessageManager _clientRetainedMessageManager;

public MqttClientSessionsManager(
IOptions<MqttServerOptions> options,
ILogger<MqttClientSessionsManager> logger,
IMqttClientSesssionFactory clientSesssionFactory,
IMqttClientRetainedMessageManager clientRetainedMessageManager)
private readonly MqttRetainedMessagesManager _retainedMessagesManager;
private readonly IMqttNetLogger _logger;

public MqttClientSessionsManager(MqttServerOptions options, MqttRetainedMessagesManager retainedMessagesManager, IMqttNetLogger logger)
{ {
_logger = logger ?? throw new ArgumentNullException(nameof(logger)); _logger = logger ?? throw new ArgumentNullException(nameof(logger));
_options = options.Value ?? throw new ArgumentNullException(nameof(options));
_clientSesssionFactory = clientSesssionFactory ?? throw new ArgumentNullException(nameof(clientSesssionFactory));
_clientRetainedMessageManager = clientRetainedMessageManager ?? throw new ArgumentNullException(nameof(clientRetainedMessageManager));
_options = options ?? throw new ArgumentNullException(nameof(options));
_retainedMessagesManager = retainedMessagesManager ?? throw new ArgumentNullException(nameof(retainedMessagesManager));
} }


public event EventHandler<MqttClientConnectedEventArgs> ClientConnected; public event EventHandler<MqttClientConnectedEventArgs> ClientConnected;
public event EventHandler<MqttClientDisconnectedEventArgs> ClientDisconnected; public event EventHandler<MqttClientDisconnectedEventArgs> ClientDisconnected;
public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived; public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived;


public async Task RunClientSessionAsync(IMqttCommunicationAdapter clientAdapter, CancellationToken cancellationToken)
public async Task RunClientSessionAsync(IMqttChannelAdapter clientAdapter, CancellationToken cancellationToken)
{ {
var clientId = string.Empty; var clientId = string.Empty;
try try
@@ -80,14 +73,11 @@ namespace MQTTnet.Core.Server
ProtocolVersion = clientAdapter.PacketSerializer.ProtocolVersion ProtocolVersion = clientAdapter.PacketSerializer.ProtocolVersion
})); }));


using (_logger.BeginScope(clientId))
{
await clientSession.Session.RunAsync(connectPacket.WillMessage, clientAdapter).ConfigureAwait(false);
}
await clientSession.Session.RunAsync(connectPacket.WillMessage, clientAdapter).ConfigureAwait(false);
} }
catch (Exception exception) catch (Exception exception)
{ {
_logger.LogError(new EventId(), exception, exception.Message);
_logger.Error<MqttClientSessionsManager>(exception, exception.Message);
} }
finally finally
{ {
@@ -157,7 +147,7 @@ namespace MQTTnet.Core.Server


if (applicationMessage.Retain) if (applicationMessage.Retain)
{ {
await _clientRetainedMessageManager.HandleMessageAsync(senderClientSession?.ClientId, applicationMessage).ConfigureAwait(false);
await _retainedMessagesManager.HandleMessageAsync(senderClientSession?.ClientId, applicationMessage).ConfigureAwait(false);
} }


var eventArgs = new MqttApplicationMessageReceivedEventArgs(senderClientSession?.ClientId, applicationMessage); var eventArgs = new MqttApplicationMessageReceivedEventArgs(senderClientSession?.ClientId, applicationMessage);
@@ -165,7 +155,7 @@ namespace MQTTnet.Core.Server
} }
catch (Exception exception) catch (Exception exception)
{ {
_logger.LogError(new EventId(), exception, "Error while processing application message");
_logger.Error<MqttClientSessionsManager>(exception, "Error while processing application message");
} }


lock (_sessions) lock (_sessions)
@@ -179,7 +169,7 @@ namespace MQTTnet.Core.Server


public Task<List<MqttApplicationMessage>> GetRetainedMessagesAsync(MqttSubscribePacket subscribePacket) public Task<List<MqttApplicationMessage>> GetRetainedMessagesAsync(MqttSubscribePacket subscribePacket)
{ {
return _clientRetainedMessageManager.GetSubscribedMessagesAsync(subscribePacket);
return _retainedMessagesManager.GetSubscribedMessagesAsync(subscribePacket);
} }


private MqttConnectReturnCode ValidateConnection(MqttConnectPacket connectPacket) private MqttConnectReturnCode ValidateConnection(MqttConnectPacket connectPacket)
@@ -206,11 +196,11 @@ namespace MQTTnet.Core.Server
await clientSession.StopAsync(); await clientSession.StopAsync();
clientSession = null; clientSession = null;


_logger.LogTrace("Stopped existing session of client '{0}'.", connectPacket.ClientId);
_logger.Trace<MqttClientSessionsManager>("Stopped existing session of client '{0}'.", connectPacket.ClientId);
} }
else else
{ {
_logger.LogTrace("Reusing existing session of client '{0}'.", connectPacket.ClientId);
_logger.Trace<MqttClientSessionsManager>("Reusing existing session of client '{0}'.", connectPacket.ClientId);
} }
} }


@@ -219,10 +209,10 @@ namespace MQTTnet.Core.Server
{ {
isExistingSession = false; isExistingSession = false;


clientSession = _clientSesssionFactory.CreateClientSession(connectPacket.ClientId, this);
clientSession = new MqttClientSession(connectPacket.ClientId, _options, this, _logger);
_sessions[connectPacket.ClientId] = clientSession; _sessions[connectPacket.ClientId] = clientSession;


_logger.LogTrace("Created a new session for client '{0}'.", connectPacket.ClientId);
_logger.Trace<MqttClientSessionsManager>("Created a new session for client '{0}'.", connectPacket.ClientId);
} }


return new GetOrCreateClientSessionResult { IsExistingSession = isExistingSession, Session = clientSession }; return new GetOrCreateClientSessionResult { IsExistingSession = isExistingSession, Session = clientSession };


+ 2
- 3
MQTTnet.Core/Server/MqttClientSubscriptionsManager.cs View File

@@ -1,6 +1,5 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using Microsoft.Extensions.Options;
using MQTTnet.Core.Packets; using MQTTnet.Core.Packets;
using MQTTnet.Core.Protocol; using MQTTnet.Core.Protocol;


@@ -11,9 +10,9 @@ namespace MQTTnet.Core.Server
private readonly Dictionary<string, MqttQualityOfServiceLevel> _subscriptions = new Dictionary<string, MqttQualityOfServiceLevel>(); private readonly Dictionary<string, MqttQualityOfServiceLevel> _subscriptions = new Dictionary<string, MqttQualityOfServiceLevel>();
private readonly MqttServerOptions _options; private readonly MqttServerOptions _options;


public MqttClientSubscriptionsManager(IOptions<MqttServerOptions> options)
public MqttClientSubscriptionsManager(MqttServerOptions options)
{ {
_options = options?.Value ?? throw new ArgumentNullException(nameof(options));
_options = options ?? throw new ArgumentNullException(nameof(options));
} }


public MqttClientSubscribeResult Subscribe(MqttSubscribePacket subscribePacket, string clientId) public MqttClientSubscribeResult Subscribe(MqttSubscribePacket subscribePacket, string clientId)


MQTTnet.Core/Server/MqttClientRetainedMessagesManager.cs → MQTTnet.Core/Server/MqttRetainedMessagesManager.cs View File

@@ -4,22 +4,21 @@ using System.Linq;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using MQTTnet.Core.Packets; using MQTTnet.Core.Packets;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MQTTnet.Core.Diagnostics;


namespace MQTTnet.Core.Server namespace MQTTnet.Core.Server
{ {
public sealed class MqttClientRetainedMessagesManager : IMqttClientRetainedMessageManager
public sealed class MqttRetainedMessagesManager
{ {
private readonly Dictionary<string, MqttApplicationMessage> _retainedMessages = new Dictionary<string, MqttApplicationMessage>(); private readonly Dictionary<string, MqttApplicationMessage> _retainedMessages = new Dictionary<string, MqttApplicationMessage>();
private readonly SemaphoreSlim _gate = new SemaphoreSlim(1, 1); private readonly SemaphoreSlim _gate = new SemaphoreSlim(1, 1);
private readonly ILogger<MqttClientRetainedMessagesManager> _logger;
private readonly IMqttNetLogger _logger;
private readonly MqttServerOptions _options; private readonly MqttServerOptions _options;


public MqttClientRetainedMessagesManager(IOptions<MqttServerOptions> options, ILogger<MqttClientRetainedMessagesManager> logger)
public MqttRetainedMessagesManager(MqttServerOptions options, IMqttNetLogger logger)
{ {
_logger = logger ?? throw new ArgumentNullException(nameof(logger)); _logger = logger ?? throw new ArgumentNullException(nameof(logger));
_options = options.Value ?? throw new ArgumentNullException(nameof(options));
_options = options ?? throw new ArgumentNullException(nameof(options));
} }


public async Task LoadMessagesAsync() public async Task LoadMessagesAsync()
@@ -42,7 +41,7 @@ namespace MQTTnet.Core.Server
} }
catch (Exception exception) catch (Exception exception)
{ {
_logger.LogError(new EventId(), exception, "Unhandled exception while loading retained messages.");
_logger.Error<MqttRetainedMessagesManager>(exception, "Unhandled exception while loading retained messages.");
} }
finally finally
{ {
@@ -61,7 +60,7 @@ namespace MQTTnet.Core.Server
} }
catch (Exception exception) catch (Exception exception)
{ {
_logger.LogError(new EventId(), exception, "Unhandled exception while handling retained messages.");
_logger.Error<MqttRetainedMessagesManager>(exception, "Unhandled exception while handling retained messages.");
} }
finally finally
{ {
@@ -110,7 +109,7 @@ namespace MQTTnet.Core.Server
if (applicationMessage.Payload?.Any() == false) if (applicationMessage.Payload?.Any() == false)
{ {
saveIsRequired = _retainedMessages.Remove(applicationMessage.Topic); saveIsRequired = _retainedMessages.Remove(applicationMessage.Topic);
_logger.LogInformation("Client '{0}' cleared retained message for topic '{1}'.", clientId, applicationMessage.Topic);
_logger.Info<MqttRetainedMessagesManager>("Client '{0}' cleared retained message for topic '{1}'.", clientId, applicationMessage.Topic);
} }
else else
{ {
@@ -129,12 +128,12 @@ namespace MQTTnet.Core.Server
} }
} }


_logger.LogInformation("Client '{0}' set retained message for topic '{1}'.", clientId, applicationMessage.Topic);
_logger.Info<MqttRetainedMessagesManager>("Client '{0}' set retained message for topic '{1}'.", clientId, applicationMessage.Topic);
} }


if (!saveIsRequired) if (!saveIsRequired)
{ {
_logger.LogTrace("Skipped saving retained messages because no changes were detected.");
_logger.Trace<MqttRetainedMessagesManager>("Skipped saving retained messages because no changes were detected.");
} }


if (saveIsRequired && _options.Storage != null) if (saveIsRequired && _options.Storage != null)

+ 62
- 43
MQTTnet.Core/Server/MqttServer.cs View File

@@ -3,33 +3,24 @@ using System.Collections.Generic;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using MQTTnet.Core.Adapter; using MQTTnet.Core.Adapter;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System.Linq; using System.Linq;
using MQTTnet.Core.Diagnostics;


namespace MQTTnet.Core.Server namespace MQTTnet.Core.Server
{ {
public class MqttServer : IMqttServer public class MqttServer : IMqttServer
{ {
private readonly IMqttClientRetainedMessageManager _clientRetainedMessageManager;
private readonly ILogger<MqttServer> _logger;
private readonly MqttClientSessionsManager _clientSessionsManager;
private readonly ICollection<IMqttServerAdapter> _adapters; private readonly ICollection<IMqttServerAdapter> _adapters;
private readonly MqttServerOptions _options;
private readonly IMqttNetLogger _logger;


private MqttClientSessionsManager _clientSessionsManager;
private MqttRetainedMessagesManager _retainedMessagesManager;
private CancellationTokenSource _cancellationTokenSource; private CancellationTokenSource _cancellationTokenSource;
private MqttServerOptions _options;


public MqttServer(
IOptions<MqttServerOptions> options,
IEnumerable<IMqttServerAdapter> adapters,
ILogger<MqttServer> logger,
MqttClientSessionsManager clientSessionsManager,
IMqttClientRetainedMessageManager clientRetainedMessageManager)
public MqttServer(IEnumerable<IMqttServerAdapter> adapters, IMqttNetLogger logger)
{ {
_options = options.Value ?? throw new ArgumentNullException(nameof(options));
_logger = logger ?? throw new ArgumentNullException(nameof(logger)); _logger = logger ?? throw new ArgumentNullException(nameof(logger));
_clientSessionsManager = clientSessionsManager ?? throw new ArgumentNullException(nameof(clientSessionsManager));
_clientRetainedMessageManager = clientRetainedMessageManager ?? throw new ArgumentNullException(nameof(clientRetainedMessageManager));


if (adapters == null) if (adapters == null)
{ {
@@ -37,15 +28,6 @@ namespace MQTTnet.Core.Server
} }


_adapters = adapters.ToList(); _adapters = adapters.ToList();

_clientSessionsManager.ApplicationMessageReceived += (s, e) => ApplicationMessageReceived?.Invoke(s, e);
_clientSessionsManager.ClientConnected += OnClientConnected;
_clientSessionsManager.ClientDisconnected += OnClientDisconnected;
}

public Task<IList<ConnectedMqttClient>> GetConnectedClientsAsync()
{
return _clientSessionsManager.GetConnectedClientsAsync();
} }


public event EventHandler<MqttServerStartedEventArgs> Started; public event EventHandler<MqttServerStartedEventArgs> Started;
@@ -53,14 +35,16 @@ namespace MQTTnet.Core.Server
public event EventHandler<MqttClientDisconnectedEventArgs> ClientDisconnected; public event EventHandler<MqttClientDisconnectedEventArgs> ClientDisconnected;
public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived; public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived;


public Task<IList<ConnectedMqttClient>> GetConnectedClientsAsync()
{
return _clientSessionsManager.GetConnectedClientsAsync();
}

public async Task PublishAsync(IEnumerable<MqttApplicationMessage> applicationMessages) public async Task PublishAsync(IEnumerable<MqttApplicationMessage> applicationMessages)
{ {
if (applicationMessages == null) throw new ArgumentNullException(nameof(applicationMessages)); if (applicationMessages == null) throw new ArgumentNullException(nameof(applicationMessages));


if (_cancellationTokenSource == null)
{
throw new InvalidOperationException("The server is not started.");
}
if (_cancellationTokenSource == null) throw new InvalidOperationException("The server is not started.");


foreach (var applicationMessage in applicationMessages) foreach (var applicationMessage in applicationMessages)
{ {
@@ -68,13 +52,21 @@ namespace MQTTnet.Core.Server
} }
} }


public async Task StartAsync()
public async Task StartAsync(MqttServerOptions options)
{ {
if (_cancellationTokenSource != null) throw new InvalidOperationException("The MQTT server is already started.");
_options = options ?? throw new ArgumentNullException(nameof(options));

if (_cancellationTokenSource != null) throw new InvalidOperationException("The server is already started.");


_cancellationTokenSource = new CancellationTokenSource(); _cancellationTokenSource = new CancellationTokenSource();
_retainedMessagesManager = new MqttRetainedMessagesManager(_options, _logger);


await _clientRetainedMessageManager.LoadMessagesAsync();
_clientSessionsManager = new MqttClientSessionsManager(_options, _retainedMessagesManager, _logger);
_clientSessionsManager.ApplicationMessageReceived += OnApplicationMessageReceived;
_clientSessionsManager.ClientConnected += OnClientConnected;
_clientSessionsManager.ClientDisconnected += OnClientDisconnected;

await _retainedMessagesManager.LoadMessagesAsync();


foreach (var adapter in _adapters) foreach (var adapter in _adapters)
{ {
@@ -82,26 +74,48 @@ namespace MQTTnet.Core.Server
await adapter.StartAsync(_options); await adapter.StartAsync(_options);
} }


_logger.LogInformation("Started.");
_logger.Info<MqttServer>("Started.");


Started?.Invoke(this, new MqttServerStartedEventArgs()); Started?.Invoke(this, new MqttServerStartedEventArgs());
} }


public async Task StopAsync() public async Task StopAsync()
{ {
_cancellationTokenSource?.Cancel(false);
_cancellationTokenSource?.Dispose();
_cancellationTokenSource = null;

foreach (var adapter in _adapters)
try
{ {
adapter.ClientAccepted -= OnClientAccepted;
await adapter.StopAsync();
if (_cancellationTokenSource == null)
{
return;
}

_cancellationTokenSource.Cancel(false);
_cancellationTokenSource.Dispose();

foreach (var adapter in _adapters)
{
adapter.ClientAccepted -= OnClientAccepted;
await adapter.StopAsync();
}

await _clientSessionsManager.StopAsync();

_logger.Info<MqttServer>("Stopped.");
} }
finally
{
_cancellationTokenSource = null;


await _clientSessionsManager.StopAsync();
_retainedMessagesManager = null;


_logger.LogInformation("Stopped.");
if (_clientSessionsManager != null)
{
_clientSessionsManager.ApplicationMessageReceived -= OnApplicationMessageReceived;
_clientSessionsManager.ClientConnected -= OnClientConnected;
_clientSessionsManager.ClientDisconnected -= OnClientDisconnected;
}

_clientSessionsManager = null;
}
} }


private void OnClientAccepted(object sender, MqttServerAdapterClientAcceptedEventArgs eventArgs) private void OnClientAccepted(object sender, MqttServerAdapterClientAcceptedEventArgs eventArgs)
@@ -111,14 +125,19 @@ namespace MQTTnet.Core.Server


private void OnClientConnected(object sender, MqttClientConnectedEventArgs eventArgs) private void OnClientConnected(object sender, MqttClientConnectedEventArgs eventArgs)
{ {
_logger.LogInformation("Client '{0}': Connected.", eventArgs.Client.ClientId);
_logger.Info<MqttServer>("Client '{0}': Connected.", eventArgs.Client.ClientId);
ClientConnected?.Invoke(this, eventArgs); ClientConnected?.Invoke(this, eventArgs);
} }


private void OnClientDisconnected(object sender, MqttClientDisconnectedEventArgs eventArgs) private void OnClientDisconnected(object sender, MqttClientDisconnectedEventArgs eventArgs)
{ {
_logger.LogInformation("Client '{0}': Disconnected.", eventArgs.Client.ClientId);
_logger.Info<MqttServer>("Client '{0}': Disconnected.", eventArgs.Client.ClientId);
ClientDisconnected?.Invoke(this, eventArgs); ClientDisconnected?.Invoke(this, eventArgs);
} }

private void OnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
{
ApplicationMessageReceived?.Invoke(this, e);
}
} }
} }

+ 1
- 1
Tests/MQTTnet.Core.Tests/MQTTnet.Core.Tests.csproj View File

@@ -9,7 +9,7 @@
<ItemGroup> <ItemGroup>
<PackageReference Include="MSTest.TestAdapter" Version="1.2.0" /> <PackageReference Include="MSTest.TestAdapter" Version="1.2.0" />
<PackageReference Include="MSTest.TestFramework" Version="1.2.0" /> <PackageReference Include="MSTest.TestFramework" Version="1.2.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.3.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.5.0" />
</ItemGroup> </ItemGroup>


<ItemGroup> <ItemGroup>


+ 0
- 51
Tests/MQTTnet.Core.Tests/MqttLoggerProviderTest.cs View File

@@ -1,51 +0,0 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Core.Diagnostics;

namespace MQTTnet.Core.Tests
{
[TestClass]
public class MqttLoggerProviderTest
{
[TestMethod]
public void TestLoggerCallback()
{
var serviceCollection = new ServiceCollection();
serviceCollection.AddLogging();

var serviceProvider = serviceCollection.BuildServiceProvider();
using ((IDisposable)serviceProvider)
{
var loggerFactory = serviceProvider.GetRequiredService<ILoggerFactory>();

loggerFactory.AddMqttTrace();

var expectedMsg = "Hello World!";
MqttNetTraceMessage msg = null;

MqttNetTrace.TraceMessagePublished += (sender, args) =>
{
msg = args.TraceMessage;
};

var logger = loggerFactory.CreateLogger<MqttLoggerProviderTest>();

logger.LogInformation(expectedMsg);

Assert.AreEqual(expectedMsg, msg.Message);

var expectedException = new Exception("bad stuff");

logger.LogError(new EventId(), expectedException, expectedException.Message);
Assert.AreEqual(expectedException, msg.Exception);
Assert.AreEqual(expectedException.Message, msg.Message);
}
}
}
}

+ 37
- 106
Tests/MQTTnet.Core.Tests/MqttServerTests.cs View File

@@ -1,16 +1,12 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq;
using System.Text; using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting; using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Core.Adapter;
using MQTTnet.Core.Client; using MQTTnet.Core.Client;
using MQTTnet.Core.Diagnostics;
using MQTTnet.Core.Protocol; using MQTTnet.Core.Protocol;
using MQTTnet.Core.Server; using MQTTnet.Core.Server;
using Microsoft.Extensions.DependencyInjection;
using MQTTnet.Core.Internal;
using MQTTnet.Core.Packets;


namespace MQTTnet.Core.Tests namespace MQTTnet.Core.Tests
{ {
@@ -54,18 +50,12 @@ namespace MQTTnet.Core.Tests
public async Task MqttServer_WillMessage() public async Task MqttServer_WillMessage()
{ {
var serverAdapter = new TestMqttServerAdapter(); var serverAdapter = new TestMqttServerAdapter();
var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger());


var services = new ServiceCollection()
.AddLogging()
.AddMqttServer()
.AddSingleton<IMqttServerAdapter>(serverAdapter)
.BuildServiceProvider();

var s = new MqttFactory(services).CreateMqttServer();
var receivedMessagesCount = 0; var receivedMessagesCount = 0;
try try
{ {
await s.StartAsync();
await s.StartAsync(new MqttServerOptions());


var willMessage = new MqttApplicationMessageBuilder().WithTopic("My/last/will").WithAtMostOnceQoS().Build(); var willMessage = new MqttApplicationMessageBuilder().WithTopic("My/last/will").WithAtMostOnceQoS().Build();
var c1 = await serverAdapter.ConnectTestClient(s, "c1"); var c1 = await serverAdapter.ConnectTestClient(s, "c1");
@@ -92,19 +82,13 @@ namespace MQTTnet.Core.Tests
public async Task MqttServer_Unsubscribe() public async Task MqttServer_Unsubscribe()
{ {
var serverAdapter = new TestMqttServerAdapter(); var serverAdapter = new TestMqttServerAdapter();
var services = new ServiceCollection()
.AddLogging()
.AddMqttServer()
.AddSingleton<IMqttServerAdapter>(serverAdapter)
.BuildServiceProvider();

var s = new MqttFactory(services).CreateMqttServer();
var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger());


var receivedMessagesCount = 0; var receivedMessagesCount = 0;


try try
{ {
await s.StartAsync();
await s.StartAsync(new MqttServerOptions());


var c1 = await serverAdapter.ConnectTestClient(s, "c1"); var c1 = await serverAdapter.ConnectTestClient(s, "c1");
var c2 = await serverAdapter.ConnectTestClient(s, "c2"); var c2 = await serverAdapter.ConnectTestClient(s, "c2");
@@ -140,18 +124,12 @@ namespace MQTTnet.Core.Tests
public async Task MqttServer_Publish() public async Task MqttServer_Publish()
{ {
var serverAdapter = new TestMqttServerAdapter(); var serverAdapter = new TestMqttServerAdapter();
var services = new ServiceCollection()
.AddLogging()
.AddMqttServer()
.AddSingleton<IMqttServerAdapter>(serverAdapter)
.BuildServiceProvider();

var s = new MqttFactory(services).CreateMqttServer();
var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger());
var receivedMessagesCount = 0; var receivedMessagesCount = 0;


try try
{ {
await s.StartAsync();
await s.StartAsync(new MqttServerOptions());


var c1 = await serverAdapter.ConnectTestClient(s, "c1"); var c1 = await serverAdapter.ConnectTestClient(s, "c1");


@@ -167,7 +145,7 @@ namespace MQTTnet.Core.Tests
{ {
await s.StopAsync(); await s.StopAsync();
} }
Assert.AreEqual(1, receivedMessagesCount); Assert.AreEqual(1, receivedMessagesCount);
} }


@@ -175,18 +153,13 @@ namespace MQTTnet.Core.Tests
public async Task MqttServer_NoRetainedMessage() public async Task MqttServer_NoRetainedMessage()
{ {
var serverAdapter = new TestMqttServerAdapter(); var serverAdapter = new TestMqttServerAdapter();
var services = new ServiceCollection()
.AddLogging()
.AddMqttServer()
.AddSingleton<IMqttServerAdapter>(serverAdapter)
.BuildServiceProvider();
var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger());


var s = new MqttFactory(services).CreateMqttServer();
var receivedMessagesCount = 0; var receivedMessagesCount = 0;


try try
{ {
await s.StartAsync();
await s.StartAsync(new MqttServerOptions());


var c1 = await serverAdapter.ConnectTestClient(s, "c1"); var c1 = await serverAdapter.ConnectTestClient(s, "c1");
await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).Build()); await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).Build());
@@ -194,7 +167,7 @@ namespace MQTTnet.Core.Tests


var c2 = await serverAdapter.ConnectTestClient(s, "c2"); var c2 = await serverAdapter.ConnectTestClient(s, "c2");
c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++;
await c2.SubscribeAsync(new TopicFilter("retained", MqttQualityOfServiceLevel.AtMostOnce));
await c2.SubscribeAsync(new TopicFilter("retained"));


await Task.Delay(500); await Task.Delay(500);
} }
@@ -210,28 +183,23 @@ namespace MQTTnet.Core.Tests
public async Task MqttServer_RetainedMessage() public async Task MqttServer_RetainedMessage()
{ {
var serverAdapter = new TestMqttServerAdapter(); var serverAdapter = new TestMqttServerAdapter();
var services = new ServiceCollection()
.AddLogging()
.AddMqttServer()
.AddSingleton<IMqttServerAdapter>(serverAdapter)
.BuildServiceProvider();

var s = new MqttFactory(services).CreateMqttServer();
var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger());


var receivedMessagesCount = 0; var receivedMessagesCount = 0;
try try
{ {
await s.StartAsync();
await s.StartAsync(new MqttServerOptions());


var c1 = await serverAdapter.ConnectTestClient(s, "c1"); var c1 = await serverAdapter.ConnectTestClient(s, "c1");
await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build()); await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build());
await c1.DisconnectAsync(); await c1.DisconnectAsync();


await services.WaitForRetainedMessage("retained").TimeoutAfter(TimeSpan.FromSeconds(5));
await Task.Delay(TimeSpan.FromSeconds(2));
// TODO: Find another way to wait for the retained components.


var c2 = await serverAdapter.ConnectTestClient(s, "c2"); var c2 = await serverAdapter.ConnectTestClient(s, "c2");
c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++;
await c2.SubscribeAsync(new TopicFilter("retained", MqttQualityOfServiceLevel.AtMostOnce));
await c2.SubscribeAsync(new TopicFilter("retained"));


await Task.Delay(500); await Task.Delay(500);
} }
@@ -239,7 +207,7 @@ namespace MQTTnet.Core.Tests
{ {
await s.StopAsync(); await s.StopAsync();
} }
Assert.AreEqual(1, receivedMessagesCount); Assert.AreEqual(1, receivedMessagesCount);
} }


@@ -247,17 +215,12 @@ namespace MQTTnet.Core.Tests
public async Task MqttServer_ClearRetainedMessage() public async Task MqttServer_ClearRetainedMessage()
{ {
var serverAdapter = new TestMqttServerAdapter(); var serverAdapter = new TestMqttServerAdapter();
var services = new ServiceCollection()
.AddLogging()
.AddMqttServer()
.AddSingleton<IMqttServerAdapter>(serverAdapter)
.BuildServiceProvider();
var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger());


var s = new MqttFactory(services).CreateMqttServer();
var receivedMessagesCount = 0; var receivedMessagesCount = 0;
try try
{ {
await s.StartAsync();
await s.StartAsync(new MqttServerOptions());


var c1 = await serverAdapter.ConnectTestClient(s, "c1"); var c1 = await serverAdapter.ConnectTestClient(s, "c1");
await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build()); await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build());
@@ -284,18 +247,13 @@ namespace MQTTnet.Core.Tests
{ {
var storage = new TestStorage(); var storage = new TestStorage();
var serverAdapter = new TestMqttServerAdapter(); var serverAdapter = new TestMqttServerAdapter();

var services = new ServiceCollection()
.AddLogging()
.AddMqttServer()
.AddSingleton<IMqttServerAdapter>(serverAdapter)
.BuildServiceProvider();

var s = new MqttFactory(services).CreateMqttServer(options => options.Storage = storage);
var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger());


try try
{ {
await s.StartAsync();
var options = new MqttServerOptions { Storage = storage };

await s.StartAsync(options);


var c1 = await serverAdapter.ConnectTestClient(s, "c1"); var c1 = await serverAdapter.ConnectTestClient(s, "c1");
await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build()); await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build());
@@ -306,18 +264,20 @@ namespace MQTTnet.Core.Tests
await s.StopAsync(); await s.StopAsync();
} }


await services.WaitForRetainedMessage("retained").TimeoutAfter(TimeSpan.FromSeconds(5));
await Task.Delay(TimeSpan.FromSeconds(2));
// TODO: Find another way to wait for the retained components.


s = new MqttFactory(services).CreateMqttServer(options => options.Storage = storage);
s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger());


var receivedMessagesCount = 0; var receivedMessagesCount = 0;
try try
{ {
await s.StartAsync();
var options = new MqttServerOptions { Storage = storage };
await s.StartAsync(options);


var c2 = await serverAdapter.ConnectTestClient(s, "c2"); var c2 = await serverAdapter.ConnectTestClient(s, "c2");
c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++;
await c2.SubscribeAsync(new TopicFilter("retained", MqttQualityOfServiceLevel.AtMostOnce));
await c2.SubscribeAsync(new TopicFilter("retained"));


await Task.Delay(500); await Task.Delay(500);
} }
@@ -338,16 +298,13 @@ namespace MQTTnet.Core.Tests
} }


var serverAdapter = new TestMqttServerAdapter(); var serverAdapter = new TestMqttServerAdapter();
var services = new ServiceCollection()
.AddLogging()
.AddMqttServer()
.AddSingleton<IMqttServerAdapter>(serverAdapter)
.BuildServiceProvider();

var s = new MqttFactory(services).CreateMqttServer(options => options.ApplicationMessageInterceptor = Interceptor);
var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger());
try try
{ {
await s.StartAsync();
var options = new MqttServerOptions { ApplicationMessageInterceptor = Interceptor };

await s.StartAsync(options);


var c1 = await serverAdapter.ConnectTestClient(s, "c1"); var c1 = await serverAdapter.ConnectTestClient(s, "c1");
var c2 = await serverAdapter.ConnectTestClient(s, "c2"); var c2 = await serverAdapter.ConnectTestClient(s, "c2");
@@ -397,17 +354,12 @@ namespace MQTTnet.Core.Tests
int expectedReceivedMessagesCount) int expectedReceivedMessagesCount)
{ {
var serverAdapter = new TestMqttServerAdapter(); var serverAdapter = new TestMqttServerAdapter();
var services = new ServiceCollection()
.AddMqttServer()
.AddLogging()
.AddSingleton<IMqttServerAdapter>(serverAdapter)
.BuildServiceProvider();
var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger());


var s = services.GetRequiredService<IMqttServer>();
var receivedMessagesCount = 0; var receivedMessagesCount = 0;
try try
{ {
await s.StartAsync();
await s.StartAsync(new MqttServerOptions());


var c1 = await serverAdapter.ConnectTestClient(s, "c1"); var c1 = await serverAdapter.ConnectTestClient(s, "c1");
var c2 = await serverAdapter.ConnectTestClient(s, "c2"); var c2 = await serverAdapter.ConnectTestClient(s, "c2");
@@ -426,29 +378,8 @@ namespace MQTTnet.Core.Tests
{ {
await s.StopAsync(); await s.StopAsync();
} }
Assert.AreEqual(expectedReceivedMessagesCount, receivedMessagesCount);
}
}


public static class TestExtensions
{
public static async Task WaitForRetainedMessage(this IServiceProvider services, string topic)
{
var retainMessagemanager = services.GetRequiredService<IMqttClientRetainedMessageManager>();
var subscribe = new MqttSubscribePacket()
{
TopicFilters = new List<TopicFilter>()
{
new TopicFilter(topic, MqttQualityOfServiceLevel.AtMostOnce)
}
};

while (!(await retainMessagemanager.GetSubscribedMessagesAsync(subscribe)).Any())
{
await Task.Delay(TimeSpan.FromMilliseconds(10));
}
Assert.AreEqual(expectedReceivedMessagesCount, receivedMessagesCount);
} }
} }
} }

+ 4
- 5
Tests/MQTTnet.Core.Tests/MqttSubscriptionsManagerTests.cs View File

@@ -1,5 +1,4 @@
using Microsoft.Extensions.Options;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Core.Packets; using MQTTnet.Core.Packets;
using MQTTnet.Core.Protocol; using MQTTnet.Core.Protocol;
using MQTTnet.Core.Server; using MQTTnet.Core.Server;
@@ -12,7 +11,7 @@ namespace MQTTnet.Core.Tests
[TestMethod] [TestMethod]
public void MqttSubscriptionsManager_SubscribeSingleSuccess() public void MqttSubscriptionsManager_SubscribeSingleSuccess()
{ {
var sm = new MqttClientSubscriptionsManager(new OptionsWrapper<MqttServerOptions>(new MqttServerOptions()));
var sm = new MqttClientSubscriptionsManager(new MqttServerOptions());


var sp = new MqttSubscribePacket(); var sp = new MqttSubscribePacket();
sp.TopicFilters.Add(new TopicFilter("A/B/C")); sp.TopicFilters.Add(new TopicFilter("A/B/C"));
@@ -31,7 +30,7 @@ namespace MQTTnet.Core.Tests
[TestMethod] [TestMethod]
public void MqttSubscriptionsManager_SubscribeSingleNoSuccess() public void MqttSubscriptionsManager_SubscribeSingleNoSuccess()
{ {
var sm = new MqttClientSubscriptionsManager(new OptionsWrapper<MqttServerOptions>(new MqttServerOptions()));
var sm = new MqttClientSubscriptionsManager(new MqttServerOptions());


var sp = new MqttSubscribePacket(); var sp = new MqttSubscribePacket();
sp.TopicFilters.Add(new TopicFilter("A/B/C")); sp.TopicFilters.Add(new TopicFilter("A/B/C"));
@@ -50,7 +49,7 @@ namespace MQTTnet.Core.Tests
[TestMethod] [TestMethod]
public void MqttSubscriptionsManager_SubscribeAndUnsubscribeSingle() public void MqttSubscriptionsManager_SubscribeAndUnsubscribeSingle()
{ {
var sm = new MqttClientSubscriptionsManager(new OptionsWrapper<MqttServerOptions>(new MqttServerOptions()));
var sm = new MqttClientSubscriptionsManager(new MqttServerOptions());


var sp = new MqttSubscribePacket(); var sp = new MqttSubscribePacket();
sp.TopicFilters.Add(new TopicFilter("A/B/C")); sp.TopicFilters.Add(new TopicFilter("A/B/C"));


+ 0
- 30
Tests/MQTTnet.Core.Tests/ServiceCollectionTest.cs View File

@@ -1,30 +0,0 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.VisualStudio.TestTools.UnitTesting;

namespace MQTTnet.Core.Tests
{
[TestClass]
public class ServiceCollectionTest
{
[TestMethod]
public void TestCanConstructAllServices()
{
var services = new ServiceCollection()
.AddLogging()
.AddMqttServer()
.AddMqttClient();

var serviceProvider = services
.BuildServiceProvider();

foreach (var service in services)
{
if (service.ServiceType.IsGenericType)
{
continue;
}
serviceProvider.GetRequiredService(service.ServiceType);
}
}
}
}

+ 0
- 14
Tests/MQTTnet.Core.Tests/TestClientSessionFactory.cs View File

@@ -1,14 +0,0 @@
using System;
using MQTTnet.Core.Server;

namespace MQTTnet.Core.Tests
{
public class TestClientSessionFactory : IMqttClientSesssionFactory
{
public MqttClientSession CreateClientSession(string clientId, MqttClientSessionsManager mqttClientSessionsManager)
{
throw new NotImplementedException();
//return new MqttClientSession(clientId, mqttClientSessionsManager, new TestLogger<MqttClientSession>(), new TestLogger<MqttClientPendingMessagesQueue>());
}
}
}

+ 0
- 26
Tests/MQTTnet.Core.Tests/TestLogger.cs View File

@@ -1,26 +0,0 @@
using Microsoft.Extensions.Logging;
using System;

namespace MQTTnet.Core.Tests
{
public class TestLogger<T> : IDisposable, ILogger<T>
{
public IDisposable BeginScope<TState>(TState state)
{
return this;
}

public bool IsEnabled(LogLevel logLevel)
{
return true;
}

public void Log<TState>(LogLevel logLevel, EventId eventId, TState state, Exception exception, Func<TState, Exception, string> formatter)
{
}

public void Dispose()
{
}
}
}

+ 1
- 1
Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs View File

@@ -9,7 +9,7 @@ using MQTTnet.Core.Serializer;


namespace MQTTnet.Core.Tests namespace MQTTnet.Core.Tests
{ {
public class TestMqttCommunicationAdapter : IMqttCommunicationAdapter
public class TestMqttCommunicationAdapter : IMqttChannelAdapter
{ {
private readonly BlockingCollection<MqttBasePacket> _incomingPackets = new BlockingCollection<MqttBasePacket>(); private readonly BlockingCollection<MqttBasePacket> _incomingPackets = new BlockingCollection<MqttBasePacket>();




+ 6
- 11
Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapterFactory.cs View File

@@ -1,24 +1,19 @@
using MQTTnet.Core.Adapter; using MQTTnet.Core.Adapter;
using MQTTnet.Core.Channel;
using MQTTnet.Core.Client; using MQTTnet.Core.Client;
using MQTTnet.Core.Diagnostics;


namespace MQTTnet.Core.Tests namespace MQTTnet.Core.Tests
{ {
public class TestMqttCommunicationAdapterFactory : IMqttCommunicationAdapterFactory
public class TestMqttCommunicationAdapterFactory : IMqttClientAdapterFactory
{ {
private readonly IMqttCommunicationAdapter _adapter;
private readonly IMqttChannelAdapter _adapter;


public TestMqttCommunicationAdapterFactory(IMqttCommunicationAdapter adapter)
public TestMqttCommunicationAdapterFactory(IMqttChannelAdapter adapter)
{ {
_adapter = adapter; _adapter = adapter;
} }

public IMqttCommunicationAdapter CreateClientCommunicationAdapter(IMqttClientOptions options)
{
return _adapter;
}

public IMqttCommunicationAdapter CreateServerCommunicationAdapter(IMqttCommunicationChannel channel)
public IMqttChannelAdapter CreateClientAdapter(IMqttClientChannelOptions options, IMqttNetLogger logger)
{ {
return _adapter; return _adapter;
} }


+ 3
- 3
Tests/MQTTnet.Core.Tests/TestMqttServerAdapter.cs View File

@@ -3,6 +3,7 @@ using System.Threading.Tasks;
using MQTTnet.Core.Adapter; using MQTTnet.Core.Adapter;
using MQTTnet.Core.Server; using MQTTnet.Core.Server;
using MQTTnet.Core.Client; using MQTTnet.Core.Client;
using MQTTnet.Core.Diagnostics;


namespace MQTTnet.Core.Tests namespace MQTTnet.Core.Tests
{ {
@@ -19,8 +20,7 @@ namespace MQTTnet.Core.Tests


var client = new MqttClient( var client = new MqttClient(
new TestMqttCommunicationAdapterFactory(adapterA), new TestMqttCommunicationAdapterFactory(adapterA),
new TestLogger<MqttClient>(),
new MqttPacketDispatcher(new TestLogger<MqttPacketDispatcher>()));
new MqttNetLogger());


var connected = WaitForClientToConnect(server, clientId); var connected = WaitForClientToConnect(server, clientId);


@@ -57,7 +57,7 @@ namespace MQTTnet.Core.Tests
return tcs.Task; return tcs.Task;
} }


private void FireClientAcceptedEvent(IMqttCommunicationAdapter adapter)
private void FireClientAcceptedEvent(IMqttChannelAdapter adapter)
{ {
ClientAccepted?.Invoke(this, new MqttServerAdapterClientAcceptedEventArgs(adapter)); ClientAccepted?.Invoke(this, new MqttServerAdapterClientAcceptedEventArgs(adapter));
} }


+ 1
- 3
Tests/MQTTnet.TestApp.NetCore/ClientTest.cs View File

@@ -1,7 +1,6 @@
using System; using System;
using System.Text; using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using MQTTnet.Core; using MQTTnet.Core;
using MQTTnet.Core.Client; using MQTTnet.Core.Client;


@@ -28,8 +27,7 @@ namespace MQTTnet.TestApp.NetCore
}; };


var factory = new MqttFactory(); var factory = new MqttFactory();
factory.GetLoggerFactory().AddConsole();

var client = factory.CreateMqttClient(); var client = factory.CreateMqttClient();


client.ApplicationMessageReceived += (s, e) => client.ApplicationMessageReceived += (s, e) =>


+ 0
- 6
Tests/MQTTnet.TestApp.NetCore/MQTTnet.TestApp.NetCore.csproj View File

@@ -14,10 +14,4 @@
<ProjectReference Include="..\..\Frameworks\MQTTnet.Netstandard\MQTTnet.NetStandard.csproj" /> <ProjectReference Include="..\..\Frameworks\MQTTnet.Netstandard\MQTTnet.NetStandard.csproj" />
</ItemGroup> </ItemGroup>


<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Logging.Console">
<Version>1.1.2</Version>
</PackageReference>
</ItemGroup>

</Project> </Project>

+ 1
- 11
Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs View File

@@ -4,8 +4,6 @@ using MQTTnet.Core;
using MQTTnet.Core.Client; using MQTTnet.Core.Client;
using MQTTnet.Core.ManagedClient; using MQTTnet.Core.ManagedClient;
using MQTTnet.Core.Protocol; using MQTTnet.Core.Protocol;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using System.IO; using System.IO;
using Newtonsoft.Json; using Newtonsoft.Json;
using System.Collections.Generic; using System.Collections.Generic;
@@ -16,14 +14,6 @@ namespace MQTTnet.TestApp.NetCore
{ {
public static async Task RunAsync() public static async Task RunAsync()
{ {
var services = new ServiceCollection()
.AddMqttClient()
.AddLogging()
.BuildServiceProvider();

services.GetService<ILoggerFactory>()
.AddConsole();

var ms = new ClientRetainedMessageHandler(); var ms = new ClientRetainedMessageHandler();
var options = new ManagedMqttClientOptions var options = new ManagedMqttClientOptions
@@ -44,7 +34,7 @@ namespace MQTTnet.TestApp.NetCore


try try
{ {
var managedClient = services.GetRequiredService<ManagedMqttClient>();
var managedClient = new MqttFactory().CreateManagedMqttClient();
managedClient.ApplicationMessageReceived += (s, e) => managedClient.ApplicationMessageReceived += (s, e) =>
{ {
Console.WriteLine(">> RECEIVED: " + e.ApplicationMessage.Topic); Console.WriteLine(">> RECEIVED: " + e.ApplicationMessage.Topic);


+ 14
- 43
Tests/MQTTnet.TestApp.NetCore/PerformanceTest.cs View File

@@ -1,9 +1,6 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using MQTTnet.Core;
using MQTTnet.Core;
using MQTTnet.Core.Client; using MQTTnet.Core.Client;
using MQTTnet.Core.Protocol; using MQTTnet.Core.Protocol;
using MQTTnet.Core.Server;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Diagnostics; using System.Diagnostics;
@@ -11,6 +8,7 @@ using System.Linq;
using System.Text; using System.Text;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using MQTTnet.Core.Server;


namespace MQTTnet.TestApp.NetCore namespace MQTTnet.TestApp.NetCore
{ {
@@ -18,46 +16,21 @@ namespace MQTTnet.TestApp.NetCore
{ {
public static async Task RunAsync() public static async Task RunAsync()
{ {
var services = new ServiceCollection()
.AddMqttServer(options =>
{

options.ConnectionValidator = p =>
{
if (p.ClientId == "SpecialClient")
{
if (p.Username != "USER" || p.Password != "PASS")
{
return MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
}
}

return MqttConnectReturnCode.ConnectionAccepted;
};

options.DefaultCommunicationTimeout = TimeSpan.FromMinutes(10);
})
.AddMqttClient()
.AddLogging()
.BuildServiceProvider();

//services.GetService<ILoggerFactory>().AddConsole(LogLevel.Warning, true);

Console.WriteLine("Press 'c' for concurrent sends. Otherwise in one batch."); Console.WriteLine("Press 'c' for concurrent sends. Otherwise in one batch.");
var concurrent = Console.ReadKey(true).KeyChar == 'c'; var concurrent = Console.ReadKey(true).KeyChar == 'c';


var server = Task.Factory.StartNew(() => RunServerAsync(services), TaskCreationOptions.LongRunning);
var client = Task.Factory.StartNew(() => RunClientAsync(2000, TimeSpan.FromMilliseconds(10), services, concurrent), TaskCreationOptions.LongRunning);
var server = Task.Factory.StartNew(async () => await RunServerAsync(), TaskCreationOptions.LongRunning);
var client = Task.Factory.StartNew(async () => await RunClientAsync(2000, TimeSpan.FromMilliseconds(10), concurrent), TaskCreationOptions.LongRunning);


await Task.WhenAll(server, client).ConfigureAwait(false); await Task.WhenAll(server, client).ConfigureAwait(false);
} }


private static Task RunClientsAsync(int msgChunkSize, TimeSpan interval, IServiceProvider serviceProvider, bool concurrent)
private static Task RunClientsAsync(int msgChunkSize, TimeSpan interval, bool concurrent)
{ {
return Task.WhenAll(Enumerable.Range(0, 3).Select(i => Task.Run(() => RunClientAsync(msgChunkSize, interval, serviceProvider, concurrent))));
return Task.WhenAll(Enumerable.Range(0, 3).Select(i => Task.Run(() => RunClientAsync(msgChunkSize, interval, concurrent))));
} }


private static async Task RunClientAsync(int msgChunkSize, TimeSpan interval, IServiceProvider serviceProvider, bool concurrent)
private static async Task RunClientAsync(int msgChunkSize, TimeSpan interval, bool concurrent)
{ {
try try
{ {
@@ -69,7 +42,7 @@ namespace MQTTnet.TestApp.NetCore
CommunicationTimeout = TimeSpan.FromMinutes(10) CommunicationTimeout = TimeSpan.FromMinutes(10)
}; };


var client = serviceProvider.GetRequiredService<IMqttClient>();
var client = new MqttFactory().CreateMqttClient();


client.Connected += async (s, e) => client.Connected += async (s, e) =>
{ {
@@ -77,7 +50,7 @@ namespace MQTTnet.TestApp.NetCore


await client.SubscribeAsync(new List<TopicFilter> await client.SubscribeAsync(new List<TopicFilter>
{ {
new TopicFilter("#", MqttQualityOfServiceLevel.AtMostOnce)
new TopicFilter("#")
}); });


Console.WriteLine("### SUBSCRIBED ###"); Console.WriteLine("### SUBSCRIBED ###");
@@ -120,7 +93,6 @@ namespace MQTTnet.TestApp.NetCore
stopwatch.Stop(); stopwatch.Stop();
Console.WriteLine($"Sent 10.000 messages within {stopwatch.ElapsedMilliseconds} ms ({stopwatch.ElapsedMilliseconds / (float)testMessageCount} ms / message)."); Console.WriteLine($"Sent 10.000 messages within {stopwatch.ElapsedMilliseconds} ms ({stopwatch.ElapsedMilliseconds / (float)testMessageCount} ms / message).");



var messages = new[] { message }; var messages = new[] { message };
var sentMessagesCount = 0; var sentMessagesCount = 0;


@@ -158,9 +130,7 @@ namespace MQTTnet.TestApp.NetCore
msgCount += msgs.Count; msgCount += msgs.Count;
//send multiple //send multiple
} }



var now = DateTime.Now; var now = DateTime.Now;
if (last < now - TimeSpan.FromSeconds(1)) if (last < now - TimeSpan.FromSeconds(1))
{ {
@@ -194,11 +164,12 @@ namespace MQTTnet.TestApp.NetCore
return Task.Run(() => client.PublishAsync(applicationMessage)); return Task.Run(() => client.PublishAsync(applicationMessage));
} }


private static async Task RunServerAsync(IServiceProvider serviceProvider)
private static async Task RunServerAsync()
{ {
try try
{ {
var mqttServer = serviceProvider.GetRequiredService<IMqttServer>();
var mqttServer = new MqttFactory().CreateMqttServer();

var msgs = 0; var msgs = 0;
var stopwatch = Stopwatch.StartNew(); var stopwatch = Stopwatch.StartNew();
mqttServer.ApplicationMessageReceived += (sender, args) => mqttServer.ApplicationMessageReceived += (sender, args) =>
@@ -211,7 +182,7 @@ namespace MQTTnet.TestApp.NetCore
stopwatch.Restart(); stopwatch.Restart();
} }
}; };
await mqttServer.StartAsync();
await mqttServer.StartAsync(new MqttServerOptions());


Console.WriteLine("Press any key to exit."); Console.WriteLine("Press any key to exit.");
Console.ReadLine(); Console.ReadLine();


+ 0
- 3
Tests/MQTTnet.TestApp.NetCore/Program.cs View File

@@ -7,7 +7,6 @@ using System.IO;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Newtonsoft.Json; using Newtonsoft.Json;
using Microsoft.Extensions.Logging;


namespace MQTTnet.TestApp.NetCore namespace MQTTnet.TestApp.NetCore
{ {
@@ -70,8 +69,6 @@ namespace MQTTnet.TestApp.NetCore


{ {
var factory = new MqttFactory(); var factory = new MqttFactory();
factory.GetLoggerFactory().AddConsole();

var client = factory.CreateMqttClient(); var client = factory.CreateMqttClient();
} }
} }


+ 20
- 27
Tests/MQTTnet.TestApp.NetCore/ServerTest.cs View File

@@ -3,24 +3,21 @@ using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
using MQTTnet.Core.Protocol; using MQTTnet.Core.Protocol;
using MQTTnet.Core.Server; using MQTTnet.Core.Server;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using MQTTnet.Diagnostics;


namespace MQTTnet.TestApp.NetCore namespace MQTTnet.TestApp.NetCore
{ {
public static class ServerTest public static class ServerTest
{ {
public static Task RunAsync()
public static async Task RunAsync()
{ {
try try
{ {
var services = new ServiceCollection()
.AddMqttServer()
.AddLogging();
MqttNetConsoleTrace.ForwardToConsole();


services.Configure<MqttServerOptions>(options =>
var options = new MqttServerOptions
{ {
options.ConnectionValidator = p =>
ConnectionValidator = p =>
{ {
if (p.ClientId == "SpecialClient") if (p.ClientId == "SpecialClient")
{ {
@@ -31,22 +28,19 @@ namespace MQTTnet.TestApp.NetCore
} }


return MqttConnectReturnCode.ConnectionAccepted; return MqttConnectReturnCode.ConnectionAccepted;
};
},


options.Storage = new RetainedMessageHandler();

// Extend the timestamp for all messages from clients.
options.ApplicationMessageInterceptor = context =>
Storage = new RetainedMessageHandler(),
ApplicationMessageInterceptor = context =>
{ {
if (MqttTopicFilterComparer.IsMatch(context.ApplicationMessage.Topic, "/myTopic/WithTimestamp/#")) if (MqttTopicFilterComparer.IsMatch(context.ApplicationMessage.Topic, "/myTopic/WithTimestamp/#"))
{ {
// Replace the payload with the timestamp. But also extending a JSON
// based payload with the timestamp is a suitable use case.
context.ApplicationMessage.Payload = Encoding.UTF8.GetBytes(DateTime.Now.ToString("O"));
// Replace the payload with the timestamp. But also extending a JSON
// based payload with the timestamp is a suitable use case.
context.ApplicationMessage.Payload = Encoding.UTF8.GetBytes(DateTime.Now.ToString("O"));
} }
};
// Protect several topics from being subscribed from every client.
options.SubscriptionsInterceptor = context =>
},
SubscriptionsInterceptor = context =>
{ {
if (context.TopicFilter.Topic.StartsWith("admin/foo/bar") && context.ClientId != "theAdmin") if (context.TopicFilter.Topic.StartsWith("admin/foo/bar") && context.ClientId != "theAdmin")
{ {
@@ -58,11 +52,11 @@ namespace MQTTnet.TestApp.NetCore
context.AcceptSubscription = false; context.AcceptSubscription = false;
context.CloseConnection = true; context.CloseConnection = true;
} }
};
});
}
};


var serviceProvider = services.BuildServiceProvider();
serviceProvider.GetRequiredService<ILoggerFactory>().AddConsole();
// Extend the timestamp for all messages from clients.
// Protect several topics from being subscribed from every client.


//var certificate = new X509Certificate(@"C:\certs\test\test.cer", ""); //var certificate = new X509Certificate(@"C:\certs\test\test.cer", "");
//options.TlsEndpointOptions.Certificate = certificate.Export(X509ContentType.Cert); //options.TlsEndpointOptions.Certificate = certificate.Export(X509ContentType.Cert);
@@ -70,18 +64,18 @@ namespace MQTTnet.TestApp.NetCore
//options.DefaultEndpointOptions.IsEnabled = true; //options.DefaultEndpointOptions.IsEnabled = true;
//options.TlsEndpointOptions.IsEnabled = false; //options.TlsEndpointOptions.IsEnabled = false;


var mqttServer = new MqttFactory(serviceProvider).CreateMqttServer();
var mqttServer = new MqttFactory().CreateMqttServer();
mqttServer.ClientDisconnected += (s, e) => mqttServer.ClientDisconnected += (s, e) =>
{ {
Console.Write("Client disconnected event fired."); Console.Write("Client disconnected event fired.");
}; };


mqttServer.StartAsync();
await mqttServer.StartAsync(options);


Console.WriteLine("Press any key to exit."); Console.WriteLine("Press any key to exit.");
Console.ReadLine(); Console.ReadLine();


mqttServer.StopAsync();
await mqttServer.StopAsync();
} }
catch (Exception e) catch (Exception e)
{ {
@@ -89,7 +83,6 @@ namespace MQTTnet.TestApp.NetCore
} }


Console.ReadLine(); Console.ReadLine();
return Task.FromResult(0);
} }
} }
} }

+ 0
- 6
Tests/MQTTnet.TestApp.UniversalWindows/MQTTnet.TestApp.UniversalWindows.csproj View File

@@ -137,12 +137,6 @@
</ProjectReference> </ProjectReference>
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Microsoft.Extensions.Logging">
<Version>1.1.2</Version>
</PackageReference>
<PackageReference Include="Microsoft.Extensions.Logging.Console">
<Version>1.1.2</Version>
</PackageReference>
<PackageReference Include="Microsoft.NETCore.UniversalWindowsPlatform"> <PackageReference Include="Microsoft.NETCore.UniversalWindowsPlatform">
<Version>5.4.0</Version> <Version>5.4.0</Version>
</PackageReference> </PackageReference>


+ 49
- 70
Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs View File

@@ -5,8 +5,6 @@ using System.Threading.Tasks;
using Windows.Security.Cryptography.Certificates; using Windows.Security.Cryptography.Certificates;
using Windows.UI.Core; using Windows.UI.Core;
using Windows.UI.Xaml; using Windows.UI.Xaml;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using MQTTnet.Core; using MQTTnet.Core;
using MQTTnet.Core.Client; using MQTTnet.Core.Client;
using MQTTnet.Core.Diagnostics; using MQTTnet.Core.Diagnostics;
@@ -19,7 +17,7 @@ namespace MQTTnet.TestApp.UniversalWindows
{ {
public sealed partial class MainPage public sealed partial class MainPage
{ {
private readonly ConcurrentQueue<MqttNetTraceMessage> _traceMessages = new ConcurrentQueue<MqttNetTraceMessage>();
private readonly ConcurrentQueue<MqttNetLogMessage> _traceMessages = new ConcurrentQueue<MqttNetLogMessage>();


private IMqttClient _mqttClient; private IMqttClient _mqttClient;
private IMqttServer _mqttServer; private IMqttServer _mqttServer;
@@ -28,10 +26,10 @@ namespace MQTTnet.TestApp.UniversalWindows
{ {
InitializeComponent(); InitializeComponent();


MqttNetTrace.TraceMessagePublished += OnTraceMessagePublished;
MqttNetGlobalLog.LogMessagePublished += OnTraceMessagePublished;
} }


private async void OnTraceMessagePublished(object sender, MqttNetTraceMessagePublishedEventArgs e)
private async void OnTraceMessagePublished(object sender, MqttNetLogMessagePublishedEventArgs e)
{ {
_traceMessages.Enqueue(e.TraceMessage); _traceMessages.Enqueue(e.TraceMessage);
while (_traceMessages.Count > 100) while (_traceMessages.Count > 100)
@@ -252,7 +250,7 @@ namespace MQTTnet.TestApp.UniversalWindows
{ {
{ {
// Write all trace messages to the console window. // Write all trace messages to the console window.
MqttNetTrace.TraceMessagePublished += (s, e) =>
MqttNetGlobalLog.LogMessagePublished += (s, e) =>
{ {
Console.WriteLine($">> [{e.TraceMessage.Timestamp:O}] [{e.TraceMessage.ThreadId}] [{e.TraceMessage.Source}] [{e.TraceMessage.Level}]: {e.TraceMessage.Message}"); Console.WriteLine($">> [{e.TraceMessage.Timestamp:O}] [{e.TraceMessage.ThreadId}] [{e.TraceMessage.Source}] [{e.TraceMessage.Level}]: {e.TraceMessage.Message}");
if (e.TraceMessage.Exception != null) if (e.TraceMessage.Exception != null)
@@ -262,32 +260,12 @@ namespace MQTTnet.TestApp.UniversalWindows
}; };
} }


{
// Add the console logger to the logger factory.
var services = new ServiceCollection()
.AddMqttClient()
.BuildServiceProvider();

services.GetRequiredService<ILoggerFactory>()
.AddConsole();
}

{ {
// Use a custom identifier for the trace messages. // Use a custom identifier for the trace messages.
var clientOptions = new MqttClientOptionsBuilder() var clientOptions = new MqttClientOptionsBuilder()
.WithLogId("ClientX")
.Build(); .Build();
} }


{
// Create a client from the service provider manually.
var serviceProvider = new ServiceCollection()
.AddMqttClient()
.BuildServiceProvider();

var mqttClient = serviceProvider.GetRequiredService<IMqttClient>();
}

{ {
// Create a new MQTT client. // Create a new MQTT client.
var factory = new MqttFactory(); var factory = new MqttFactory();
@@ -371,34 +349,31 @@ namespace MQTTnet.TestApp.UniversalWindows


// ---------------------------------- // ----------------------------------
{ {
var services = new ServiceCollection()
.AddMqttServer(options =>
var options = new MqttServerOptions();

options.ConnectionValidator = c =>
{
if (c.ClientId.Length < 10)
{ {
options.ConnectionValidator = c =>
{
if (c.ClientId.Length < 10)
{
return MqttConnectReturnCode.ConnectionRefusedIdentifierRejected;
}
return MqttConnectReturnCode.ConnectionRefusedIdentifierRejected;
}


if (c.Username != "mySecretUser")
{
return MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
}
if (c.Username != "mySecretUser")
{
return MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
}


if (c.Password != "mySecretPassword")
{
return MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
}
if (c.Password != "mySecretPassword")
{
return MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
}


return MqttConnectReturnCode.ConnectionAccepted;
};
})
.BuildServiceProvider();
return MqttConnectReturnCode.ConnectionAccepted;
};


var factory = new MqttFactory(services);
var factory = new MqttFactory();
var mqttServer = factory.CreateMqttServer(); var mqttServer = factory.CreateMqttServer();
await mqttServer.StartAsync();
await mqttServer.StartAsync(options);


Console.WriteLine("Press any key to exit."); Console.WriteLine("Press any key to exit.");
Console.ReadLine(); Console.ReadLine();
@@ -422,7 +397,7 @@ namespace MQTTnet.TestApp.UniversalWindows
{ {
// Start a MQTT server. // Start a MQTT server.
var mqttServer = new MqttFactory().CreateMqttServer(); var mqttServer = new MqttFactory().CreateMqttServer();
await mqttServer.StartAsync();
await mqttServer.StartAsync(new MqttServerOptions());
Console.WriteLine("Press any key to exit."); Console.WriteLine("Press any key to exit.");
Console.ReadLine(); Console.ReadLine();
await mqttServer.StopAsync(); await mqttServer.StopAsync();
@@ -430,27 +405,31 @@ namespace MQTTnet.TestApp.UniversalWindows


{ {
// Configure MQTT server. // Configure MQTT server.
var mqttServer = new MqttFactory().CreateMqttServer(options =>
var options = new MqttServerOptions
{
ConnectionBacklog = 100
};

options.DefaultEndpointOptions.Port = 1884;
options.ConnectionValidator = packet =>
{ {
options.ConnectionBacklog = 100;
options.DefaultEndpointOptions.Port = 1884;
options.ConnectionValidator = packet =>
if (packet.ClientId != "Highlander")
{ {
if (packet.ClientId != "Highlander")
{
return MqttConnectReturnCode.ConnectionRefusedIdentifierRejected;
}
return MqttConnectReturnCode.ConnectionRefusedIdentifierRejected;
}


return MqttConnectReturnCode.ConnectionAccepted;
};
});
return MqttConnectReturnCode.ConnectionAccepted;
};

var mqttServer = new MqttFactory().CreateMqttServer();
await mqttServer.StartAsync(options);
} }


{ {
// Setup client validator. // Setup client validator.
var mqttServer = new MqttFactory().CreateMqttServer(options =>
var options = new MqttServerOptions
{ {
options.ConnectionValidator = c =>
ConnectionValidator = c =>
{ {
if (c.ClientId.Length < 10) if (c.ClientId.Length < 10)
{ {
@@ -468,8 +447,8 @@ namespace MQTTnet.TestApp.UniversalWindows
} }


return MqttConnectReturnCode.ConnectionAccepted; return MqttConnectReturnCode.ConnectionAccepted;
};
});
}
};
} }


{ {
@@ -512,13 +491,13 @@ namespace MQTTnet.TestApp.UniversalWindows
} }
} }


_mqttServer = new MqttFactory().CreateMqttServer(o =>
{
o.DefaultEndpointOptions.Port = int.Parse(ServerPort.Text);
o.Storage = storage;
});
_mqttServer = new MqttFactory().CreateMqttServer();
var options = new MqttServerOptions();
options.DefaultEndpointOptions.Port = int.Parse(ServerPort.Text);
options.Storage = storage;


await _mqttServer.StartAsync();
await _mqttServer.StartAsync(options);
} }


private async void StopServer(object sender, RoutedEventArgs e) private async void StopServer(object sender, RoutedEventArgs e)


Loading…
Cancel
Save