终端一体化运控平台
25개 이상의 토픽을 선택하실 수 없습니다. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

143 lines
4.5 KiB

  1. using MQTTnet;
  2. using MQTTnet.Client;
  3. using MQTTnet.Client.Options;
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Text;
  8. using System.Threading.Tasks;
  9. namespace BPASmartClient.Compiler
  10. {
  11. public class MQTT
  12. {
  13. //private volatile static MQTT _Instance;
  14. //public static MQTT GetInstance => _Instance ?? (_Instance = new MQTT());
  15. //private MQTT() { }
  16. public IMqttClient client;
  17. IMqttClientOptions options;
  18. /// <summary>
  19. /// MQTT 接收消息
  20. /// </summary>
  21. public Action<MqttApplicationMessageReceivedEventArgs> MqttReceive { get; set; }
  22. /// <summary>
  23. /// MQTT 连接成功
  24. /// </summary>
  25. public Action ConnectOk { get; set; }
  26. /// <summary>
  27. /// 重连成功
  28. /// </summary>
  29. public Action Reconnection { get; set; }
  30. public async void MqttInitAsync(string UserName,string pass,string IP,int port,string clientID)
  31. {
  32. p1:
  33. options = new MqttClientOptionsBuilder().WithTcpServer(IP,port).WithClientId(clientID).WithCredentials(UserName,pass).Build();
  34. client = new MqttFactory().CreateMqttClient();
  35. client.UseDisconnectedHandler(async c =>
  36. {
  37. Thread.Sleep(2000);
  38. //while (!Device.DataBus.内存数据缓存.DataBus.GetInstance().NetworkConnectState)
  39. //{
  40. // Thread.Sleep(2000);
  41. //}
  42. //Device.DataBus.内存数据缓存.DataBus.GetInstance().MQTTConnectState = false;
  43. //logHelper.GetLogConfigInstance().WriteLog(LogLevel.INFO,$"断开连接");
  44. try
  45. {
  46. //logHelper.GetLogConfigInstance().WriteLog(LogLevel.INFO,$"重连中");
  47. await client.ConnectAsync(options);
  48. }
  49. catch (Exception ex)
  50. {
  51. //logHelper.GetLogConfigInstance().WriteLog(LogLevel.ERROR,ex.Message);
  52. }
  53. if (client.IsConnected)
  54. {
  55. //logHelper.GetLogConfigInstance().WriteLog(LogLevel.INFO,$"MQTT重连成功");
  56. if (Reconnection != null) Reconnection();
  57. }
  58. }).UseApplicationMessageReceivedHandler(c =>
  59. {
  60. MqttReceive(c);
  61. }).UseConnectedHandler((e) =>
  62. {
  63. //logHelper.GetLogConfigInstance().WriteLog(LogLevel.INFO,$"连接成功");
  64. });
  65. try
  66. {
  67. await client.ConnectAsync(options);
  68. }
  69. catch (Exception ex)
  70. {
  71. //logHelper.GetLogConfigInstance().WriteLog(LogLevel.ERROR,ex.Message);
  72. }
  73. if (!client.IsConnected)
  74. {
  75. Thread.Sleep(2000);
  76. //logHelper.GetLogConfigInstance().WriteLog(LogLevel.INFO,$"mqtt连接失败!重连执行中");
  77. goto p1;
  78. }
  79. Thread.Sleep(2000);
  80. //logHelper.GetLogConfigInstance().WriteLog(LogLevel.INFO,$"MQTT连接成功!");
  81. if (ConnectOk != null) ConnectOk();
  82. }
  83. /// <summary>
  84. /// Mqtt 订阅
  85. /// </summary>
  86. /// <param name="topic">需要订阅的主题</param>
  87. public async void MqttSubscriptionAsync(string topic)
  88. {
  89. if (client != null)
  90. {
  91. if (client.IsConnected)
  92. {
  93. try
  94. {
  95. var result = await client.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic(topic).WithExactlyOnceQoS().Build());
  96. }
  97. catch { }
  98. }
  99. }
  100. }
  101. /// <summary>
  102. /// Mqtt 发布
  103. /// </summary>
  104. /// <param name="topic">需要发布的主题</param>
  105. /// <param name="content">需要发布的内容</param>
  106. public async void MqttPublishAsync(string topic,string content)
  107. {
  108. if (client != null)
  109. {
  110. if (client.IsConnected)
  111. {
  112. var msg = new MqttApplicationMessageBuilder().WithTopic(topic).WithPayload(content).WithExactlyOnceQoS().Build();
  113. try
  114. {
  115. var result = await client.PublishAsync(msg);
  116. }
  117. catch { }
  118. }
  119. }
  120. }
  121. private static readonly object sendMessageLock = new object();
  122. }
  123. }