diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec
index 71a9d55..302b128 100644
--- a/Build/MQTTnet.nuspec
+++ b/Build/MQTTnet.nuspec
@@ -14,10 +14,14 @@
* [Core] Added a strong name for the assembly.
* [Core] Performance optimizations.
* [Core] Fixed a logging issue when dealing with IOExceptions.
+* [Core] Fixed a typo in the global logger class (BREAKING CHANGE! Please find new example in Wiki).
* [Client] Fixed an issue in _ManagedClient_ which can cause the client to stop when publishing subscriptions.
* [Server] The application message interceptor can now delete any received application message.
-* [Server] Added a ConnectionValidator context to align with other APIs.
+* [Server] Added a ConnectionValidator context to align with other APIs (BREAKING CHANGE! Please find new example in Wiki).
* [Server] Added an interface for the _MqttServerOptions_.
+* [Server] Added packet statistics for the connected clients.
+* [Server] Fixed a security issue which sends retained packages to a failed subscription.
+* [Server] Fixed the response (MaximumQoS) of a subscription (Thanks to @redbeans2017).
Copyright Christian Kratky 2016-2017
MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin
diff --git a/Extensions/MQTTnet.Extensions.Rpc/MQTTnet.Extensions.Rpc.csproj b/Extensions/MQTTnet.Extensions.Rpc/MQTTnet.Extensions.Rpc.csproj
new file mode 100644
index 0000000..fd16072
--- /dev/null
+++ b/Extensions/MQTTnet.Extensions.Rpc/MQTTnet.Extensions.Rpc.csproj
@@ -0,0 +1,30 @@
+
+
+
+ netstandard1.3;netstandard2.0;net452;net461;uap10.0
+ 0.0.0.0
+ 0.0.0.0
+
+
+
+
+ 0.0.0.0
+
+
+
+ false
+ UAP,Version=v10.0
+ UAP
+ 10.0.16299.0
+ 10.0.10240.0
+ .NETCore
+ v5.0
+ $(DefineConstants);WINDOWS_UWP
+ $(MSBuildExtensionsPath)\Microsoft\WindowsXaml\v$(VisualStudioVersion)\Microsoft.Windows.UI.Xaml.CSharp.targets
+
+
+
+
+
+
+
diff --git a/Extensions/MQTTnet.Extensions.Rpc/MqttRpcClient.cs b/Extensions/MQTTnet.Extensions.Rpc/MqttRpcClient.cs
new file mode 100644
index 0000000..6a9d1c1
--- /dev/null
+++ b/Extensions/MQTTnet.Extensions.Rpc/MqttRpcClient.cs
@@ -0,0 +1,101 @@
+using System;
+using System.Collections.Concurrent;
+using System.Threading.Tasks;
+using MQTTnet.Client;
+using MQTTnet.Internal;
+using MQTTnet.Protocol;
+
+namespace MQTTnet.Extensions.Rpc
+{
+ public sealed class MqttRpcClient : IDisposable
+ {
+ private const string ResponseTopic = "$RPC/+/+/response";
+ private readonly ConcurrentDictionary> _waitingCalls = new ConcurrentDictionary>();
+ private readonly IMqttClient _mqttClient;
+ private bool _isEnabled;
+
+ public MqttRpcClient(IMqttClient mqttClient)
+ {
+ _mqttClient = mqttClient ?? throw new ArgumentNullException(nameof(mqttClient));
+
+ _mqttClient.ApplicationMessageReceived += OnApplicationMessageReceived;
+ }
+
+ public async Task EnableAsync()
+ {
+ await _mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic(ResponseTopic).WithAtLeastOnceQoS().Build());
+ _isEnabled = true;
+ }
+
+ public async Task DisableAsync()
+ {
+ await _mqttClient.UnsubscribeAsync(ResponseTopic);
+ _isEnabled = false;
+ }
+
+ public async Task ExecuteAsync(TimeSpan timeout, string methodName, byte[] payload, MqttQualityOfServiceLevel qualityOfServiceLevel)
+ {
+ if (methodName == null) throw new ArgumentNullException(nameof(methodName));
+
+ if (methodName.Contains("/") || methodName.Contains("+") || methodName.Contains("#"))
+ {
+ throw new ArgumentException("The method name cannot contain /, + or #.");
+ }
+
+ if (!_isEnabled)
+ {
+ throw new InvalidOperationException("The RPC client is not enabled.");
+ }
+
+ var requestTopic = $"$MQTTnet.RPC/{Guid.NewGuid():N}/{methodName}";
+ var responseTopic = requestTopic + "/response";
+
+ var requestMessage = new MqttApplicationMessageBuilder()
+ .WithTopic(requestTopic)
+ .WithPayload(payload)
+ .WithQualityOfServiceLevel(qualityOfServiceLevel)
+ .Build();
+
+ try
+ {
+ var tcs = new TaskCompletionSource();
+ if (!_waitingCalls.TryAdd(responseTopic, tcs))
+ {
+ throw new InvalidOperationException();
+ }
+
+ await _mqttClient.PublishAsync(requestMessage);
+ return await tcs.Task.TimeoutAfter(timeout);
+ }
+ finally
+ {
+ _waitingCalls.TryRemove(responseTopic, out _);
+ }
+ }
+
+ private void OnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs eventArgs)
+ {
+ if (!_waitingCalls.TryRemove(eventArgs.ApplicationMessage.Topic, out TaskCompletionSource tcs))
+ {
+ return;
+ }
+
+ if (tcs.Task.IsCompleted || tcs.Task.IsCanceled)
+ {
+ return;
+ }
+
+ tcs.TrySetResult(eventArgs.ApplicationMessage.Payload);
+ }
+
+ public void Dispose()
+ {
+ foreach (var tcs in _waitingCalls)
+ {
+ tcs.Value.SetCanceled();
+ }
+
+ _waitingCalls.Clear();
+ }
+ }
+}
diff --git a/Frameworks/MQTTnet.AspnetCore/MQTTnet.AspnetCore.csproj b/Frameworks/MQTTnet.AspnetCore/MQTTnet.AspnetCore.csproj
index fbde3f2..1f23602 100644
--- a/Frameworks/MQTTnet.AspnetCore/MQTTnet.AspnetCore.csproj
+++ b/Frameworks/MQTTnet.AspnetCore/MQTTnet.AspnetCore.csproj
@@ -2,8 +2,13 @@
netstandard2.0
- 2.5.2.0
- 2.5.2.0
+ 0.0.0.0
+ 0.0.0.0
+ 0.0.0.0
+
+
+
+
diff --git a/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs b/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs
index af39958..0cbcc93 100644
--- a/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs
+++ b/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs
@@ -18,6 +18,8 @@ namespace MQTTnet.Adapter
{
private const uint ErrorOperationAborted = 0x800703E3;
+ private static readonly byte[] EmptyBody = new byte[0];
+
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
private readonly IMqttNetLogger _logger;
private readonly IMqttChannel _channel;
@@ -89,35 +91,28 @@ namespace MQTTnet.Adapter
MqttBasePacket packet = null;
await ExecuteAndWrapExceptionAsync(async () =>
{
- ReceivedMqttPacket receivedMqttPacket = null;
- try
+ ReceivedMqttPacket receivedMqttPacket;
+ if (timeout > TimeSpan.Zero)
{
- if (timeout > TimeSpan.Zero)
- {
- receivedMqttPacket = await ReceiveAsync(_channel.ReceiveStream, cancellationToken).TimeoutAfter(timeout).ConfigureAwait(false);
- }
- else
- {
- receivedMqttPacket = await ReceiveAsync(_channel.ReceiveStream, cancellationToken).ConfigureAwait(false);
- }
-
- if (receivedMqttPacket == null || cancellationToken.IsCancellationRequested)
- {
- throw new TaskCanceledException();
- }
-
- packet = PacketSerializer.Deserialize(receivedMqttPacket);
- if (packet == null)
- {
- throw new MqttProtocolViolationException("Received malformed packet.");
- }
+ receivedMqttPacket = await ReceiveAsync(_channel.ReceiveStream, cancellationToken).TimeoutAfter(timeout).ConfigureAwait(false);
+ }
+ else
+ {
+ receivedMqttPacket = await ReceiveAsync(_channel.ReceiveStream, cancellationToken).ConfigureAwait(false);
+ }
- _logger.Trace("RX <<< {0}", packet);
+ if (receivedMqttPacket == null || cancellationToken.IsCancellationRequested)
+ {
+ throw new TaskCanceledException();
}
- finally
+
+ packet = PacketSerializer.Deserialize(receivedMqttPacket.Header, receivedMqttPacket.Body);
+ if (packet == null)
{
- receivedMqttPacket?.Dispose();
+ throw new MqttProtocolViolationException("Received malformed packet.");
}
+
+ _logger.Trace("RX <<< {0}", packet);
}).ConfigureAwait(false);
return packet;
@@ -133,7 +128,7 @@ namespace MQTTnet.Adapter
if (header.BodyLength == 0)
{
- return new ReceivedMqttPacket(header, new MemoryStream(0));
+ return new ReceivedMqttPacket(header, EmptyBody);
}
var body = new byte[header.BodyLength];
@@ -145,7 +140,7 @@ namespace MQTTnet.Adapter
offset += readBytesCount;
} while (offset < header.BodyLength);
- return new ReceivedMqttPacket(header, new MemoryStream(body, 0, body.Length, false, true));
+ return new ReceivedMqttPacket(header, body);
}
private static async Task ExecuteAndWrapExceptionAsync(Func action)
diff --git a/Frameworks/MQTTnet.NetStandard/Adapter/ReceivedMqttPacket.cs b/Frameworks/MQTTnet.NetStandard/Adapter/ReceivedMqttPacket.cs
index a44fb54..c92f9d0 100644
--- a/Frameworks/MQTTnet.NetStandard/Adapter/ReceivedMqttPacket.cs
+++ b/Frameworks/MQTTnet.NetStandard/Adapter/ReceivedMqttPacket.cs
@@ -1,12 +1,11 @@
using System;
-using System.IO;
using MQTTnet.Packets;
namespace MQTTnet.Adapter
{
- public sealed class ReceivedMqttPacket : IDisposable
+ public class ReceivedMqttPacket
{
- public ReceivedMqttPacket(MqttPacketHeader header, MemoryStream body)
+ public ReceivedMqttPacket(MqttPacketHeader header, byte[] body)
{
Header = header ?? throw new ArgumentNullException(nameof(header));
Body = body ?? throw new ArgumentNullException(nameof(body));
@@ -14,11 +13,6 @@ namespace MQTTnet.Adapter
public MqttPacketHeader Header { get; }
- public MemoryStream Body { get; }
-
- public void Dispose()
- {
- Body?.Dispose();
- }
+ public byte[] Body { get; }
}
}
diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs
index 7e23fdb..7e6f663 100644
--- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs
+++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs
@@ -70,7 +70,7 @@ namespace MQTTnet.Implementations
_sslStream = new SslStream(new NetworkStream(_socket, true), false, InternalUserCertificateValidationCallback);
await _sslStream.AuthenticateAsClientAsync(_options.Server, LoadCertificates(_options), SslProtocols.Tls12, _options.TlsOptions.IgnoreCertificateRevocationErrors).ConfigureAwait(false);
}
-
+
CreateStreams(_socket, _sslStream);
}
@@ -139,10 +139,7 @@ namespace MQTTnet.Implementations
private void CreateStreams(Socket socket, Stream sslStream)
{
var stream = sslStream ?? new NetworkStream(socket);
-
- //cannot use this as default buffering prevents from receiving the first connect message
- //need two streams otherwise read and write have to be synchronized
-
+
//todo: if branch can be used with min dependency NetStandard1.6
#if NET452 || NET461
SendStream = new BufferedStream(stream, BufferSize);
diff --git a/Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj b/Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj
index abcc865..92920cd 100644
--- a/Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj
+++ b/Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj
@@ -6,8 +6,8 @@
MQTTnet
False
Full
- 2.5.3.0
- 2.5.3.0
+ 0.0.0.0
+ 0.0.0.0
0.0.0.0
@@ -29,7 +29,6 @@
v5.0
$(DefineConstants);WINDOWS_UWP
$(MSBuildExtensionsPath)\Microsoft\WindowsXaml\v$(VisualStudioVersion)\Microsoft.Windows.UI.Xaml.CSharp.targets
-
diff --git a/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs b/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs
index 7ae7250..9b01156 100644
--- a/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs
+++ b/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs
@@ -13,9 +13,9 @@ namespace MQTTnet.ManagedClient
{
public class ManagedMqttClient : IManagedMqttClient
{
- private readonly ManagedMqttClientStorageManager _storageManager = new ManagedMqttClientStorageManager();
private readonly BlockingCollection _messageQueue = new BlockingCollection();
private readonly HashSet _subscriptions = new HashSet();
+ private readonly SemaphoreSlim _subscriptionsSemaphore = new SemaphoreSlim(1, 1);
private readonly IMqttClient _mqttClient;
private readonly IMqttNetLogger _logger;
@@ -23,7 +23,9 @@ namespace MQTTnet.ManagedClient
private CancellationTokenSource _connectionCancellationToken;
private CancellationTokenSource _publishingCancellationToken;
+ private ManagedMqttClientStorageManager _storageManager;
private IManagedMqttClientOptions _options;
+
private bool _subscriptionsNotPushed;
public ManagedMqttClient(IMqttClient mqttClient, IMqttNetLogger logger)
@@ -55,15 +57,11 @@ namespace MQTTnet.ManagedClient
if (_connectionCancellationToken != null) throw new InvalidOperationException("The managed client is already started.");
_options = options;
- await _storageManager.SetStorageAsync(_options.Storage).ConfigureAwait(false);
-
+
if (_options.Storage != null)
{
- var loadedMessages = await _options.Storage.LoadQueuedMessagesAsync().ConfigureAwait(false);
- foreach (var loadedMessage in loadedMessages)
- {
- _messageQueue.Add(loadedMessage);
- }
+ _storageManager = new ManagedMqttClientStorageManager(_options.Storage);
+ await _storageManager.LoadQueuedMessagesAsync().ConfigureAwait(false);
}
_connectionCancellationToken = new CancellationTokenSource();
@@ -97,16 +95,21 @@ namespace MQTTnet.ManagedClient
foreach (var applicationMessage in applicationMessages)
{
- await _storageManager.AddAsync(applicationMessage).ConfigureAwait(false);
+ if (_storageManager != null)
+ {
+ await _storageManager.AddAsync(applicationMessage).ConfigureAwait(false);
+ }
+
_messageQueue.Add(applicationMessage);
}
}
- public Task SubscribeAsync(IEnumerable topicFilters)
+ public async Task SubscribeAsync(IEnumerable topicFilters)
{
if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));
- lock (_subscriptions)
+ await _subscriptionsSemaphore.WaitAsync().ConfigureAwait(false);
+ try
{
foreach (var topicFilter in topicFilters)
{
@@ -116,13 +119,16 @@ namespace MQTTnet.ManagedClient
}
}
}
-
- return Task.FromResult(0);
+ finally
+ {
+ _subscriptionsSemaphore.Release();
+ }
}
- public Task UnsubscribeAsync(IEnumerable topicFilters)
+ public async Task UnsubscribeAsync(IEnumerable topicFilters)
{
- lock (_subscriptions)
+ await _subscriptionsSemaphore.WaitAsync().ConfigureAwait(false);
+ try
{
foreach (var topicFilter in topicFilters)
{
@@ -132,8 +138,10 @@ namespace MQTTnet.ManagedClient
}
}
}
-
- return Task.FromResult(0);
+ finally
+ {
+ _subscriptionsSemaphore.Release();
+ }
}
private async Task MaintainConnectionAsync(CancellationToken cancellationToken)
@@ -242,7 +250,11 @@ namespace MQTTnet.ManagedClient
try
{
await _mqttClient.PublishAsync(message).ConfigureAwait(false);
- await _storageManager.RemoveAsync(message).ConfigureAwait(false);
+
+ if (_storageManager != null)
+ {
+ await _storageManager.RemoveAsync(message).ConfigureAwait(false);
+ }
}
catch (MqttCommunicationException exception)
{
@@ -264,13 +276,18 @@ namespace MQTTnet.ManagedClient
_logger.Info(nameof(ManagedMqttClient), "Synchronizing subscriptions");
List subscriptions;
- lock (_subscriptions)
+ await _subscriptionsSemaphore.WaitAsync().ConfigureAwait(false);
+ try
{
subscriptions = _subscriptions.ToList();
_subscriptionsNotPushed = false;
}
+ finally
+ {
+ _subscriptionsSemaphore.Release();
+ }
- if (!_subscriptions.Any())
+ if (!subscriptions.Any())
{
return;
}
diff --git a/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClientStorageManager.cs b/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClientStorageManager.cs
index d55f137..0e71776 100644
--- a/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClientStorageManager.cs
+++ b/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClientStorageManager.cs
@@ -1,4 +1,5 @@
-using System.Collections.Generic;
+using System;
+using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
@@ -8,18 +9,19 @@ namespace MQTTnet.ManagedClient
{
private readonly List _applicationMessages = new List();
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
- private IManagedMqttClientStorage _storage;
+ private readonly IManagedMqttClientStorage _storage;
- public async Task SetStorageAsync(IManagedMqttClientStorage storage)
+ public ManagedMqttClientStorageManager(IManagedMqttClientStorage storage)
{
- await _semaphore.WaitAsync().ConfigureAwait(false);
- try
- {
- _storage = storage;
- }
- finally
+ _storage = storage ?? throw new ArgumentNullException(nameof(storage));
+ }
+
+ public async Task LoadQueuedMessagesAsync()
+ {
+ var loadedMessages = await _storage.LoadQueuedMessagesAsync().ConfigureAwait(false);
+ foreach (var loadedMessage in loadedMessages)
{
- _semaphore.Release();
+ _applicationMessages.Add(loadedMessage);
}
}
@@ -28,11 +30,6 @@ namespace MQTTnet.ManagedClient
await _semaphore.WaitAsync().ConfigureAwait(false);
try
{
- if (_storage == null)
- {
- return;
- }
-
_applicationMessages.Add(applicationMessage);
await SaveAsync().ConfigureAwait(false);
}
@@ -47,11 +44,6 @@ namespace MQTTnet.ManagedClient
await _semaphore.WaitAsync().ConfigureAwait(false);
try
{
- if (_storage == null)
- {
- return;
- }
-
var index = _applicationMessages.IndexOf(applicationMessage);
if (index == -1)
{
diff --git a/Frameworks/MQTTnet.NetStandard/Serializer/IMqttPacketSerializer.cs b/Frameworks/MQTTnet.NetStandard/Serializer/IMqttPacketSerializer.cs
index 439460d..2068641 100644
--- a/Frameworks/MQTTnet.NetStandard/Serializer/IMqttPacketSerializer.cs
+++ b/Frameworks/MQTTnet.NetStandard/Serializer/IMqttPacketSerializer.cs
@@ -1,6 +1,5 @@
using System;
using System.Collections.Generic;
-using MQTTnet.Adapter;
using MQTTnet.Packets;
namespace MQTTnet.Serializer
@@ -9,8 +8,8 @@ namespace MQTTnet.Serializer
{
MqttProtocolVersion ProtocolVersion { get; set; }
- IEnumerable> Serialize(MqttBasePacket mqttPacket);
+ ICollection> Serialize(MqttBasePacket mqttPacket);
- MqttBasePacket Deserialize(ReceivedMqttPacket receivedMqttPacket);
+ MqttBasePacket Deserialize(MqttPacketHeader header, byte[] body);
}
}
\ No newline at end of file
diff --git a/Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketReader.cs b/Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketReader.cs
index 04ee01f..a22707c 100644
--- a/Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketReader.cs
+++ b/Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketReader.cs
@@ -1,9 +1,9 @@
using System;
+using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
-using MQTTnet.Adapter;
using MQTTnet.Exceptions;
using MQTTnet.Packets;
using MQTTnet.Protocol;
@@ -12,15 +12,15 @@ namespace MQTTnet.Serializer
{
public sealed class MqttPacketReader : BinaryReader
{
- private readonly ReceivedMqttPacket _receivedMqttPacket;
-
- public MqttPacketReader(ReceivedMqttPacket receivedMqttPacket)
- : base(receivedMqttPacket.Body, Encoding.UTF8, true)
+ private readonly MqttPacketHeader _header;
+
+ public MqttPacketReader(MqttPacketHeader header, Stream bodyStream)
+ : base(bodyStream, Encoding.UTF8, true)
{
- _receivedMqttPacket = receivedMqttPacket;
+ _header = header;
}
- public bool EndOfRemainingData => BaseStream.Position == _receivedMqttPacket.Header.BodyLength;
+ public bool EndOfRemainingData => BaseStream.Position == _header.BodyLength;
public static MqttPacketHeader ReadHeaderFromSource(Stream stream, CancellationToken cancellationToken)
{
@@ -77,7 +77,7 @@ namespace MQTTnet.Serializer
public byte[] ReadRemainingData()
{
- return ReadBytes(_receivedMqttPacket.Header.BodyLength - (int)BaseStream.Position);
+ return ReadBytes(_header.BodyLength - (int)BaseStream.Position);
}
private static int ReadBodyLengthFromSource(Stream stream, CancellationToken cancellationToken)
@@ -87,7 +87,7 @@ namespace MQTTnet.Serializer
var value = 0;
byte encodedByte;
- ////var readBytes = new List();
+ var readBytes = new List();
do
{
if (cancellationToken.IsCancellationRequested)
@@ -101,15 +101,14 @@ namespace MQTTnet.Serializer
throw new MqttCommunicationException("Connection closed while reading remaining length data.");
}
- ////readBytes.Add(buffer);
-
encodedByte = (byte)buffer;
+ readBytes.Add(encodedByte);
+
value += (byte)(encodedByte & 127) * multiplier;
multiplier *= 128;
if (multiplier > 128 * 128 * 128)
{
- //throw new MqttProtocolViolationException($"Remaining length is invalid (Data={string.Join(",", readBytes)}).");
- throw new MqttProtocolViolationException("Remaining length is invalid.");
+ throw new MqttProtocolViolationException($"Remaining length is invalid (Data={string.Join(",", readBytes)}).");
}
} while ((encodedByte & 128) != 0);
diff --git a/Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketSerializer.cs b/Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketSerializer.cs
index 0b04114..88414d5 100644
--- a/Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketSerializer.cs
+++ b/Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketSerializer.cs
@@ -1,5 +1,4 @@
-using MQTTnet.Adapter;
-using MQTTnet.Exceptions;
+using MQTTnet.Exceptions;
using MQTTnet.Packets;
using MQTTnet.Protocol;
using System;
@@ -17,7 +16,7 @@ namespace MQTTnet.Serializer
public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311;
- public IEnumerable> Serialize(MqttBasePacket packet)
+ public ICollection> Serialize(MqttBasePacket packet)
{
if (packet == null) throw new ArgumentNullException(nameof(packet));
@@ -43,13 +42,15 @@ namespace MQTTnet.Serializer
}
}
- public MqttBasePacket Deserialize(ReceivedMqttPacket receivedMqttPacket)
+ public MqttBasePacket Deserialize(MqttPacketHeader header, byte[] body)
{
- if (receivedMqttPacket == null) throw new ArgumentNullException(nameof(receivedMqttPacket));
+ if (header == null) throw new ArgumentNullException(nameof(header));
+ if (body == null) throw new ArgumentNullException(nameof(body));
- using (var reader = new MqttPacketReader(receivedMqttPacket))
+ using (var bodyStream = new MemoryStream(body))
+ using (var reader = new MqttPacketReader(header, bodyStream))
{
- return Deserialize(receivedMqttPacket.Header, reader);
+ return Deserialize(header, reader);
}
}
diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs
index 12ef891..8f7199a 100644
--- a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs
+++ b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs
@@ -14,15 +14,17 @@ namespace MQTTnet.Server
{
public sealed class MqttClientSession
{
- private readonly Stopwatch _lastPacketReceivedTracker = new Stopwatch();
- private readonly Stopwatch _lastNonKeepAlivePacketReceivedTracker = new Stopwatch();
+ private readonly Stopwatch _lastPacketReceivedTracker = Stopwatch.StartNew();
+ private readonly Stopwatch _lastNonKeepAlivePacketReceivedTracker = Stopwatch.StartNew();
- private readonly MqttClientSubscriptionsManager _subscriptionsManager;
- private readonly MqttClientSessionsManager _sessionsManager;
- private readonly MqttClientPendingMessagesQueue _pendingMessagesQueue;
private readonly IMqttServerOptions _options;
private readonly IMqttNetLogger _logger;
+ private readonly MqttClientSessionsManager _sessionsManager;
+ private readonly MqttRetainedMessagesManager _retainedMessagesManager;
+ private readonly MqttClientSubscriptionsManager _subscriptionsManager;
+ private readonly MqttClientPendingMessagesQueue _pendingMessagesQueue;
+
private IMqttChannelAdapter _adapter;
private CancellationTokenSource _cancellationTokenSource;
private MqttApplicationMessage _willMessage;
@@ -30,16 +32,17 @@ namespace MQTTnet.Server
public MqttClientSession(
string clientId,
IMqttServerOptions options,
+ MqttRetainedMessagesManager retainedMessagesManager,
MqttClientSessionsManager sessionsManager,
IMqttNetLogger logger)
{
+ _options = options ?? throw new ArgumentNullException(nameof(options));
+ _retainedMessagesManager = retainedMessagesManager ?? throw new ArgumentNullException(nameof(retainedMessagesManager));
_sessionsManager = sessionsManager ?? throw new ArgumentNullException(nameof(sessionsManager));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
ClientId = clientId;
-
- _options = options;
-
+
_subscriptionsManager = new MqttClientSubscriptionsManager(_options);
_pendingMessagesQueue = new MqttClientPendingMessagesQueue(_options, this, _logger);
}
@@ -117,11 +120,11 @@ namespace MQTTnet.Server
}
}
- public void EnqueueApplicationMessage(MqttApplicationMessage applicationMessage)
+ public async Task EnqueueApplicationMessageAsync(MqttApplicationMessage applicationMessage)
{
if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage));
- var result = _subscriptionsManager.CheckSubscriptions(applicationMessage);
+ var result = await _subscriptionsManager.CheckSubscriptionsAsync(applicationMessage);
if (!result.IsSubscribed)
{
return;
@@ -129,6 +132,7 @@ namespace MQTTnet.Server
var publishPacket = applicationMessage.ToPublishPacket();
publishPacket.QualityOfServiceLevel = result.QualityOfServiceLevel;
+
_pendingMessagesQueue.Enqueue(publishPacket);
}
@@ -199,7 +203,7 @@ namespace MQTTnet.Server
if (packet is MqttUnsubscribePacket unsubscribePacket)
{
- return adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, _subscriptionsManager.Unsubscribe(unsubscribePacket));
+ return HandleIncomingUnsubscribePacketAsync(adapter, unsubscribePacket, cancellationToken);
}
if (packet is MqttDisconnectPacket || packet is MqttConnectPacket)
@@ -213,24 +217,31 @@ namespace MQTTnet.Server
private async Task HandleIncomingSubscribePacketAsync(IMqttChannelAdapter adapter, MqttSubscribePacket subscribePacket, CancellationToken cancellationToken)
{
- var subscribeResult = _subscriptionsManager.Subscribe(subscribePacket, ClientId);
+ var subscribeResult = await _subscriptionsManager.SubscribeAsync(subscribePacket, ClientId);
await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, subscribeResult.ResponsePacket).ConfigureAwait(false);
- await EnqueueSubscribedRetainedMessagesAsync(subscribePacket).ConfigureAwait(false);
-
if (subscribeResult.CloseConnection)
{
await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new MqttDisconnectPacket()).ConfigureAwait(false);
await StopAsync().ConfigureAwait(false);
}
+
+ await EnqueueSubscribedRetainedMessagesAsync(subscribePacket).ConfigureAwait(false);
+ }
+
+ private async Task HandleIncomingUnsubscribePacketAsync(IMqttChannelAdapter adapter, MqttUnsubscribePacket unsubscribePacket, CancellationToken cancellationToken)
+ {
+ var unsubscribeResult = await _subscriptionsManager.UnsubscribeAsync(unsubscribePacket);
+
+ await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, unsubscribeResult);
}
private async Task EnqueueSubscribedRetainedMessagesAsync(MqttSubscribePacket subscribePacket)
{
- var retainedMessages = await _sessionsManager.GetRetainedMessagesAsync(subscribePacket).ConfigureAwait(false);
+ var retainedMessages = await _retainedMessagesManager.GetSubscribedMessagesAsync(subscribePacket);
foreach (var publishPacket in retainedMessages)
{
- EnqueueApplicationMessage(publishPacket);
+ await EnqueueApplicationMessageAsync(publishPacket);
}
}
diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs
index 5bb6598..739f964 100644
--- a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs
+++ b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs
@@ -15,23 +15,21 @@ namespace MQTTnet.Server
public sealed class MqttClientSessionsManager
{
private readonly Dictionary _sessions = new Dictionary();
- private readonly SemaphoreSlim _sessionsSemaphore = new SemaphoreSlim(1, 1);
+ private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
private readonly IMqttServerOptions _options;
+ private readonly MqttServer _server;
private readonly MqttRetainedMessagesManager _retainedMessagesManager;
private readonly IMqttNetLogger _logger;
- public MqttClientSessionsManager(IMqttServerOptions options, MqttRetainedMessagesManager retainedMessagesManager, IMqttNetLogger logger)
+ public MqttClientSessionsManager(IMqttServerOptions options, MqttRetainedMessagesManager retainedMessagesManager, MqttServer server, IMqttNetLogger logger)
{
+ _server = server ?? throw new ArgumentNullException(nameof(server));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_options = options ?? throw new ArgumentNullException(nameof(options));
_retainedMessagesManager = retainedMessagesManager ?? throw new ArgumentNullException(nameof(retainedMessagesManager));
}
- public event EventHandler ClientConnected;
- public event EventHandler ClientDisconnected;
- public event EventHandler ApplicationMessageReceived;
-
public async Task RunClientSessionAsync(IMqttChannelAdapter clientAdapter, CancellationToken cancellationToken)
{
var clientId = string.Empty;
@@ -66,11 +64,11 @@ namespace MQTTnet.Server
IsSessionPresent = clientSession.IsExistingSession
}).ConfigureAwait(false);
- ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(new ConnectedMqttClient
+ _server.OnClientConnected(new ConnectedMqttClient
{
ClientId = clientId,
ProtocolVersion = clientAdapter.PacketSerializer.ProtocolVersion
- }));
+ });
await clientSession.Session.RunAsync(connectPacket.WillMessage, clientAdapter).ConfigureAwait(false);
}
@@ -89,17 +87,17 @@ namespace MQTTnet.Server
// ignored
}
- ClientDisconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(new ConnectedMqttClient
+ _server.OnClientDisconnected(new ConnectedMqttClient
{
ClientId = clientId,
ProtocolVersion = clientAdapter.PacketSerializer.ProtocolVersion
- }));
+ });
}
}
public async Task StopAsync()
{
- await _sessionsSemaphore.WaitAsync().ConfigureAwait(false);
+ await _semaphore.WaitAsync().ConfigureAwait(false);
try
{
foreach (var session in _sessions)
@@ -111,13 +109,13 @@ namespace MQTTnet.Server
}
finally
{
- _sessionsSemaphore.Release();
+ _semaphore.Release();
}
}
public async Task> GetConnectedClientsAsync()
{
- await _sessionsSemaphore.WaitAsync().ConfigureAwait(false);
+ await _semaphore.WaitAsync().ConfigureAwait(false);
try
{
return _sessions.Where(s => s.Value.IsConnected).Select(s => new ConnectedMqttClient
@@ -130,7 +128,7 @@ namespace MQTTnet.Server
}
finally
{
- _sessionsSemaphore.Release();
+ _semaphore.Release();
}
}
@@ -138,17 +136,7 @@ namespace MQTTnet.Server
{
try
{
- if (_options.ApplicationMessageInterceptor != null)
- {
- var interceptorContext = new MqttApplicationMessageInterceptorContext
- {
- ApplicationMessage = applicationMessage
- };
-
- _options.ApplicationMessageInterceptor(interceptorContext);
- applicationMessage = interceptorContext.ApplicationMessage;
- }
-
+ applicationMessage = InterceptApplicationMessage(applicationMessage);
if (applicationMessage == null)
{
return;
@@ -159,26 +147,41 @@ namespace MQTTnet.Server
await _retainedMessagesManager.HandleMessageAsync(senderClientSession?.ClientId, applicationMessage).ConfigureAwait(false);
}
- var eventArgs = new MqttApplicationMessageReceivedEventArgs(senderClientSession?.ClientId, applicationMessage);
- ApplicationMessageReceived?.Invoke(this, eventArgs);
+ _server.OnApplicationMessageReceived(senderClientSession?.ClientId, applicationMessage);
}
catch (Exception exception)
{
_logger.Error(exception, "Error while processing application message");
}
- lock (_sessions)
+ await _semaphore.WaitAsync().ConfigureAwait(false);
+ try
{
foreach (var clientSession in _sessions.Values)
{
- clientSession.EnqueueApplicationMessage(applicationMessage);
+ await clientSession.EnqueueApplicationMessageAsync(applicationMessage);
}
}
+ finally
+ {
+ _semaphore.Release();
+ }
}
- public Task> GetRetainedMessagesAsync(MqttSubscribePacket subscribePacket)
+ private MqttApplicationMessage InterceptApplicationMessage(MqttApplicationMessage applicationMessage)
{
- return _retainedMessagesManager.GetSubscribedMessagesAsync(subscribePacket);
+ if (_options.ApplicationMessageInterceptor == null)
+ {
+ return applicationMessage;
+ }
+
+ var interceptorContext = new MqttApplicationMessageInterceptorContext
+ {
+ ApplicationMessage = applicationMessage
+ };
+
+ _options.ApplicationMessageInterceptor(interceptorContext);
+ return interceptorContext.ApplicationMessage;
}
private MqttConnectReturnCode ValidateConnection(MqttConnectPacket connectPacket)
@@ -200,7 +203,7 @@ namespace MQTTnet.Server
private async Task GetOrCreateClientSessionAsync(MqttConnectPacket connectPacket)
{
- await _sessionsSemaphore.WaitAsync().ConfigureAwait(false);
+ await _semaphore.WaitAsync().ConfigureAwait(false);
try
{
var isSessionPresent = _sessions.TryGetValue(connectPacket.ClientId, out var clientSession);
@@ -225,7 +228,7 @@ namespace MQTTnet.Server
{
isExistingSession = false;
- clientSession = new MqttClientSession(connectPacket.ClientId, _options, this, _logger);
+ clientSession = new MqttClientSession(connectPacket.ClientId, _options, _retainedMessagesManager, this, _logger);
_sessions[connectPacket.ClientId] = clientSession;
_logger.Trace("Created a new session for client '{0}'.", connectPacket.ClientId);
@@ -235,7 +238,7 @@ namespace MQTTnet.Server
}
finally
{
- _sessionsSemaphore.Release();
+ _semaphore.Release();
}
}
}
diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSubscriptionsManager.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSubscriptionsManager.cs
index 73548ce..2151f77 100644
--- a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSubscriptionsManager.cs
+++ b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSubscriptionsManager.cs
@@ -1,5 +1,8 @@
using System;
using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
using MQTTnet.Packets;
using MQTTnet.Protocol;
@@ -7,6 +10,7 @@ namespace MQTTnet.Server
{
public sealed class MqttClientSubscriptionsManager
{
+ private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
private readonly Dictionary _subscriptions = new Dictionary();
private readonly IMqttServerOptions _options;
@@ -15,24 +19,34 @@ namespace MQTTnet.Server
_options = options ?? throw new ArgumentNullException(nameof(options));
}
- public MqttClientSubscribeResult Subscribe(MqttSubscribePacket subscribePacket, string clientId)
+ public async Task SubscribeAsync(MqttSubscribePacket subscribePacket, string clientId)
{
if (subscribePacket == null) throw new ArgumentNullException(nameof(subscribePacket));
- var responsePacket = subscribePacket.CreateResponse();
- var closeConnection = false;
+ var result = new MqttClientSubscribeResult
+ {
+ ResponsePacket = subscribePacket.CreateResponse(),
+ CloseConnection = false
+ };
- lock (_subscriptions)
+ await _semaphore.WaitAsync().ConfigureAwait(false);
+ try
{
foreach (var topicFilter in subscribePacket.TopicFilters)
{
- var interceptorContext = new MqttSubscriptionInterceptorContext(clientId, topicFilter);
- _options.SubscriptionInterceptor?.Invoke(interceptorContext);
- responsePacket.SubscribeReturnCodes.Add(interceptorContext.AcceptSubscription ? MqttSubscribeReturnCode.SuccessMaximumQoS1 : MqttSubscribeReturnCode.Failure);
-
+ var interceptorContext = InterceptSubscribe(clientId, topicFilter);
+ if (!interceptorContext.AcceptSubscription)
+ {
+ result.ResponsePacket.SubscribeReturnCodes.Add(MqttSubscribeReturnCode.Failure);
+ }
+ else
+ {
+ result.ResponsePacket.SubscribeReturnCodes.Add(ConvertToMaximumQoS(topicFilter.QualityOfServiceLevel));
+ }
+
if (interceptorContext.CloseConnection)
{
- closeConnection = true;
+ result.CloseConnection = true;
}
if (interceptorContext.AcceptSubscription)
@@ -41,35 +55,42 @@ namespace MQTTnet.Server
}
}
}
-
- return new MqttClientSubscribeResult
+ finally
{
- ResponsePacket = responsePacket,
- CloseConnection = closeConnection
- };
+ _semaphore.Release();
+ }
+
+ return result;
}
- public MqttUnsubAckPacket Unsubscribe(MqttUnsubscribePacket unsubscribePacket)
+ public async Task UnsubscribeAsync(MqttUnsubscribePacket unsubscribePacket)
{
if (unsubscribePacket == null) throw new ArgumentNullException(nameof(unsubscribePacket));
- lock (_subscriptions)
+ await _semaphore.WaitAsync().ConfigureAwait(false);
+ try
{
foreach (var topicFilter in unsubscribePacket.TopicFilters)
{
_subscriptions.Remove(topicFilter);
}
}
+ finally
+ {
+ _semaphore.Release();
+ }
return unsubscribePacket.CreateResponse();
}
- public CheckSubscriptionsResult CheckSubscriptions(MqttApplicationMessage applicationMessage)
+ public async Task CheckSubscriptionsAsync(MqttApplicationMessage applicationMessage)
{
if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage));
- lock (_subscriptions)
+ await _semaphore.WaitAsync().ConfigureAwait(false);
+ try
{
+ var qosLevels = new HashSet();
foreach (var subscription in _subscriptions)
{
if (!MqttTopicFilterComparer.IsMatch(applicationMessage.Topic, subscription.Key))
@@ -77,24 +98,64 @@ namespace MQTTnet.Server
continue;
}
- var effectiveQos = subscription.Value;
- if (applicationMessage.QualityOfServiceLevel < effectiveQos)
- {
- effectiveQos = applicationMessage.QualityOfServiceLevel;
- }
+ qosLevels.Add(subscription.Value);
+ }
+ if (qosLevels.Count == 0)
+ {
return new CheckSubscriptionsResult
{
- IsSubscribed = true,
- QualityOfServiceLevel = effectiveQos
+ IsSubscribed = false
};
}
+
+ return CreateSubscriptionResult(applicationMessage, qosLevels);
+ }
+ finally
+ {
+ _semaphore.Release();
+ }
+ }
+
+ private MqttSubscriptionInterceptorContext InterceptSubscribe(string clientId, TopicFilter topicFilter)
+ {
+ var interceptorContext = new MqttSubscriptionInterceptorContext(clientId, topicFilter);
+ _options.SubscriptionInterceptor?.Invoke(interceptorContext);
+ return interceptorContext;
+ }
+
+ private static CheckSubscriptionsResult CreateSubscriptionResult(MqttApplicationMessage applicationMessage, HashSet subscribedQoSLevels)
+ {
+ MqttQualityOfServiceLevel effectiveQoS;
+ if (subscribedQoSLevels.Contains(applicationMessage.QualityOfServiceLevel))
+ {
+ effectiveQoS = applicationMessage.QualityOfServiceLevel;
+ }
+ else if (subscribedQoSLevels.Count == 1)
+ {
+ effectiveQoS = subscribedQoSLevels.First();
+ }
+ else
+ {
+ effectiveQoS = subscribedQoSLevels.Max();
}
return new CheckSubscriptionsResult
{
- IsSubscribed = false
+ IsSubscribed = true,
+ QualityOfServiceLevel = effectiveQoS
};
}
+
+ private static MqttSubscribeReturnCode ConvertToMaximumQoS(MqttQualityOfServiceLevel qualityOfServiceLevel)
+ {
+ switch (qualityOfServiceLevel)
+ {
+ case MqttQualityOfServiceLevel.AtMostOnce: return MqttSubscribeReturnCode.SuccessMaximumQoS0;
+ case MqttQualityOfServiceLevel.AtLeastOnce: return MqttSubscribeReturnCode.SuccessMaximumQoS1;
+ case MqttQualityOfServiceLevel.ExactlyOnce: return MqttSubscribeReturnCode.SuccessMaximumQoS2;
+ default: return MqttSubscribeReturnCode.Failure;
+ }
+ }
}
}
diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttRetainedMessagesManager.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttRetainedMessagesManager.cs
index 0b8d669..2c0b260 100644
--- a/Frameworks/MQTTnet.NetStandard/Server/MqttRetainedMessagesManager.cs
+++ b/Frameworks/MQTTnet.NetStandard/Server/MqttRetainedMessagesManager.cs
@@ -11,7 +11,7 @@ namespace MQTTnet.Server
public sealed class MqttRetainedMessagesManager
{
private readonly Dictionary _retainedMessages = new Dictionary();
- private readonly SemaphoreSlim _gate = new SemaphoreSlim(1, 1);
+ private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
private readonly IMqttNetLogger _logger;
private readonly IMqttServerOptions _options;
@@ -28,7 +28,7 @@ namespace MQTTnet.Server
return;
}
- await _gate.WaitAsync();
+ await _semaphore.WaitAsync().ConfigureAwait(false);
try
{
var retainedMessages = await _options.Storage.LoadRetainedMessagesAsync();
@@ -45,7 +45,7 @@ namespace MQTTnet.Server
}
finally
{
- _gate.Release();
+ _semaphore.Release();
}
}
@@ -53,7 +53,7 @@ namespace MQTTnet.Server
{
if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage));
- await _gate.WaitAsync().ConfigureAwait(false);
+ await _semaphore.WaitAsync().ConfigureAwait(false);
try
{
await HandleMessageInternalAsync(clientId, applicationMessage);
@@ -64,7 +64,7 @@ namespace MQTTnet.Server
}
finally
{
- _gate.Release();
+ _semaphore.Release();
}
}
@@ -72,7 +72,7 @@ namespace MQTTnet.Server
{
var retainedMessages = new List();
- await _gate.WaitAsync().ConfigureAwait(false);
+ await _semaphore.WaitAsync().ConfigureAwait(false);
try
{
foreach (var retainedMessage in _retainedMessages.Values)
@@ -96,7 +96,7 @@ namespace MQTTnet.Server
}
finally
{
- _gate.Release();
+ _semaphore.Release();
}
return retainedMessages;
diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttServer.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttServer.cs
index 42c6854..888dd42 100644
--- a/Frameworks/MQTTnet.NetStandard/Server/MqttServer.cs
+++ b/Frameworks/MQTTnet.NetStandard/Server/MqttServer.cs
@@ -60,11 +60,7 @@ namespace MQTTnet.Server
_cancellationTokenSource = new CancellationTokenSource();
_retainedMessagesManager = new MqttRetainedMessagesManager(_options, _logger);
-
- _clientSessionsManager = new MqttClientSessionsManager(_options, _retainedMessagesManager, _logger);
- _clientSessionsManager.ApplicationMessageReceived += OnApplicationMessageReceived;
- _clientSessionsManager.ClientConnected += OnClientConnected;
- _clientSessionsManager.ClientDisconnected += OnClientDisconnected;
+ _clientSessionsManager = new MqttClientSessionsManager(_options, _retainedMessagesManager, this, _logger);
await _retainedMessagesManager.LoadMessagesAsync();
@@ -104,40 +100,39 @@ namespace MQTTnet.Server
finally
{
_cancellationTokenSource = null;
-
_retainedMessagesManager = null;
-
- if (_clientSessionsManager != null)
- {
- _clientSessionsManager.ApplicationMessageReceived -= OnApplicationMessageReceived;
- _clientSessionsManager.ClientConnected -= OnClientConnected;
- _clientSessionsManager.ClientDisconnected -= OnClientDisconnected;
- }
-
_clientSessionsManager = null;
}
}
- private void OnClientAccepted(object sender, MqttServerAdapterClientAcceptedEventArgs eventArgs)
+ internal void OnClientConnected(ConnectedMqttClient client)
{
- eventArgs.SessionTask = Task.Run(async () => await _clientSessionsManager.RunClientSessionAsync(eventArgs.Client, _cancellationTokenSource.Token), _cancellationTokenSource.Token);
+ if (client == null) throw new ArgumentNullException(nameof(client));
+
+ _logger.Info("Client '{0}': Connected.", client.ClientId);
+ ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(client));
}
- private void OnClientConnected(object sender, MqttClientConnectedEventArgs eventArgs)
+ internal void OnClientDisconnected(ConnectedMqttClient client)
{
- _logger.Info("Client '{0}': Connected.", eventArgs.Client.ClientId);
- ClientConnected?.Invoke(this, eventArgs);
+ if (client == null) throw new ArgumentNullException(nameof(client));
+
+ _logger.Info("Client '{0}': Disconnected.", client.ClientId);
+ ClientDisconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(client));
}
- private void OnClientDisconnected(object sender, MqttClientDisconnectedEventArgs eventArgs)
+ internal void OnApplicationMessageReceived(string clientId, MqttApplicationMessage applicationMessage)
{
- _logger.Info("Client '{0}': Disconnected.", eventArgs.Client.ClientId);
- ClientDisconnected?.Invoke(this, eventArgs);
+ if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage));
+
+ ApplicationMessageReceived?.Invoke(this, new MqttApplicationMessageReceivedEventArgs(clientId, applicationMessage));
}
- private void OnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
+ private void OnClientAccepted(object sender, MqttServerAdapterClientAcceptedEventArgs eventArgs)
{
- ApplicationMessageReceived?.Invoke(this, e);
+ eventArgs.SessionTask = Task.Run(
+ async () => await _clientSessionsManager.RunClientSessionAsync(eventArgs.Client, _cancellationTokenSource.Token).ConfigureAwait(false),
+ _cancellationTokenSource.Token);
}
}
}
diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttSubscriptionInterceptorContext.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttSubscriptionInterceptorContext.cs
index e05dd21..07b94bb 100644
--- a/Frameworks/MQTTnet.NetStandard/Server/MqttSubscriptionInterceptorContext.cs
+++ b/Frameworks/MQTTnet.NetStandard/Server/MqttSubscriptionInterceptorContext.cs
@@ -1,17 +1,19 @@
-namespace MQTTnet.Server
+using System;
+
+namespace MQTTnet.Server
{
public class MqttSubscriptionInterceptorContext
{
public MqttSubscriptionInterceptorContext(string clientId, TopicFilter topicFilter)
{
ClientId = clientId;
- TopicFilter = topicFilter;
+ TopicFilter = topicFilter ?? throw new ArgumentNullException(nameof(topicFilter));
}
public string ClientId { get; }
public TopicFilter TopicFilter { get; }
-
+
public bool AcceptSubscription { get; set; } = true;
public bool CloseConnection { get; set; }
diff --git a/Frameworks/MQTTnet.NetStandard/TopicFilter.cs b/Frameworks/MQTTnet.NetStandard/TopicFilter.cs
index b82fc71..da126d6 100644
--- a/Frameworks/MQTTnet.NetStandard/TopicFilter.cs
+++ b/Frameworks/MQTTnet.NetStandard/TopicFilter.cs
@@ -4,7 +4,7 @@ namespace MQTTnet
{
public sealed class TopicFilter
{
- public TopicFilter(string topic, MqttQualityOfServiceLevel qualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce)
+ public TopicFilter(string topic, MqttQualityOfServiceLevel qualityOfServiceLevel)
{
Topic = topic;
QualityOfServiceLevel = qualityOfServiceLevel;
diff --git a/MQTTnet.sln b/MQTTnet.sln
index ceaa04f..cece21c 100644
--- a/MQTTnet.sln
+++ b/MQTTnet.sln
@@ -1,7 +1,7 @@
Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
-VisualStudioVersion = 15.0.27004.2009
+VisualStudioVersion = 15.0.27004.2010
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.Core.Tests", "Tests\MQTTnet.Core.Tests\MQTTnet.Core.Tests.csproj", "{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}"
EndProject
@@ -33,6 +33,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.TestApp.AspNetCore2
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.AspNetCore", "Frameworks\MQTTnet.AspnetCore\MQTTnet.AspNetCore.csproj", "{F10C4060-F7EE-4A83-919F-FF723E72F94A}"
EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Extensions", "Extensions", "{12816BCC-AF9E-44A9-9AE5-C246AF2A0587}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.Extensions.Rpc", "Extensions\MQTTnet.Extensions.Rpc\MQTTnet.Extensions.Rpc.csproj", "{C444E9C8-95FA-430E-9126-274129DE16CD}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -147,6 +151,22 @@ Global
{F10C4060-F7EE-4A83-919F-FF723E72F94A}.Release|x64.Build.0 = Release|Any CPU
{F10C4060-F7EE-4A83-919F-FF723E72F94A}.Release|x86.ActiveCfg = Release|Any CPU
{F10C4060-F7EE-4A83-919F-FF723E72F94A}.Release|x86.Build.0 = Release|Any CPU
+ {C444E9C8-95FA-430E-9126-274129DE16CD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {C444E9C8-95FA-430E-9126-274129DE16CD}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {C444E9C8-95FA-430E-9126-274129DE16CD}.Debug|ARM.ActiveCfg = Debug|Any CPU
+ {C444E9C8-95FA-430E-9126-274129DE16CD}.Debug|ARM.Build.0 = Debug|Any CPU
+ {C444E9C8-95FA-430E-9126-274129DE16CD}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {C444E9C8-95FA-430E-9126-274129DE16CD}.Debug|x64.Build.0 = Debug|Any CPU
+ {C444E9C8-95FA-430E-9126-274129DE16CD}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {C444E9C8-95FA-430E-9126-274129DE16CD}.Debug|x86.Build.0 = Debug|Any CPU
+ {C444E9C8-95FA-430E-9126-274129DE16CD}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {C444E9C8-95FA-430E-9126-274129DE16CD}.Release|Any CPU.Build.0 = Release|Any CPU
+ {C444E9C8-95FA-430E-9126-274129DE16CD}.Release|ARM.ActiveCfg = Release|Any CPU
+ {C444E9C8-95FA-430E-9126-274129DE16CD}.Release|ARM.Build.0 = Release|Any CPU
+ {C444E9C8-95FA-430E-9126-274129DE16CD}.Release|x64.ActiveCfg = Release|Any CPU
+ {C444E9C8-95FA-430E-9126-274129DE16CD}.Release|x64.Build.0 = Release|Any CPU
+ {C444E9C8-95FA-430E-9126-274129DE16CD}.Release|x86.ActiveCfg = Release|Any CPU
+ {C444E9C8-95FA-430E-9126-274129DE16CD}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -158,6 +178,7 @@ Global
{3D283AAD-AAA8-4339-8394-52F80B6304DB} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4}
{C6FF8AEA-0855-41EC-A1F3-AC262225BAB9} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4}
{F10C4060-F7EE-4A83-919F-FF723E72F94A} = {32A630A7-2598-41D7-B625-204CD906F5FB}
+ {C444E9C8-95FA-430E-9126-274129DE16CD} = {12816BCC-AF9E-44A9-9AE5-C246AF2A0587}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {07536672-5CBC-4BE3-ACE0-708A431A7894}
diff --git a/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs b/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs
index 25c9352..9f51b86 100644
--- a/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs
+++ b/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs
@@ -4,7 +4,6 @@ using System.IO;
using System.Text;
using System.Threading;
using Microsoft.VisualStudio.TestTools.UnitTesting;
-using MQTTnet.Adapter;
using MQTTnet.Packets;
using MQTTnet.Protocol;
using MQTTnet.Serializer;
@@ -410,7 +409,7 @@ namespace MQTTnet.Core.Tests
using (var bodyStream = new MemoryStream(Join(buffer1), (int)headerStream.Position, header.BodyLength))
{
- var deserializedPacket = serializer.Deserialize(new ReceivedMqttPacket(header, bodyStream));
+ var deserializedPacket = serializer.Deserialize(header, bodyStream.ToArray());
var buffer2 = serializer.Serialize(deserializedPacket);
Assert.AreEqual(expectedBase64Value, Convert.ToBase64String(Join(buffer2)));
diff --git a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs
index f1c8ff0..5081dfc 100644
--- a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs
+++ b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs
@@ -62,7 +62,7 @@ namespace MQTTnet.Core.Tests
var c2 = await serverAdapter.ConnectTestClient(s, "c2", willMessage);
c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++;
- await c1.SubscribeAsync(new TopicFilter("#"));
+ await c1.SubscribeAsync(new TopicFilterBuilder().WithTopic("#").Build());
await c2.DisconnectAsync();
@@ -167,7 +167,7 @@ namespace MQTTnet.Core.Tests
var c2 = await serverAdapter.ConnectTestClient(s, "c2");
c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++;
- await c2.SubscribeAsync(new TopicFilter("retained"));
+ await c2.SubscribeAsync(new TopicFilterBuilder().WithTopic("retained").Build());
await Task.Delay(500);
}
@@ -199,7 +199,7 @@ namespace MQTTnet.Core.Tests
var c2 = await serverAdapter.ConnectTestClient(s, "c2");
c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++;
- await c2.SubscribeAsync(new TopicFilter("retained"));
+ await c2.SubscribeAsync(new TopicFilterBuilder().WithTopic("retained").Build());
await Task.Delay(500);
}
@@ -277,7 +277,7 @@ namespace MQTTnet.Core.Tests
var c2 = await serverAdapter.ConnectTestClient(s, "c2");
c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++;
- await c2.SubscribeAsync(new TopicFilter("retained"));
+ await c2.SubscribeAsync(new TopicFilterBuilder().WithTopic("retained").Build());
await Task.Delay(500);
}
diff --git a/Tests/MQTTnet.Core.Tests/MqttSubscriptionsManagerTests.cs b/Tests/MQTTnet.Core.Tests/MqttSubscriptionsManagerTests.cs
index b6b68dc..50acd61 100644
--- a/Tests/MQTTnet.Core.Tests/MqttSubscriptionsManagerTests.cs
+++ b/Tests/MQTTnet.Core.Tests/MqttSubscriptionsManagerTests.cs
@@ -14,9 +14,9 @@ namespace MQTTnet.Core.Tests
var sm = new MqttClientSubscriptionsManager(new MqttServerOptions());
var sp = new MqttSubscribePacket();
- sp.TopicFilters.Add(new TopicFilter("A/B/C"));
+ sp.TopicFilters.Add(new TopicFilterBuilder().WithTopic("A/B/C").Build());
- sm.Subscribe(sp, "");
+ sm.SubscribeAsync(sp, "").Wait();
var pp = new MqttApplicationMessage
{
@@ -24,7 +24,52 @@ namespace MQTTnet.Core.Tests
QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce
};
- Assert.IsTrue(sm.CheckSubscriptions(pp).IsSubscribed);
+ var result = sm.CheckSubscriptionsAsync(pp).Result;
+ Assert.IsTrue(result.IsSubscribed);
+ Assert.AreEqual(result.QualityOfServiceLevel, MqttQualityOfServiceLevel.AtMostOnce);
+ }
+
+ [TestMethod]
+ public void MqttSubscriptionsManager_SubscribeDifferentQoSSuccess()
+ {
+ var sm = new MqttClientSubscriptionsManager(new MqttServerOptions());
+
+ var sp = new MqttSubscribePacket();
+ sp.TopicFilters.Add(new TopicFilter("A/B/C", MqttQualityOfServiceLevel.AtMostOnce));
+
+ sm.SubscribeAsync(sp, "").Wait();
+
+ var pp = new MqttApplicationMessage
+ {
+ Topic = "A/B/C",
+ QualityOfServiceLevel = MqttQualityOfServiceLevel.ExactlyOnce
+ };
+
+ var result = sm.CheckSubscriptionsAsync(pp).Result;
+ Assert.IsTrue(result.IsSubscribed);
+ Assert.AreEqual(result.QualityOfServiceLevel, MqttQualityOfServiceLevel.AtMostOnce);
+ }
+
+ [TestMethod]
+ public void MqttSubscriptionsManager_SubscribeTwoTimesSuccess()
+ {
+ var sm = new MqttClientSubscriptionsManager(new MqttServerOptions());
+
+ var sp = new MqttSubscribePacket();
+ sp.TopicFilters.Add(new TopicFilter("#", MqttQualityOfServiceLevel.AtMostOnce));
+ sp.TopicFilters.Add(new TopicFilter("A/B/C", MqttQualityOfServiceLevel.AtLeastOnce));
+
+ sm.SubscribeAsync(sp, "").Wait();
+
+ var pp = new MqttApplicationMessage
+ {
+ Topic = "A/B/C",
+ QualityOfServiceLevel = MqttQualityOfServiceLevel.ExactlyOnce
+ };
+
+ var result = sm.CheckSubscriptionsAsync(pp).Result;
+ Assert.IsTrue(result.IsSubscribed);
+ Assert.AreEqual(result.QualityOfServiceLevel, MqttQualityOfServiceLevel.AtLeastOnce);
}
[TestMethod]
@@ -33,9 +78,9 @@ namespace MQTTnet.Core.Tests
var sm = new MqttClientSubscriptionsManager(new MqttServerOptions());
var sp = new MqttSubscribePacket();
- sp.TopicFilters.Add(new TopicFilter("A/B/C"));
+ sp.TopicFilters.Add(new TopicFilterBuilder().WithTopic("A/B/C").Build());
- sm.Subscribe(sp, "");
+ sm.SubscribeAsync(sp, "").Wait();
var pp = new MqttApplicationMessage
{
@@ -43,7 +88,7 @@ namespace MQTTnet.Core.Tests
QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce
};
- Assert.IsFalse(sm.CheckSubscriptions(pp).IsSubscribed);
+ Assert.IsFalse(sm.CheckSubscriptionsAsync(pp).Result.IsSubscribed);
}
[TestMethod]
@@ -52,9 +97,9 @@ namespace MQTTnet.Core.Tests
var sm = new MqttClientSubscriptionsManager(new MqttServerOptions());
var sp = new MqttSubscribePacket();
- sp.TopicFilters.Add(new TopicFilter("A/B/C"));
+ sp.TopicFilters.Add(new TopicFilterBuilder().WithTopic("A/B/C").Build());
- sm.Subscribe(sp, "");
+ sm.SubscribeAsync(sp, "").Wait();
var pp = new MqttApplicationMessage
{
@@ -62,13 +107,13 @@ namespace MQTTnet.Core.Tests
QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce
};
- Assert.IsTrue(sm.CheckSubscriptions(pp).IsSubscribed);
+ Assert.IsTrue(sm.CheckSubscriptionsAsync(pp).Result.IsSubscribed);
var up = new MqttUnsubscribePacket();
up.TopicFilters.Add("A/B/C");
- sm.Unsubscribe(up);
+ sm.UnsubscribeAsync(up).Wait();
- Assert.IsFalse(sm.CheckSubscriptions(pp).IsSubscribed);
+ Assert.IsFalse(sm.CheckSubscriptionsAsync(pp).Result.IsSubscribed);
}
}
}
diff --git a/Tests/MQTTnet.TestApp.NetCore/ClientTest.cs b/Tests/MQTTnet.TestApp.NetCore/ClientTest.cs
index 09788cc..dd4a1bc 100644
--- a/Tests/MQTTnet.TestApp.NetCore/ClientTest.cs
+++ b/Tests/MQTTnet.TestApp.NetCore/ClientTest.cs
@@ -2,6 +2,7 @@
using System.Text;
using System.Threading.Tasks;
using MQTTnet.Client;
+using MQTTnet.Protocol;
namespace MQTTnet.TestApp.NetCore
{
@@ -17,7 +18,8 @@ namespace MQTTnet.TestApp.NetCore
CleanSession = true,
ChannelOptions = new MqttClientTcpOptions
{
- Server = "localhost"
+ //Server = "localhost",
+ Server = "192.168.1.174"
},
//ChannelOptions = new MqttClientWebSocketOptions
//{
@@ -78,6 +80,8 @@ namespace MQTTnet.TestApp.NetCore
{
Console.ReadLine();
+ await client.SubscribeAsync(new TopicFilter("test", MqttQualityOfServiceLevel.AtMostOnce));
+
var applicationMessage = new MqttApplicationMessageBuilder()
.WithTopic("A/B/C")
.WithPayload("Hello World")
diff --git a/Tests/MQTTnet.TestApp.NetCore/MQTTnet.TestApp.NetCore.csproj b/Tests/MQTTnet.TestApp.NetCore/MQTTnet.TestApp.NetCore.csproj
index 8f25448..1784cc0 100644
--- a/Tests/MQTTnet.TestApp.NetCore/MQTTnet.TestApp.NetCore.csproj
+++ b/Tests/MQTTnet.TestApp.NetCore/MQTTnet.TestApp.NetCore.csproj
@@ -6,6 +6,10 @@
netcoreapp2.0;net452;net461
+
+ RELEASE;NETCOREAPP2_0
+
+
diff --git a/Tests/MQTTnet.TestApp.NetCore/PerformanceTest.cs b/Tests/MQTTnet.TestApp.NetCore/PerformanceTest.cs
index 1c72b06..5f2e5ad 100644
--- a/Tests/MQTTnet.TestApp.NetCore/PerformanceTest.cs
+++ b/Tests/MQTTnet.TestApp.NetCore/PerformanceTest.cs
@@ -1,5 +1,4 @@
using System;
-using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
@@ -64,9 +63,17 @@ namespace MQTTnet.TestApp.NetCore
sentMessagesCount++;
}
- Console.WriteLine($"Sending {sentMessagesCount} messages per second.");
-
+ Console.WriteLine($"Sending {sentMessagesCount} messages per second. #1");
+
+ sentMessagesCount = 0;
stopwatch.Restart();
+ while (stopwatch.ElapsedMilliseconds < 1000)
+ {
+ await client.PublishAsync(messages).ConfigureAwait(false);
+ sentMessagesCount++;
+ }
+
+ Console.WriteLine($"Sending {sentMessagesCount} messages per second. #2");
var testMessageCount = 10000;
for (var i = 0; i < testMessageCount; i++)
@@ -141,8 +148,8 @@ namespace MQTTnet.TestApp.NetCore
{
var mqttServer = new MqttFactory().CreateMqttServer();
- var msgs = 0;
- var stopwatch = Stopwatch.StartNew();
+ ////var msgs = 0;
+ ////var stopwatch = Stopwatch.StartNew();
////mqttServer.ApplicationMessageReceived += (sender, args) =>
////{
//// msgs++;
diff --git a/Tests/MQTTnet.TestApp.NetCore/ServerTest.cs b/Tests/MQTTnet.TestApp.NetCore/ServerTest.cs
index 8f89176..bd8b4ef 100644
--- a/Tests/MQTTnet.TestApp.NetCore/ServerTest.cs
+++ b/Tests/MQTTnet.TestApp.NetCore/ServerTest.cs
@@ -29,6 +29,7 @@ namespace MQTTnet.TestApp.NetCore
},
Storage = new RetainedMessageHandler(),
+
ApplicationMessageInterceptor = context =>
{
if (MqttTopicFilterComparer.IsMatch(context.ApplicationMessage.Topic, "/myTopic/WithTimestamp/#"))
@@ -67,18 +68,29 @@ namespace MQTTnet.TestApp.NetCore
mqttServer.ApplicationMessageReceived += (s, e) =>
{
MqttNetConsoleLogger.PrintToConsole(
- $"'{e.ClientId}' reported '{e.ApplicationMessage.Topic}' > '{Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}'",
+ $"'{e.ClientId}' reported '{e.ApplicationMessage.Topic}' > '{Encoding.UTF8.GetString(e.ApplicationMessage.Payload ?? new byte[0])}'",
ConsoleColor.Magenta);
};
options.ApplicationMessageInterceptor = c =>
{
- var content = JObject.Parse(Encoding.UTF8.GetString(c.ApplicationMessage.Payload));
- var timestampProperty = content.Property("timestamp");
- if (timestampProperty != null && timestampProperty.Value.Type == JTokenType.Null)
+ if (c.ApplicationMessage.Payload == null || c.ApplicationMessage.Payload.Length == 0)
+ {
+ return;
+ }
+
+ try
+ {
+ var content = JObject.Parse(Encoding.UTF8.GetString(c.ApplicationMessage.Payload));
+ var timestampProperty = content.Property("timestamp");
+ if (timestampProperty != null && timestampProperty.Value.Type == JTokenType.Null)
+ {
+ timestampProperty.Value = DateTime.Now.ToString("O");
+ c.ApplicationMessage.Payload = Encoding.UTF8.GetBytes(content.ToString());
+ }
+ }
+ catch (Exception e)
{
- timestampProperty.Value = DateTime.Now.ToString("O");
- c.ApplicationMessage.Payload = Encoding.UTF8.GetBytes(content.ToString());
}
};
diff --git a/Tests/MQTTnet.TestApp.UniversalWindows/MQTTnet.TestApp.UniversalWindows.csproj b/Tests/MQTTnet.TestApp.UniversalWindows/MQTTnet.TestApp.UniversalWindows.csproj
index 4445af3..5636dd2 100644
--- a/Tests/MQTTnet.TestApp.UniversalWindows/MQTTnet.TestApp.UniversalWindows.csproj
+++ b/Tests/MQTTnet.TestApp.UniversalWindows/MQTTnet.TestApp.UniversalWindows.csproj
@@ -127,6 +127,10 @@
+
+ {c444e9c8-95fa-430e-9126-274129de16cd}
+ MQTTnet.Extensions.Rpc
+
{3587e506-55a2-4eb3-99c7-dc01e42d25d2}
MQTTnet.NetStandard
diff --git a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml
index 43b26c7..cf05888 100644
--- a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml
+++ b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml
@@ -65,6 +65,40 @@
+
+
+ Method:
+
+
+ Payload:
+
+
+ Text
+ Base64
+
+
+ QoS:
+
+ 0 (At most once)
+ 1 (At least once)
+ 2 (Exactly once)
+
+
+ Responses:
+
+
+
+
+
+
+
+
+
+
+
+
+
+
Topic:
@@ -86,7 +120,7 @@
-
+
@@ -101,7 +135,7 @@
Persist retained messages in JSON format
Clear previously retained messages on startup
-
+
diff --git a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs
index 00c6ffa..bbd6474 100644
--- a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs
+++ b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs
@@ -7,6 +7,8 @@ using Windows.UI.Core;
using Windows.UI.Xaml;
using MQTTnet.Client;
using MQTTnet.Diagnostics;
+using MQTTnet.Exceptions;
+using MQTTnet.Extensions.Rpc;
using MQTTnet.Implementations;
using MQTTnet.ManagedClient;
using MQTTnet.Protocol;
@@ -245,6 +247,95 @@ namespace MQTTnet.TestApp.UniversalWindows
// This code is for the Wiki at GitHub!
// ReSharper disable once UnusedMember.Local
+
+ private async void StartServer(object sender, RoutedEventArgs e)
+ {
+ if (_mqttServer != null)
+ {
+ return;
+ }
+
+ JsonServerStorage storage = null;
+ if (ServerPersistRetainedMessages.IsChecked == true)
+ {
+ storage = new JsonServerStorage();
+
+ if (ServerClearRetainedMessages.IsChecked == true)
+ {
+ storage.Clear();
+ }
+ }
+
+ _mqttServer = new MqttFactory().CreateMqttServer();
+
+ var options = new MqttServerOptions();
+ options.DefaultEndpointOptions.Port = int.Parse(ServerPort.Text);
+ options.Storage = storage;
+
+ await _mqttServer.StartAsync(options);
+ }
+
+ private async void StopServer(object sender, RoutedEventArgs e)
+ {
+ if (_mqttServer == null)
+ {
+ return;
+ }
+
+ await _mqttServer.StopAsync();
+ _mqttServer = null;
+ }
+
+ private void ClearReceivedMessages(object sender, RoutedEventArgs e)
+ {
+ ReceivedMessages.Items.Clear();
+ }
+
+ private async void ExecuteRpc(object sender, RoutedEventArgs e)
+ {
+ var qos = MqttQualityOfServiceLevel.AtMostOnce;
+ if (RpcQoS1.IsChecked == true)
+ {
+ qos = MqttQualityOfServiceLevel.AtLeastOnce;
+ }
+
+ if (RpcQoS2.IsChecked == true)
+ {
+ qos = MqttQualityOfServiceLevel.ExactlyOnce;
+ }
+
+ var payload = new byte[0];
+ if (RpcText.IsChecked == true)
+ {
+ payload = Encoding.UTF8.GetBytes(RpcPayload.Text);
+ }
+
+ if (RpcBase64.IsChecked == true)
+ {
+ payload = Convert.FromBase64String(RpcPayload.Text);
+ }
+
+
+ try
+ {
+ var rpcClient = new MqttRpcClient(_mqttClient);
+ await rpcClient.EnableAsync();
+ var response = await rpcClient.ExecuteAsync(TimeSpan.FromSeconds(5), RpcMethod.Text, payload, qos);
+ await rpcClient.DisableAsync();
+
+ RpcResponses.Items.Add(RpcMethod.Text + " >>> " + Encoding.UTF8.GetString(response));
+ }
+ catch (MqttCommunicationTimedOutException)
+ {
+ RpcResponses.Items.Add(RpcMethod.Text + " >>> [TIMEOUT]");
+ }
+ }
+
+ private void ClearRpcResponses(object sender, RoutedEventArgs e)
+ {
+ RpcResponses.Items.Clear();
+ }
+
private async Task WikiCode()
{
{
@@ -293,9 +384,9 @@ namespace MQTTnet.TestApp.UniversalWindows
{
// Use secure TCP connection.
var options = new MqttClientOptionsBuilder()
- .WithTcpServer("broker.hivemq.com")
- .WithTls()
- .Build();
+ .WithTcpServer("broker.hivemq.com")
+ .WithTls()
+ .Build();
}
{
@@ -480,48 +571,5 @@ namespace MQTTnet.TestApp.UniversalWindows
}
}
-
- private async void StartServer(object sender, RoutedEventArgs e)
- {
- if (_mqttServer != null)
- {
- return;
- }
-
- JsonServerStorage storage = null;
- if (ServerPersistRetainedMessages.IsChecked == true)
- {
- storage = new JsonServerStorage();
-
- if (ServerClearRetainedMessages.IsChecked == true)
- {
- storage.Clear();
- }
- }
-
- _mqttServer = new MqttFactory().CreateMqttServer();
-
- var options = new MqttServerOptions();
- options.DefaultEndpointOptions.Port = int.Parse(ServerPort.Text);
- options.Storage = storage;
-
- await _mqttServer.StartAsync(options);
- }
-
- private async void StopServer(object sender, RoutedEventArgs e)
- {
- if (_mqttServer == null)
- {
- return;
- }
-
- await _mqttServer.StopAsync();
- _mqttServer = null;
- }
-
- private void ClearReceivedMessages(object sender, RoutedEventArgs e)
- {
- ReceivedMessages.Items.Clear();
- }
}
}