You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

833 lines
29 KiB

  1. using MQTTnet.Client;
  2. using MQTTnet.Client.Connecting;
  3. using MQTTnet.Client.Disconnecting;
  4. using MQTTnet.Client.Options;
  5. using MQTTnet.Diagnostics;
  6. using MQTTnet.Exceptions;
  7. using MQTTnet.Extensions.ManagedClient;
  8. using MQTTnet.Extensions.Rpc;
  9. using MQTTnet.Extensions.WebSocket4Net;
  10. using MQTTnet.Formatter;
  11. using MQTTnet.Implementations;
  12. using MQTTnet.Protocol;
  13. using MQTTnet.Server;
  14. using MQTTnet.Server.Status;
  15. using System;
  16. using System.Collections.Concurrent;
  17. using System.Collections.ObjectModel;
  18. using System.Text;
  19. using System.Threading.Tasks;
  20. using Windows.Security.Cryptography.Certificates;
  21. using Windows.UI.Core;
  22. using Windows.UI.Xaml;
  23. using MqttClientConnectedEventArgs = MQTTnet.Client.Connecting.MqttClientConnectedEventArgs;
  24. using MqttClientDisconnectedEventArgs = MQTTnet.Client.Disconnecting.MqttClientDisconnectedEventArgs;
  25. namespace MQTTnet.TestApp.UniversalWindows
  26. {
  27. public sealed partial class MainPage
  28. {
  29. private readonly ConcurrentQueue<MqttNetLogMessage> _traceMessages = new ConcurrentQueue<MqttNetLogMessage>();
  30. private readonly ObservableCollection<IMqttClientStatus> _sessions = new ObservableCollection<IMqttClientStatus>();
  31. private IMqttClient _mqttClient;
  32. private IManagedMqttClient _managedMqttClient;
  33. private IMqttServer _mqttServer;
  34. public MainPage()
  35. {
  36. InitializeComponent();
  37. ClientId.Text = Guid.NewGuid().ToString("D");
  38. MqttNetGlobalLogger.LogMessagePublished += OnTraceMessagePublished;
  39. }
  40. private async void OnTraceMessagePublished(object sender, MqttNetLogMessagePublishedEventArgs e)
  41. {
  42. _traceMessages.Enqueue(e.TraceMessage);
  43. await UpdateLogAsync();
  44. }
  45. private async Task UpdateLogAsync()
  46. {
  47. while (_traceMessages.Count > 100)
  48. {
  49. _traceMessages.TryDequeue(out _);
  50. }
  51. var logText = new StringBuilder();
  52. foreach (var traceMessage in _traceMessages)
  53. {
  54. logText.AppendFormat(
  55. "[{0:yyyy-MM-dd HH:mm:ss.fff}] [{1}] [{2}] [{3}] [{4}]{5}",
  56. traceMessage.Timestamp,
  57. traceMessage.Level,
  58. traceMessage.Source,
  59. traceMessage.ThreadId,
  60. traceMessage.Message,
  61. Environment.NewLine);
  62. if (traceMessage.Exception != null)
  63. {
  64. logText.AppendLine(traceMessage.Exception.ToString());
  65. }
  66. }
  67. await Trace.Dispatcher.RunAsync(CoreDispatcherPriority.Low, () =>
  68. {
  69. Trace.Text = logText.ToString();
  70. });
  71. }
  72. private async void Connect(object sender, RoutedEventArgs e)
  73. {
  74. var mqttFactory = new MqttFactory();
  75. var tlsOptions = new MqttClientTlsOptions
  76. {
  77. UseTls = UseTls.IsChecked == true,
  78. IgnoreCertificateChainErrors = true,
  79. IgnoreCertificateRevocationErrors = true,
  80. AllowUntrustedCertificates = true
  81. };
  82. var options = new MqttClientOptions
  83. {
  84. ClientId = ClientId.Text,
  85. ProtocolVersion = MqttProtocolVersion.V500
  86. };
  87. if (UseTcp.IsChecked == true)
  88. {
  89. options.ChannelOptions = new MqttClientTcpOptions
  90. {
  91. Server = Server.Text,
  92. Port = int.Parse(Port.Text),
  93. TlsOptions = tlsOptions
  94. };
  95. }
  96. if (UseWs.IsChecked == true)
  97. {
  98. options.ChannelOptions = new MqttClientWebSocketOptions
  99. {
  100. Uri = Server.Text,
  101. TlsOptions = tlsOptions
  102. };
  103. }
  104. if (UseWs4Net.IsChecked == true)
  105. {
  106. options.ChannelOptions = new MqttClientWebSocketOptions
  107. {
  108. Uri = Server.Text,
  109. TlsOptions = tlsOptions
  110. };
  111. mqttFactory.UseWebSocket4Net();
  112. }
  113. if (options.ChannelOptions == null)
  114. {
  115. throw new InvalidOperationException();
  116. }
  117. if (!string.IsNullOrEmpty(User.Text))
  118. {
  119. options.Credentials = new MqttClientCredentials
  120. {
  121. Username = User.Text,
  122. Password = Encoding.UTF8.GetBytes(Password.Text)
  123. };
  124. }
  125. options.CleanSession = CleanSession.IsChecked == true;
  126. options.KeepAlivePeriod = TimeSpan.FromSeconds(double.Parse(KeepAliveInterval.Text));
  127. if (UseMqtt310.IsChecked == true)
  128. {
  129. options.ProtocolVersion = MqttProtocolVersion.V310;
  130. }
  131. else if (UseMqtt311.IsChecked == true)
  132. {
  133. options.ProtocolVersion = MqttProtocolVersion.V311;
  134. }
  135. else if (UseMqtt500.IsChecked == true)
  136. {
  137. options.ProtocolVersion = MqttProtocolVersion.V500;
  138. }
  139. try
  140. {
  141. if (_mqttClient != null)
  142. {
  143. await _mqttClient.DisconnectAsync();
  144. _mqttClient.UseApplicationMessageReceivedHandler(HandleReceivedApplicationMessage);
  145. _mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(x => OnConnected(x));
  146. _mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(x => OnDisconnected(x));
  147. }
  148. if (UseManagedClient.IsChecked == true)
  149. {
  150. _managedMqttClient = mqttFactory.CreateManagedMqttClient();
  151. _managedMqttClient.UseApplicationMessageReceivedHandler(HandleReceivedApplicationMessage);
  152. _managedMqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(x => OnConnected(x));
  153. _managedMqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(x => OnDisconnected(x));
  154. await _managedMqttClient.StartAsync(new ManagedMqttClientOptions
  155. {
  156. ClientOptions = options
  157. });
  158. }
  159. else
  160. {
  161. _mqttClient = mqttFactory.CreateMqttClient();
  162. _mqttClient.UseApplicationMessageReceivedHandler(HandleReceivedApplicationMessage);
  163. _mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(x => OnConnected(x));
  164. _mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(x => OnDisconnected(x));
  165. await _mqttClient.ConnectAsync(options);
  166. }
  167. }
  168. catch (Exception exception)
  169. {
  170. Trace.Text += exception + Environment.NewLine;
  171. }
  172. }
  173. private void OnDisconnected(MqttClientDisconnectedEventArgs e)
  174. {
  175. _traceMessages.Enqueue(new MqttNetLogMessage
  176. {
  177. Timestamp = DateTime.UtcNow,
  178. ThreadId = -1,
  179. Level = MqttNetLogLevel.Info,
  180. Message = "! DISCONNECTED EVENT FIRED",
  181. });
  182. Task.Run(UpdateLogAsync);
  183. }
  184. private void OnConnected(MqttClientConnectedEventArgs e)
  185. {
  186. _traceMessages.Enqueue(new MqttNetLogMessage
  187. {
  188. Timestamp = DateTime.UtcNow,
  189. ThreadId = -1,
  190. Level = MqttNetLogLevel.Info,
  191. Message = "! CONNECTED EVENT FIRED",
  192. });
  193. Task.Run(UpdateLogAsync);
  194. }
  195. private async Task HandleReceivedApplicationMessage(MqttApplicationMessageReceivedEventArgs eventArgs)
  196. {
  197. var item = $"Timestamp: {DateTime.Now:O} | Topic: {eventArgs.ApplicationMessage.Topic} | Payload: {eventArgs.ApplicationMessage.ConvertPayloadToString()} | QoS: {eventArgs.ApplicationMessage.QualityOfServiceLevel}";
  198. await Dispatcher.RunAsync(CoreDispatcherPriority.Low, () =>
  199. {
  200. if (AddReceivedMessagesToList.IsChecked == true)
  201. {
  202. ReceivedMessages.Items.Add(item);
  203. }
  204. });
  205. }
  206. private async void Publish(object sender, RoutedEventArgs e)
  207. {
  208. try
  209. {
  210. var qos = MqttQualityOfServiceLevel.AtMostOnce;
  211. if (QoS1.IsChecked == true)
  212. {
  213. qos = MqttQualityOfServiceLevel.AtLeastOnce;
  214. }
  215. if (QoS2.IsChecked == true)
  216. {
  217. qos = MqttQualityOfServiceLevel.ExactlyOnce;
  218. }
  219. var payload = new byte[0];
  220. if (PlainText.IsChecked == true)
  221. {
  222. payload = Encoding.UTF8.GetBytes(Payload.Text);
  223. }
  224. if (Base64.IsChecked == true)
  225. {
  226. payload = Convert.FromBase64String(Payload.Text);
  227. }
  228. var message = new MqttApplicationMessageBuilder()
  229. .WithContentType(ContentType.Text)
  230. .WithResponseTopic(ResponseTopic.Text)
  231. .WithTopic(Topic.Text)
  232. .WithPayload(payload)
  233. .WithQualityOfServiceLevel(qos)
  234. .WithRetainFlag(Retain.IsChecked == true)
  235. .Build();
  236. if (_mqttClient != null)
  237. {
  238. await _mqttClient.PublishAsync(message);
  239. }
  240. if (_managedMqttClient != null)
  241. {
  242. await _managedMqttClient.PublishAsync(message);
  243. }
  244. }
  245. catch (Exception exception)
  246. {
  247. Trace.Text += exception + Environment.NewLine;
  248. }
  249. }
  250. private async void Disconnect(object sender, RoutedEventArgs e)
  251. {
  252. try
  253. {
  254. if (_mqttClient != null)
  255. {
  256. await _mqttClient.DisconnectAsync();
  257. _mqttClient.Dispose();
  258. _mqttClient = null;
  259. }
  260. if (_managedMqttClient != null)
  261. {
  262. await _managedMqttClient.StopAsync();
  263. _managedMqttClient.Dispose();
  264. _managedMqttClient = null;
  265. }
  266. }
  267. catch (Exception exception)
  268. {
  269. Trace.Text += exception + Environment.NewLine;
  270. }
  271. }
  272. private void ClearLog(object sender, RoutedEventArgs e)
  273. {
  274. while (_traceMessages.Count > 0)
  275. {
  276. _traceMessages.TryDequeue(out _);
  277. }
  278. Trace.Text = string.Empty;
  279. }
  280. private async void Subscribe(object sender, RoutedEventArgs e)
  281. {
  282. try
  283. {
  284. var qos = MqttQualityOfServiceLevel.AtMostOnce;
  285. if (SubscribeQoS1.IsChecked == true)
  286. {
  287. qos = MqttQualityOfServiceLevel.AtLeastOnce;
  288. }
  289. if (SubscribeQoS2.IsChecked == true)
  290. {
  291. qos = MqttQualityOfServiceLevel.ExactlyOnce;
  292. }
  293. var topicFilter = new TopicFilter { Topic = SubscribeTopic.Text, QualityOfServiceLevel = qos };
  294. if (_mqttClient != null)
  295. {
  296. await _mqttClient.SubscribeAsync(topicFilter);
  297. }
  298. if (_managedMqttClient != null)
  299. {
  300. await _managedMqttClient.SubscribeAsync(topicFilter);
  301. }
  302. }
  303. catch (Exception exception)
  304. {
  305. Trace.Text += exception + Environment.NewLine;
  306. }
  307. }
  308. private async void Unsubscribe(object sender, RoutedEventArgs e)
  309. {
  310. try
  311. {
  312. if (_mqttClient != null)
  313. {
  314. await _mqttClient.UnsubscribeAsync(SubscribeTopic.Text);
  315. }
  316. if (_managedMqttClient != null)
  317. {
  318. await _managedMqttClient.UnsubscribeAsync(SubscribeTopic.Text);
  319. }
  320. }
  321. catch (Exception exception)
  322. {
  323. Trace.Text += exception + Environment.NewLine;
  324. }
  325. }
  326. // This code is for the Wiki at GitHub!
  327. // ReSharper disable once UnusedMember.Local
  328. private async void StartServer(object sender, RoutedEventArgs e)
  329. {
  330. if (_mqttServer != null)
  331. {
  332. return;
  333. }
  334. JsonServerStorage storage = null;
  335. if (ServerPersistRetainedMessages.IsChecked == true)
  336. {
  337. storage = new JsonServerStorage();
  338. if (ServerClearRetainedMessages.IsChecked == true)
  339. {
  340. storage.Clear();
  341. }
  342. }
  343. _mqttServer = new MqttFactory().CreateMqttServer();
  344. var options = new MqttServerOptions();
  345. options.DefaultEndpointOptions.Port = int.Parse(ServerPort.Text);
  346. options.Storage = storage;
  347. options.EnablePersistentSessions = ServerAllowPersistentSessions.IsChecked == true;
  348. await _mqttServer.StartAsync(options);
  349. }
  350. private async void StopServer(object sender, RoutedEventArgs e)
  351. {
  352. if (_mqttServer == null)
  353. {
  354. return;
  355. }
  356. await _mqttServer.StopAsync();
  357. _mqttServer = null;
  358. }
  359. private void ClearReceivedMessages(object sender, RoutedEventArgs e)
  360. {
  361. ReceivedMessages.Items.Clear();
  362. }
  363. private async void ExecuteRpc(object sender, RoutedEventArgs e)
  364. {
  365. var qos = MqttQualityOfServiceLevel.AtMostOnce;
  366. if (RpcQoS1.IsChecked == true)
  367. {
  368. qos = MqttQualityOfServiceLevel.AtLeastOnce;
  369. }
  370. if (RpcQoS2.IsChecked == true)
  371. {
  372. qos = MqttQualityOfServiceLevel.ExactlyOnce;
  373. }
  374. var payload = new byte[0];
  375. if (RpcText.IsChecked == true)
  376. {
  377. payload = Encoding.UTF8.GetBytes(RpcPayload.Text);
  378. }
  379. if (RpcBase64.IsChecked == true)
  380. {
  381. payload = Convert.FromBase64String(RpcPayload.Text);
  382. }
  383. try
  384. {
  385. var rpcClient = new MqttRpcClient(_mqttClient);
  386. var response = await rpcClient.ExecuteAsync(TimeSpan.FromSeconds(5), RpcMethod.Text, payload, qos);
  387. RpcResponses.Items.Add(RpcMethod.Text + " >>> " + Encoding.UTF8.GetString(response));
  388. }
  389. catch (MqttCommunicationTimedOutException)
  390. {
  391. RpcResponses.Items.Add(RpcMethod.Text + " >>> [TIMEOUT]");
  392. }
  393. catch (Exception exception)
  394. {
  395. RpcResponses.Items.Add(RpcMethod.Text + " >>> [EXCEPTION (" + exception.Message + ")]");
  396. }
  397. }
  398. private void ClearRpcResponses(object sender, RoutedEventArgs e)
  399. {
  400. RpcResponses.Items.Clear();
  401. }
  402. private void ClearSessions(object sender, RoutedEventArgs e)
  403. {
  404. _sessions.Clear();
  405. }
  406. private void RefreshSessions(object sender, RoutedEventArgs e)
  407. {
  408. if (_mqttServer == null)
  409. {
  410. return;
  411. }
  412. var sessions = _mqttServer.GetClientStatusAsync().GetAwaiter().GetResult();
  413. _sessions.Clear();
  414. foreach (var session in sessions)
  415. {
  416. _sessions.Add(session);
  417. }
  418. ListViewSessions.DataContext = _sessions;
  419. }
  420. #region Wiki Code
  421. private async Task WikiCode()
  422. {
  423. {
  424. // Use a custom identifier for the trace messages.
  425. var clientOptions = new MqttClientOptionsBuilder()
  426. .Build();
  427. }
  428. {
  429. // Create a new MQTT client.
  430. var factory = new MqttFactory();
  431. var client = factory.CreateMqttClient();
  432. // Create TCP based options using the builder.
  433. var options = new MqttClientOptionsBuilder()
  434. .WithClientId("Client1")
  435. .WithTcpServer("broker.hivemq.com")
  436. .WithCredentials("bud", "%spencer%")
  437. .WithTls()
  438. .WithCleanSession()
  439. .Build();
  440. await client.ConnectAsync(options);
  441. // Reconnecting
  442. client.UseDisconnectedHandler(async e =>
  443. {
  444. Console.WriteLine("### DISCONNECTED FROM SERVER ###");
  445. await Task.Delay(TimeSpan.FromSeconds(5));
  446. try
  447. {
  448. await client.ConnectAsync(options);
  449. }
  450. catch
  451. {
  452. Console.WriteLine("### RECONNECTING FAILED ###");
  453. }
  454. });
  455. // Consuming messages
  456. client.UseApplicationMessageReceivedHandler(e =>
  457. {
  458. Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###");
  459. Console.WriteLine($"+ Topic = {e.ApplicationMessage.Topic}");
  460. Console.WriteLine($"+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}");
  461. Console.WriteLine($"+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}");
  462. Console.WriteLine($"+ Retain = {e.ApplicationMessage.Retain}");
  463. Console.WriteLine();
  464. });
  465. void Handler(MqttApplicationMessageReceivedEventArgs args)
  466. {
  467. //...
  468. }
  469. client.UseApplicationMessageReceivedHandler(e => Handler(e));
  470. // Subscribe after connect
  471. client.UseConnectedHandler(async e =>
  472. {
  473. Console.WriteLine("### CONNECTED WITH SERVER ###");
  474. // Subscribe to a topic
  475. await client.SubscribeAsync(new TopicFilterBuilder().WithTopic("my/topic").Build());
  476. Console.WriteLine("### SUBSCRIBED ###");
  477. });
  478. // Subscribe to a topic
  479. await client.SubscribeAsync(new TopicFilterBuilder().WithTopic("my/topic").Build());
  480. // Unsubscribe from a topic
  481. await client.UnsubscribeAsync("my/topic");
  482. // Publish an application message
  483. var applicationMessage = new MqttApplicationMessageBuilder()
  484. .WithTopic("A/B/C")
  485. .WithPayload("Hello World")
  486. .WithAtLeastOnceQoS()
  487. .Build();
  488. await client.PublishAsync(applicationMessage);
  489. }
  490. {
  491. {
  492. // Use TCP connection.
  493. var options = new MqttClientOptionsBuilder()
  494. .WithTcpServer("broker.hivemq.com", 1883) // Port is optional
  495. .Build();
  496. }
  497. {
  498. // Use secure TCP connection.
  499. var options = new MqttClientOptionsBuilder()
  500. .WithTcpServer("broker.hivemq.com")
  501. .WithTls()
  502. .Build();
  503. }
  504. {
  505. // Use WebSocket connection.
  506. var options = new MqttClientOptionsBuilder()
  507. .WithWebSocketServer("broker.hivemq.com:8000/mqtt")
  508. .Build();
  509. }
  510. {
  511. // Create TCP based options manually
  512. var options = new MqttClientOptions
  513. {
  514. ClientId = "Client1",
  515. Credentials = new MqttClientCredentials
  516. {
  517. Username = "bud",
  518. Password = Encoding.UTF8.GetBytes("%spencer%")
  519. },
  520. ChannelOptions = new MqttClientTcpOptions
  521. {
  522. Server = "broker.hivemq.org",
  523. TlsOptions = new MqttClientTlsOptions
  524. {
  525. UseTls = true
  526. }
  527. },
  528. };
  529. }
  530. }
  531. // ----------------------------------
  532. {
  533. var options = new MqttServerOptions();
  534. options.ConnectionValidator = new MqttServerConnectionValidatorDelegate(c =>
  535. {
  536. if (c.ClientId.Length < 10)
  537. {
  538. c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedIdentifierRejected;
  539. return;
  540. }
  541. if (c.Username != "mySecretUser")
  542. {
  543. c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
  544. return;
  545. }
  546. if (c.Password != "mySecretPassword")
  547. {
  548. c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
  549. return;
  550. }
  551. c.ReturnCode = MqttConnectReturnCode.ConnectionAccepted;
  552. });
  553. var factory = new MqttFactory();
  554. var mqttServer = factory.CreateMqttServer();
  555. await mqttServer.StartAsync(options);
  556. Console.WriteLine("Press any key to exit.");
  557. Console.ReadLine();
  558. await mqttServer.StopAsync();
  559. }
  560. // ----------------------------------
  561. // For UWP apps:
  562. MqttTcpChannel.CustomIgnorableServerCertificateErrorsResolver = o =>
  563. {
  564. if (o.Server == "server_with_revoked_cert")
  565. {
  566. return new[] { ChainValidationResult.Revoked };
  567. }
  568. return new ChainValidationResult[0];
  569. };
  570. {
  571. // Start a MQTT server.
  572. var mqttServer = new MqttFactory().CreateMqttServer();
  573. await mqttServer.StartAsync(new MqttServerOptions());
  574. Console.WriteLine("Press any key to exit.");
  575. Console.ReadLine();
  576. await mqttServer.StopAsync();
  577. }
  578. {
  579. // Configure MQTT server.
  580. var optionsBuilder = new MqttServerOptionsBuilder()
  581. .WithConnectionBacklog(100)
  582. .WithDefaultEndpointPort(1884);
  583. var options = new MqttServerOptions
  584. {
  585. };
  586. options.ConnectionValidator = new MqttServerConnectionValidatorDelegate(c =>
  587. {
  588. if (c.ClientId != "Highlander")
  589. {
  590. c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedIdentifierRejected;
  591. return;
  592. }
  593. c.ReturnCode = MqttConnectReturnCode.ConnectionAccepted;
  594. });
  595. var mqttServer = new MqttFactory().CreateMqttServer();
  596. await mqttServer.StartAsync(optionsBuilder.Build());
  597. }
  598. {
  599. // Setup client validator.
  600. var options = new MqttServerOptions
  601. {
  602. ConnectionValidator = new MqttServerConnectionValidatorDelegate(c =>
  603. {
  604. if (c.ClientId.Length < 10)
  605. {
  606. c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedIdentifierRejected;
  607. return;
  608. }
  609. if (c.Username != "mySecretUser")
  610. {
  611. c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
  612. return;
  613. }
  614. if (c.Password != "mySecretPassword")
  615. {
  616. c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
  617. return;
  618. }
  619. c.ReturnCode = MqttConnectReturnCode.ConnectionAccepted;
  620. })
  621. };
  622. }
  623. {
  624. // Create a new MQTT server.
  625. var mqttServer = new MqttFactory().CreateMqttServer();
  626. }
  627. {
  628. // Setup application message interceptor.
  629. var options = new MqttServerOptionsBuilder()
  630. .WithApplicationMessageInterceptor(context =>
  631. {
  632. if (context.ApplicationMessage.Topic == "my/custom/topic")
  633. {
  634. context.ApplicationMessage.Payload = Encoding.UTF8.GetBytes("The server injected payload.");
  635. }
  636. // It is also possible to read the payload and extend it. For example by adding a timestamp in a JSON document.
  637. // This is useful when the IoT device has no own clock and the creation time of the message might be important.
  638. })
  639. .Build();
  640. }
  641. {
  642. // Setup subscription interceptor.
  643. var options = new MqttServerOptionsBuilder()
  644. .WithSubscriptionInterceptor(context =>
  645. {
  646. if (context.TopicFilter.Topic.StartsWith("admin/foo/bar") && context.ClientId != "theAdmin")
  647. {
  648. context.AcceptSubscription = false;
  649. }
  650. if (context.TopicFilter.Topic.StartsWith("the/secret/stuff") && context.ClientId != "Imperator")
  651. {
  652. context.AcceptSubscription = false;
  653. context.CloseConnection = true;
  654. }
  655. })
  656. .Build();
  657. }
  658. {
  659. // Setup and start a managed MQTT client.
  660. var options = new ManagedMqttClientOptionsBuilder()
  661. .WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
  662. .WithClientOptions(new MqttClientOptionsBuilder()
  663. .WithClientId("Client1")
  664. .WithTcpServer("broker.hivemq.com")
  665. .WithTls().Build())
  666. .Build();
  667. var mqttClient = new MqttFactory().CreateManagedMqttClient();
  668. await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic("my/topic").Build());
  669. await mqttClient.StartAsync(options);
  670. }
  671. {
  672. // Use a custom log ID for the logger.
  673. var factory = new MqttFactory();
  674. var client = factory.CreateMqttClient(new MqttNetLogger("MyCustomId"));
  675. }
  676. {
  677. var client = new MqttFactory().CreateMqttClient();
  678. var message = new MqttApplicationMessageBuilder()
  679. .WithTopic("MyTopic")
  680. .WithPayload("Hello World")
  681. .WithExactlyOnceQoS()
  682. .WithRetainFlag()
  683. .Build();
  684. await client.PublishAsync(message);
  685. }
  686. {
  687. // Write all trace messages to the console window.
  688. MqttNetGlobalLogger.LogMessagePublished += (s, e) =>
  689. {
  690. var trace = $">> [{e.TraceMessage.Timestamp:O}] [{e.TraceMessage.ThreadId}] [{e.TraceMessage.Source}] [{e.TraceMessage.Level}]: {e.TraceMessage.Message}";
  691. if (e.TraceMessage.Exception != null)
  692. {
  693. trace += Environment.NewLine + e.TraceMessage.Exception.ToString();
  694. }
  695. Console.WriteLine(trace);
  696. };
  697. }
  698. }
  699. #endregion
  700. }
  701. }