Browse Source

Merge branch 'master' into feature/low_level_client

release/3.x.x
Christian Kratky 4 years ago
parent
commit
0ca4c88e69
35 changed files with 368 additions and 256 deletions
  1. +1
    -1
      Build/MQTTnet.AspNetCore.nuspec
  2. +1
    -1
      Build/MQTTnet.Extensions.ManagedClient.nuspec
  3. +1
    -1
      Build/MQTTnet.Extensions.Rpc.nuspec
  4. +1
    -1
      Build/MQTTnet.Extensions.WebSocket4Net.nuspec
  5. +1
    -1
      Build/MQTTnet.NETStandard.nuspec
  6. +2
    -1
      Build/MQTTnet.nuspec
  7. +2
    -2
      Source/MQTTnet.AspnetCore/Extensions/ReaderExtensions.cs
  8. +9
    -9
      Source/MQTTnet.AspnetCore/Extensions/ServiceCollectionExtensions.cs
  9. +4
    -2
      Source/MQTTnet.AspnetCore/MQTTnet.AspNetCore.csproj
  10. +4
    -4
      Source/MQTTnet.Extensions.ManagedClient/MqttFactoryExtensions.cs
  11. +12
    -3
      Source/MQTTnet.Server/Logging/MqttNetLoggerWrapper.cs
  12. +12
    -12
      Source/MQTTnet.Server/Mqtt/MqttServerService.cs
  13. +5
    -3
      Source/MQTTnet/Adapter/MqttChannelAdapter.cs
  14. +1
    -1
      Source/MQTTnet/Diagnostics/IMqttNetLogger.cs
  15. +7
    -18
      Source/MQTTnet/Diagnostics/MqttNetLogMessage.cs
  16. +2
    -1
      Source/MQTTnet/Diagnostics/MqttNetLogMessagePublishedEventArgs.cs
  17. +39
    -7
      Source/MQTTnet/Diagnostics/MqttNetLogger.cs
  18. +20
    -26
      Source/MQTTnet/Implementations/MqttTcpChannel.cs
  19. +2
    -1
      Source/MQTTnet/Implementations/PlatformAbstractionLayer.cs
  20. +8
    -21
      Source/MQTTnet/Internal/Disposable.cs
  21. +3
    -3
      Source/MQTTnet/MqttFactory.cs
  22. +6
    -5
      Source/MQTTnet/PacketDispatcher/MqttPacketAwaiter.cs
  23. +1
    -2
      Source/MQTTnet/Server/MqttClientConnection.cs
  24. +15
    -3
      Source/MQTTnet/Server/MqttClientSessionsManager.cs
  25. +3
    -3
      Tests/MQTTnet.Benchmarks/MqttTcpChannelBenchmark.cs
  26. +12
    -10
      Tests/MQTTnet.Core.Tests/ManagedMqttClient_Tests.cs
  27. +22
    -25
      Tests/MQTTnet.Core.Tests/Mockups/TestClientWrapper.cs
  28. +1
    -1
      Tests/MQTTnet.Core.Tests/Mockups/TestLogger.cs
  29. +46
    -27
      Tests/MQTTnet.Core.Tests/Mockups/TestServerWrapper.cs
  30. +15
    -13
      Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs
  31. +25
    -23
      Tests/MQTTnet.Core.Tests/MqttFactory_Tests.cs
  32. +7
    -7
      Tests/MQTTnet.Core.Tests/MqttKeepAliveMonitor_Tests.cs
  33. +50
    -0
      Tests/MQTTnet.Core.Tests/MqttNetLogger_Tests.cs
  34. +1
    -1
      Tests/MQTTnet.Core.Tests/Server_Tests.cs
  35. +27
    -17
      Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs

+ 1
- 1
Build/MQTTnet.AspNetCore.nuspec View File

@@ -11,7 +11,7 @@
<requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>This is a support library to integrate MQTTnet into AspNetCore.</description>
<releaseNotes>For release notes please go to MQTTnet release notes (https://www.nuget.org/packages/MQTTnet/).</releaseNotes>
<copyright>Copyright Christian Kratky 2016-2019</copyright>
<copyright>Copyright Christian Kratky 2016-2020</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags>
<dependencies>
<dependency id="MQTTnet" version="$nugetVersion" />


+ 1
- 1
Build/MQTTnet.Extensions.ManagedClient.nuspec View File

@@ -11,7 +11,7 @@
<requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>This is an extension library which provides a managed MQTT client with additional features using MQTTnet.</description>
<releaseNotes>For release notes please go to MQTTnet release notes (https://www.nuget.org/packages/MQTTnet/).</releaseNotes>
<copyright>Copyright Christian Kratky 2016-2019</copyright>
<copyright>Copyright Christian Kratky 2016-2020</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags>
<dependencies>
<dependency id="MQTTnet" version="$nugetVersion" />


+ 1
- 1
Build/MQTTnet.Extensions.Rpc.nuspec View File

@@ -11,7 +11,7 @@
<requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>This is an extension library which allows executing synchronous device calls including a response using MQTTnet.</description>
<releaseNotes>For release notes please go to MQTTnet release notes (https://www.nuget.org/packages/MQTTnet/).</releaseNotes>
<copyright>Copyright Christian Kratky 2016-2019</copyright>
<copyright>Copyright Christian Kratky 2016-2020</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags>
<dependencies>
<dependency id="MQTTnet" version="$nugetVersion" />


+ 1
- 1
Build/MQTTnet.Extensions.WebSocket4Net.nuspec View File

@@ -11,7 +11,7 @@
<requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>This is an extension library which allows using _WebSocket4Net_ as transport for MQTTnet clients.</description>
<releaseNotes>For release notes please go to MQTTnet release notes (https://www.nuget.org/packages/MQTTnet/).</releaseNotes>
<copyright>Copyright Christian Kratky 2016-2019</copyright>
<copyright>Copyright Christian Kratky 2016-2020</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags>
<dependencies>
<dependency id="MQTTnet" version="$nugetVersion" />


+ 1
- 1
Build/MQTTnet.NETStandard.nuspec View File

@@ -11,7 +11,7 @@
<requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>This package contains the .NET Standard version of MQTTnet only.</description>
<releaseNotes>For release notes please go to MQTTnet release notes (https://www.nuget.org/packages/MQTTnet/).</releaseNotes>
<copyright>Copyright Christian Kratky 2016-2019</copyright>
<copyright>Copyright Christian Kratky 2016-2020</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags>
<dependencies>
<group targetFramework="netstandard1.3">


+ 2
- 1
Build/MQTTnet.nuspec View File

@@ -11,6 +11,7 @@
<requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker) and supports v3.1.0, v3.1.1 and v5.0.0 of the MQTT protocol.</description>
<releaseNotes>
* [All] Due to a merge issue not all changes are included in 3.0.8. All these changes are now included in this version.
* [ManagedClient] Added builder class for MqttClientUnsubscribeOptions (thanks to @dominikviererbe).
* [ManagedClient] Added support for persisted sessions (thansk to @PMExtra).
* [Client] Improve connection stability (thanks to @jltjohanlindqvist).
@@ -23,7 +24,7 @@
* [MQTTnet.AspNetCore] improved compatibility with AspNetCore 3.1
* [LowLevelMqttClient] Added low level MQTT client in order to provide more flexibility when using the MQTT protocol. This client requires detailed knowledge about the MQTT protocol.
</releaseNotes>
<copyright>Copyright Christian Kratky 2016-2019</copyright>
<copyright>Copyright Christian Kratky 2016-2020</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags>
<dependencies>
<group targetFramework="netstandard1.3">


+ 2
- 2
Source/MQTTnet.AspnetCore/Extensions/ReaderExtensions.cs View File

@@ -80,7 +80,7 @@ namespace MQTTnet.AspNetCore
headerLength = 0;
bodyLength = 0;
var temp = input.Slice(0, Math.Min(5, input.Length)).GetMemory();
var temp = input.Slice(0, Math.Min(5, input.Length)).GetMemory().Span;

do
{
@@ -88,7 +88,7 @@ namespace MQTTnet.AspNetCore
{
return false;
}
encodedByte = temp.Span[index];
encodedByte = temp[index];
index++;

value += (byte)(encodedByte & 127) * multiplier;


+ 9
- 9
Source/MQTTnet.AspnetCore/Extensions/ServiceCollectionExtensions.cs View File

@@ -1,10 +1,10 @@
using System;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using MQTTnet.Adapter;
using MQTTnet.Diagnostics;
using MQTTnet.Server;
using MQTTnet.Implementations;
using Microsoft.Extensions.DependencyInjection;
using MQTTnet.Server;
using System;

namespace MQTTnet.AspNetCore
{
@@ -13,7 +13,7 @@ namespace MQTTnet.AspNetCore
public static IServiceCollection AddHostedMqttServer(this IServiceCollection services, IMqttServerOptions options)
{
if (options == null) throw new ArgumentNullException(nameof(options));
services.AddSingleton(options);

services.AddHostedMqttServer();
@@ -23,7 +23,8 @@ namespace MQTTnet.AspNetCore

public static IServiceCollection AddHostedMqttServer(this IServiceCollection services, Action<MqttServerOptionsBuilder> configure)
{
services.AddSingleton<IMqttServerOptions>(s => {
services.AddSingleton<IMqttServerOptions>(s =>
{
var builder = new MqttServerOptionsBuilder();
configure(builder);
return builder.Build();
@@ -36,7 +37,8 @@ namespace MQTTnet.AspNetCore

public static IServiceCollection AddHostedMqttServerWithServices(this IServiceCollection services, Action<AspNetMqttServerOptionsBuilder> configure)
{
services.AddSingleton<IMqttServerOptions>(s => {
services.AddSingleton<IMqttServerOptions>(s =>
{
var builder = new AspNetMqttServerOptionsBuilder(s);
configure(builder);
return builder.Build();
@@ -60,14 +62,12 @@ namespace MQTTnet.AspNetCore
private static IServiceCollection AddHostedMqttServer(this IServiceCollection services)
{
var logger = new MqttNetLogger();
var childLogger = logger.CreateChildLogger();

services.AddSingleton<IMqttNetLogger>(logger);
services.AddSingleton(childLogger);
services.AddSingleton<MqttHostedServer>();
services.AddSingleton<IHostedService>(s => s.GetService<MqttHostedServer>());
services.AddSingleton<IMqttServer>(s => s.GetService<MqttHostedServer>());
return services;
}



+ 4
- 2
Source/MQTTnet.AspnetCore/MQTTnet.AspNetCore.csproj View File

@@ -14,11 +14,13 @@
<DefineConstants>RELEASE;NETSTANDARD2_0</DefineConstants>
</PropertyGroup>

<ItemGroup Condition=" '$(TargetFramework)' == 'netcoreapp3.1' ">
<ItemGroup Condition="'$(TargetFramework)' == 'netcoreapp3.1'">
<FrameworkReference Include="Microsoft.AspNetCore.App" />

<PackageReference Include="System.IO.Pipelines" Version="4.7.1" />
</ItemGroup>
<ItemGroup Condition=" '$(TargetFramework)' != 'netcoreapp3.1' ">
<ItemGroup Condition="'$(TargetFramework)' != 'netcoreapp3.1'">
<PackageReference Include="Microsoft.AspNetCore.Connections.Abstractions" Version="2.1.3" />
<PackageReference Include="Microsoft.AspNetCore.Http.Connections" Version="1.0.3" />
<PackageReference Include="Microsoft.AspNetCore.WebSockets" Version="2.1.1" />


+ 4
- 4
Source/MQTTnet.Extensions.ManagedClient/MqttFactoryExtensions.cs View File

@@ -1,5 +1,5 @@
using System;
using MQTTnet.Diagnostics;
using MQTTnet.Diagnostics;
using System;

namespace MQTTnet.Extensions.ManagedClient
{
@@ -9,7 +9,7 @@ namespace MQTTnet.Extensions.ManagedClient
{
if (factory == null) throw new ArgumentNullException(nameof(factory));

return new ManagedMqttClient(factory.CreateMqttClient(), factory.DefaultLogger.CreateChildLogger());
return new ManagedMqttClient(factory.CreateMqttClient(), factory.DefaultLogger);
}

public static IManagedMqttClient CreateManagedMqttClient(this IMqttFactory factory, IMqttNetLogger logger)
@@ -17,7 +17,7 @@ namespace MQTTnet.Extensions.ManagedClient
if (factory == null) throw new ArgumentNullException(nameof(factory));
if (logger == null) throw new ArgumentNullException(nameof(logger));

return new ManagedMqttClient(factory.CreateMqttClient(logger), logger.CreateChildLogger());
return new ManagedMqttClient(factory.CreateMqttClient(logger), logger);
}
}
}

+ 12
- 3
Source/MQTTnet.Server/Logging/MqttNetLoggerWrapper.cs View File

@@ -21,15 +21,24 @@ namespace MQTTnet.Server.Logging
return new MqttNetLogger(source);
}

public void Publish(MqttNetLogLevel logLevel, string source, string message, object[] parameters, Exception exception)
public void Publish(MqttNetLogLevel level, string source, string message, object[] parameters, Exception exception)
{
var convertedLogLevel = ConvertLogLevel(logLevel);
var convertedLogLevel = ConvertLogLevel(level);
_logger.Log(convertedLogLevel, exception, message, parameters);

var logMessagePublishedEvent = LogMessagePublished;
if (logMessagePublishedEvent != null)
{
var logMessage = new MqttNetLogMessage(null, DateTime.UtcNow, Thread.CurrentThread.ManagedThreadId, source, logLevel, message, exception);
var logMessage = new MqttNetLogMessage
{
Timestamp = DateTime.UtcNow,
ThreadId = Thread.CurrentThread.ManagedThreadId,
Source = source,
Level = level,
Message = message,
Exception = exception
};

logMessagePublishedEvent.Invoke(this, new MqttNetLogMessagePublishedEventArgs(logMessage));
}
}


+ 12
- 12
Source/MQTTnet.Server/Mqtt/MqttServerService.cs View File

@@ -1,11 +1,4 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.WebSockets;
using System.Security.Authentication;
using System.Text;
using System.Threading.Tasks;
using IronPython.Runtime;
using IronPython.Runtime;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using MQTTnet.Adapter;
@@ -16,6 +9,13 @@ using MQTTnet.Protocol;
using MQTTnet.Server.Configuration;
using MQTTnet.Server.Scripting;
using MQTTnet.Server.Status;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.WebSockets;
using System.Security.Authentication;
using System.Text;
using System.Threading.Tasks;

namespace MQTTnet.Server.Mqtt
{
@@ -65,11 +65,11 @@ namespace MQTTnet.Server.Mqtt
_pythonScriptHostService = pythonScriptHostService ?? throw new ArgumentNullException(nameof(pythonScriptHostService));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));

_webSocketServerAdapter = new MqttWebSocketServerAdapter(mqttFactory.Logger.CreateChildLogger());
_webSocketServerAdapter = new MqttWebSocketServerAdapter(mqttFactory.Logger);

var adapters = new List<IMqttServerAdapter>
{
new MqttTcpServerAdapter(mqttFactory.Logger.CreateChildLogger())
new MqttTcpServerAdapter(mqttFactory.Logger)
{
TreatSocketOpeningErrorAsWarning = true // Opening other ports than for HTTP is not allows in Azure App Services.
},
@@ -215,7 +215,7 @@ namespace MQTTnet.Server.Mqtt
options
.WithEncryptedEndpoint()
.WithEncryptionSslProtocol(SslProtocols.Tls12);
if (!string.IsNullOrEmpty(_settings.EncryptedTcpEndPoint?.Certificate?.Path))
{
IMqttServerCertificateCredentials certificateCredentials = null;
@@ -230,7 +230,7 @@ namespace MQTTnet.Server.Mqtt

options.WithEncryptionCertificate(_settings.EncryptedTcpEndPoint.Certificate.ReadCertificate(), certificateCredentials);
}
if (_settings.EncryptedTcpEndPoint.TryReadIPv4(out var address4))
{
options.WithEncryptedEndpointBoundIPAddress(address4);


+ 5
- 3
Source/MQTTnet/Adapter/MqttChannelAdapter.cs View File

@@ -19,14 +19,14 @@ namespace MQTTnet.Adapter
const uint ErrorOperationAborted = 0x800703E3;
const int ReadBufferSize = 4096; // TODO: Move buffer size to config

readonly SemaphoreSlim _writerSemaphore = new SemaphoreSlim(1, 1);

readonly IMqttNetLogger _logger;
readonly IMqttChannel _channel;
readonly MqttPacketReader _packetReader;

readonly byte[] _fixedHeaderBuffer = new byte[2];

SemaphoreSlim _writerSemaphore = new SemaphoreSlim(1, 1);

long _bytesReceived;
long _bytesSent;

@@ -143,7 +143,7 @@ namespace MQTTnet.Adapter
}
finally
{
_writerSemaphore.Release();
_writerSemaphore?.Release();
}
}

@@ -212,7 +212,9 @@ namespace MQTTnet.Adapter
if (disposing)
{
_channel?.Dispose();

_writerSemaphore?.Dispose();
_writerSemaphore = null;
}

base.Dispose(disposing);


+ 1
- 1
Source/MQTTnet/Diagnostics/IMqttNetLogger.cs View File

@@ -6,7 +6,7 @@ namespace MQTTnet.Diagnostics
{
event EventHandler<MqttNetLogMessagePublishedEventArgs> LogMessagePublished;

IMqttNetLogger CreateChildLogger(string source = null);
IMqttNetLogger CreateChildLogger(string source);

void Publish(MqttNetLogLevel logLevel, string message, object[] parameters, Exception exception);
}


+ 7
- 18
Source/MQTTnet/Diagnostics/MqttNetLogMessage.cs View File

@@ -4,30 +4,19 @@ namespace MQTTnet.Diagnostics
{
public class MqttNetLogMessage
{
public MqttNetLogMessage(string logId, DateTime timestamp, int threadId, string source, MqttNetLogLevel level, string message, Exception exception)
{
LogId = logId;
Timestamp = timestamp;
ThreadId = threadId;
Source = source;
Level = level;
Message = message;
Exception = exception;
}

public string LogId { get; }
public string LogId { get; set; }

public DateTime Timestamp { get; }
public DateTime Timestamp { get; set; }

public int ThreadId { get; }
public int ThreadId { get; set; }

public string Source { get; }
public string Source { get; set; }

public MqttNetLogLevel Level { get; }
public MqttNetLogLevel Level { get; set; }

public string Message { get; }
public string Message { get; set; }

public Exception Exception { get; }
public Exception Exception { get; set; }

public override string ToString()
{


+ 2
- 1
Source/MQTTnet/Diagnostics/MqttNetLogMessagePublishedEventArgs.cs View File

@@ -6,8 +6,9 @@ namespace MQTTnet.Diagnostics
{
public MqttNetLogMessagePublishedEventArgs(MqttNetLogMessage logMessage)
{
TraceMessage = logMessage ?? throw new ArgumentNullException(nameof(logMessage));
LogMessage = logMessage ?? throw new ArgumentNullException(nameof(logMessage));

TraceMessage = logMessage ?? throw new ArgumentNullException(nameof(logMessage));
}

[Obsolete("Use new proeprty LogMessage instead.")]


+ 39
- 7
Source/MQTTnet/Diagnostics/MqttNetLogger.cs View File

@@ -7,6 +7,8 @@ namespace MQTTnet.Diagnostics
readonly string _logId;
readonly string _source;

readonly MqttNetLogger _parentLogger;

public MqttNetLogger(string source, string logId = null)
{
_source = source;
@@ -17,19 +19,31 @@ namespace MQTTnet.Diagnostics
{
}

MqttNetLogger(MqttNetLogger parentLogger, string logId, string source)
{
_parentLogger = parentLogger ?? throw new ArgumentNullException(nameof(parentLogger));
_source = source ?? throw new ArgumentNullException(nameof(source));

_logId = logId;
}

public event EventHandler<MqttNetLogMessagePublishedEventArgs> LogMessagePublished;

public IMqttNetLogger CreateChildLogger(string source = null)
// TODO: Consider creating a LoggerFactory which will allow creating loggers. The logger factory will
// be the only place which has the published event.
public IMqttNetLogger CreateChildLogger(string source)
{
return new MqttNetLogger(source, _logId);
if (source is null) throw new ArgumentNullException(nameof(source));

return new MqttNetLogger(this, _logId, source);
}

public void Publish(MqttNetLogLevel logLevel, string message, object[] parameters, Exception exception)
public void Publish(MqttNetLogLevel level, string message, object[] parameters, Exception exception)
{
var hasLocalListeners = LogMessagePublished != null;
var hasGlobalListeners = MqttNetGlobalLogger.HasListeners;

if (!hasLocalListeners && !hasGlobalListeners)
if (!hasLocalListeners && !hasGlobalListeners && _parentLogger == null)
{
return;
}
@@ -46,17 +60,35 @@ namespace MQTTnet.Diagnostics
}
}

var traceMessage = new MqttNetLogMessage(_logId, DateTime.UtcNow, Environment.CurrentManagedThreadId, _source, logLevel, message, exception);
var logMessage = new MqttNetLogMessage
{
LogId = _logId,
Timestamp = DateTime.UtcNow,
Source = _source,
ThreadId = Environment.CurrentManagedThreadId,
Level = level,
Message = message,
Exception = exception
};

if (hasGlobalListeners)
{
MqttNetGlobalLogger.Publish(traceMessage);
MqttNetGlobalLogger.Publish(logMessage);
}

if (hasLocalListeners)
{
LogMessagePublished?.Invoke(this, new MqttNetLogMessagePublishedEventArgs(traceMessage));
LogMessagePublished?.Invoke(this, new MqttNetLogMessagePublishedEventArgs(logMessage));
}

_parentLogger?.Publish(logMessage);
}

void Publish(MqttNetLogMessage logMessage)
{
LogMessagePublished?.Invoke(this, new MqttNetLogMessagePublishedEventArgs(logMessage));

_parentLogger?.Publish(logMessage);
}
}
}

+ 20
- 26
Source/MQTTnet/Implementations/MqttTcpChannel.cs View File

@@ -1,25 +1,24 @@
#if !WINDOWS_UWP
using MQTTnet.Channel;
using MQTTnet.Client.Options;
using System;
using System.Net.Security;
using System.Net.Sockets;
using System.Security.Cryptography.X509Certificates;
using System.Threading.Tasks;
using System.IO;
using System.Linq;
using System.Net.Security;
using System.Net.Sockets;
using System.Runtime.ExceptionServices;
using System.Security.Cryptography.X509Certificates;
using System.Threading;
using MQTTnet.Channel;
using MQTTnet.Client.Options;
using MQTTnet.Internal;
using System.Threading.Tasks;

namespace MQTTnet.Implementations
{
public class MqttTcpChannel : Disposable, IMqttChannel
public sealed class MqttTcpChannel : IDisposable, IMqttChannel
{
private readonly IMqttClientOptions _clientOptions;
private readonly MqttClientTcpOptions _options;
readonly IMqttClientOptions _clientOptions;
readonly MqttClientTcpOptions _options;

private Stream _stream;
Stream _stream;

public MqttTcpChannel(IMqttClientOptions clientOptions)
{
@@ -69,7 +68,7 @@ namespace MQTTnet.Implementations
// of the actual value.
socket.DualMode = _options.DualMode.Value;
}
// Workaround for: workaround for https://github.com/dotnet/corefx/issues/24430
using (cancellationToken.Register(() => socket.Dispose()))
{
@@ -83,7 +82,7 @@ namespace MQTTnet.Implementations
var sslStream = new SslStream(networkStream, false, InternalUserCertificateValidationCallback);
_stream = sslStream;

await sslStream.AuthenticateAsClientAsync(_options.Server, LoadCertificates(), _options.TlsOptions.SslProtocol, !_options.TlsOptions.IgnoreCertificateRevocationErrors).ConfigureAwait(false);
await sslStream.AuthenticateAsClientAsync(_options.Server, LoadCertificates(), _options.TlsOptions.SslProtocol, !_options.TlsOptions.IgnoreCertificateRevocationErrors).ConfigureAwait(false);
}
else
{
@@ -95,12 +94,14 @@ namespace MQTTnet.Implementations

public Task DisconnectAsync(CancellationToken cancellationToken)
{
Cleanup();
Dispose();
return Task.FromResult(0);
}

public async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
if (buffer is null) throw new ArgumentNullException(nameof(buffer));

try
{
// Workaround for: https://github.com/dotnet/corefx/issues/24430
@@ -131,6 +132,8 @@ namespace MQTTnet.Implementations

public async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
if (buffer is null) throw new ArgumentNullException(nameof(buffer));

try
{
// Workaround for: https://github.com/dotnet/corefx/issues/24430
@@ -159,7 +162,7 @@ namespace MQTTnet.Implementations
}
}

private void Cleanup()
public void Dispose()
{
// When the stream is disposed it will also close the socket and this will also dispose it.
// So there is no need to dispose the socket again.
@@ -178,16 +181,7 @@ namespace MQTTnet.Implementations
_stream = null;
}

protected override void Dispose(bool disposing)
{
if (disposing)
{
Cleanup();
}
base.Dispose(disposing);
}

private bool InternalUserCertificateValidationCallback(object sender, X509Certificate x509Certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)
bool InternalUserCertificateValidationCallback(object sender, X509Certificate x509Certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)
{
if (_options.TlsOptions.CertificateValidationCallback != null)
{
@@ -218,7 +212,7 @@ namespace MQTTnet.Implementations
return _options.TlsOptions.AllowUntrustedCertificates;
}

private X509CertificateCollection LoadCertificates()
X509CertificateCollection LoadCertificates()
{
var certificates = new X509CertificateCollection();
if (_options.TlsOptions.Certificates == null)


+ 2
- 1
Source/MQTTnet/Implementations/PlatformAbstractionLayer.cs View File

@@ -7,6 +7,7 @@ namespace MQTTnet.Implementations
{
public static class PlatformAbstractionLayer
{
// TODO: Consider creating primitives like "MqttNetSocket" which will wrap all required methods and do the platform stuff.
public static async Task<Socket> AcceptAsync(Socket socket)
{
#if NET452 || NET461
@@ -90,7 +91,7 @@ namespace MQTTnet.Implementations

public static Task CompletedTask
{
get
get
{
#if NET452
return Task.FromResult(0);


+ 8
- 21
Source/MQTTnet/Internal/Disposable.cs View File

@@ -2,32 +2,20 @@

namespace MQTTnet.Internal
{
public class Disposable : IDisposable
public abstract class Disposable : IDisposable
{
protected bool IsDisposed => _isDisposed;
protected bool IsDisposed { get; private set; } = false;

protected void ThrowIfDisposed()
{
if (_isDisposed)
if (IsDisposed)
{
throw new ObjectDisposedException(GetType().Name);
}
}


#region IDisposable Support
private bool _isDisposed = false; // To detect redundant calls

protected virtual void Dispose(bool disposing)
{
if (disposing)
{
// TODO: dispose managed state (managed objects).
}

// TODO: free unmanaged resources (unmanaged objects) and override a finalizer below.
// TODO: set large fields to null.
}

// TODO: override a finalizer only if Dispose(bool disposing) above has code to free unmanaged resources.
@@ -40,18 +28,17 @@ namespace MQTTnet.Internal
// This code added to correctly implement the disposable pattern.
public void Dispose()
{
if (_isDisposed)
// Do not change this code. Put cleanup code in Dispose(bool disposing) above.

if (IsDisposed)
{
return;
}

_isDisposed = true;
IsDisposed = true;

// Do not change this code. Put cleanup code in Dispose(bool disposing) above.
Dispose(true);
// TODO: uncomment the following line if the finalizer is overridden above.
// GC.SuppressFinalize(this);
GC.SuppressFinalize(this);
}
#endregion
}
}

+ 3
- 3
Source/MQTTnet/MqttFactory.cs View File

@@ -93,7 +93,7 @@ namespace MQTTnet
{
if (logger == null) throw new ArgumentNullException(nameof(logger));

return CreateMqttServer(new List<IMqttServerAdapter> { new MqttTcpServerAdapter(logger.CreateChildLogger()) }, logger);
return CreateMqttServer(new List<IMqttServerAdapter> { new MqttTcpServerAdapter(logger) }, logger);
}

public IMqttServer CreateMqttServer(IEnumerable<IMqttServerAdapter> serverAdapters, IMqttNetLogger logger)
@@ -101,14 +101,14 @@ namespace MQTTnet
if (serverAdapters == null) throw new ArgumentNullException(nameof(serverAdapters));
if (logger == null) throw new ArgumentNullException(nameof(logger));

return new MqttServer(serverAdapters, logger.CreateChildLogger());
return new MqttServer(serverAdapters, logger);
}

public IMqttServer CreateMqttServer(IEnumerable<IMqttServerAdapter> serverAdapters)
{
if (serverAdapters == null) throw new ArgumentNullException(nameof(serverAdapters));

return new MqttServer(serverAdapters, DefaultLogger.CreateChildLogger());
return new MqttServer(serverAdapters, DefaultLogger);
}
}
}

+ 6
- 5
Source/MQTTnet/PacketDispatcher/MqttPacketAwaiter.cs View File

@@ -1,9 +1,9 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Exceptions;
using MQTTnet.Exceptions;
using MQTTnet.Internal;
using MQTTnet.Packets;
using System;
using System.Threading;
using System.Threading.Tasks;

namespace MQTTnet.PacketDispatcher
{
@@ -12,7 +12,7 @@ namespace MQTTnet.PacketDispatcher
private readonly TaskCompletionSource<MqttBasePacket> _taskCompletionSource;
private readonly ushort? _packetIdentifier;
private readonly MqttPacketDispatcher _owningPacketDispatcher;
public MqttPacketAwaiter(ushort? packetIdentifier, MqttPacketDispatcher owningPacketDispatcher)
{
_packetIdentifier = packetIdentifier;
@@ -87,6 +87,7 @@ namespace MQTTnet.PacketDispatcher
{
_owningPacketDispatcher.RemovePacketAwaiter<TPacket>(_packetIdentifier);
}

base.Dispose(disposing);
}
}

+ 1
- 2
Source/MQTTnet/Server/MqttClientConnection.cs View File

@@ -437,9 +437,8 @@ namespace MQTTnet.Server
{
_logger.Warning(exception, "Sending publish packet failed: Communication exception (ClientId: {0}).", ClientId);
}
else if (exception is OperationCanceledException && _cancellationToken.Token.IsCancellationRequested)
else if (exception is OperationCanceledException)
{
// The cancellation was triggered externally.
}
else
{


+ 15
- 3
Source/MQTTnet/Server/MqttClientSessionsManager.cs View File

@@ -1,5 +1,6 @@
using MQTTnet.Adapter;
using MQTTnet.Diagnostics;
using MQTTnet.Exceptions;
using MQTTnet.Formatter;
using MQTTnet.Internal;
using MQTTnet.Packets;
@@ -236,12 +237,23 @@ namespace MQTTnet.Server
string clientId = null;
var clientWasConnected = true;

MqttConnectPacket connectPacket = null;

try
{
var firstPacket = await channelAdapter.ReceivePacketAsync(_options.DefaultCommunicationTimeout, cancellationToken).ConfigureAwait(false);
if (!(firstPacket is MqttConnectPacket connectPacket))
try
{
var firstPacket = await channelAdapter.ReceivePacketAsync(_options.DefaultCommunicationTimeout, cancellationToken).ConfigureAwait(false);
connectPacket = firstPacket as MqttConnectPacket;
if (connectPacket == null)
{
_logger.Warning(null, "The first packet from client '{0}' was no 'CONNECT' packet [MQTT-3.1.0-1].", channelAdapter.Endpoint);
return;
}
}
catch (MqttCommunicationTimedOutException)
{
_logger.Warning(null, "The first packet from client '{0}' was no 'CONNECT' packet [MQTT-3.1.0-1].", channelAdapter.Endpoint);
_logger.Warning(null, "Client '{0}' connected but did not sent a CONNECT packet.", channelAdapter.Endpoint);
return;
}



+ 3
- 3
Tests/MQTTnet.Benchmarks/MqttTcpChannelBenchmark.cs View File

@@ -1,11 +1,11 @@
using BenchmarkDotNet.Attributes;
using MQTTnet.Channel;
using MQTTnet.Client.Options;
using MQTTnet.Diagnostics;
using MQTTnet.Implementations;
using MQTTnet.Server;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Client.Options;

namespace MQTTnet.Benchmarks
{
@@ -20,7 +20,7 @@ namespace MQTTnet.Benchmarks
public void Setup()
{
var factory = new MqttFactory();
var tcpServer = new MqttTcpServerAdapter(new MqttNetLogger().CreateChildLogger());
var tcpServer = new MqttTcpServerAdapter(new MqttNetLogger());
tcpServer.ClientHandler += args =>
{
_serverChannel =
@@ -30,7 +30,7 @@ namespace MQTTnet.Benchmarks

return Task.CompletedTask;
};
_mqttServer = factory.CreateMqttServer(new[] { tcpServer }, new MqttNetLogger());

var serverOptions = new MqttServerOptionsBuilder().Build();


+ 12
- 10
Tests/MQTTnet.Core.Tests/ManagedMqttClient_Tests.cs View File

@@ -1,9 +1,4 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Client;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Options;
@@ -12,6 +7,11 @@ using MQTTnet.Diagnostics;
using MQTTnet.Extensions.ManagedClient;
using MQTTnet.Server;
using MQTTnet.Tests.Mockups;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace MQTTnet.Tests
{
@@ -69,7 +69,7 @@ namespace MQTTnet.Tests
.WithTcpServer("localhost", testEnvironment.ServerPort)
.WithWillMessage(willMessage);
var dyingClient = testEnvironment.CreateClient();
var dyingManagedClient = new ManagedMqttClient(dyingClient, testEnvironment.ClientLogger.CreateChildLogger());
var dyingManagedClient = new ManagedMqttClient(dyingClient, testEnvironment.ClientLogger);
await dyingManagedClient.StartAsync(new ManagedMqttClientOptionsBuilder()
.WithClientOptions(clientOptions)
.Build());
@@ -96,7 +96,7 @@ namespace MQTTnet.Tests

var server = await testEnvironment.StartServerAsync();

var managedClient = new ManagedMqttClient(testEnvironment.CreateClient(), new MqttNetLogger().CreateChildLogger());
var managedClient = new ManagedMqttClient(testEnvironment.CreateClient(), new MqttNetLogger());
var clientOptions = new MqttClientOptionsBuilder()
.WithTcpServer("localhost", testEnvironment.ServerPort);

@@ -110,6 +110,8 @@ namespace MQTTnet.Tests

await managedClient.StopAsync();

await Task.Delay(500);

Assert.AreEqual(0, (await server.GetClientStatusAsync()).Count);
}
}
@@ -126,7 +128,7 @@ namespace MQTTnet.Tests

var server = await testEnvironment.StartServerAsync();

var managedClient = new ManagedMqttClient(testEnvironment.CreateClient(), new MqttNetLogger().CreateChildLogger());
var managedClient = new ManagedMqttClient(testEnvironment.CreateClient(), new MqttNetLogger());
var clientOptions = new MqttClientOptionsBuilder()
.WithTcpServer("localhost", testEnvironment.ServerPort);
var storage = new ManagedMqttClientTestStorage();
@@ -349,7 +351,7 @@ namespace MQTTnet.Tests
managedOptions.ConnectionCheckInterval = connectionCheckInterval ?? TimeSpan.FromSeconds(0.1);

var managedClient =
new ManagedMqttClient(underlyingClient ?? testEnvironment.CreateClient(), new MqttNetLogger().CreateChildLogger());
new ManagedMqttClient(underlyingClient ?? testEnvironment.CreateClient(), new MqttNetLogger());

var connected = GetConnectedTask(managedClient);



+ 22
- 25
Tests/MQTTnet.Core.Tests/Mockups/TestClientWrapper.cs View File

@@ -29,37 +29,34 @@ namespace MQTTnet.Tests.Mockups

public IMqttClientOptions Options => Implementation.Options;

public IMqttClientConnectedHandler ConnectedHandler { get => Implementation.ConnectedHandler; set => Implementation.ConnectedHandler = value; }
public IMqttClientDisconnectedHandler DisconnectedHandler { get => Implementation.DisconnectedHandler; set => Implementation.DisconnectedHandler = value; }
public IMqttApplicationMessageReceivedHandler ApplicationMessageReceivedHandler { get => Implementation.ApplicationMessageReceivedHandler; set => Implementation.ApplicationMessageReceivedHandler = value; }
public IMqttClientConnectedHandler ConnectedHandler
{
get => Implementation.ConnectedHandler;
set => Implementation.ConnectedHandler = value;
}

public IMqttClientDisconnectedHandler DisconnectedHandler
{
get => Implementation.DisconnectedHandler;
set => Implementation.DisconnectedHandler = value;
}

public IMqttApplicationMessageReceivedHandler ApplicationMessageReceivedHandler
{
get => Implementation.ApplicationMessageReceivedHandler;
set => Implementation.ApplicationMessageReceivedHandler = value;
}

public Task<MqttClientAuthenticateResult> ConnectAsync(IMqttClientOptions options, CancellationToken cancellationToken)
{
if (TestContext != null)
{
switch (options)
var clientOptions = (MqttClientOptions)options;

var existingClientId = clientOptions.ClientId;
if (existingClientId != null && !existingClientId.StartsWith(TestContext.TestName))
{
case MqttClientOptionsBuilder builder:
{
var existingClientId = builder.Build().ClientId;
if (existingClientId != null && !existingClientId.StartsWith(TestContext.TestName))
{
builder.WithClientId(TestContext.TestName + existingClientId);
}

break;
}

case MqttClientOptions op:
{
var existingClientId = op.ClientId;
if (existingClientId != null && !existingClientId.StartsWith(TestContext.TestName))
{
op.ClientId = TestContext.TestName + existingClientId;
}

break;
}
clientOptions.ClientId = TestContext.TestName + existingClientId;
}
}



+ 1
- 1
Tests/MQTTnet.Core.Tests/Mockups/TestLogger.cs View File

@@ -7,7 +7,7 @@ namespace MQTTnet.Tests.Mockups
{
public event EventHandler<MqttNetLogMessagePublishedEventArgs> LogMessagePublished;

public IMqttNetLogger CreateChildLogger(string source = null)
public IMqttNetLogger CreateChildLogger(string source)
{
return new TestLogger();
}


+ 46
- 27
Tests/MQTTnet.Core.Tests/Mockups/TestServerWrapper.cs View File

@@ -22,16 +22,50 @@ namespace MQTTnet.Tests.Mockups
public IMqttServer Implementation { get; }
public TestContext TestContext { get; }
public TestEnvironment TestEnvironment { get; }
public IMqttServerStartedHandler StartedHandler { get => Implementation.StartedHandler; set => Implementation.StartedHandler = value; }
public IMqttServerStoppedHandler StoppedHandler { get => Implementation.StoppedHandler; set => Implementation.StoppedHandler = value; }
public IMqttServerClientConnectedHandler ClientConnectedHandler { get => Implementation.ClientConnectedHandler; set => Implementation.ClientConnectedHandler = value; }
public IMqttServerClientDisconnectedHandler ClientDisconnectedHandler { get => Implementation.ClientDisconnectedHandler; set => Implementation.ClientDisconnectedHandler = value; }
public IMqttServerClientSubscribedTopicHandler ClientSubscribedTopicHandler { get => Implementation.ClientSubscribedTopicHandler; set => Implementation.ClientSubscribedTopicHandler = value; }
public IMqttServerClientUnsubscribedTopicHandler ClientUnsubscribedTopicHandler { get => Implementation.ClientUnsubscribedTopicHandler; set => Implementation.ClientUnsubscribedTopicHandler = value; }

public IMqttServerStartedHandler StartedHandler
{
get => Implementation.StartedHandler;
set => Implementation.StartedHandler = value;
}

public IMqttServerStoppedHandler StoppedHandler
{
get => Implementation.StoppedHandler;
set => Implementation.StoppedHandler = value;
}

public IMqttServerClientConnectedHandler ClientConnectedHandler
{
get => Implementation.ClientConnectedHandler;
set => Implementation.ClientConnectedHandler = value;
}

public IMqttServerClientDisconnectedHandler ClientDisconnectedHandler
{
get => Implementation.ClientDisconnectedHandler;
set => Implementation.ClientDisconnectedHandler = value;
}

public IMqttServerClientSubscribedTopicHandler ClientSubscribedTopicHandler
{
get => Implementation.ClientSubscribedTopicHandler;
set => Implementation.ClientSubscribedTopicHandler = value;
}

public IMqttServerClientUnsubscribedTopicHandler ClientUnsubscribedTopicHandler
{
get => Implementation.ClientUnsubscribedTopicHandler;
set => Implementation.ClientUnsubscribedTopicHandler = value;
}

public IMqttServerOptions Options => Implementation.Options;

public IMqttApplicationMessageReceivedHandler ApplicationMessageReceivedHandler { get => Implementation.ApplicationMessageReceivedHandler; set => Implementation.ApplicationMessageReceivedHandler = value; }
public IMqttApplicationMessageReceivedHandler ApplicationMessageReceivedHandler
{
get => Implementation.ApplicationMessageReceivedHandler;
set => Implementation.ApplicationMessageReceivedHandler = value;
}

public Task ClearRetainedApplicationMessagesAsync()
{
@@ -62,26 +96,11 @@ namespace MQTTnet.Tests.Mockups
{
if (TestContext != null)
{
switch (options)
var serverOptions = (MqttServerOptions)options;

if (serverOptions.ConnectionValidator == null)
{
case MqttServerOptionsBuilder builder:
{
if (builder.Build().ConnectionValidator == null)
{
builder.WithConnectionValidator(ConnectionValidator);
}

break;
}
case MqttServerOptions op:
{
if (op.ConnectionValidator == null)
{
op.ConnectionValidator = new MqttServerConnectionValidatorDelegate(ConnectionValidator);
}

break;
}
serverOptions.ConnectionValidator = new MqttServerConnectionValidatorDelegate(ConnectionValidator);
}
}

@@ -92,7 +111,7 @@ namespace MQTTnet.Tests.Mockups
{
if (!ctx.ClientId.StartsWith(TestContext.TestName))
{
TestEnvironment.TrackException(new InvalidOperationException($"invalid client connected '{ctx.ClientId}'"));
TestEnvironment.TrackException(new InvalidOperationException($"Invalid client ID used ({ctx.ClientId}). It must start with UnitTest name."));
ctx.ReasonCode = Protocol.MqttConnectReasonCode.ClientIdentifierNotValid;
}
}


+ 15
- 13
Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs View File

@@ -1,9 +1,3 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Client;
using MQTTnet.Client.Connecting;
@@ -14,6 +8,12 @@ using MQTTnet.Exceptions;
using MQTTnet.Protocol;
using MQTTnet.Server;
using MQTTnet.Tests.Mockups;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;

namespace MQTTnet.Tests
{
@@ -29,9 +29,9 @@ namespace MQTTnet.Tests
{
await testEnvironment.StartServerAsync();
var client = await testEnvironment.ConnectClientAsync();
await client.SubscribeAsync("#");
var replyReceived = false;

client.UseApplicationMessageReceivedHandler(c =>
@@ -78,7 +78,7 @@ namespace MQTTnet.Tests
}
});

client2.UseApplicationMessageReceivedHandler(async c =>{ await client2.PublishAsync("reply", null, MqttQualityOfServiceLevel.AtLeastOnce); });
client2.UseApplicationMessageReceivedHandler(async c => { await client2.PublishAsync("reply", null, MqttQualityOfServiceLevel.AtLeastOnce); });

await client1.PublishAsync("request", null, MqttQualityOfServiceLevel.AtLeastOnce);

@@ -181,7 +181,7 @@ namespace MQTTnet.Tests
catch
{
}
SpinWait.SpinUntil(() => tries >= maxTries, 10000);

Assert.AreEqual(maxTries, tries);
@@ -215,7 +215,7 @@ namespace MQTTnet.Tests
Assert.AreEqual((ushort)4, result.PacketIdentifier);
}
}
[TestMethod]
public async Task Invalid_Connect_Throws_Exception()
{
@@ -558,6 +558,8 @@ namespace MQTTnet.Tests
clients.Add(await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("a")));
}

await Task.Delay(500);

var clientStatus = await testEnvironment.Server.GetClientStatusAsync();
var sessionStatus = await testEnvironment.Server.GetSessionStatusAsync();

@@ -565,7 +567,7 @@ namespace MQTTnet.Tests
{
Assert.IsFalse(clients[i].IsConnected);
}
Assert.IsTrue(clients[99].IsConnected);

Assert.AreEqual(1, clientStatus.Count);
@@ -583,7 +585,7 @@ namespace MQTTnet.Tests
var sendClient = await testEnvironment.ConnectClientAsync();
await sendClient.PublishAsync("x", "1");

await Task.Delay(100);
await Task.Delay(250);

Assert.AreEqual("1", receivedPayload);
}


+ 25
- 23
Tests/MQTTnet.Core.Tests/MqttFactory_Tests.cs View File

@@ -15,35 +15,36 @@ namespace MQTTnet.Tests
{
var factory = new MqttFactory();

//This test compares
//1. correct logID
// This test compares
// 1. correct logID
var logId = "logId";
string invalidLogId = null;
var hasInvalidLogId = false;

//2. if the total log calls are the same for global and local
var globalLogCount = 0;
// 2. if the total log calls are the same for global and local
//var globalLogCount = 0;
var localLogCount = 0;

var logger = new MqttNetLogger(logId);

//we have a theoretical bug here if a concurrent test is also logging
var globalLog = new EventHandler<MqttNetLogMessagePublishedEventArgs>((s, e) =>
{
if (logId != e.LogMessage.LogId)
{
invalidLogId = e.LogMessage.LogId;
}
// TODO: This is commented out because it is affected by other tests.
//// we have a theoretical bug here if a concurrent test is also logging
//var globalLog = new EventHandler<MqttNetLogMessagePublishedEventArgs>((s, e) =>
//{
// if (e.TraceMessage.LogId != logId)
// {
// invalidLogId = e.TraceMessage.LogId;
// }

Interlocked.Increment(ref globalLogCount);
});
// Interlocked.Increment(ref globalLogCount);
//});

MqttNetGlobalLogger.LogMessagePublished += globalLog;
//MqttNetGlobalLogger.LogMessagePublished += globalLog;

logger.LogMessagePublished += (s, e) =>
{
if (logId != e.LogMessage.LogId)
if (e.LogMessage.LogId != logId)
{
invalidLogId = e.LogMessage.LogId;
hasInvalidLogId = true;
}

Interlocked.Increment(ref localLogCount);
@@ -56,10 +57,10 @@ namespace MQTTnet.Tests

clientOptions.WithClientOptions(o => o.WithTcpServer("this_is_an_invalid_host").WithCommunicationTimeout(TimeSpan.FromSeconds(1)));

//try connect to get some log entries
// try connect to get some log entries
await managedClient.StartAsync(clientOptions.Build());

//wait at least connect timeout or we have some log messages
// wait at least connect timeout or we have some log messages
var tcs = new TaskCompletionSource<object>();
managedClient.ConnectingFailedHandler = new ConnectingFailedHandlerDelegate(e => tcs.TrySetResult(null));
await Task.WhenAny(Task.Delay(managedClient.Options.ClientOptions.CommunicationTimeout), tcs.Task);
@@ -68,12 +69,13 @@ namespace MQTTnet.Tests
{
await managedClient.StopAsync();

MqttNetGlobalLogger.LogMessagePublished -= globalLog;
//MqttNetGlobalLogger.LogMessagePublished -= globalLog;
}

Assert.IsNull(invalidLogId);
Assert.AreNotEqual(0, globalLogCount);
Assert.AreEqual(globalLogCount, localLogCount);
await Task.Delay(500);

Assert.IsFalse(hasInvalidLogId);
Assert.AreNotEqual(0, localLogCount);
}
}
}

+ 7
- 7
Tests/MQTTnet.Core.Tests/MqttKeepAliveMonitor_Tests.cs View File

@@ -1,12 +1,12 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Adapter;
using MQTTnet.Diagnostics;
using MQTTnet.Server;
using MQTTnet.Server.Status;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace MQTTnet.Tests
{
@@ -23,7 +23,7 @@ namespace MQTTnet.Tests
counter++;
return Task.CompletedTask;
},
new MqttNetLogger().CreateChildLogger());
new MqttNetLogger());

Assert.AreEqual(0, counter);

@@ -46,7 +46,7 @@ namespace MQTTnet.Tests
counter++;
return Task.CompletedTask;
},
new MqttNetLogger().CreateChildLogger());
new MqttNetLogger());

Assert.AreEqual(0, counter);



+ 50
- 0
Tests/MQTTnet.Core.Tests/MqttNetLogger_Tests.cs View File

@@ -0,0 +1,50 @@
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Diagnostics;

namespace MQTTnet.Tests
{
[TestClass]
public class MqttNetLogger_Tests
{
[TestMethod]
public void Root_Log_Messages()
{
var logger = new MqttNetLogger();

var logMessagesCount = 0;

logger.LogMessagePublished += (s, e) =>
{
logMessagesCount++;
};

logger.Verbose("Verbose");
logger.Info("Info");
logger.Warning(null, "Warning");
logger.Error(null, "Error");

Assert.AreEqual(4, logMessagesCount);
}

[TestMethod]
public void Bubbling_Log_Messages()
{
var logger = new MqttNetLogger();
var childLogger = logger.CreateChildLogger("Source1");

var logMessagesCount = 0;

logger.LogMessagePublished += (s, e) =>
{
logMessagesCount++;
};

childLogger.Verbose("Verbose");
childLogger.Info("Info");
childLogger.Warning(null, "Warning");
childLogger.Error(null, "Error");

Assert.AreEqual(4, logMessagesCount);
}
}
}

+ 1
- 1
Tests/MQTTnet.Core.Tests/Server_Tests.cs View File

@@ -1050,6 +1050,7 @@ namespace MQTTnet.Tests
Assert.AreEqual("c", flow);

// dc
// Connect client with same client ID. Should disconnect existing client.
var c2 = await testEnvironment.ConnectClientAsync(clientOptionsBuilder);

c2.UseApplicationMessageReceivedHandler(_ =>
@@ -1075,7 +1076,6 @@ namespace MQTTnet.Tests
flow = string.Join(string.Empty, events);
Assert.AreEqual("cdcr", flow);


// nothing

Assert.AreEqual(false, c1.IsConnected);


+ 27
- 17
Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs View File

@@ -1,12 +1,4 @@
using System;
using System.Collections.Concurrent;
using System.Collections.ObjectModel;
using System.Text;
using System.Threading.Tasks;
using Windows.Security.Cryptography.Certificates;
using Windows.UI.Core;
using Windows.UI.Xaml;
using MQTTnet.Client;
using MQTTnet.Client;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options;
@@ -14,14 +6,22 @@ using MQTTnet.Diagnostics;
using MQTTnet.Exceptions;
using MQTTnet.Extensions.ManagedClient;
using MQTTnet.Extensions.Rpc;
using MQTTnet.Extensions.WebSocket4Net;
using MQTTnet.Formatter;
using MQTTnet.Implementations;
using MQTTnet.Protocol;
using MQTTnet.Server;
using MQTTnet.Server.Status;
using System;
using System.Collections.Concurrent;
using System.Collections.ObjectModel;
using System.Text;
using System.Threading.Tasks;
using Windows.Security.Cryptography.Certificates;
using Windows.UI.Core;
using Windows.UI.Xaml;
using MqttClientConnectedEventArgs = MQTTnet.Client.Connecting.MqttClientConnectedEventArgs;
using MqttClientDisconnectedEventArgs = MQTTnet.Client.Disconnecting.MqttClientDisconnectedEventArgs;
using MQTTnet.Extensions.WebSocket4Net;

namespace MQTTnet.TestApp.UniversalWindows
{
@@ -141,7 +141,7 @@ namespace MQTTnet.TestApp.UniversalWindows
Password = Encoding.UTF8.GetBytes(Password.Text)
};
}
options.CleanSession = CleanSession.IsChecked == true;
options.KeepAlivePeriod = TimeSpan.FromSeconds(double.Parse(KeepAliveInterval.Text));

@@ -198,16 +198,26 @@ namespace MQTTnet.TestApp.UniversalWindows

private void OnDisconnected(MqttClientDisconnectedEventArgs e)
{
_traceMessages.Enqueue(new MqttNetLogMessage("", DateTime.Now, -1,
"", MqttNetLogLevel.Info, "! DISCONNECTED EVENT FIRED", null));
_traceMessages.Enqueue(new MqttNetLogMessage
{
Timestamp = DateTime.UtcNow,
ThreadId = -1,
Level = MqttNetLogLevel.Info,
Message = "! DISCONNECTED EVENT FIRED",
});

Task.Run(UpdateLogAsync);
}

private void OnConnected(MqttClientConnectedEventArgs e)
{
_traceMessages.Enqueue(new MqttNetLogMessage("", DateTime.Now, -1,
"", MqttNetLogLevel.Info, "! CONNECTED EVENT FIRED", null));
_traceMessages.Enqueue(new MqttNetLogMessage
{
Timestamp = DateTime.UtcNow,
ThreadId = -1,
Level = MqttNetLogLevel.Info,
Message = "! CONNECTED EVENT FIRED",
});

Task.Run(UpdateLogAsync);
}
@@ -538,7 +548,7 @@ namespace MQTTnet.TestApp.UniversalWindows
{
//...
}
client.UseApplicationMessageReceivedHandler(e => Handler(e));

// Subscribe after connect
@@ -614,7 +624,7 @@ namespace MQTTnet.TestApp.UniversalWindows
};
}
}
// ----------------------------------
{
var options = new MqttServerOptions();


Loading…
Cancel
Save