You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

612 lines
20 KiB

  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Text;
  4. using System.Threading.Tasks;
  5. using Windows.Security.Cryptography.Certificates;
  6. using Windows.UI.Core;
  7. using Windows.UI.Xaml;
  8. using MQTTnet.Client;
  9. using MQTTnet.Diagnostics;
  10. using MQTTnet.Exceptions;
  11. using MQTTnet.Extensions.Rpc;
  12. using MQTTnet.Implementations;
  13. using MQTTnet.ManagedClient;
  14. using MQTTnet.Protocol;
  15. using MQTTnet.Server;
  16. using MqttClientConnectedEventArgs = MQTTnet.Client.MqttClientConnectedEventArgs;
  17. using MqttClientDisconnectedEventArgs = MQTTnet.Client.MqttClientDisconnectedEventArgs;
  18. namespace MQTTnet.TestApp.UniversalWindows
  19. {
  20. public sealed partial class MainPage
  21. {
  /// <summary>MQTT server hosted by this page; assigned in StartServer and reset to null in StopServer.</summary>
  private IMqttServer _mqttServer;

  /// <summary>MQTT client created by the Connect handler; null until Connect has run.</summary>
  private IMqttClient _mqttClient;

  /// <summary>Log messages waiting to be rendered into the trace view (trimmed by UpdateLogAsync).</summary>
  private readonly ConcurrentQueue<MqttNetLogMessage> _traceMessages = new ConcurrentQueue<MqttNetLogMessage>();

  /// <summary>
  /// Initializes the page components and starts listening to the global MQTTnet logger
  /// so that every published log message ends up in the trace view.
  /// </summary>
  public MainPage()
  {
      this.InitializeComponent();

      MqttNetGlobalLogger.LogMessagePublished += this.OnTraceMessagePublished;
  }
  /// <summary>
  /// Handles a message published by the global MQTTnet logger: queues it and re-renders the trace view.
  /// </summary>
  /// <param name="sender">Event source; not used.</param>
  /// <param name="e">Event data carrying the published trace message.</param>
  private async void OnTraceMessagePublished(object sender, MqttNetLogMessagePublishedEventArgs e)
  {
      _traceMessages.Enqueue(e.TraceMessage);

      try
      {
          await UpdateLogAsync();
      }
      catch (Exception exception)
      {
          // This is an async void handler: nobody can observe an exception escaping from here,
          // so it would take the app down. The trace view is the thing that just failed, so the
          // error is reported to the debugger output instead.
          System.Diagnostics.Debug.WriteLine(exception);
      }
  }
  /// <summary>
  /// Trims the trace queue to at most 100 entries, renders the remaining entries as text
  /// and assigns that text to the trace view through the view's dispatcher.
  /// </summary>
  /// <returns>A task that completes when the dispatched UI update has run.</returns>
  private async Task UpdateLogAsync()
  {
      const int maxRetainedMessages = 100;

      // Drop the oldest entries first; the queue is shared with the event handlers that enqueue.
      while (_traceMessages.Count > maxRetainedMessages)
      {
          _traceMessages.TryDequeue(out _);
      }

      var buffer = new StringBuilder();
      foreach (var entry in _traceMessages)
      {
          buffer.AppendFormat(
              "[{0:yyyy-MM-dd HH:mm:ss.fff}] [{1}] [{2}] [{3}] [{4}]{5}",
              entry.Timestamp,
              entry.Level,
              entry.Source,
              entry.ThreadId,
              entry.Message,
              Environment.NewLine);

          // Entries that carry an exception get its full text on the following line(s).
          if (entry.Exception is Exception failure)
          {
              buffer.AppendLine(failure.ToString());
          }
      }

      // The text box may only be touched via its dispatcher.
      await Trace.Dispatcher.RunAsync(CoreDispatcherPriority.Low, () =>
      {
          Trace.Text = buffer.ToString();
      });
  }
  /// <summary>
  /// Click handler: builds client options from the UI fields, replaces any existing client
  /// with a fresh one and connects it.
  /// </summary>
  /// <remarks>
  /// Everything, including input parsing, runs inside the try block. This is an async void
  /// handler, so an exception that escapes (for example a non-numeric port) cannot be observed
  /// by anyone and would crash the app; instead it is appended to the trace view.
  /// </remarks>
  /// <param name="sender">Event source; not used.</param>
  /// <param name="e">Event data; not used.</param>
  private async void Connect(object sender, RoutedEventArgs e)
  {
      try
      {
          // NOTE: test app only - every certificate check is relaxed here.
          // Do not copy these TLS settings into production code.
          var tlsOptions = new MqttClientTlsOptions
          {
              UseTls = UseTls.IsChecked == true,
              IgnoreCertificateChainErrors = true,
              IgnoreCertificateRevocationErrors = true,
              AllowUntrustedCertificates = true
          };

          var options = new MqttClientOptions { ClientId = ClientId.Text };

          if (UseTcp.IsChecked == true)
          {
              options.ChannelOptions = new MqttClientTcpOptions
              {
                  Server = Server.Text,
                  Port = int.Parse(Port.Text),
                  TlsOptions = tlsOptions
              };
          }

          // Assigned last, so WebSocket wins if both transports are checked.
          if (UseWs.IsChecked == true)
          {
              options.ChannelOptions = new MqttClientWebSocketOptions
              {
                  Uri = Server.Text,
                  TlsOptions = tlsOptions
              };
          }

          if (options.ChannelOptions == null)
          {
              throw new InvalidOperationException("Select either TCP or WebSocket as the transport before connecting.");
          }

          options.Credentials = new MqttClientCredentials
          {
              Username = User.Text,
              Password = Password.Text
          };

          options.CleanSession = CleanSession.IsChecked == true;
          options.KeepAlivePeriod = TimeSpan.FromSeconds(double.Parse(KeepAliveInterval.Text));

          if (_mqttClient != null)
          {
              // Disconnect first so the Disconnected event of the old client still reaches the log,
              // then detach the handlers so the old instance no longer reports into this page.
              await _mqttClient.DisconnectAsync();
              _mqttClient.ApplicationMessageReceived -= OnApplicationMessageReceived;
              _mqttClient.Connected -= OnConnected;
              _mqttClient.Disconnected -= OnDisconnected;
          }

          var factory = new MqttFactory();
          _mqttClient = factory.CreateMqttClient();
          _mqttClient.ApplicationMessageReceived += OnApplicationMessageReceived;
          _mqttClient.Connected += OnConnected;
          _mqttClient.Disconnected += OnDisconnected;

          await _mqttClient.ConnectAsync(options);
      }
      catch (Exception exception)
      {
          Trace.Text += exception + Environment.NewLine;
      }
  }
  /// <summary>
  /// Handles the client's Disconnected event by queueing a marker entry and refreshing the trace view.
  /// </summary>
  /// <param name="sender">Event source; not used.</param>
  /// <param name="e">Event data; not used.</param>
  private void OnDisconnected(object sender, MqttClientDisconnectedEventArgs e)
  {
      var marker = new MqttNetLogMessage(
          "",
          DateTime.Now,
          -1,
          "",
          MqttNetLogLevel.Info,
          "! DISCONNECTED EVENT FIRED",
          null);

      _traceMessages.Enqueue(marker);

      // Fire-and-forget: the refresh is started on the thread pool and not awaited.
      _ = Task.Run(() => UpdateLogAsync());
  }
  /// <summary>
  /// Handles the client's Connected event by queueing a marker entry and refreshing the trace view.
  /// </summary>
  /// <param name="sender">Event source; not used.</param>
  /// <param name="e">Event data; not used.</param>
  private void OnConnected(object sender, MqttClientConnectedEventArgs e)
  {
      var marker = new MqttNetLogMessage(
          "",
          DateTime.Now,
          -1,
          "",
          MqttNetLogLevel.Info,
          "! CONNECTED EVENT FIRED",
          null);

      _traceMessages.Enqueue(marker);

      // Fire-and-forget: the refresh is started on the thread pool and not awaited.
      _ = Task.Run(() => UpdateLogAsync());
  }
  /// <summary>
  /// Handles an incoming application message: formats it as one line and, if the
  /// "add to list" box is checked, appends it to the received-messages list on the UI thread.
  /// </summary>
  /// <param name="sender">Event source; not used.</param>
  /// <param name="eventArgs">Event data carrying the received application message.</param>
  private async void OnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs eventArgs)
  {
      var message = eventArgs.ApplicationMessage;

      // Encoding.GetString throws on a null array. A message without payload must not crash this
      // async void handler, so it is rendered as an empty string instead.
      var payloadText = Encoding.UTF8.GetString(message.Payload ?? new byte[0]);

      var item = $"Timestamp: {DateTime.Now:O} | Topic: {message.Topic} | Payload: {payloadText} | QoS: {message.QualityOfServiceLevel}";

      // The check box and the list are UI elements, so both are accessed via the dispatcher.
      await Dispatcher.RunAsync(CoreDispatcherPriority.Low, () =>
      {
          if (AddReceivedMessagesToList.IsChecked == true)
          {
              ReceivedMessages.Items.Add(item);
          }
      });
  }
  /// <summary>
  /// Click handler: publishes one application message built from the topic, payload,
  /// QoS and retain controls. Does nothing when no client has been created yet;
  /// any failure is appended to the trace view.
  /// </summary>
  /// <param name="sender">Event source; not used.</param>
  /// <param name="e">Event data; not used.</param>
  private async void Publish(object sender, RoutedEventArgs e)
  {
      if (_mqttClient == null)
      {
          return;
      }

      try
      {
          // QoS 2 takes precedence over QoS 1; with neither checked the level is "at most once".
          MqttQualityOfServiceLevel level;
          if (QoS2.IsChecked == true)
          {
              level = MqttQualityOfServiceLevel.ExactlyOnce;
          }
          else if (QoS1.IsChecked == true)
          {
              level = MqttQualityOfServiceLevel.AtLeastOnce;
          }
          else
          {
              level = MqttQualityOfServiceLevel.AtMostOnce;
          }

          // Base64 takes precedence over plain text; with neither checked the payload is empty.
          byte[] body;
          if (Base64.IsChecked == true)
          {
              body = Convert.FromBase64String(Payload.Text);
          }
          else if (Text.IsChecked == true)
          {
              body = Encoding.UTF8.GetBytes(Payload.Text);
          }
          else
          {
              body = new byte[0];
          }

          var shouldRetain = Retain.IsChecked == true;
          var outgoing = new MqttApplicationMessageBuilder()
              .WithTopic(Topic.Text)
              .WithPayload(body)
              .WithQualityOfServiceLevel(level)
              .WithRetainFlag(shouldRetain)
              .Build();

          await _mqttClient.PublishAsync(outgoing);
      }
      catch (Exception failure)
      {
          Trace.Text += failure + Environment.NewLine;
      }
  }
  /// <summary>
  /// Click handler: disconnects the current client. Does nothing when no client has been
  /// created yet; any failure is appended to the trace view.
  /// </summary>
  /// <param name="sender">Event source; not used.</param>
  /// <param name="e">Event data; not used.</param>
  private async void Disconnect(object sender, RoutedEventArgs e)
  {
      // Same guard as Publish/Subscribe/Unsubscribe: without it, clicking "Disconnect" before
      // "Connect" dumps a NullReferenceException into the trace view.
      if (_mqttClient == null)
      {
          return;
      }

      try
      {
          await _mqttClient.DisconnectAsync();
      }
      catch (Exception exception)
      {
          Trace.Text += exception + Environment.NewLine;
      }
  }
  /// <summary>
  /// Click handler: discards all queued trace messages and empties the trace view.
  /// </summary>
  /// <param name="sender">Event source; not used.</param>
  /// <param name="e">Event data; not used.</param>
  private void ClearLog(object sender, RoutedEventArgs e)
  {
      // Drain the queue one entry at a time until nothing is left.
      while (!_traceMessages.IsEmpty)
      {
          _traceMessages.TryDequeue(out _);
      }

      Trace.Text = string.Empty;
  }
  /// <summary>
  /// Click handler: subscribes the current client to the topic in the subscribe box using the
  /// selected QoS. Does nothing when no client has been created yet; any failure is appended
  /// to the trace view.
  /// </summary>
  /// <param name="sender">Event source; not used.</param>
  /// <param name="e">Event data; not used.</param>
  private async void Subscribe(object sender, RoutedEventArgs e)
  {
      if (_mqttClient == null)
      {
          return;
      }

      try
      {
          // QoS 2 takes precedence over QoS 1; with neither checked the level is "at most once".
          MqttQualityOfServiceLevel level;
          if (SubscribeQoS2.IsChecked == true)
          {
              level = MqttQualityOfServiceLevel.ExactlyOnce;
          }
          else if (SubscribeQoS1.IsChecked == true)
          {
              level = MqttQualityOfServiceLevel.AtLeastOnce;
          }
          else
          {
              level = MqttQualityOfServiceLevel.AtMostOnce;
          }

          var filter = new TopicFilter(SubscribeTopic.Text, level);
          await _mqttClient.SubscribeAsync(filter);
      }
      catch (Exception failure)
      {
          Trace.Text += failure + Environment.NewLine;
      }
  }
  /// <summary>
  /// Click handler: unsubscribes the current client from the topic in the subscribe box.
  /// Does nothing when no client has been created yet; any failure is appended to the trace view.
  /// </summary>
  /// <param name="sender">Event source; not used.</param>
  /// <param name="e">Event data; not used.</param>
  private async void Unsubscribe(object sender, RoutedEventArgs e)
  {
      var client = _mqttClient;
      if (client == null)
      {
          return;
      }

      try
      {
          var topic = SubscribeTopic.Text;
          await client.UnsubscribeAsync(topic);
      }
      catch (Exception failure)
      {
          Trace.Text += failure + Environment.NewLine;
      }
  }
  241. // This code is for the Wiki at GitHub!
  242. // ReSharper disable once UnusedMember.Local
  /// <summary>
  /// Click handler: starts an MQTT server on the port entered in the UI, optionally with
  /// JSON-backed storage for retained messages. Does nothing when a server is already set.
  /// </summary>
  /// <remarks>
  /// If the port cannot be parsed or the server fails to start, the error is appended to the
  /// trace view and the server field is reset, so a later click can retry. Previously the field
  /// was assigned before those steps and never cleared, and the exception escaped this
  /// async void handler.
  /// </remarks>
  /// <param name="sender">Event source; not used.</param>
  /// <param name="e">Event data; not used.</param>
  private async void StartServer(object sender, RoutedEventArgs e)
  {
      if (_mqttServer != null)
      {
          return;
      }

      try
      {
          JsonServerStorage storage = null;
          if (ServerPersistRetainedMessages.IsChecked == true)
          {
              storage = new JsonServerStorage();

              if (ServerClearRetainedMessages.IsChecked == true)
              {
                  storage.Clear();
              }
          }

          // Parse before touching the field so that invalid input leaves no half-initialized state.
          var options = new MqttServerOptions();
          options.DefaultEndpointOptions.Port = int.Parse(ServerPort.Text);
          options.Storage = storage;

          // Assigned before the await so a second click while starting is still ignored.
          _mqttServer = new MqttFactory().CreateMqttServer();
          await _mqttServer.StartAsync(options);
      }
      catch (Exception exception)
      {
          // The server did not start; forget it so the user can try again.
          _mqttServer = null;
          Trace.Text += exception + Environment.NewLine;
      }
  }
  /// <summary>
  /// Click handler: stops the running MQTT server and clears the server field.
  /// Does nothing when no server is set; any failure is appended to the trace view.
  /// </summary>
  /// <param name="sender">Event source; not used.</param>
  /// <param name="e">Event data; not used.</param>
  private async void StopServer(object sender, RoutedEventArgs e)
  {
      var server = _mqttServer;
      if (server == null)
      {
          return;
      }

      try
      {
          await server.StopAsync();

          // Only forget the server once it has actually stopped, so a failed stop can be retried.
          _mqttServer = null;
      }
      catch (Exception exception)
      {
          // async void handler: report the failure like the other handlers instead of letting it escape.
          Trace.Text += exception + Environment.NewLine;
      }
  }
  /// <summary>
  /// Click handler: removes every entry from the received-messages list.
  /// </summary>
  /// <param name="sender">Event source; not used.</param>
  /// <param name="e">Event data; not used.</param>
  private void ClearReceivedMessages(object sender, RoutedEventArgs e) => ReceivedMessages.Items.Clear();
  /// <summary>
  /// Click handler: executes an RPC call over the current client with the method name, payload
  /// and QoS taken from the UI, and appends the outcome to the RPC responses list.
  /// </summary>
  /// <remarks>
  /// The payload is built inside the try block. Invalid Base64 input used to throw before the
  /// try and escape this async void handler; it now shows up as an [EXCEPTION ...] entry.
  /// </remarks>
  /// <param name="sender">Event source; not used.</param>
  /// <param name="e">Event data; not used.</param>
  private async void ExecuteRpc(object sender, RoutedEventArgs e)
  {
      // Captured once so the response is labelled with the method that was actually invoked,
      // even if the text box is edited while the call is in flight.
      var method = RpcMethod.Text;

      try
      {
          var qos = MqttQualityOfServiceLevel.AtMostOnce;
          if (RpcQoS1.IsChecked == true)
          {
              qos = MqttQualityOfServiceLevel.AtLeastOnce;
          }

          if (RpcQoS2.IsChecked == true)
          {
              qos = MqttQualityOfServiceLevel.ExactlyOnce;
          }

          var payload = new byte[0];
          if (RpcText.IsChecked == true)
          {
              payload = Encoding.UTF8.GetBytes(RpcPayload.Text);
          }

          if (RpcBase64.IsChecked == true)
          {
              payload = Convert.FromBase64String(RpcPayload.Text);
          }

          var rpcClient = new MqttRpcClient(_mqttClient);
          var response = await rpcClient.ExecuteAsync(TimeSpan.FromSeconds(5), method, payload, qos);

          RpcResponses.Items.Add(method + " >>> " + Encoding.UTF8.GetString(response));
      }
      catch (MqttCommunicationTimedOutException)
      {
          RpcResponses.Items.Add(method + " >>> [TIMEOUT]");
      }
      catch (Exception exception)
      {
          RpcResponses.Items.Add(method + " >>> [EXCEPTION (" + exception.Message + ")]");
      }
  }
  /// <summary>
  /// Click handler: removes every entry from the RPC responses list.
  /// </summary>
  /// <param name="sender">Event source; not used.</param>
  /// <param name="e">Event data; not used.</param>
  private void ClearRpcResponses(object sender, RoutedEventArgs e) => RpcResponses.Items.Clear();
  /// <summary>
  /// Collection of independent MQTTnet usage samples, each in its own scope block so the
  /// snippets can reuse local names. No caller is visible in this file; the method exists
  /// so that the snippets are compiled.
  /// </summary>
  /// <returns>A task that completes when the last sample has finished.</returns>
  private async Task WikiCode()
  {
      {
          // Write all trace messages to the console window.
          MqttNetGlobalLogger.LogMessagePublished += (s, e) =>
          {
              Console.WriteLine($">> [{e.TraceMessage.Timestamp:O}] [{e.TraceMessage.ThreadId}] [{e.TraceMessage.Source}] [{e.TraceMessage.Level}]: {e.TraceMessage.Message}");
              if (e.TraceMessage.Exception != null)
              {
                  Console.WriteLine(e.TraceMessage.Exception);
              }
          };
      }

      {
          // Build client options with the builder's defaults.
          // NOTE(review): this snippet was labelled "Use a custom identifier for the trace messages",
          // but no identifier is configured here - confirm whether a builder call is missing.
          var clientOptions = new MqttClientOptionsBuilder()
              .Build();
      }

      {
          // Create a new MQTT client.
          var factory = new MqttFactory();
          var mqttClient = factory.CreateMqttClient();

          {
              // Create TCP based options using the builder.
              var options = new MqttClientOptionsBuilder()
                  .WithClientId("Client1")
                  .WithTcpServer("broker.hivemq.com")
                  .WithCredentials("bud", "%spencer%")
                  .WithTls()
                  .WithCleanSession()
                  .Build();

              await mqttClient.ConnectAsync(options);
          }

          {
              // Use TCP connection.
              var options = new MqttClientOptionsBuilder()
                  .WithTcpServer("broker.hivemq.com", 1883) // Port is optional
                  .Build();
          }

          {
              // Use secure TCP connection.
              var options = new MqttClientOptionsBuilder()
                  .WithTcpServer("broker.hivemq.com")
                  .WithTls()
                  .Build();
          }

          {
              // Use WebSocket connection.
              var options = new MqttClientOptionsBuilder()
                  .WithWebSocketServer("broker.hivemq.com:8000/mqtt")
                  .Build();

              await mqttClient.ConnectAsync(options);
          }

          {
              // Create TCP based options manually.
              var options = new MqttClientOptions
              {
                  ClientId = "Client1",
                  Credentials = new MqttClientCredentials
                  {
                      Username = "bud",
                      Password = "%spencer%"
                  },
                  ChannelOptions = new MqttClientTcpOptions
                  {
                      // Same public broker host as in the builder-based samples above.
                      Server = "broker.hivemq.com",
                      TlsOptions = new MqttClientTlsOptions
                      {
                          UseTls = true
                      }
                  },
              };
          }

          {
              // Subscribe to a topic
              await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic("my/topic").Build());

              // Unsubscribe from a topic
              await mqttClient.UnsubscribeAsync("my/topic");

              // Publish an application message
              var applicationMessage = new MqttApplicationMessageBuilder()
                  .WithTopic("A/B/C")
                  .WithPayload("Hello World")
                  .WithAtLeastOnceQoS()
                  .Build();

              await mqttClient.PublishAsync(applicationMessage);
          }
      }

      // ----------------------------------

      {
          // Start a server that validates client id, user name and password of every connection.
          var options = new MqttServerOptions();

          options.ConnectionValidator = c =>
          {
              if (c.ClientId.Length < 10)
              {
                  c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedIdentifierRejected;
                  return;
              }

              if (c.Username != "mySecretUser")
              {
                  c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
                  return;
              }

              if (c.Password != "mySecretPassword")
              {
                  c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
                  return;
              }

              c.ReturnCode = MqttConnectReturnCode.ConnectionAccepted;
          };

          var factory = new MqttFactory();
          var mqttServer = factory.CreateMqttServer();
          await mqttServer.StartAsync(options);

          Console.WriteLine("Press any key to exit.");
          Console.ReadLine();

          await mqttServer.StopAsync();
      }

      // ----------------------------------

      // For UWP apps:
      MqttTcpChannel.CustomIgnorableServerCertificateErrorsResolver = o =>
      {
          if (o.Server == "server_with_revoked_cert")
          {
              return new[] { ChainValidationResult.Revoked };
          }

          return new ChainValidationResult[0];
      };

      {
          // Start a MQTT server.
          var mqttServer = new MqttFactory().CreateMqttServer();
          await mqttServer.StartAsync(new MqttServerOptions());

          Console.WriteLine("Press any key to exit.");
          Console.ReadLine();

          await mqttServer.StopAsync();
      }

      {
          // Configure MQTT server.
          // The validator is registered on the builder itself. It used to be set on a separate
          // MqttServerOptions instance that was never passed to StartAsync, so it had no effect.
          var optionsBuilder = new MqttServerOptionsBuilder()
              .WithConnectionBacklog(100)
              .WithDefaultEndpointPort(1884)
              .WithConnectionValidator(c =>
              {
                  if (c.ClientId != "Highlander")
                  {
                      c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedIdentifierRejected;
                      return;
                  }

                  c.ReturnCode = MqttConnectReturnCode.ConnectionAccepted;
              });

          var mqttServer = new MqttFactory().CreateMqttServer();
          await mqttServer.StartAsync(optionsBuilder.Build());
      }

      {
          // Setup client validator.
          var options = new MqttServerOptions
          {
              ConnectionValidator = c =>
              {
                  if (c.ClientId.Length < 10)
                  {
                      c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedIdentifierRejected;
                      return;
                  }

                  if (c.Username != "mySecretUser")
                  {
                      c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
                      return;
                  }

                  if (c.Password != "mySecretPassword")
                  {
                      c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
                      return;
                  }

                  c.ReturnCode = MqttConnectReturnCode.ConnectionAccepted;
              }
          };
      }

      {
          // Create a new MQTT server.
          var mqttServer = new MqttFactory().CreateMqttServer();
      }

      {
          // Setup and start a managed MQTT client.
          var options = new ManagedMqttClientOptionsBuilder()
              .WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
              .WithClientOptions(new MqttClientOptionsBuilder()
                  .WithClientId("Client1")
                  .WithTcpServer("broker.hivemq.com")
                  .WithTls().Build())
              .Build();

          var mqttClient = new MqttFactory().CreateManagedMqttClient();
          await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic("my/topic").Build());
          await mqttClient.StartAsync(options);
      }
  }
  513. }
  514. }