diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec
index 6aafdc6..32b02ca 100644
--- a/Build/MQTTnet.nuspec
+++ b/Build/MQTTnet.nuspec
@@ -11,16 +11,14 @@
false
MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker) and supports v3.1.0, v3.1.1 and v5.0.0 of the MQTT protocol.
-* [Core] Nuget packages with symbols are now also published to improve debugging.
-* [Core] Improve task handling (thanks to @mwinterb)
-* [ManagedClient] Fix a race condition in the message storage (thanks to @PaulFake).
-* [Server] Added items dictionary to client session in order to share data across interceptors as along as the session exists.
-* [Server] Exposed CONNECT packet properties in Application Message and Subscription interceptor.
-* [Server] Fixed: Sending Large packets with AspnetCore based connection throws System.ArgumentException.
-* [Server] Fixed wrong usage of socket option _NoDelay_.
-* [Server] Added remote certificate validation callback (thanks to @rudacs).
-* [Server] Add support for certificate passwords (thanks to @cslutgen).
-* [MQTTnet.Server] Added REST API for publishing basic messages.
+* [Core] Converted all pending methods to use async/await.
+* [Core] Fixed an issue when serializing a PubRec (QoS 2) packet for MQTTv5.
+* [Client] Fixed an issue when checking for revoked SSL certificates (thanks to @cslutgen).
+* [RpcClient] Added support for custom topic generation strategies.
+* [Server] Refactoring of server certificate password classes (BREAKING CHANGE!).
+* [Server] Fixed an issue with empty server certificate passwords (thanks to @SeppPenner).
+* [MQTTnet.Server] Added support for certificate passwords (BREAKING CHANGE IN CONFIG!)
+* [MQTTnet.AspNetCore] Fixed an issue with MQTTv5 package serialization (#743, thanks to @JanEggers, @pcbing).
Copyright Christian Kratky 2016-2019
MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin
diff --git a/Build/build.ps1 b/Build/build.ps1
index 33f2767..57a60f6 100644
--- a/Build/build.ps1
+++ b/Build/build.ps1
@@ -43,8 +43,14 @@ Write-Host
&$msbuild ..\Source\MQTTnet.Extensions.WebSocket4Net\MQTTnet.Extensions.WebSocket4Net.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netstandard2.0" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx"
&$msbuild ..\Source\MQTTnet.Extensions.WebSocket4Net\MQTTnet.Extensions.WebSocket4Net.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="uap10.0" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx"
-# Create NuGet packages.
+# Build and execute tests
+&$msbuild ..\Tests\MQTTnet.Core.Tests\MQTTnet.Tests.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netcoreapp2.1" /verbosity:m
+&$msbuild ..\Tests\MQTTnet.AspNetCore.Tests\MQTTnet.AspNetCore.Tests.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netcoreapp2.1" /verbosity:m
+
+vstest.console.exe ..\Tests\MQTTnet.Core.Tests\bin\Release\netcoreapp2.1\MQTTnet.Tests.dll
+vstest.console.exe ..\Tests\MQTTnet.AspNetCore.Tests\bin\Release\netcoreapp2.1\MQTTnet.AspNetCore.Tests.dll
+# Create NuGet packages.
Invoke-WebRequest -Uri "https://dist.nuget.org/win-x86-commandline/latest/nuget.exe" -OutFile "nuget.exe"
Remove-Item .\NuGet -Force -Recurse -ErrorAction SilentlyContinue
diff --git a/Source/MQTTnet.AspnetCore/SpanBasedMqttPacketWriter.cs b/Source/MQTTnet.AspnetCore/SpanBasedMqttPacketWriter.cs
index 27e13c4..66addfd 100644
--- a/Source/MQTTnet.AspnetCore/SpanBasedMqttPacketWriter.cs
+++ b/Source/MQTTnet.AspnetCore/SpanBasedMqttPacketWriter.cs
@@ -64,7 +64,6 @@ namespace MQTTnet.AspNetCore
GrowIfNeeded(propertyWriter.Length);
Write(propertyWriter.GetBuffer(), 0, propertyWriter.Length);
- Commit(propertyWriter.Length);
}
public void Write(byte[] payload, int start, int length)
diff --git a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs
index f53a8dd..c9053e4 100644
--- a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs
+++ b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs
@@ -252,6 +252,7 @@ namespace MQTTnet.Extensions.ManagedClient
_maintainConnectionTask = null;
}
+ _messageQueueLock.Dispose();
_mqttClient.Dispose();
}
diff --git a/Source/MQTTnet.Extensions.Rpc/MqttRpcClient.cs b/Source/MQTTnet.Extensions.Rpc/MqttRpcClient.cs
index c025bdf..8696f83 100644
--- a/Source/MQTTnet.Extensions.Rpc/MqttRpcClient.cs
+++ b/Source/MQTTnet.Extensions.Rpc/MqttRpcClient.cs
@@ -5,6 +5,8 @@ using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Client;
using MQTTnet.Exceptions;
+using MQTTnet.Extensions.Rpc.Options;
+using MQTTnet.Extensions.Rpc.Options.TopicGeneration;
using MQTTnet.Protocol;
namespace MQTTnet.Extensions.Rpc
@@ -13,11 +15,18 @@ namespace MQTTnet.Extensions.Rpc
{
private readonly ConcurrentDictionary> _waitingCalls = new ConcurrentDictionary>();
private readonly IMqttClient _mqttClient;
+ private readonly IMqttRpcClientOptions _options;
private readonly RpcAwareApplicationMessageReceivedHandler _applicationMessageReceivedHandler;
- public MqttRpcClient(IMqttClient mqttClient)
+ [Obsolete("Use MqttRpcClient(IMqttClient mqttClient, IMqttRpcClientOptions options).")]
+ public MqttRpcClient(IMqttClient mqttClient) : this(mqttClient, new MqttRpcClientOptions())
+ {
+ }
+
+ public MqttRpcClient(IMqttClient mqttClient, IMqttRpcClientOptions options)
{
_mqttClient = mqttClient ?? throw new ArgumentNullException(nameof(mqttClient));
+ _options = options ?? throw new ArgumentNullException(nameof(options));
_applicationMessageReceivedHandler = new RpcAwareApplicationMessageReceivedHandler(
mqttClient.ApplicationMessageReceivedHandler,
@@ -55,8 +64,26 @@ namespace MQTTnet.Extensions.Rpc
throw new InvalidOperationException("The application message received handler was modified.");
}
- var requestTopic = $"MQTTnet.RPC/{Guid.NewGuid():N}/{methodName}";
- var responseTopic = requestTopic + "/response";
+ var topicNames = _options.TopicGenerationStrategy.CreateRpcTopics(new TopicGenerationContext
+ {
+ MethodName = methodName,
+ QualityOfServiceLevel = qualityOfServiceLevel,
+ MqttClient = _mqttClient,
+ Options = _options
+ });
+
+ var requestTopic = topicNames.RequestTopic;
+ var responseTopic = topicNames.ResponseTopic;
+
+ if (string.IsNullOrWhiteSpace(requestTopic))
+ {
+ throw new MqttProtocolViolationException("RPC request topic is empty.");
+ }
+
+ if (string.IsNullOrWhiteSpace(responseTopic))
+ {
+ throw new MqttProtocolViolationException("RPC response topic is empty.");
+ }
var requestMessage = new MqttApplicationMessageBuilder()
.WithTopic(requestTopic)
diff --git a/Source/MQTTnet.Extensions.Rpc/Options/IMqttRpcClientOptions.cs b/Source/MQTTnet.Extensions.Rpc/Options/IMqttRpcClientOptions.cs
new file mode 100644
index 0000000..6b7afa9
--- /dev/null
+++ b/Source/MQTTnet.Extensions.Rpc/Options/IMqttRpcClientOptions.cs
@@ -0,0 +1,9 @@
+using MQTTnet.Extensions.Rpc.Options.TopicGeneration;
+
+namespace MQTTnet.Extensions.Rpc.Options
+{
+ public interface IMqttRpcClientOptions
+ {
+ IMqttRpcClientTopicGenerationStrategy TopicGenerationStrategy { get; set; }
+ }
+}
\ No newline at end of file
diff --git a/Source/MQTTnet.Extensions.Rpc/Options/MqttRpcClientOptions.cs b/Source/MQTTnet.Extensions.Rpc/Options/MqttRpcClientOptions.cs
new file mode 100644
index 0000000..0dc4517
--- /dev/null
+++ b/Source/MQTTnet.Extensions.Rpc/Options/MqttRpcClientOptions.cs
@@ -0,0 +1,9 @@
+using MQTTnet.Extensions.Rpc.Options.TopicGeneration;
+
+namespace MQTTnet.Extensions.Rpc.Options
+{
+ public class MqttRpcClientOptions : IMqttRpcClientOptions
+ {
+ public IMqttRpcClientTopicGenerationStrategy TopicGenerationStrategy { get; set; } = new DefaultMqttRpcClientTopicGenerationStrategy();
+ }
+}
diff --git a/Source/MQTTnet.Extensions.Rpc/Options/MqttRpcClientOptionsBuilder.cs b/Source/MQTTnet.Extensions.Rpc/Options/MqttRpcClientOptionsBuilder.cs
new file mode 100644
index 0000000..69277ac
--- /dev/null
+++ b/Source/MQTTnet.Extensions.Rpc/Options/MqttRpcClientOptionsBuilder.cs
@@ -0,0 +1,25 @@
+using MQTTnet.Extensions.Rpc.Options.TopicGeneration;
+using System;
+
+namespace MQTTnet.Extensions.Rpc.Options
+{
+ public class MqttRpcClientOptionsBuilder
+ {
+ IMqttRpcClientTopicGenerationStrategy _topicGenerationStrategy = new DefaultMqttRpcClientTopicGenerationStrategy();
+
+ public MqttRpcClientOptionsBuilder WithTopicGenerationStrategy(IMqttRpcClientTopicGenerationStrategy value)
+ {
+ _topicGenerationStrategy = value ?? throw new ArgumentNullException(nameof(value));
+
+ return this;
+ }
+
+ public IMqttRpcClientOptions Build()
+ {
+ return new MqttRpcClientOptions
+ {
+ TopicGenerationStrategy = _topicGenerationStrategy
+ };
+ }
+ }
+}
diff --git a/Source/MQTTnet.Extensions.Rpc/Options/MqttRpcTopicPair.cs b/Source/MQTTnet.Extensions.Rpc/Options/MqttRpcTopicPair.cs
new file mode 100644
index 0000000..12bccce
--- /dev/null
+++ b/Source/MQTTnet.Extensions.Rpc/Options/MqttRpcTopicPair.cs
@@ -0,0 +1,9 @@
+namespace MQTTnet.Extensions.Rpc.Options
+{
+ public class MqttRpcTopicPair
+ {
+ public string RequestTopic { get; set; }
+
+ public string ResponseTopic { get; set; }
+ }
+}
diff --git a/Source/MQTTnet.Extensions.Rpc/Options/TopicGeneration/DefaultMqttRpcClientTopicGenerationStrategy.cs b/Source/MQTTnet.Extensions.Rpc/Options/TopicGeneration/DefaultMqttRpcClientTopicGenerationStrategy.cs
new file mode 100644
index 0000000..d38aa0b
--- /dev/null
+++ b/Source/MQTTnet.Extensions.Rpc/Options/TopicGeneration/DefaultMqttRpcClientTopicGenerationStrategy.cs
@@ -0,0 +1,20 @@
+using MQTTnet.Extensions.Rpc.Options.TopicGeneration;
+using System;
+
+namespace MQTTnet.Extensions.Rpc.Options
+{
+ public class DefaultMqttRpcClientTopicGenerationStrategy : IMqttRpcClientTopicGenerationStrategy
+ {
+ public MqttRpcTopicPair CreateRpcTopics(TopicGenerationContext context)
+ {
+ var requestTopic = $"MQTTnet.RPC/{Guid.NewGuid():N}/{context.MethodName}";
+ var responseTopic = requestTopic + "/response";
+
+ return new MqttRpcTopicPair
+ {
+ RequestTopic = requestTopic,
+ ResponseTopic = responseTopic
+ };
+ }
+ }
+}
diff --git a/Source/MQTTnet.Extensions.Rpc/Options/TopicGeneration/IMqttRpcClientTopicGenerationStrategy.cs b/Source/MQTTnet.Extensions.Rpc/Options/TopicGeneration/IMqttRpcClientTopicGenerationStrategy.cs
new file mode 100644
index 0000000..19c78d3
--- /dev/null
+++ b/Source/MQTTnet.Extensions.Rpc/Options/TopicGeneration/IMqttRpcClientTopicGenerationStrategy.cs
@@ -0,0 +1,7 @@
+namespace MQTTnet.Extensions.Rpc.Options.TopicGeneration
+{
+ public interface IMqttRpcClientTopicGenerationStrategy
+ {
+ MqttRpcTopicPair CreateRpcTopics(TopicGenerationContext context);
+ }
+}
diff --git a/Source/MQTTnet.Extensions.Rpc/Options/TopicGeneration/TopicGenerationContext.cs b/Source/MQTTnet.Extensions.Rpc/Options/TopicGeneration/TopicGenerationContext.cs
new file mode 100644
index 0000000..dc7263f
--- /dev/null
+++ b/Source/MQTTnet.Extensions.Rpc/Options/TopicGeneration/TopicGenerationContext.cs
@@ -0,0 +1,16 @@
+using MQTTnet.Client;
+using MQTTnet.Protocol;
+
+namespace MQTTnet.Extensions.Rpc.Options.TopicGeneration
+{
+ public class TopicGenerationContext
+ {
+ public string MethodName { get; set; }
+
+ public MqttQualityOfServiceLevel QualityOfServiceLevel { get; set; }
+
+ public IMqttClient MqttClient { get; set; }
+
+ public IMqttRpcClientOptions Options { get; set; }
+ }
+}
diff --git a/Source/MQTTnet.Server/Configuration/CertificateSettingsModel.cs b/Source/MQTTnet.Server/Configuration/CertificateSettingsModel.cs
new file mode 100644
index 0000000..89eb48b
--- /dev/null
+++ b/Source/MQTTnet.Server/Configuration/CertificateSettingsModel.cs
@@ -0,0 +1,35 @@
+using System.IO;
+
+namespace MQTTnet.Server.Configuration
+{
+ public class CertificateSettingsModel
+ {
+ ///
+ /// Path to certificate.
+ ///
+ public string Path { get; set; }
+
+ ///
+ /// Password of certificate.
+ ///
+ public string Password { get; set; }
+
+ ///
+ /// Read certificate file.
+ ///
+ public byte[] ReadCertificate()
+ {
+ if (string.IsNullOrEmpty(Path) || string.IsNullOrWhiteSpace(Path))
+ {
+ throw new FileNotFoundException("No path set");
+ }
+
+ if (!File.Exists(Path))
+ {
+ throw new FileNotFoundException($"Could not find Certificate in path: {Path}");
+ }
+
+ return File.ReadAllBytes(Path);
+ }
+ }
+}
diff --git a/Source/MQTTnet.Server/Configuration/TcpEndpointModel.cs b/Source/MQTTnet.Server/Configuration/TcpEndpointModel.cs
index 8221390..8693268 100644
--- a/Source/MQTTnet.Server/Configuration/TcpEndpointModel.cs
+++ b/Source/MQTTnet.Server/Configuration/TcpEndpointModel.cs
@@ -9,9 +9,9 @@ namespace MQTTnet.Server.Configuration
public class TcpEndPointModel
{
///
- /// Path to Certificate
+ /// Certificate settings.
///
- public string CertificatePath { get; set; }
+ public CertificateSettingsModel Certificate { get; set; }
///
/// Enabled / Disable
@@ -33,25 +33,6 @@ namespace MQTTnet.Server.Configuration
///
public int Port { get; set; } = 1883;
- ///
- /// Read Certificate file
- ///
- ///
- public byte[] ReadCertificate()
- {
- if (string.IsNullOrEmpty(CertificatePath) || string.IsNullOrWhiteSpace(CertificatePath))
- {
- throw new FileNotFoundException("No path set");
- }
-
- if (!File.Exists(CertificatePath))
- {
- throw new FileNotFoundException($"Could not find Certificate in path: {CertificatePath}");
- }
-
- return File.ReadAllBytes(CertificatePath);
- }
-
///
/// Read IPv4
///
diff --git a/Source/MQTTnet.Server/Mqtt/MqttServerService.cs b/Source/MQTTnet.Server/Mqtt/MqttServerService.cs
index efb0010..b8c463f 100644
--- a/Source/MQTTnet.Server/Mqtt/MqttServerService.cs
+++ b/Source/MQTTnet.Server/Mqtt/MqttServerService.cs
@@ -47,7 +47,7 @@ namespace MQTTnet.Server.Mqtt
MqttSubscriptionInterceptor mqttSubscriptionInterceptor,
MqttApplicationMessageInterceptor mqttApplicationMessageInterceptor,
MqttServerStorage mqttServerStorage,
- PythonScriptHostService pythonScriptHostService,
+ PythonScriptHostService pythonScriptHostService,
ILogger logger)
{
_settings = mqttSettings ?? throw new ArgumentNullException(nameof(mqttSettings));
@@ -179,7 +179,7 @@ namespace MQTTnet.Server.Mqtt
.WithApplicationMessageInterceptor(_mqttApplicationMessageInterceptor)
.WithSubscriptionInterceptor(_mqttSubscriptionInterceptor)
.WithStorage(_mqttServerStorage);
-
+
// Configure unencrypted connections
if (_settings.TcpEndPoint.Enabled)
{
@@ -210,9 +210,23 @@ namespace MQTTnet.Server.Mqtt
{
options
.WithEncryptedEndpoint()
- .WithEncryptionSslProtocol(SslProtocols.Tls12)
- .WithEncryptionCertificate(_settings.EncryptedTcpEndPoint.ReadCertificate());
+ .WithEncryptionSslProtocol(SslProtocols.Tls12);
+
+ if (!string.IsNullOrEmpty(_settings.EncryptedTcpEndPoint?.Certificate?.Path))
+ {
+ IMqttServerCertificateCredentials certificateCredentials = null;
+ if (!string.IsNullOrEmpty(_settings.EncryptedTcpEndPoint?.Certificate?.Password))
+ {
+ certificateCredentials = new MqttServerCertificateCredentials
+ {
+ Password = _settings.EncryptedTcpEndPoint.Certificate.Password
+ };
+ }
+
+ options.WithEncryptionCertificate(_settings.EncryptedTcpEndPoint.Certificate.ReadCertificate(), certificateCredentials);
+ }
+
if (_settings.EncryptedTcpEndPoint.TryReadIPv4(out var address4))
{
options.WithEncryptedEndpointBoundIPAddress(address4);
diff --git a/Source/MQTTnet.Server/appsettings.json b/Source/MQTTnet.Server/appsettings.json
index 8ea10d6..71eaf20 100644
--- a/Source/MQTTnet.Server/appsettings.json
+++ b/Source/MQTTnet.Server/appsettings.json
@@ -27,7 +27,10 @@
"IPv4": "*",
"IPv6": "*",
"Port": 8883,
- "CertificatePath": "/absolute/path/to/pfx"
+ "Certificate": {
+ "Path": "/absolute/path/to/pfx",
+ "Password": ""
+ }
},
"WebSocketEndPoint": {
"Enabled": true,
@@ -63,4 +66,4 @@
}
},
"AllowedHosts": "*"
-}
+}
\ No newline at end of file
diff --git a/Source/MQTTnet/Formatter/IMqttPacketWriter.cs b/Source/MQTTnet/Formatter/IMqttPacketWriter.cs
index 5ea6ca3..bae6dec 100644
--- a/Source/MQTTnet/Formatter/IMqttPacketWriter.cs
+++ b/Source/MQTTnet/Formatter/IMqttPacketWriter.cs
@@ -5,15 +5,25 @@
int Length { get; }
void WriteWithLengthPrefix(string value);
- void Write(byte returnCode);
- void WriteWithLengthPrefix(byte[] payload);
- void Write(ushort keepAlivePeriod);
- void Write(IMqttPacketWriter propertyWriter);
- void WriteVariableLengthInteger(uint length);
- void Write(byte[] payload, int v, int length);
- void Reset(int v);
- void Seek(int v);
+
+ void Write(byte value);
+
+ void WriteWithLengthPrefix(byte[] value);
+
+ void Write(ushort value);
+
+ void Write(IMqttPacketWriter value);
+
+ void WriteVariableLengthInteger(uint value);
+
+ void Write(byte[] value, int offset, int length);
+
+ void Reset(int length);
+
+ void Seek(int offset);
+
void FreeBuffer();
+
byte[] GetBuffer();
}
}
diff --git a/Source/MQTTnet/Formatter/MqttPacketReader.cs b/Source/MQTTnet/Formatter/MqttPacketReader.cs
index 2589c5b..61698a1 100644
--- a/Source/MQTTnet/Formatter/MqttPacketReader.cs
+++ b/Source/MQTTnet/Formatter/MqttPacketReader.cs
@@ -21,8 +21,6 @@ namespace MQTTnet.Formatter
{
// The MQTT fixed header contains 1 byte of flags and at least 1 byte for the remaining data length.
// So in all cases at least 2 bytes must be read for a complete MQTT packet.
- // async/await is used here because the next packet is received in a couple of minutes so the performance
- // impact is acceptable according to a useless waiting thread.
var buffer = fixedHeaderBuffer;
var totalBytesRead = 0;
@@ -55,16 +53,7 @@ namespace MQTTnet.Formatter
};
}
-#if WINDOWS_UWP
- // UWP will have a dead lock when calling this not async.
var bodyLength = await ReadBodyLengthAsync(buffer[1], cancellationToken).ConfigureAwait(false);
-#else
- // Here the async/await pattern is not used because the overhead of context switches
- // is too big for reading 1 byte in a row. We expect that the remaining data was sent
- // directly after the initial bytes. If the client disconnects just in this moment we
- // will get an exception anyway.
- var bodyLength = ReadBodyLength(buffer[1], cancellationToken);
-#endif
if (!bodyLength.HasValue)
{
@@ -81,49 +70,6 @@ namespace MQTTnet.Formatter
};
}
-#if !WINDOWS_UWP
- private int? ReadBodyLength(byte initialEncodedByte, CancellationToken cancellationToken)
- {
- var offset = 0;
- var multiplier = 128;
- var value = initialEncodedByte & 127;
- int encodedByte = initialEncodedByte;
-
- while ((encodedByte & 128) != 0)
- {
- offset++;
- if (offset > 3)
- {
- throw new MqttProtocolViolationException("Remaining length is invalid.");
- }
-
- if (cancellationToken.IsCancellationRequested)
- {
- return null;
- }
-
- var readCount = _channel.ReadAsync(_singleByteBuffer, 0, 1, cancellationToken).GetAwaiter().GetResult();
-
- if (cancellationToken.IsCancellationRequested)
- {
- return null;
- }
-
- if (readCount == 0)
- {
- return null;
- }
-
- encodedByte = _singleByteBuffer[0];
-
- value += (encodedByte & 127) * multiplier;
- multiplier *= 128;
- }
-
- return value;
- }
-#else
-
private async Task ReadBodyLengthAsync(byte initialEncodedByte, CancellationToken cancellationToken)
{
var offset = 0;
@@ -164,6 +110,5 @@ namespace MQTTnet.Formatter
return value;
}
-#endif
}
}
diff --git a/Source/MQTTnet/Formatter/V5/MqttV500PacketEncoder.cs b/Source/MQTTnet/Formatter/V5/MqttV500PacketEncoder.cs
index 23b7f1a..9ebc740 100644
--- a/Source/MQTTnet/Formatter/V5/MqttV500PacketEncoder.cs
+++ b/Source/MQTTnet/Formatter/V5/MqttV500PacketEncoder.cs
@@ -328,10 +328,7 @@ namespace MQTTnet.Formatter.V5
{
ThrowReasonCodeNotSetException();
}
-
- packetWriter.Write(packet.PacketIdentifier.Value);
- packetWriter.Write((byte)packet.ReasonCode.Value);
-
+
var propertiesWriter = new MqttV500PropertiesWriter();
if (packet.Properties != null)
{
@@ -339,6 +336,8 @@ namespace MQTTnet.Formatter.V5
propertiesWriter.WriteUserProperties(packet.Properties.UserProperties);
}
+ packetWriter.Write(packet.PacketIdentifier.Value);
+
if (packetWriter.Length > 0 || packet.ReasonCode.Value != MqttPubRecReasonCode.Success)
{
packetWriter.Write((byte)packet.ReasonCode.Value);
diff --git a/Source/MQTTnet/Implementations/MqttTcpChannel.cs b/Source/MQTTnet/Implementations/MqttTcpChannel.cs
index 12fd2bb..d7943ad 100644
--- a/Source/MQTTnet/Implementations/MqttTcpChannel.cs
+++ b/Source/MQTTnet/Implementations/MqttTcpChannel.cs
@@ -86,7 +86,7 @@ namespace MQTTnet.Implementations
var sslStream = new SslStream(networkStream, false, InternalUserCertificateValidationCallback);
_stream = sslStream;
- await sslStream.AuthenticateAsClientAsync(_options.Server, LoadCertificates(), _options.TlsOptions.SslProtocol, _options.TlsOptions.IgnoreCertificateRevocationErrors).ConfigureAwait(false);
+ await sslStream.AuthenticateAsClientAsync(_options.Server, LoadCertificates(), _options.TlsOptions.SslProtocol, !_options.TlsOptions.IgnoreCertificateRevocationErrors).ConfigureAwait(false);
}
else
{
diff --git a/Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs b/Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs
index e3dcab8..d7f4e6f 100644
--- a/Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs
+++ b/Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs
@@ -41,14 +41,24 @@ namespace MQTTnet.Implementations
RegisterListeners(options.DefaultEndpointOptions, null, _cancellationTokenSource.Token);
}
- if (options.TlsEndpointOptions.IsEnabled)
+ if (options.TlsEndpointOptions?.IsEnabled == true)
{
if (options.TlsEndpointOptions.Certificate == null)
{
throw new ArgumentException("TLS certificate is not set.");
}
- var tlsCertificate = new X509Certificate2(options.TlsEndpointOptions.Certificate, options.TlsEndpointOptions.CertificateCredentials.Password);
+ X509Certificate2 tlsCertificate;
+ if (string.IsNullOrEmpty(options.TlsEndpointOptions.CertificateCredentials?.Password))
+ {
+ // Use a different overload when no password is specified. Otherwise the constructor will fail.
+ tlsCertificate = new X509Certificate2(options.TlsEndpointOptions.Certificate);
+ }
+ else
+ {
+ tlsCertificate = new X509Certificate2(options.TlsEndpointOptions.Certificate, options.TlsEndpointOptions.CertificateCredentials.Password);
+ }
+
if (!tlsCertificate.HasPrivateKey)
{
throw new InvalidOperationException("The certificate for TLS encryption must contain the private key.");
diff --git a/Source/MQTTnet/Server/IMqttServerCertificateCredentials.cs b/Source/MQTTnet/Server/IMqttServerCertificateCredentials.cs
new file mode 100644
index 0000000..3f5fe0e
--- /dev/null
+++ b/Source/MQTTnet/Server/IMqttServerCertificateCredentials.cs
@@ -0,0 +1,4 @@
+public interface IMqttServerCertificateCredentials
+{
+ string Password { get; }
+}
diff --git a/Source/MQTTnet/Server/IMqttServerCredentials.cs b/Source/MQTTnet/Server/IMqttServerCredentials.cs
deleted file mode 100644
index 5e75be9..0000000
--- a/Source/MQTTnet/Server/IMqttServerCredentials.cs
+++ /dev/null
@@ -1,6 +0,0 @@
-using System;
-
-public interface IMqttServerCredentials
-{
- String Password { get; }
-}
diff --git a/Source/MQTTnet/Server/MqttServerCertificateCredentials.cs b/Source/MQTTnet/Server/MqttServerCertificateCredentials.cs
new file mode 100644
index 0000000..05b6c5f
--- /dev/null
+++ b/Source/MQTTnet/Server/MqttServerCertificateCredentials.cs
@@ -0,0 +1,7 @@
+namespace MQTTnet.Server
+{
+ public class MqttServerCertificateCredentials : IMqttServerCertificateCredentials
+ {
+ public string Password { get; set; }
+ }
+}
diff --git a/Source/MQTTnet/Server/MqttServerOptionsBuilder.cs b/Source/MQTTnet/Server/MqttServerOptionsBuilder.cs
index 5991e7d..c25af84 100644
--- a/Source/MQTTnet/Server/MqttServerOptionsBuilder.cs
+++ b/Source/MQTTnet/Server/MqttServerOptionsBuilder.cs
@@ -82,7 +82,7 @@ namespace MQTTnet.Server
return this;
}
- public MqttServerOptionsBuilder WithEncryptionCertificate(byte[] value, IMqttServerCredentials credentials = null)
+ public MqttServerOptionsBuilder WithEncryptionCertificate(byte[] value, IMqttServerCertificateCredentials credentials = null)
{
_options.TlsEndpointOptions.Certificate = value;
_options.TlsEndpointOptions.CertificateCredentials = credentials;
diff --git a/Source/MQTTnet/Server/MqttServerTlsTcpEndpointOptions.cs b/Source/MQTTnet/Server/MqttServerTlsTcpEndpointOptions.cs
index e92d987..9bf325b 100644
--- a/Source/MQTTnet/Server/MqttServerTlsTcpEndpointOptions.cs
+++ b/Source/MQTTnet/Server/MqttServerTlsTcpEndpointOptions.cs
@@ -12,7 +12,7 @@ namespace MQTTnet.Server
public byte[] Certificate { get; set; }
- public IMqttServerCredentials CertificateCredentials { get; set; }
+ public IMqttServerCertificateCredentials CertificateCredentials { get; set; }
public bool ClientCertificateRequired { get; set; }
diff --git a/Tests/MQTTnet.AspNetCore.Tests/MQTTnet.AspNetCore.Tests.csproj b/Tests/MQTTnet.AspNetCore.Tests/MQTTnet.AspNetCore.Tests.csproj
index 06f0e5f..4569af3 100644
--- a/Tests/MQTTnet.AspNetCore.Tests/MQTTnet.AspNetCore.Tests.csproj
+++ b/Tests/MQTTnet.AspNetCore.Tests/MQTTnet.AspNetCore.Tests.csproj
@@ -2,12 +2,11 @@
netcoreapp2.1
-
false
-
+
diff --git a/Tests/MQTTnet.Core.Tests/MQTTnet.Tests.csproj b/Tests/MQTTnet.Core.Tests/MQTTnet.Tests.csproj
index 74937a9..830051d 100644
--- a/Tests/MQTTnet.Core.Tests/MQTTnet.Tests.csproj
+++ b/Tests/MQTTnet.Core.Tests/MQTTnet.Tests.csproj
@@ -1,15 +1,14 @@
- Exe
- Full
netcoreapp2.1
+ false
-
+
diff --git a/Tests/MQTTnet.Core.Tests/MqttKeepAliveMonitor_Tests.cs b/Tests/MQTTnet.Core.Tests/MqttKeepAliveMonitor_Tests.cs
index adf9a2b..fe1a0c5 100644
--- a/Tests/MQTTnet.Core.Tests/MqttKeepAliveMonitor_Tests.cs
+++ b/Tests/MQTTnet.Core.Tests/MqttKeepAliveMonitor_Tests.cs
@@ -14,7 +14,7 @@ namespace MQTTnet.Tests
public class MqttKeepAliveMonitor_Tests
{
[TestMethod]
- public void KeepAlive_Timeout()
+ public async Task KeepAlive_Timeout()
{
var counter = 0;
@@ -31,13 +31,13 @@ namespace MQTTnet.Tests
Assert.AreEqual(0, counter);
- Thread.Sleep(2000); // Internally the keep alive timeout is multiplied with 1.5 as per protocol specification.
+ await Task.Delay(2000); // Internally the keep alive timeout is multiplied with 1.5 as per protocol specification.
Assert.AreEqual(1, counter);
}
[TestMethod]
- public void KeepAlive_NoTimeout()
+ public async Task KeepAlive_NoTimeout()
{
var counter = 0;
@@ -55,15 +55,15 @@ namespace MQTTnet.Tests
Assert.AreEqual(0, counter);
// Simulate traffic.
- Thread.Sleep(1000); // Internally the keep alive timeout is multiplied with 1.5 as per protocol specification.
+ await Task.Delay(1000); // Internally the keep alive timeout is multiplied with 1.5 as per protocol specification.
monitor.PacketReceived();
- Thread.Sleep(1000);
+ await Task.Delay(1000);
monitor.PacketReceived();
- Thread.Sleep(1000);
+ await Task.Delay(1000);
Assert.AreEqual(0, counter);
- Thread.Sleep(2000);
+ await Task.Delay(2000);
Assert.AreEqual(1, counter);
}
diff --git a/Tests/MQTTnet.Core.Tests/MqttPacketSerializer_Tests.cs b/Tests/MQTTnet.Core.Tests/MqttPacketSerializer_Tests.cs
index b67d2a2..a1f798a 100644
--- a/Tests/MQTTnet.Core.Tests/MqttPacketSerializer_Tests.cs
+++ b/Tests/MQTTnet.Core.Tests/MqttPacketSerializer_Tests.cs
@@ -8,6 +8,7 @@ using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Adapter;
using MQTTnet.Formatter;
using MQTTnet.Formatter.V3;
+using MQTTnet.Formatter.V5;
using MQTTnet.Internal;
using MQTTnet.Packets;
using MQTTnet.Protocol;
@@ -258,6 +259,26 @@ namespace MQTTnet.Tests
Assert.IsTrue(deserialized.Properties.UserProperties.Any(x => x.Name == "Foo"));
}
+
+ [TestMethod]
+ public void SerializeV500_MqttPublishPacket_CorrelationData()
+ {
+ var data = "123456789";
+ var req = new MqttApplicationMessageBuilder()
+ .WithTopic("Foo")
+ .WithResponseTopic($"_")
+ .WithCorrelationData(Guid.NewGuid().ToByteArray())
+ .WithPayload(data)
+ .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce)
+ .Build();
+
+ var p = new MqttV500DataConverter().CreatePublishPacket(req);
+
+ var deserialized = Roundtrip(p, MqttProtocolVersion.V500);
+
+ Assert.IsTrue(p.Payload.SequenceEqual(deserialized.Payload));
+ }
+
[TestMethod]
public void DeserializeV311_MqttPublishPacket()
{
diff --git a/Tests/MQTTnet.Core.Tests/RPC_Tests.cs b/Tests/MQTTnet.Core.Tests/RPC_Tests.cs
index 9f03172..a420697 100644
--- a/Tests/MQTTnet.Core.Tests/RPC_Tests.cs
+++ b/Tests/MQTTnet.Core.Tests/RPC_Tests.cs
@@ -10,6 +10,8 @@ using MQTTnet.Extensions.Rpc;
using MQTTnet.Protocol;
using MQTTnet.Client.Options;
using MQTTnet.Formatter;
+using MQTTnet.Extensions.Rpc.Options;
+using MQTTnet.Extensions.Rpc.Options.TopicGeneration;
namespace MQTTnet.Tests
{
@@ -62,7 +64,22 @@ namespace MQTTnet.Tests
var requestSender = await testEnvironment.ConnectClientAsync();
- var rpcClient = new MqttRpcClient(requestSender);
+ var rpcClient = new MqttRpcClient(requestSender, new MqttRpcClientOptionsBuilder().Build());
+ await rpcClient.ExecuteAsync(TimeSpan.FromSeconds(2), "ping", "", MqttQualityOfServiceLevel.AtMostOnce);
+ }
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(MqttCommunicationTimedOutException))]
+ public async Task Execute_With_Custom_Topic_Names()
+ {
+ using (var testEnvironment = new TestEnvironment())
+ {
+ await testEnvironment.StartServerAsync();
+
+ var requestSender = await testEnvironment.ConnectClientAsync();
+
+ var rpcClient = new MqttRpcClient(requestSender, new MqttRpcClientOptionsBuilder().WithTopicGenerationStrategy(new TestTopicStrategy()) .Build());
await rpcClient.ExecuteAsync(TimeSpan.FromSeconds(2), "ping", "", MqttQualityOfServiceLevel.AtMostOnce);
}
}
@@ -82,11 +99,23 @@ namespace MQTTnet.Tests
var requestSender = await testEnvironment.ConnectClientAsync();
- var rpcClient = new MqttRpcClient(requestSender);
+ var rpcClient = new MqttRpcClient(requestSender, new MqttRpcClientOptionsBuilder().Build());
var response = await rpcClient.ExecuteAsync(TimeSpan.FromSeconds(5), "ping", "", qosLevel);
Assert.AreEqual("pong", Encoding.UTF8.GetString(response));
}
}
+
+ private class TestTopicStrategy : IMqttRpcClientTopicGenerationStrategy
+ {
+ public MqttRpcTopicPair CreateRpcTopics(TopicGenerationContext context)
+ {
+ return new MqttRpcTopicPair
+ {
+ RequestTopic = "a",
+ ResponseTopic = "b"
+ };
+ }
+ }
}
}
diff --git a/Tests/MQTTnet.Core.Tests/Server_Tests.cs b/Tests/MQTTnet.Core.Tests/Server_Tests.cs
index d10ebaf..b2b3b70 100644
--- a/Tests/MQTTnet.Core.Tests/Server_Tests.cs
+++ b/Tests/MQTTnet.Core.Tests/Server_Tests.cs
@@ -1063,7 +1063,9 @@ namespace MQTTnet.Tests
// forever. This is security related.
var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await client.ConnectAsync("localhost", testEnvironment.ServerPort);
- await client.SendAsync(Encoding.UTF8.GetBytes("Garbage"), SocketFlags.None);
+
+ var buffer = Encoding.UTF8.GetBytes("Garbage");
+ client.Send(buffer, buffer.Length, SocketFlags.None);
await Task.Delay(TimeSpan.FromSeconds(3));