You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

637 line
21 KiB

  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Collections.ObjectModel;
  4. using System.Text;
  5. using System.Threading.Tasks;
  6. using Windows.Security.Cryptography.Certificates;
  7. using Windows.UI.Core;
  8. using Windows.UI.Xaml;
  9. using MQTTnet.Client;
  10. using MQTTnet.Diagnostics;
  11. using MQTTnet.Exceptions;
  12. using MQTTnet.Extensions.ManagedClient;
  13. using MQTTnet.Extensions.Rpc;
  14. using MQTTnet.Implementations;
  15. using MQTTnet.Protocol;
  16. using MQTTnet.Server;
  17. using MqttClientConnectedEventArgs = MQTTnet.Client.MqttClientConnectedEventArgs;
  18. using MqttClientDisconnectedEventArgs = MQTTnet.Client.MqttClientDisconnectedEventArgs;
  19. namespace MQTTnet.TestApp.UniversalWindows
  20. {
  21. public sealed partial class MainPage
  22. {
  23. private readonly ConcurrentQueue<MqttNetLogMessage> _traceMessages = new ConcurrentQueue<MqttNetLogMessage>();
  24. private readonly ObservableCollection<IMqttClientSessionStatus> _sessions = new ObservableCollection<IMqttClientSessionStatus>();
  25. private IMqttClient _mqttClient;
  26. private IMqttServer _mqttServer;
  27. public MainPage()
  28. {
  29. InitializeComponent();
  30. MqttNetGlobalLogger.LogMessagePublished += OnTraceMessagePublished;
  31. }
  32. private async void OnTraceMessagePublished(object sender, MqttNetLogMessagePublishedEventArgs e)
  33. {
  34. _traceMessages.Enqueue(e.TraceMessage);
  35. await UpdateLogAsync();
  36. }
  37. private async Task UpdateLogAsync()
  38. {
  39. while (_traceMessages.Count > 100)
  40. {
  41. _traceMessages.TryDequeue(out _);
  42. }
  43. var logText = new StringBuilder();
  44. foreach (var traceMessage in _traceMessages)
  45. {
  46. logText.AppendFormat(
  47. "[{0:yyyy-MM-dd HH:mm:ss.fff}] [{1}] [{2}] [{3}] [{4}]{5}",
  48. traceMessage.Timestamp,
  49. traceMessage.Level,
  50. traceMessage.Source,
  51. traceMessage.ThreadId,
  52. traceMessage.Message,
  53. Environment.NewLine);
  54. if (traceMessage.Exception != null)
  55. {
  56. logText.AppendLine(traceMessage.Exception.ToString());
  57. }
  58. }
  59. await Trace.Dispatcher.RunAsync(CoreDispatcherPriority.Low, () =>
  60. {
  61. Trace.Text = logText.ToString();
  62. });
  63. }
  64. private async void Connect(object sender, RoutedEventArgs e)
  65. {
  66. var tlsOptions = new MqttClientTlsOptions
  67. {
  68. UseTls = UseTls.IsChecked == true,
  69. IgnoreCertificateChainErrors = true,
  70. IgnoreCertificateRevocationErrors = true,
  71. AllowUntrustedCertificates = true
  72. };
  73. var options = new MqttClientOptions { ClientId = ClientId.Text };
  74. if (UseTcp.IsChecked == true)
  75. {
  76. options.ChannelOptions = new MqttClientTcpOptions
  77. {
  78. Server = Server.Text,
  79. Port = int.Parse(Port.Text),
  80. TlsOptions = tlsOptions
  81. };
  82. }
  83. if (UseWs.IsChecked == true)
  84. {
  85. options.ChannelOptions = new MqttClientWebSocketOptions
  86. {
  87. Uri = Server.Text,
  88. TlsOptions = tlsOptions
  89. };
  90. }
  91. if (options.ChannelOptions == null)
  92. {
  93. throw new InvalidOperationException();
  94. }
  95. options.Credentials = new MqttClientCredentials
  96. {
  97. Username = User.Text,
  98. Password = Password.Text
  99. };
  100. options.CleanSession = CleanSession.IsChecked == true;
  101. options.KeepAlivePeriod = TimeSpan.FromSeconds(double.Parse(KeepAliveInterval.Text));
  102. try
  103. {
  104. if (_mqttClient != null)
  105. {
  106. await _mqttClient.DisconnectAsync();
  107. _mqttClient.ApplicationMessageReceived -= OnApplicationMessageReceived;
  108. _mqttClient.Connected -= OnConnected;
  109. _mqttClient.Disconnected -= OnDisconnected;
  110. }
  111. var factory = new MqttFactory();
  112. _mqttClient = factory.CreateMqttClient();
  113. _mqttClient.ApplicationMessageReceived += OnApplicationMessageReceived;
  114. _mqttClient.Connected += OnConnected;
  115. _mqttClient.Disconnected += OnDisconnected;
  116. await _mqttClient.ConnectAsync(options);
  117. }
  118. catch (Exception exception)
  119. {
  120. Trace.Text += exception + Environment.NewLine;
  121. }
  122. }
  123. private void OnDisconnected(object sender, MqttClientDisconnectedEventArgs e)
  124. {
  125. _traceMessages.Enqueue(new MqttNetLogMessage("", DateTime.Now, -1,
  126. "", MqttNetLogLevel.Info, "! DISCONNECTED EVENT FIRED", null));
  127. Task.Run(UpdateLogAsync);
  128. }
  129. private void OnConnected(object sender, MqttClientConnectedEventArgs e)
  130. {
  131. _traceMessages.Enqueue(new MqttNetLogMessage("", DateTime.Now, -1,
  132. "", MqttNetLogLevel.Info, "! CONNECTED EVENT FIRED", null));
  133. Task.Run(UpdateLogAsync);
  134. }
  135. private async void OnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs eventArgs)
  136. {
  137. var item = $"Timestamp: {DateTime.Now:O} | Topic: {eventArgs.ApplicationMessage.Topic} | Payload: {Encoding.UTF8.GetString(eventArgs.ApplicationMessage.Payload)} | QoS: {eventArgs.ApplicationMessage.QualityOfServiceLevel}";
  138. await Dispatcher.RunAsync(CoreDispatcherPriority.Low, () =>
  139. {
  140. if (AddReceivedMessagesToList.IsChecked == true)
  141. {
  142. ReceivedMessages.Items.Add(item);
  143. }
  144. });
  145. }
  146. private async void Publish(object sender, RoutedEventArgs e)
  147. {
  148. if (_mqttClient == null)
  149. {
  150. return;
  151. }
  152. try
  153. {
  154. var qos = MqttQualityOfServiceLevel.AtMostOnce;
  155. if (QoS1.IsChecked == true)
  156. {
  157. qos = MqttQualityOfServiceLevel.AtLeastOnce;
  158. }
  159. if (QoS2.IsChecked == true)
  160. {
  161. qos = MqttQualityOfServiceLevel.ExactlyOnce;
  162. }
  163. var payload = new byte[0];
  164. if (Text.IsChecked == true)
  165. {
  166. payload = Encoding.UTF8.GetBytes(Payload.Text);
  167. }
  168. if (Base64.IsChecked == true)
  169. {
  170. payload = Convert.FromBase64String(Payload.Text);
  171. }
  172. var message = new MqttApplicationMessageBuilder()
  173. .WithTopic(Topic.Text)
  174. .WithPayload(payload)
  175. .WithQualityOfServiceLevel(qos)
  176. .WithRetainFlag(Retain.IsChecked == true)
  177. .Build();
  178. await _mqttClient.PublishAsync(message);
  179. }
  180. catch (Exception exception)
  181. {
  182. Trace.Text += exception + Environment.NewLine;
  183. }
  184. }
  185. private async void Disconnect(object sender, RoutedEventArgs e)
  186. {
  187. try
  188. {
  189. await _mqttClient.DisconnectAsync();
  190. }
  191. catch (Exception exception)
  192. {
  193. Trace.Text += exception + Environment.NewLine;
  194. }
  195. }
  196. private void ClearLog(object sender, RoutedEventArgs e)
  197. {
  198. while (_traceMessages.Count > 0)
  199. {
  200. _traceMessages.TryDequeue(out _);
  201. }
  202. Trace.Text = string.Empty;
  203. }
  204. private async void Subscribe(object sender, RoutedEventArgs e)
  205. {
  206. if (_mqttClient == null)
  207. {
  208. return;
  209. }
  210. try
  211. {
  212. var qos = MqttQualityOfServiceLevel.AtMostOnce;
  213. if (SubscribeQoS1.IsChecked == true)
  214. {
  215. qos = MqttQualityOfServiceLevel.AtLeastOnce;
  216. }
  217. if (SubscribeQoS2.IsChecked == true)
  218. {
  219. qos = MqttQualityOfServiceLevel.ExactlyOnce;
  220. }
  221. await _mqttClient.SubscribeAsync(new TopicFilter(SubscribeTopic.Text, qos));
  222. }
  223. catch (Exception exception)
  224. {
  225. Trace.Text += exception + Environment.NewLine;
  226. }
  227. }
  228. private async void Unsubscribe(object sender, RoutedEventArgs e)
  229. {
  230. if (_mqttClient == null)
  231. {
  232. return;
  233. }
  234. try
  235. {
  236. await _mqttClient.UnsubscribeAsync(SubscribeTopic.Text);
  237. }
  238. catch (Exception exception)
  239. {
  240. Trace.Text += exception + Environment.NewLine;
  241. }
  242. }
  243. // This code is for the Wiki at GitHub!
  244. // ReSharper disable once UnusedMember.Local
  245. private async void StartServer(object sender, RoutedEventArgs e)
  246. {
  247. if (_mqttServer != null)
  248. {
  249. return;
  250. }
  251. JsonServerStorage storage = null;
  252. if (ServerPersistRetainedMessages.IsChecked == true)
  253. {
  254. storage = new JsonServerStorage();
  255. if (ServerClearRetainedMessages.IsChecked == true)
  256. {
  257. storage.Clear();
  258. }
  259. }
  260. _mqttServer = new MqttFactory().CreateMqttServer();
  261. var options = new MqttServerOptions();
  262. options.DefaultEndpointOptions.Port = int.Parse(ServerPort.Text);
  263. options.Storage = storage;
  264. options.EnablePersistentSessions = ServerAllowPersistentSessions.IsChecked == true;
  265. await _mqttServer.StartAsync(options);
  266. }
  267. private async void StopServer(object sender, RoutedEventArgs e)
  268. {
  269. if (_mqttServer == null)
  270. {
  271. return;
  272. }
  273. await _mqttServer.StopAsync();
  274. _mqttServer = null;
  275. }
  276. private void ClearReceivedMessages(object sender, RoutedEventArgs e)
  277. {
  278. ReceivedMessages.Items.Clear();
  279. }
  280. private async void ExecuteRpc(object sender, RoutedEventArgs e)
  281. {
  282. var qos = MqttQualityOfServiceLevel.AtMostOnce;
  283. if (RpcQoS1.IsChecked == true)
  284. {
  285. qos = MqttQualityOfServiceLevel.AtLeastOnce;
  286. }
  287. if (RpcQoS2.IsChecked == true)
  288. {
  289. qos = MqttQualityOfServiceLevel.ExactlyOnce;
  290. }
  291. var payload = new byte[0];
  292. if (RpcText.IsChecked == true)
  293. {
  294. payload = Encoding.UTF8.GetBytes(RpcPayload.Text);
  295. }
  296. if (RpcBase64.IsChecked == true)
  297. {
  298. payload = Convert.FromBase64String(RpcPayload.Text);
  299. }
  300. try
  301. {
  302. var rpcClient = new MqttRpcClient(_mqttClient);
  303. var response = await rpcClient.ExecuteAsync(TimeSpan.FromSeconds(5), RpcMethod.Text, payload, qos);
  304. RpcResponses.Items.Add(RpcMethod.Text + " >>> " + Encoding.UTF8.GetString(response));
  305. }
  306. catch (MqttCommunicationTimedOutException)
  307. {
  308. RpcResponses.Items.Add(RpcMethod.Text + " >>> [TIMEOUT]");
  309. }
  310. catch (Exception exception)
  311. {
  312. RpcResponses.Items.Add(RpcMethod.Text + " >>> [EXCEPTION (" + exception.Message + ")]");
  313. }
  314. }
  315. private void ClearRpcResponses(object sender, RoutedEventArgs e)
  316. {
  317. RpcResponses.Items.Clear();
  318. }
  319. private void ClearSessions(object sender, RoutedEventArgs e)
  320. {
  321. _sessions.Clear();
  322. }
  323. private async void RefreshSessions(object sender, RoutedEventArgs e)
  324. {
  325. if (_mqttServer == null)
  326. {
  327. return;
  328. }
  329. var sessions = await _mqttServer.GetClientSessionsStatusAsync();
  330. _sessions.Clear();
  331. foreach (var session in sessions)
  332. {
  333. _sessions.Add(session);
  334. }
  335. ListViewSessions.DataContext = _sessions;
  336. }
  337. private async Task WikiCode()
  338. {
  339. {
  340. // Write all trace messages to the console window.
  341. MqttNetGlobalLogger.LogMessagePublished += (s, e) =>
  342. {
  343. Console.WriteLine($">> [{e.TraceMessage.Timestamp:O}] [{e.TraceMessage.ThreadId}] [{e.TraceMessage.Source}] [{e.TraceMessage.Level}]: {e.TraceMessage.Message}");
  344. if (e.TraceMessage.Exception != null)
  345. {
  346. Console.WriteLine(e.TraceMessage.Exception);
  347. }
  348. };
  349. }
  350. {
  351. // Use a custom identifier for the trace messages.
  352. var clientOptions = new MqttClientOptionsBuilder()
  353. .Build();
  354. }
  355. {
  356. // Create a new MQTT client.
  357. var factory = new MqttFactory();
  358. var mqttClient = factory.CreateMqttClient();
  359. {
  360. // Create TCP based options using the builder.
  361. var options = new MqttClientOptionsBuilder()
  362. .WithClientId("Client1")
  363. .WithTcpServer("broker.hivemq.com")
  364. .WithCredentials("bud", "%spencer%")
  365. .WithTls()
  366. .WithCleanSession()
  367. .Build();
  368. await mqttClient.ConnectAsync(options);
  369. }
  370. {
  371. // Use TCP connection.
  372. var options = new MqttClientOptionsBuilder()
  373. .WithTcpServer("broker.hivemq.com", 1883) // Port is optional
  374. .Build();
  375. }
  376. {
  377. // Use secure TCP connection.
  378. var options = new MqttClientOptionsBuilder()
  379. .WithTcpServer("broker.hivemq.com")
  380. .WithTls()
  381. .Build();
  382. }
  383. {
  384. // Use WebSocket connection.
  385. var options = new MqttClientOptionsBuilder()
  386. .WithWebSocketServer("broker.hivemq.com:8000/mqtt")
  387. .Build();
  388. await mqttClient.ConnectAsync(options);
  389. }
  390. {
  391. // Create TCP based options manually
  392. var options = new MqttClientOptions
  393. {
  394. ClientId = "Client1",
  395. Credentials = new MqttClientCredentials
  396. {
  397. Username = "bud",
  398. Password = "%spencer%"
  399. },
  400. ChannelOptions = new MqttClientTcpOptions
  401. {
  402. Server = "broker.hivemq.org",
  403. TlsOptions = new MqttClientTlsOptions
  404. {
  405. UseTls = true
  406. }
  407. },
  408. };
  409. }
  410. {
  411. // Subscribe to a topic
  412. await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic("my/topic").Build());
  413. // Unsubscribe from a topic
  414. await mqttClient.UnsubscribeAsync("my/topic");
  415. // Publish an application message
  416. var applicationMessage = new MqttApplicationMessageBuilder()
  417. .WithTopic("A/B/C")
  418. .WithPayload("Hello World")
  419. .WithAtLeastOnceQoS()
  420. .Build();
  421. await mqttClient.PublishAsync(applicationMessage);
  422. }
  423. }
  424. // ----------------------------------
  425. {
  426. var options = new MqttServerOptions();
  427. options.ConnectionValidator = c =>
  428. {
  429. if (c.ClientId.Length < 10)
  430. {
  431. c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedIdentifierRejected;
  432. return;
  433. }
  434. if (c.Username != "mySecretUser")
  435. {
  436. c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
  437. return;
  438. }
  439. if (c.Password != "mySecretPassword")
  440. {
  441. c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
  442. return;
  443. }
  444. c.ReturnCode = MqttConnectReturnCode.ConnectionAccepted;
  445. };
  446. var factory = new MqttFactory();
  447. var mqttServer = factory.CreateMqttServer();
  448. await mqttServer.StartAsync(options);
  449. Console.WriteLine("Press any key to exit.");
  450. Console.ReadLine();
  451. await mqttServer.StopAsync();
  452. }
  453. // ----------------------------------
  454. // For UWP apps:
  455. MqttTcpChannel.CustomIgnorableServerCertificateErrorsResolver = o =>
  456. {
  457. if (o.Server == "server_with_revoked_cert")
  458. {
  459. return new[] { ChainValidationResult.Revoked };
  460. }
  461. return new ChainValidationResult[0];
  462. };
  463. {
  464. // Start a MQTT server.
  465. var mqttServer = new MqttFactory().CreateMqttServer();
  466. await mqttServer.StartAsync(new MqttServerOptions());
  467. Console.WriteLine("Press any key to exit.");
  468. Console.ReadLine();
  469. await mqttServer.StopAsync();
  470. }
  471. {
  472. // Configure MQTT server.
  473. var optionsBuilder = new MqttServerOptionsBuilder()
  474. .WithConnectionBacklog(100)
  475. .WithDefaultEndpointPort(1884);
  476. var options = new MqttServerOptions
  477. {
  478. };
  479. options.ConnectionValidator = c =>
  480. {
  481. if (c.ClientId != "Highlander")
  482. {
  483. c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedIdentifierRejected;
  484. return;
  485. }
  486. c.ReturnCode = MqttConnectReturnCode.ConnectionAccepted;
  487. };
  488. var mqttServer = new MqttFactory().CreateMqttServer();
  489. await mqttServer.StartAsync(optionsBuilder.Build());
  490. }
  491. {
  492. // Setup client validator.
  493. var options = new MqttServerOptions
  494. {
  495. ConnectionValidator = c =>
  496. {
  497. if (c.ClientId.Length < 10)
  498. {
  499. c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedIdentifierRejected;
  500. return;
  501. }
  502. if (c.Username != "mySecretUser")
  503. {
  504. c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
  505. return;
  506. }
  507. if (c.Password != "mySecretPassword")
  508. {
  509. c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
  510. return;
  511. }
  512. c.ReturnCode = MqttConnectReturnCode.ConnectionAccepted;
  513. }
  514. };
  515. }
  516. {
  517. // Create a new MQTT server.
  518. var mqttServer = new MqttFactory().CreateMqttServer();
  519. }
  520. {
  521. // Setup and start a managed MQTT client.
  522. var options = new ManagedMqttClientOptionsBuilder()
  523. .WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
  524. .WithClientOptions(new MqttClientOptionsBuilder()
  525. .WithClientId("Client1")
  526. .WithTcpServer("broker.hivemq.com")
  527. .WithTls().Build())
  528. .Build();
  529. var mqttClient = new MqttFactory().CreateManagedMqttClient();
  530. await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic("my/topic").Build());
  531. await mqttClient.StartAsync(options);
  532. }
  533. }
  534. }
  535. }