终端一体化运控平台
Nevar pievienot vairāk kā 25 tēmas Tēmai ir jāsākas ar burtu vai ciparu, tā var saturēt domu zīmes ('-') un var būt līdz 35 simboliem gara.
 
 
 

137 rindas
4.7 KiB

  1. using BPA.Message;
  2. using BPA.Helper;
  3. using BPA.Communication;
  4. using Microsoft.Extensions.Configuration;
  5. using System;
  6. using System.Collections.Concurrent;
  7. using System.Collections.Generic;
  8. using System.Linq;
  9. using System.Text;
  10. using System.Threading.Tasks;
  11. namespace BPASmartClient.Business
  12. {
  13. /// <summary>
  14. /// 消息处理接口
  15. /// </summary>
  16. public delegate void RecivedHandle(BPA.Message.IMessage message);
  17. /// <summary>
  18. /// MQTT 管理类
  19. /// </summary>
  20. public class MQTTMgr : IPlugin
  21. {
  22. //客户端ID
  23. private int clientId;
  24. //运行标识
  25. private bool running = false;
  26. //消息缓存
  27. private ConcurrentQueue<string> msg = new ConcurrentQueue<string>();
  28. //MQTT 代理
  29. //private MqttHelper mqttProxy = new MqttHelper();
  30. ICommMqtt mqttProxy = CommHelper.MQTT;
  31. //消息处理者
  32. private List<RecivedHandle> messageRecives = new List<RecivedHandle>();
  33. public void Initialize()
  34. {
  35. }
  36. public void Start()
  37. {
  38. running = true;
  39. //主题初始化
  40. TopicDefine.GetInstance.Initialize(Plugin.GetInstance.GetPlugin<DeviceMgr>().GetDevices());
  41. //mqttProxy.Connected = () =>
  42. //{
  43. // mqttProxy.Subscrib(TopicDefine.GetInstance.SubscribTopics.ToArray());
  44. //};
  45. //MQTT 数据接收
  46. mqttProxy.MessageRecive = s => { msg.Enqueue(s); };
  47. var MqttServerConfig = Plugin.GetInstance.GetPlugin<ConfigMgr>().MQTT_Config;
  48. var MqttServerAccount = Plugin.GetInstance.GetPlugin<ConfigMgr>().Mqtt_Account;
  49. var deviceConfig = Plugin.GetInstance.GetPlugin<ConfigMgr>().deviceConfigModelJsons;
  50. string deviceId = "";
  51. if (deviceConfig.Count > 0 && deviceConfig[0].deviceModels.Count > 0)
  52. {
  53. deviceId = deviceConfig[0].deviceModels[0].DeviceId;
  54. }
  55. clientId = Plugin.GetInstance.GetPlugin<ConfigMgr>().ClientId;
  56. //MQTT 初始化
  57. //mqttProxy.Connect(new BPA.Communication.Base.ConfigurationOptions()
  58. //{
  59. // UserName = MqttServerAccount.UserName,
  60. // Password = MqttServerAccount.Password,
  61. // IpAddress = MqttServerConfig.Host,
  62. // Port = MqttServerConfig.Port,
  63. // ClientId = $"ClientId-[{clientId}]-DeviceId-[{deviceId}]-{Guid.NewGuid()}"
  64. //});
  65. string ClientId = $"ClientId-[{clientId}]-DeviceId-[{deviceId}]-{Guid.NewGuid()}";
  66. CommHelper.CreateMqtt(MqttServerConfig.Host, MqttServerAccount.UserName, MqttServerAccount.Password, ClientId, MqttServerConfig.Port).OnSuccess((s) =>
  67. {
  68. mqttProxy = s.Content;
  69. mqttProxy.Subscrib(TopicDefine.GetInstance.SubscribTopics.ToArray());
  70. });
  71. TaskManage.GetInstance.Start(() =>
  72. {
  73. while (running)
  74. {
  75. while (msg.Count > 0 && msg.TryDequeue(out string temp))
  76. {
  77. if (0 == clientId)
  78. clientId = Plugin.GetInstance.GetPlugin<ConfigMgr>().ClientId;
  79. var package = BPAPackage.Deserialize(temp);
  80. if (package?.ClientId == clientId)
  81. {
  82. if (package.Message != null)
  83. {
  84. for (int i = messageRecives.Count - 1; i >= 0; i--)
  85. {
  86. messageRecives[i]?.Invoke(package.Message);
  87. }
  88. }
  89. }
  90. }
  91. Thread.Sleep(50);
  92. }
  93. }, "MQTT 消息处理", isRestart: true);
  94. }
  95. /// <summary>
  96. /// MQTT消息推送
  97. /// </summary>
  98. /// <param name="topic">主题</param>
  99. /// <param name="message">消息体</param>
  100. public void Publish(string topic, string message)
  101. {
  102. mqttProxy.Publish(topic, message);
  103. }
  104. /// <summary>
  105. /// 设置消息处理者
  106. /// </summary>
  107. /// <param name="messageRecive">消息处理者</param>
  108. public void SetMessageReciveHandler(RecivedHandle messageRecive)
  109. {
  110. if (messageRecives.Contains(messageRecive))
  111. return;
  112. messageRecives.Add(messageRecive);
  113. }
  114. public void Dispose()
  115. {
  116. running = false;
  117. mqttProxy.Dispose();
  118. messageRecives.Clear();
  119. }
  120. }
  121. }