终端一体化运控平台
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

102 lines
3.3 KiB

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using BPA.Message;
using BPASmartClient.Helper;
using BPASmartClient.MQTT;
using HBLConsole.Communication;
using Microsoft.Extensions.Configuration;
  12. namespace BPASmartClient.Business
  13. {
  14. /// <summary>
  15. /// 消息处理接口
  16. /// </summary>
  17. public delegate void RecivedHandle(IMessage message);
  18. /// <summary>
  19. /// MQTT 管理类
  20. /// </summary>
  21. public class MQTTMgr : IPlugin
  22. {
  23. //客户端ID
  24. private int clientId;
  25. //运行标识
  26. private bool running = false;
  27. //消息缓存
  28. private ConcurrentQueue<string> msg = new ConcurrentQueue<string>();
  29. //MQTT 代理
  30. private MQTTProxy mqttProxy = new MQTTProxy();
  31. //消息处理者
  32. private List<RecivedHandle> messageRecives = new List<RecivedHandle>();
  33. public void Initialize()
  34. {
  35. //MQTT 连接成功
  36. mqttProxy.Connected = new Action(() =>
  37. {
  38. MqttHelper.GetInstance().MqttSubscriptionAsync(TopicDefine.GetInstance().SubscribTopics.ToArray());
  39. });
  40. //MQTT 连接成功
  41. mqttProxy.LostConnect = new Action(() =>
  42. {
  43. MqttHelper.GetInstance().MqttSubscriptionAsync(TopicDefine.GetInstance().SubscribTopics.ToArray());
  44. });
  45. //MQTT 数据接收
  46. mqttProxy.MessageRecive = new Action<string>((message) =>
  47. {
  48. msg.Enqueue(message);
  49. });
  50. //MQTT 初始化
  51. mqttProxy.Connect(Plugin.GetInstance().GetPlugin<ConfigMgr>().MQTT_Config.Host, Plugin.GetInstance().GetPlugin<ConfigMgr>().MQTT_Config.Port,
  52. Guid.NewGuid().ToString());
  53. ThreadManage.GetInstance().StartLong(() =>
  54. {
  55. while (running)
  56. {
  57. while (msg.Count > 0 && msg.TryDequeue(out string temp))
  58. {
  59. if (0 == clientId)
  60. clientId = Plugin.GetInstance().GetPlugin<ConfigMgr>().ClientId;
  61. var package = BPAPackage.Deserialize(temp);
  62. if (package?.ClientId == clientId)
  63. {
  64. if (package.Message != null)
  65. {
  66. for (int i = messageRecives.Count - 1; i >= 0; i--) {
  67. messageRecives[i]?.Invoke(package.Message);
  68. }
  69. }
  70. }
  71. }
  72. Thread.Sleep(50);
  73. }
  74. }, "MQTT 消息处理");
  75. }
  76. /// <summary>
  77. /// 设置消息处理者
  78. /// </summary>
  79. /// <param name="messageRecive">消息处理者</param>
  80. public void SetMessageReciveHandler(RecivedHandle messageRecive)
  81. {
  82. if (messageRecives.Contains(messageRecive))
  83. return;
  84. messageRecives.Add(messageRecive);
  85. }
  86. public void Dispose()
  87. {
  88. running = false;
  89. mqttProxy.CloseConnect();
  90. messageRecives.Clear();
  91. }
  92. }
  93. }