diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec
index 89bc251..32bb327 100644
--- a/Build/MQTTnet.nuspec
+++ b/Build/MQTTnet.nuspec
@@ -14,7 +14,7 @@
* [ManagedClient] Extended ReconnectAsync (thanks to @nvsnkv, #1202).
* [MQTTnet.Server] Moved server project to a dedicated GitHub repository.
-
+* [MQTTnet, MQTTnet.Extensions.ManagedClient] Fixed bug that allowed invalid subscriptions (Thanks to @marcelwinh).
Git commit: $gitCommit
Copyright Christian Kratky 2016-2020
diff --git a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs
index 931326b..d349f81 100644
--- a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs
+++ b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs
@@ -215,6 +215,11 @@ namespace MQTTnet.Extensions.ManagedClient
if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));
+ foreach (var topicFilter in topicFilters)
+ {
+ MqttTopicValidator.ThrowIfInvalidSubscribe(topicFilter.Topic);
+ }
+
lock (_subscriptions)
{
foreach (var topicFilter in topicFilters)
diff --git a/Source/MQTTnet/Client/MqttClient.cs b/Source/MQTTnet/Client/MqttClient.cs
index 6e81345..8163fc0 100644
--- a/Source/MQTTnet/Client/MqttClient.cs
+++ b/Source/MQTTnet/Client/MqttClient.cs
@@ -194,6 +194,11 @@ namespace MQTTnet.Client
{
if (options == null) throw new ArgumentNullException(nameof(options));
+ foreach (var topicFilter in options.TopicFilters)
+ {
+ MqttTopicValidator.ThrowIfInvalidSubscribe(topicFilter.Topic);
+ }
+
ThrowIfDisposed();
ThrowIfNotConnected();
diff --git a/Source/MQTTnet/Client/MqttClientExtensions.cs b/Source/MQTTnet/Client/MqttClientExtensions.cs
index e9fd436..91a7c75 100644
--- a/Source/MQTTnet/Client/MqttClientExtensions.cs
+++ b/Source/MQTTnet/Client/MqttClientExtensions.cs
@@ -1,4 +1,4 @@
-using MQTTnet.Client.Connecting;
+using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.ExtendedAuthenticationExchange;
using MQTTnet.Client.Options;
@@ -112,7 +112,7 @@ namespace MQTTnet.Client
return client;
}
- public static Task ReconnectAsync(this IMqttClient client)
+ public static Task ReconnectAsync(this IMqttClient client)
{
if (client == null) throw new ArgumentNullException(nameof(client));
@@ -124,6 +124,18 @@ namespace MQTTnet.Client
return client.ConnectAsync(client.Options);
}
+ public static Task ReconnectAsync(this IMqttClient client, CancellationToken cancellationToken)
+ {
+ if (client == null) throw new ArgumentNullException(nameof(client));
+
+ if (client.Options == null)
+ {
+ throw new InvalidOperationException("_ReconnectAsync_ can be used only if _ConnectAsync_ was called before.");
+ }
+
+ return client.ConnectAsync(client.Options, cancellationToken);
+ }
+
public static Task DisconnectAsync(this IMqttClient client)
{
if (client == null) throw new ArgumentNullException(nameof(client));
diff --git a/Source/MQTTnet/Protocol/MqttTopicValidator.cs b/Source/MQTTnet/Protocol/MqttTopicValidator.cs
index 00b1679..b8f0295 100644
--- a/Source/MQTTnet/Protocol/MqttTopicValidator.cs
+++ b/Source/MQTTnet/Protocol/MqttTopicValidator.cs
@@ -42,5 +42,15 @@ namespace MQTTnet.Protocol
}
}
}
+
+ public static void ThrowIfInvalidSubscribe(string topic)
+ {
+ if (string.IsNullOrEmpty(topic))
+ {
+ throw new MqttProtocolViolationException("Topic should not be empty.");
+ }
+
+ if (topic.IndexOf("#") != -1 && topic.IndexOf("#") != topic.Length - 1) throw new MqttProtocolViolationException("The character '#' is only allowed as last character");
+ }
}
}
diff --git a/Source/MQTTnet/Server/MqttServer.cs b/Source/MQTTnet/Server/MqttServer.cs
index 1dbe39a..92ef34f 100644
--- a/Source/MQTTnet/Server/MqttServer.cs
+++ b/Source/MQTTnet/Server/MqttServer.cs
@@ -113,7 +113,12 @@ namespace MQTTnet.Server
{
if (clientId == null) throw new ArgumentNullException(nameof(clientId));
if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));
-
+
+ foreach (var topicFilter in topicFilters)
+ {
+ MqttTopicValidator.ThrowIfInvalidSubscribe(topicFilter.Topic);
+ }
+
ThrowIfDisposed();
ThrowIfNotStarted();
diff --git a/Tests/MQTTnet.Core.Tests/MqttTopicValidatorSubscribe_Tests.cs b/Tests/MQTTnet.Core.Tests/MqttTopicValidatorSubscribe_Tests.cs
new file mode 100644
index 0000000..14797a0
--- /dev/null
+++ b/Tests/MQTTnet.Core.Tests/MqttTopicValidatorSubscribe_Tests.cs
@@ -0,0 +1,54 @@
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using MQTTnet.Exceptions;
+using MQTTnet.Protocol;
+
+namespace MQTTnet.Tests
+{
+ [TestClass]
+ public class MqttTopicValidatorSubscribe_Tests
+ {
+ [TestMethod]
+ public void Valid_Topic()
+ {
+ MqttTopicValidator.ThrowIfInvalidSubscribe("/a/b/c");
+ }
+
+ [TestMethod]
+ public void Valid_Topic_Plus_In_Between()
+ {
+ MqttTopicValidator.ThrowIfInvalidSubscribe("/a/+/c");
+ }
+
+ [TestMethod]
+ public void Valid_Topic_Plus_Last_Char()
+ {
+ MqttTopicValidator.ThrowIfInvalidSubscribe("/a/+");
+ }
+
+ [TestMethod]
+ public void Valid_Topic_Hash_Last_Char()
+ {
+ MqttTopicValidator.ThrowIfInvalidSubscribe("/a/#");
+ }
+
+ [TestMethod]
+ public void Valid_Topic_Only_Hash()
+ {
+ MqttTopicValidator.ThrowIfInvalidSubscribe("#");
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(MqttProtocolViolationException))]
+ public void Invalid_Topic_Hash_In_Between()
+ {
+ MqttTopicValidator.ThrowIfInvalidSubscribe("/a/#/c");
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(MqttProtocolViolationException))]
+ public void Invalid_Topic_Empty()
+ {
+ MqttTopicValidator.ThrowIfInvalidSubscribe(string.Empty);
+ }
+ }
+}
diff --git a/Tests/MQTTnet.TestApp.NetCore/PublicBrokerTest.cs b/Tests/MQTTnet.TestApp.NetCore/PublicBrokerTest.cs
index 58c4ad1..d8b8024 100644
--- a/Tests/MQTTnet.TestApp.NetCore/PublicBrokerTest.cs
+++ b/Tests/MQTTnet.TestApp.NetCore/PublicBrokerTest.cs
@@ -1,4 +1,4 @@
-using MQTTnet.Client;
+using MQTTnet.Client;
using System;
using System.IO;
using System.Net;
@@ -7,10 +7,8 @@ using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Client.Options;
using MQTTnet.Client.Receiving;
-using MQTTnet.Extensions.WebSocket4Net;
using MQTTnet.Formatter;
using MQTTnet.Protocol;
-using Newtonsoft.Json;
namespace MQTTnet.TestApp.NetCore
{
@@ -18,57 +16,114 @@ namespace MQTTnet.TestApp.NetCore
{
public static async Task RunAsync()
{
- //MqttNetConsoleLogger.ForwardToConsole();
+ // MqttNetConsoleLogger.ForwardToConsole();
- // iot.eclipse.org
- await ExecuteTestAsync("iot.eclipse.org TCP",
- new MqttClientOptionsBuilder().WithTcpServer("iot.eclipse.org", 1883).WithProtocolVersion(MqttProtocolVersion.V311).Build());
+ // For most of these connections to work, set output target to Net5.0.
- await ExecuteTestAsync("iot.eclipse.org WS",
- new MqttClientOptionsBuilder().WithWebSocketServer("iot.eclipse.org:80/mqtt").WithProtocolVersion(MqttProtocolVersion.V311).Build());
-
- await ExecuteTestAsync("iot.eclipse.org WS TLS",
- new MqttClientOptionsBuilder().WithWebSocketServer("iot.eclipse.org:443/mqtt").WithProtocolVersion(MqttProtocolVersion.V311).WithTls().Build());
+#if NET5_0_OR_GREATER
+ // TLS13 is only available in Net5.0
+ var unsafeTls13 = new MqttClientOptionsBuilderTlsParameters
+ {
+ UseTls = true,
+ SslProtocol = SslProtocols.Tls13,
+ // Don't use this in production code. This handler simply allows any invalid certificate to work.
+ CertificateValidationHandler = (w) => true
+ };
+#endif
+
+ // Also defining TLS12 for servers that don't seem no to support TLS13.
+ var unsafeTls12 = new MqttClientOptionsBuilderTlsParameters
+ {
+ UseTls = true,
+ SslProtocol = SslProtocols.Tls12,
+ // Don't use this in production code. This handler simply allows any invalid certificate to work.
+ CertificateValidationHandler = (w) => true
+ };
+
+ // mqtt.eclipseprojects.io
+ await ExecuteTestAsync("mqtt.eclipseprojects.io TCP",
+ new MqttClientOptionsBuilder().WithTcpServer("mqtt.eclipseprojects.io", 1883)
+ .WithProtocolVersion(MqttProtocolVersion.V311).Build());
+
+ await ExecuteTestAsync("mqtt.eclipseprojects.io WS",
+ new MqttClientOptionsBuilder().WithWebSocketServer("mqtt.eclipseprojects.io:80/mqtt")
+ .WithProtocolVersion(MqttProtocolVersion.V311).Build());
+
+#if NET5_0_OR_GREATER
+ await ExecuteTestAsync("mqtt.eclipseprojects.io WS TLS13",
+ new MqttClientOptionsBuilder().WithWebSocketServer("mqtt.eclipseprojects.io:443/mqtt")
+ .WithProtocolVersion(MqttProtocolVersion.V311).WithTls(unsafeTls13).Build());
+#endif
// test.mosquitto.org
await ExecuteTestAsync("test.mosquitto.org TCP",
- new MqttClientOptionsBuilder().WithTcpServer("test.mosquitto.org", 1883).WithProtocolVersion(MqttProtocolVersion.V311).Build());
+ new MqttClientOptionsBuilder().WithTcpServer("test.mosquitto.org", 1883)
+ .WithProtocolVersion(MqttProtocolVersion.V311).Build());
+
+ await ExecuteTestAsync("test.mosquitto.org TCP - Authenticated",
+ new MqttClientOptionsBuilder().WithTcpServer("test.mosquitto.org", 1884)
+ .WithCredentials("rw", "readwrite")
+ .WithProtocolVersion(MqttProtocolVersion.V311).Build());
- await ExecuteTestAsync("test.mosquitto.org TCP TLS",
- new MqttClientOptionsBuilder().WithTcpServer("test.mosquitto.org", 8883).WithProtocolVersion(MqttProtocolVersion.V311).WithTls().Build());
+ await ExecuteTestAsync("test.mosquitto.org TCP TLS12",
+ new MqttClientOptionsBuilder().WithTcpServer("test.mosquitto.org", 8883)
+ .WithProtocolVersion(MqttProtocolVersion.V311).WithTls(unsafeTls12).Build());
+
+#if NET5_0_OR_GREATER
+ await ExecuteTestAsync("test.mosquitto.org TCP TLS13",
+ new MqttClientOptionsBuilder().WithTcpServer("test.mosquitto.org", 8883)
+ .WithProtocolVersion(MqttProtocolVersion.V311).WithTls(unsafeTls13).Build());
+#endif
+
+ await ExecuteTestAsync("test.mosquitto.org TCP TLS12 - Authenticated",
+ new MqttClientOptionsBuilder().WithTcpServer("test.mosquitto.org", 8885)
+ .WithCredentials("rw", "readwrite")
+ .WithProtocolVersion(MqttProtocolVersion.V311).WithTls(unsafeTls12).Build());
await ExecuteTestAsync("test.mosquitto.org WS",
- new MqttClientOptionsBuilder().WithWebSocketServer("test.mosquitto.org:8080/mqtt").WithProtocolVersion(MqttProtocolVersion.V311).Build());
+ new MqttClientOptionsBuilder().WithWebSocketServer("test.mosquitto.org:8080/mqtt")
+ .WithProtocolVersion(MqttProtocolVersion.V311).Build());
- await ExecuteTestAsync("test.mosquitto.org WS TLS",
- new MqttClientOptionsBuilder().WithWebSocketServer("test.mosquitto.org:8081/mqtt").WithProtocolVersion(MqttProtocolVersion.V311).WithTls().Build());
+ await ExecuteTestAsync("test.mosquitto.org WS TLS12",
+ new MqttClientOptionsBuilder().WithWebSocketServer("test.mosquitto.org:8081/mqtt")
+ .WithProtocolVersion(MqttProtocolVersion.V311).WithTls(unsafeTls12).Build());
- // broker.hivemq.com
- await ExecuteTestAsync("broker.hivemq.com TCP",
- new MqttClientOptionsBuilder().WithTcpServer("broker.hivemq.com", 1883).WithProtocolVersion(MqttProtocolVersion.V311).Build());
+ // broker.emqx.io
+ await ExecuteTestAsync("broker.emqx.io TCP",
+ new MqttClientOptionsBuilder().WithTcpServer("broker.emqx.io", 1883)
+ .WithProtocolVersion(MqttProtocolVersion.V311).Build());
- await ExecuteTestAsync("broker.hivemq.com WS",
- new MqttClientOptionsBuilder().WithWebSocketServer("broker.hivemq.com:8000/mqtt").WithProtocolVersion(MqttProtocolVersion.V311).Build());
+ await ExecuteTestAsync("broker.emqx.io TCP TLS12",
+ new MqttClientOptionsBuilder().WithTcpServer("broker.emqx.io", 8083)
+ .WithProtocolVersion(MqttProtocolVersion.V311).WithTls(unsafeTls12).Build());
- // mqtt.swifitch.cz
- await ExecuteTestAsync("mqtt.swifitch.cz",
- new MqttClientOptionsBuilder().WithTcpServer("mqtt.swifitch.cz", 1883).WithProtocolVersion(MqttProtocolVersion.V311).Build());
+#if NET5_0_OR_GREATER
+ await ExecuteTestAsync("broker.emqx.io TCP TLS13",
+ new MqttClientOptionsBuilder().WithTcpServer("broker.emqx.io", 8083)
+ .WithProtocolVersion(MqttProtocolVersion.V311).WithTls(unsafeTls13).Build());
+#endif
- // CloudMQTT
- var configFile = Path.Combine("E:\\CloudMqttTestConfig.json");
- if (File.Exists(configFile))
- {
- var config = JsonConvert.DeserializeObject(File.ReadAllText(configFile));
+ await ExecuteTestAsync("broker.emqx.io WS",
+ new MqttClientOptionsBuilder().WithWebSocketServer("broker.emqx.io:8083/mqtt")
+ .WithProtocolVersion(MqttProtocolVersion.V311).Build());
- await ExecuteTestAsync("CloudMQTT TCP",
- new MqttClientOptionsBuilder().WithTcpServer(config.Server, config.Port).WithCredentials(config.Username, config.Password).WithProtocolVersion(MqttProtocolVersion.V311).Build());
+ await ExecuteTestAsync("broker.emqx.io WS TLS12",
+ new MqttClientOptionsBuilder().WithWebSocketServer("broker.emqx.io:8084/mqtt")
+ .WithProtocolVersion(MqttProtocolVersion.V311).WithTls(unsafeTls12).Build());
- await ExecuteTestAsync("CloudMQTT TCP TLS",
- new MqttClientOptionsBuilder().WithTcpServer(config.Server, config.SslPort).WithCredentials(config.Username, config.Password).WithTls().WithProtocolVersion(MqttProtocolVersion.V311).Build());
- await ExecuteTestAsync("CloudMQTT WS TLS",
- new MqttClientOptionsBuilder().WithWebSocketServer(config.Server + ":" + config.SslWebSocketPort + "/mqtt").WithCredentials(config.Username, config.Password).WithTls().WithProtocolVersion(MqttProtocolVersion.V311).Build());
- }
+ // broker.hivemq.com
+ await ExecuteTestAsync("broker.hivemq.com TCP",
+ new MqttClientOptionsBuilder().WithTcpServer("broker.hivemq.com", 1883)
+ .WithProtocolVersion(MqttProtocolVersion.V311).Build());
+
+ await ExecuteTestAsync("broker.hivemq.com WS",
+ new MqttClientOptionsBuilder().WithWebSocketServer("broker.hivemq.com:8000/mqtt")
+ .WithProtocolVersion(MqttProtocolVersion.V311).Build());
+
+ // mqtt.swifitch.cz: Does not seem to operate any more
+
+ // cloudmqtt.com: Cannot test because it does not offer a free plan any more.
Write("Finished.", ConsoleColor.White);
Console.ReadLine();
@@ -115,21 +170,5 @@ namespace MQTTnet.TestApp.NetCore
Console.Write(message);
}
- public class MqttConfig
- {
- public string Server { get; set; }
-
- public string Username { get; set; }
-
- public string Password { get; set; }
-
- public int Port { get; set; }
-
- public int SslPort { get; set; }
-
- public int WebSocketPort { get; set; }
-
- public int SslWebSocketPort { get; set; }
- }
}
}