You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

835 lines
28 KiB

  1. using MQTTnet.Client;
  2. using MQTTnet.Client.Connecting;
  3. using MQTTnet.Client.Disconnecting;
  4. using MQTTnet.Client.Options;
  5. using MQTTnet.Diagnostics;
  6. using MQTTnet.Exceptions;
  7. using MQTTnet.Extensions.ManagedClient;
  8. using MQTTnet.Extensions.Rpc;
  9. using MQTTnet.Extensions.Rpc.Options;
  10. using MQTTnet.Extensions.WebSocket4Net;
  11. using MQTTnet.Formatter;
  12. using MQTTnet.Implementations;
  13. using MQTTnet.Protocol;
  14. using MQTTnet.Server;
  15. using MQTTnet.Server.Status;
  16. using System;
  17. using System.Collections.Concurrent;
  18. using System.Collections.ObjectModel;
  19. using System.Text;
  20. using System.Threading.Tasks;
  21. using Windows.Security.Cryptography.Certificates;
  22. using Windows.UI.Core;
  23. using Windows.UI.Xaml;
  24. namespace MQTTnet.TestApp.UniversalWindows
  25. {
  26. public sealed partial class MainPage
  27. {
  28. private readonly ConcurrentQueue<MqttNetLogMessage> _traceMessages = new ConcurrentQueue<MqttNetLogMessage>();
  29. private readonly ObservableCollection<IMqttClientStatus> _sessions = new ObservableCollection<IMqttClientStatus>();
  30. private IMqttClient _mqttClient;
  31. private IManagedMqttClient _managedMqttClient;
  32. private IMqttServer _mqttServer;
  33. public MainPage()
  34. {
  35. InitializeComponent();
  36. ClientId.Text = Guid.NewGuid().ToString("D");
  37. MqttNetGlobalLogger.LogMessagePublished += OnTraceMessagePublished;
  38. }
  39. private async void OnTraceMessagePublished(object sender, MqttNetLogMessagePublishedEventArgs e)
  40. {
  41. _traceMessages.Enqueue(e.LogMessage);
  42. await UpdateLogAsync();
  43. }
  44. private async Task UpdateLogAsync()
  45. {
  46. while (_traceMessages.Count > 100)
  47. {
  48. _traceMessages.TryDequeue(out _);
  49. }
  50. var logText = new StringBuilder();
  51. foreach (var traceMessage in _traceMessages)
  52. {
  53. logText.AppendFormat(
  54. "[{0:yyyy-MM-dd HH:mm:ss.fff}] [{1}] [{2}] [{3}] [{4}]{5}",
  55. traceMessage.Timestamp,
  56. traceMessage.Level,
  57. traceMessage.Source,
  58. traceMessage.ThreadId,
  59. traceMessage.Message,
  60. Environment.NewLine);
  61. if (traceMessage.Exception != null)
  62. {
  63. logText.AppendLine(traceMessage.Exception.ToString());
  64. }
  65. }
  66. await Trace.Dispatcher.RunAsync(CoreDispatcherPriority.Low, () =>
  67. {
  68. Trace.Text = logText.ToString();
  69. });
  70. }
  71. private async void Connect(object sender, RoutedEventArgs e)
  72. {
  73. var mqttFactory = new MqttFactory();
  74. var tlsOptions = new MqttClientTlsOptions
  75. {
  76. UseTls = UseTls.IsChecked == true,
  77. IgnoreCertificateChainErrors = true,
  78. IgnoreCertificateRevocationErrors = true,
  79. AllowUntrustedCertificates = true
  80. };
  81. var options = new MqttClientOptions
  82. {
  83. ClientId = ClientId.Text,
  84. ProtocolVersion = MqttProtocolVersion.V500
  85. };
  86. if (UseTcp.IsChecked == true)
  87. {
  88. options.ChannelOptions = new MqttClientTcpOptions
  89. {
  90. Server = Server.Text,
  91. Port = int.Parse(Port.Text),
  92. TlsOptions = tlsOptions
  93. };
  94. }
  95. if (UseWs.IsChecked == true)
  96. {
  97. options.ChannelOptions = new MqttClientWebSocketOptions
  98. {
  99. Uri = Server.Text,
  100. TlsOptions = tlsOptions
  101. };
  102. }
  103. if (UseWs4Net.IsChecked == true)
  104. {
  105. options.ChannelOptions = new MqttClientWebSocketOptions
  106. {
  107. Uri = Server.Text,
  108. TlsOptions = tlsOptions
  109. };
  110. mqttFactory.UseWebSocket4Net();
  111. }
  112. if (options.ChannelOptions == null)
  113. {
  114. throw new InvalidOperationException();
  115. }
  116. if (!string.IsNullOrEmpty(User.Text))
  117. {
  118. options.Credentials = new MqttClientCredentials
  119. {
  120. Username = User.Text,
  121. Password = Encoding.UTF8.GetBytes(Password.Text)
  122. };
  123. }
  124. options.CleanSession = CleanSession.IsChecked == true;
  125. options.KeepAlivePeriod = TimeSpan.FromSeconds(double.Parse(KeepAliveInterval.Text));
  126. if (UseMqtt310.IsChecked == true)
  127. {
  128. options.ProtocolVersion = MqttProtocolVersion.V310;
  129. }
  130. else if (UseMqtt311.IsChecked == true)
  131. {
  132. options.ProtocolVersion = MqttProtocolVersion.V311;
  133. }
  134. else if (UseMqtt500.IsChecked == true)
  135. {
  136. options.ProtocolVersion = MqttProtocolVersion.V500;
  137. }
  138. try
  139. {
  140. if (_mqttClient != null)
  141. {
  142. await _mqttClient.DisconnectAsync();
  143. _mqttClient.UseApplicationMessageReceivedHandler(HandleReceivedApplicationMessage);
  144. _mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(x => OnConnected());
  145. _mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(x => OnDisconnected());
  146. }
  147. if (UseManagedClient.IsChecked == true)
  148. {
  149. _managedMqttClient = mqttFactory.CreateManagedMqttClient();
  150. _managedMqttClient.UseApplicationMessageReceivedHandler(HandleReceivedApplicationMessage);
  151. _managedMqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(x => OnConnected());
  152. _managedMqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(x => OnDisconnected());
  153. await _managedMqttClient.StartAsync(new ManagedMqttClientOptions
  154. {
  155. ClientOptions = options
  156. });
  157. }
  158. else
  159. {
  160. _mqttClient = mqttFactory.CreateMqttClient();
  161. _mqttClient.UseApplicationMessageReceivedHandler(HandleReceivedApplicationMessage);
  162. _mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(x => OnConnected());
  163. _mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(x => OnDisconnected());
  164. await _mqttClient.ConnectAsync(options);
  165. }
  166. }
  167. catch (Exception exception)
  168. {
  169. Trace.Text += exception + Environment.NewLine;
  170. }
  171. }
  172. private void OnDisconnected()
  173. {
  174. _traceMessages.Enqueue(new MqttNetLogMessage
  175. {
  176. Timestamp = DateTime.UtcNow,
  177. ThreadId = -1,
  178. Level = MqttNetLogLevel.Info,
  179. Message = "! DISCONNECTED EVENT FIRED",
  180. });
  181. Task.Run(UpdateLogAsync);
  182. }
  183. private void OnConnected()
  184. {
  185. _traceMessages.Enqueue(new MqttNetLogMessage
  186. {
  187. Timestamp = DateTime.UtcNow,
  188. ThreadId = -1,
  189. Level = MqttNetLogLevel.Info,
  190. Message = "! CONNECTED EVENT FIRED",
  191. });
  192. Task.Run(UpdateLogAsync);
  193. }
  194. private async Task HandleReceivedApplicationMessage(MqttApplicationMessageReceivedEventArgs eventArgs)
  195. {
  196. var item = $"Timestamp: {DateTime.Now:O} | Topic: {eventArgs.ApplicationMessage.Topic} | Payload: {eventArgs.ApplicationMessage.ConvertPayloadToString()} | QoS: {eventArgs.ApplicationMessage.QualityOfServiceLevel}";
  197. await Dispatcher.RunAsync(CoreDispatcherPriority.Low, () =>
  198. {
  199. if (AddReceivedMessagesToList.IsChecked == true)
  200. {
  201. ReceivedMessages.Items.Add(item);
  202. }
  203. });
  204. }
  205. private async void Publish(object sender, RoutedEventArgs e)
  206. {
  207. try
  208. {
  209. var qos = MqttQualityOfServiceLevel.AtMostOnce;
  210. if (QoS1.IsChecked == true)
  211. {
  212. qos = MqttQualityOfServiceLevel.AtLeastOnce;
  213. }
  214. if (QoS2.IsChecked == true)
  215. {
  216. qos = MqttQualityOfServiceLevel.ExactlyOnce;
  217. }
  218. var payload = Array.Empty<byte>();
  219. if (PlainText.IsChecked == true)
  220. {
  221. payload = Encoding.UTF8.GetBytes(Payload.Text);
  222. }
  223. if (Base64.IsChecked == true)
  224. {
  225. payload = Convert.FromBase64String(Payload.Text);
  226. }
  227. var message = new MqttApplicationMessageBuilder()
  228. .WithContentType(ContentType.Text)
  229. .WithResponseTopic(ResponseTopic.Text)
  230. .WithTopic(Topic.Text)
  231. .WithPayload(payload)
  232. .WithQualityOfServiceLevel(qos)
  233. .WithRetainFlag(Retain.IsChecked == true)
  234. .Build();
  235. if (_mqttClient != null)
  236. {
  237. await _mqttClient.PublishAsync(message);
  238. }
  239. if (_managedMqttClient != null)
  240. {
  241. await _managedMqttClient.PublishAsync(message);
  242. }
  243. }
  244. catch (Exception exception)
  245. {
  246. Trace.Text += exception + Environment.NewLine;
  247. }
  248. }
  249. private async void Disconnect(object sender, RoutedEventArgs e)
  250. {
  251. try
  252. {
  253. if (_mqttClient != null)
  254. {
  255. await _mqttClient.DisconnectAsync();
  256. _mqttClient.Dispose();
  257. _mqttClient = null;
  258. }
  259. if (_managedMqttClient != null)
  260. {
  261. await _managedMqttClient.StopAsync();
  262. _managedMqttClient.Dispose();
  263. _managedMqttClient = null;
  264. }
  265. }
  266. catch (Exception exception)
  267. {
  268. Trace.Text += exception + Environment.NewLine;
  269. }
  270. }
  271. private void ClearLog(object sender, RoutedEventArgs e)
  272. {
  273. while (_traceMessages.Count > 0)
  274. {
  275. _traceMessages.TryDequeue(out _);
  276. }
  277. Trace.Text = string.Empty;
  278. }
  279. private async void Subscribe(object sender, RoutedEventArgs e)
  280. {
  281. try
  282. {
  283. var qos = MqttQualityOfServiceLevel.AtMostOnce;
  284. if (SubscribeQoS1.IsChecked == true)
  285. {
  286. qos = MqttQualityOfServiceLevel.AtLeastOnce;
  287. }
  288. if (SubscribeQoS2.IsChecked == true)
  289. {
  290. qos = MqttQualityOfServiceLevel.ExactlyOnce;
  291. }
  292. var topicFilter = new MqttTopicFilter { Topic = SubscribeTopic.Text, QualityOfServiceLevel = qos };
  293. if (_mqttClient != null)
  294. {
  295. await _mqttClient.SubscribeAsync(topicFilter);
  296. }
  297. if (_managedMqttClient != null)
  298. {
  299. await _managedMqttClient.SubscribeAsync(topicFilter);
  300. }
  301. }
  302. catch (Exception exception)
  303. {
  304. Trace.Text += exception + Environment.NewLine;
  305. }
  306. }
  307. private async void Unsubscribe(object sender, RoutedEventArgs e)
  308. {
  309. try
  310. {
  311. if (_mqttClient != null)
  312. {
  313. await _mqttClient.UnsubscribeAsync(SubscribeTopic.Text);
  314. }
  315. if (_managedMqttClient != null)
  316. {
  317. await _managedMqttClient.UnsubscribeAsync(SubscribeTopic.Text);
  318. }
  319. }
  320. catch (Exception exception)
  321. {
  322. Trace.Text += exception + Environment.NewLine;
  323. }
  324. }
  325. // This code is for the Wiki at GitHub!
  326. // ReSharper disable once UnusedMember.Local
  327. private async void StartServer(object sender, RoutedEventArgs e)
  328. {
  329. if (_mqttServer != null)
  330. {
  331. return;
  332. }
  333. JsonServerStorage storage = null;
  334. if (ServerPersistRetainedMessages.IsChecked == true)
  335. {
  336. storage = new JsonServerStorage();
  337. if (ServerClearRetainedMessages.IsChecked == true)
  338. {
  339. storage.Clear();
  340. }
  341. }
  342. _mqttServer = new MqttFactory().CreateMqttServer();
  343. var options = new MqttServerOptions();
  344. options.DefaultEndpointOptions.Port = int.Parse(ServerPort.Text);
  345. options.Storage = storage;
  346. options.EnablePersistentSessions = ServerAllowPersistentSessions.IsChecked == true;
  347. await _mqttServer.StartAsync(options);
  348. }
  349. private async void StopServer(object sender, RoutedEventArgs e)
  350. {
  351. if (_mqttServer == null)
  352. {
  353. return;
  354. }
  355. await _mqttServer.StopAsync();
  356. _mqttServer = null;
  357. }
  358. private void ClearReceivedMessages(object sender, RoutedEventArgs e)
  359. {
  360. ReceivedMessages.Items.Clear();
  361. }
  362. private async void ExecuteRpc(object sender, RoutedEventArgs e)
  363. {
  364. var qos = MqttQualityOfServiceLevel.AtMostOnce;
  365. if (RpcQoS1.IsChecked == true)
  366. {
  367. qos = MqttQualityOfServiceLevel.AtLeastOnce;
  368. }
  369. if (RpcQoS2.IsChecked == true)
  370. {
  371. qos = MqttQualityOfServiceLevel.ExactlyOnce;
  372. }
  373. var payload = Array.Empty<byte>();
  374. if (RpcText.IsChecked == true)
  375. {
  376. payload = Encoding.UTF8.GetBytes(RpcPayload.Text);
  377. }
  378. if (RpcBase64.IsChecked == true)
  379. {
  380. payload = Convert.FromBase64String(RpcPayload.Text);
  381. }
  382. try
  383. {
  384. var rpcClient = new MqttRpcClient(_mqttClient, new MqttRpcClientOptions());
  385. var response = await rpcClient.ExecuteAsync(TimeSpan.FromSeconds(5), RpcMethod.Text, payload, qos);
  386. RpcResponses.Items.Add(RpcMethod.Text + " >>> " + Encoding.UTF8.GetString(response));
  387. }
  388. catch (MqttCommunicationTimedOutException)
  389. {
  390. RpcResponses.Items.Add(RpcMethod.Text + " >>> [TIMEOUT]");
  391. }
  392. catch (Exception exception)
  393. {
  394. RpcResponses.Items.Add(RpcMethod.Text + " >>> [EXCEPTION (" + exception.Message + ")]");
  395. }
  396. }
  397. private void ClearRpcResponses(object sender, RoutedEventArgs e)
  398. {
  399. RpcResponses.Items.Clear();
  400. }
  401. private void ClearSessions(object sender, RoutedEventArgs e)
  402. {
  403. _sessions.Clear();
  404. }
  405. private void RefreshSessions(object sender, RoutedEventArgs e)
  406. {
  407. if (_mqttServer == null)
  408. {
  409. return;
  410. }
  411. var sessions = _mqttServer.GetClientStatusAsync().GetAwaiter().GetResult();
  412. _sessions.Clear();
  413. foreach (var session in sessions)
  414. {
  415. _sessions.Add(session);
  416. }
  417. ListViewSessions.DataContext = _sessions;
  418. }
  419. #region Wiki Code
  420. #pragma warning disable IDE0051 // Remove unused private members
  421. private async Task WikiCode()
  422. #pragma warning restore IDE0051 // Remove unused private members
  423. {
  424. {
  425. // Use a custom identifier for the trace messages.
  426. var clientOptions = new MqttClientOptionsBuilder()
  427. .Build();
  428. }
  429. {
  430. // Create a new MQTT client.
  431. var factory = new MqttFactory();
  432. var client = factory.CreateMqttClient();
  433. // Create TCP based options using the builder.
  434. var options = new MqttClientOptionsBuilder()
  435. .WithClientId("Client1")
  436. .WithTcpServer("broker.hivemq.com")
  437. .WithCredentials("bud", "%spencer%")
  438. .WithTls()
  439. .WithCleanSession()
  440. .Build();
  441. await client.ConnectAsync(options);
  442. // Reconnecting
  443. client.UseDisconnectedHandler(async e =>
  444. {
  445. Console.WriteLine("### DISCONNECTED FROM SERVER ###");
  446. await Task.Delay(TimeSpan.FromSeconds(5));
  447. try
  448. {
  449. await client.ConnectAsync(options);
  450. }
  451. catch
  452. {
  453. Console.WriteLine("### RECONNECTING FAILED ###");
  454. }
  455. });
  456. // Consuming messages
  457. client.UseApplicationMessageReceivedHandler(e =>
  458. {
  459. Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###");
  460. Console.WriteLine($"+ Topic = {e.ApplicationMessage.Topic}");
  461. Console.WriteLine($"+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}");
  462. Console.WriteLine($"+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}");
  463. Console.WriteLine($"+ Retain = {e.ApplicationMessage.Retain}");
  464. Console.WriteLine();
  465. });
  466. void Handler(MqttApplicationMessageReceivedEventArgs args)
  467. {
  468. //...
  469. }
  470. client.UseApplicationMessageReceivedHandler(e => Handler(e));
  471. // Subscribe after connect
  472. client.UseConnectedHandler(async e =>
  473. {
  474. Console.WriteLine("### CONNECTED WITH SERVER ###");
  475. // Subscribe to a topic
  476. await client.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic("my/topic").Build());
  477. Console.WriteLine("### SUBSCRIBED ###");
  478. });
  479. // Subscribe to a topic
  480. await client.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic("my/topic").Build());
  481. // Unsubscribe from a topic
  482. await client.UnsubscribeAsync("my/topic");
  483. // Publish an application message
  484. var applicationMessage = new MqttApplicationMessageBuilder()
  485. .WithTopic("A/B/C")
  486. .WithPayload("Hello World")
  487. .WithAtLeastOnceQoS()
  488. .Build();
  489. await client.PublishAsync(applicationMessage);
  490. }
  491. {
  492. {
  493. // Use TCP connection.
  494. var options = new MqttClientOptionsBuilder()
  495. .WithTcpServer("broker.hivemq.com", 1883) // Port is optional
  496. .Build();
  497. }
  498. {
  499. // Use secure TCP connection.
  500. var options = new MqttClientOptionsBuilder()
  501. .WithTcpServer("broker.hivemq.com")
  502. .WithTls()
  503. .Build();
  504. }
  505. {
  506. // Use WebSocket connection.
  507. var options = new MqttClientOptionsBuilder()
  508. .WithWebSocketServer("broker.hivemq.com:8000/mqtt")
  509. .Build();
  510. }
  511. {
  512. // Create TCP based options manually
  513. var options = new MqttClientOptions
  514. {
  515. ClientId = "Client1",
  516. Credentials = new MqttClientCredentials
  517. {
  518. Username = "bud",
  519. Password = Encoding.UTF8.GetBytes("%spencer%")
  520. },
  521. ChannelOptions = new MqttClientTcpOptions
  522. {
  523. Server = "broker.hivemq.org",
  524. TlsOptions = new MqttClientTlsOptions
  525. {
  526. UseTls = true
  527. }
  528. },
  529. };
  530. }
  531. }
  532. // ----------------------------------
  533. {
  534. var options = new MqttServerOptions
  535. {
  536. ConnectionValidator = new MqttServerConnectionValidatorDelegate(c =>
  537. {
  538. if (c.ClientId.Length < 10)
  539. {
  540. c.ReasonCode = MqttConnectReasonCode.ClientIdentifierNotValid;
  541. return;
  542. }
  543. if (c.Username != "mySecretUser")
  544. {
  545. c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
  546. return;
  547. }
  548. if (c.Password != "mySecretPassword")
  549. {
  550. c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
  551. return;
  552. }
  553. c.ReasonCode = MqttConnectReasonCode.Success;
  554. })
  555. };
  556. var factory = new MqttFactory();
  557. var mqttServer = factory.CreateMqttServer();
  558. await mqttServer.StartAsync(options);
  559. Console.WriteLine("Press any key to exit.");
  560. Console.ReadLine();
  561. await mqttServer.StopAsync();
  562. }
  563. // ----------------------------------
  564. // For UWP apps:
  565. MqttTcpChannel.CustomIgnorableServerCertificateErrorsResolver = o =>
  566. {
  567. if (o.Server == "server_with_revoked_cert")
  568. {
  569. return new[] { ChainValidationResult.Revoked };
  570. }
  571. return Array.Empty<ChainValidationResult>();
  572. };
  573. {
  574. // Start a MQTT server.
  575. var mqttServer = new MqttFactory().CreateMqttServer();
  576. await mqttServer.StartAsync(new MqttServerOptions());
  577. Console.WriteLine("Press any key to exit.");
  578. Console.ReadLine();
  579. await mqttServer.StopAsync();
  580. }
  581. {
  582. // Configure MQTT server.
  583. var optionsBuilder = new MqttServerOptionsBuilder()
  584. .WithConnectionBacklog(100)
  585. .WithDefaultEndpointPort(1884);
  586. var options = new MqttServerOptions
  587. {
  588. };
  589. options.ConnectionValidator = new MqttServerConnectionValidatorDelegate(c =>
  590. {
  591. if (c.ClientId != "Highlander")
  592. {
  593. c.ReasonCode = MqttConnectReasonCode.ClientIdentifierNotValid;
  594. return;
  595. }
  596. c.ReasonCode = MqttConnectReasonCode.Success;
  597. });
  598. var mqttServer = new MqttFactory().CreateMqttServer();
  599. await mqttServer.StartAsync(optionsBuilder.Build());
  600. }
  601. {
  602. // Setup client validator.
  603. var options = new MqttServerOptions
  604. {
  605. ConnectionValidator = new MqttServerConnectionValidatorDelegate(c =>
  606. {
  607. if (c.ClientId.Length < 10)
  608. {
  609. c.ReasonCode = MqttConnectReasonCode.ClientIdentifierNotValid;
  610. return;
  611. }
  612. if (c.Username != "mySecretUser")
  613. {
  614. c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
  615. return;
  616. }
  617. if (c.Password != "mySecretPassword")
  618. {
  619. c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
  620. return;
  621. }
  622. c.ReasonCode = MqttConnectReasonCode.Success;
  623. })
  624. };
  625. }
  626. {
  627. // Create a new MQTT server.
  628. var mqttServer = new MqttFactory().CreateMqttServer();
  629. }
  630. {
  631. // Setup application message interceptor.
  632. var options = new MqttServerOptionsBuilder()
  633. .WithApplicationMessageInterceptor(context =>
  634. {
  635. if (context.ApplicationMessage.Topic == "my/custom/topic")
  636. {
  637. context.ApplicationMessage.Payload = Encoding.UTF8.GetBytes("The server injected payload.");
  638. }
  639. // It is also possible to read the payload and extend it. For example by adding a timestamp in a JSON document.
  640. // This is useful when the IoT device has no own clock and the creation time of the message might be important.
  641. })
  642. .Build();
  643. }
  644. {
  645. // Setup subscription interceptor.
  646. var options = new MqttServerOptionsBuilder()
  647. .WithSubscriptionInterceptor(context =>
  648. {
  649. if (context.TopicFilter.Topic.StartsWith("admin/foo/bar") && context.ClientId != "theAdmin")
  650. {
  651. context.AcceptSubscription = false;
  652. }
  653. if (context.TopicFilter.Topic.StartsWith("the/secret/stuff") && context.ClientId != "Imperator")
  654. {
  655. context.AcceptSubscription = false;
  656. context.CloseConnection = true;
  657. }
  658. })
  659. .Build();
  660. }
  661. {
  662. // Setup and start a managed MQTT client.
  663. var options = new ManagedMqttClientOptionsBuilder()
  664. .WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
  665. .WithClientOptions(new MqttClientOptionsBuilder()
  666. .WithClientId("Client1")
  667. .WithTcpServer("broker.hivemq.com")
  668. .WithTls().Build())
  669. .Build();
  670. var mqttClient = new MqttFactory().CreateManagedMqttClient();
  671. await mqttClient.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic("my/topic").Build());
  672. await mqttClient.StartAsync(options);
  673. }
  674. {
  675. // Use a custom log ID for the logger.
  676. var factory = new MqttFactory();
  677. var client = factory.CreateMqttClient(new MqttNetLogger("MyCustomId"));
  678. }
  679. {
  680. var client = new MqttFactory().CreateMqttClient();
  681. var message = new MqttApplicationMessageBuilder()
  682. .WithTopic("MyTopic")
  683. .WithPayload("Hello World")
  684. .WithExactlyOnceQoS()
  685. .WithRetainFlag()
  686. .Build();
  687. await client.PublishAsync(message);
  688. }
  689. {
  690. // Write all trace messages to the console window.
  691. MqttNetGlobalLogger.LogMessagePublished += (s, e) =>
  692. {
  693. var trace = $">> [{e.LogMessage.Timestamp:O}] [{e.LogMessage.ThreadId}] [{e.LogMessage.Source}] [{e.LogMessage.Level}]: {e.LogMessage.Message}";
  694. if (e.LogMessage.Exception != null)
  695. {
  696. trace += Environment.NewLine + e.LogMessage.Exception.ToString();
  697. }
  698. Console.WriteLine(trace);
  699. };
  700. }
  701. }
  702. #endregion
  703. }
  704. }