终端一体化运控平台
25개 이상의 토픽을 선택하실 수 없습니다. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

118 lines
3.8 KiB

  1. using BPA.Message;
  2. using BPASmartClient.Helper;
  3. using BPASmartClient.MQTT;
  4. using HBLConsole.Communication;
  5. using Microsoft.Extensions.Configuration;
  6. using System;
  7. using System.Collections.Concurrent;
  8. using System.Collections.Generic;
  9. using System.Linq;
  10. using System.Text;
  11. using System.Threading.Tasks;
  12. namespace BPASmartClient.Business
  13. {
  14. /// <summary>
  15. /// 消息处理接口
  16. /// </summary>
  17. public delegate void RecivedHandle(IMessage message);
  18. /// <summary>
  19. /// MQTT 管理类
  20. /// </summary>
  21. public class MQTTMgr : IPlugin
  22. {
  23. //客户端ID
  24. private int clientId;
  25. //运行标识
  26. private bool running = false;
  27. //消息缓存
  28. private ConcurrentQueue<string> msg = new ConcurrentQueue<string>();
  29. //MQTT 代理
  30. private MQTTProxy mqttProxy = new MQTTProxy();
  31. //消息处理者
  32. private List<RecivedHandle> messageRecives = new List<RecivedHandle>();
  33. public void Initialize()
  34. {
  35. }
  36. public void Start()
  37. {
  38. running = true;
  39. //主题初始化
  40. TopicDefine.GetInstance().Initialize(Plugin.GetInstance().GetPlugin<DeviceMgr>().GetDevices());
  41. //MQTT 连接成功
  42. mqttProxy.Connected = new Action(() =>
  43. {
  44. mqttProxy.Subscrib(TopicDefine.GetInstance().SubscribTopics.ToArray());
  45. });
  46. //MQTT 连接成功
  47. mqttProxy.LostConnect = new Action(() =>
  48. {
  49. MqttHelper.GetInstance().MqttSubscriptionAsync(TopicDefine.GetInstance().SubscribTopics.ToArray());
  50. });
  51. //MQTT 数据接收
  52. mqttProxy.MessageRecive = new Action<string>((message) =>
  53. {
  54. msg.Enqueue(message);
  55. });
  56. //MQTT 初始化
  57. mqttProxy.Connect(Plugin.GetInstance().GetPlugin<ConfigMgr>().MQTT_Config.Host, Plugin.GetInstance().GetPlugin<ConfigMgr>().MQTT_Config.Port,
  58. Guid.NewGuid().ToString());
  59. ThreadManage.GetInstance().Start(() =>
  60. {
  61. while (running)
  62. {
  63. while (msg.Count > 0 && msg.TryDequeue(out string temp))
  64. {
  65. if (0 == clientId)
  66. clientId = Plugin.GetInstance().GetPlugin<ConfigMgr>().ClientId;
  67. var package = BPAPackage.Deserialize(temp);
  68. if (package?.ClientId == clientId)
  69. {
  70. if (package.Message != null)
  71. {
  72. for (int i = messageRecives.Count - 1; i >= 0; i--)
  73. {
  74. messageRecives[i]?.Invoke(package.Message);
  75. }
  76. }
  77. }
  78. }
  79. Thread.Sleep(50);
  80. }
  81. }, "MQTT 消息处理");
  82. }
  83. /// <summary>
  84. /// MQTT消息推送
  85. /// </summary>
  86. /// <param name="topic">主题</param>
  87. /// <param name="message">消息体</param>
  88. public void Publish(string topic, string message) {
  89. mqttProxy.Publish(topic, message);
  90. }
  91. /// <summary>
  92. /// 设置消息处理者
  93. /// </summary>
  94. /// <param name="messageRecive">消息处理者</param>
  95. public void SetMessageReciveHandler(RecivedHandle messageRecive)
  96. {
  97. if (messageRecives.Contains(messageRecive))
  98. return;
  99. messageRecives.Add(messageRecive);
  100. }
  101. public void Dispose()
  102. {
  103. running = false;
  104. mqttProxy.CloseConnect();
  105. messageRecives.Clear();
  106. }
  107. }
  108. }