|
- using BPA.Message;
- using BPASmartClient.Helper;
- using BPASmartClient.Message;
- using BPASmartClient.MQTT;
- using HBLConsole.Communication;
- using Microsoft.Extensions.Configuration;
- using System;
- using System.Collections.Concurrent;
- using System.Collections.Generic;
- using System.Linq;
- using System.Text;
- using System.Threading.Tasks;
-
- namespace BPASmartClient.Business
- {
- /// <summary>
- /// Callback signature for message handlers: receives one message to process.
- /// </summary>
- /// <param name="message">The message passed to the handler.</param>
- public delegate void RecivedHandle(IMessage message);
- /// <summary>
- /// MQTT manager plugin. Connects through <see cref="MQTTProxy"/>, buffers the raw
- /// inbound payloads in a queue, and on a worker thread deserializes each payload
- /// and forwards messages addressed to this client to the registered handlers.
- /// </summary>
- public class MQTTMgr : IPlugin
- {
-     // Client ID; read from ConfigMgr while processing a message whenever it is still 0.
-     private int clientId;
-     // Run flag of the processing loop. Volatile because Dispose() clears it from a
-     // different thread than the one polling it.
-     private volatile bool running = false;
-     // Buffer of raw payloads received from the broker, drained by the worker thread.
-     private readonly ConcurrentQueue<string> msg = new ConcurrentQueue<string>();
-     // MQTT proxy used to connect, subscribe and publish.
-     private readonly MQTTProxy mqttProxy = new MQTTProxy();
-     // Guards every access to messageRecives (registration, clearing, snapshotting).
-     private readonly object handlersLock = new object();
-     // Registered message handlers.
-     private readonly List<RecivedHandle> messageRecives = new List<RecivedHandle>();
-
-     /// <summary>
-     /// Plugin initialization hook; intentionally empty.
-     /// </summary>
-     public void Initialize()
-     {
-     }
-
-     /// <summary>
-     /// Initializes the topics, wires the proxy callbacks, connects to the broker and
-     /// starts the worker thread that dispatches buffered messages.
-     /// </summary>
-     public void Start()
-     {
-         running = true;
-         // Topic initialization from the devices known to DeviceMgr.
-         TopicDefine.GetInstance().Initialize(Plugin.GetInstance().GetPlugin<DeviceMgr>().GetDevices());
-         // MQTT connected: subscribe to the configured topics.
-         mqttProxy.Connected = new Action(() =>
-         {
-             mqttProxy.Subscrib(TopicDefine.GetInstance().SubscribTopics.ToArray());
-             MessageLog.GetInstance.Show("MQTT 连接成功");
-         });
-         // MQTT connection lost: re-issue the subscriptions.
-         // NOTE(review): this goes through MqttHelper rather than mqttProxy, unlike the
-         // Connected handler above - confirm this is the intended client.
-         mqttProxy.LostConnect = new Action(() =>
-         {
-             MqttHelper.GetInstance().MqttSubscriptionAsync(TopicDefine.GetInstance().SubscribTopics.ToArray());
-         });
-         // MQTT data received: only buffer here; processing happens on the worker thread.
-         mqttProxy.MessageRecive = new Action<string>((message) =>
-         {
-             msg.Enqueue(message);
-         });
-         var MqttServerConfig = Plugin.GetInstance().GetPlugin<ConfigMgr>().MQTT_Config;
-         var MqttServerAccount = Plugin.GetInstance().GetPlugin<ConfigMgr>().Mqtt_Account;
-         // MQTT connect; a fresh GUID string is passed as the last argument
-         // (presumably the MQTT client identifier - confirm against MQTTProxy.Connect).
-         mqttProxy.Connect(MqttServerAccount.UserName, MqttServerAccount.Password, MqttServerConfig.Host, MqttServerConfig.Port, Guid.NewGuid().ToString());
-
-         // NOTE(review): exceptions thrown by Deserialize or by a handler propagate out
-         // of this loop into ThreadManage; confirm how it treats them.
-         ThreadManage.GetInstance().Start(() =>
-         {
-             while (running)
-             {
-                 // TryDequeue already reports an empty queue, so no separate Count check.
-                 while (msg.TryDequeue(out string temp))
-                 {
-                     if (0 == clientId)
-                     {
-                         clientId = Plugin.GetInstance().GetPlugin<ConfigMgr>().ClientId;
-                     }
-                     var package = BPAPackage.Deserialize(temp);
-                     // Skip undeserializable packages, packages for other clients,
-                     // and packages without a message.
-                     if (package?.ClientId != clientId || package.Message == null)
-                     {
-                         continue;
-                     }
-                     DispatchMessage(package.Message);
-                 }
-                 // Idle wait before polling the queue again.
-                 Thread.Sleep(50);
-             }
-         }, "MQTT 消息处理", true);
-     }
-
-     /// <summary>
-     /// Invokes every registered handler with <paramref name="message"/>, most recently
-     /// registered first.
-     /// </summary>
-     /// <param name="message">The message to hand to the handlers.</param>
-     private void DispatchMessage(IMessage message)
-     {
-         RecivedHandle[] handlers;
-         // Snapshot under the lock so a concurrent registration or Dispose() cannot
-         // invalidate the indices; invoke outside the lock so a handler may register
-         // further handlers without deadlocking.
-         lock (handlersLock)
-         {
-             handlers = messageRecives.ToArray();
-         }
-         for (int i = handlers.Length - 1; i >= 0; i--)
-         {
-             handlers[i]?.Invoke(message);
-         }
-     }
-
-     /// <summary>
-     /// Publishes a message through the MQTT proxy.
-     /// </summary>
-     /// <param name="topic">Topic to publish to.</param>
-     /// <param name="message">Message body.</param>
-     public void Publish(string topic, string message)
-     {
-         mqttProxy.Publish(topic, message);
-     }
-
-     /// <summary>
-     /// Registers a message handler. A handler that is already registered, or a null
-     /// handler, is ignored.
-     /// </summary>
-     /// <param name="messageRecive">The handler to register.</param>
-     public void SetMessageReciveHandler(RecivedHandle messageRecive)
-     {
-         if (messageRecive == null)
-         {
-             return;
-         }
-         lock (handlersLock)
-         {
-             if (messageRecives.Contains(messageRecive))
-             {
-                 return;
-             }
-             messageRecives.Add(messageRecive);
-         }
-     }
-
-     /// <summary>
-     /// Stops the processing loop, closes the MQTT connection and removes all handlers.
-     /// </summary>
-     public void Dispose()
-     {
-         running = false;
-         mqttProxy.CloseConnect();
-         lock (handlersLock)
-         {
-             messageRecives.Clear();
-         }
-     }
- }
- }
|