From d12d2648b7eed47ca839550e0e6dfd4ea9a446f4 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Mon, 4 Dec 2017 10:21:37 +0100 Subject: [PATCH] Add extension for RPC calls; replace locks; Refactored storage in Managed Client --- Build/MQTTnet.nuspec | 6 +- .../MQTTnet.Extensions.Rpc.csproj | 30 ++++ .../MQTTnet.Extensions.Rpc/MqttRpcClient.cs | 101 +++++++++++++ .../MQTTnet.AspnetCore.csproj | 9 +- .../Adapter/MqttChannelAdapter.cs | 47 +++--- .../Adapter/ReceivedMqttPacket.cs | 12 +- .../Implementations/MqttTcpChannel.cs | 7 +- .../MQTTnet.Netstandard.csproj | 5 +- .../ManagedClient/ManagedMqttClient.cs | 57 ++++--- .../ManagedMqttClientStorageManager.cs | 32 ++-- .../Serializer/IMqttPacketSerializer.cs | 5 +- .../Serializer/MqttPacketReader.cs | 25 ++-- .../Serializer/MqttPacketSerializer.cs | 15 +- .../Server/MqttClientSession.cs | 43 ++++-- .../Server/MqttClientSessionsManager.cs | 71 ++++----- .../Server/MqttClientSubscriptionsManager.cs | 113 ++++++++++---- .../Server/MqttRetainedMessagesManager.cs | 14 +- .../MQTTnet.NetStandard/Server/MqttServer.cs | 43 +++--- .../MqttSubscriptionInterceptorContext.cs | 8 +- Frameworks/MQTTnet.NetStandard/TopicFilter.cs | 2 +- MQTTnet.sln | 23 ++- .../MqttPacketSerializerTests.cs | 3 +- Tests/MQTTnet.Core.Tests/MqttServerTests.cs | 8 +- .../MqttSubscriptionsManagerTests.cs | 67 +++++++-- Tests/MQTTnet.TestApp.NetCore/ClientTest.cs | 6 +- .../MQTTnet.TestApp.NetCore.csproj | 4 + .../PerformanceTest.cs | 17 ++- Tests/MQTTnet.TestApp.NetCore/ServerTest.cs | 24 ++- .../MQTTnet.TestApp.UniversalWindows.csproj | 4 + .../MainPage.xaml | 38 ++++- .../MainPage.xaml.cs | 140 ++++++++++++------ 31 files changed, 681 insertions(+), 298 deletions(-) create mode 100644 Extensions/MQTTnet.Extensions.Rpc/MQTTnet.Extensions.Rpc.csproj create mode 100644 Extensions/MQTTnet.Extensions.Rpc/MqttRpcClient.cs diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index 71a9d55..302b128 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -14,10 +14,14 @@ * [Core] Added a strong name for the assembly. * [Core] Performance optimizations. * [Core] Fixed a logging issue when dealing with IOExceptions. +* [Core] Fixed a typo in the global logger class (BREAKING CHANGE! Please find new example in Wiki). * [Client] Fixed an issue in _ManagedClient_ which can cause the client to stop when publishing subscriptions. * [Server] The application message interceptor can now delete any received application message. -* [Server] Added a ConnectionValidator context to align with other APIs. +* [Server] Added a ConnectionValidator context to align with other APIs (BREAKING CHANGE! Please find new example in Wiki). * [Server] Added an interface for the _MqttServerOptions_. +* [Server] Added packet statistics for the connected clients. +* [Server] Fixed a security issue which sends retained packages to a failed subscription. +* [Server] Fixed the response (MaximumQoS) of a subscription (Thanks to @redbeans2017). Copyright Christian Kratky 2016-2017 MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin diff --git a/Extensions/MQTTnet.Extensions.Rpc/MQTTnet.Extensions.Rpc.csproj b/Extensions/MQTTnet.Extensions.Rpc/MQTTnet.Extensions.Rpc.csproj new file mode 100644 index 0000000..fd16072 --- /dev/null +++ b/Extensions/MQTTnet.Extensions.Rpc/MQTTnet.Extensions.Rpc.csproj @@ -0,0 +1,30 @@ + + + + netstandard1.3;netstandard2.0;net452;net461;uap10.0 + 0.0.0.0 + 0.0.0.0 + + + + + 0.0.0.0 + + + + false + UAP,Version=v10.0 + UAP + 10.0.16299.0 + 10.0.10240.0 + .NETCore + v5.0 + $(DefineConstants);WINDOWS_UWP + $(MSBuildExtensionsPath)\Microsoft\WindowsXaml\v$(VisualStudioVersion)\Microsoft.Windows.UI.Xaml.CSharp.targets + + + + + + + diff --git a/Extensions/MQTTnet.Extensions.Rpc/MqttRpcClient.cs b/Extensions/MQTTnet.Extensions.Rpc/MqttRpcClient.cs new file mode 100644 index 0000000..6a9d1c1 --- /dev/null +++ b/Extensions/MQTTnet.Extensions.Rpc/MqttRpcClient.cs @@ -0,0 +1,101 @@ +using System; +using System.Collections.Concurrent; +using System.Threading.Tasks; +using MQTTnet.Client; +using MQTTnet.Internal; +using MQTTnet.Protocol; + +namespace MQTTnet.Extensions.Rpc +{ + public sealed class MqttRpcClient : IDisposable + { + private const string ResponseTopic = "$RPC/+/+/response"; + private readonly ConcurrentDictionary> _waitingCalls = new ConcurrentDictionary>(); + private readonly IMqttClient _mqttClient; + private bool _isEnabled; + + public MqttRpcClient(IMqttClient mqttClient) + { + _mqttClient = mqttClient ?? throw new ArgumentNullException(nameof(mqttClient)); + + _mqttClient.ApplicationMessageReceived += OnApplicationMessageReceived; + } + + public async Task EnableAsync() + { + await _mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic(ResponseTopic).WithAtLeastOnceQoS().Build()); + _isEnabled = true; + } + + public async Task DisableAsync() + { + await _mqttClient.UnsubscribeAsync(ResponseTopic); + _isEnabled = false; + } + + public async Task ExecuteAsync(TimeSpan timeout, string methodName, byte[] payload, MqttQualityOfServiceLevel qualityOfServiceLevel) + { + if (methodName == null) throw new ArgumentNullException(nameof(methodName)); + + if (methodName.Contains("/") || methodName.Contains("+") || methodName.Contains("#")) + { + throw new ArgumentException("The method name cannot contain /, + or #."); + } + + if (!_isEnabled) + { + throw new InvalidOperationException("The RPC client is not enabled."); + } + + var requestTopic = $"$MQTTnet.RPC/{Guid.NewGuid():N}/{methodName}"; + var responseTopic = requestTopic + "/response"; + + var requestMessage = new MqttApplicationMessageBuilder() + .WithTopic(requestTopic) + .WithPayload(payload) + .WithQualityOfServiceLevel(qualityOfServiceLevel) + .Build(); + + try + { + var tcs = new TaskCompletionSource(); + if (!_waitingCalls.TryAdd(responseTopic, tcs)) + { + throw new InvalidOperationException(); + } + + await _mqttClient.PublishAsync(requestMessage); + return await tcs.Task.TimeoutAfter(timeout); + } + finally + { + _waitingCalls.TryRemove(responseTopic, out _); + } + } + + private void OnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs eventArgs) + { + if (!_waitingCalls.TryRemove(eventArgs.ApplicationMessage.Topic, out TaskCompletionSource tcs)) + { + return; + } + + if (tcs.Task.IsCompleted || tcs.Task.IsCanceled) + { + return; + } + + tcs.TrySetResult(eventArgs.ApplicationMessage.Payload); + } + + public void Dispose() + { + foreach (var tcs in _waitingCalls) + { + tcs.Value.SetCanceled(); + } + + _waitingCalls.Clear(); + } + } +} diff --git a/Frameworks/MQTTnet.AspnetCore/MQTTnet.AspnetCore.csproj b/Frameworks/MQTTnet.AspnetCore/MQTTnet.AspnetCore.csproj index fbde3f2..1f23602 100644 --- a/Frameworks/MQTTnet.AspnetCore/MQTTnet.AspnetCore.csproj +++ b/Frameworks/MQTTnet.AspnetCore/MQTTnet.AspnetCore.csproj @@ -2,8 +2,13 @@ netstandard2.0 - 2.5.2.0 - 2.5.2.0 + 0.0.0.0 + 0.0.0.0 + 0.0.0.0 + + + + diff --git a/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs b/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs index af39958..0cbcc93 100644 --- a/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs +++ b/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs @@ -18,6 +18,8 @@ namespace MQTTnet.Adapter { private const uint ErrorOperationAborted = 0x800703E3; + private static readonly byte[] EmptyBody = new byte[0]; + private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1); private readonly IMqttNetLogger _logger; private readonly IMqttChannel _channel; @@ -89,35 +91,28 @@ namespace MQTTnet.Adapter MqttBasePacket packet = null; await ExecuteAndWrapExceptionAsync(async () => { - ReceivedMqttPacket receivedMqttPacket = null; - try + ReceivedMqttPacket receivedMqttPacket; + if (timeout > TimeSpan.Zero) { - if (timeout > TimeSpan.Zero) - { - receivedMqttPacket = await ReceiveAsync(_channel.ReceiveStream, cancellationToken).TimeoutAfter(timeout).ConfigureAwait(false); - } - else - { - receivedMqttPacket = await ReceiveAsync(_channel.ReceiveStream, cancellationToken).ConfigureAwait(false); - } - - if (receivedMqttPacket == null || cancellationToken.IsCancellationRequested) - { - throw new TaskCanceledException(); - } - - packet = PacketSerializer.Deserialize(receivedMqttPacket); - if (packet == null) - { - throw new MqttProtocolViolationException("Received malformed packet."); - } + receivedMqttPacket = await ReceiveAsync(_channel.ReceiveStream, cancellationToken).TimeoutAfter(timeout).ConfigureAwait(false); + } + else + { + receivedMqttPacket = await ReceiveAsync(_channel.ReceiveStream, cancellationToken).ConfigureAwait(false); + } - _logger.Trace("RX <<< {0}", packet); + if (receivedMqttPacket == null || cancellationToken.IsCancellationRequested) + { + throw new TaskCanceledException(); } - finally + + packet = PacketSerializer.Deserialize(receivedMqttPacket.Header, receivedMqttPacket.Body); + if (packet == null) { - receivedMqttPacket?.Dispose(); + throw new MqttProtocolViolationException("Received malformed packet."); } + + _logger.Trace("RX <<< {0}", packet); }).ConfigureAwait(false); return packet; @@ -133,7 +128,7 @@ namespace MQTTnet.Adapter if (header.BodyLength == 0) { - return new ReceivedMqttPacket(header, new MemoryStream(0)); + return new ReceivedMqttPacket(header, EmptyBody); } var body = new byte[header.BodyLength]; @@ -145,7 +140,7 @@ namespace MQTTnet.Adapter offset += readBytesCount; } while (offset < header.BodyLength); - return new ReceivedMqttPacket(header, new MemoryStream(body, 0, body.Length, false, true)); + return new ReceivedMqttPacket(header, body); } private static async Task ExecuteAndWrapExceptionAsync(Func action) diff --git a/Frameworks/MQTTnet.NetStandard/Adapter/ReceivedMqttPacket.cs b/Frameworks/MQTTnet.NetStandard/Adapter/ReceivedMqttPacket.cs index a44fb54..c92f9d0 100644 --- a/Frameworks/MQTTnet.NetStandard/Adapter/ReceivedMqttPacket.cs +++ b/Frameworks/MQTTnet.NetStandard/Adapter/ReceivedMqttPacket.cs @@ -1,12 +1,11 @@ using System; -using System.IO; using MQTTnet.Packets; namespace MQTTnet.Adapter { - public sealed class ReceivedMqttPacket : IDisposable + public class ReceivedMqttPacket { - public ReceivedMqttPacket(MqttPacketHeader header, MemoryStream body) + public ReceivedMqttPacket(MqttPacketHeader header, byte[] body) { Header = header ?? throw new ArgumentNullException(nameof(header)); Body = body ?? throw new ArgumentNullException(nameof(body)); @@ -14,11 +13,6 @@ namespace MQTTnet.Adapter public MqttPacketHeader Header { get; } - public MemoryStream Body { get; } - - public void Dispose() - { - Body?.Dispose(); - } + public byte[] Body { get; } } } diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs index 7e23fdb..7e6f663 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs @@ -70,7 +70,7 @@ namespace MQTTnet.Implementations _sslStream = new SslStream(new NetworkStream(_socket, true), false, InternalUserCertificateValidationCallback); await _sslStream.AuthenticateAsClientAsync(_options.Server, LoadCertificates(_options), SslProtocols.Tls12, _options.TlsOptions.IgnoreCertificateRevocationErrors).ConfigureAwait(false); } - + CreateStreams(_socket, _sslStream); } @@ -139,10 +139,7 @@ namespace MQTTnet.Implementations private void CreateStreams(Socket socket, Stream sslStream) { var stream = sslStream ?? new NetworkStream(socket); - - //cannot use this as default buffering prevents from receiving the first connect message - //need two streams otherwise read and write have to be synchronized - + //todo: if branch can be used with min dependency NetStandard1.6 #if NET452 || NET461 SendStream = new BufferedStream(stream, BufferSize); diff --git a/Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj b/Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj index abcc865..92920cd 100644 --- a/Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj +++ b/Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj @@ -6,8 +6,8 @@ MQTTnet False Full - 2.5.3.0 - 2.5.3.0 + 0.0.0.0 + 0.0.0.0 0.0.0.0 @@ -29,7 +29,6 @@ v5.0 $(DefineConstants);WINDOWS_UWP $(MSBuildExtensionsPath)\Microsoft\WindowsXaml\v$(VisualStudioVersion)\Microsoft.Windows.UI.Xaml.CSharp.targets - diff --git a/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs b/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs index 7ae7250..9b01156 100644 --- a/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs +++ b/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs @@ -13,9 +13,9 @@ namespace MQTTnet.ManagedClient { public class ManagedMqttClient : IManagedMqttClient { - private readonly ManagedMqttClientStorageManager _storageManager = new ManagedMqttClientStorageManager(); private readonly BlockingCollection _messageQueue = new BlockingCollection(); private readonly HashSet _subscriptions = new HashSet(); + private readonly SemaphoreSlim _subscriptionsSemaphore = new SemaphoreSlim(1, 1); private readonly IMqttClient _mqttClient; private readonly IMqttNetLogger _logger; @@ -23,7 +23,9 @@ namespace MQTTnet.ManagedClient private CancellationTokenSource _connectionCancellationToken; private CancellationTokenSource _publishingCancellationToken; + private ManagedMqttClientStorageManager _storageManager; private IManagedMqttClientOptions _options; + private bool _subscriptionsNotPushed; public ManagedMqttClient(IMqttClient mqttClient, IMqttNetLogger logger) @@ -55,15 +57,11 @@ namespace MQTTnet.ManagedClient if (_connectionCancellationToken != null) throw new InvalidOperationException("The managed client is already started."); _options = options; - await _storageManager.SetStorageAsync(_options.Storage).ConfigureAwait(false); - + if (_options.Storage != null) { - var loadedMessages = await _options.Storage.LoadQueuedMessagesAsync().ConfigureAwait(false); - foreach (var loadedMessage in loadedMessages) - { - _messageQueue.Add(loadedMessage); - } + _storageManager = new ManagedMqttClientStorageManager(_options.Storage); + await _storageManager.LoadQueuedMessagesAsync().ConfigureAwait(false); } _connectionCancellationToken = new CancellationTokenSource(); @@ -97,16 +95,21 @@ namespace MQTTnet.ManagedClient foreach (var applicationMessage in applicationMessages) { - await _storageManager.AddAsync(applicationMessage).ConfigureAwait(false); + if (_storageManager != null) + { + await _storageManager.AddAsync(applicationMessage).ConfigureAwait(false); + } + _messageQueue.Add(applicationMessage); } } - public Task SubscribeAsync(IEnumerable topicFilters) + public async Task SubscribeAsync(IEnumerable topicFilters) { if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); - lock (_subscriptions) + await _subscriptionsSemaphore.WaitAsync().ConfigureAwait(false); + try { foreach (var topicFilter in topicFilters) { @@ -116,13 +119,16 @@ namespace MQTTnet.ManagedClient } } } - - return Task.FromResult(0); + finally + { + _subscriptionsSemaphore.Release(); + } } - public Task UnsubscribeAsync(IEnumerable topicFilters) + public async Task UnsubscribeAsync(IEnumerable topicFilters) { - lock (_subscriptions) + await _subscriptionsSemaphore.WaitAsync().ConfigureAwait(false); + try { foreach (var topicFilter in topicFilters) { @@ -132,8 +138,10 @@ namespace MQTTnet.ManagedClient } } } - - return Task.FromResult(0); + finally + { + _subscriptionsSemaphore.Release(); + } } private async Task MaintainConnectionAsync(CancellationToken cancellationToken) @@ -242,7 +250,11 @@ namespace MQTTnet.ManagedClient try { await _mqttClient.PublishAsync(message).ConfigureAwait(false); - await _storageManager.RemoveAsync(message).ConfigureAwait(false); + + if (_storageManager != null) + { + await _storageManager.RemoveAsync(message).ConfigureAwait(false); + } } catch (MqttCommunicationException exception) { @@ -264,13 +276,18 @@ namespace MQTTnet.ManagedClient _logger.Info(nameof(ManagedMqttClient), "Synchronizing subscriptions"); List subscriptions; - lock (_subscriptions) + await _subscriptionsSemaphore.WaitAsync().ConfigureAwait(false); + try { subscriptions = _subscriptions.ToList(); _subscriptionsNotPushed = false; } + finally + { + _subscriptionsSemaphore.Release(); + } - if (!_subscriptions.Any()) + if (!subscriptions.Any()) { return; } diff --git a/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClientStorageManager.cs b/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClientStorageManager.cs index d55f137..0e71776 100644 --- a/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClientStorageManager.cs +++ b/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClientStorageManager.cs @@ -1,4 +1,5 @@ -using System.Collections.Generic; +using System; +using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; @@ -8,18 +9,19 @@ namespace MQTTnet.ManagedClient { private readonly List _applicationMessages = new List(); private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1); - private IManagedMqttClientStorage _storage; + private readonly IManagedMqttClientStorage _storage; - public async Task SetStorageAsync(IManagedMqttClientStorage storage) + public ManagedMqttClientStorageManager(IManagedMqttClientStorage storage) { - await _semaphore.WaitAsync().ConfigureAwait(false); - try - { - _storage = storage; - } - finally + _storage = storage ?? throw new ArgumentNullException(nameof(storage)); + } + + public async Task LoadQueuedMessagesAsync() + { + var loadedMessages = await _storage.LoadQueuedMessagesAsync().ConfigureAwait(false); + foreach (var loadedMessage in loadedMessages) { - _semaphore.Release(); + _applicationMessages.Add(loadedMessage); } } @@ -28,11 +30,6 @@ namespace MQTTnet.ManagedClient await _semaphore.WaitAsync().ConfigureAwait(false); try { - if (_storage == null) - { - return; - } - _applicationMessages.Add(applicationMessage); await SaveAsync().ConfigureAwait(false); } @@ -47,11 +44,6 @@ namespace MQTTnet.ManagedClient await _semaphore.WaitAsync().ConfigureAwait(false); try { - if (_storage == null) - { - return; - } - var index = _applicationMessages.IndexOf(applicationMessage); if (index == -1) { diff --git a/Frameworks/MQTTnet.NetStandard/Serializer/IMqttPacketSerializer.cs b/Frameworks/MQTTnet.NetStandard/Serializer/IMqttPacketSerializer.cs index 439460d..2068641 100644 --- a/Frameworks/MQTTnet.NetStandard/Serializer/IMqttPacketSerializer.cs +++ b/Frameworks/MQTTnet.NetStandard/Serializer/IMqttPacketSerializer.cs @@ -1,6 +1,5 @@ using System; using System.Collections.Generic; -using MQTTnet.Adapter; using MQTTnet.Packets; namespace MQTTnet.Serializer @@ -9,8 +8,8 @@ namespace MQTTnet.Serializer { MqttProtocolVersion ProtocolVersion { get; set; } - IEnumerable> Serialize(MqttBasePacket mqttPacket); + ICollection> Serialize(MqttBasePacket mqttPacket); - MqttBasePacket Deserialize(ReceivedMqttPacket receivedMqttPacket); + MqttBasePacket Deserialize(MqttPacketHeader header, byte[] body); } } \ No newline at end of file diff --git a/Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketReader.cs b/Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketReader.cs index 04ee01f..a22707c 100644 --- a/Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketReader.cs +++ b/Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketReader.cs @@ -1,9 +1,9 @@ using System; +using System.Collections.Generic; using System.IO; using System.Text; using System.Threading; using System.Threading.Tasks; -using MQTTnet.Adapter; using MQTTnet.Exceptions; using MQTTnet.Packets; using MQTTnet.Protocol; @@ -12,15 +12,15 @@ namespace MQTTnet.Serializer { public sealed class MqttPacketReader : BinaryReader { - private readonly ReceivedMqttPacket _receivedMqttPacket; - - public MqttPacketReader(ReceivedMqttPacket receivedMqttPacket) - : base(receivedMqttPacket.Body, Encoding.UTF8, true) + private readonly MqttPacketHeader _header; + + public MqttPacketReader(MqttPacketHeader header, Stream bodyStream) + : base(bodyStream, Encoding.UTF8, true) { - _receivedMqttPacket = receivedMqttPacket; + _header = header; } - public bool EndOfRemainingData => BaseStream.Position == _receivedMqttPacket.Header.BodyLength; + public bool EndOfRemainingData => BaseStream.Position == _header.BodyLength; public static MqttPacketHeader ReadHeaderFromSource(Stream stream, CancellationToken cancellationToken) { @@ -77,7 +77,7 @@ namespace MQTTnet.Serializer public byte[] ReadRemainingData() { - return ReadBytes(_receivedMqttPacket.Header.BodyLength - (int)BaseStream.Position); + return ReadBytes(_header.BodyLength - (int)BaseStream.Position); } private static int ReadBodyLengthFromSource(Stream stream, CancellationToken cancellationToken) @@ -87,7 +87,7 @@ namespace MQTTnet.Serializer var value = 0; byte encodedByte; - ////var readBytes = new List(); + var readBytes = new List(); do { if (cancellationToken.IsCancellationRequested) @@ -101,15 +101,14 @@ namespace MQTTnet.Serializer throw new MqttCommunicationException("Connection closed while reading remaining length data."); } - ////readBytes.Add(buffer); - encodedByte = (byte)buffer; + readBytes.Add(encodedByte); + value += (byte)(encodedByte & 127) * multiplier; multiplier *= 128; if (multiplier > 128 * 128 * 128) { - //throw new MqttProtocolViolationException($"Remaining length is invalid (Data={string.Join(",", readBytes)})."); - throw new MqttProtocolViolationException("Remaining length is invalid."); + throw new MqttProtocolViolationException($"Remaining length is invalid (Data={string.Join(",", readBytes)})."); } } while ((encodedByte & 128) != 0); diff --git a/Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketSerializer.cs b/Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketSerializer.cs index 0b04114..88414d5 100644 --- a/Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketSerializer.cs +++ b/Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketSerializer.cs @@ -1,5 +1,4 @@ -using MQTTnet.Adapter; -using MQTTnet.Exceptions; +using MQTTnet.Exceptions; using MQTTnet.Packets; using MQTTnet.Protocol; using System; @@ -17,7 +16,7 @@ namespace MQTTnet.Serializer public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311; - public IEnumerable> Serialize(MqttBasePacket packet) + public ICollection> Serialize(MqttBasePacket packet) { if (packet == null) throw new ArgumentNullException(nameof(packet)); @@ -43,13 +42,15 @@ namespace MQTTnet.Serializer } } - public MqttBasePacket Deserialize(ReceivedMqttPacket receivedMqttPacket) + public MqttBasePacket Deserialize(MqttPacketHeader header, byte[] body) { - if (receivedMqttPacket == null) throw new ArgumentNullException(nameof(receivedMqttPacket)); + if (header == null) throw new ArgumentNullException(nameof(header)); + if (body == null) throw new ArgumentNullException(nameof(body)); - using (var reader = new MqttPacketReader(receivedMqttPacket)) + using (var bodyStream = new MemoryStream(body)) + using (var reader = new MqttPacketReader(header, bodyStream)) { - return Deserialize(receivedMqttPacket.Header, reader); + return Deserialize(header, reader); } } diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs index 12ef891..8f7199a 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs @@ -14,15 +14,17 @@ namespace MQTTnet.Server { public sealed class MqttClientSession { - private readonly Stopwatch _lastPacketReceivedTracker = new Stopwatch(); - private readonly Stopwatch _lastNonKeepAlivePacketReceivedTracker = new Stopwatch(); + private readonly Stopwatch _lastPacketReceivedTracker = Stopwatch.StartNew(); + private readonly Stopwatch _lastNonKeepAlivePacketReceivedTracker = Stopwatch.StartNew(); - private readonly MqttClientSubscriptionsManager _subscriptionsManager; - private readonly MqttClientSessionsManager _sessionsManager; - private readonly MqttClientPendingMessagesQueue _pendingMessagesQueue; private readonly IMqttServerOptions _options; private readonly IMqttNetLogger _logger; + private readonly MqttClientSessionsManager _sessionsManager; + private readonly MqttRetainedMessagesManager _retainedMessagesManager; + private readonly MqttClientSubscriptionsManager _subscriptionsManager; + private readonly MqttClientPendingMessagesQueue _pendingMessagesQueue; + private IMqttChannelAdapter _adapter; private CancellationTokenSource _cancellationTokenSource; private MqttApplicationMessage _willMessage; @@ -30,16 +32,17 @@ namespace MQTTnet.Server public MqttClientSession( string clientId, IMqttServerOptions options, + MqttRetainedMessagesManager retainedMessagesManager, MqttClientSessionsManager sessionsManager, IMqttNetLogger logger) { + _options = options ?? throw new ArgumentNullException(nameof(options)); + _retainedMessagesManager = retainedMessagesManager ?? throw new ArgumentNullException(nameof(retainedMessagesManager)); _sessionsManager = sessionsManager ?? throw new ArgumentNullException(nameof(sessionsManager)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); ClientId = clientId; - - _options = options; - + _subscriptionsManager = new MqttClientSubscriptionsManager(_options); _pendingMessagesQueue = new MqttClientPendingMessagesQueue(_options, this, _logger); } @@ -117,11 +120,11 @@ namespace MQTTnet.Server } } - public void EnqueueApplicationMessage(MqttApplicationMessage applicationMessage) + public async Task EnqueueApplicationMessageAsync(MqttApplicationMessage applicationMessage) { if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage)); - var result = _subscriptionsManager.CheckSubscriptions(applicationMessage); + var result = await _subscriptionsManager.CheckSubscriptionsAsync(applicationMessage); if (!result.IsSubscribed) { return; @@ -129,6 +132,7 @@ namespace MQTTnet.Server var publishPacket = applicationMessage.ToPublishPacket(); publishPacket.QualityOfServiceLevel = result.QualityOfServiceLevel; + _pendingMessagesQueue.Enqueue(publishPacket); } @@ -199,7 +203,7 @@ namespace MQTTnet.Server if (packet is MqttUnsubscribePacket unsubscribePacket) { - return adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, _subscriptionsManager.Unsubscribe(unsubscribePacket)); + return HandleIncomingUnsubscribePacketAsync(adapter, unsubscribePacket, cancellationToken); } if (packet is MqttDisconnectPacket || packet is MqttConnectPacket) @@ -213,24 +217,31 @@ namespace MQTTnet.Server private async Task HandleIncomingSubscribePacketAsync(IMqttChannelAdapter adapter, MqttSubscribePacket subscribePacket, CancellationToken cancellationToken) { - var subscribeResult = _subscriptionsManager.Subscribe(subscribePacket, ClientId); + var subscribeResult = await _subscriptionsManager.SubscribeAsync(subscribePacket, ClientId); await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, subscribeResult.ResponsePacket).ConfigureAwait(false); - await EnqueueSubscribedRetainedMessagesAsync(subscribePacket).ConfigureAwait(false); - if (subscribeResult.CloseConnection) { await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new MqttDisconnectPacket()).ConfigureAwait(false); await StopAsync().ConfigureAwait(false); } + + await EnqueueSubscribedRetainedMessagesAsync(subscribePacket).ConfigureAwait(false); + } + + private async Task HandleIncomingUnsubscribePacketAsync(IMqttChannelAdapter adapter, MqttUnsubscribePacket unsubscribePacket, CancellationToken cancellationToken) + { + var unsubscribeResult = await _subscriptionsManager.UnsubscribeAsync(unsubscribePacket); + + await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, unsubscribeResult); } private async Task EnqueueSubscribedRetainedMessagesAsync(MqttSubscribePacket subscribePacket) { - var retainedMessages = await _sessionsManager.GetRetainedMessagesAsync(subscribePacket).ConfigureAwait(false); + var retainedMessages = await _retainedMessagesManager.GetSubscribedMessagesAsync(subscribePacket); foreach (var publishPacket in retainedMessages) { - EnqueueApplicationMessage(publishPacket); + await EnqueueApplicationMessageAsync(publishPacket); } } diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs index 5bb6598..739f964 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs @@ -15,23 +15,21 @@ namespace MQTTnet.Server public sealed class MqttClientSessionsManager { private readonly Dictionary _sessions = new Dictionary(); - private readonly SemaphoreSlim _sessionsSemaphore = new SemaphoreSlim(1, 1); + private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1); private readonly IMqttServerOptions _options; + private readonly MqttServer _server; private readonly MqttRetainedMessagesManager _retainedMessagesManager; private readonly IMqttNetLogger _logger; - public MqttClientSessionsManager(IMqttServerOptions options, MqttRetainedMessagesManager retainedMessagesManager, IMqttNetLogger logger) + public MqttClientSessionsManager(IMqttServerOptions options, MqttRetainedMessagesManager retainedMessagesManager, MqttServer server, IMqttNetLogger logger) { + _server = server ?? throw new ArgumentNullException(nameof(server)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _options = options ?? throw new ArgumentNullException(nameof(options)); _retainedMessagesManager = retainedMessagesManager ?? throw new ArgumentNullException(nameof(retainedMessagesManager)); } - public event EventHandler ClientConnected; - public event EventHandler ClientDisconnected; - public event EventHandler ApplicationMessageReceived; - public async Task RunClientSessionAsync(IMqttChannelAdapter clientAdapter, CancellationToken cancellationToken) { var clientId = string.Empty; @@ -66,11 +64,11 @@ namespace MQTTnet.Server IsSessionPresent = clientSession.IsExistingSession }).ConfigureAwait(false); - ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(new ConnectedMqttClient + _server.OnClientConnected(new ConnectedMqttClient { ClientId = clientId, ProtocolVersion = clientAdapter.PacketSerializer.ProtocolVersion - })); + }); await clientSession.Session.RunAsync(connectPacket.WillMessage, clientAdapter).ConfigureAwait(false); } @@ -89,17 +87,17 @@ namespace MQTTnet.Server // ignored } - ClientDisconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(new ConnectedMqttClient + _server.OnClientDisconnected(new ConnectedMqttClient { ClientId = clientId, ProtocolVersion = clientAdapter.PacketSerializer.ProtocolVersion - })); + }); } } public async Task StopAsync() { - await _sessionsSemaphore.WaitAsync().ConfigureAwait(false); + await _semaphore.WaitAsync().ConfigureAwait(false); try { foreach (var session in _sessions) @@ -111,13 +109,13 @@ namespace MQTTnet.Server } finally { - _sessionsSemaphore.Release(); + _semaphore.Release(); } } public async Task> GetConnectedClientsAsync() { - await _sessionsSemaphore.WaitAsync().ConfigureAwait(false); + await _semaphore.WaitAsync().ConfigureAwait(false); try { return _sessions.Where(s => s.Value.IsConnected).Select(s => new ConnectedMqttClient @@ -130,7 +128,7 @@ namespace MQTTnet.Server } finally { - _sessionsSemaphore.Release(); + _semaphore.Release(); } } @@ -138,17 +136,7 @@ namespace MQTTnet.Server { try { - if (_options.ApplicationMessageInterceptor != null) - { - var interceptorContext = new MqttApplicationMessageInterceptorContext - { - ApplicationMessage = applicationMessage - }; - - _options.ApplicationMessageInterceptor(interceptorContext); - applicationMessage = interceptorContext.ApplicationMessage; - } - + applicationMessage = InterceptApplicationMessage(applicationMessage); if (applicationMessage == null) { return; @@ -159,26 +147,41 @@ namespace MQTTnet.Server await _retainedMessagesManager.HandleMessageAsync(senderClientSession?.ClientId, applicationMessage).ConfigureAwait(false); } - var eventArgs = new MqttApplicationMessageReceivedEventArgs(senderClientSession?.ClientId, applicationMessage); - ApplicationMessageReceived?.Invoke(this, eventArgs); + _server.OnApplicationMessageReceived(senderClientSession?.ClientId, applicationMessage); } catch (Exception exception) { _logger.Error(exception, "Error while processing application message"); } - lock (_sessions) + await _semaphore.WaitAsync().ConfigureAwait(false); + try { foreach (var clientSession in _sessions.Values) { - clientSession.EnqueueApplicationMessage(applicationMessage); + await clientSession.EnqueueApplicationMessageAsync(applicationMessage); } } + finally + { + _semaphore.Release(); + } } - public Task> GetRetainedMessagesAsync(MqttSubscribePacket subscribePacket) + private MqttApplicationMessage InterceptApplicationMessage(MqttApplicationMessage applicationMessage) { - return _retainedMessagesManager.GetSubscribedMessagesAsync(subscribePacket); + if (_options.ApplicationMessageInterceptor == null) + { + return applicationMessage; + } + + var interceptorContext = new MqttApplicationMessageInterceptorContext + { + ApplicationMessage = applicationMessage + }; + + _options.ApplicationMessageInterceptor(interceptorContext); + return interceptorContext.ApplicationMessage; } private MqttConnectReturnCode ValidateConnection(MqttConnectPacket connectPacket) @@ -200,7 +203,7 @@ namespace MQTTnet.Server private async Task GetOrCreateClientSessionAsync(MqttConnectPacket connectPacket) { - await _sessionsSemaphore.WaitAsync().ConfigureAwait(false); + await _semaphore.WaitAsync().ConfigureAwait(false); try { var isSessionPresent = _sessions.TryGetValue(connectPacket.ClientId, out var clientSession); @@ -225,7 +228,7 @@ namespace MQTTnet.Server { isExistingSession = false; - clientSession = new MqttClientSession(connectPacket.ClientId, _options, this, _logger); + clientSession = new MqttClientSession(connectPacket.ClientId, _options, _retainedMessagesManager, this, _logger); _sessions[connectPacket.ClientId] = clientSession; _logger.Trace("Created a new session for client '{0}'.", connectPacket.ClientId); @@ -235,7 +238,7 @@ namespace MQTTnet.Server } finally { - _sessionsSemaphore.Release(); + _semaphore.Release(); } } } diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSubscriptionsManager.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSubscriptionsManager.cs index 73548ce..2151f77 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSubscriptionsManager.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSubscriptionsManager.cs @@ -1,5 +1,8 @@ using System; using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; using MQTTnet.Packets; using MQTTnet.Protocol; @@ -7,6 +10,7 @@ namespace MQTTnet.Server { public sealed class MqttClientSubscriptionsManager { + private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1); private readonly Dictionary _subscriptions = new Dictionary(); private readonly IMqttServerOptions _options; @@ -15,24 +19,34 @@ namespace MQTTnet.Server _options = options ?? throw new ArgumentNullException(nameof(options)); } - public MqttClientSubscribeResult Subscribe(MqttSubscribePacket subscribePacket, string clientId) + public async Task SubscribeAsync(MqttSubscribePacket subscribePacket, string clientId) { if (subscribePacket == null) throw new ArgumentNullException(nameof(subscribePacket)); - var responsePacket = subscribePacket.CreateResponse(); - var closeConnection = false; + var result = new MqttClientSubscribeResult + { + ResponsePacket = subscribePacket.CreateResponse(), + CloseConnection = false + }; - lock (_subscriptions) + await _semaphore.WaitAsync().ConfigureAwait(false); + try { foreach (var topicFilter in subscribePacket.TopicFilters) { - var interceptorContext = new MqttSubscriptionInterceptorContext(clientId, topicFilter); - _options.SubscriptionInterceptor?.Invoke(interceptorContext); - responsePacket.SubscribeReturnCodes.Add(interceptorContext.AcceptSubscription ? MqttSubscribeReturnCode.SuccessMaximumQoS1 : MqttSubscribeReturnCode.Failure); - + var interceptorContext = InterceptSubscribe(clientId, topicFilter); + if (!interceptorContext.AcceptSubscription) + { + result.ResponsePacket.SubscribeReturnCodes.Add(MqttSubscribeReturnCode.Failure); + } + else + { + result.ResponsePacket.SubscribeReturnCodes.Add(ConvertToMaximumQoS(topicFilter.QualityOfServiceLevel)); + } + if (interceptorContext.CloseConnection) { - closeConnection = true; + result.CloseConnection = true; } if (interceptorContext.AcceptSubscription) @@ -41,35 +55,42 @@ namespace MQTTnet.Server } } } - - return new MqttClientSubscribeResult + finally { - ResponsePacket = responsePacket, - CloseConnection = closeConnection - }; + _semaphore.Release(); + } + + return result; } - public MqttUnsubAckPacket Unsubscribe(MqttUnsubscribePacket unsubscribePacket) + public async Task UnsubscribeAsync(MqttUnsubscribePacket unsubscribePacket) { if (unsubscribePacket == null) throw new ArgumentNullException(nameof(unsubscribePacket)); - lock (_subscriptions) + await _semaphore.WaitAsync().ConfigureAwait(false); + try { foreach (var topicFilter in unsubscribePacket.TopicFilters) { _subscriptions.Remove(topicFilter); } } + finally + { + _semaphore.Release(); + } return unsubscribePacket.CreateResponse(); } - public CheckSubscriptionsResult CheckSubscriptions(MqttApplicationMessage applicationMessage) + public async Task CheckSubscriptionsAsync(MqttApplicationMessage applicationMessage) { if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage)); - lock (_subscriptions) + await _semaphore.WaitAsync().ConfigureAwait(false); + try { + var qosLevels = new HashSet(); foreach (var subscription in _subscriptions) { if (!MqttTopicFilterComparer.IsMatch(applicationMessage.Topic, subscription.Key)) @@ -77,24 +98,64 @@ namespace MQTTnet.Server continue; } - var effectiveQos = subscription.Value; - if (applicationMessage.QualityOfServiceLevel < effectiveQos) - { - effectiveQos = applicationMessage.QualityOfServiceLevel; - } + qosLevels.Add(subscription.Value); + } + if (qosLevels.Count == 0) + { return new CheckSubscriptionsResult { - IsSubscribed = true, - QualityOfServiceLevel = effectiveQos + IsSubscribed = false }; } + + return CreateSubscriptionResult(applicationMessage, qosLevels); + } + finally + { + _semaphore.Release(); + } + } + + private MqttSubscriptionInterceptorContext InterceptSubscribe(string clientId, TopicFilter topicFilter) + { + var interceptorContext = new MqttSubscriptionInterceptorContext(clientId, topicFilter); + _options.SubscriptionInterceptor?.Invoke(interceptorContext); + return interceptorContext; + } + + private static CheckSubscriptionsResult CreateSubscriptionResult(MqttApplicationMessage applicationMessage, HashSet subscribedQoSLevels) + { + MqttQualityOfServiceLevel effectiveQoS; + if (subscribedQoSLevels.Contains(applicationMessage.QualityOfServiceLevel)) + { + effectiveQoS = applicationMessage.QualityOfServiceLevel; + } + else if (subscribedQoSLevels.Count == 1) + { + effectiveQoS = subscribedQoSLevels.First(); + } + else + { + effectiveQoS = subscribedQoSLevels.Max(); } return new CheckSubscriptionsResult { - IsSubscribed = false + IsSubscribed = true, + QualityOfServiceLevel = effectiveQoS }; } + + private static MqttSubscribeReturnCode ConvertToMaximumQoS(MqttQualityOfServiceLevel qualityOfServiceLevel) + { + switch (qualityOfServiceLevel) + { + case MqttQualityOfServiceLevel.AtMostOnce: return MqttSubscribeReturnCode.SuccessMaximumQoS0; + case MqttQualityOfServiceLevel.AtLeastOnce: return MqttSubscribeReturnCode.SuccessMaximumQoS1; + case MqttQualityOfServiceLevel.ExactlyOnce: return MqttSubscribeReturnCode.SuccessMaximumQoS2; + default: return MqttSubscribeReturnCode.Failure; + } + } } } diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttRetainedMessagesManager.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttRetainedMessagesManager.cs index 0b8d669..2c0b260 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/MqttRetainedMessagesManager.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/MqttRetainedMessagesManager.cs @@ -11,7 +11,7 @@ namespace MQTTnet.Server public sealed class MqttRetainedMessagesManager { private readonly Dictionary _retainedMessages = new Dictionary(); - private readonly SemaphoreSlim _gate = new SemaphoreSlim(1, 1); + private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1); private readonly IMqttNetLogger _logger; private readonly IMqttServerOptions _options; @@ -28,7 +28,7 @@ namespace MQTTnet.Server return; } - await _gate.WaitAsync(); + await _semaphore.WaitAsync().ConfigureAwait(false); try { var retainedMessages = await _options.Storage.LoadRetainedMessagesAsync(); @@ -45,7 +45,7 @@ namespace MQTTnet.Server } finally { - _gate.Release(); + _semaphore.Release(); } } @@ -53,7 +53,7 @@ namespace MQTTnet.Server { if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage)); - await _gate.WaitAsync().ConfigureAwait(false); + await _semaphore.WaitAsync().ConfigureAwait(false); try { await HandleMessageInternalAsync(clientId, applicationMessage); @@ -64,7 +64,7 @@ namespace MQTTnet.Server } finally { - _gate.Release(); + _semaphore.Release(); } } @@ -72,7 +72,7 @@ namespace MQTTnet.Server { var retainedMessages = new List(); - await _gate.WaitAsync().ConfigureAwait(false); + await _semaphore.WaitAsync().ConfigureAwait(false); try { foreach (var retainedMessage in _retainedMessages.Values) @@ -96,7 +96,7 @@ namespace MQTTnet.Server } finally { - _gate.Release(); + _semaphore.Release(); } return retainedMessages; diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttServer.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttServer.cs index 42c6854..888dd42 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/MqttServer.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/MqttServer.cs @@ -60,11 +60,7 @@ namespace MQTTnet.Server _cancellationTokenSource = new CancellationTokenSource(); _retainedMessagesManager = new MqttRetainedMessagesManager(_options, _logger); - - _clientSessionsManager = new MqttClientSessionsManager(_options, _retainedMessagesManager, _logger); - _clientSessionsManager.ApplicationMessageReceived += OnApplicationMessageReceived; - _clientSessionsManager.ClientConnected += OnClientConnected; - _clientSessionsManager.ClientDisconnected += OnClientDisconnected; + _clientSessionsManager = new MqttClientSessionsManager(_options, _retainedMessagesManager, this, _logger); await _retainedMessagesManager.LoadMessagesAsync(); @@ -104,40 +100,39 @@ namespace MQTTnet.Server finally { _cancellationTokenSource = null; - _retainedMessagesManager = null; - - if (_clientSessionsManager != null) - { - _clientSessionsManager.ApplicationMessageReceived -= OnApplicationMessageReceived; - _clientSessionsManager.ClientConnected -= OnClientConnected; - _clientSessionsManager.ClientDisconnected -= OnClientDisconnected; - } - _clientSessionsManager = null; } } - private void OnClientAccepted(object sender, MqttServerAdapterClientAcceptedEventArgs eventArgs) + internal void OnClientConnected(ConnectedMqttClient client) { - eventArgs.SessionTask = Task.Run(async () => await _clientSessionsManager.RunClientSessionAsync(eventArgs.Client, _cancellationTokenSource.Token), _cancellationTokenSource.Token); + if (client == null) throw new ArgumentNullException(nameof(client)); + + _logger.Info("Client '{0}': Connected.", client.ClientId); + ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(client)); } - private void OnClientConnected(object sender, MqttClientConnectedEventArgs eventArgs) + internal void OnClientDisconnected(ConnectedMqttClient client) { - _logger.Info("Client '{0}': Connected.", eventArgs.Client.ClientId); - ClientConnected?.Invoke(this, eventArgs); + if (client == null) throw new ArgumentNullException(nameof(client)); + + _logger.Info("Client '{0}': Disconnected.", client.ClientId); + ClientDisconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(client)); } - private void OnClientDisconnected(object sender, MqttClientDisconnectedEventArgs eventArgs) + internal void OnApplicationMessageReceived(string clientId, MqttApplicationMessage applicationMessage) { - _logger.Info("Client '{0}': Disconnected.", eventArgs.Client.ClientId); - ClientDisconnected?.Invoke(this, eventArgs); + if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage)); + + ApplicationMessageReceived?.Invoke(this, new MqttApplicationMessageReceivedEventArgs(clientId, applicationMessage)); } - private void OnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e) + private void OnClientAccepted(object sender, MqttServerAdapterClientAcceptedEventArgs eventArgs) { - ApplicationMessageReceived?.Invoke(this, e); + eventArgs.SessionTask = Task.Run( + async () => await _clientSessionsManager.RunClientSessionAsync(eventArgs.Client, _cancellationTokenSource.Token).ConfigureAwait(false), + _cancellationTokenSource.Token); } } } diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttSubscriptionInterceptorContext.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttSubscriptionInterceptorContext.cs index e05dd21..07b94bb 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/MqttSubscriptionInterceptorContext.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/MqttSubscriptionInterceptorContext.cs @@ -1,17 +1,19 @@ -namespace MQTTnet.Server +using System; + +namespace MQTTnet.Server { public class MqttSubscriptionInterceptorContext { public MqttSubscriptionInterceptorContext(string clientId, TopicFilter topicFilter) { ClientId = clientId; - TopicFilter = topicFilter; + TopicFilter = topicFilter ?? throw new ArgumentNullException(nameof(topicFilter)); } public string ClientId { get; } public TopicFilter TopicFilter { get; } - + public bool AcceptSubscription { get; set; } = true; public bool CloseConnection { get; set; } diff --git a/Frameworks/MQTTnet.NetStandard/TopicFilter.cs b/Frameworks/MQTTnet.NetStandard/TopicFilter.cs index b82fc71..da126d6 100644 --- a/Frameworks/MQTTnet.NetStandard/TopicFilter.cs +++ b/Frameworks/MQTTnet.NetStandard/TopicFilter.cs @@ -4,7 +4,7 @@ namespace MQTTnet { public sealed class TopicFilter { - public TopicFilter(string topic, MqttQualityOfServiceLevel qualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce) + public TopicFilter(string topic, MqttQualityOfServiceLevel qualityOfServiceLevel) { Topic = topic; QualityOfServiceLevel = qualityOfServiceLevel; diff --git a/MQTTnet.sln b/MQTTnet.sln index ceaa04f..cece21c 100644 --- a/MQTTnet.sln +++ b/MQTTnet.sln @@ -1,7 +1,7 @@  Microsoft Visual Studio Solution File, Format Version 12.00 # Visual Studio 15 -VisualStudioVersion = 15.0.27004.2009 +VisualStudioVersion = 15.0.27004.2010 MinimumVisualStudioVersion = 10.0.40219.1 Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.Core.Tests", "Tests\MQTTnet.Core.Tests\MQTTnet.Core.Tests.csproj", "{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}" EndProject @@ -33,6 +33,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.TestApp.AspNetCore2 EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.AspNetCore", "Frameworks\MQTTnet.AspnetCore\MQTTnet.AspNetCore.csproj", "{F10C4060-F7EE-4A83-919F-FF723E72F94A}" EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Extensions", "Extensions", "{12816BCC-AF9E-44A9-9AE5-C246AF2A0587}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.Extensions.Rpc", "Extensions\MQTTnet.Extensions.Rpc\MQTTnet.Extensions.Rpc.csproj", "{C444E9C8-95FA-430E-9126-274129DE16CD}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -147,6 +151,22 @@ Global {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Release|x64.Build.0 = Release|Any CPU {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Release|x86.ActiveCfg = Release|Any CPU {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Release|x86.Build.0 = Release|Any CPU + {C444E9C8-95FA-430E-9126-274129DE16CD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {C444E9C8-95FA-430E-9126-274129DE16CD}.Debug|Any CPU.Build.0 = Debug|Any CPU + {C444E9C8-95FA-430E-9126-274129DE16CD}.Debug|ARM.ActiveCfg = Debug|Any CPU + {C444E9C8-95FA-430E-9126-274129DE16CD}.Debug|ARM.Build.0 = Debug|Any CPU + {C444E9C8-95FA-430E-9126-274129DE16CD}.Debug|x64.ActiveCfg = Debug|Any CPU + {C444E9C8-95FA-430E-9126-274129DE16CD}.Debug|x64.Build.0 = Debug|Any CPU + {C444E9C8-95FA-430E-9126-274129DE16CD}.Debug|x86.ActiveCfg = Debug|Any CPU + {C444E9C8-95FA-430E-9126-274129DE16CD}.Debug|x86.Build.0 = Debug|Any CPU + {C444E9C8-95FA-430E-9126-274129DE16CD}.Release|Any CPU.ActiveCfg = Release|Any CPU + {C444E9C8-95FA-430E-9126-274129DE16CD}.Release|Any CPU.Build.0 = Release|Any CPU + {C444E9C8-95FA-430E-9126-274129DE16CD}.Release|ARM.ActiveCfg = Release|Any CPU + {C444E9C8-95FA-430E-9126-274129DE16CD}.Release|ARM.Build.0 = Release|Any CPU + {C444E9C8-95FA-430E-9126-274129DE16CD}.Release|x64.ActiveCfg = Release|Any CPU + {C444E9C8-95FA-430E-9126-274129DE16CD}.Release|x64.Build.0 = Release|Any CPU + {C444E9C8-95FA-430E-9126-274129DE16CD}.Release|x86.ActiveCfg = Release|Any CPU + {C444E9C8-95FA-430E-9126-274129DE16CD}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -158,6 +178,7 @@ Global {3D283AAD-AAA8-4339-8394-52F80B6304DB} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4} {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4} {F10C4060-F7EE-4A83-919F-FF723E72F94A} = {32A630A7-2598-41D7-B625-204CD906F5FB} + {C444E9C8-95FA-430E-9126-274129DE16CD} = {12816BCC-AF9E-44A9-9AE5-C246AF2A0587} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {07536672-5CBC-4BE3-ACE0-708A431A7894} diff --git a/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs b/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs index 25c9352..9f51b86 100644 --- a/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs @@ -4,7 +4,6 @@ using System.IO; using System.Text; using System.Threading; using Microsoft.VisualStudio.TestTools.UnitTesting; -using MQTTnet.Adapter; using MQTTnet.Packets; using MQTTnet.Protocol; using MQTTnet.Serializer; @@ -410,7 +409,7 @@ namespace MQTTnet.Core.Tests using (var bodyStream = new MemoryStream(Join(buffer1), (int)headerStream.Position, header.BodyLength)) { - var deserializedPacket = serializer.Deserialize(new ReceivedMqttPacket(header, bodyStream)); + var deserializedPacket = serializer.Deserialize(header, bodyStream.ToArray()); var buffer2 = serializer.Serialize(deserializedPacket); Assert.AreEqual(expectedBase64Value, Convert.ToBase64String(Join(buffer2))); diff --git a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs index f1c8ff0..5081dfc 100644 --- a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs @@ -62,7 +62,7 @@ namespace MQTTnet.Core.Tests var c2 = await serverAdapter.ConnectTestClient(s, "c2", willMessage); c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; - await c1.SubscribeAsync(new TopicFilter("#")); + await c1.SubscribeAsync(new TopicFilterBuilder().WithTopic("#").Build()); await c2.DisconnectAsync(); @@ -167,7 +167,7 @@ namespace MQTTnet.Core.Tests var c2 = await serverAdapter.ConnectTestClient(s, "c2"); c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; - await c2.SubscribeAsync(new TopicFilter("retained")); + await c2.SubscribeAsync(new TopicFilterBuilder().WithTopic("retained").Build()); await Task.Delay(500); } @@ -199,7 +199,7 @@ namespace MQTTnet.Core.Tests var c2 = await serverAdapter.ConnectTestClient(s, "c2"); c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; - await c2.SubscribeAsync(new TopicFilter("retained")); + await c2.SubscribeAsync(new TopicFilterBuilder().WithTopic("retained").Build()); await Task.Delay(500); } @@ -277,7 +277,7 @@ namespace MQTTnet.Core.Tests var c2 = await serverAdapter.ConnectTestClient(s, "c2"); c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; - await c2.SubscribeAsync(new TopicFilter("retained")); + await c2.SubscribeAsync(new TopicFilterBuilder().WithTopic("retained").Build()); await Task.Delay(500); } diff --git a/Tests/MQTTnet.Core.Tests/MqttSubscriptionsManagerTests.cs b/Tests/MQTTnet.Core.Tests/MqttSubscriptionsManagerTests.cs index b6b68dc..50acd61 100644 --- a/Tests/MQTTnet.Core.Tests/MqttSubscriptionsManagerTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttSubscriptionsManagerTests.cs @@ -14,9 +14,9 @@ namespace MQTTnet.Core.Tests var sm = new MqttClientSubscriptionsManager(new MqttServerOptions()); var sp = new MqttSubscribePacket(); - sp.TopicFilters.Add(new TopicFilter("A/B/C")); + sp.TopicFilters.Add(new TopicFilterBuilder().WithTopic("A/B/C").Build()); - sm.Subscribe(sp, ""); + sm.SubscribeAsync(sp, "").Wait(); var pp = new MqttApplicationMessage { @@ -24,7 +24,52 @@ namespace MQTTnet.Core.Tests QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce }; - Assert.IsTrue(sm.CheckSubscriptions(pp).IsSubscribed); + var result = sm.CheckSubscriptionsAsync(pp).Result; + Assert.IsTrue(result.IsSubscribed); + Assert.AreEqual(result.QualityOfServiceLevel, MqttQualityOfServiceLevel.AtMostOnce); + } + + [TestMethod] + public void MqttSubscriptionsManager_SubscribeDifferentQoSSuccess() + { + var sm = new MqttClientSubscriptionsManager(new MqttServerOptions()); + + var sp = new MqttSubscribePacket(); + sp.TopicFilters.Add(new TopicFilter("A/B/C", MqttQualityOfServiceLevel.AtMostOnce)); + + sm.SubscribeAsync(sp, "").Wait(); + + var pp = new MqttApplicationMessage + { + Topic = "A/B/C", + QualityOfServiceLevel = MqttQualityOfServiceLevel.ExactlyOnce + }; + + var result = sm.CheckSubscriptionsAsync(pp).Result; + Assert.IsTrue(result.IsSubscribed); + Assert.AreEqual(result.QualityOfServiceLevel, MqttQualityOfServiceLevel.AtMostOnce); + } + + [TestMethod] + public void MqttSubscriptionsManager_SubscribeTwoTimesSuccess() + { + var sm = new MqttClientSubscriptionsManager(new MqttServerOptions()); + + var sp = new MqttSubscribePacket(); + sp.TopicFilters.Add(new TopicFilter("#", MqttQualityOfServiceLevel.AtMostOnce)); + sp.TopicFilters.Add(new TopicFilter("A/B/C", MqttQualityOfServiceLevel.AtLeastOnce)); + + sm.SubscribeAsync(sp, "").Wait(); + + var pp = new MqttApplicationMessage + { + Topic = "A/B/C", + QualityOfServiceLevel = MqttQualityOfServiceLevel.ExactlyOnce + }; + + var result = sm.CheckSubscriptionsAsync(pp).Result; + Assert.IsTrue(result.IsSubscribed); + Assert.AreEqual(result.QualityOfServiceLevel, MqttQualityOfServiceLevel.AtLeastOnce); } [TestMethod] @@ -33,9 +78,9 @@ namespace MQTTnet.Core.Tests var sm = new MqttClientSubscriptionsManager(new MqttServerOptions()); var sp = new MqttSubscribePacket(); - sp.TopicFilters.Add(new TopicFilter("A/B/C")); + sp.TopicFilters.Add(new TopicFilterBuilder().WithTopic("A/B/C").Build()); - sm.Subscribe(sp, ""); + sm.SubscribeAsync(sp, "").Wait(); var pp = new MqttApplicationMessage { @@ -43,7 +88,7 @@ namespace MQTTnet.Core.Tests QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce }; - Assert.IsFalse(sm.CheckSubscriptions(pp).IsSubscribed); + Assert.IsFalse(sm.CheckSubscriptionsAsync(pp).Result.IsSubscribed); } [TestMethod] @@ -52,9 +97,9 @@ namespace MQTTnet.Core.Tests var sm = new MqttClientSubscriptionsManager(new MqttServerOptions()); var sp = new MqttSubscribePacket(); - sp.TopicFilters.Add(new TopicFilter("A/B/C")); + sp.TopicFilters.Add(new TopicFilterBuilder().WithTopic("A/B/C").Build()); - sm.Subscribe(sp, ""); + sm.SubscribeAsync(sp, "").Wait(); var pp = new MqttApplicationMessage { @@ -62,13 +107,13 @@ namespace MQTTnet.Core.Tests QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce }; - Assert.IsTrue(sm.CheckSubscriptions(pp).IsSubscribed); + Assert.IsTrue(sm.CheckSubscriptionsAsync(pp).Result.IsSubscribed); var up = new MqttUnsubscribePacket(); up.TopicFilters.Add("A/B/C"); - sm.Unsubscribe(up); + sm.UnsubscribeAsync(up).Wait(); - Assert.IsFalse(sm.CheckSubscriptions(pp).IsSubscribed); + Assert.IsFalse(sm.CheckSubscriptionsAsync(pp).Result.IsSubscribed); } } } diff --git a/Tests/MQTTnet.TestApp.NetCore/ClientTest.cs b/Tests/MQTTnet.TestApp.NetCore/ClientTest.cs index 09788cc..dd4a1bc 100644 --- a/Tests/MQTTnet.TestApp.NetCore/ClientTest.cs +++ b/Tests/MQTTnet.TestApp.NetCore/ClientTest.cs @@ -2,6 +2,7 @@ using System.Text; using System.Threading.Tasks; using MQTTnet.Client; +using MQTTnet.Protocol; namespace MQTTnet.TestApp.NetCore { @@ -17,7 +18,8 @@ namespace MQTTnet.TestApp.NetCore CleanSession = true, ChannelOptions = new MqttClientTcpOptions { - Server = "localhost" + //Server = "localhost", + Server = "192.168.1.174" }, //ChannelOptions = new MqttClientWebSocketOptions //{ @@ -78,6 +80,8 @@ namespace MQTTnet.TestApp.NetCore { Console.ReadLine(); + await client.SubscribeAsync(new TopicFilter("test", MqttQualityOfServiceLevel.AtMostOnce)); + var applicationMessage = new MqttApplicationMessageBuilder() .WithTopic("A/B/C") .WithPayload("Hello World") diff --git a/Tests/MQTTnet.TestApp.NetCore/MQTTnet.TestApp.NetCore.csproj b/Tests/MQTTnet.TestApp.NetCore/MQTTnet.TestApp.NetCore.csproj index 8f25448..1784cc0 100644 --- a/Tests/MQTTnet.TestApp.NetCore/MQTTnet.TestApp.NetCore.csproj +++ b/Tests/MQTTnet.TestApp.NetCore/MQTTnet.TestApp.NetCore.csproj @@ -6,6 +6,10 @@ netcoreapp2.0;net452;net461 + + RELEASE;NETCOREAPP2_0 + + diff --git a/Tests/MQTTnet.TestApp.NetCore/PerformanceTest.cs b/Tests/MQTTnet.TestApp.NetCore/PerformanceTest.cs index 1c72b06..5f2e5ad 100644 --- a/Tests/MQTTnet.TestApp.NetCore/PerformanceTest.cs +++ b/Tests/MQTTnet.TestApp.NetCore/PerformanceTest.cs @@ -1,5 +1,4 @@ using System; -using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Text; @@ -64,9 +63,17 @@ namespace MQTTnet.TestApp.NetCore sentMessagesCount++; } - Console.WriteLine($"Sending {sentMessagesCount} messages per second."); - + Console.WriteLine($"Sending {sentMessagesCount} messages per second. #1"); + + sentMessagesCount = 0; stopwatch.Restart(); + while (stopwatch.ElapsedMilliseconds < 1000) + { + await client.PublishAsync(messages).ConfigureAwait(false); + sentMessagesCount++; + } + + Console.WriteLine($"Sending {sentMessagesCount} messages per second. #2"); var testMessageCount = 10000; for (var i = 0; i < testMessageCount; i++) @@ -141,8 +148,8 @@ namespace MQTTnet.TestApp.NetCore { var mqttServer = new MqttFactory().CreateMqttServer(); - var msgs = 0; - var stopwatch = Stopwatch.StartNew(); + ////var msgs = 0; + ////var stopwatch = Stopwatch.StartNew(); ////mqttServer.ApplicationMessageReceived += (sender, args) => ////{ //// msgs++; diff --git a/Tests/MQTTnet.TestApp.NetCore/ServerTest.cs b/Tests/MQTTnet.TestApp.NetCore/ServerTest.cs index 8f89176..bd8b4ef 100644 --- a/Tests/MQTTnet.TestApp.NetCore/ServerTest.cs +++ b/Tests/MQTTnet.TestApp.NetCore/ServerTest.cs @@ -29,6 +29,7 @@ namespace MQTTnet.TestApp.NetCore }, Storage = new RetainedMessageHandler(), + ApplicationMessageInterceptor = context => { if (MqttTopicFilterComparer.IsMatch(context.ApplicationMessage.Topic, "/myTopic/WithTimestamp/#")) @@ -67,18 +68,29 @@ namespace MQTTnet.TestApp.NetCore mqttServer.ApplicationMessageReceived += (s, e) => { MqttNetConsoleLogger.PrintToConsole( - $"'{e.ClientId}' reported '{e.ApplicationMessage.Topic}' > '{Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}'", + $"'{e.ClientId}' reported '{e.ApplicationMessage.Topic}' > '{Encoding.UTF8.GetString(e.ApplicationMessage.Payload ?? new byte[0])}'", ConsoleColor.Magenta); }; options.ApplicationMessageInterceptor = c => { - var content = JObject.Parse(Encoding.UTF8.GetString(c.ApplicationMessage.Payload)); - var timestampProperty = content.Property("timestamp"); - if (timestampProperty != null && timestampProperty.Value.Type == JTokenType.Null) + if (c.ApplicationMessage.Payload == null || c.ApplicationMessage.Payload.Length == 0) + { + return; + } + + try + { + var content = JObject.Parse(Encoding.UTF8.GetString(c.ApplicationMessage.Payload)); + var timestampProperty = content.Property("timestamp"); + if (timestampProperty != null && timestampProperty.Value.Type == JTokenType.Null) + { + timestampProperty.Value = DateTime.Now.ToString("O"); + c.ApplicationMessage.Payload = Encoding.UTF8.GetBytes(content.ToString()); + } + } + catch (Exception e) { - timestampProperty.Value = DateTime.Now.ToString("O"); - c.ApplicationMessage.Payload = Encoding.UTF8.GetBytes(content.ToString()); } }; diff --git a/Tests/MQTTnet.TestApp.UniversalWindows/MQTTnet.TestApp.UniversalWindows.csproj b/Tests/MQTTnet.TestApp.UniversalWindows/MQTTnet.TestApp.UniversalWindows.csproj index 4445af3..5636dd2 100644 --- a/Tests/MQTTnet.TestApp.UniversalWindows/MQTTnet.TestApp.UniversalWindows.csproj +++ b/Tests/MQTTnet.TestApp.UniversalWindows/MQTTnet.TestApp.UniversalWindows.csproj @@ -127,6 +127,10 @@ + + {c444e9c8-95fa-430e-9126-274129de16cd} + MQTTnet.Extensions.Rpc + {3587e506-55a2-4eb3-99c7-dc01e42d25d2} MQTTnet.NetStandard diff --git a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml index 43b26c7..cf05888 100644 --- a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml +++ b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml @@ -65,6 +65,40 @@ + + + Method: + + + Payload: + + + Text + Base64 + + + QoS: + + 0 (At most once) + 1 (At least once) + 2 (Exactly once) + + + Responses: + + + + + + + + + + + + + + Topic: @@ -86,7 +120,7 @@ - + @@ -101,7 +135,7 @@ Persist retained messages in JSON format Clear previously retained messages on startup - + diff --git a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs index 00c6ffa..bbd6474 100644 --- a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs +++ b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs @@ -7,6 +7,8 @@ using Windows.UI.Core; using Windows.UI.Xaml; using MQTTnet.Client; using MQTTnet.Diagnostics; +using MQTTnet.Exceptions; +using MQTTnet.Extensions.Rpc; using MQTTnet.Implementations; using MQTTnet.ManagedClient; using MQTTnet.Protocol; @@ -245,6 +247,95 @@ namespace MQTTnet.TestApp.UniversalWindows // This code is for the Wiki at GitHub! // ReSharper disable once UnusedMember.Local + + private async void StartServer(object sender, RoutedEventArgs e) + { + if (_mqttServer != null) + { + return; + } + + JsonServerStorage storage = null; + if (ServerPersistRetainedMessages.IsChecked == true) + { + storage = new JsonServerStorage(); + + if (ServerClearRetainedMessages.IsChecked == true) + { + storage.Clear(); + } + } + + _mqttServer = new MqttFactory().CreateMqttServer(); + + var options = new MqttServerOptions(); + options.DefaultEndpointOptions.Port = int.Parse(ServerPort.Text); + options.Storage = storage; + + await _mqttServer.StartAsync(options); + } + + private async void StopServer(object sender, RoutedEventArgs e) + { + if (_mqttServer == null) + { + return; + } + + await _mqttServer.StopAsync(); + _mqttServer = null; + } + + private void ClearReceivedMessages(object sender, RoutedEventArgs e) + { + ReceivedMessages.Items.Clear(); + } + + private async void ExecuteRpc(object sender, RoutedEventArgs e) + { + var qos = MqttQualityOfServiceLevel.AtMostOnce; + if (RpcQoS1.IsChecked == true) + { + qos = MqttQualityOfServiceLevel.AtLeastOnce; + } + + if (RpcQoS2.IsChecked == true) + { + qos = MqttQualityOfServiceLevel.ExactlyOnce; + } + + var payload = new byte[0]; + if (RpcText.IsChecked == true) + { + payload = Encoding.UTF8.GetBytes(RpcPayload.Text); + } + + if (RpcBase64.IsChecked == true) + { + payload = Convert.FromBase64String(RpcPayload.Text); + } + + + try + { + var rpcClient = new MqttRpcClient(_mqttClient); + await rpcClient.EnableAsync(); + var response = await rpcClient.ExecuteAsync(TimeSpan.FromSeconds(5), RpcMethod.Text, payload, qos); + await rpcClient.DisableAsync(); + + RpcResponses.Items.Add(RpcMethod.Text + " >>> " + Encoding.UTF8.GetString(response)); + } + catch (MqttCommunicationTimedOutException) + { + RpcResponses.Items.Add(RpcMethod.Text + " >>> [TIMEOUT]"); + } + } + + private void ClearRpcResponses(object sender, RoutedEventArgs e) + { + RpcResponses.Items.Clear(); + } + private async Task WikiCode() { { @@ -293,9 +384,9 @@ namespace MQTTnet.TestApp.UniversalWindows { // Use secure TCP connection. var options = new MqttClientOptionsBuilder() - .WithTcpServer("broker.hivemq.com") - .WithTls() - .Build(); + .WithTcpServer("broker.hivemq.com") + .WithTls() + .Build(); } { @@ -480,48 +571,5 @@ namespace MQTTnet.TestApp.UniversalWindows } } - - private async void StartServer(object sender, RoutedEventArgs e) - { - if (_mqttServer != null) - { - return; - } - - JsonServerStorage storage = null; - if (ServerPersistRetainedMessages.IsChecked == true) - { - storage = new JsonServerStorage(); - - if (ServerClearRetainedMessages.IsChecked == true) - { - storage.Clear(); - } - } - - _mqttServer = new MqttFactory().CreateMqttServer(); - - var options = new MqttServerOptions(); - options.DefaultEndpointOptions.Port = int.Parse(ServerPort.Text); - options.Storage = storage; - - await _mqttServer.StartAsync(options); - } - - private async void StopServer(object sender, RoutedEventArgs e) - { - if (_mqttServer == null) - { - return; - } - - await _mqttServer.StopAsync(); - _mqttServer = null; - } - - private void ClearReceivedMessages(object sender, RoutedEventArgs e) - { - ReceivedMessages.Items.Clear(); - } } }