From 61b10a8b56f368506e28665c436e780750edbbd2 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Sun, 7 Apr 2019 19:58:01 +0200 Subject: [PATCH] Add extension methods. --- .../ManagedMqttClientExtensions.cs | 94 +++++++----- .../MQTTnet.Extensions.Rpc/MqttRpcClient.cs | 5 - Source/MQTTnet/Client/MqttClientExtensions.cs | 65 +++++--- Source/MQTTnet/Server/MqttClientConnection.cs | 11 +- Source/MQTTnet/Server/MqttClientSession.cs | 11 +- .../Server/MqttClientSessionsManager.cs | 4 +- Source/MQTTnet/Server/MqttServerExtensions.cs | 38 ++++- .../Mockups/TestEnvironment.cs | 12 +- Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs | 44 ++++++ Tests/MQTTnet.TestApp.NetCore/Program.cs | 52 ------- .../MainPage.xaml.cs | 144 ++++++++++++------ 11 files changed, 300 insertions(+), 180 deletions(-) diff --git a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClientExtensions.cs b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClientExtensions.cs index a0be4b1..b638791 100644 --- a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClientExtensions.cs +++ b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClientExtensions.cs @@ -14,106 +14,128 @@ namespace MQTTnet.Extensions.ManagedClient { public static IManagedMqttClient UseConnectedHandler(this IManagedMqttClient client, Func handler) { + if (client == null) throw new ArgumentNullException(nameof(client)); + if (handler == null) { - client.ConnectedHandler = null; - return client; + return client.UseConnectedHandler((IMqttClientConnectedHandler)null); } - client.ConnectedHandler = new MqttClientConnectedHandlerDelegate(handler); - return client; + return client.UseConnectedHandler(new MqttClientConnectedHandlerDelegate(handler)); } public static IManagedMqttClient UseConnectedHandler(this IManagedMqttClient client, Action handler) { + if (client == null) throw new ArgumentNullException(nameof(client)); + if (handler == null) { - client.ConnectedHandler = null; - return client; + return client.UseConnectedHandler((IMqttClientConnectedHandler)null); } - client.ConnectedHandler = new MqttClientConnectedHandlerDelegate(handler); + return client.UseConnectedHandler(new MqttClientConnectedHandlerDelegate(handler)); + } + + public static IManagedMqttClient UseConnectedHandler(this IManagedMqttClient client, IMqttClientConnectedHandler handler) + { + if (client == null) throw new ArgumentNullException(nameof(client)); + + client.ConnectedHandler = handler; return client; } public static IManagedMqttClient UseDisconnectedHandler(this IManagedMqttClient client, Func handler) { + if (client == null) throw new ArgumentNullException(nameof(client)); + if (handler == null) { - client.DisconnectedHandler = null; - return client; + return client.UseDisconnectedHandler((IMqttClientDisconnectedHandler)null); } - client.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(handler); - return client; + return client.UseDisconnectedHandler(new MqttClientDisconnectedHandlerDelegate(handler)); } public static IManagedMqttClient UseDisconnectedHandler(this IManagedMqttClient client, Action handler) { + if (client == null) throw new ArgumentNullException(nameof(client)); + if (handler == null) { - client.DisconnectedHandler = null; - return client; + return client.UseDisconnectedHandler((IMqttClientDisconnectedHandler)null); } - client.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(handler); + return client.UseDisconnectedHandler(new MqttClientDisconnectedHandlerDelegate(handler)); + } + + public static IManagedMqttClient UseDisconnectedHandler(this IManagedMqttClient client, IMqttClientDisconnectedHandler handler) + { + if (client == null) throw new ArgumentNullException(nameof(client)); + + client.DisconnectedHandler = handler; return client; } public static IManagedMqttClient UseApplicationMessageReceivedHandler(this IManagedMqttClient client, Func handler) { + if (client == null) throw new ArgumentNullException(nameof(client)); + if (handler == null) { - client.ApplicationMessageReceivedHandler = null; - return client; + return client.UseApplicationMessageReceivedHandler((IMqttApplicationMessageReceivedHandler)null); } - client.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(handler); - - return client; + return client.UseApplicationMessageReceivedHandler(new MqttApplicationMessageReceivedHandlerDelegate(handler)); } public static IManagedMqttClient UseApplicationMessageReceivedHandler(this IManagedMqttClient client, Action handler) { + if (client == null) throw new ArgumentNullException(nameof(client)); + if (handler == null) { - client.ApplicationMessageReceivedHandler = null; - return client; + return client.UseApplicationMessageReceivedHandler((IMqttApplicationMessageReceivedHandler)null); } - client.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(handler); + return client.UseApplicationMessageReceivedHandler(new MqttApplicationMessageReceivedHandlerDelegate(handler)); + } + public static IManagedMqttClient UseApplicationMessageReceivedHandler(this IManagedMqttClient client, IMqttApplicationMessageReceivedHandler handler) + { + if (client == null) throw new ArgumentNullException(nameof(client)); + + client.ApplicationMessageReceivedHandler = handler; return client; } - public static Task SubscribeAsync(this IManagedMqttClient managedClient, params TopicFilter[] topicFilters) + public static Task SubscribeAsync(this IManagedMqttClient client, params TopicFilter[] topicFilters) { - if (managedClient == null) throw new ArgumentNullException(nameof(managedClient)); + if (client == null) throw new ArgumentNullException(nameof(client)); - return managedClient.SubscribeAsync(topicFilters); + return client.SubscribeAsync(topicFilters); } - public static Task SubscribeAsync(this IManagedMqttClient managedClient, string topic, MqttQualityOfServiceLevel qualityOfServiceLevel) + public static Task SubscribeAsync(this IManagedMqttClient client, string topic, MqttQualityOfServiceLevel qualityOfServiceLevel) { - if (managedClient == null) throw new ArgumentNullException(nameof(managedClient)); + if (client == null) throw new ArgumentNullException(nameof(client)); if (topic == null) throw new ArgumentNullException(nameof(topic)); - return managedClient.SubscribeAsync(new TopicFilterBuilder().WithTopic(topic).WithQualityOfServiceLevel(qualityOfServiceLevel).Build()); + return client.SubscribeAsync(new TopicFilterBuilder().WithTopic(topic).WithQualityOfServiceLevel(qualityOfServiceLevel).Build()); } - public static Task SubscribeAsync(this IManagedMqttClient managedClient, string topic) + public static Task SubscribeAsync(this IManagedMqttClient client, string topic) { - if (managedClient == null) throw new ArgumentNullException(nameof(managedClient)); + if (client == null) throw new ArgumentNullException(nameof(client)); if (topic == null) throw new ArgumentNullException(nameof(topic)); - return managedClient.SubscribeAsync(new TopicFilterBuilder().WithTopic(topic).Build()); + return client.SubscribeAsync(new TopicFilterBuilder().WithTopic(topic).Build()); } - public static Task UnsubscribeAsync(this IManagedMqttClient managedClient, params string[] topicFilters) + public static Task UnsubscribeAsync(this IManagedMqttClient client, params string[] topicFilters) { - if (managedClient == null) throw new ArgumentNullException(nameof(managedClient)); + if (client == null) throw new ArgumentNullException(nameof(client)); - return managedClient.UnsubscribeAsync(topicFilters); + return client.UnsubscribeAsync(topicFilters); } public static async Task PublishAsync(this IManagedMqttClient client, IEnumerable applicationMessages) @@ -190,12 +212,16 @@ namespace MQTTnet.Extensions.ManagedClient public static Task PublishAsync(this IManagedMqttClient client, Func builder, CancellationToken cancellationToken) { + if (client == null) throw new ArgumentNullException(nameof(client)); + var message = builder(new MqttApplicationMessageBuilder()).Build(); return client.PublishAsync(message, cancellationToken); } public static Task PublishAsync(this IManagedMqttClient client, Func builder) { + if (client == null) throw new ArgumentNullException(nameof(client)); + var message = builder(new MqttApplicationMessageBuilder()).Build(); return client.PublishAsync(message, CancellationToken.None); } diff --git a/Source/MQTTnet.Extensions.Rpc/MqttRpcClient.cs b/Source/MQTTnet.Extensions.Rpc/MqttRpcClient.cs index 8a279c7..c025bdf 100644 --- a/Source/MQTTnet.Extensions.Rpc/MqttRpcClient.cs +++ b/Source/MQTTnet.Extensions.Rpc/MqttRpcClient.cs @@ -119,11 +119,6 @@ namespace MQTTnet.Extensions.Rpc return Task.FromResult(0); } - if (tcs.Task.IsCompleted || tcs.Task.IsCanceled) - { - return Task.FromResult(0); - } - tcs.TrySetResult(eventArgs.ApplicationMessage.Payload); return Task.FromResult(0); diff --git a/Source/MQTTnet/Client/MqttClientExtensions.cs b/Source/MQTTnet/Client/MqttClientExtensions.cs index a44eb42..e5e3cd7 100644 --- a/Source/MQTTnet/Client/MqttClientExtensions.cs +++ b/Source/MQTTnet/Client/MqttClientExtensions.cs @@ -17,82 +17,97 @@ namespace MQTTnet.Client { public static IMqttClient UseConnectedHandler(this IMqttClient client, Func handler) { + if (client == null) throw new ArgumentNullException(nameof(client)); + if (handler == null) { - client.ConnectedHandler = null; - return client; + return client.UseConnectedHandler((IMqttClientConnectedHandler)null); } - client.ConnectedHandler = new MqttClientConnectedHandlerDelegate(handler); - return client; + return client.UseConnectedHandler(new MqttClientConnectedHandlerDelegate(handler)); } public static IMqttClient UseConnectedHandler(this IMqttClient client, Action handler) { + if (client == null) throw new ArgumentNullException(nameof(client)); + if (handler == null) { - client.ConnectedHandler = null; - return client; + return client.UseConnectedHandler((IMqttClientConnectedHandler)null); } - client.ConnectedHandler = new MqttClientConnectedHandlerDelegate(handler); + return client.UseConnectedHandler(new MqttClientConnectedHandlerDelegate(handler)); + } + + public static IMqttClient UseConnectedHandler(this IMqttClient client, IMqttClientConnectedHandler handler) + { + if (client == null) throw new ArgumentNullException(nameof(client)); + + client.ConnectedHandler = handler; return client; } public static IMqttClient UseDisconnectedHandler(this IMqttClient client, Func handler) { + if (client == null) throw new ArgumentNullException(nameof(client)); + if (handler == null) { - client.DisconnectedHandler = null; - return client; + return client.UseDisconnectedHandler((IMqttClientDisconnectedHandler)null); } - client.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(handler); - return client; + return client.UseDisconnectedHandler(new MqttClientDisconnectedHandlerDelegate(handler)); } public static IMqttClient UseDisconnectedHandler(this IMqttClient client, Action handler) { + if (client == null) throw new ArgumentNullException(nameof(client)); + if (handler == null) { - client.DisconnectedHandler = null; - return client; + return client.UseDisconnectedHandler((IMqttClientDisconnectedHandler)null); } - client.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(handler); + return client.UseDisconnectedHandler(new MqttClientDisconnectedHandlerDelegate(handler)); + } + + public static IMqttClient UseDisconnectedHandler(this IMqttClient client, IMqttClientDisconnectedHandler handler) + { + if (client == null) throw new ArgumentNullException(nameof(client)); + + client.DisconnectedHandler = handler; return client; } public static IMqttClient UseApplicationMessageReceivedHandler(this IMqttClient client, Func handler) { + if (client == null) throw new ArgumentNullException(nameof(client)); + if (handler == null) { - client.ApplicationMessageReceivedHandler = null; - return client; + return client.UseApplicationMessageReceivedHandler((IMqttApplicationMessageReceivedHandler)null); } - client.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(handler); - - return client; + return client.UseApplicationMessageReceivedHandler(new MqttApplicationMessageReceivedHandlerDelegate(handler)); } public static IMqttClient UseApplicationMessageReceivedHandler(this IMqttClient client, Action handler) { + if (client == null) throw new ArgumentNullException(nameof(client)); + if (handler == null) { - client.ApplicationMessageReceivedHandler = null; - return client; + return client.UseApplicationMessageReceivedHandler((IMqttApplicationMessageReceivedHandler)null); } - client.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(handler); - - return client; + return client.UseApplicationMessageReceivedHandler(new MqttApplicationMessageReceivedHandlerDelegate(handler)); } public static IMqttClient UseApplicationMessageReceivedHandler(this IMqttClient client, IMqttApplicationMessageReceivedHandler handler) { - client.ApplicationMessageReceivedHandler = handler; + if (client == null) throw new ArgumentNullException(nameof(client)); + client.ApplicationMessageReceivedHandler = handler; return client; } diff --git a/Source/MQTTnet/Server/MqttClientConnection.cs b/Source/MQTTnet/Server/MqttClientConnection.cs index df9028c..1146ea8 100644 --- a/Source/MQTTnet/Server/MqttClientConnection.cs +++ b/Source/MQTTnet/Server/MqttClientConnection.cs @@ -121,15 +121,6 @@ namespace MQTTnet.Server return _packageReceiverTask; } - public void EnqueueApplicationMessage(MqttApplicationMessage applicationMessage, string senderClientId, bool isRetainedApplicationMessage) - { - if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage)); - - Session.EnqueueApplicationMessage(applicationMessage, senderClientId, isRetainedApplicationMessage); - - _logger.Verbose("Queued application message (ClientId: {0}).", ClientId); - } - private async Task RunInternalAsync() { var disconnectType = MqttClientDisconnectType.NotClean; @@ -273,7 +264,7 @@ namespace MQTTnet.Server var retainedMessages = await _retainedMessagesManager.GetSubscribedMessagesAsync(topicFilters).ConfigureAwait(false); foreach (var applicationMessage in retainedMessages) { - EnqueueApplicationMessage(applicationMessage, ClientId, true); + Session.EnqueueApplicationMessage(applicationMessage, ClientId, true); } } diff --git a/Source/MQTTnet/Server/MqttClientSession.cs b/Source/MQTTnet/Server/MqttClientSession.cs index 427fdfa..804a223 100644 --- a/Source/MQTTnet/Server/MqttClientSession.cs +++ b/Source/MQTTnet/Server/MqttClientSession.cs @@ -1,20 +1,26 @@ using System; using System.Collections.Generic; using System.Threading.Tasks; +using MQTTnet.Diagnostics; using MQTTnet.Server.Status; namespace MQTTnet.Server { public class MqttClientSession { + private readonly IMqttNetChildLogger _logger; + private readonly DateTime _createdTimestamp = DateTime.UtcNow; - public MqttClientSession(string clientId, MqttServerEventDispatcher eventDispatcher, IMqttServerOptions serverOptions) + public MqttClientSession(string clientId, MqttServerEventDispatcher eventDispatcher, IMqttServerOptions serverOptions, IMqttNetChildLogger logger) { ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId)); SubscriptionsManager = new MqttClientSubscriptionsManager(clientId, eventDispatcher, serverOptions); ApplicationMessagesQueue = new MqttClientSessionApplicationMessagesQueue(serverOptions); + + if (logger == null) throw new ArgumentNullException(nameof(logger)); + _logger = logger.CreateChildLogger(nameof(MqttClientSession)); } public string ClientId { get; } @@ -35,12 +41,15 @@ namespace MQTTnet.Server return; } + _logger.Verbose("Queued application message with topic '{0}' (ClientId: {1}).", applicationMessage.Topic, ClientId); + ApplicationMessagesQueue.Enqueue(applicationMessage, senderClientId, checkSubscriptionsResult.QualityOfServiceLevel, isRetainedApplicationMessage); } public async Task SubscribeAsync(ICollection topicFilters, MqttRetainedMessagesManager retainedMessagesManager) { await SubscriptionsManager.SubscribeAsync(topicFilters).ConfigureAwait(false); + var matchingRetainedMessages = await retainedMessagesManager.GetSubscribedMessagesAsync(topicFilters).ConfigureAwait(false); foreach (var matchingRetainedMessage in matchingRetainedMessages) { diff --git a/Source/MQTTnet/Server/MqttClientSessionsManager.cs b/Source/MQTTnet/Server/MqttClientSessionsManager.cs index 72fca9b..b6fb143 100644 --- a/Source/MQTTnet/Server/MqttClientSessionsManager.cs +++ b/Source/MQTTnet/Server/MqttClientSessionsManager.cs @@ -332,8 +332,6 @@ namespace MQTTnet.Server { if (connectPacket.CleanSession) { - // TODO: Check if required. - //session.Dispose(); session = null; _logger.Verbose("Deleting existing session of client '{0}'.", connectPacket.ClientId); @@ -346,7 +344,7 @@ namespace MQTTnet.Server if (session == null) { - session = new MqttClientSession(connectPacket.ClientId, _eventDispatcher, _options); + session = new MqttClientSession(connectPacket.ClientId, _eventDispatcher, _options, _logger); _logger.Verbose("Created a new session for client '{0}'.", connectPacket.ClientId); } diff --git a/Source/MQTTnet/Server/MqttServerExtensions.cs b/Source/MQTTnet/Server/MqttServerExtensions.cs index c705071..f4d4423 100644 --- a/Source/MQTTnet/Server/MqttServerExtensions.cs +++ b/Source/MQTTnet/Server/MqttServerExtensions.cs @@ -2,14 +2,46 @@ using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; -using MQTTnet.Client; using MQTTnet.Client.Publishing; +using MQTTnet.Client.Receiving; using MQTTnet.Protocol; namespace MQTTnet.Server { public static class MqttServerExtensions { + public static IMqttServer UseApplicationMessageReceivedHandler(this IMqttServer server, Func handler) + { + if (server == null) throw new ArgumentNullException(nameof(server)); + + if (handler == null) + { + return server.UseApplicationMessageReceivedHandler((IMqttApplicationMessageReceivedHandler)null); + } + + return server.UseApplicationMessageReceivedHandler(new MqttApplicationMessageReceivedHandlerDelegate(handler)); + } + + public static IMqttServer UseApplicationMessageReceivedHandler(this IMqttServer server, Action handler) + { + if (server == null) throw new ArgumentNullException(nameof(server)); + + if (handler == null) + { + return server.UseApplicationMessageReceivedHandler((IMqttApplicationMessageReceivedHandler)null); + } + + return server.UseApplicationMessageReceivedHandler(new MqttApplicationMessageReceivedHandlerDelegate(handler)); + } + + public static IMqttServer UseApplicationMessageReceivedHandler(this IMqttServer server, IMqttApplicationMessageReceivedHandler handler) + { + if (server == null) throw new ArgumentNullException(nameof(server)); + + server.ApplicationMessageReceivedHandler = handler; + return server; + } + public static Task SubscribeAsync(this IMqttServer server, string clientId, params TopicFilter[] topicFilters) { if (server == null) throw new ArgumentNullException(nameof(server)); @@ -120,12 +152,16 @@ namespace MQTTnet.Server public static Task PublishAsync(this IMqttServer server, Func builder, CancellationToken cancellationToken) { + if (server == null) throw new ArgumentNullException(nameof(server)); + var message = builder(new MqttApplicationMessageBuilder()).Build(); return server.PublishAsync(message, cancellationToken); } public static Task PublishAsync(this IMqttServer server, Func builder) { + if (server == null) throw new ArgumentNullException(nameof(server)); + var message = builder(new MqttApplicationMessageBuilder()).Build(); return server.PublishAsync(message, CancellationToken.None); } diff --git a/Tests/MQTTnet.Core.Tests/Mockups/TestEnvironment.cs b/Tests/MQTTnet.Core.Tests/Mockups/TestEnvironment.cs index 4ed83ad..776857b 100644 --- a/Tests/MQTTnet.Core.Tests/Mockups/TestEnvironment.cs +++ b/Tests/MQTTnet.Core.Tests/Mockups/TestEnvironment.cs @@ -21,7 +21,7 @@ namespace MQTTnet.Tests.Mockups private readonly List _exceptions = new List(); - private IMqttServer _server; + public IMqttServer Server { get; private set; } public bool IgnoreClientLogErrors { get; set; } @@ -72,15 +72,15 @@ namespace MQTTnet.Tests.Mockups public async Task StartServerAsync(MqttServerOptionsBuilder options) { - if (_server != null) + if (Server != null) { throw new InvalidOperationException("Server already started."); } - _server = _mqttFactory.CreateMqttServer(_serverLogger); - await _server.StartAsync(options.WithDefaultEndpointPort(ServerPort).Build()); + Server = _mqttFactory.CreateMqttServer(_serverLogger); + await Server.StartAsync(options.WithDefaultEndpointPort(ServerPort).Build()); - return _server; + return Server; } public Task ConnectClientAsync() @@ -122,7 +122,7 @@ namespace MQTTnet.Tests.Mockups mqttClient?.Dispose(); } - _server?.StopAsync().GetAwaiter().GetResult(); + Server?.StopAsync().GetAwaiter().GetResult(); ThrowIfLogErrors(); diff --git a/Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs b/Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs index 6098adb..35c7819 100644 --- a/Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs @@ -295,5 +295,49 @@ namespace MQTTnet.Tests Assert.IsFalse(client.IsConnected); } } + + [TestMethod] + public async Task Frequent_Connects() + { + using (var testEnvironment = new TestEnvironment()) + { + await testEnvironment.StartServerAsync(); + + var clients = new List(); + for (var i = 0; i < 100; i++) + { + clients.Add(await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("a"))); + } + + var clientStatus = await testEnvironment.Server.GetClientStatusAsync(); + var sessionStatus = await testEnvironment.Server.GetSessionStatusAsync(); + + for (var i = 0; i < 98; i++) + { + Assert.IsFalse(clients[i].IsConnected); + } + + Assert.IsTrue(clients[99].IsConnected); + + Assert.AreEqual(1, clientStatus.Count); + Assert.AreEqual(1, sessionStatus.Count); + + var receiveClient = clients[99]; + object receivedPayload = null; + receiveClient.UseApplicationMessageReceivedHandler(e => + { + receivedPayload = e.ApplicationMessage.ConvertPayloadToString(); + }); + + await receiveClient.SubscribeAsync("x"); + + var sendClient = await testEnvironment.ConnectClientAsync(); + await sendClient.PublishAsync("x", "1"); + + await Task.Delay(100); + + Assert.AreEqual("1", receivedPayload); + } + } } } diff --git a/Tests/MQTTnet.TestApp.NetCore/Program.cs b/Tests/MQTTnet.TestApp.NetCore/Program.cs index f8fd75d..2d09020 100644 --- a/Tests/MQTTnet.TestApp.NetCore/Program.cs +++ b/Tests/MQTTnet.TestApp.NetCore/Program.cs @@ -84,58 +84,6 @@ namespace MQTTnet.TestApp.NetCore Thread.Sleep(Timeout.Infinite); } - - // This code is used at the Wiki on GitHub! - // ReSharper disable once UnusedMember.Local - private static async void WikiCode() - { - { - var client = new MqttFactory().CreateMqttClient(); - - var options = new MqttClientOptionsBuilder() - .WithClientId("Client1") - .WithTcpServer("broker.hivemq.com") - .WithCredentials("bud", "%spencer%") - .WithTls() - .Build(); - - await client.ConnectAsync(options); - - var message = new MqttApplicationMessageBuilder() - .WithTopic("MyTopic") - .WithPayload("Hello World") - .WithExactlyOnceQoS() - .WithRetainFlag() - .Build(); - - await client.PublishAsync(message); - } - - { - var factory = new MqttFactory(); - var client = factory.CreateMqttClient(); - } - - { - // Write all trace messages to the console window. - MqttNetGlobalLogger.LogMessagePublished += (s, e) => - { - var trace = $">> [{e.TraceMessage.Timestamp:O}] [{e.TraceMessage.ThreadId}] [{e.TraceMessage.Source}] [{e.TraceMessage.Level}]: {e.TraceMessage.Message}"; - if (e.TraceMessage.Exception != null) - { - trace += Environment.NewLine + e.TraceMessage.Exception.ToString(); - } - - Console.WriteLine(trace); - }; - } - - { - // Use a custom log ID for the logger. - var factory = new MqttFactory(); - var mqttClient = factory.CreateMqttClient(new MqttNetLogger("MyCustomId")); - } - } } public class RetainedMessageHandler : IMqttServerStorage diff --git a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs index d79fe06..4623f66 100644 --- a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs +++ b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs @@ -10,7 +10,6 @@ using MQTTnet.Client; using MQTTnet.Client.Connecting; using MQTTnet.Client.Disconnecting; using MQTTnet.Client.Options; -using MQTTnet.Client.Receiving; using MQTTnet.Diagnostics; using MQTTnet.Exceptions; using MQTTnet.Extensions.ManagedClient; @@ -469,18 +468,6 @@ namespace MQTTnet.TestApp.UniversalWindows private async Task WikiCode() { - { - // Write all trace messages to the console window. - MqttNetGlobalLogger.LogMessagePublished += (s, e) => - { - Console.WriteLine($">> [{e.TraceMessage.Timestamp:O}] [{e.TraceMessage.ThreadId}] [{e.TraceMessage.Source}] [{e.TraceMessage.Level}]: {e.TraceMessage.Message}"); - if (e.TraceMessage.Exception != null) - { - Console.WriteLine(e.TraceMessage.Exception); - } - }; - } - { // Use a custom identifier for the trace messages. var clientOptions = new MqttClientOptionsBuilder() @@ -490,20 +477,77 @@ namespace MQTTnet.TestApp.UniversalWindows { // Create a new MQTT client. var factory = new MqttFactory(); - var mqttClient = factory.CreateMqttClient(); + var client = factory.CreateMqttClient(); + + // Create TCP based options using the builder. + var options = new MqttClientOptionsBuilder() + .WithClientId("Client1") + .WithTcpServer("broker.hivemq.com") + .WithCredentials("bud", "%spencer%") + .WithTls() + .WithCleanSession() + .Build(); + + await client.ConnectAsync(options); + + // Reconnecting + client.UseDisconnectedHandler(async e => { - // Create TCP based options using the builder. - var options = new MqttClientOptionsBuilder() - .WithClientId("Client1") - .WithTcpServer("broker.hivemq.com") - .WithCredentials("bud", "%spencer%") - .WithTls() - .WithCleanSession() - .Build(); + Console.WriteLine("### DISCONNECTED FROM SERVER ###"); + await Task.Delay(TimeSpan.FromSeconds(5)); - await mqttClient.ConnectAsync(options); - } + try + { + await client.ConnectAsync(options); + } + catch + { + Console.WriteLine("### RECONNECTING FAILED ###"); + } + }); + + // Consuming messages + + client.UseApplicationMessageReceivedHandler(e => + { + Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###"); + Console.WriteLine($"+ Topic = {e.ApplicationMessage.Topic}"); + Console.WriteLine($"+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}"); + Console.WriteLine($"+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}"); + Console.WriteLine($"+ Retain = {e.ApplicationMessage.Retain}"); + Console.WriteLine(); + }); + + // Subscribe after connect + + client.UseConnectedHandler(async e => + { + Console.WriteLine("### CONNECTED WITH SERVER ###"); + + // Subscribe to a topic + await client.SubscribeAsync(new TopicFilterBuilder().WithTopic("my/topic").Build()); + + Console.WriteLine("### SUBSCRIBED ###"); + }); + + // Subscribe to a topic + await client.SubscribeAsync(new TopicFilterBuilder().WithTopic("my/topic").Build()); + + // Unsubscribe from a topic + await client.UnsubscribeAsync("my/topic"); + + // Publish an application message + var applicationMessage = new MqttApplicationMessageBuilder() + .WithTopic("A/B/C") + .WithPayload("Hello World") + .WithAtLeastOnceQoS() + .Build(); + + await client.PublishAsync(applicationMessage); + } + + { { // Use TCP connection. @@ -525,8 +569,6 @@ namespace MQTTnet.TestApp.UniversalWindows var options = new MqttClientOptionsBuilder() .WithWebSocketServer("broker.hivemq.com:8000/mqtt") .Build(); - - await mqttClient.ConnectAsync(options); } { @@ -549,23 +591,6 @@ namespace MQTTnet.TestApp.UniversalWindows }, }; } - - { - // Subscribe to a topic - await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic("my/topic").Build()); - - // Unsubscribe from a topic - await mqttClient.UnsubscribeAsync("my/topic"); - - // Publish an application message - var applicationMessage = new MqttApplicationMessageBuilder() - .WithTopic("A/B/C") - .WithPayload("Hello World") - .WithAtLeastOnceQoS() - .Build(); - - await mqttClient.PublishAsync(applicationMessage); - } } // ---------------------------------- @@ -700,6 +725,39 @@ namespace MQTTnet.TestApp.UniversalWindows await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic("my/topic").Build()); await mqttClient.StartAsync(options); } + + { + // Use a custom log ID for the logger. + var factory = new MqttFactory(); + var client = factory.CreateMqttClient(new MqttNetLogger("MyCustomId")); + } + + { + var client = new MqttFactory().CreateMqttClient(); + + var message = new MqttApplicationMessageBuilder() + .WithTopic("MyTopic") + .WithPayload("Hello World") + .WithExactlyOnceQoS() + .WithRetainFlag() + .Build(); + + await client.PublishAsync(message); + } + + { + // Write all trace messages to the console window. + MqttNetGlobalLogger.LogMessagePublished += (s, e) => + { + var trace = $">> [{e.TraceMessage.Timestamp:O}] [{e.TraceMessage.ThreadId}] [{e.TraceMessage.Source}] [{e.TraceMessage.Level}]: {e.TraceMessage.Message}"; + if (e.TraceMessage.Exception != null) + { + trace += Environment.NewLine + e.TraceMessage.Exception.ToString(); + } + + Console.WriteLine(trace); + }; + } } #endregion