@@ -11,16 +11,14 @@ | |||
<requireLicenseAcceptance>false</requireLicenseAcceptance> | |||
<description>MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker) and supports v3.1.0, v3.1.1 and v5.0.0 of the MQTT protocol.</description> | |||
<releaseNotes> | |||
* [Core] Nuget packages with symbols are now also published to improve debugging. | |||
* [Core] Improve task handling (thanks to @mwinterb) | |||
* [ManagedClient] Fix a race condition in the message storage (thanks to @PaulFake). | |||
* [Server] Added items dictionary to client session in order to share data across interceptors as along as the session exists. | |||
* [Server] Exposed CONNECT packet properties in Application Message and Subscription interceptor. | |||
* [Server] Fixed: Sending Large packets with AspnetCore based connection throws System.ArgumentException. | |||
* [Server] Fixed wrong usage of socket option _NoDelay_. | |||
* [Server] Added remote certificate validation callback (thanks to @rudacs). | |||
* [Server] Add support for certificate passwords (thanks to @cslutgen). | |||
* [MQTTnet.Server] Added REST API for publishing basic messages. | |||
* [Core] Converted all pending methods to use async/await. | |||
* [Core] Fixed an issue when serializing a PubRec (QoS 2) packet for MQTTv5. | |||
* [Client] Fixed an issue when checking for revoked SSL certificates (thanks to @cslutgen). | |||
* [RpcClient] Added support for custom topic generation strategies. | |||
* [Server] Refactoring of server certificate password classes (BREAKING CHANGE!). | |||
* [Server] Fixed an issue with empty server certificate passwords (thanks to @SeppPenner). | |||
* [MQTTnet.Server] Added support for certificate passwords (BREAKING CHANGE IN CONFIG!) | |||
* [MQTTnet.AspNetCore] Fixed an issue with MQTTv5 package serialization (#743, thanks to @JanEggers, @pcbing). | |||
</releaseNotes> | |||
<copyright>Copyright Christian Kratky 2016-2019</copyright> | |||
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags> | |||
@@ -43,8 +43,14 @@ Write-Host | |||
&$msbuild ..\Source\MQTTnet.Extensions.WebSocket4Net\MQTTnet.Extensions.WebSocket4Net.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netstandard2.0" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" | |||
&$msbuild ..\Source\MQTTnet.Extensions.WebSocket4Net\MQTTnet.Extensions.WebSocket4Net.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="uap10.0" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" | |||
# Create NuGet packages. | |||
# Build and execute tests | |||
&$msbuild ..\Tests\MQTTnet.Core.Tests\MQTTnet.Tests.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netcoreapp2.1" /verbosity:m | |||
&$msbuild ..\Tests\MQTTnet.AspNetCore.Tests\MQTTnet.AspNetCore.Tests.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netcoreapp2.1" /verbosity:m | |||
vstest.console.exe ..\Tests\MQTTnet.Core.Tests\bin\Release\netcoreapp2.1\MQTTnet.Tests.dll | |||
vstest.console.exe ..\Tests\MQTTnet.AspNetCore.Tests\bin\Release\netcoreapp2.1\MQTTnet.AspNetCore.Tests.dll | |||
# Create NuGet packages. | |||
Invoke-WebRequest -Uri "https://dist.nuget.org/win-x86-commandline/latest/nuget.exe" -OutFile "nuget.exe" | |||
Remove-Item .\NuGet -Force -Recurse -ErrorAction SilentlyContinue | |||
@@ -64,7 +64,6 @@ namespace MQTTnet.AspNetCore | |||
GrowIfNeeded(propertyWriter.Length); | |||
Write(propertyWriter.GetBuffer(), 0, propertyWriter.Length); | |||
Commit(propertyWriter.Length); | |||
} | |||
public void Write(byte[] payload, int start, int length) | |||
@@ -252,6 +252,7 @@ namespace MQTTnet.Extensions.ManagedClient | |||
_maintainConnectionTask = null; | |||
} | |||
_messageQueueLock.Dispose(); | |||
_mqttClient.Dispose(); | |||
} | |||
@@ -5,6 +5,8 @@ using System.Threading; | |||
using System.Threading.Tasks; | |||
using MQTTnet.Client; | |||
using MQTTnet.Exceptions; | |||
using MQTTnet.Extensions.Rpc.Options; | |||
using MQTTnet.Extensions.Rpc.Options.TopicGeneration; | |||
using MQTTnet.Protocol; | |||
namespace MQTTnet.Extensions.Rpc | |||
@@ -13,11 +15,18 @@ namespace MQTTnet.Extensions.Rpc | |||
{ | |||
private readonly ConcurrentDictionary<string, TaskCompletionSource<byte[]>> _waitingCalls = new ConcurrentDictionary<string, TaskCompletionSource<byte[]>>(); | |||
private readonly IMqttClient _mqttClient; | |||
private readonly IMqttRpcClientOptions _options; | |||
private readonly RpcAwareApplicationMessageReceivedHandler _applicationMessageReceivedHandler; | |||
public MqttRpcClient(IMqttClient mqttClient) | |||
[Obsolete("Use MqttRpcClient(IMqttClient mqttClient, IMqttRpcClientOptions options).")] | |||
public MqttRpcClient(IMqttClient mqttClient) : this(mqttClient, new MqttRpcClientOptions()) | |||
{ | |||
} | |||
public MqttRpcClient(IMqttClient mqttClient, IMqttRpcClientOptions options) | |||
{ | |||
_mqttClient = mqttClient ?? throw new ArgumentNullException(nameof(mqttClient)); | |||
_options = options ?? throw new ArgumentNullException(nameof(options)); | |||
_applicationMessageReceivedHandler = new RpcAwareApplicationMessageReceivedHandler( | |||
mqttClient.ApplicationMessageReceivedHandler, | |||
@@ -55,8 +64,26 @@ namespace MQTTnet.Extensions.Rpc | |||
throw new InvalidOperationException("The application message received handler was modified."); | |||
} | |||
var requestTopic = $"MQTTnet.RPC/{Guid.NewGuid():N}/{methodName}"; | |||
var responseTopic = requestTopic + "/response"; | |||
var topicNames = _options.TopicGenerationStrategy.CreateRpcTopics(new TopicGenerationContext | |||
{ | |||
MethodName = methodName, | |||
QualityOfServiceLevel = qualityOfServiceLevel, | |||
MqttClient = _mqttClient, | |||
Options = _options | |||
}); | |||
var requestTopic = topicNames.RequestTopic; | |||
var responseTopic = topicNames.ResponseTopic; | |||
if (string.IsNullOrWhiteSpace(requestTopic)) | |||
{ | |||
throw new MqttProtocolViolationException("RPC request topic is empty."); | |||
} | |||
if (string.IsNullOrWhiteSpace(responseTopic)) | |||
{ | |||
throw new MqttProtocolViolationException("RPC response topic is empty."); | |||
} | |||
var requestMessage = new MqttApplicationMessageBuilder() | |||
.WithTopic(requestTopic) | |||
@@ -0,0 +1,9 @@ | |||
using MQTTnet.Extensions.Rpc.Options.TopicGeneration; | |||
namespace MQTTnet.Extensions.Rpc.Options | |||
{ | |||
public interface IMqttRpcClientOptions | |||
{ | |||
IMqttRpcClientTopicGenerationStrategy TopicGenerationStrategy { get; set; } | |||
} | |||
} |
@@ -0,0 +1,9 @@ | |||
using MQTTnet.Extensions.Rpc.Options.TopicGeneration; | |||
namespace MQTTnet.Extensions.Rpc.Options | |||
{ | |||
public class MqttRpcClientOptions : IMqttRpcClientOptions | |||
{ | |||
public IMqttRpcClientTopicGenerationStrategy TopicGenerationStrategy { get; set; } = new DefaultMqttRpcClientTopicGenerationStrategy(); | |||
} | |||
} |
@@ -0,0 +1,25 @@ | |||
using MQTTnet.Extensions.Rpc.Options.TopicGeneration; | |||
using System; | |||
namespace MQTTnet.Extensions.Rpc.Options | |||
{ | |||
public class MqttRpcClientOptionsBuilder | |||
{ | |||
IMqttRpcClientTopicGenerationStrategy _topicGenerationStrategy = new DefaultMqttRpcClientTopicGenerationStrategy(); | |||
public MqttRpcClientOptionsBuilder WithTopicGenerationStrategy(IMqttRpcClientTopicGenerationStrategy value) | |||
{ | |||
_topicGenerationStrategy = value ?? throw new ArgumentNullException(nameof(value)); | |||
return this; | |||
} | |||
public IMqttRpcClientOptions Build() | |||
{ | |||
return new MqttRpcClientOptions | |||
{ | |||
TopicGenerationStrategy = _topicGenerationStrategy | |||
}; | |||
} | |||
} | |||
} |
@@ -0,0 +1,9 @@ | |||
namespace MQTTnet.Extensions.Rpc.Options | |||
{ | |||
public class MqttRpcTopicPair | |||
{ | |||
public string RequestTopic { get; set; } | |||
public string ResponseTopic { get; set; } | |||
} | |||
} |
@@ -0,0 +1,20 @@ | |||
using MQTTnet.Extensions.Rpc.Options.TopicGeneration; | |||
using System; | |||
namespace MQTTnet.Extensions.Rpc.Options | |||
{ | |||
public class DefaultMqttRpcClientTopicGenerationStrategy : IMqttRpcClientTopicGenerationStrategy | |||
{ | |||
public MqttRpcTopicPair CreateRpcTopics(TopicGenerationContext context) | |||
{ | |||
var requestTopic = $"MQTTnet.RPC/{Guid.NewGuid():N}/{context.MethodName}"; | |||
var responseTopic = requestTopic + "/response"; | |||
return new MqttRpcTopicPair | |||
{ | |||
RequestTopic = requestTopic, | |||
ResponseTopic = responseTopic | |||
}; | |||
} | |||
} | |||
} |
@@ -0,0 +1,7 @@ | |||
namespace MQTTnet.Extensions.Rpc.Options.TopicGeneration | |||
{ | |||
public interface IMqttRpcClientTopicGenerationStrategy | |||
{ | |||
MqttRpcTopicPair CreateRpcTopics(TopicGenerationContext context); | |||
} | |||
} |
@@ -0,0 +1,16 @@ | |||
using MQTTnet.Client; | |||
using MQTTnet.Protocol; | |||
namespace MQTTnet.Extensions.Rpc.Options.TopicGeneration | |||
{ | |||
public class TopicGenerationContext | |||
{ | |||
public string MethodName { get; set; } | |||
public MqttQualityOfServiceLevel QualityOfServiceLevel { get; set; } | |||
public IMqttClient MqttClient { get; set; } | |||
public IMqttRpcClientOptions Options { get; set; } | |||
} | |||
} |
@@ -0,0 +1,35 @@ | |||
using System.IO; | |||
namespace MQTTnet.Server.Configuration | |||
{ | |||
public class CertificateSettingsModel | |||
{ | |||
/// <summary> | |||
/// Path to certificate. | |||
/// </summary> | |||
public string Path { get; set; } | |||
/// <summary> | |||
/// Password of certificate. | |||
/// </summary> | |||
public string Password { get; set; } | |||
/// <summary> | |||
/// Read certificate file. | |||
/// </summary> | |||
public byte[] ReadCertificate() | |||
{ | |||
if (string.IsNullOrEmpty(Path) || string.IsNullOrWhiteSpace(Path)) | |||
{ | |||
throw new FileNotFoundException("No path set"); | |||
} | |||
if (!File.Exists(Path)) | |||
{ | |||
throw new FileNotFoundException($"Could not find Certificate in path: {Path}"); | |||
} | |||
return File.ReadAllBytes(Path); | |||
} | |||
} | |||
} |
@@ -9,9 +9,9 @@ namespace MQTTnet.Server.Configuration | |||
public class TcpEndPointModel | |||
{ | |||
/// <summary> | |||
/// Path to Certificate | |||
/// Certificate settings. | |||
/// </summary> | |||
public string CertificatePath { get; set; } | |||
public CertificateSettingsModel Certificate { get; set; } | |||
/// <summary> | |||
/// Enabled / Disable | |||
@@ -33,25 +33,6 @@ namespace MQTTnet.Server.Configuration | |||
/// </summary> | |||
public int Port { get; set; } = 1883; | |||
/// <summary> | |||
/// Read Certificate file | |||
/// </summary> | |||
/// <returns></returns> | |||
public byte[] ReadCertificate() | |||
{ | |||
if (string.IsNullOrEmpty(CertificatePath) || string.IsNullOrWhiteSpace(CertificatePath)) | |||
{ | |||
throw new FileNotFoundException("No path set"); | |||
} | |||
if (!File.Exists(CertificatePath)) | |||
{ | |||
throw new FileNotFoundException($"Could not find Certificate in path: {CertificatePath}"); | |||
} | |||
return File.ReadAllBytes(CertificatePath); | |||
} | |||
/// <summary> | |||
/// Read IPv4 | |||
/// </summary> | |||
@@ -47,7 +47,7 @@ namespace MQTTnet.Server.Mqtt | |||
MqttSubscriptionInterceptor mqttSubscriptionInterceptor, | |||
MqttApplicationMessageInterceptor mqttApplicationMessageInterceptor, | |||
MqttServerStorage mqttServerStorage, | |||
PythonScriptHostService pythonScriptHostService, | |||
PythonScriptHostService pythonScriptHostService, | |||
ILogger<MqttServerService> logger) | |||
{ | |||
_settings = mqttSettings ?? throw new ArgumentNullException(nameof(mqttSettings)); | |||
@@ -179,7 +179,7 @@ namespace MQTTnet.Server.Mqtt | |||
.WithApplicationMessageInterceptor(_mqttApplicationMessageInterceptor) | |||
.WithSubscriptionInterceptor(_mqttSubscriptionInterceptor) | |||
.WithStorage(_mqttServerStorage); | |||
// Configure unencrypted connections | |||
if (_settings.TcpEndPoint.Enabled) | |||
{ | |||
@@ -210,9 +210,23 @@ namespace MQTTnet.Server.Mqtt | |||
{ | |||
options | |||
.WithEncryptedEndpoint() | |||
.WithEncryptionSslProtocol(SslProtocols.Tls12) | |||
.WithEncryptionCertificate(_settings.EncryptedTcpEndPoint.ReadCertificate()); | |||
.WithEncryptionSslProtocol(SslProtocols.Tls12); | |||
if (!string.IsNullOrEmpty(_settings.EncryptedTcpEndPoint?.Certificate?.Path)) | |||
{ | |||
IMqttServerCertificateCredentials certificateCredentials = null; | |||
if (!string.IsNullOrEmpty(_settings.EncryptedTcpEndPoint?.Certificate?.Password)) | |||
{ | |||
certificateCredentials = new MqttServerCertificateCredentials | |||
{ | |||
Password = _settings.EncryptedTcpEndPoint.Certificate.Password | |||
}; | |||
} | |||
options.WithEncryptionCertificate(_settings.EncryptedTcpEndPoint.Certificate.ReadCertificate(), certificateCredentials); | |||
} | |||
if (_settings.EncryptedTcpEndPoint.TryReadIPv4(out var address4)) | |||
{ | |||
options.WithEncryptedEndpointBoundIPAddress(address4); | |||
@@ -27,7 +27,10 @@ | |||
"IPv4": "*", | |||
"IPv6": "*", | |||
"Port": 8883, | |||
"CertificatePath": "/absolute/path/to/pfx" | |||
"Certificate": { | |||
"Path": "/absolute/path/to/pfx", | |||
"Password": "" | |||
} | |||
}, | |||
"WebSocketEndPoint": { | |||
"Enabled": true, | |||
@@ -63,4 +66,4 @@ | |||
} | |||
}, | |||
"AllowedHosts": "*" | |||
} | |||
} |
@@ -5,15 +5,25 @@ | |||
int Length { get; } | |||
void WriteWithLengthPrefix(string value); | |||
void Write(byte returnCode); | |||
void WriteWithLengthPrefix(byte[] payload); | |||
void Write(ushort keepAlivePeriod); | |||
void Write(IMqttPacketWriter propertyWriter); | |||
void WriteVariableLengthInteger(uint length); | |||
void Write(byte[] payload, int v, int length); | |||
void Reset(int v); | |||
void Seek(int v); | |||
void Write(byte value); | |||
void WriteWithLengthPrefix(byte[] value); | |||
void Write(ushort value); | |||
void Write(IMqttPacketWriter value); | |||
void WriteVariableLengthInteger(uint value); | |||
void Write(byte[] value, int offset, int length); | |||
void Reset(int length); | |||
void Seek(int offset); | |||
void FreeBuffer(); | |||
byte[] GetBuffer(); | |||
} | |||
} |
@@ -21,8 +21,6 @@ namespace MQTTnet.Formatter | |||
{ | |||
// The MQTT fixed header contains 1 byte of flags and at least 1 byte for the remaining data length. | |||
// So in all cases at least 2 bytes must be read for a complete MQTT packet. | |||
// async/await is used here because the next packet is received in a couple of minutes so the performance | |||
// impact is acceptable according to a useless waiting thread. | |||
var buffer = fixedHeaderBuffer; | |||
var totalBytesRead = 0; | |||
@@ -55,16 +53,7 @@ namespace MQTTnet.Formatter | |||
}; | |||
} | |||
#if WINDOWS_UWP | |||
// UWP will have a dead lock when calling this not async. | |||
var bodyLength = await ReadBodyLengthAsync(buffer[1], cancellationToken).ConfigureAwait(false); | |||
#else | |||
// Here the async/await pattern is not used because the overhead of context switches | |||
// is too big for reading 1 byte in a row. We expect that the remaining data was sent | |||
// directly after the initial bytes. If the client disconnects just in this moment we | |||
// will get an exception anyway. | |||
var bodyLength = ReadBodyLength(buffer[1], cancellationToken); | |||
#endif | |||
if (!bodyLength.HasValue) | |||
{ | |||
@@ -81,49 +70,6 @@ namespace MQTTnet.Formatter | |||
}; | |||
} | |||
#if !WINDOWS_UWP | |||
private int? ReadBodyLength(byte initialEncodedByte, CancellationToken cancellationToken) | |||
{ | |||
var offset = 0; | |||
var multiplier = 128; | |||
var value = initialEncodedByte & 127; | |||
int encodedByte = initialEncodedByte; | |||
while ((encodedByte & 128) != 0) | |||
{ | |||
offset++; | |||
if (offset > 3) | |||
{ | |||
throw new MqttProtocolViolationException("Remaining length is invalid."); | |||
} | |||
if (cancellationToken.IsCancellationRequested) | |||
{ | |||
return null; | |||
} | |||
var readCount = _channel.ReadAsync(_singleByteBuffer, 0, 1, cancellationToken).GetAwaiter().GetResult(); | |||
if (cancellationToken.IsCancellationRequested) | |||
{ | |||
return null; | |||
} | |||
if (readCount == 0) | |||
{ | |||
return null; | |||
} | |||
encodedByte = _singleByteBuffer[0]; | |||
value += (encodedByte & 127) * multiplier; | |||
multiplier *= 128; | |||
} | |||
return value; | |||
} | |||
#else | |||
private async Task<int?> ReadBodyLengthAsync(byte initialEncodedByte, CancellationToken cancellationToken) | |||
{ | |||
var offset = 0; | |||
@@ -164,6 +110,5 @@ namespace MQTTnet.Formatter | |||
return value; | |||
} | |||
#endif | |||
} | |||
} |
@@ -328,10 +328,7 @@ namespace MQTTnet.Formatter.V5 | |||
{ | |||
ThrowReasonCodeNotSetException(); | |||
} | |||
packetWriter.Write(packet.PacketIdentifier.Value); | |||
packetWriter.Write((byte)packet.ReasonCode.Value); | |||
var propertiesWriter = new MqttV500PropertiesWriter(); | |||
if (packet.Properties != null) | |||
{ | |||
@@ -339,6 +336,8 @@ namespace MQTTnet.Formatter.V5 | |||
propertiesWriter.WriteUserProperties(packet.Properties.UserProperties); | |||
} | |||
packetWriter.Write(packet.PacketIdentifier.Value); | |||
if (packetWriter.Length > 0 || packet.ReasonCode.Value != MqttPubRecReasonCode.Success) | |||
{ | |||
packetWriter.Write((byte)packet.ReasonCode.Value); | |||
@@ -86,7 +86,7 @@ namespace MQTTnet.Implementations | |||
var sslStream = new SslStream(networkStream, false, InternalUserCertificateValidationCallback); | |||
_stream = sslStream; | |||
await sslStream.AuthenticateAsClientAsync(_options.Server, LoadCertificates(), _options.TlsOptions.SslProtocol, _options.TlsOptions.IgnoreCertificateRevocationErrors).ConfigureAwait(false); | |||
await sslStream.AuthenticateAsClientAsync(_options.Server, LoadCertificates(), _options.TlsOptions.SslProtocol, !_options.TlsOptions.IgnoreCertificateRevocationErrors).ConfigureAwait(false); | |||
} | |||
else | |||
{ | |||
@@ -41,14 +41,24 @@ namespace MQTTnet.Implementations | |||
RegisterListeners(options.DefaultEndpointOptions, null, _cancellationTokenSource.Token); | |||
} | |||
if (options.TlsEndpointOptions.IsEnabled) | |||
if (options.TlsEndpointOptions?.IsEnabled == true) | |||
{ | |||
if (options.TlsEndpointOptions.Certificate == null) | |||
{ | |||
throw new ArgumentException("TLS certificate is not set."); | |||
} | |||
var tlsCertificate = new X509Certificate2(options.TlsEndpointOptions.Certificate, options.TlsEndpointOptions.CertificateCredentials.Password); | |||
X509Certificate2 tlsCertificate; | |||
if (string.IsNullOrEmpty(options.TlsEndpointOptions.CertificateCredentials?.Password)) | |||
{ | |||
// Use a different overload when no password is specified. Otherwise the constructor will fail. | |||
tlsCertificate = new X509Certificate2(options.TlsEndpointOptions.Certificate); | |||
} | |||
else | |||
{ | |||
tlsCertificate = new X509Certificate2(options.TlsEndpointOptions.Certificate, options.TlsEndpointOptions.CertificateCredentials.Password); | |||
} | |||
if (!tlsCertificate.HasPrivateKey) | |||
{ | |||
throw new InvalidOperationException("The certificate for TLS encryption must contain the private key."); | |||
@@ -0,0 +1,4 @@ | |||
public interface IMqttServerCertificateCredentials | |||
{ | |||
string Password { get; } | |||
} |
@@ -1,6 +0,0 @@ | |||
using System; | |||
public interface IMqttServerCredentials | |||
{ | |||
String Password { get; } | |||
} |
@@ -0,0 +1,7 @@ | |||
namespace MQTTnet.Server | |||
{ | |||
public class MqttServerCertificateCredentials : IMqttServerCertificateCredentials | |||
{ | |||
public string Password { get; set; } | |||
} | |||
} |
@@ -82,7 +82,7 @@ namespace MQTTnet.Server | |||
return this; | |||
} | |||
public MqttServerOptionsBuilder WithEncryptionCertificate(byte[] value, IMqttServerCredentials credentials = null) | |||
public MqttServerOptionsBuilder WithEncryptionCertificate(byte[] value, IMqttServerCertificateCredentials credentials = null) | |||
{ | |||
_options.TlsEndpointOptions.Certificate = value; | |||
_options.TlsEndpointOptions.CertificateCredentials = credentials; | |||
@@ -12,7 +12,7 @@ namespace MQTTnet.Server | |||
public byte[] Certificate { get; set; } | |||
public IMqttServerCredentials CertificateCredentials { get; set; } | |||
public IMqttServerCertificateCredentials CertificateCredentials { get; set; } | |||
public bool ClientCertificateRequired { get; set; } | |||
@@ -2,12 +2,11 @@ | |||
<PropertyGroup> | |||
<TargetFramework>netcoreapp2.1</TargetFramework> | |||
<IsPackable>false</IsPackable> | |||
</PropertyGroup> | |||
<ItemGroup> | |||
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.0.1" /> | |||
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.2.0" /> | |||
<PackageReference Include="MSTest.TestAdapter" Version="1.4.0" /> | |||
<PackageReference Include="MSTest.TestFramework" Version="1.4.0" /> | |||
</ItemGroup> | |||
@@ -1,15 +1,14 @@ | |||
<Project Sdk="Microsoft.NET.Sdk"> | |||
<PropertyGroup> | |||
<OutputType>Exe</OutputType> | |||
<DebugType>Full</DebugType> | |||
<TargetFramework>netcoreapp2.1</TargetFramework> | |||
<IsPackable>false</IsPackable> | |||
</PropertyGroup> | |||
<ItemGroup> | |||
<PackageReference Include="MSTest.TestAdapter" Version="1.4.0" /> | |||
<PackageReference Include="MSTest.TestFramework" Version="1.4.0" /> | |||
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.0.1" /> | |||
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.2.0" /> | |||
</ItemGroup> | |||
<ItemGroup> | |||
@@ -14,7 +14,7 @@ namespace MQTTnet.Tests | |||
public class MqttKeepAliveMonitor_Tests | |||
{ | |||
[TestMethod] | |||
public void KeepAlive_Timeout() | |||
public async Task KeepAlive_Timeout() | |||
{ | |||
var counter = 0; | |||
@@ -31,13 +31,13 @@ namespace MQTTnet.Tests | |||
Assert.AreEqual(0, counter); | |||
Thread.Sleep(2000); // Internally the keep alive timeout is multiplied with 1.5 as per protocol specification. | |||
await Task.Delay(2000); // Internally the keep alive timeout is multiplied with 1.5 as per protocol specification. | |||
Assert.AreEqual(1, counter); | |||
} | |||
[TestMethod] | |||
public void KeepAlive_NoTimeout() | |||
public async Task KeepAlive_NoTimeout() | |||
{ | |||
var counter = 0; | |||
@@ -55,15 +55,15 @@ namespace MQTTnet.Tests | |||
Assert.AreEqual(0, counter); | |||
// Simulate traffic. | |||
Thread.Sleep(1000); // Internally the keep alive timeout is multiplied with 1.5 as per protocol specification. | |||
await Task.Delay(1000); // Internally the keep alive timeout is multiplied with 1.5 as per protocol specification. | |||
monitor.PacketReceived(); | |||
Thread.Sleep(1000); | |||
await Task.Delay(1000); | |||
monitor.PacketReceived(); | |||
Thread.Sleep(1000); | |||
await Task.Delay(1000); | |||
Assert.AreEqual(0, counter); | |||
Thread.Sleep(2000); | |||
await Task.Delay(2000); | |||
Assert.AreEqual(1, counter); | |||
} | |||
@@ -8,6 +8,7 @@ using Microsoft.VisualStudio.TestTools.UnitTesting; | |||
using MQTTnet.Adapter; | |||
using MQTTnet.Formatter; | |||
using MQTTnet.Formatter.V3; | |||
using MQTTnet.Formatter.V5; | |||
using MQTTnet.Internal; | |||
using MQTTnet.Packets; | |||
using MQTTnet.Protocol; | |||
@@ -258,6 +259,26 @@ namespace MQTTnet.Tests | |||
Assert.IsTrue(deserialized.Properties.UserProperties.Any(x => x.Name == "Foo")); | |||
} | |||
[TestMethod] | |||
public void SerializeV500_MqttPublishPacket_CorrelationData() | |||
{ | |||
var data = "123456789"; | |||
var req = new MqttApplicationMessageBuilder() | |||
.WithTopic("Foo") | |||
.WithResponseTopic($"_") | |||
.WithCorrelationData(Guid.NewGuid().ToByteArray()) | |||
.WithPayload(data) | |||
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce) | |||
.Build(); | |||
var p = new MqttV500DataConverter().CreatePublishPacket(req); | |||
var deserialized = Roundtrip(p, MqttProtocolVersion.V500); | |||
Assert.IsTrue(p.Payload.SequenceEqual(deserialized.Payload)); | |||
} | |||
[TestMethod] | |||
public void DeserializeV311_MqttPublishPacket() | |||
{ | |||
@@ -10,6 +10,8 @@ using MQTTnet.Extensions.Rpc; | |||
using MQTTnet.Protocol; | |||
using MQTTnet.Client.Options; | |||
using MQTTnet.Formatter; | |||
using MQTTnet.Extensions.Rpc.Options; | |||
using MQTTnet.Extensions.Rpc.Options.TopicGeneration; | |||
namespace MQTTnet.Tests | |||
{ | |||
@@ -62,7 +64,22 @@ namespace MQTTnet.Tests | |||
var requestSender = await testEnvironment.ConnectClientAsync(); | |||
var rpcClient = new MqttRpcClient(requestSender); | |||
var rpcClient = new MqttRpcClient(requestSender, new MqttRpcClientOptionsBuilder().Build()); | |||
await rpcClient.ExecuteAsync(TimeSpan.FromSeconds(2), "ping", "", MqttQualityOfServiceLevel.AtMostOnce); | |||
} | |||
} | |||
[TestMethod] | |||
[ExpectedException(typeof(MqttCommunicationTimedOutException))] | |||
public async Task Execute_With_Custom_Topic_Names() | |||
{ | |||
using (var testEnvironment = new TestEnvironment()) | |||
{ | |||
await testEnvironment.StartServerAsync(); | |||
var requestSender = await testEnvironment.ConnectClientAsync(); | |||
var rpcClient = new MqttRpcClient(requestSender, new MqttRpcClientOptionsBuilder().WithTopicGenerationStrategy(new TestTopicStrategy()) .Build()); | |||
await rpcClient.ExecuteAsync(TimeSpan.FromSeconds(2), "ping", "", MqttQualityOfServiceLevel.AtMostOnce); | |||
} | |||
} | |||
@@ -82,11 +99,23 @@ namespace MQTTnet.Tests | |||
var requestSender = await testEnvironment.ConnectClientAsync(); | |||
var rpcClient = new MqttRpcClient(requestSender); | |||
var rpcClient = new MqttRpcClient(requestSender, new MqttRpcClientOptionsBuilder().Build()); | |||
var response = await rpcClient.ExecuteAsync(TimeSpan.FromSeconds(5), "ping", "", qosLevel); | |||
Assert.AreEqual("pong", Encoding.UTF8.GetString(response)); | |||
} | |||
} | |||
private class TestTopicStrategy : IMqttRpcClientTopicGenerationStrategy | |||
{ | |||
public MqttRpcTopicPair CreateRpcTopics(TopicGenerationContext context) | |||
{ | |||
return new MqttRpcTopicPair | |||
{ | |||
RequestTopic = "a", | |||
ResponseTopic = "b" | |||
}; | |||
} | |||
} | |||
} | |||
} |
@@ -1063,7 +1063,9 @@ namespace MQTTnet.Tests | |||
// forever. This is security related. | |||
var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); | |||
await client.ConnectAsync("localhost", testEnvironment.ServerPort); | |||
await client.SendAsync(Encoding.UTF8.GetBytes("Garbage"), SocketFlags.None); | |||
var buffer = Encoding.UTF8.GetBytes("Garbage"); | |||
client.Send(buffer, buffer.Length, SocketFlags.None); | |||
await Task.Delay(TimeSpan.FromSeconds(3)); | |||