Nie możesz wybrać więcej, niż 25 tematów Tematy muszą się zaczynać od litery lub cyfry, mogą zawierać myślniki ('-') i mogą mieć do 35 znaków.
 
 
 
 

823 wiersze
28 KiB

  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Collections.ObjectModel;
  4. using System.Text;
  5. using System.Threading.Tasks;
  6. using Windows.Security.Cryptography.Certificates;
  7. using Windows.UI.Core;
  8. using Windows.UI.Xaml;
  9. using MQTTnet.Client;
  10. using MQTTnet.Client.Connecting;
  11. using MQTTnet.Client.Disconnecting;
  12. using MQTTnet.Client.Options;
  13. using MQTTnet.Diagnostics;
  14. using MQTTnet.Exceptions;
  15. using MQTTnet.Extensions.ManagedClient;
  16. using MQTTnet.Extensions.Rpc;
  17. using MQTTnet.Formatter;
  18. using MQTTnet.Implementations;
  19. using MQTTnet.Protocol;
  20. using MQTTnet.Server;
  21. using MQTTnet.Server.Status;
  22. using MqttClientConnectedEventArgs = MQTTnet.Client.Connecting.MqttClientConnectedEventArgs;
  23. using MqttClientDisconnectedEventArgs = MQTTnet.Client.Disconnecting.MqttClientDisconnectedEventArgs;
  24. using MQTTnet.Extensions.WebSocket4Net;
  25. namespace MQTTnet.TestApp.UniversalWindows
  26. {
  27. public sealed partial class MainPage
  28. {
  29. private readonly ConcurrentQueue<MqttNetLogMessage> _traceMessages = new ConcurrentQueue<MqttNetLogMessage>();
  30. private readonly ObservableCollection<IMqttClientStatus> _sessions = new ObservableCollection<IMqttClientStatus>();
  31. private IMqttClient _mqttClient;
  32. private IManagedMqttClient _managedMqttClient;
  33. private IMqttServer _mqttServer;
  34. public MainPage()
  35. {
  36. InitializeComponent();
  37. ClientId.Text = Guid.NewGuid().ToString("D");
  38. MqttNetGlobalLogger.LogMessagePublished += OnTraceMessagePublished;
  39. }
  40. private async void OnTraceMessagePublished(object sender, MqttNetLogMessagePublishedEventArgs e)
  41. {
  42. _traceMessages.Enqueue(e.TraceMessage);
  43. await UpdateLogAsync();
  44. }
  45. private async Task UpdateLogAsync()
  46. {
  47. while (_traceMessages.Count > 100)
  48. {
  49. _traceMessages.TryDequeue(out _);
  50. }
  51. var logText = new StringBuilder();
  52. foreach (var traceMessage in _traceMessages)
  53. {
  54. logText.AppendFormat(
  55. "[{0:yyyy-MM-dd HH:mm:ss.fff}] [{1}] [{2}] [{3}] [{4}]{5}",
  56. traceMessage.Timestamp,
  57. traceMessage.Level,
  58. traceMessage.Source,
  59. traceMessage.ThreadId,
  60. traceMessage.Message,
  61. Environment.NewLine);
  62. if (traceMessage.Exception != null)
  63. {
  64. logText.AppendLine(traceMessage.Exception.ToString());
  65. }
  66. }
  67. await Trace.Dispatcher.RunAsync(CoreDispatcherPriority.Low, () =>
  68. {
  69. Trace.Text = logText.ToString();
  70. });
  71. }
  72. private async void Connect(object sender, RoutedEventArgs e)
  73. {
  74. var mqttFactory = new MqttFactory();
  75. var tlsOptions = new MqttClientTlsOptions
  76. {
  77. UseTls = UseTls.IsChecked == true,
  78. IgnoreCertificateChainErrors = true,
  79. IgnoreCertificateRevocationErrors = true,
  80. AllowUntrustedCertificates = true
  81. };
  82. var options = new MqttClientOptions
  83. {
  84. ClientId = ClientId.Text,
  85. ProtocolVersion = MqttProtocolVersion.V500
  86. };
  87. if (UseTcp.IsChecked == true)
  88. {
  89. options.ChannelOptions = new MqttClientTcpOptions
  90. {
  91. Server = Server.Text,
  92. Port = int.Parse(Port.Text),
  93. TlsOptions = tlsOptions
  94. };
  95. }
  96. if (UseWs.IsChecked == true)
  97. {
  98. options.ChannelOptions = new MqttClientWebSocketOptions
  99. {
  100. Uri = Server.Text,
  101. TlsOptions = tlsOptions
  102. };
  103. }
  104. if (UseWs4Net.IsChecked == true)
  105. {
  106. options.ChannelOptions = new MqttClientWebSocketOptions
  107. {
  108. Uri = Server.Text,
  109. TlsOptions = tlsOptions
  110. };
  111. mqttFactory.UseWebSocket4Net();
  112. }
  113. if (options.ChannelOptions == null)
  114. {
  115. throw new InvalidOperationException();
  116. }
  117. if (!string.IsNullOrEmpty(User.Text))
  118. {
  119. options.Credentials = new MqttClientCredentials
  120. {
  121. Username = User.Text,
  122. Password = Encoding.UTF8.GetBytes(Password.Text)
  123. };
  124. }
  125. options.CleanSession = CleanSession.IsChecked == true;
  126. options.KeepAlivePeriod = TimeSpan.FromSeconds(double.Parse(KeepAliveInterval.Text));
  127. if (UseMqtt310.IsChecked == true)
  128. {
  129. options.ProtocolVersion = MqttProtocolVersion.V310;
  130. }
  131. else if (UseMqtt311.IsChecked == true)
  132. {
  133. options.ProtocolVersion = MqttProtocolVersion.V311;
  134. }
  135. else if (UseMqtt500.IsChecked == true)
  136. {
  137. options.ProtocolVersion = MqttProtocolVersion.V500;
  138. }
  139. try
  140. {
  141. if (_mqttClient != null)
  142. {
  143. await _mqttClient.DisconnectAsync();
  144. _mqttClient.UseApplicationMessageReceivedHandler(HandleReceivedApplicationMessage);
  145. _mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(x => OnConnected(x));
  146. _mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(x => OnDisconnected(x));
  147. }
  148. if (UseManagedClient.IsChecked == true)
  149. {
  150. _managedMqttClient = mqttFactory.CreateManagedMqttClient();
  151. _managedMqttClient.UseApplicationMessageReceivedHandler(HandleReceivedApplicationMessage);
  152. _managedMqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(x => OnConnected(x));
  153. _managedMqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(x => OnDisconnected(x));
  154. await _managedMqttClient.StartAsync(new ManagedMqttClientOptions
  155. {
  156. ClientOptions = options
  157. });
  158. }
  159. else
  160. {
  161. _mqttClient = mqttFactory.CreateMqttClient();
  162. _mqttClient.UseApplicationMessageReceivedHandler(HandleReceivedApplicationMessage);
  163. _mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(x => OnConnected(x));
  164. _mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(x => OnDisconnected(x));
  165. await _mqttClient.ConnectAsync(options);
  166. }
  167. }
  168. catch (Exception exception)
  169. {
  170. Trace.Text += exception + Environment.NewLine;
  171. }
  172. }
  173. private void OnDisconnected(MqttClientDisconnectedEventArgs e)
  174. {
  175. _traceMessages.Enqueue(new MqttNetLogMessage("", DateTime.Now, -1,
  176. "", MqttNetLogLevel.Info, "! DISCONNECTED EVENT FIRED", null));
  177. Task.Run(UpdateLogAsync);
  178. }
  179. private void OnConnected(MqttClientConnectedEventArgs e)
  180. {
  181. _traceMessages.Enqueue(new MqttNetLogMessage("", DateTime.Now, -1,
  182. "", MqttNetLogLevel.Info, "! CONNECTED EVENT FIRED", null));
  183. Task.Run(UpdateLogAsync);
  184. }
  185. private async Task HandleReceivedApplicationMessage(MqttApplicationMessageReceivedEventArgs eventArgs)
  186. {
  187. var item = $"Timestamp: {DateTime.Now:O} | Topic: {eventArgs.ApplicationMessage.Topic} | Payload: {eventArgs.ApplicationMessage.ConvertPayloadToString()} | QoS: {eventArgs.ApplicationMessage.QualityOfServiceLevel}";
  188. await Dispatcher.RunAsync(CoreDispatcherPriority.Low, () =>
  189. {
  190. if (AddReceivedMessagesToList.IsChecked == true)
  191. {
  192. ReceivedMessages.Items.Add(item);
  193. }
  194. });
  195. }
  196. private async void Publish(object sender, RoutedEventArgs e)
  197. {
  198. try
  199. {
  200. var qos = MqttQualityOfServiceLevel.AtMostOnce;
  201. if (QoS1.IsChecked == true)
  202. {
  203. qos = MqttQualityOfServiceLevel.AtLeastOnce;
  204. }
  205. if (QoS2.IsChecked == true)
  206. {
  207. qos = MqttQualityOfServiceLevel.ExactlyOnce;
  208. }
  209. var payload = new byte[0];
  210. if (PlainText.IsChecked == true)
  211. {
  212. payload = Encoding.UTF8.GetBytes(Payload.Text);
  213. }
  214. if (Base64.IsChecked == true)
  215. {
  216. payload = Convert.FromBase64String(Payload.Text);
  217. }
  218. var message = new MqttApplicationMessageBuilder()
  219. .WithContentType(ContentType.Text)
  220. .WithResponseTopic(ResponseTopic.Text)
  221. .WithTopic(Topic.Text)
  222. .WithPayload(payload)
  223. .WithQualityOfServiceLevel(qos)
  224. .WithRetainFlag(Retain.IsChecked == true)
  225. .Build();
  226. if (_mqttClient != null)
  227. {
  228. await _mqttClient.PublishAsync(message);
  229. }
  230. if (_managedMqttClient != null)
  231. {
  232. await _managedMqttClient.PublishAsync(message);
  233. }
  234. }
  235. catch (Exception exception)
  236. {
  237. Trace.Text += exception + Environment.NewLine;
  238. }
  239. }
  240. private async void Disconnect(object sender, RoutedEventArgs e)
  241. {
  242. try
  243. {
  244. if (_mqttClient != null)
  245. {
  246. await _mqttClient.DisconnectAsync();
  247. _mqttClient.Dispose();
  248. _mqttClient = null;
  249. }
  250. if (_managedMqttClient != null)
  251. {
  252. await _managedMqttClient.StopAsync();
  253. _managedMqttClient.Dispose();
  254. _managedMqttClient = null;
  255. }
  256. }
  257. catch (Exception exception)
  258. {
  259. Trace.Text += exception + Environment.NewLine;
  260. }
  261. }
  262. private void ClearLog(object sender, RoutedEventArgs e)
  263. {
  264. while (_traceMessages.Count > 0)
  265. {
  266. _traceMessages.TryDequeue(out _);
  267. }
  268. Trace.Text = string.Empty;
  269. }
  270. private async void Subscribe(object sender, RoutedEventArgs e)
  271. {
  272. try
  273. {
  274. var qos = MqttQualityOfServiceLevel.AtMostOnce;
  275. if (SubscribeQoS1.IsChecked == true)
  276. {
  277. qos = MqttQualityOfServiceLevel.AtLeastOnce;
  278. }
  279. if (SubscribeQoS2.IsChecked == true)
  280. {
  281. qos = MqttQualityOfServiceLevel.ExactlyOnce;
  282. }
  283. var topicFilter = new TopicFilter { Topic = SubscribeTopic.Text, QualityOfServiceLevel = qos };
  284. if (_mqttClient != null)
  285. {
  286. await _mqttClient.SubscribeAsync(topicFilter);
  287. }
  288. if (_managedMqttClient != null)
  289. {
  290. await _managedMqttClient.SubscribeAsync(topicFilter);
  291. }
  292. }
  293. catch (Exception exception)
  294. {
  295. Trace.Text += exception + Environment.NewLine;
  296. }
  297. }
  298. private async void Unsubscribe(object sender, RoutedEventArgs e)
  299. {
  300. try
  301. {
  302. if (_mqttClient != null)
  303. {
  304. await _mqttClient.UnsubscribeAsync(SubscribeTopic.Text);
  305. }
  306. if (_managedMqttClient != null)
  307. {
  308. await _managedMqttClient.UnsubscribeAsync(SubscribeTopic.Text);
  309. }
  310. }
  311. catch (Exception exception)
  312. {
  313. Trace.Text += exception + Environment.NewLine;
  314. }
  315. }
  316. // This code is for the Wiki at GitHub!
  317. // ReSharper disable once UnusedMember.Local
  318. private async void StartServer(object sender, RoutedEventArgs e)
  319. {
  320. if (_mqttServer != null)
  321. {
  322. return;
  323. }
  324. JsonServerStorage storage = null;
  325. if (ServerPersistRetainedMessages.IsChecked == true)
  326. {
  327. storage = new JsonServerStorage();
  328. if (ServerClearRetainedMessages.IsChecked == true)
  329. {
  330. storage.Clear();
  331. }
  332. }
  333. _mqttServer = new MqttFactory().CreateMqttServer();
  334. var options = new MqttServerOptions();
  335. options.DefaultEndpointOptions.Port = int.Parse(ServerPort.Text);
  336. options.Storage = storage;
  337. options.EnablePersistentSessions = ServerAllowPersistentSessions.IsChecked == true;
  338. await _mqttServer.StartAsync(options);
  339. }
  340. private async void StopServer(object sender, RoutedEventArgs e)
  341. {
  342. if (_mqttServer == null)
  343. {
  344. return;
  345. }
  346. await _mqttServer.StopAsync();
  347. _mqttServer = null;
  348. }
  349. private void ClearReceivedMessages(object sender, RoutedEventArgs e)
  350. {
  351. ReceivedMessages.Items.Clear();
  352. }
  353. private async void ExecuteRpc(object sender, RoutedEventArgs e)
  354. {
  355. var qos = MqttQualityOfServiceLevel.AtMostOnce;
  356. if (RpcQoS1.IsChecked == true)
  357. {
  358. qos = MqttQualityOfServiceLevel.AtLeastOnce;
  359. }
  360. if (RpcQoS2.IsChecked == true)
  361. {
  362. qos = MqttQualityOfServiceLevel.ExactlyOnce;
  363. }
  364. var payload = new byte[0];
  365. if (RpcText.IsChecked == true)
  366. {
  367. payload = Encoding.UTF8.GetBytes(RpcPayload.Text);
  368. }
  369. if (RpcBase64.IsChecked == true)
  370. {
  371. payload = Convert.FromBase64String(RpcPayload.Text);
  372. }
  373. try
  374. {
  375. var rpcClient = new MqttRpcClient(_mqttClient);
  376. var response = await rpcClient.ExecuteAsync(TimeSpan.FromSeconds(5), RpcMethod.Text, payload, qos);
  377. RpcResponses.Items.Add(RpcMethod.Text + " >>> " + Encoding.UTF8.GetString(response));
  378. }
  379. catch (MqttCommunicationTimedOutException)
  380. {
  381. RpcResponses.Items.Add(RpcMethod.Text + " >>> [TIMEOUT]");
  382. }
  383. catch (Exception exception)
  384. {
  385. RpcResponses.Items.Add(RpcMethod.Text + " >>> [EXCEPTION (" + exception.Message + ")]");
  386. }
  387. }
  388. private void ClearRpcResponses(object sender, RoutedEventArgs e)
  389. {
  390. RpcResponses.Items.Clear();
  391. }
  392. private void ClearSessions(object sender, RoutedEventArgs e)
  393. {
  394. _sessions.Clear();
  395. }
  396. private void RefreshSessions(object sender, RoutedEventArgs e)
  397. {
  398. if (_mqttServer == null)
  399. {
  400. return;
  401. }
  402. var sessions = _mqttServer.GetClientStatusAsync().GetAwaiter().GetResult();
  403. _sessions.Clear();
  404. foreach (var session in sessions)
  405. {
  406. _sessions.Add(session);
  407. }
  408. ListViewSessions.DataContext = _sessions;
  409. }
  410. #region Wiki Code
  411. private async Task WikiCode()
  412. {
  413. {
  414. // Use a custom identifier for the trace messages.
  415. var clientOptions = new MqttClientOptionsBuilder()
  416. .Build();
  417. }
  418. {
  419. // Create a new MQTT client.
  420. var factory = new MqttFactory();
  421. var client = factory.CreateMqttClient();
  422. // Create TCP based options using the builder.
  423. var options = new MqttClientOptionsBuilder()
  424. .WithClientId("Client1")
  425. .WithTcpServer("broker.hivemq.com")
  426. .WithCredentials("bud", "%spencer%")
  427. .WithTls()
  428. .WithCleanSession()
  429. .Build();
  430. await client.ConnectAsync(options);
  431. // Reconnecting
  432. client.UseDisconnectedHandler(async e =>
  433. {
  434. Console.WriteLine("### DISCONNECTED FROM SERVER ###");
  435. await Task.Delay(TimeSpan.FromSeconds(5));
  436. try
  437. {
  438. await client.ConnectAsync(options);
  439. }
  440. catch
  441. {
  442. Console.WriteLine("### RECONNECTING FAILED ###");
  443. }
  444. });
  445. // Consuming messages
  446. client.UseApplicationMessageReceivedHandler(e =>
  447. {
  448. Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###");
  449. Console.WriteLine($"+ Topic = {e.ApplicationMessage.Topic}");
  450. Console.WriteLine($"+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}");
  451. Console.WriteLine($"+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}");
  452. Console.WriteLine($"+ Retain = {e.ApplicationMessage.Retain}");
  453. Console.WriteLine();
  454. });
  455. void Handler(MqttApplicationMessageReceivedEventArgs args)
  456. {
  457. //...
  458. }
  459. client.UseApplicationMessageReceivedHandler(e => Handler(e));
  460. // Subscribe after connect
  461. client.UseConnectedHandler(async e =>
  462. {
  463. Console.WriteLine("### CONNECTED WITH SERVER ###");
  464. // Subscribe to a topic
  465. await client.SubscribeAsync(new TopicFilterBuilder().WithTopic("my/topic").Build());
  466. Console.WriteLine("### SUBSCRIBED ###");
  467. });
  468. // Subscribe to a topic
  469. await client.SubscribeAsync(new TopicFilterBuilder().WithTopic("my/topic").Build());
  470. // Unsubscribe from a topic
  471. await client.UnsubscribeAsync("my/topic");
  472. // Publish an application message
  473. var applicationMessage = new MqttApplicationMessageBuilder()
  474. .WithTopic("A/B/C")
  475. .WithPayload("Hello World")
  476. .WithAtLeastOnceQoS()
  477. .Build();
  478. await client.PublishAsync(applicationMessage);
  479. }
  480. {
  481. {
  482. // Use TCP connection.
  483. var options = new MqttClientOptionsBuilder()
  484. .WithTcpServer("broker.hivemq.com", 1883) // Port is optional
  485. .Build();
  486. }
  487. {
  488. // Use secure TCP connection.
  489. var options = new MqttClientOptionsBuilder()
  490. .WithTcpServer("broker.hivemq.com")
  491. .WithTls()
  492. .Build();
  493. }
  494. {
  495. // Use WebSocket connection.
  496. var options = new MqttClientOptionsBuilder()
  497. .WithWebSocketServer("broker.hivemq.com:8000/mqtt")
  498. .Build();
  499. }
  500. {
  501. // Create TCP based options manually
  502. var options = new MqttClientOptions
  503. {
  504. ClientId = "Client1",
  505. Credentials = new MqttClientCredentials
  506. {
  507. Username = "bud",
  508. Password = Encoding.UTF8.GetBytes("%spencer%")
  509. },
  510. ChannelOptions = new MqttClientTcpOptions
  511. {
  512. Server = "broker.hivemq.org",
  513. TlsOptions = new MqttClientTlsOptions
  514. {
  515. UseTls = true
  516. }
  517. },
  518. };
  519. }
  520. }
  521. // ----------------------------------
  522. {
  523. var options = new MqttServerOptions();
  524. options.ConnectionValidator = new MqttServerConnectionValidatorDelegate(c =>
  525. {
  526. if (c.ClientId.Length < 10)
  527. {
  528. c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedIdentifierRejected;
  529. return;
  530. }
  531. if (c.Username != "mySecretUser")
  532. {
  533. c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
  534. return;
  535. }
  536. if (c.Password != "mySecretPassword")
  537. {
  538. c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
  539. return;
  540. }
  541. c.ReturnCode = MqttConnectReturnCode.ConnectionAccepted;
  542. });
  543. var factory = new MqttFactory();
  544. var mqttServer = factory.CreateMqttServer();
  545. await mqttServer.StartAsync(options);
  546. Console.WriteLine("Press any key to exit.");
  547. Console.ReadLine();
  548. await mqttServer.StopAsync();
  549. }
  550. // ----------------------------------
  551. // For UWP apps:
  552. MqttTcpChannel.CustomIgnorableServerCertificateErrorsResolver = o =>
  553. {
  554. if (o.Server == "server_with_revoked_cert")
  555. {
  556. return new[] { ChainValidationResult.Revoked };
  557. }
  558. return new ChainValidationResult[0];
  559. };
  560. {
  561. // Start a MQTT server.
  562. var mqttServer = new MqttFactory().CreateMqttServer();
  563. await mqttServer.StartAsync(new MqttServerOptions());
  564. Console.WriteLine("Press any key to exit.");
  565. Console.ReadLine();
  566. await mqttServer.StopAsync();
  567. }
  568. {
  569. // Configure MQTT server.
  570. var optionsBuilder = new MqttServerOptionsBuilder()
  571. .WithConnectionBacklog(100)
  572. .WithDefaultEndpointPort(1884);
  573. var options = new MqttServerOptions
  574. {
  575. };
  576. options.ConnectionValidator = new MqttServerConnectionValidatorDelegate(c =>
  577. {
  578. if (c.ClientId != "Highlander")
  579. {
  580. c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedIdentifierRejected;
  581. return;
  582. }
  583. c.ReturnCode = MqttConnectReturnCode.ConnectionAccepted;
  584. });
  585. var mqttServer = new MqttFactory().CreateMqttServer();
  586. await mqttServer.StartAsync(optionsBuilder.Build());
  587. }
  588. {
  589. // Setup client validator.
  590. var options = new MqttServerOptions
  591. {
  592. ConnectionValidator = new MqttServerConnectionValidatorDelegate(c =>
  593. {
  594. if (c.ClientId.Length < 10)
  595. {
  596. c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedIdentifierRejected;
  597. return;
  598. }
  599. if (c.Username != "mySecretUser")
  600. {
  601. c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
  602. return;
  603. }
  604. if (c.Password != "mySecretPassword")
  605. {
  606. c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
  607. return;
  608. }
  609. c.ReturnCode = MqttConnectReturnCode.ConnectionAccepted;
  610. })
  611. };
  612. }
  613. {
  614. // Create a new MQTT server.
  615. var mqttServer = new MqttFactory().CreateMqttServer();
  616. }
  617. {
  618. // Setup application message interceptor.
  619. var options = new MqttServerOptionsBuilder()
  620. .WithApplicationMessageInterceptor(context =>
  621. {
  622. if (context.ApplicationMessage.Topic == "my/custom/topic")
  623. {
  624. context.ApplicationMessage.Payload = Encoding.UTF8.GetBytes("The server injected payload.");
  625. }
  626. // It is also possible to read the payload and extend it. For example by adding a timestamp in a JSON document.
  627. // This is useful when the IoT device has no own clock and the creation time of the message might be important.
  628. })
  629. .Build();
  630. }
  631. {
  632. // Setup subscription interceptor.
  633. var options = new MqttServerOptionsBuilder()
  634. .WithSubscriptionInterceptor(context =>
  635. {
  636. if (context.TopicFilter.Topic.StartsWith("admin/foo/bar") && context.ClientId != "theAdmin")
  637. {
  638. context.AcceptSubscription = false;
  639. }
  640. if (context.TopicFilter.Topic.StartsWith("the/secret/stuff") && context.ClientId != "Imperator")
  641. {
  642. context.AcceptSubscription = false;
  643. context.CloseConnection = true;
  644. }
  645. })
  646. .Build();
  647. }
  648. {
  649. // Setup and start a managed MQTT client.
  650. var options = new ManagedMqttClientOptionsBuilder()
  651. .WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
  652. .WithClientOptions(new MqttClientOptionsBuilder()
  653. .WithClientId("Client1")
  654. .WithTcpServer("broker.hivemq.com")
  655. .WithTls().Build())
  656. .Build();
  657. var mqttClient = new MqttFactory().CreateManagedMqttClient();
  658. await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic("my/topic").Build());
  659. await mqttClient.StartAsync(options);
  660. }
  661. {
  662. // Use a custom log ID for the logger.
  663. var factory = new MqttFactory();
  664. var client = factory.CreateMqttClient(new MqttNetLogger("MyCustomId"));
  665. }
  666. {
  667. var client = new MqttFactory().CreateMqttClient();
  668. var message = new MqttApplicationMessageBuilder()
  669. .WithTopic("MyTopic")
  670. .WithPayload("Hello World")
  671. .WithExactlyOnceQoS()
  672. .WithRetainFlag()
  673. .Build();
  674. await client.PublishAsync(message);
  675. }
  676. {
  677. // Write all trace messages to the console window.
  678. MqttNetGlobalLogger.LogMessagePublished += (s, e) =>
  679. {
  680. var trace = $">> [{e.TraceMessage.Timestamp:O}] [{e.TraceMessage.ThreadId}] [{e.TraceMessage.Source}] [{e.TraceMessage.Level}]: {e.TraceMessage.Message}";
  681. if (e.TraceMessage.Exception != null)
  682. {
  683. trace += Environment.NewLine + e.TraceMessage.Exception.ToString();
  684. }
  685. Console.WriteLine(trace);
  686. };
  687. }
  688. }
  689. #endregion
  690. }
  691. }