@@ -1,6 +1,6 @@ | |||||
--- | --- | ||||
name: Bug report | name: Bug report | ||||
about: Create a report to help us improve | |||||
about: Create a report to help us improve. | |||||
title: '' | title: '' | ||||
labels: '' | labels: '' | ||||
assignees: '' | assignees: '' | ||||
@@ -11,11 +11,12 @@ assignees: '' | |||||
A clear and concise description of what the bug is. | A clear and concise description of what the bug is. | ||||
### Which project is your bug related to? | ### Which project is your bug related to? | ||||
- [x] Client | |||||
- [ ] ManagedClient | |||||
- [ ] MQTTnet.Server standalone | |||||
- [ ] Server | |||||
- [ ] Generic | |||||
<!-- Remove the items which don't apply from the following list --> | |||||
- Client | |||||
- ManagedClient | |||||
- MQTTnet.Server standalone | |||||
- Server | |||||
- Generic | |||||
### To Reproduce | ### To Reproduce | ||||
Steps to reproduce the behavior: | Steps to reproduce the behavior: | ||||
@@ -35,9 +36,11 @@ Add any other context about the problem here. | |||||
Include debugging or logging information here: | Include debugging or logging information here: | ||||
```batch | ```batch | ||||
\\ Put your logging output here. | |||||
``` | ``` | ||||
### Code example | ### Code example | ||||
Please provide full code examples below where possible to make it easier for the developers to check your issues. | Please provide full code examples below where possible to make it easier for the developers to check your issues. | ||||
```csharp | ```csharp | ||||
\\ Put your code here. | |||||
``` | ``` |
@@ -1,6 +1,6 @@ | |||||
--- | --- | ||||
name: Custom issue template | name: Custom issue template | ||||
about: Describe this issue template's purpose here. | |||||
about: Do you have a question related to the project? Use this template. | |||||
title: '' | title: '' | ||||
labels: '' | labels: '' | ||||
assignees: '' | assignees: '' | ||||
@@ -10,9 +10,10 @@ assignees: '' | |||||
### Describe your question | ### Describe your question | ||||
A clear and concise description of what you want to know. | A clear and concise description of what you want to know. | ||||
### Which project is your bug related to? | |||||
- [x] Client | |||||
- [ ] ManagedClient | |||||
- [ ] MQTTnet.Server standalone | |||||
- [ ] Server | |||||
- [ ] Generic | |||||
### Which project is your question related to? | |||||
<!-- Remove the items which don't apply from the following list --> | |||||
- Client | |||||
- ManagedClient | |||||
- MQTTnet.Server standalone | |||||
- Server | |||||
- Generic |
@@ -1,6 +1,6 @@ | |||||
--- | --- | ||||
name: Feature request | name: Feature request | ||||
about: Suggest an idea for this project | |||||
about: Suggest an idea for this project. | |||||
title: '' | title: '' | ||||
labels: '' | labels: '' | ||||
assignees: '' | assignees: '' | ||||
@@ -12,11 +12,12 @@ Is your feature request related to a problem? Please describe. | |||||
A clear and concise description of what the problem is. Example. I'm am trying to do [...] but [...] | A clear and concise description of what the problem is. Example. I'm am trying to do [...] but [...] | ||||
### Which project is your feature request related to? | ### Which project is your feature request related to? | ||||
- [x] Client | |||||
- [ ] ManagedClient | |||||
- [ ] MQTTnet.Server standalone | |||||
- [ ] Server | |||||
- [ ] Generic | |||||
<!-- Remove the items which don't apply from the following list --> | |||||
- Client | |||||
- ManagedClient | |||||
- MQTTnet.Server standalone | |||||
- Server | |||||
- Generic | |||||
### Describe the solution you'd like | ### Describe the solution you'd like | ||||
A clear and concise description of what you want to happen. | A clear and concise description of what you want to happen. | ||||
@@ -19,6 +19,7 @@ | |||||
* [Server] Fixed: Sending Large packets with AspnetCore based connection throws System.ArgumentException. | * [Server] Fixed: Sending Large packets with AspnetCore based connection throws System.ArgumentException. | ||||
* [Server] Fixed wrong usage of socket option _NoDelay_. | * [Server] Fixed wrong usage of socket option _NoDelay_. | ||||
* [Server] Added remote certificate validation callback (thanks to @rudacs). | * [Server] Added remote certificate validation callback (thanks to @rudacs). | ||||
* [Server] Add support for certificate passwords (thanks to @cslutgen). | |||||
* [MQTTnet.Server] Added REST API for publishing basic messages. | * [MQTTnet.Server] Added REST API for publishing basic messages. | ||||
</releaseNotes> | </releaseNotes> | ||||
<copyright>Copyright Christian Kratky 2016-2019</copyright> | <copyright>Copyright Christian Kratky 2016-2019</copyright> | ||||
@@ -0,0 +1,35 @@ | |||||
using System.IO; | |||||
namespace MQTTnet.Server.Configuration | |||||
{ | |||||
public class CertificateSettingsModel | |||||
{ | |||||
/// <summary> | |||||
/// Path to certificate. | |||||
/// </summary> | |||||
public string Path { get; set; } | |||||
/// <summary> | |||||
/// Password of certificate. | |||||
/// </summary> | |||||
public string Password { get; set; } | |||||
/// <summary> | |||||
/// Read certificate file. | |||||
/// </summary> | |||||
public byte[] ReadCertificate() | |||||
{ | |||||
if (string.IsNullOrEmpty(Path) || string.IsNullOrWhiteSpace(Path)) | |||||
{ | |||||
throw new FileNotFoundException("No path set"); | |||||
} | |||||
if (!File.Exists(Path)) | |||||
{ | |||||
throw new FileNotFoundException($"Could not find Certificate in path: {Path}"); | |||||
} | |||||
return File.ReadAllBytes(Path); | |||||
} | |||||
} | |||||
} |
@@ -9,9 +9,9 @@ namespace MQTTnet.Server.Configuration | |||||
public class TcpEndPointModel | public class TcpEndPointModel | ||||
{ | { | ||||
/// <summary> | /// <summary> | ||||
/// Path to Certificate | |||||
/// Certificate settings. | |||||
/// </summary> | /// </summary> | ||||
public string CertificatePath { get; set; } | |||||
public CertificateSettingsModel Certificate { get; set; } | |||||
/// <summary> | /// <summary> | ||||
/// Enabled / Disable | /// Enabled / Disable | ||||
@@ -33,25 +33,6 @@ namespace MQTTnet.Server.Configuration | |||||
/// </summary> | /// </summary> | ||||
public int Port { get; set; } = 1883; | public int Port { get; set; } = 1883; | ||||
/// <summary> | |||||
/// Read Certificate file | |||||
/// </summary> | |||||
/// <returns></returns> | |||||
public byte[] ReadCertificate() | |||||
{ | |||||
if (string.IsNullOrEmpty(CertificatePath) || string.IsNullOrWhiteSpace(CertificatePath)) | |||||
{ | |||||
throw new FileNotFoundException("No path set"); | |||||
} | |||||
if (!File.Exists(CertificatePath)) | |||||
{ | |||||
throw new FileNotFoundException($"Could not find Certificate in path: {CertificatePath}"); | |||||
} | |||||
return File.ReadAllBytes(CertificatePath); | |||||
} | |||||
/// <summary> | /// <summary> | ||||
/// Read IPv4 | /// Read IPv4 | ||||
/// </summary> | /// </summary> | ||||
@@ -47,7 +47,7 @@ namespace MQTTnet.Server.Mqtt | |||||
MqttSubscriptionInterceptor mqttSubscriptionInterceptor, | MqttSubscriptionInterceptor mqttSubscriptionInterceptor, | ||||
MqttApplicationMessageInterceptor mqttApplicationMessageInterceptor, | MqttApplicationMessageInterceptor mqttApplicationMessageInterceptor, | ||||
MqttServerStorage mqttServerStorage, | MqttServerStorage mqttServerStorage, | ||||
PythonScriptHostService pythonScriptHostService, | |||||
PythonScriptHostService pythonScriptHostService, | |||||
ILogger<MqttServerService> logger) | ILogger<MqttServerService> logger) | ||||
{ | { | ||||
_settings = mqttSettings ?? throw new ArgumentNullException(nameof(mqttSettings)); | _settings = mqttSettings ?? throw new ArgumentNullException(nameof(mqttSettings)); | ||||
@@ -179,7 +179,7 @@ namespace MQTTnet.Server.Mqtt | |||||
.WithApplicationMessageInterceptor(_mqttApplicationMessageInterceptor) | .WithApplicationMessageInterceptor(_mqttApplicationMessageInterceptor) | ||||
.WithSubscriptionInterceptor(_mqttSubscriptionInterceptor) | .WithSubscriptionInterceptor(_mqttSubscriptionInterceptor) | ||||
.WithStorage(_mqttServerStorage); | .WithStorage(_mqttServerStorage); | ||||
// Configure unencrypted connections | // Configure unencrypted connections | ||||
if (_settings.TcpEndPoint.Enabled) | if (_settings.TcpEndPoint.Enabled) | ||||
{ | { | ||||
@@ -210,9 +210,23 @@ namespace MQTTnet.Server.Mqtt | |||||
{ | { | ||||
options | options | ||||
.WithEncryptedEndpoint() | .WithEncryptedEndpoint() | ||||
.WithEncryptionSslProtocol(SslProtocols.Tls12) | |||||
.WithEncryptionCertificate(_settings.EncryptedTcpEndPoint.ReadCertificate()); | |||||
.WithEncryptionSslProtocol(SslProtocols.Tls12); | |||||
if (!string.IsNullOrEmpty(_settings.EncryptedTcpEndPoint?.Certificate?.Path)) | |||||
{ | |||||
IMqttServerCertificateCredentials certificateCredentials = null; | |||||
if (!string.IsNullOrEmpty(_settings.EncryptedTcpEndPoint?.Certificate?.Password)) | |||||
{ | |||||
certificateCredentials = new MqttServerCertificateCredentials | |||||
{ | |||||
Password = _settings.EncryptedTcpEndPoint.Certificate.Password | |||||
}; | |||||
} | |||||
options.WithEncryptionCertificate(_settings.EncryptedTcpEndPoint.Certificate.ReadCertificate(), certificateCredentials); | |||||
} | |||||
if (_settings.EncryptedTcpEndPoint.TryReadIPv4(out var address4)) | if (_settings.EncryptedTcpEndPoint.TryReadIPv4(out var address4)) | ||||
{ | { | ||||
options.WithEncryptedEndpointBoundIPAddress(address4); | options.WithEncryptedEndpointBoundIPAddress(address4); | ||||
@@ -27,7 +27,10 @@ | |||||
"IPv4": "*", | "IPv4": "*", | ||||
"IPv6": "*", | "IPv6": "*", | ||||
"Port": 8883, | "Port": 8883, | ||||
"CertificatePath": "/absolute/path/to/pfx" | |||||
"Certificate": { | |||||
"Path": "/absolute/path/to/pfx", | |||||
"Password": "" | |||||
} | |||||
}, | }, | ||||
"WebSocketEndPoint": { | "WebSocketEndPoint": { | ||||
"Enabled": true, | "Enabled": true, | ||||
@@ -63,4 +66,4 @@ | |||||
} | } | ||||
}, | }, | ||||
"AllowedHosts": "*" | "AllowedHosts": "*" | ||||
} | |||||
} |
@@ -139,6 +139,13 @@ namespace MQTTnet.Client.Options | |||||
return this; | return this; | ||||
} | } | ||||
public MqttClientOptionsBuilder WithCredentials(IMqttClientCredentials credentials) | |||||
{ | |||||
_options.Credentials = credentials; | |||||
return this; | |||||
} | |||||
public MqttClientOptionsBuilder WithExtendedAuthenticationExchangeHandler(IMqttExtendedAuthenticationExchangeHandler handler) | public MqttClientOptionsBuilder WithExtendedAuthenticationExchangeHandler(IMqttExtendedAuthenticationExchangeHandler handler) | ||||
{ | { | ||||
_options.ExtendedAuthenticationExchangeHandler = handler; | _options.ExtendedAuthenticationExchangeHandler = handler; | ||||
@@ -21,8 +21,6 @@ namespace MQTTnet.Formatter | |||||
{ | { | ||||
// The MQTT fixed header contains 1 byte of flags and at least 1 byte for the remaining data length. | // The MQTT fixed header contains 1 byte of flags and at least 1 byte for the remaining data length. | ||||
// So in all cases at least 2 bytes must be read for a complete MQTT packet. | // So in all cases at least 2 bytes must be read for a complete MQTT packet. | ||||
// async/await is used here because the next packet is received in a couple of minutes so the performance | |||||
// impact is acceptable according to a useless waiting thread. | |||||
var buffer = fixedHeaderBuffer; | var buffer = fixedHeaderBuffer; | ||||
var totalBytesRead = 0; | var totalBytesRead = 0; | ||||
@@ -55,16 +53,7 @@ namespace MQTTnet.Formatter | |||||
}; | }; | ||||
} | } | ||||
#if WINDOWS_UWP | |||||
// UWP will have a dead lock when calling this not async. | |||||
var bodyLength = await ReadBodyLengthAsync(buffer[1], cancellationToken).ConfigureAwait(false); | var bodyLength = await ReadBodyLengthAsync(buffer[1], cancellationToken).ConfigureAwait(false); | ||||
#else | |||||
// Here the async/await pattern is not used because the overhead of context switches | |||||
// is too big for reading 1 byte in a row. We expect that the remaining data was sent | |||||
// directly after the initial bytes. If the client disconnects just in this moment we | |||||
// will get an exception anyway. | |||||
var bodyLength = ReadBodyLength(buffer[1], cancellationToken); | |||||
#endif | |||||
if (!bodyLength.HasValue) | if (!bodyLength.HasValue) | ||||
{ | { | ||||
@@ -81,49 +70,6 @@ namespace MQTTnet.Formatter | |||||
}; | }; | ||||
} | } | ||||
#if !WINDOWS_UWP | |||||
private int? ReadBodyLength(byte initialEncodedByte, CancellationToken cancellationToken) | |||||
{ | |||||
var offset = 0; | |||||
var multiplier = 128; | |||||
var value = initialEncodedByte & 127; | |||||
int encodedByte = initialEncodedByte; | |||||
while ((encodedByte & 128) != 0) | |||||
{ | |||||
offset++; | |||||
if (offset > 3) | |||||
{ | |||||
throw new MqttProtocolViolationException("Remaining length is invalid."); | |||||
} | |||||
if (cancellationToken.IsCancellationRequested) | |||||
{ | |||||
return null; | |||||
} | |||||
var readCount = _channel.ReadAsync(_singleByteBuffer, 0, 1, cancellationToken).GetAwaiter().GetResult(); | |||||
if (cancellationToken.IsCancellationRequested) | |||||
{ | |||||
return null; | |||||
} | |||||
if (readCount == 0) | |||||
{ | |||||
return null; | |||||
} | |||||
encodedByte = _singleByteBuffer[0]; | |||||
value += (encodedByte & 127) * multiplier; | |||||
multiplier *= 128; | |||||
} | |||||
return value; | |||||
} | |||||
#else | |||||
private async Task<int?> ReadBodyLengthAsync(byte initialEncodedByte, CancellationToken cancellationToken) | private async Task<int?> ReadBodyLengthAsync(byte initialEncodedByte, CancellationToken cancellationToken) | ||||
{ | { | ||||
var offset = 0; | var offset = 0; | ||||
@@ -164,6 +110,5 @@ namespace MQTTnet.Formatter | |||||
return value; | return value; | ||||
} | } | ||||
#endif | |||||
} | } | ||||
} | } |
@@ -48,7 +48,7 @@ namespace MQTTnet.Implementations | |||||
throw new ArgumentException("TLS certificate is not set."); | throw new ArgumentException("TLS certificate is not set."); | ||||
} | } | ||||
var tlsCertificate = new X509Certificate2(options.TlsEndpointOptions.Certificate); | |||||
var tlsCertificate = new X509Certificate2(options.TlsEndpointOptions.Certificate, options.TlsEndpointOptions.CertificateCredentials.Password); | |||||
if (!tlsCertificate.HasPrivateKey) | if (!tlsCertificate.HasPrivateKey) | ||||
{ | { | ||||
throw new InvalidOperationException("The certificate for TLS encryption must contain the private key."); | throw new InvalidOperationException("The certificate for TLS encryption must contain the private key."); | ||||
@@ -41,13 +41,13 @@ | |||||
<DefineConstants>RELEASE;NETSTANDARD1_3</DefineConstants> | <DefineConstants>RELEASE;NETSTANDARD1_3</DefineConstants> | ||||
</PropertyGroup> | </PropertyGroup> | ||||
<ItemGroup Condition="'$(TargetFramework)'=='netstandard2.0'"> | |||||
<ItemGroup Condition="'$(TargetFramework)'=='netstandard1.3'"> | |||||
<PackageReference Include="System.Net.Security" Version="4.3.2" /> | <PackageReference Include="System.Net.Security" Version="4.3.2" /> | ||||
<PackageReference Include="System.Net.WebSockets" Version="4.3.0" /> | <PackageReference Include="System.Net.WebSockets" Version="4.3.0" /> | ||||
<PackageReference Include="System.Net.WebSockets.Client" Version="4.3.2" /> | <PackageReference Include="System.Net.WebSockets.Client" Version="4.3.2" /> | ||||
</ItemGroup> | </ItemGroup> | ||||
<ItemGroup Condition="'$(TargetFramework)'=='netstandard1.3'"> | |||||
<ItemGroup Condition="'$(TargetFramework)'=='netstandard2.0'"> | |||||
<PackageReference Include="System.Net.Security" Version="4.3.2" /> | <PackageReference Include="System.Net.Security" Version="4.3.2" /> | ||||
<PackageReference Include="System.Net.WebSockets" Version="4.3.0" /> | <PackageReference Include="System.Net.WebSockets" Version="4.3.0" /> | ||||
<PackageReference Include="System.Net.WebSockets.Client" Version="4.3.2" /> | <PackageReference Include="System.Net.WebSockets.Client" Version="4.3.2" /> | ||||
@@ -0,0 +1,4 @@ | |||||
public interface IMqttServerCertificateCredentials | |||||
{ | |||||
string Password { get; } | |||||
} |
@@ -0,0 +1,7 @@ | |||||
namespace MQTTnet.Server | |||||
{ | |||||
public class MqttServerCertificateCredentials : IMqttServerCertificateCredentials | |||||
{ | |||||
public string Password { get; set; } | |||||
} | |||||
} |
@@ -82,9 +82,10 @@ namespace MQTTnet.Server | |||||
return this; | return this; | ||||
} | } | ||||
public MqttServerOptionsBuilder WithEncryptionCertificate(byte[] value) | |||||
public MqttServerOptionsBuilder WithEncryptionCertificate(byte[] value, IMqttServerCertificateCredentials credentials = null) | |||||
{ | { | ||||
_options.TlsEndpointOptions.Certificate = value; | _options.TlsEndpointOptions.Certificate = value; | ||||
_options.TlsEndpointOptions.CertificateCredentials = credentials; | |||||
return this; | return this; | ||||
} | } | ||||
@@ -94,6 +95,16 @@ namespace MQTTnet.Server | |||||
return this; | return this; | ||||
} | } | ||||
#if !WINDOWS_UWP | |||||
public MqttServerOptionsBuilder WithClientCertificate(RemoteCertificateValidationCallback validationCallback = null, bool checkCertificateRevocation = false) | |||||
{ | |||||
_options.TlsEndpointOptions.ClientCertificateRequired = true; | |||||
_options.TlsEndpointOptions.CheckCertificateRevocation = checkCertificateRevocation; | |||||
_options.TlsEndpointOptions.RemoteCertificateValidationCallback = validationCallback; | |||||
return this; | |||||
} | |||||
#endif | |||||
public MqttServerOptionsBuilder WithoutEncryptedEndpoint() | public MqttServerOptionsBuilder WithoutEncryptedEndpoint() | ||||
{ | { | ||||
_options.TlsEndpointOptions.IsEnabled = false; | _options.TlsEndpointOptions.IsEnabled = false; | ||||
@@ -107,7 +118,7 @@ namespace MQTTnet.Server | |||||
return this; | return this; | ||||
} | } | ||||
#endif | #endif | ||||
public MqttServerOptionsBuilder WithStorage(IMqttServerStorage value) | public MqttServerOptionsBuilder WithStorage(IMqttServerStorage value) | ||||
{ | { | ||||
_options.Storage = value; | _options.Storage = value; | ||||
@@ -12,6 +12,8 @@ namespace MQTTnet.Server | |||||
public byte[] Certificate { get; set; } | public byte[] Certificate { get; set; } | ||||
public IMqttServerCertificateCredentials CertificateCredentials { get; set; } | |||||
public bool ClientCertificateRequired { get; set; } | public bool ClientCertificateRequired { get; set; } | ||||
public bool CheckCertificateRevocation { get; set; } | public bool CheckCertificateRevocation { get; set; } | ||||
@@ -8,6 +8,8 @@ using MQTTnet.Client.Receiving; | |||||
using MQTTnet.Exceptions; | using MQTTnet.Exceptions; | ||||
using MQTTnet.Extensions.Rpc; | using MQTTnet.Extensions.Rpc; | ||||
using MQTTnet.Protocol; | using MQTTnet.Protocol; | ||||
using MQTTnet.Client.Options; | |||||
using MQTTnet.Formatter; | |||||
namespace MQTTnet.Tests | namespace MQTTnet.Tests | ||||
{ | { | ||||
@@ -15,26 +17,39 @@ namespace MQTTnet.Tests | |||||
public class RPC_Tests | public class RPC_Tests | ||||
{ | { | ||||
[TestMethod] | [TestMethod] | ||||
public async Task Execute_Success() | |||||
public Task Execute_Success_With_QoS_0() | |||||
{ | { | ||||
using (var testEnvironment = new TestEnvironment()) | |||||
{ | |||||
await testEnvironment.StartServerAsync(); | |||||
var responseSender = await testEnvironment.ConnectClientAsync(); | |||||
await responseSender.SubscribeAsync("MQTTnet.RPC/+/ping"); | |||||
return Execute_Success(MqttQualityOfServiceLevel.AtMostOnce, MqttProtocolVersion.V311); | |||||
} | |||||
responseSender.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(async e => | |||||
{ | |||||
await responseSender.PublishAsync(e.ApplicationMessage.Topic + "/response", "pong"); | |||||
}); | |||||
[TestMethod] | |||||
public Task Execute_Success_With_QoS_1() | |||||
{ | |||||
return Execute_Success(MqttQualityOfServiceLevel.AtLeastOnce, MqttProtocolVersion.V311); | |||||
} | |||||
var requestSender = await testEnvironment.ConnectClientAsync(); | |||||
[TestMethod] | |||||
public Task Execute_Success_With_QoS_2() | |||||
{ | |||||
return Execute_Success(MqttQualityOfServiceLevel.ExactlyOnce, MqttProtocolVersion.V311); | |||||
} | |||||
var rpcClient = new MqttRpcClient(requestSender); | |||||
var response = await rpcClient.ExecuteAsync(TimeSpan.FromSeconds(5), "ping", "", MqttQualityOfServiceLevel.AtMostOnce); | |||||
[TestMethod] | |||||
public Task Execute_Success_With_QoS_0_MQTT_V5() | |||||
{ | |||||
return Execute_Success(MqttQualityOfServiceLevel.AtMostOnce, MqttProtocolVersion.V500); | |||||
} | |||||
Assert.AreEqual("pong", Encoding.UTF8.GetString(response)); | |||||
} | |||||
[TestMethod] | |||||
public Task Execute_Success_With_QoS_1_MQTT_V5() | |||||
{ | |||||
return Execute_Success(MqttQualityOfServiceLevel.AtLeastOnce, MqttProtocolVersion.V500); | |||||
} | |||||
[TestMethod] | |||||
public Task Execute_Success_With_QoS_2_MQTT_V5() | |||||
{ | |||||
return Execute_Success(MqttQualityOfServiceLevel.ExactlyOnce, MqttProtocolVersion.V500); | |||||
} | } | ||||
[TestMethod] | [TestMethod] | ||||
@@ -51,5 +66,27 @@ namespace MQTTnet.Tests | |||||
await rpcClient.ExecuteAsync(TimeSpan.FromSeconds(2), "ping", "", MqttQualityOfServiceLevel.AtMostOnce); | await rpcClient.ExecuteAsync(TimeSpan.FromSeconds(2), "ping", "", MqttQualityOfServiceLevel.AtMostOnce); | ||||
} | } | ||||
} | } | ||||
private async Task Execute_Success(MqttQualityOfServiceLevel qosLevel, MqttProtocolVersion protocolVersion) | |||||
{ | |||||
using (var testEnvironment = new TestEnvironment()) | |||||
{ | |||||
await testEnvironment.StartServerAsync(); | |||||
var responseSender = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder().WithProtocolVersion(protocolVersion)); | |||||
await responseSender.SubscribeAsync("MQTTnet.RPC/+/ping"); | |||||
responseSender.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(async e => | |||||
{ | |||||
await responseSender.PublishAsync(e.ApplicationMessage.Topic + "/response", "pong"); | |||||
}); | |||||
var requestSender = await testEnvironment.ConnectClientAsync(); | |||||
var rpcClient = new MqttRpcClient(requestSender); | |||||
var response = await rpcClient.ExecuteAsync(TimeSpan.FromSeconds(5), "ping", "", qosLevel); | |||||
Assert.AreEqual("pong", Encoding.UTF8.GetString(response)); | |||||
} | |||||
} | |||||
} | } | ||||
} | } |