@@ -14,106 +14,128 @@ namespace MQTTnet.Extensions.ManagedClient | |||
{ | |||
public static IManagedMqttClient UseConnectedHandler(this IManagedMqttClient client, Func<MqttClientConnectedEventArgs, Task> handler) | |||
{ | |||
if (client == null) throw new ArgumentNullException(nameof(client)); | |||
if (handler == null) | |||
{ | |||
client.ConnectedHandler = null; | |||
return client; | |||
return client.UseConnectedHandler((IMqttClientConnectedHandler)null); | |||
} | |||
client.ConnectedHandler = new MqttClientConnectedHandlerDelegate(handler); | |||
return client; | |||
return client.UseConnectedHandler(new MqttClientConnectedHandlerDelegate(handler)); | |||
} | |||
public static IManagedMqttClient UseConnectedHandler(this IManagedMqttClient client, Action<MqttClientConnectedEventArgs> handler) | |||
{ | |||
if (client == null) throw new ArgumentNullException(nameof(client)); | |||
if (handler == null) | |||
{ | |||
client.ConnectedHandler = null; | |||
return client; | |||
return client.UseConnectedHandler((IMqttClientConnectedHandler)null); | |||
} | |||
client.ConnectedHandler = new MqttClientConnectedHandlerDelegate(handler); | |||
return client.UseConnectedHandler(new MqttClientConnectedHandlerDelegate(handler)); | |||
} | |||
public static IManagedMqttClient UseConnectedHandler(this IManagedMqttClient client, IMqttClientConnectedHandler handler) | |||
{ | |||
if (client == null) throw new ArgumentNullException(nameof(client)); | |||
client.ConnectedHandler = handler; | |||
return client; | |||
} | |||
public static IManagedMqttClient UseDisconnectedHandler(this IManagedMqttClient client, Func<MqttClientDisconnectedEventArgs, Task> handler) | |||
{ | |||
if (client == null) throw new ArgumentNullException(nameof(client)); | |||
if (handler == null) | |||
{ | |||
client.DisconnectedHandler = null; | |||
return client; | |||
return client.UseDisconnectedHandler((IMqttClientDisconnectedHandler)null); | |||
} | |||
client.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(handler); | |||
return client; | |||
return client.UseDisconnectedHandler(new MqttClientDisconnectedHandlerDelegate(handler)); | |||
} | |||
public static IManagedMqttClient UseDisconnectedHandler(this IManagedMqttClient client, Action<MqttClientDisconnectedEventArgs> handler) | |||
{ | |||
if (client == null) throw new ArgumentNullException(nameof(client)); | |||
if (handler == null) | |||
{ | |||
client.DisconnectedHandler = null; | |||
return client; | |||
return client.UseDisconnectedHandler((IMqttClientDisconnectedHandler)null); | |||
} | |||
client.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(handler); | |||
return client.UseDisconnectedHandler(new MqttClientDisconnectedHandlerDelegate(handler)); | |||
} | |||
public static IManagedMqttClient UseDisconnectedHandler(this IManagedMqttClient client, IMqttClientDisconnectedHandler handler) | |||
{ | |||
if (client == null) throw new ArgumentNullException(nameof(client)); | |||
client.DisconnectedHandler = handler; | |||
return client; | |||
} | |||
public static IManagedMqttClient UseApplicationMessageReceivedHandler(this IManagedMqttClient client, Func<MqttApplicationMessageReceivedEventArgs, Task> handler) | |||
{ | |||
if (client == null) throw new ArgumentNullException(nameof(client)); | |||
if (handler == null) | |||
{ | |||
client.ApplicationMessageReceivedHandler = null; | |||
return client; | |||
return client.UseApplicationMessageReceivedHandler((IMqttApplicationMessageReceivedHandler)null); | |||
} | |||
client.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(handler); | |||
return client; | |||
return client.UseApplicationMessageReceivedHandler(new MqttApplicationMessageReceivedHandlerDelegate(handler)); | |||
} | |||
public static IManagedMqttClient UseApplicationMessageReceivedHandler(this IManagedMqttClient client, Action<MqttApplicationMessageReceivedEventArgs> handler) | |||
{ | |||
if (client == null) throw new ArgumentNullException(nameof(client)); | |||
if (handler == null) | |||
{ | |||
client.ApplicationMessageReceivedHandler = null; | |||
return client; | |||
return client.UseApplicationMessageReceivedHandler((IMqttApplicationMessageReceivedHandler)null); | |||
} | |||
client.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(handler); | |||
return client.UseApplicationMessageReceivedHandler(new MqttApplicationMessageReceivedHandlerDelegate(handler)); | |||
} | |||
public static IManagedMqttClient UseApplicationMessageReceivedHandler(this IManagedMqttClient client, IMqttApplicationMessageReceivedHandler handler) | |||
{ | |||
if (client == null) throw new ArgumentNullException(nameof(client)); | |||
client.ApplicationMessageReceivedHandler = handler; | |||
return client; | |||
} | |||
public static Task SubscribeAsync(this IManagedMqttClient managedClient, params TopicFilter[] topicFilters) | |||
public static Task SubscribeAsync(this IManagedMqttClient client, params TopicFilter[] topicFilters) | |||
{ | |||
if (managedClient == null) throw new ArgumentNullException(nameof(managedClient)); | |||
if (client == null) throw new ArgumentNullException(nameof(client)); | |||
return managedClient.SubscribeAsync(topicFilters); | |||
return client.SubscribeAsync(topicFilters); | |||
} | |||
public static Task SubscribeAsync(this IManagedMqttClient managedClient, string topic, MqttQualityOfServiceLevel qualityOfServiceLevel) | |||
public static Task SubscribeAsync(this IManagedMqttClient client, string topic, MqttQualityOfServiceLevel qualityOfServiceLevel) | |||
{ | |||
if (managedClient == null) throw new ArgumentNullException(nameof(managedClient)); | |||
if (client == null) throw new ArgumentNullException(nameof(client)); | |||
if (topic == null) throw new ArgumentNullException(nameof(topic)); | |||
return managedClient.SubscribeAsync(new TopicFilterBuilder().WithTopic(topic).WithQualityOfServiceLevel(qualityOfServiceLevel).Build()); | |||
return client.SubscribeAsync(new TopicFilterBuilder().WithTopic(topic).WithQualityOfServiceLevel(qualityOfServiceLevel).Build()); | |||
} | |||
public static Task SubscribeAsync(this IManagedMqttClient managedClient, string topic) | |||
public static Task SubscribeAsync(this IManagedMqttClient client, string topic) | |||
{ | |||
if (managedClient == null) throw new ArgumentNullException(nameof(managedClient)); | |||
if (client == null) throw new ArgumentNullException(nameof(client)); | |||
if (topic == null) throw new ArgumentNullException(nameof(topic)); | |||
return managedClient.SubscribeAsync(new TopicFilterBuilder().WithTopic(topic).Build()); | |||
return client.SubscribeAsync(new TopicFilterBuilder().WithTopic(topic).Build()); | |||
} | |||
public static Task UnsubscribeAsync(this IManagedMqttClient managedClient, params string[] topicFilters) | |||
public static Task UnsubscribeAsync(this IManagedMqttClient client, params string[] topicFilters) | |||
{ | |||
if (managedClient == null) throw new ArgumentNullException(nameof(managedClient)); | |||
if (client == null) throw new ArgumentNullException(nameof(client)); | |||
return managedClient.UnsubscribeAsync(topicFilters); | |||
return client.UnsubscribeAsync(topicFilters); | |||
} | |||
public static async Task PublishAsync(this IManagedMqttClient client, IEnumerable<MqttApplicationMessage> applicationMessages) | |||
@@ -190,12 +212,16 @@ namespace MQTTnet.Extensions.ManagedClient | |||
public static Task<MqttClientPublishResult> PublishAsync(this IManagedMqttClient client, Func<MqttApplicationMessageBuilder, MqttApplicationMessageBuilder> builder, CancellationToken cancellationToken) | |||
{ | |||
if (client == null) throw new ArgumentNullException(nameof(client)); | |||
var message = builder(new MqttApplicationMessageBuilder()).Build(); | |||
return client.PublishAsync(message, cancellationToken); | |||
} | |||
public static Task<MqttClientPublishResult> PublishAsync(this IManagedMqttClient client, Func<MqttApplicationMessageBuilder, MqttApplicationMessageBuilder> builder) | |||
{ | |||
if (client == null) throw new ArgumentNullException(nameof(client)); | |||
var message = builder(new MqttApplicationMessageBuilder()).Build(); | |||
return client.PublishAsync(message, CancellationToken.None); | |||
} | |||
@@ -119,11 +119,6 @@ namespace MQTTnet.Extensions.Rpc | |||
return Task.FromResult(0); | |||
} | |||
if (tcs.Task.IsCompleted || tcs.Task.IsCanceled) | |||
{ | |||
return Task.FromResult(0); | |||
} | |||
tcs.TrySetResult(eventArgs.ApplicationMessage.Payload); | |||
return Task.FromResult(0); | |||
@@ -17,82 +17,97 @@ namespace MQTTnet.Client | |||
{ | |||
public static IMqttClient UseConnectedHandler(this IMqttClient client, Func<MqttClientConnectedEventArgs, Task> handler) | |||
{ | |||
if (client == null) throw new ArgumentNullException(nameof(client)); | |||
if (handler == null) | |||
{ | |||
client.ConnectedHandler = null; | |||
return client; | |||
return client.UseConnectedHandler((IMqttClientConnectedHandler)null); | |||
} | |||
client.ConnectedHandler = new MqttClientConnectedHandlerDelegate(handler); | |||
return client; | |||
return client.UseConnectedHandler(new MqttClientConnectedHandlerDelegate(handler)); | |||
} | |||
public static IMqttClient UseConnectedHandler(this IMqttClient client, Action<MqttClientConnectedEventArgs> handler) | |||
{ | |||
if (client == null) throw new ArgumentNullException(nameof(client)); | |||
if (handler == null) | |||
{ | |||
client.ConnectedHandler = null; | |||
return client; | |||
return client.UseConnectedHandler((IMqttClientConnectedHandler)null); | |||
} | |||
client.ConnectedHandler = new MqttClientConnectedHandlerDelegate(handler); | |||
return client.UseConnectedHandler(new MqttClientConnectedHandlerDelegate(handler)); | |||
} | |||
public static IMqttClient UseConnectedHandler(this IMqttClient client, IMqttClientConnectedHandler handler) | |||
{ | |||
if (client == null) throw new ArgumentNullException(nameof(client)); | |||
client.ConnectedHandler = handler; | |||
return client; | |||
} | |||
public static IMqttClient UseDisconnectedHandler(this IMqttClient client, Func<MqttClientDisconnectedEventArgs, Task> handler) | |||
{ | |||
if (client == null) throw new ArgumentNullException(nameof(client)); | |||
if (handler == null) | |||
{ | |||
client.DisconnectedHandler = null; | |||
return client; | |||
return client.UseDisconnectedHandler((IMqttClientDisconnectedHandler)null); | |||
} | |||
client.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(handler); | |||
return client; | |||
return client.UseDisconnectedHandler(new MqttClientDisconnectedHandlerDelegate(handler)); | |||
} | |||
public static IMqttClient UseDisconnectedHandler(this IMqttClient client, Action<MqttClientDisconnectedEventArgs> handler) | |||
{ | |||
if (client == null) throw new ArgumentNullException(nameof(client)); | |||
if (handler == null) | |||
{ | |||
client.DisconnectedHandler = null; | |||
return client; | |||
return client.UseDisconnectedHandler((IMqttClientDisconnectedHandler)null); | |||
} | |||
client.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(handler); | |||
return client.UseDisconnectedHandler(new MqttClientDisconnectedHandlerDelegate(handler)); | |||
} | |||
public static IMqttClient UseDisconnectedHandler(this IMqttClient client, IMqttClientDisconnectedHandler handler) | |||
{ | |||
if (client == null) throw new ArgumentNullException(nameof(client)); | |||
client.DisconnectedHandler = handler; | |||
return client; | |||
} | |||
public static IMqttClient UseApplicationMessageReceivedHandler(this IMqttClient client, Func<MqttApplicationMessageReceivedEventArgs, Task> handler) | |||
{ | |||
if (client == null) throw new ArgumentNullException(nameof(client)); | |||
if (handler == null) | |||
{ | |||
client.ApplicationMessageReceivedHandler = null; | |||
return client; | |||
return client.UseApplicationMessageReceivedHandler((IMqttApplicationMessageReceivedHandler)null); | |||
} | |||
client.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(handler); | |||
return client; | |||
return client.UseApplicationMessageReceivedHandler(new MqttApplicationMessageReceivedHandlerDelegate(handler)); | |||
} | |||
public static IMqttClient UseApplicationMessageReceivedHandler(this IMqttClient client, Action<MqttApplicationMessageReceivedEventArgs> handler) | |||
{ | |||
if (client == null) throw new ArgumentNullException(nameof(client)); | |||
if (handler == null) | |||
{ | |||
client.ApplicationMessageReceivedHandler = null; | |||
return client; | |||
return client.UseApplicationMessageReceivedHandler((IMqttApplicationMessageReceivedHandler)null); | |||
} | |||
client.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(handler); | |||
return client; | |||
return client.UseApplicationMessageReceivedHandler(new MqttApplicationMessageReceivedHandlerDelegate(handler)); | |||
} | |||
public static IMqttClient UseApplicationMessageReceivedHandler(this IMqttClient client, IMqttApplicationMessageReceivedHandler handler) | |||
{ | |||
client.ApplicationMessageReceivedHandler = handler; | |||
if (client == null) throw new ArgumentNullException(nameof(client)); | |||
client.ApplicationMessageReceivedHandler = handler; | |||
return client; | |||
} | |||
@@ -121,15 +121,6 @@ namespace MQTTnet.Server | |||
return _packageReceiverTask; | |||
} | |||
public void EnqueueApplicationMessage(MqttApplicationMessage applicationMessage, string senderClientId, bool isRetainedApplicationMessage) | |||
{ | |||
if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage)); | |||
Session.EnqueueApplicationMessage(applicationMessage, senderClientId, isRetainedApplicationMessage); | |||
_logger.Verbose("Queued application message (ClientId: {0}).", ClientId); | |||
} | |||
private async Task<MqttClientDisconnectType> RunInternalAsync() | |||
{ | |||
var disconnectType = MqttClientDisconnectType.NotClean; | |||
@@ -273,7 +264,7 @@ namespace MQTTnet.Server | |||
var retainedMessages = await _retainedMessagesManager.GetSubscribedMessagesAsync(topicFilters).ConfigureAwait(false); | |||
foreach (var applicationMessage in retainedMessages) | |||
{ | |||
EnqueueApplicationMessage(applicationMessage, ClientId, true); | |||
Session.EnqueueApplicationMessage(applicationMessage, ClientId, true); | |||
} | |||
} | |||
@@ -1,20 +1,26 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Threading.Tasks; | |||
using MQTTnet.Diagnostics; | |||
using MQTTnet.Server.Status; | |||
namespace MQTTnet.Server | |||
{ | |||
public class MqttClientSession | |||
{ | |||
private readonly IMqttNetChildLogger _logger; | |||
private readonly DateTime _createdTimestamp = DateTime.UtcNow; | |||
public MqttClientSession(string clientId, MqttServerEventDispatcher eventDispatcher, IMqttServerOptions serverOptions) | |||
public MqttClientSession(string clientId, MqttServerEventDispatcher eventDispatcher, IMqttServerOptions serverOptions, IMqttNetChildLogger logger) | |||
{ | |||
ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId)); | |||
SubscriptionsManager = new MqttClientSubscriptionsManager(clientId, eventDispatcher, serverOptions); | |||
ApplicationMessagesQueue = new MqttClientSessionApplicationMessagesQueue(serverOptions); | |||
if (logger == null) throw new ArgumentNullException(nameof(logger)); | |||
_logger = logger.CreateChildLogger(nameof(MqttClientSession)); | |||
} | |||
public string ClientId { get; } | |||
@@ -35,12 +41,15 @@ namespace MQTTnet.Server | |||
return; | |||
} | |||
_logger.Verbose("Queued application message with topic '{0}' (ClientId: {1}).", applicationMessage.Topic, ClientId); | |||
ApplicationMessagesQueue.Enqueue(applicationMessage, senderClientId, checkSubscriptionsResult.QualityOfServiceLevel, isRetainedApplicationMessage); | |||
} | |||
public async Task SubscribeAsync(ICollection<TopicFilter> topicFilters, MqttRetainedMessagesManager retainedMessagesManager) | |||
{ | |||
await SubscriptionsManager.SubscribeAsync(topicFilters).ConfigureAwait(false); | |||
var matchingRetainedMessages = await retainedMessagesManager.GetSubscribedMessagesAsync(topicFilters).ConfigureAwait(false); | |||
foreach (var matchingRetainedMessage in matchingRetainedMessages) | |||
{ | |||
@@ -332,8 +332,6 @@ namespace MQTTnet.Server | |||
{ | |||
if (connectPacket.CleanSession) | |||
{ | |||
// TODO: Check if required. | |||
//session.Dispose(); | |||
session = null; | |||
_logger.Verbose("Deleting existing session of client '{0}'.", connectPacket.ClientId); | |||
@@ -346,7 +344,7 @@ namespace MQTTnet.Server | |||
if (session == null) | |||
{ | |||
session = new MqttClientSession(connectPacket.ClientId, _eventDispatcher, _options); | |||
session = new MqttClientSession(connectPacket.ClientId, _eventDispatcher, _options, _logger); | |||
_logger.Verbose("Created a new session for client '{0}'.", connectPacket.ClientId); | |||
} | |||
@@ -2,14 +2,46 @@ | |||
using System.Collections.Generic; | |||
using System.Threading; | |||
using System.Threading.Tasks; | |||
using MQTTnet.Client; | |||
using MQTTnet.Client.Publishing; | |||
using MQTTnet.Client.Receiving; | |||
using MQTTnet.Protocol; | |||
namespace MQTTnet.Server | |||
{ | |||
public static class MqttServerExtensions | |||
{ | |||
public static IMqttServer UseApplicationMessageReceivedHandler(this IMqttServer server, Func<MqttApplicationMessageReceivedEventArgs, Task> handler) | |||
{ | |||
if (server == null) throw new ArgumentNullException(nameof(server)); | |||
if (handler == null) | |||
{ | |||
return server.UseApplicationMessageReceivedHandler((IMqttApplicationMessageReceivedHandler)null); | |||
} | |||
return server.UseApplicationMessageReceivedHandler(new MqttApplicationMessageReceivedHandlerDelegate(handler)); | |||
} | |||
public static IMqttServer UseApplicationMessageReceivedHandler(this IMqttServer server, Action<MqttApplicationMessageReceivedEventArgs> handler) | |||
{ | |||
if (server == null) throw new ArgumentNullException(nameof(server)); | |||
if (handler == null) | |||
{ | |||
return server.UseApplicationMessageReceivedHandler((IMqttApplicationMessageReceivedHandler)null); | |||
} | |||
return server.UseApplicationMessageReceivedHandler(new MqttApplicationMessageReceivedHandlerDelegate(handler)); | |||
} | |||
public static IMqttServer UseApplicationMessageReceivedHandler(this IMqttServer server, IMqttApplicationMessageReceivedHandler handler) | |||
{ | |||
if (server == null) throw new ArgumentNullException(nameof(server)); | |||
server.ApplicationMessageReceivedHandler = handler; | |||
return server; | |||
} | |||
public static Task SubscribeAsync(this IMqttServer server, string clientId, params TopicFilter[] topicFilters) | |||
{ | |||
if (server == null) throw new ArgumentNullException(nameof(server)); | |||
@@ -120,12 +152,16 @@ namespace MQTTnet.Server | |||
public static Task<MqttClientPublishResult> PublishAsync(this IMqttServer server, Func<MqttApplicationMessageBuilder, MqttApplicationMessageBuilder> builder, CancellationToken cancellationToken) | |||
{ | |||
if (server == null) throw new ArgumentNullException(nameof(server)); | |||
var message = builder(new MqttApplicationMessageBuilder()).Build(); | |||
return server.PublishAsync(message, cancellationToken); | |||
} | |||
public static Task<MqttClientPublishResult> PublishAsync(this IMqttServer server, Func<MqttApplicationMessageBuilder, MqttApplicationMessageBuilder> builder) | |||
{ | |||
if (server == null) throw new ArgumentNullException(nameof(server)); | |||
var message = builder(new MqttApplicationMessageBuilder()).Build(); | |||
return server.PublishAsync(message, CancellationToken.None); | |||
} | |||
@@ -21,7 +21,7 @@ namespace MQTTnet.Tests.Mockups | |||
private readonly List<Exception> _exceptions = new List<Exception>(); | |||
private IMqttServer _server; | |||
public IMqttServer Server { get; private set; } | |||
public bool IgnoreClientLogErrors { get; set; } | |||
@@ -72,15 +72,15 @@ namespace MQTTnet.Tests.Mockups | |||
public async Task<IMqttServer> StartServerAsync(MqttServerOptionsBuilder options) | |||
{ | |||
if (_server != null) | |||
if (Server != null) | |||
{ | |||
throw new InvalidOperationException("Server already started."); | |||
} | |||
_server = _mqttFactory.CreateMqttServer(_serverLogger); | |||
await _server.StartAsync(options.WithDefaultEndpointPort(ServerPort).Build()); | |||
Server = _mqttFactory.CreateMqttServer(_serverLogger); | |||
await Server.StartAsync(options.WithDefaultEndpointPort(ServerPort).Build()); | |||
return _server; | |||
return Server; | |||
} | |||
public Task<IMqttClient> ConnectClientAsync() | |||
@@ -122,7 +122,7 @@ namespace MQTTnet.Tests.Mockups | |||
mqttClient?.Dispose(); | |||
} | |||
_server?.StopAsync().GetAwaiter().GetResult(); | |||
Server?.StopAsync().GetAwaiter().GetResult(); | |||
ThrowIfLogErrors(); | |||
@@ -295,5 +295,49 @@ namespace MQTTnet.Tests | |||
Assert.IsFalse(client.IsConnected); | |||
} | |||
} | |||
[TestMethod] | |||
public async Task Frequent_Connects() | |||
{ | |||
using (var testEnvironment = new TestEnvironment()) | |||
{ | |||
await testEnvironment.StartServerAsync(); | |||
var clients = new List<IMqttClient>(); | |||
for (var i = 0; i < 100; i++) | |||
{ | |||
clients.Add(await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("a"))); | |||
} | |||
var clientStatus = await testEnvironment.Server.GetClientStatusAsync(); | |||
var sessionStatus = await testEnvironment.Server.GetSessionStatusAsync(); | |||
for (var i = 0; i < 98; i++) | |||
{ | |||
Assert.IsFalse(clients[i].IsConnected); | |||
} | |||
Assert.IsTrue(clients[99].IsConnected); | |||
Assert.AreEqual(1, clientStatus.Count); | |||
Assert.AreEqual(1, sessionStatus.Count); | |||
var receiveClient = clients[99]; | |||
object receivedPayload = null; | |||
receiveClient.UseApplicationMessageReceivedHandler(e => | |||
{ | |||
receivedPayload = e.ApplicationMessage.ConvertPayloadToString(); | |||
}); | |||
await receiveClient.SubscribeAsync("x"); | |||
var sendClient = await testEnvironment.ConnectClientAsync(); | |||
await sendClient.PublishAsync("x", "1"); | |||
await Task.Delay(100); | |||
Assert.AreEqual("1", receivedPayload); | |||
} | |||
} | |||
} | |||
} |
@@ -84,58 +84,6 @@ namespace MQTTnet.TestApp.NetCore | |||
Thread.Sleep(Timeout.Infinite); | |||
} | |||
// This code is used at the Wiki on GitHub! | |||
// ReSharper disable once UnusedMember.Local | |||
private static async void WikiCode() | |||
{ | |||
{ | |||
var client = new MqttFactory().CreateMqttClient(); | |||
var options = new MqttClientOptionsBuilder() | |||
.WithClientId("Client1") | |||
.WithTcpServer("broker.hivemq.com") | |||
.WithCredentials("bud", "%spencer%") | |||
.WithTls() | |||
.Build(); | |||
await client.ConnectAsync(options); | |||
var message = new MqttApplicationMessageBuilder() | |||
.WithTopic("MyTopic") | |||
.WithPayload("Hello World") | |||
.WithExactlyOnceQoS() | |||
.WithRetainFlag() | |||
.Build(); | |||
await client.PublishAsync(message); | |||
} | |||
{ | |||
var factory = new MqttFactory(); | |||
var client = factory.CreateMqttClient(); | |||
} | |||
{ | |||
// Write all trace messages to the console window. | |||
MqttNetGlobalLogger.LogMessagePublished += (s, e) => | |||
{ | |||
var trace = $">> [{e.TraceMessage.Timestamp:O}] [{e.TraceMessage.ThreadId}] [{e.TraceMessage.Source}] [{e.TraceMessage.Level}]: {e.TraceMessage.Message}"; | |||
if (e.TraceMessage.Exception != null) | |||
{ | |||
trace += Environment.NewLine + e.TraceMessage.Exception.ToString(); | |||
} | |||
Console.WriteLine(trace); | |||
}; | |||
} | |||
{ | |||
// Use a custom log ID for the logger. | |||
var factory = new MqttFactory(); | |||
var mqttClient = factory.CreateMqttClient(new MqttNetLogger("MyCustomId")); | |||
} | |||
} | |||
} | |||
public class RetainedMessageHandler : IMqttServerStorage | |||
@@ -10,7 +10,6 @@ using MQTTnet.Client; | |||
using MQTTnet.Client.Connecting; | |||
using MQTTnet.Client.Disconnecting; | |||
using MQTTnet.Client.Options; | |||
using MQTTnet.Client.Receiving; | |||
using MQTTnet.Diagnostics; | |||
using MQTTnet.Exceptions; | |||
using MQTTnet.Extensions.ManagedClient; | |||
@@ -469,18 +468,6 @@ namespace MQTTnet.TestApp.UniversalWindows | |||
private async Task WikiCode() | |||
{ | |||
{ | |||
// Write all trace messages to the console window. | |||
MqttNetGlobalLogger.LogMessagePublished += (s, e) => | |||
{ | |||
Console.WriteLine($">> [{e.TraceMessage.Timestamp:O}] [{e.TraceMessage.ThreadId}] [{e.TraceMessage.Source}] [{e.TraceMessage.Level}]: {e.TraceMessage.Message}"); | |||
if (e.TraceMessage.Exception != null) | |||
{ | |||
Console.WriteLine(e.TraceMessage.Exception); | |||
} | |||
}; | |||
} | |||
{ | |||
// Use a custom identifier for the trace messages. | |||
var clientOptions = new MqttClientOptionsBuilder() | |||
@@ -490,20 +477,77 @@ namespace MQTTnet.TestApp.UniversalWindows | |||
{ | |||
// Create a new MQTT client. | |||
var factory = new MqttFactory(); | |||
var mqttClient = factory.CreateMqttClient(); | |||
var client = factory.CreateMqttClient(); | |||
// Create TCP based options using the builder. | |||
var options = new MqttClientOptionsBuilder() | |||
.WithClientId("Client1") | |||
.WithTcpServer("broker.hivemq.com") | |||
.WithCredentials("bud", "%spencer%") | |||
.WithTls() | |||
.WithCleanSession() | |||
.Build(); | |||
await client.ConnectAsync(options); | |||
// Reconnecting | |||
client.UseDisconnectedHandler(async e => | |||
{ | |||
// Create TCP based options using the builder. | |||
var options = new MqttClientOptionsBuilder() | |||
.WithClientId("Client1") | |||
.WithTcpServer("broker.hivemq.com") | |||
.WithCredentials("bud", "%spencer%") | |||
.WithTls() | |||
.WithCleanSession() | |||
.Build(); | |||
Console.WriteLine("### DISCONNECTED FROM SERVER ###"); | |||
await Task.Delay(TimeSpan.FromSeconds(5)); | |||
await mqttClient.ConnectAsync(options); | |||
} | |||
try | |||
{ | |||
await client.ConnectAsync(options); | |||
} | |||
catch | |||
{ | |||
Console.WriteLine("### RECONNECTING FAILED ###"); | |||
} | |||
}); | |||
// Consuming messages | |||
client.UseApplicationMessageReceivedHandler(e => | |||
{ | |||
Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###"); | |||
Console.WriteLine($"+ Topic = {e.ApplicationMessage.Topic}"); | |||
Console.WriteLine($"+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}"); | |||
Console.WriteLine($"+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}"); | |||
Console.WriteLine($"+ Retain = {e.ApplicationMessage.Retain}"); | |||
Console.WriteLine(); | |||
}); | |||
// Subscribe after connect | |||
client.UseConnectedHandler(async e => | |||
{ | |||
Console.WriteLine("### CONNECTED WITH SERVER ###"); | |||
// Subscribe to a topic | |||
await client.SubscribeAsync(new TopicFilterBuilder().WithTopic("my/topic").Build()); | |||
Console.WriteLine("### SUBSCRIBED ###"); | |||
}); | |||
// Subscribe to a topic | |||
await client.SubscribeAsync(new TopicFilterBuilder().WithTopic("my/topic").Build()); | |||
// Unsubscribe from a topic | |||
await client.UnsubscribeAsync("my/topic"); | |||
// Publish an application message | |||
var applicationMessage = new MqttApplicationMessageBuilder() | |||
.WithTopic("A/B/C") | |||
.WithPayload("Hello World") | |||
.WithAtLeastOnceQoS() | |||
.Build(); | |||
await client.PublishAsync(applicationMessage); | |||
} | |||
{ | |||
{ | |||
// Use TCP connection. | |||
@@ -525,8 +569,6 @@ namespace MQTTnet.TestApp.UniversalWindows | |||
var options = new MqttClientOptionsBuilder() | |||
.WithWebSocketServer("broker.hivemq.com:8000/mqtt") | |||
.Build(); | |||
await mqttClient.ConnectAsync(options); | |||
} | |||
{ | |||
@@ -549,23 +591,6 @@ namespace MQTTnet.TestApp.UniversalWindows | |||
}, | |||
}; | |||
} | |||
{ | |||
// Subscribe to a topic | |||
await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic("my/topic").Build()); | |||
// Unsubscribe from a topic | |||
await mqttClient.UnsubscribeAsync("my/topic"); | |||
// Publish an application message | |||
var applicationMessage = new MqttApplicationMessageBuilder() | |||
.WithTopic("A/B/C") | |||
.WithPayload("Hello World") | |||
.WithAtLeastOnceQoS() | |||
.Build(); | |||
await mqttClient.PublishAsync(applicationMessage); | |||
} | |||
} | |||
// ---------------------------------- | |||
@@ -700,6 +725,39 @@ namespace MQTTnet.TestApp.UniversalWindows | |||
await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic("my/topic").Build()); | |||
await mqttClient.StartAsync(options); | |||
} | |||
{ | |||
// Use a custom log ID for the logger. | |||
var factory = new MqttFactory(); | |||
var client = factory.CreateMqttClient(new MqttNetLogger("MyCustomId")); | |||
} | |||
{ | |||
var client = new MqttFactory().CreateMqttClient(); | |||
var message = new MqttApplicationMessageBuilder() | |||
.WithTopic("MyTopic") | |||
.WithPayload("Hello World") | |||
.WithExactlyOnceQoS() | |||
.WithRetainFlag() | |||
.Build(); | |||
await client.PublishAsync(message); | |||
} | |||
{ | |||
// Write all trace messages to the console window. | |||
MqttNetGlobalLogger.LogMessagePublished += (s, e) => | |||
{ | |||
var trace = $">> [{e.TraceMessage.Timestamp:O}] [{e.TraceMessage.ThreadId}] [{e.TraceMessage.Source}] [{e.TraceMessage.Level}]: {e.TraceMessage.Message}"; | |||
if (e.TraceMessage.Exception != null) | |||
{ | |||
trace += Environment.NewLine + e.TraceMessage.Exception.ToString(); | |||
} | |||
Console.WriteLine(trace); | |||
}; | |||
} | |||
} | |||
#endregion | |||