终端一体化运控平台
25개 이상의 토픽을 선택하실 수 없습니다. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

MQTTMgr.cs 4.0 KiB

2 년 전
2 년 전
2 년 전
2 년 전
2 년 전
2 년 전
2 년 전
2 년 전
2 년 전
2 년 전
2 년 전
2 년 전
2 년 전
2 년 전
2 년 전
2 년 전
2 년 전
2 년 전
2 년 전
2 년 전
2 년 전
2 년 전
2 년 전
2 년 전
2 년 전
2 년 전
2 년 전
2 년 전
2 년 전
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. using BPA.Message;
  2. using BPASmartClient.Helper;
  3. using BPASmartClient.Message;
  4. using BPASmartClient.MQTT;
  5. using HBLConsole.Communication;
  6. using Microsoft.Extensions.Configuration;
  7. using System;
  8. using System.Collections.Concurrent;
  9. using System.Collections.Generic;
  10. using System.Linq;
  11. using System.Text;
  12. using System.Threading.Tasks;
  13. namespace BPASmartClient.Business
  14. {
  15. /// <summary>
  16. /// 消息处理接口
  17. /// </summary>
  18. public delegate void RecivedHandle(IMessage message);
  19. /// <summary>
  20. /// MQTT 管理类
  21. /// </summary>
  22. public class MQTTMgr : IPlugin
  23. {
  24. //客户端ID
  25. private int clientId;
  26. //运行标识
  27. private bool running = false;
  28. //消息缓存
  29. private ConcurrentQueue<string> msg = new ConcurrentQueue<string>();
  30. //MQTT 代理
  31. private MQTTProxy mqttProxy = new MQTTProxy();
  32. //消息处理者
  33. private List<RecivedHandle> messageRecives = new List<RecivedHandle>();
  34. public void Initialize()
  35. {
  36. }
  37. public void Start()
  38. {
  39. running = true;
  40. //主题初始化
  41. TopicDefine.GetInstance().Initialize(Plugin.GetInstance().GetPlugin<DeviceMgr>().GetDevices());
  42. //MQTT 连接成功
  43. mqttProxy.Connected = new Action(() =>
  44. {
  45. mqttProxy.Subscrib(TopicDefine.GetInstance().SubscribTopics.ToArray());
  46. MessageLog.GetInstance.Show("MQTT 连接成功");
  47. });
  48. //MQTT 连接成功
  49. mqttProxy.LostConnect = new Action(() =>
  50. {
  51. MqttHelper.GetInstance().MqttSubscriptionAsync(TopicDefine.GetInstance().SubscribTopics.ToArray());
  52. });
  53. //MQTT 数据接收
  54. mqttProxy.MessageRecive = new Action<string>((message) =>
  55. {
  56. msg.Enqueue(message);
  57. });
  58. var MqttServerConfig = Plugin.GetInstance().GetPlugin<ConfigMgr>().MQTT_Config;
  59. var MqttServerAccount = Plugin.GetInstance().GetPlugin<ConfigMgr>().Mqtt_Account;
  60. //MQTT 初始化
  61. mqttProxy.Connect(MqttServerAccount.UserName, MqttServerAccount.Password, MqttServerConfig.Host, MqttServerConfig.Port, Guid.NewGuid().ToString());
  62. ThreadManage.GetInstance().Start(() =>
  63. {
  64. while (running)
  65. {
  66. while (msg.Count > 0 && msg.TryDequeue(out string temp))
  67. {
  68. if (0 == clientId)
  69. clientId = Plugin.GetInstance().GetPlugin<ConfigMgr>().ClientId;
  70. var package = BPAPackage.Deserialize(temp);
  71. if (package?.ClientId == clientId)
  72. {
  73. if (package.Message != null)
  74. {
  75. for (int i = messageRecives.Count - 1; i >= 0; i--)
  76. {
  77. messageRecives[i]?.Invoke(package.Message);
  78. }
  79. }
  80. }
  81. }
  82. Thread.Sleep(50);
  83. }
  84. }, "MQTT 消息处理", true);
  85. }
  86. /// <summary>
  87. /// MQTT消息推送
  88. /// </summary>
  89. /// <param name="topic">主题</param>
  90. /// <param name="message">消息体</param>
  91. public void Publish(string topic, string message)
  92. {
  93. mqttProxy.Publish(topic, message);
  94. }
  95. /// <summary>
  96. /// 设置消息处理者
  97. /// </summary>
  98. /// <param name="messageRecive">消息处理者</param>
  99. public void SetMessageReciveHandler(RecivedHandle messageRecive)
  100. {
  101. if (messageRecives.Contains(messageRecive))
  102. return;
  103. messageRecives.Add(messageRecive);
  104. }
  105. public void Dispose()
  106. {
  107. running = false;
  108. mqttProxy.CloseConnect();
  109. messageRecives.Clear();
  110. }
  111. }
  112. }