终端一体化运控平台
25'ten fazla konu seçemezsiniz Konular bir harf veya rakamla başlamalı, kısa çizgiler ('-') içerebilir ve en fazla 35 karakter uzunluğunda olabilir.

MqttHelper.cs 5.3 KiB

2 yıl önce
2 yıl önce
2 yıl önce
2 yıl önce
2 yıl önce
2 yıl önce
2 yıl önce
2 yıl önce
2 yıl önce
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. using BPASmartClient.Helper;
  2. using BPASmartClient.Message;
  3. using MQTTnet;
  4. using MQTTnet.Client;
  5. using MQTTnet.Client.Options;
  6. using System;
  7. using System.Threading;
  8. namespace HBLConsole.Communication
  9. {
  10. public class MqttHelper:Singleton<MqttHelper>
  11. {
  12. private IMqttClient client;
  13. IMqttClientOptions options;
  14. /// <summary>
  15. /// MQTT 接收消息
  16. /// </summary>
  17. public Action<MqttApplicationMessageReceivedEventArgs> MqttReceive { get; set; }
  18. /// <summary>
  19. /// MQTT 连接成功
  20. /// </summary>
  21. public Action ConnectOk { get; set; }
  22. /// <summary>
  23. /// 重连成功
  24. /// </summary>
  25. public Action Reconnection { get; set; }
  26. Action UseDisconnectedAction;
  27. public async void MqttInitAsync(string UserName, string pass, string IP, int port, string clientID)
  28. {
  29. options = new MqttClientOptionsBuilder().WithTcpServer(IP, port).WithClientId(clientID).WithCredentials(UserName, pass).Build();
  30. client = new MqttFactory().CreateMqttClient();
  31. client.UseDisconnectedHandler(c =>
  32. {
  33. if (UseDisconnectedAction == null)
  34. {
  35. Reconnect();
  36. UseDisconnectedAction();
  37. }
  38. }).UseApplicationMessageReceivedHandler(c =>
  39. {
  40. MqttReceive(c);
  41. }).UseConnectedHandler((e) =>
  42. {
  43. MessageLog.GetInstance.Show($"连接成功");
  44. });
  45. try
  46. {
  47. await client.ConnectAsync(options);
  48. }
  49. catch (Exception ex)
  50. {
  51. MessageLog.GetInstance.ShowEx(ex.Message);
  52. MessageLog.GetInstance.Show("mqtt连接失败!重连执行中");
  53. }
  54. if (client.IsConnected)
  55. {
  56. MessageLog.GetInstance.Show("MQTT连接成功!");
  57. if (ConnectOk != null) ConnectOk();
  58. }
  59. }
  60. public async void Close() {
  61. await client.DisconnectAsync();
  62. }
  63. private void Reconnect()
  64. {
  65. UseDisconnectedAction = new Action(() =>
  66. {
  67. Thread.Sleep(2000);
  68. while (!UniversalHelper.GetInstance().GetNetworkState())
  69. {
  70. Thread.Sleep(2000);
  71. }
  72. MessageLog.GetInstance.Show($"断开连接");
  73. bool ErrorFlag = false;
  74. while (!client.IsConnected)
  75. {
  76. try
  77. {
  78. MessageLog.GetInstance.Show($"重连中");
  79. client.ConnectAsync(options).Wait();
  80. }
  81. catch (Exception ex)
  82. {
  83. if (!ErrorFlag)
  84. {
  85. MessageLog.GetInstance.ShowEx(ex.ToString());
  86. ErrorFlag = true;
  87. }
  88. }
  89. Thread.Sleep(3000);
  90. }
  91. if (client.IsConnected)
  92. {
  93. MessageLog.GetInstance.Show("MQTT重连成功!");
  94. if (Reconnection != null) Reconnection();
  95. }
  96. UseDisconnectedAction = null;
  97. });
  98. }
  99. /// <summary>
  100. /// Mqtt 订阅
  101. /// </summary>
  102. /// <param name="topic">需要订阅的主题</param>
  103. public async void MqttSubscriptionAsync(string topic)
  104. {
  105. if (client != null)
  106. {
  107. if (client.IsConnected)
  108. {
  109. try
  110. {
  111. var result = await client.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic(topic).WithExactlyOnceQoS().Build());
  112. }
  113. catch { }
  114. }
  115. }
  116. }
  117. /// <summary>
  118. /// Mqtt 订阅
  119. /// </summary>
  120. /// <param name="topic">需要订阅的主题</param>
  121. public async void MqttSubscriptionAsync(string[] topic)
  122. {
  123. if (client != null)
  124. {
  125. if (client.IsConnected)
  126. {
  127. try
  128. {
  129. foreach (var item in topic)
  130. {
  131. var result = await client.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic(item).WithExactlyOnceQoS().Build());
  132. }
  133. }
  134. catch { }
  135. }
  136. }
  137. }
  138. /// <summary>
  139. /// Mqtt 发布
  140. /// </summary>
  141. /// <param name="topic">需要发布的主题</param>
  142. /// <param name="content">需要发布的内容</param>
  143. public async void MqttPublishAsync(string topic, string content)
  144. {
  145. if (client != null)
  146. {
  147. if (client.IsConnected)
  148. {
  149. var msg = new MqttApplicationMessageBuilder().WithTopic(topic).WithPayload(content).WithExactlyOnceQoS().Build();
  150. try
  151. {
  152. var result = await client.PublishAsync(msg);
  153. }
  154. catch { }
  155. }
  156. }
  157. }
  158. }
  159. }