Você não pode selecionar mais de 25 tópicos Os tópicos devem começar com uma letra ou um número, podem incluir traços ('-') e podem ter até 35 caracteres.
 
 

188 linhas
5.5 KiB

  1. using MQTTnet;
  2. using MQTTnet.Client;
  3. using MQTTnet.Client.Options;
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Text;
  8. using System.Threading;
  9. using System.Threading.Tasks;
  10. using HBLConsole.GVL;
  11. using HBLConsole.Service;
  12. namespace HBLConsole.Communication
  13. {
  14. public class MqttHelper
  15. {
  16. private volatile static MqttHelper _Instance;
  17. public static MqttHelper GetInstance => _Instance ?? (_Instance = new MqttHelper());
  18. private MqttHelper() { }
  19. private IMqttClient client;
  20. IMqttClientOptions options;
  21. /// <summary>
  22. /// MQTT 接收消息
  23. /// </summary>
  24. public Action<MqttApplicationMessageReceivedEventArgs> MqttReceive { get; set; }
  25. /// <summary>
  26. /// MQTT 连接成功
  27. /// </summary>
  28. public Action ConnectOk { get; set; }
  29. /// <summary>
  30. /// 重连成功
  31. /// </summary>
  32. public Action Reconnection { get; set; }
  33. Action UseDisconnectedAction;
  34. public async void MqttInitAsync(string UserName, string pass, string IP, int port, string clientID)
  35. {
  36. options = new MqttClientOptionsBuilder().WithTcpServer(IP, port).WithClientId(clientID).WithCredentials(UserName, pass).Build();
  37. client = new MqttFactory().CreateMqttClient();
  38. client.UseDisconnectedHandler(c =>
  39. {
  40. if (UseDisconnectedAction == null)
  41. {
  42. Reconnect();
  43. UseDisconnectedAction();
  44. }
  45. }).UseApplicationMessageReceivedHandler(c =>
  46. {
  47. MqttReceive(c);
  48. }).UseConnectedHandler((e) =>
  49. {
  50. MessageLog.GetInstance.Show($"连接成功");
  51. });
  52. try
  53. {
  54. await client.ConnectAsync(options);
  55. }
  56. catch (Exception ex)
  57. {
  58. MessageLog.GetInstance.ShowEx(ex.Message);
  59. MessageLog.GetInstance.Show("mqtt连接失败!重连执行中");
  60. }
  61. if (client.IsConnected)
  62. {
  63. MessageLog.GetInstance.Show("MQTT连接成功!");
  64. if (ConnectOk != null) ConnectOk();
  65. }
  66. }
  67. private void Reconnect()
  68. {
  69. UseDisconnectedAction = new Action(() =>
  70. {
  71. Thread.Sleep(2000);
  72. while (!InternetInfo.NetworkConnectState)
  73. {
  74. Thread.Sleep(2000);
  75. }
  76. MessageLog.GetInstance.Show($"断开连接");
  77. bool ErrorFlag = false;
  78. while (!client.IsConnected)
  79. {
  80. try
  81. {
  82. MessageLog.GetInstance.Show($"重连中");
  83. client.ConnectAsync(options).Wait();
  84. }
  85. catch (Exception ex)
  86. {
  87. if (!ErrorFlag)
  88. {
  89. MessageLog.GetInstance.ShowEx(ex.ToString());
  90. ErrorFlag = true;
  91. }
  92. }
  93. Thread.Sleep(3000);
  94. }
  95. if (client.IsConnected)
  96. {
  97. MessageLog.GetInstance.Show("MQTT重连成功!");
  98. if (Reconnection != null) Reconnection();
  99. }
  100. UseDisconnectedAction = null;
  101. });
  102. }
  103. /// <summary>
  104. /// Mqtt 订阅
  105. /// </summary>
  106. /// <param name="topic">需要订阅的主题</param>
  107. public async void MqttSubscriptionAsync(string topic)
  108. {
  109. if (client != null)
  110. {
  111. if (client.IsConnected)
  112. {
  113. try
  114. {
  115. var result = await client.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic(topic).WithExactlyOnceQoS().Build());
  116. }
  117. catch { }
  118. }
  119. }
  120. }
  121. /// <summary>
  122. /// Mqtt 订阅
  123. /// </summary>
  124. /// <param name="topic">需要订阅的主题</param>
  125. public async void MqttSubscriptionAsync(string[] topic)
  126. {
  127. if (client != null)
  128. {
  129. if (client.IsConnected)
  130. {
  131. try
  132. {
  133. foreach (var item in topic)
  134. {
  135. var result = await client.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic(item).WithExactlyOnceQoS().Build());
  136. }
  137. }
  138. catch { }
  139. }
  140. }
  141. }
  142. /// <summary>
  143. /// Mqtt 发布
  144. /// </summary>
  145. /// <param name="topic">需要发布的主题</param>
  146. /// <param name="content">需要发布的内容</param>
  147. public async void MqttPublishAsync(string topic, string content)
  148. {
  149. if (client != null)
  150. {
  151. if (client.IsConnected)
  152. {
  153. var msg = new MqttApplicationMessageBuilder().WithTopic(topic).WithPayload(content).WithExactlyOnceQoS().Build();
  154. try
  155. {
  156. var result = await client.PublishAsync(msg);
  157. }
  158. catch { }
  159. }
  160. }
  161. }
  162. }
  163. }