You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

826 lines
29 KiB

  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Collections.ObjectModel;
  4. using System.Text;
  5. using System.Threading.Tasks;
  6. using Windows.Security.Cryptography.Certificates;
  7. using Windows.UI.Core;
  8. using Windows.UI.Xaml;
  9. using MQTTnet.Client;
  10. using MQTTnet.Client.Connecting;
  11. using MQTTnet.Client.Disconnecting;
  12. using MQTTnet.Client.Options;
  13. using MQTTnet.Diagnostics;
  14. using MQTTnet.Exceptions;
  15. using MQTTnet.Extensions.ManagedClient;
  16. using MQTTnet.Extensions.Rpc;
  17. using MQTTnet.Formatter;
  18. using MQTTnet.Implementations;
  19. using MQTTnet.Protocol;
  20. using MQTTnet.Server;
  21. using MQTTnet.Server.Status;
  22. using MqttClientConnectedEventArgs = MQTTnet.Client.Connecting.MqttClientConnectedEventArgs;
  23. using MqttClientDisconnectedEventArgs = MQTTnet.Client.Disconnecting.MqttClientDisconnectedEventArgs;
  24. using MQTTnet.Extensions.WebSocket4Net;
  25. namespace MQTTnet.TestApp.UniversalWindows
  26. {
  27. public sealed partial class MainPage
  28. {
  /// <summary>Log messages collected for the trace box; rendered and trimmed by <see cref="UpdateLogAsync"/>.</summary>
  private readonly ConcurrentQueue<MqttNetLogMessage> _traceMessages = new ConcurrentQueue<MqttNetLogMessage>();

  /// <summary>Client status entries shown in the sessions list; refilled by <see cref="RefreshSessions"/>.</summary>
  private readonly ObservableCollection<IMqttClientStatus> _sessions = new ObservableCollection<IMqttClientStatus>();

  /// <summary>The plain MQTT client created by <see cref="Connect"/>, or <c>null</c>.</summary>
  private IMqttClient _mqttClient;

  /// <summary>The managed MQTT client created by <see cref="Connect"/>, or <c>null</c>.</summary>
  private IManagedMqttClient _managedMqttClient;

  /// <summary>The MQTT server created by <see cref="StartServer"/>, or <c>null</c> when none is running.</summary>
  private IMqttServer _mqttServer;

  /// <summary>
  /// Builds the page, pre-fills the client ID box with a fresh GUID and starts listening
  /// to the global MQTTnet logger.
  /// </summary>
  public MainPage()
  {
      InitializeComponent();

      var generatedClientId = Guid.NewGuid();
      ClientId.Text = generatedClientId.ToString("D");

      MqttNetGlobalLogger.LogMessagePublished += OnTraceMessagePublished;
  }
  /// <summary>
  /// Handles a message published by the global MQTTnet logger: queues it and re-renders the trace box.
  /// </summary>
  /// <param name="sender">Event source (unused).</param>
  /// <param name="e">Event data carrying the published trace message.</param>
  private async void OnTraceMessagePublished(object sender, MqttNetLogMessagePublishedEventArgs e)
  {
      var publishedMessage = e.TraceMessage;
      _traceMessages.Enqueue(publishedMessage);

      await UpdateLogAsync();
  }
  /// <summary>
  /// Trims the trace queue to at most 100 entries, formats the remaining entries into one
  /// text block and assigns it to the trace box on its dispatcher.
  /// </summary>
  /// <returns>A task that completes once the dispatcher has applied the new text.</returns>
  private async Task UpdateLogAsync()
  {
      // Drop the oldest entries until no more than 100 remain.
      while (_traceMessages.Count > 100)
      {
          _traceMessages.TryDequeue(out _);
      }

      var rendered = new StringBuilder();
      foreach (var entry in _traceMessages)
      {
          rendered.Append($"[{entry.Timestamp:yyyy-MM-dd HH:mm:ss.fff}] [{entry.Level}] [{entry.Source}] [{entry.ThreadId}] [{entry.Message}]");
          rendered.AppendLine();

          var failure = entry.Exception;
          if (failure != null)
          {
              rendered.AppendLine(failure.ToString());
          }
      }

      // The text box may only be touched through its own dispatcher.
      await Trace.Dispatcher.RunAsync(CoreDispatcherPriority.Low, () =>
      {
          Trace.Text = rendered.ToString();
      });
  }
  /// <summary>
  /// Click handler that connects to the broker using the settings entered on the page.
  /// Any previously created client (plain or managed) is shut down and disposed first, then
  /// either a managed or a plain client is created depending on the "managed client" check box.
  /// </summary>
  /// <remarks>
  /// All work, including parsing of the port and keep-alive text, runs inside the try block.
  /// This method is <c>async void</c>, so an exception escaping it could not be observed by
  /// the caller; failures are appended to the trace box instead.
  /// </remarks>
  /// <param name="sender">Event source (unused).</param>
  /// <param name="e">Event data (unused).</param>
  private async void Connect(object sender, RoutedEventArgs e)
  {
      try
      {
          var mqttFactory = new MqttFactory();
          var options = BuildClientOptions(mqttFactory);

          // Never keep two clients around: a stale client would otherwise also be used
          // by Publish/Subscribe/Unsubscribe, which act on every non-null client.
          await ReleaseExistingClientsAsync();

          if (UseManagedClient.IsChecked == true)
          {
              _managedMqttClient = mqttFactory.CreateManagedMqttClient();
              _managedMqttClient.UseApplicationMessageReceivedHandler(HandleReceivedApplicationMessage);
              _managedMqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(x => OnConnected(x));
              _managedMqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(x => OnDisconnected(x));

              await _managedMqttClient.StartAsync(new ManagedMqttClientOptions
              {
                  ClientOptions = options
              });
          }
          else
          {
              _mqttClient = mqttFactory.CreateMqttClient();
              _mqttClient.UseApplicationMessageReceivedHandler(HandleReceivedApplicationMessage);
              _mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(x => OnConnected(x));
              _mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(x => OnDisconnected(x));

              await _mqttClient.ConnectAsync(options);
          }
      }
      catch (Exception exception)
      {
          Trace.Text += exception + Environment.NewLine;
      }
  }

  /// <summary>
  /// Translates the current state of the connection controls into client options.
  /// </summary>
  /// <param name="mqttFactory">Factory that is switched to WebSocket4Net when that transport is selected.</param>
  /// <returns>The options to connect with.</returns>
  /// <exception cref="InvalidOperationException">No transport (TCP, WS, WS4Net) is selected.</exception>
  /// <exception cref="FormatException">The port or keep-alive text is not a valid number.</exception>
  private MqttClientOptions BuildClientOptions(MqttFactory mqttFactory)
  {
      var tlsOptions = new MqttClientTlsOptions
      {
          UseTls = UseTls.IsChecked == true,
          IgnoreCertificateChainErrors = true,
          IgnoreCertificateRevocationErrors = true,
          AllowUntrustedCertificates = true
      };

      var options = new MqttClientOptions
      {
          ClientId = ClientId.Text,
          ProtocolVersion = MqttProtocolVersion.V500,
          CleanSession = CleanSession.IsChecked == true,
          KeepAlivePeriod = TimeSpan.FromSeconds(double.Parse(KeepAliveInterval.Text))
      };

      if (UseTcp.IsChecked == true)
      {
          options.ChannelOptions = new MqttClientTcpOptions
          {
              Server = Server.Text,
              Port = int.Parse(Port.Text),
              TlsOptions = tlsOptions
          };
      }

      // Both WebSocket variants share the same channel options; a WebSocket selection
      // overrides TCP when several boxes are checked, as before.
      if (UseWs.IsChecked == true || UseWs4Net.IsChecked == true)
      {
          options.ChannelOptions = new MqttClientWebSocketOptions
          {
              Uri = Server.Text,
              TlsOptions = tlsOptions
          };
      }

      if (UseWs4Net.IsChecked == true)
      {
          mqttFactory.UseWebSocket4Net();
      }

      if (options.ChannelOptions == null)
      {
          throw new InvalidOperationException("No transport selected (TCP, WebSocket or WebSocket4Net).");
      }

      if (!string.IsNullOrEmpty(User.Text))
      {
          options.Credentials = new MqttClientCredentials
          {
              Username = User.Text,
              Password = Encoding.UTF8.GetBytes(Password.Text)
          };
      }

      // V500 (set above) stays in effect when no protocol box is checked.
      if (UseMqtt310.IsChecked == true)
      {
          options.ProtocolVersion = MqttProtocolVersion.V310;
      }
      else if (UseMqtt311.IsChecked == true)
      {
          options.ProtocolVersion = MqttProtocolVersion.V311;
      }
      else if (UseMqtt500.IsChecked == true)
      {
          options.ProtocolVersion = MqttProtocolVersion.V500;
      }

      return options;
  }

  /// <summary>
  /// Disconnects/stops and disposes whichever clients currently exist and clears the fields.
  /// The fields are cleared and the clients disposed even when the shutdown call throws.
  /// </summary>
  private async Task ReleaseExistingClientsAsync()
  {
      var previousClient = _mqttClient;
      if (previousClient != null)
      {
          _mqttClient = null;
          try
          {
              await previousClient.DisconnectAsync();
          }
          finally
          {
              previousClient.Dispose();
          }
      }

      var previousManagedClient = _managedMqttClient;
      if (previousManagedClient != null)
      {
          _managedMqttClient = null;
          try
          {
              await previousManagedClient.StopAsync();
          }
          finally
          {
              previousManagedClient.Dispose();
          }
      }
  }
  /// <summary>
  /// Disconnected callback of the clients: adds a marker entry to the trace queue and
  /// refreshes the trace box on a thread-pool task without waiting for it.
  /// </summary>
  /// <param name="e">Disconnect event data (unused).</param>
  private void OnDisconnected(MqttClientDisconnectedEventArgs e)
  {
      var marker = new MqttNetLogMessage(
          "",
          DateTime.Now,
          -1,
          "",
          MqttNetLogLevel.Info,
          "! DISCONNECTED EVENT FIRED",
          null);

      _traceMessages.Enqueue(marker);
      Task.Run(() => UpdateLogAsync());
  }
  /// <summary>
  /// Connected callback of the clients: adds a marker entry to the trace queue and
  /// refreshes the trace box on a thread-pool task without waiting for it.
  /// </summary>
  /// <param name="e">Connect event data (unused).</param>
  private void OnConnected(MqttClientConnectedEventArgs e)
  {
      var marker = new MqttNetLogMessage(
          "",
          DateTime.Now,
          -1,
          "",
          MqttNetLogLevel.Info,
          "! CONNECTED EVENT FIRED",
          null);

      _traceMessages.Enqueue(marker);
      Task.Run(() => UpdateLogAsync());
  }
  /// <summary>
  /// Formats a received application message (timestamp, topic, payload as text, QoS) and,
  /// if the "add to list" box is checked, appends it to the received-messages list on the UI dispatcher.
  /// </summary>
  /// <param name="eventArgs">Event data carrying the received application message.</param>
  /// <returns>A task that completes once the dispatcher callback has run.</returns>
  private async Task HandleReceivedApplicationMessage(MqttApplicationMessageReceivedEventArgs eventArgs)
  {
      var received = eventArgs.ApplicationMessage;
      var item = $"Timestamp: {DateTime.Now:O} | Topic: {received.Topic} | Payload: {received.ConvertPayloadToString()} | QoS: {received.QualityOfServiceLevel}";

      await Dispatcher.RunAsync(CoreDispatcherPriority.Low, () =>
      {
          // The check box is read on the UI thread, at the time the callback runs.
          if (AddReceivedMessagesToList.IsChecked != true)
          {
              return;
          }

          ReceivedMessages.Items.Add(item);
      });
  }
  /// <summary>
  /// Click handler that builds an application message from the publish controls and sends it
  /// through every client that currently exists (plain and/or managed).
  /// Failures are appended to the trace box.
  /// </summary>
  /// <param name="sender">Event source (unused).</param>
  /// <param name="e">Event data (unused).</param>
  private async void Publish(object sender, RoutedEventArgs e)
  {
      try
      {
          // QoS 2 wins over QoS 1; neither selected means "at most once".
          MqttQualityOfServiceLevel qos;
          if (QoS2.IsChecked == true)
          {
              qos = MqttQualityOfServiceLevel.ExactlyOnce;
          }
          else if (QoS1.IsChecked == true)
          {
              qos = MqttQualityOfServiceLevel.AtLeastOnce;
          }
          else
          {
              qos = MqttQualityOfServiceLevel.AtMostOnce;
          }

          // Base64 wins over plain text; neither selected means an empty payload.
          byte[] payload;
          if (Base64.IsChecked == true)
          {
              payload = Convert.FromBase64String(Payload.Text);
          }
          else if (PlainText.IsChecked == true)
          {
              payload = Encoding.UTF8.GetBytes(Payload.Text);
          }
          else
          {
              payload = new byte[0];
          }

          var builder = new MqttApplicationMessageBuilder();
          builder.WithContentType(ContentType.Text);
          builder.WithResponseTopic(ResponseTopic.Text);
          builder.WithTopic(Topic.Text);
          builder.WithPayload(payload);
          builder.WithQualityOfServiceLevel(qos);
          builder.WithRetainFlag(Retain.IsChecked == true);
          var message = builder.Build();

          if (_mqttClient is IMqttClient plainClient)
          {
              await plainClient.PublishAsync(message);
          }

          if (_managedMqttClient is IManagedMqttClient managedClient)
          {
              await managedClient.PublishAsync(message);
          }
      }
      catch (Exception error)
      {
          Trace.Text += error + Environment.NewLine;
      }
  }
  /// <summary>
  /// Click handler that shuts down whichever clients exist: the plain client is disconnected,
  /// the managed client is stopped, and both are disposed and their fields cleared.
  /// </summary>
  /// <remarks>
  /// Disposal and the field reset happen in <c>finally</c> blocks so that a failing
  /// disconnect/stop call does not leave a half-dead client behind that later
  /// publish/subscribe calls would still try to use. Failures are appended to the trace box.
  /// </remarks>
  /// <param name="sender">Event source (unused).</param>
  /// <param name="e">Event data (unused).</param>
  private async void Disconnect(object sender, RoutedEventArgs e)
  {
      try
      {
          var client = _mqttClient;
          if (client != null)
          {
              try
              {
                  await client.DisconnectAsync();
              }
              finally
              {
                  client.Dispose();
                  _mqttClient = null;
              }
          }

          var managedClient = _managedMqttClient;
          if (managedClient != null)
          {
              try
              {
                  await managedClient.StopAsync();
              }
              finally
              {
                  managedClient.Dispose();
                  _managedMqttClient = null;
              }
          }
      }
      catch (Exception exception)
      {
          Trace.Text += exception + Environment.NewLine;
      }
  }
  /// <summary>
  /// Click handler that empties the trace queue and blanks the trace box.
  /// </summary>
  /// <param name="sender">Event source (unused).</param>
  /// <param name="e">Event data (unused).</param>
  private void ClearLog(object sender, RoutedEventArgs e)
  {
      // Drain the queue one entry at a time until nothing is left.
      while (_traceMessages.TryDequeue(out _))
      {
      }

      Trace.Text = string.Empty;
  }
  /// <summary>
  /// Click handler that subscribes every existing client (plain and/or managed) to the topic
  /// in the subscribe box, using the QoS selected next to it. Failures are appended to the trace box.
  /// </summary>
  /// <param name="sender">Event source (unused).</param>
  /// <param name="e">Event data (unused).</param>
  private async void Subscribe(object sender, RoutedEventArgs e)
  {
      try
      {
          // QoS 2 wins over QoS 1; neither selected means "at most once".
          MqttQualityOfServiceLevel level;
          if (SubscribeQoS2.IsChecked == true)
          {
              level = MqttQualityOfServiceLevel.ExactlyOnce;
          }
          else if (SubscribeQoS1.IsChecked == true)
          {
              level = MqttQualityOfServiceLevel.AtLeastOnce;
          }
          else
          {
              level = MqttQualityOfServiceLevel.AtMostOnce;
          }

          var topicFilter = new TopicFilter();
          topicFilter.Topic = SubscribeTopic.Text;
          topicFilter.QualityOfServiceLevel = level;

          if (_mqttClient is IMqttClient plainClient)
          {
              await plainClient.SubscribeAsync(topicFilter);
          }

          if (_managedMqttClient is IManagedMqttClient managedClient)
          {
              await managedClient.SubscribeAsync(topicFilter);
          }
      }
      catch (Exception error)
      {
          Trace.Text += error + Environment.NewLine;
      }
  }
  /// <summary>
  /// Click handler that unsubscribes every existing client (plain and/or managed) from the topic
  /// in the subscribe box. Failures are appended to the trace box.
  /// </summary>
  /// <param name="sender">Event source (unused).</param>
  /// <param name="e">Event data (unused).</param>
  private async void Unsubscribe(object sender, RoutedEventArgs e)
  {
      try
      {
          if (_mqttClient is IMqttClient plainClient)
          {
              await plainClient.UnsubscribeAsync(SubscribeTopic.Text);
          }

          if (_managedMqttClient is IManagedMqttClient managedClient)
          {
              await managedClient.UnsubscribeAsync(SubscribeTopic.Text);
          }
      }
      catch (Exception error)
      {
          Trace.Text += error + Environment.NewLine;
      }
  }
  // This code is for the Wiki at GitHub!
  // ReSharper disable once UnusedMember.Local
  /// <summary>
  /// Click handler that starts the embedded MQTT server on the port entered on the page,
  /// optionally with JSON-backed storage for retained messages and persistent sessions.
  /// Does nothing when a server already exists.
  /// </summary>
  /// <remarks>
  /// The port is parsed before anything else is touched, and the server field is reset when
  /// starting fails, so an invalid port or a port that is already in use can be corrected and
  /// retried. Failures are appended to the trace box; this method is <c>async void</c>, so an
  /// exception escaping it could not be observed by the caller.
  /// </remarks>
  /// <param name="sender">Event source (unused).</param>
  /// <param name="e">Event data (unused).</param>
  private async void StartServer(object sender, RoutedEventArgs e)
  {
      if (_mqttServer != null)
      {
          return;
      }

      try
      {
          var port = int.Parse(ServerPort.Text);

          JsonServerStorage storage = null;
          if (ServerPersistRetainedMessages.IsChecked == true)
          {
              storage = new JsonServerStorage();

              if (ServerClearRetainedMessages.IsChecked == true)
              {
                  storage.Clear();
              }
          }

          var options = new MqttServerOptions();
          options.DefaultEndpointOptions.Port = port;
          options.Storage = storage;
          options.EnablePersistentSessions = ServerAllowPersistentSessions.IsChecked == true;

          // Publish the instance before awaiting so a second click during start-up is ignored.
          var server = new MqttFactory().CreateMqttServer();
          _mqttServer = server;

          try
          {
              await server.StartAsync(options);
          }
          catch
          {
              // Start failed: forget the instance so the user can try again.
              _mqttServer = null;
              throw;
          }
      }
      catch (Exception exception)
      {
          Trace.Text += exception + Environment.NewLine;
      }
  }
  /// <summary>
  /// Click handler that stops the embedded MQTT server, if one exists, and clears its field.
  /// </summary>
  /// <param name="sender">Event source (unused).</param>
  /// <param name="e">Event data (unused).</param>
  private async void StopServer(object sender, RoutedEventArgs e)
  {
      if (_mqttServer != null)
      {
          await _mqttServer.StopAsync();
          _mqttServer = null;
      }
  }
  /// <summary>Click handler that removes all entries from the received-messages list.</summary>
  /// <param name="sender">Event source (unused).</param>
  /// <param name="e">Event data (unused).</param>
  private void ClearReceivedMessages(object sender, RoutedEventArgs e) => ReceivedMessages.Items.Clear();
  /// <summary>
  /// Click handler that executes an RPC call through the plain MQTT client with a 5 second
  /// timeout and appends the outcome (response text, timeout marker or exception message)
  /// to the RPC responses list.
  /// </summary>
  /// <remarks>
  /// The payload is built inside the try block so that invalid Base64 input is reported in the
  /// responses list like any other failure instead of escaping this <c>async void</c> handler.
  /// </remarks>
  /// <param name="sender">Event source (unused).</param>
  /// <param name="e">Event data (unused).</param>
  private async void ExecuteRpc(object sender, RoutedEventArgs e)
  {
      // QoS 2 wins over QoS 1; neither selected means "at most once".
      var qos = MqttQualityOfServiceLevel.AtMostOnce;
      if (RpcQoS1.IsChecked == true)
      {
          qos = MqttQualityOfServiceLevel.AtLeastOnce;
      }

      if (RpcQoS2.IsChecked == true)
      {
          qos = MqttQualityOfServiceLevel.ExactlyOnce;
      }

      try
      {
          // Base64 wins over plain text; neither selected means an empty payload.
          var payload = new byte[0];
          if (RpcText.IsChecked == true)
          {
              payload = Encoding.UTF8.GetBytes(RpcPayload.Text);
          }

          if (RpcBase64.IsChecked == true)
          {
              payload = Convert.FromBase64String(RpcPayload.Text);
          }

          var rpcClient = new MqttRpcClient(_mqttClient);
          var response = await rpcClient.ExecuteAsync(TimeSpan.FromSeconds(5), RpcMethod.Text, payload, qos);

          RpcResponses.Items.Add(RpcMethod.Text + " >>> " + Encoding.UTF8.GetString(response));
      }
      catch (MqttCommunicationTimedOutException)
      {
          RpcResponses.Items.Add(RpcMethod.Text + " >>> [TIMEOUT]");
      }
      catch (Exception exception)
      {
          RpcResponses.Items.Add(RpcMethod.Text + " >>> [EXCEPTION (" + exception.Message + ")]");
      }
  }
  /// <summary>Click handler that removes all entries from the RPC responses list.</summary>
  /// <param name="sender">Event source (unused).</param>
  /// <param name="e">Event data (unused).</param>
  private void ClearRpcResponses(object sender, RoutedEventArgs e) => RpcResponses.Items.Clear();
  /// <summary>Click handler that empties the collection of displayed client sessions.</summary>
  /// <param name="sender">Event source (unused).</param>
  /// <param name="e">Event data (unused).</param>
  private void ClearSessions(object sender, RoutedEventArgs e) => _sessions.Clear();
  /// <summary>
  /// Click handler that reloads the client status list from the embedded server into the
  /// sessions collection and binds it to the sessions list view.
  /// Does nothing when no server exists.
  /// </summary>
  /// <remarks>
  /// The status query is awaited rather than blocked on: this handler runs on the UI thread,
  /// and blocking there freezes the page and can deadlock if the query needs that thread to
  /// complete. Failures are appended to the trace box.
  /// </remarks>
  /// <param name="sender">Event source (unused).</param>
  /// <param name="e">Event data (unused).</param>
  private async void RefreshSessions(object sender, RoutedEventArgs e)
  {
      if (_mqttServer == null)
      {
          return;
      }

      try
      {
          var sessions = await _mqttServer.GetClientStatusAsync();

          _sessions.Clear();
          foreach (var session in sessions)
          {
              _sessions.Add(session);
          }

          ListViewSessions.DataContext = _sessions;
      }
      catch (Exception exception)
      {
          Trace.Text += exception + Environment.NewLine;
      }
  }
  410. #region Wiki Code
  /// <summary>
  /// Collection of independent sample snippets (client options, connecting, handlers,
  /// publishing, server setup, interceptors, managed client, logging), each in its own
  /// scope block. The snippets exist to be compiled; many locals are intentionally
  /// unused and the method is not meant to be executed as a whole.
  /// </summary>
  /// <returns>A task covering the awaited calls inside the snippets.</returns>
  private async Task WikiCode()
  {
      {
          // Use a custom identifier for the trace messages.
          var clientOptions = new MqttClientOptionsBuilder()
              .Build();
      }

      {
          // Create a new MQTT client.
          var factory = new MqttFactory();
          var client = factory.CreateMqttClient();

          // Create TCP based options using the builder.
          var options = new MqttClientOptionsBuilder()
              .WithClientId("Client1")
              .WithTcpServer("broker.hivemq.com")
              .WithCredentials("bud", "%spencer%")
              .WithTls()
              .WithCleanSession()
              .Build();

          await client.ConnectAsync(options);

          // Reconnecting
          client.UseDisconnectedHandler(async e =>
          {
              Console.WriteLine("### DISCONNECTED FROM SERVER ###");
              await Task.Delay(TimeSpan.FromSeconds(5));

              try
              {
                  await client.ConnectAsync(options);
              }
              catch
              {
                  Console.WriteLine("### RECONNECTING FAILED ###");
              }
          });

          // Consuming messages
          client.UseApplicationMessageReceivedHandler(e =>
          {
              Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###");
              Console.WriteLine($"+ Topic = {e.ApplicationMessage.Topic}");
              Console.WriteLine($"+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}");
              Console.WriteLine($"+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}");
              Console.WriteLine($"+ Retain = {e.ApplicationMessage.Retain}");
              Console.WriteLine();
          });

          void Handler(MqttApplicationMessageReceivedEventArgs args)
          {
              //...
          }

          client.UseApplicationMessageReceivedHandler(e => Handler(e));

          // Subscribe after connect
          client.UseConnectedHandler(async e =>
          {
              Console.WriteLine("### CONNECTED WITH SERVER ###");

              // Subscribe to a topic
              await client.SubscribeAsync(new TopicFilterBuilder().WithTopic("my/topic").Build());

              Console.WriteLine("### SUBSCRIBED ###");
          });

          // Subscribe to a topic
          await client.SubscribeAsync(new TopicFilterBuilder().WithTopic("my/topic").Build());

          // Unsubscribe from a topic
          await client.UnsubscribeAsync("my/topic");

          // Publish an application message
          var applicationMessage = new MqttApplicationMessageBuilder()
              .WithTopic("A/B/C")
              .WithPayload("Hello World")
              .WithAtLeastOnceQoS()
              .Build();

          await client.PublishAsync(applicationMessage);
      }

      {
          {
              // Use TCP connection.
              var options = new MqttClientOptionsBuilder()
                  .WithTcpServer("broker.hivemq.com", 1883) // Port is optional
                  .Build();
          }

          {
              // Use secure TCP connection.
              var options = new MqttClientOptionsBuilder()
                  .WithTcpServer("broker.hivemq.com")
                  .WithTls()
                  .Build();
          }

          {
              // Use WebSocket connection.
              var options = new MqttClientOptionsBuilder()
                  .WithWebSocketServer("broker.hivemq.com:8000/mqtt")
                  .Build();
          }

          {
              // Create TCP based options manually
              var options = new MqttClientOptions
              {
                  ClientId = "Client1",
                  Credentials = new MqttClientCredentials
                  {
                      Username = "bud",
                      Password = Encoding.UTF8.GetBytes("%spencer%")
                  },
                  ChannelOptions = new MqttClientTcpOptions
                  {
                      // Same broker host as in every other snippet of this method.
                      Server = "broker.hivemq.com",
                      TlsOptions = new MqttClientTlsOptions
                      {
                          UseTls = true
                      }
                  },
              };
          }
      }

      // ----------------------------------
      {
          var options = new MqttServerOptions();

          options.ConnectionValidator = new MqttServerConnectionValidatorDelegate(c =>
          {
              if (c.ClientId.Length < 10)
              {
                  c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedIdentifierRejected;
                  return;
              }

              if (c.Username != "mySecretUser")
              {
                  c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
                  return;
              }

              var password = Encoding.UTF8.GetString(c.Password);
              if (password != "mySecretPassword")
              {
                  c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
                  return;
              }

              c.ReturnCode = MqttConnectReturnCode.ConnectionAccepted;
          });

          var factory = new MqttFactory();
          var mqttServer = factory.CreateMqttServer();
          await mqttServer.StartAsync(options);

          Console.WriteLine("Press any key to exit.");
          Console.ReadLine();

          await mqttServer.StopAsync();
      }

      // ----------------------------------

      // For UWP apps:
      MqttTcpChannel.CustomIgnorableServerCertificateErrorsResolver = o =>
      {
          if (o.Server == "server_with_revoked_cert")
          {
              return new[] { ChainValidationResult.Revoked };
          }

          return new ChainValidationResult[0];
      };

      {
          // Start a MQTT server.
          var mqttServer = new MqttFactory().CreateMqttServer();
          await mqttServer.StartAsync(new MqttServerOptions());
          Console.WriteLine("Press any key to exit.");
          Console.ReadLine();
          await mqttServer.StopAsync();
      }

      {
          // Configure MQTT server.
          // The validator is attached to the builder so that it is part of the options the
          // server is actually started with.
          var optionsBuilder = new MqttServerOptionsBuilder()
              .WithConnectionBacklog(100)
              .WithDefaultEndpointPort(1884)
              .WithConnectionValidator(new MqttServerConnectionValidatorDelegate(c =>
              {
                  if (c.ClientId != "Highlander")
                  {
                      c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedIdentifierRejected;
                      return;
                  }

                  c.ReturnCode = MqttConnectReturnCode.ConnectionAccepted;
              }));

          var mqttServer = new MqttFactory().CreateMqttServer();
          await mqttServer.StartAsync(optionsBuilder.Build());
      }

      {
          // Setup client validator.
          var options = new MqttServerOptions
          {
              ConnectionValidator = new MqttServerConnectionValidatorDelegate(c =>
              {
                  if (c.ClientId.Length < 10)
                  {
                      c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedIdentifierRejected;
                      return;
                  }

                  if (c.Username != "mySecretUser")
                  {
                      c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
                      return;
                  }

                  var password = Encoding.UTF8.GetString(c.Password);
                  if (password != "mySecretPassword")
                  {
                      c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
                      return;
                  }

                  c.ReturnCode = MqttConnectReturnCode.ConnectionAccepted;
              })
          };
      }

      {
          // Create a new MQTT server.
          var mqttServer = new MqttFactory().CreateMqttServer();
      }

      {
          // Setup application message interceptor.
          var options = new MqttServerOptionsBuilder()
              .WithApplicationMessageInterceptor(context =>
              {
                  if (context.ApplicationMessage.Topic == "my/custom/topic")
                  {
                      context.ApplicationMessage.Payload = Encoding.UTF8.GetBytes("The server injected payload.");
                  }

                  // It is also possible to read the payload and extend it. For example by adding a timestamp in a JSON document.
                  // This is useful when the IoT device has no own clock and the creation time of the message might be important.
              })
              .Build();
      }

      {
          // Setup subscription interceptor.
          // Topic prefixes are identifiers, so they are compared ordinally rather than
          // with culture-sensitive rules.
          var options = new MqttServerOptionsBuilder()
              .WithSubscriptionInterceptor(context =>
              {
                  if (context.TopicFilter.Topic.StartsWith("admin/foo/bar", StringComparison.Ordinal) && context.ClientId != "theAdmin")
                  {
                      context.AcceptSubscription = false;
                  }

                  if (context.TopicFilter.Topic.StartsWith("the/secret/stuff", StringComparison.Ordinal) && context.ClientId != "Imperator")
                  {
                      context.AcceptSubscription = false;
                      context.CloseConnection = true;
                  }
              })
              .Build();
      }

      {
          // Setup and start a managed MQTT client.
          var options = new ManagedMqttClientOptionsBuilder()
              .WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
              .WithClientOptions(new MqttClientOptionsBuilder()
                  .WithClientId("Client1")
                  .WithTcpServer("broker.hivemq.com")
                  .WithTls().Build())
              .Build();

          var mqttClient = new MqttFactory().CreateManagedMqttClient();
          await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic("my/topic").Build());
          await mqttClient.StartAsync(options);
      }

      {
          // Use a custom log ID for the logger.
          var factory = new MqttFactory();
          var client = factory.CreateMqttClient(new MqttNetLogger("MyCustomId"));
      }

      {
          var client = new MqttFactory().CreateMqttClient();

          var message = new MqttApplicationMessageBuilder()
              .WithTopic("MyTopic")
              .WithPayload("Hello World")
              .WithExactlyOnceQoS()
              .WithRetainFlag()
              .Build();

          await client.PublishAsync(message);
      }

      {
          // Write all trace messages to the console window.
          MqttNetGlobalLogger.LogMessagePublished += (s, e) =>
          {
              var trace = $">> [{e.TraceMessage.Timestamp:O}] [{e.TraceMessage.ThreadId}] [{e.TraceMessage.Source}] [{e.TraceMessage.Level}]: {e.TraceMessage.Message}";
              if (e.TraceMessage.Exception != null)
              {
                  trace += Environment.NewLine + e.TraceMessage.Exception.ToString();
              }

              Console.WriteLine(trace);
          };
      }
  }
  691. #endregion
  692. }
  693. }