终端一体化运控平台
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

131 lines
4.2 KiB

  1. using BPA.Message;
  2. using BPA.Helper;
  3. using BPA.Communication;
  4. using Microsoft.Extensions.Configuration;
  5. using System;
  6. using System.Collections.Concurrent;
  7. using System.Collections.Generic;
  8. using System.Linq;
  9. using System.Text;
  10. using System.Threading.Tasks;
  11. namespace BPASmartClient.Business
  12. {
  /// <summary>
  /// Message handler callback.
  /// </summary>
  /// <param name="message">The received message to process.</param>
  public delegate void RecivedHandle(IMessage message);
  /// <summary>
  /// MQTT manager plugin. Connects the MQTT proxy, buffers the raw payloads it
  /// receives, and dispatches the deserialized messages addressed to this
  /// client to every registered <see cref="RecivedHandle"/>.
  /// </summary>
  public class MQTTMgr : IPlugin
  {
      // Pause, in milliseconds, between two drain passes of the dispatch loop.
      private const int DispatchIdleDelayMs = 50;

      // Client id; only packages carrying this id are dispatched.
      private int clientId;
      // Run flag of the dispatch loop. Volatile because Dispose() clears it from
      // a different thread than the one polling it.
      private volatile bool running = false;
      // Raw payloads received from the proxy, waiting to be dispatched.
      private ConcurrentQueue<string> msg = new ConcurrentQueue<string>();
      // MQTT proxy.
      private MqttHelper mqttProxy = new MqttHelper();
      // Registered message handlers. Always accessed under handlerLock.
      private List<RecivedHandle> messageRecives = new List<RecivedHandle>();
      // Guards messageRecives, which is touched by callers of
      // SetMessageReciveHandler/Dispose and by the dispatch loop.
      private readonly object handlerLock = new object();

      /// <summary>
      /// Intentionally empty; all setup happens in <see cref="Start"/>.
      /// </summary>
      public void Initialize()
      {
      }

      /// <summary>
      /// Initializes the topics, wires the proxy callbacks, connects to the
      /// MQTT server and starts the background dispatch loop.
      /// Does nothing when <c>InternetInfo.NetworkConnectState</c> is false.
      /// </summary>
      public void Start()
      {
          if (!InternetInfo.NetworkConnectState) return;
          running = true;

          // Topic initialization.
          TopicDefine.GetInstance().Initialize(Plugin.GetInstance().GetPlugin<DeviceMgr>().GetDevices());
          mqttProxy.Connected = () =>
          {
              mqttProxy.Subscrib(TopicDefine.GetInstance().SubscribTopics.ToArray());
          };

          // MQTT data reception: only enqueue here, processing happens in the loop below.
          mqttProxy.MessageRecive = s => { msg.Enqueue(s); };

          // Resolve the configuration plugin once instead of once per setting.
          var configMgr = Plugin.GetInstance().GetPlugin<ConfigMgr>();
          var MqttServerConfig = configMgr.MQTT_Config;
          var MqttServerAccount = configMgr.Mqtt_Account;
          var deviceConfig = configMgr.deviceConfigModelJsons;

          // The first configured device (if any) is embedded in the MQTT client id.
          string deviceId = "";
          if (deviceConfig.Count > 0 && deviceConfig[0].deviceModels.Count > 0)
          {
              deviceId = deviceConfig[0].deviceModels[0].DeviceId;
          }
          clientId = configMgr.ClientId;

          // MQTT initialization. The GUID suffix makes the client id unique per connection.
          mqttProxy.Connect(new BPA.Communication.Base.ConfigurationOptions()
          {
              UserName = MqttServerAccount.UserName,
              Password = MqttServerAccount.Password,
              IpAddress = MqttServerConfig.Host,
              Port = MqttServerConfig.Port,
              ClientId = $"ClientId-[{clientId}]-DeviceId-[{deviceId}]-{Guid.NewGuid()}"
          });

          // NOTE(review): isRestart presumably asks TaskManage to restart the loop if it faults — confirm.
          TaskManage.GetInstance.Start(() =>
          {
              while (running)
              {
                  // TryDequeue already reports an empty queue, so no separate Count check is needed.
                  while (msg.TryDequeue(out string temp))
                  {
                      // The client id may not have been available when Start() ran; retry the lookup.
                      if (0 == clientId)
                          clientId = Plugin.GetInstance().GetPlugin<ConfigMgr>().ClientId;

                      var package = BPAPackage.Deserialize(temp);
                      if (package?.ClientId == clientId && package.Message != null)
                      {
                          Dispatch(package.Message);
                      }
                  }
                  Thread.Sleep(DispatchIdleDelayMs);
              }
          }, "MQTT 消息处理", isRestart: true);
      }

      /// <summary>
      /// Publishes a message through the MQTT proxy.
      /// </summary>
      /// <param name="topic">Topic to publish to.</param>
      /// <param name="message">Message body.</param>
      public void Publish(string topic, string message)
      {
          mqttProxy.Publish(topic, message);
      }

      /// <summary>
      /// Registers a message handler. A handler that is already registered,
      /// or a null handler, is ignored.
      /// </summary>
      /// <param name="messageRecive">Handler to register.</param>
      public void SetMessageReciveHandler(RecivedHandle messageRecive)
      {
          // A null entry could never be invoked, so do not store it.
          if (messageRecive == null)
              return;

          lock (handlerLock)
          {
              if (messageRecives.Contains(messageRecive))
                  return;
              messageRecives.Add(messageRecive);
          }
      }

      /// <summary>
      /// Stops the dispatch loop, disposes the MQTT proxy and removes all handlers.
      /// </summary>
      public void Dispose()
      {
          running = false;
          mqttProxy.Dispose();
          lock (handlerLock)
          {
              messageRecives.Clear();
          }
      }

      // Invokes every registered handler with the message, most recently registered first.
      private void Dispatch(IMessage message)
      {
          // Copy under the lock and invoke outside it, so a concurrent
          // registration or Dispose() cannot invalidate the iteration and a
          // handler may register further handlers without deadlocking.
          RecivedHandle[] handlers;
          lock (handlerLock)
          {
              handlers = messageRecives.ToArray();
          }

          for (int i = handlers.Length - 1; i >= 0; i--)
          {
              handlers[i]?.Invoke(message);
          }
      }
  }
  115. }