You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

620 regels
20 KiB

  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Text;
  4. using System.Threading.Tasks;
  5. using Windows.Security.Cryptography.Certificates;
  6. using Windows.UI.Core;
  7. using Windows.UI.Xaml;
  8. using MQTTnet.Client;
  9. using MQTTnet.Diagnostics;
  10. using MQTTnet.Exceptions;
  11. using MQTTnet.Extensions.Rpc;
  12. using MQTTnet.Implementations;
  13. using MQTTnet.ManagedClient;
  14. using MQTTnet.Protocol;
  15. using MQTTnet.Server;
  16. using MqttClientConnectedEventArgs = MQTTnet.Client.MqttClientConnectedEventArgs;
  17. using MqttClientDisconnectedEventArgs = MQTTnet.Client.MqttClientDisconnectedEventArgs;
  18. namespace MQTTnet.TestApp.UniversalWindows
  19. {
  20. public sealed partial class MainPage
  21. {
  22. private readonly ConcurrentQueue<MqttNetLogMessage> _traceMessages = new ConcurrentQueue<MqttNetLogMessage>();
  23. private IMqttClient _mqttClient;
  24. private IMqttServer _mqttServer;
  25. public MainPage()
  26. {
  27. InitializeComponent();
  28. MqttNetGlobalLogger.LogMessagePublished += OnTraceMessagePublished;
  29. }
  30. private async void OnTraceMessagePublished(object sender, MqttNetLogMessagePublishedEventArgs e)
  31. {
  32. _traceMessages.Enqueue(e.TraceMessage);
  33. await UpdateLogAsync();
  34. }
  35. private async Task UpdateLogAsync()
  36. {
  37. while (_traceMessages.Count > 100)
  38. {
  39. _traceMessages.TryDequeue(out _);
  40. }
  41. var logText = new StringBuilder();
  42. foreach (var traceMessage in _traceMessages)
  43. {
  44. logText.AppendFormat(
  45. "[{0:yyyy-MM-dd HH:mm:ss.fff}] [{1}] [{2}] [{3}] [{4}]{5}",
  46. traceMessage.Timestamp,
  47. traceMessage.Level,
  48. traceMessage.Source,
  49. traceMessage.ThreadId,
  50. traceMessage.Message,
  51. Environment.NewLine);
  52. if (traceMessage.Exception != null)
  53. {
  54. logText.AppendLine(traceMessage.Exception.ToString());
  55. }
  56. }
  57. await Trace.Dispatcher.RunAsync(CoreDispatcherPriority.Low, () =>
  58. {
  59. Trace.Text = logText.ToString();
  60. });
  61. }
  62. private async void Connect(object sender, RoutedEventArgs e)
  63. {
  64. var tlsOptions = new MqttClientTlsOptions
  65. {
  66. UseTls = UseTls.IsChecked == true,
  67. IgnoreCertificateChainErrors = true,
  68. IgnoreCertificateRevocationErrors = true,
  69. AllowUntrustedCertificates = true
  70. };
  71. var options = new MqttClientOptions { ClientId = ClientId.Text };
  72. if (UseTcp.IsChecked == true)
  73. {
  74. options.ChannelOptions = new MqttClientTcpOptions
  75. {
  76. Server = Server.Text,
  77. Port = int.Parse(Port.Text),
  78. TlsOptions = tlsOptions
  79. };
  80. }
  81. if (UseWs.IsChecked == true)
  82. {
  83. options.ChannelOptions = new MqttClientWebSocketOptions
  84. {
  85. Uri = Server.Text,
  86. TlsOptions = tlsOptions
  87. };
  88. }
  89. if (options.ChannelOptions == null)
  90. {
  91. throw new InvalidOperationException();
  92. }
  93. options.Credentials = new MqttClientCredentials
  94. {
  95. Username = User.Text,
  96. Password = Password.Text
  97. };
  98. options.CleanSession = CleanSession.IsChecked == true;
  99. options.KeepAlivePeriod = TimeSpan.FromSeconds(double.Parse(KeepAliveInterval.Text));
  100. try
  101. {
  102. if (_mqttClient != null)
  103. {
  104. await _mqttClient.DisconnectAsync();
  105. _mqttClient.ApplicationMessageReceived -= OnApplicationMessageReceived;
  106. _mqttClient.Connected -= OnConnected;
  107. _mqttClient.Disconnected -= OnDisconnected;
  108. }
  109. var factory = new MqttFactory();
  110. _mqttClient = factory.CreateMqttClient();
  111. _mqttClient.ApplicationMessageReceived += OnApplicationMessageReceived;
  112. _mqttClient.Connected += OnConnected;
  113. _mqttClient.Disconnected += OnDisconnected;
  114. await _mqttClient.ConnectAsync(options);
  115. }
  116. catch (Exception exception)
  117. {
  118. Trace.Text += exception + Environment.NewLine;
  119. }
  120. }
  121. private void OnDisconnected(object sender, MqttClientDisconnectedEventArgs e)
  122. {
  123. _traceMessages.Enqueue(new MqttNetLogMessage("", DateTime.Now, -1,
  124. "", MqttNetLogLevel.Info, "! DISCONNECTED EVENT FIRED", null));
  125. Task.Run(UpdateLogAsync);
  126. }
  127. private void OnConnected(object sender, MqttClientConnectedEventArgs e)
  128. {
  129. _traceMessages.Enqueue(new MqttNetLogMessage("", DateTime.Now, -1,
  130. "", MqttNetLogLevel.Info, "! CONNECTED EVENT FIRED", null));
  131. Task.Run(UpdateLogAsync);
  132. }
  133. private async void OnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs eventArgs)
  134. {
  135. var item = $"Timestamp: {DateTime.Now:O} | Topic: {eventArgs.ApplicationMessage.Topic} | Payload: {Encoding.UTF8.GetString(eventArgs.ApplicationMessage.Payload)} | QoS: {eventArgs.ApplicationMessage.QualityOfServiceLevel}";
  136. await Dispatcher.RunAsync(CoreDispatcherPriority.Low, () =>
  137. {
  138. if (AddReceivedMessagesToList.IsChecked == true)
  139. {
  140. ReceivedMessages.Items.Add(item);
  141. }
  142. });
  143. }
  144. private async void Publish(object sender, RoutedEventArgs e)
  145. {
  146. if (_mqttClient == null)
  147. {
  148. return;
  149. }
  150. try
  151. {
  152. var qos = MqttQualityOfServiceLevel.AtMostOnce;
  153. if (QoS1.IsChecked == true)
  154. {
  155. qos = MqttQualityOfServiceLevel.AtLeastOnce;
  156. }
  157. if (QoS2.IsChecked == true)
  158. {
  159. qos = MqttQualityOfServiceLevel.ExactlyOnce;
  160. }
  161. var payload = new byte[0];
  162. if (Text.IsChecked == true)
  163. {
  164. payload = Encoding.UTF8.GetBytes(Payload.Text);
  165. }
  166. if (Base64.IsChecked == true)
  167. {
  168. payload = Convert.FromBase64String(Payload.Text);
  169. }
  170. var message = new MqttApplicationMessageBuilder()
  171. .WithTopic(Topic.Text)
  172. .WithPayload(payload)
  173. .WithQualityOfServiceLevel(qos)
  174. .WithRetainFlag(Retain.IsChecked == true)
  175. .Build();
  176. await _mqttClient.PublishAsync(message);
  177. }
  178. catch (Exception exception)
  179. {
  180. Trace.Text += exception + Environment.NewLine;
  181. }
  182. }
  183. private async void Disconnect(object sender, RoutedEventArgs e)
  184. {
  185. try
  186. {
  187. await _mqttClient.DisconnectAsync();
  188. }
  189. catch (Exception exception)
  190. {
  191. Trace.Text += exception + Environment.NewLine;
  192. }
  193. }
  194. private void ClearLog(object sender, RoutedEventArgs e)
  195. {
  196. while (_traceMessages.Count > 0)
  197. {
  198. _traceMessages.TryDequeue(out _);
  199. }
  200. Trace.Text = string.Empty;
  201. }
  202. private async void Subscribe(object sender, RoutedEventArgs e)
  203. {
  204. if (_mqttClient == null)
  205. {
  206. return;
  207. }
  208. try
  209. {
  210. var qos = MqttQualityOfServiceLevel.AtMostOnce;
  211. if (SubscribeQoS1.IsChecked == true)
  212. {
  213. qos = MqttQualityOfServiceLevel.AtLeastOnce;
  214. }
  215. if (SubscribeQoS2.IsChecked == true)
  216. {
  217. qos = MqttQualityOfServiceLevel.ExactlyOnce;
  218. }
  219. await _mqttClient.SubscribeAsync(new TopicFilter(SubscribeTopic.Text, qos));
  220. }
  221. catch (Exception exception)
  222. {
  223. Trace.Text += exception + Environment.NewLine;
  224. }
  225. }
  226. private async void Unsubscribe(object sender, RoutedEventArgs e)
  227. {
  228. if (_mqttClient == null)
  229. {
  230. return;
  231. }
  232. try
  233. {
  234. await _mqttClient.UnsubscribeAsync(SubscribeTopic.Text);
  235. }
  236. catch (Exception exception)
  237. {
  238. Trace.Text += exception + Environment.NewLine;
  239. }
  240. }
  241. // This code is for the Wiki at GitHub!
  242. // ReSharper disable once UnusedMember.Local
  243. private async void StartServer(object sender, RoutedEventArgs e)
  244. {
  245. if (_mqttServer != null)
  246. {
  247. return;
  248. }
  249. JsonServerStorage storage = null;
  250. if (ServerPersistRetainedMessages.IsChecked == true)
  251. {
  252. storage = new JsonServerStorage();
  253. if (ServerClearRetainedMessages.IsChecked == true)
  254. {
  255. storage.Clear();
  256. }
  257. }
  258. _mqttServer = new MqttFactory().CreateMqttServer();
  259. var options = new MqttServerOptions();
  260. options.DefaultEndpointOptions.Port = int.Parse(ServerPort.Text);
  261. options.Storage = storage;
  262. await _mqttServer.StartAsync(options);
  263. }
  264. private async void StopServer(object sender, RoutedEventArgs e)
  265. {
  266. if (_mqttServer == null)
  267. {
  268. return;
  269. }
  270. await _mqttServer.StopAsync();
  271. _mqttServer = null;
  272. }
  273. private void ClearReceivedMessages(object sender, RoutedEventArgs e)
  274. {
  275. ReceivedMessages.Items.Clear();
  276. }
  277. private async void ExecuteRpc(object sender, RoutedEventArgs e)
  278. {
  279. var qos = MqttQualityOfServiceLevel.AtMostOnce;
  280. if (RpcQoS1.IsChecked == true)
  281. {
  282. qos = MqttQualityOfServiceLevel.AtLeastOnce;
  283. }
  284. if (RpcQoS2.IsChecked == true)
  285. {
  286. qos = MqttQualityOfServiceLevel.ExactlyOnce;
  287. }
  288. var payload = new byte[0];
  289. if (RpcText.IsChecked == true)
  290. {
  291. payload = Encoding.UTF8.GetBytes(RpcPayload.Text);
  292. }
  293. if (RpcBase64.IsChecked == true)
  294. {
  295. payload = Convert.FromBase64String(RpcPayload.Text);
  296. }
  297. try
  298. {
  299. var rpcClient = new MqttRpcClient(_mqttClient);
  300. var response = await rpcClient.ExecuteAsync(TimeSpan.FromSeconds(5), RpcMethod.Text, payload, qos);
  301. RpcResponses.Items.Add(RpcMethod.Text + " >>> " + Encoding.UTF8.GetString(response));
  302. }
  303. catch (MqttCommunicationTimedOutException)
  304. {
  305. RpcResponses.Items.Add(RpcMethod.Text + " >>> [TIMEOUT]");
  306. }
  307. catch (Exception exception)
  308. {
  309. RpcResponses.Items.Add(RpcMethod.Text + " >>> [EXCEPTION (" + exception.Message + ")]");
  310. }
  311. }
  312. private void ClearRpcResponses(object sender, RoutedEventArgs e)
  313. {
  314. RpcResponses.Items.Clear();
  315. }
  316. private void ClearSessions(object sender, RoutedEventArgs e)
  317. {
  318. }
  319. private void RefreshSessions(object sender, RoutedEventArgs e)
  320. {
  321. }
  322. private async Task WikiCode()
  323. {
  324. {
  325. // Write all trace messages to the console window.
  326. MqttNetGlobalLogger.LogMessagePublished += (s, e) =>
  327. {
  328. Console.WriteLine($">> [{e.TraceMessage.Timestamp:O}] [{e.TraceMessage.ThreadId}] [{e.TraceMessage.Source}] [{e.TraceMessage.Level}]: {e.TraceMessage.Message}");
  329. if (e.TraceMessage.Exception != null)
  330. {
  331. Console.WriteLine(e.TraceMessage.Exception);
  332. }
  333. };
  334. }
  335. {
  336. // Use a custom identifier for the trace messages.
  337. var clientOptions = new MqttClientOptionsBuilder()
  338. .Build();
  339. }
  340. {
  341. // Create a new MQTT client.
  342. var factory = new MqttFactory();
  343. var mqttClient = factory.CreateMqttClient();
  344. {
  345. // Create TCP based options using the builder.
  346. var options = new MqttClientOptionsBuilder()
  347. .WithClientId("Client1")
  348. .WithTcpServer("broker.hivemq.com")
  349. .WithCredentials("bud", "%spencer%")
  350. .WithTls()
  351. .WithCleanSession()
  352. .Build();
  353. await mqttClient.ConnectAsync(options);
  354. }
  355. {
  356. // Use TCP connection.
  357. var options = new MqttClientOptionsBuilder()
  358. .WithTcpServer("broker.hivemq.com", 1883) // Port is optional
  359. .Build();
  360. }
  361. {
  362. // Use secure TCP connection.
  363. var options = new MqttClientOptionsBuilder()
  364. .WithTcpServer("broker.hivemq.com")
  365. .WithTls()
  366. .Build();
  367. }
  368. {
  369. // Use WebSocket connection.
  370. var options = new MqttClientOptionsBuilder()
  371. .WithWebSocketServer("broker.hivemq.com:8000/mqtt")
  372. .Build();
  373. await mqttClient.ConnectAsync(options);
  374. }
  375. {
  376. // Create TCP based options manually
  377. var options = new MqttClientOptions
  378. {
  379. ClientId = "Client1",
  380. Credentials = new MqttClientCredentials
  381. {
  382. Username = "bud",
  383. Password = "%spencer%"
  384. },
  385. ChannelOptions = new MqttClientTcpOptions
  386. {
  387. Server = "broker.hivemq.org",
  388. TlsOptions = new MqttClientTlsOptions
  389. {
  390. UseTls = true
  391. }
  392. },
  393. };
  394. }
  395. {
  396. // Subscribe to a topic
  397. await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic("my/topic").Build());
  398. // Unsubscribe from a topic
  399. await mqttClient.UnsubscribeAsync("my/topic");
  400. // Publish an application message
  401. var applicationMessage = new MqttApplicationMessageBuilder()
  402. .WithTopic("A/B/C")
  403. .WithPayload("Hello World")
  404. .WithAtLeastOnceQoS()
  405. .Build();
  406. await mqttClient.PublishAsync(applicationMessage);
  407. }
  408. }
  409. // ----------------------------------
  410. {
  411. var options = new MqttServerOptions();
  412. options.ConnectionValidator = c =>
  413. {
  414. if (c.ClientId.Length < 10)
  415. {
  416. c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedIdentifierRejected;
  417. return;
  418. }
  419. if (c.Username != "mySecretUser")
  420. {
  421. c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
  422. return;
  423. }
  424. if (c.Password != "mySecretPassword")
  425. {
  426. c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
  427. return;
  428. }
  429. c.ReturnCode = MqttConnectReturnCode.ConnectionAccepted;
  430. };
  431. var factory = new MqttFactory();
  432. var mqttServer = factory.CreateMqttServer();
  433. await mqttServer.StartAsync(options);
  434. Console.WriteLine("Press any key to exit.");
  435. Console.ReadLine();
  436. await mqttServer.StopAsync();
  437. }
  438. // ----------------------------------
  439. // For UWP apps:
  440. MqttTcpChannel.CustomIgnorableServerCertificateErrorsResolver = o =>
  441. {
  442. if (o.Server == "server_with_revoked_cert")
  443. {
  444. return new[] { ChainValidationResult.Revoked };
  445. }
  446. return new ChainValidationResult[0];
  447. };
  448. {
  449. // Start a MQTT server.
  450. var mqttServer = new MqttFactory().CreateMqttServer();
  451. await mqttServer.StartAsync(new MqttServerOptions());
  452. Console.WriteLine("Press any key to exit.");
  453. Console.ReadLine();
  454. await mqttServer.StopAsync();
  455. }
  456. {
  457. // Configure MQTT server.
  458. var optionsBuilder = new MqttServerOptionsBuilder()
  459. .WithConnectionBacklog(100)
  460. .WithDefaultEndpointPort(1884);
  461. var options = new MqttServerOptions
  462. {
  463. };
  464. options.ConnectionValidator = c =>
  465. {
  466. if (c.ClientId != "Highlander")
  467. {
  468. c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedIdentifierRejected;
  469. return;
  470. }
  471. c.ReturnCode = MqttConnectReturnCode.ConnectionAccepted;
  472. };
  473. var mqttServer = new MqttFactory().CreateMqttServer();
  474. await mqttServer.StartAsync(optionsBuilder.Build());
  475. }
  476. {
  477. // Setup client validator.
  478. var options = new MqttServerOptions
  479. {
  480. ConnectionValidator = c =>
  481. {
  482. if (c.ClientId.Length < 10)
  483. {
  484. c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedIdentifierRejected;
  485. return;
  486. }
  487. if (c.Username != "mySecretUser")
  488. {
  489. c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
  490. return;
  491. }
  492. if (c.Password != "mySecretPassword")
  493. {
  494. c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
  495. return;
  496. }
  497. c.ReturnCode = MqttConnectReturnCode.ConnectionAccepted;
  498. }
  499. };
  500. }
  501. {
  502. // Create a new MQTT server.
  503. var mqttServer = new MqttFactory().CreateMqttServer();
  504. }
  505. {
  506. // Setup and start a managed MQTT client.
  507. var options = new ManagedMqttClientOptionsBuilder()
  508. .WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
  509. .WithClientOptions(new MqttClientOptionsBuilder()
  510. .WithClientId("Client1")
  511. .WithTcpServer("broker.hivemq.com")
  512. .WithTls().Build())
  513. .Build();
  514. var mqttClient = new MqttFactory().CreateManagedMqttClient();
  515. await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic("my/topic").Build());
  516. await mqttClient.StartAsync(options);
  517. }
  518. }
  519. }
  520. }