@@ -14,12 +14,12 @@ | |||
<releaseNotes> | |||
* [Core] Renamed some topic filter relevant classes (BREAKING CHANGE!). | |||
* [Core] Improved task management for UWP connections (thanks to @xgstation). | |||
* [LowLevelMqttClient] | |||
* [Client] Added method to trigger PING/PONG manually (connection check etc.). | |||
* [Client] Added support for certificate validation callback when using Web Sockets (requires netstandard2.1+). | |||
* [Client] Fixed a memory leak when web socket based connections trying to reconnect with an offline server. | |||
* [ManagedClient] Added method to trigger PING/PONG manually (connection check etc.). | |||
* [Server] | |||
* [MQTTnet.AspNetCore] improved compatibility with AspNetCore 3.1 | |||
* [MQTTnet.Server] | |||
* [MQTTnet.AspNetCore] improved compatibility with AspNetCore 3.1. | |||
* [MQTTnet.Server] Fixed wrong version output. | |||
</releaseNotes> | |||
<copyright>Copyright Christian Kratky 2016-2020</copyright> | |||
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags> | |||
@@ -36,6 +36,12 @@ | |||
<dependency id="System.Net.WebSockets" version="4.3.0" /> | |||
<dependency id="System.Net.WebSockets.Client" version="4.3.2" /> | |||
</group> | |||
<group targetFramework="netstandard2.1"> | |||
<dependency id="NETStandard.Library" version="2.0.0" /> | |||
<dependency id="System.Net.Security" version="4.3.2" /> | |||
<dependency id="System.Net.WebSockets" version="4.3.0" /> | |||
<dependency id="System.Net.WebSockets.Client" version="4.3.2" /> | |||
</group> | |||
<group targetFramework="uap10.0"> | |||
<dependency id="Microsoft.NETCore.UniversalWindowsPlatform" version="6.2.10" /> | |||
</group> | |||
@@ -55,6 +61,9 @@ | |||
<!-- .NET Standard 2.0 --> | |||
<file src="..\Source\MQTTnet\bin\Release\netstandard2.0\MQTTnet.*" target="lib\netstandard2.0\"/> | |||
<!-- .NET Standard 2.1 --> | |||
<file src="..\Source\MQTTnet\bin\Release\netstandard2.1\MQTTnet.*" target="lib\netstandard2.1\"/> | |||
<!-- Universal Windows --> | |||
<file src="..\Source\MQTTnet\bin\Release\uap10.0\MQTTnet.*" target="lib\uap10.0\"/> | |||
@@ -28,6 +28,7 @@ vstest.console.exe ..\Tests\MQTTnet.AspNetCore.Tests\bin\Release\netcoreapp3.1\M | |||
&$msbuild ..\Source\MQTTnet\MQTTnet.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="net461" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" | |||
&$msbuild ..\Source\MQTTnet\MQTTnet.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netstandard1.3" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" | |||
&$msbuild ..\Source\MQTTnet\MQTTnet.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netstandard2.0" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" | |||
&$msbuild ..\Source\MQTTnet\MQTTnet.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netstandard2.1" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" | |||
&$msbuild ..\Source\MQTTnet\MQTTnet.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="uap10.0" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" | |||
# Build the ASP.NET Core 2.0 extension | |||
@@ -38,6 +39,7 @@ vstest.console.exe ..\Tests\MQTTnet.AspNetCore.Tests\bin\Release\netcoreapp3.1\M | |||
&$msbuild ..\Source\MQTTnet.Extensions.Rpc\MQTTnet.Extensions.Rpc.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="net461" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" | |||
&$msbuild ..\Source\MQTTnet.Extensions.Rpc\MQTTnet.Extensions.Rpc.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netstandard1.3" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" | |||
&$msbuild ..\Source\MQTTnet.Extensions.Rpc\MQTTnet.Extensions.Rpc.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netstandard2.0" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" | |||
&$msbuild ..\Source\MQTTnet.Extensions.Rpc\MQTTnet.Extensions.Rpc.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netstandard2.1" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" | |||
&$msbuild ..\Source\MQTTnet.Extensions.Rpc\MQTTnet.Extensions.Rpc.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="uap10.0" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" | |||
# Build the Managed Client extension | |||
@@ -45,6 +47,7 @@ vstest.console.exe ..\Tests\MQTTnet.AspNetCore.Tests\bin\Release\netcoreapp3.1\M | |||
&$msbuild ..\Source\MQTTnet.Extensions.ManagedClient\MQTTnet.Extensions.ManagedClient.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="net461" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" | |||
&$msbuild ..\Source\MQTTnet.Extensions.ManagedClient\MQTTnet.Extensions.ManagedClient.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netstandard1.3" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" | |||
&$msbuild ..\Source\MQTTnet.Extensions.ManagedClient\MQTTnet.Extensions.ManagedClient.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netstandard2.0" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" | |||
&$msbuild ..\Source\MQTTnet.Extensions.ManagedClient\MQTTnet.Extensions.ManagedClient.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netstandard2.1" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" | |||
&$msbuild ..\Source\MQTTnet.Extensions.ManagedClient\MQTTnet.Extensions.ManagedClient.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="uap10.0" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" | |||
# Build the WebSocket4Net extension | |||
@@ -52,6 +55,7 @@ vstest.console.exe ..\Tests\MQTTnet.AspNetCore.Tests\bin\Release\netcoreapp3.1\M | |||
&$msbuild ..\Source\MQTTnet.Extensions.WebSocket4Net\MQTTnet.Extensions.WebSocket4Net.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="net461" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" | |||
&$msbuild ..\Source\MQTTnet.Extensions.WebSocket4Net\MQTTnet.Extensions.WebSocket4Net.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netstandard1.3" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" | |||
&$msbuild ..\Source\MQTTnet.Extensions.WebSocket4Net\MQTTnet.Extensions.WebSocket4Net.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netstandard2.0" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" | |||
&$msbuild ..\Source\MQTTnet.Extensions.WebSocket4Net\MQTTnet.Extensions.WebSocket4Net.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netstandard2.1" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" | |||
&$msbuild ..\Source\MQTTnet.Extensions.WebSocket4Net\MQTTnet.Extensions.WebSocket4Net.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="uap10.0" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" | |||
# Create NuGet packages. | |||
@@ -61,7 +61,7 @@ namespace MQTTnet.Client | |||
{ | |||
get | |||
{ | |||
return _isConnected || Interlocked.Read(ref _isDisconnectPending) != 0; | |||
return _isConnected && Interlocked.Read(ref _isDisconnectPending) == 0; | |||
} | |||
} | |||
@@ -0,0 +1,16 @@ | |||
using System.Net.Security; | |||
using System.Security.Cryptography.X509Certificates; | |||
namespace MQTTnet.Client.Options | |||
{ | |||
public class MqttClientCertificateValidationCallbackContext | |||
{ | |||
public X509Certificate Certificate { get; set; } | |||
public X509Chain Chain { get; set; } | |||
public SslPolicyErrors SslPolicyErrors { get; set; } | |||
public IMqttClientChannelOptions ClientOptions { get; set; } | |||
} | |||
} |
@@ -262,6 +262,7 @@ namespace MQTTnet.Client.Options | |||
Certificates = _tlsParameters.Certificates?.ToList(), | |||
#endif | |||
CertificateValidationCallback = _tlsParameters.CertificateValidationCallback, | |||
CertificateValidationHandler = _tlsParameters.CertificateValidationHandler, | |||
IgnoreCertificateChainErrors = _tlsParameters.IgnoreCertificateChainErrors, | |||
IgnoreCertificateRevocationErrors = _tlsParameters.IgnoreCertificateRevocationErrors | |||
}; | |||
@@ -10,12 +10,15 @@ namespace MQTTnet.Client.Options | |||
{ | |||
public bool UseTls { get; set; } | |||
[Obsolete("This property will be removed soon. Use CertificateValidationHandler instead.")] | |||
public Func<X509Certificate, X509Chain, SslPolicyErrors, IMqttClientOptions, bool> CertificateValidationCallback | |||
{ | |||
get; | |||
set; | |||
} | |||
public Func<MqttClientCertificateValidationCallbackContext, bool> CertificateValidationHandler { get; set; } | |||
public SslProtocols SslProtocol { get; set; } = SslProtocols.Tls12; | |||
#if WINDOWS_UWP | |||
@@ -23,7 +26,6 @@ namespace MQTTnet.Client.Options | |||
#else | |||
public IEnumerable<X509Certificate> Certificates { get; set; } | |||
#endif | |||
public bool AllowUntrustedCertificates { get; set; } | |||
@@ -15,6 +15,7 @@ namespace MQTTnet.Client.Options | |||
public bool IgnoreCertificateChainErrors { get; set; } | |||
public bool AllowUntrustedCertificates { get; set; } | |||
#if WINDOWS_UWP | |||
public List<byte[]> Certificates { get; set; } | |||
#else | |||
@@ -23,6 +24,9 @@ namespace MQTTnet.Client.Options | |||
public SslProtocols SslProtocol { get; set; } = SslProtocols.Tls12; | |||
[Obsolete("This property will be removed soon. Use CertificateValidationHandler instead.")] | |||
public Func<X509Certificate, X509Chain, SslPolicyErrors, IMqttClientOptions, bool> CertificateValidationCallback { get; set; } | |||
public Func<MqttClientCertificateValidationCallbackContext, bool> CertificateValidationHandler { get; set; } | |||
} | |||
} |
@@ -16,6 +16,8 @@ | |||
return "netstandard1.3"; | |||
#elif NETSTANDARD2_0 | |||
return "netstandard2.0"; | |||
#elif NETSTANDARD2_1 | |||
return "netstandard2.1"; | |||
#elif WINDOWS_UWP | |||
return "uap10.0"; | |||
#endif | |||
@@ -73,7 +73,7 @@ namespace MQTTnet.Implementations | |||
var networkStream = socket.GetStream(); | |||
if (_options.TlsOptions.UseTls) | |||
if (_options.TlsOptions?.UseTls == true) | |||
{ | |||
var sslStream = new SslStream(networkStream, false, InternalUserCertificateValidationCallback); | |||
try | |||
@@ -181,9 +181,28 @@ namespace MQTTnet.Implementations | |||
bool InternalUserCertificateValidationCallback(object sender, X509Certificate x509Certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors) | |||
{ | |||
if (_options.TlsOptions.CertificateValidationCallback != null) | |||
#region OBSOLETE | |||
var certificateValidationCallback = _options?.TlsOptions?.CertificateValidationCallback; | |||
if (certificateValidationCallback != null) | |||
{ | |||
return _options.TlsOptions.CertificateValidationCallback(x509Certificate, chain, sslPolicyErrors, _clientOptions); | |||
return certificateValidationCallback(x509Certificate, chain, sslPolicyErrors, _clientOptions); | |||
} | |||
#endregion | |||
var certificateValidationHandler = _options?.TlsOptions?.CertificateValidationHandler; | |||
if (certificateValidationHandler != null) | |||
{ | |||
var context = new MqttClientCertificateValidationCallbackContext | |||
{ | |||
Certificate = x509Certificate, | |||
Chain = chain, | |||
SslPolicyErrors = sslPolicyErrors, | |||
ClientOptions = _options | |||
}; | |||
return certificateValidationHandler(context); | |||
} | |||
if (sslPolicyErrors == SslPolicyErrors.None) | |||
@@ -1,21 +1,21 @@ | |||
using System; | |||
using MQTTnet.Channel; | |||
using MQTTnet.Client.Options; | |||
using MQTTnet.Internal; | |||
using System; | |||
using System.Net; | |||
using System.Net.WebSockets; | |||
using System.Security.Cryptography.X509Certificates; | |||
using System.Threading; | |||
using System.Threading.Tasks; | |||
using MQTTnet.Channel; | |||
using MQTTnet.Client.Options; | |||
using MQTTnet.Internal; | |||
namespace MQTTnet.Implementations | |||
{ | |||
public class MqttWebSocketChannel : Disposable, IMqttChannel | |||
public sealed class MqttWebSocketChannel : IMqttChannel | |||
{ | |||
private readonly MqttClientWebSocketOptions _options; | |||
readonly MqttClientWebSocketOptions _options; | |||
private SemaphoreSlim _sendLock = new SemaphoreSlim(1, 1); | |||
private WebSocket _webSocket; | |||
AsyncLock _sendLock = new AsyncLock(); | |||
WebSocket _webSocket; | |||
public MqttWebSocketChannel(MqttClientWebSocketOptions options) | |||
{ | |||
@@ -53,50 +53,20 @@ namespace MQTTnet.Implementations | |||
} | |||
var clientWebSocket = new ClientWebSocket(); | |||
if (_options.ProxyOptions != null) | |||
try | |||
{ | |||
clientWebSocket.Options.Proxy = CreateProxy(); | |||
} | |||
SetupClientWebSocket(clientWebSocket); | |||
if (_options.RequestHeaders != null) | |||
{ | |||
foreach (var requestHeader in _options.RequestHeaders) | |||
{ | |||
clientWebSocket.Options.SetRequestHeader(requestHeader.Key, requestHeader.Value); | |||
} | |||
await clientWebSocket.ConnectAsync(new Uri(uri), cancellationToken).ConfigureAwait(false); | |||
} | |||
if (_options.SubProtocols != null) | |||
catch (Exception) | |||
{ | |||
foreach (var subProtocol in _options.SubProtocols) | |||
{ | |||
clientWebSocket.Options.AddSubProtocol(subProtocol); | |||
} | |||
// Prevent a memory leak when always creating new instance which will fail while connecting. | |||
clientWebSocket.Dispose(); | |||
throw; | |||
} | |||
if (_options.CookieContainer != null) | |||
{ | |||
clientWebSocket.Options.Cookies = _options.CookieContainer; | |||
} | |||
if (_options.TlsOptions?.UseTls == true && _options.TlsOptions?.Certificates != null) | |||
{ | |||
clientWebSocket.Options.ClientCertificates = new X509CertificateCollection(); | |||
foreach (var certificate in _options.TlsOptions.Certificates) | |||
{ | |||
#if WINDOWS_UWP | |||
clientWebSocket.Options.ClientCertificates.Add(new X509Certificate(certificate)); | |||
#else | |||
clientWebSocket.Options.ClientCertificates.Add(certificate); | |||
#endif | |||
} | |||
} | |||
await clientWebSocket.ConnectAsync(new Uri(uri), cancellationToken).ConfigureAwait(false); | |||
_webSocket = clientWebSocket; | |||
IsSecureConnection = uri.StartsWith("wss://", StringComparison.OrdinalIgnoreCase); | |||
} | |||
@@ -131,27 +101,87 @@ namespace MQTTnet.Implementations | |||
return; | |||
} | |||
await _sendLock.WaitAsync(cancellationToken).ConfigureAwait(false); | |||
try | |||
using (await _sendLock.WaitAsync(cancellationToken).ConfigureAwait(false)) | |||
{ | |||
await _webSocket.SendAsync(new ArraySegment<byte>(buffer, offset, count), WebSocketMessageType.Binary, true, cancellationToken).ConfigureAwait(false); | |||
} | |||
finally | |||
{ | |||
_sendLock?.Release(); | |||
} | |||
} | |||
protected override void Dispose(bool disposing) | |||
public void Dispose() | |||
{ | |||
Cleanup(); | |||
} | |||
void SetupClientWebSocket(ClientWebSocket clientWebSocket) | |||
{ | |||
if (disposing) | |||
if (_options.ProxyOptions != null) | |||
{ | |||
clientWebSocket.Options.Proxy = CreateProxy(); | |||
} | |||
if (_options.RequestHeaders != null) | |||
{ | |||
foreach (var requestHeader in _options.RequestHeaders) | |||
{ | |||
clientWebSocket.Options.SetRequestHeader(requestHeader.Key, requestHeader.Value); | |||
} | |||
} | |||
if (_options.SubProtocols != null) | |||
{ | |||
Cleanup(); | |||
foreach (var subProtocol in _options.SubProtocols) | |||
{ | |||
clientWebSocket.Options.AddSubProtocol(subProtocol); | |||
} | |||
} | |||
base.Dispose(disposing); | |||
if (_options.CookieContainer != null) | |||
{ | |||
clientWebSocket.Options.Cookies = _options.CookieContainer; | |||
} | |||
if (_options.TlsOptions?.UseTls == true && _options.TlsOptions?.Certificates != null) | |||
{ | |||
clientWebSocket.Options.ClientCertificates = new X509CertificateCollection(); | |||
foreach (var certificate in _options.TlsOptions.Certificates) | |||
{ | |||
#if WINDOWS_UWP | |||
clientWebSocket.Options.ClientCertificates.Add(new X509Certificate(certificate)); | |||
#else | |||
clientWebSocket.Options.ClientCertificates.Add(certificate); | |||
#endif | |||
} | |||
} | |||
var certificateValidationHandler = _options.TlsOptions?.CertificateValidationHandler; | |||
#if NETSTANDARD2_1 | |||
if (certificateValidationHandler != null) | |||
{ | |||
clientWebSocket.Options.RemoteCertificateValidationCallback = new System.Net.Security.RemoteCertificateValidationCallback((sender, certificate, chain, sslPolicyErrors) => | |||
{ | |||
// TODO: Find a way to add client options to same callback. Problem is that they have a different type. | |||
var context = new MqttClientCertificateValidationCallbackContext | |||
{ | |||
Certificate = certificate, | |||
Chain = chain, | |||
SslPolicyErrors = sslPolicyErrors, | |||
ClientOptions = _options | |||
}; | |||
return certificateValidationHandler(context); | |||
}); | |||
} | |||
#else | |||
if (certificateValidationHandler != null) | |||
{ | |||
throw new NotSupportedException("The remote certificate validation callback for Web Sockets is only supported for netstandard 2.1+"); | |||
} | |||
#endif | |||
} | |||
private void Cleanup() | |||
void Cleanup() | |||
{ | |||
_sendLock?.Dispose(); | |||
_sendLock = null; | |||
@@ -169,7 +199,7 @@ namespace MQTTnet.Implementations | |||
} | |||
} | |||
private IWebProxy CreateProxy() | |||
IWebProxy CreateProxy() | |||
{ | |||
if (string.IsNullOrEmpty(_options.ProxyOptions?.Address)) | |||
{ | |||
@@ -4,11 +4,12 @@ using System.Threading; | |||
namespace MQTTnet.Internal | |||
{ | |||
public class BlockingQueue<TItem> : Disposable | |||
public sealed class BlockingQueue<TItem> : IDisposable | |||
{ | |||
private readonly object _syncRoot = new object(); | |||
private readonly LinkedList<TItem> _items = new LinkedList<TItem>(); | |||
private readonly ManualResetEventSlim _gate = new ManualResetEventSlim(false); | |||
readonly object _syncRoot = new object(); | |||
readonly LinkedList<TItem> _items = new LinkedList<TItem>(); | |||
ManualResetEventSlim _gate = new ManualResetEventSlim(false); | |||
public int Count | |||
{ | |||
@@ -28,13 +29,13 @@ namespace MQTTnet.Internal | |||
lock (_syncRoot) | |||
{ | |||
_items.AddLast(item); | |||
_gate.Set(); | |||
_gate?.Set(); | |||
} | |||
} | |||
public TItem Dequeue(CancellationToken cancellationToken = default(CancellationToken)) | |||
public TItem Dequeue(CancellationToken cancellationToken = default) | |||
{ | |||
while (true) | |||
while (!cancellationToken.IsCancellationRequested) | |||
{ | |||
lock (_syncRoot) | |||
{ | |||
@@ -48,17 +49,19 @@ namespace MQTTnet.Internal | |||
if (_items.Count == 0) | |||
{ | |||
_gate.Reset(); | |||
_gate?.Reset(); | |||
} | |||
} | |||
_gate.Wait(cancellationToken); | |||
_gate?.Wait(cancellationToken); | |||
} | |||
throw new OperationCanceledException(); | |||
} | |||
public TItem PeekAndWait(CancellationToken cancellationToken = default(CancellationToken)) | |||
public TItem PeekAndWait(CancellationToken cancellationToken = default) | |||
{ | |||
while (true) | |||
while (!cancellationToken.IsCancellationRequested) | |||
{ | |||
lock (_syncRoot) | |||
{ | |||
@@ -69,12 +72,14 @@ namespace MQTTnet.Internal | |||
if (_items.Count == 0) | |||
{ | |||
_gate.Reset(); | |||
_gate?.Reset(); | |||
} | |||
} | |||
_gate.Wait(cancellationToken); | |||
_gate?.Wait(cancellationToken); | |||
} | |||
throw new OperationCanceledException(); | |||
} | |||
public void RemoveFirst(Predicate<TItem> match) | |||
@@ -109,13 +114,10 @@ namespace MQTTnet.Internal | |||
} | |||
} | |||
protected override void Dispose(bool disposing) | |||
public void Dispose() | |||
{ | |||
if (disposing) | |||
{ | |||
_gate.Dispose(); | |||
} | |||
base.Dispose(disposing); | |||
_gate?.Dispose(); | |||
_gate = null; | |||
} | |||
} | |||
} |
@@ -1,7 +1,7 @@ | |||
<Project Sdk="Microsoft.NET.Sdk"> | |||
<PropertyGroup> | |||
<TargetFrameworks>netstandard1.3;netstandard2.0</TargetFrameworks> | |||
<TargetFrameworks>netstandard1.3;netstandard2.0;netstandard2.1</TargetFrameworks> | |||
<TargetFrameworks Condition=" '$(OS)' == 'Windows_NT' ">$(TargetFrameworks);net452;net461</TargetFrameworks> | |||
<TargetFrameworks Condition=" '$(OS)' == 'Windows_NT' AND '$(MSBuildRuntimeType)' != 'Core' AND '$(SolutionName)' != 'MQTTnet.noUWP' ">$(TargetFrameworks);uap10.0</TargetFrameworks> | |||
<AssemblyName>MQTTnet</AssemblyName> | |||
@@ -1,7 +1,8 @@ | |||
using System.Threading; | |||
using System.Threading.Tasks; | |||
using Microsoft.VisualStudio.TestTools.UnitTesting; | |||
using Microsoft.VisualStudio.TestTools.UnitTesting; | |||
using MQTTnet.Internal; | |||
using System; | |||
using System.Threading; | |||
using System.Threading.Tasks; | |||
namespace MQTTnet.Tests | |||
{ | |||
@@ -33,7 +34,7 @@ namespace MQTTnet.Tests | |||
Assert.AreEqual("a", queue.RemoveFirst()); | |||
Assert.AreEqual("b", queue.RemoveFirst()); | |||
Assert.AreEqual(1, queue.Count); | |||
Assert.AreEqual("c", queue.Dequeue()); | |||
@@ -81,7 +82,7 @@ namespace MQTTnet.Tests | |||
} | |||
[TestMethod] | |||
public void Wait_For_Times() | |||
public void Wait_For_Items() | |||
{ | |||
var number = 0; | |||
@@ -104,5 +105,21 @@ namespace MQTTnet.Tests | |||
Interlocked.Increment(ref number); | |||
} | |||
} | |||
[TestMethod] | |||
[ExpectedException(typeof(OperationCanceledException))] | |||
public void Use_Disposed_Queue() | |||
{ | |||
var queue = new BlockingQueue<int>(); | |||
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed | |||
Task.Run(() => | |||
{ | |||
Thread.Sleep(1000); | |||
queue.Dispose(); | |||
}); | |||
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed | |||
queue.Dequeue(new CancellationTokenSource(TimeSpan.FromSeconds(2)).Token); | |||
} | |||
} | |||
} | |||
} |
@@ -23,6 +23,18 @@ namespace MQTTnet.Tests | |||
{ | |||
public TestContext TestContext { get; set; } | |||
[TestMethod] | |||
public async Task Send_Manual_Ping() | |||
{ | |||
using (var testEnvironment = new TestEnvironment(TestContext)) | |||
{ | |||
await testEnvironment.StartServerAsync(); | |||
var client = await testEnvironment.ConnectClientAsync(); | |||
await client.PingAsync(CancellationToken.None); | |||
} | |||
} | |||
[TestMethod] | |||
public async Task Send_Reply_In_Message_Handler_For_Same_Client() | |||
{ | |||
@@ -1,15 +1,13 @@ | |||
using System; | |||
using MQTTnet.Client.Options; | |||
using MQTTnet.Diagnostics; | |||
using MQTTnet.Server; | |||
using Newtonsoft.Json; | |||
using System; | |||
using System.Collections.Generic; | |||
using System.IO; | |||
using System.Net.Security; | |||
using System.Security.Cryptography.X509Certificates; | |||
using System.Threading; | |||
using System.Threading.Tasks; | |||
using MQTTnet.Client; | |||
using MQTTnet.Client.Options; | |||
using MQTTnet.Diagnostics; | |||
using MQTTnet.Server; | |||
using Newtonsoft.Json; | |||
namespace MQTTnet.TestApp.NetCore | |||
{ | |||
@@ -129,10 +127,15 @@ namespace MQTTnet.TestApp.NetCore | |||
var options = new MqttClientOptionsBuilder() | |||
.WithTls(new MqttClientOptionsBuilderTlsParameters | |||
{ | |||
CertificateValidationCallback = (X509Certificate x, X509Chain y, SslPolicyErrors z, IMqttClientOptions o) => | |||
CertificateValidationHandler = context => | |||
{ | |||
// TODO: Check conditions of certificate by using above parameters. | |||
return true; | |||
// TODO: Check conditions of certificate by using above context. | |||
if (context.SslPolicyErrors == SslPolicyErrors.None) | |||
{ | |||
return true; | |||
} | |||
return false; | |||
} | |||
}) | |||
.Build(); | |||