|
- using BPA.Message;
- using BPA.Helper;
-
- using BPA.Communication;
- using Microsoft.Extensions.Configuration;
- using System;
- using System.Collections.Concurrent;
- using System.Collections.Generic;
- using System.Linq;
- using System.Text;
- using System.Threading.Tasks;
-
- namespace BPASmartClient.Business
- {
- /// <summary>
- /// Delegate type for message handlers: a callback that takes an <see cref="IMessage"/> and returns nothing.
- /// </summary>
- /// <param name="message">The message handed to the handler.</param>
- public delegate void RecivedHandle(IMessage message);
- /// <summary>
- /// MQTT manager plugin. Connects the MQTT proxy, queues incoming payloads, and
- /// forwards the message of every package whose ClientId matches this client to
- /// the registered <see cref="RecivedHandle"/> callbacks.
- /// </summary>
- public class MQTTMgr : IPlugin
- {
- // Client ID used to filter incoming packages; read from ConfigMgr
- private int clientId;
- // Run flag polled by the dispatch loop. Volatile because Dispose() clears it while
- // the loop handed to TaskManage is (presumably) running on another thread.
- private volatile bool running = false;
- // Raw MQTT payloads waiting to be dispatched
- private readonly ConcurrentQueue<string> msg = new ConcurrentQueue<string>();
- // MQTT proxy
- private readonly MqttHelper mqttProxy = new MqttHelper();
-
- // Registered message handlers; every access must hold handlerLock
- private readonly List<RecivedHandle> messageRecives = new List<RecivedHandle>();
- // Guards messageRecives, which is mutated by SetMessageReciveHandler/Dispose
- // and read by the dispatch loop
- private readonly object handlerLock = new object();
-
- /// <summary>
- /// No initialization work is performed; all setup happens in <see cref="Start"/>.
- /// </summary>
- public void Initialize()
- {
-
- }
-
- /// <summary>
- /// Initializes the topics, wires the proxy callbacks, connects to the MQTT server
- /// and starts the loop that dispatches queued messages to the registered handlers.
- /// </summary>
- public void Start()
- {
- running = true;
- // Topic initialization
- TopicDefine.GetInstance().Initialize(Plugin.GetInstance().GetPlugin<DeviceMgr>().GetDevices());
-
- // Subscribe to the defined topics from the proxy's ConnectOk callback
- mqttProxy.ConnectOk = () =>
- {
- mqttProxy.Subscrib(TopicDefine.GetInstance().SubscribTopics.ToArray());
- };
-
- // Incoming MQTT payloads are only queued here; the loop below processes them
- mqttProxy.MessageRecive = s => { msg.Enqueue(s); };
-
- // Look the configuration plugin up once instead of once per setting
- var configMgr = Plugin.GetInstance().GetPlugin<ConfigMgr>();
- var mqttServerConfig = configMgr.MQTT_Config;
- var mqttServerAccount = configMgr.Mqtt_Account;
- var deviceConfig = configMgr.deviceConfigModelJsons;
- // NOTE(review): assumes at least one device config with at least one device model;
- // an empty configuration makes this indexing throw - confirm that is intended.
- string deviceId = deviceConfig[0].deviceModels[0].DeviceId;
- clientId = configMgr.ClientId;
- // MQTT initialization; the GUID suffix makes the connection's client id unique
- mqttProxy.Connect(new BPA.Communication.Base.ConfigurationOptions()
- {
- UserName = mqttServerAccount.UserName,
- Password = mqttServerAccount.Password,
- IpAddress = mqttServerConfig.Host,
- Port = mqttServerConfig.Port,
- ClientId = $"ClientId-[{clientId}]-DeviceId-[{deviceId}]-{Guid.NewGuid()}"
- });
-
- TaskManage.GetInstance.Start(() =>
- {
- while (running)
- {
- ProcessPendingMessages();
- // Pause between polls of the queue
- Thread.Sleep(50);
- }
- }, "MQTT 消息处理", isRestart: true);
- }
-
- /// <summary>
- /// Drains the payload queue, deserializes each payload and dispatches the message
- /// of every package addressed to this client.
- /// </summary>
- private void ProcessPendingMessages()
- {
- // TryDequeue returns false on an empty queue, so no separate Count check is needed
- while (msg.TryDequeue(out string temp))
- {
- // Re-read the client ID while it is still 0
- if (0 == clientId)
- {
- clientId = Plugin.GetInstance().GetPlugin<ConfigMgr>().ClientId;
- }
- var package = BPAPackage.Deserialize(temp);
- // A null package fails the comparison, so package.Message is never read on null
- if (package?.ClientId == clientId && package.Message != null)
- {
- DispatchToHandlers(package.Message);
- }
- }
- }
-
- /// <summary>
- /// Invokes the registered handlers with <paramref name="message"/>, last registered first.
- /// </summary>
- /// <param name="message">The message to hand to each handler.</param>
- private void DispatchToHandlers(IMessage message)
- {
- // Work on a snapshot so a concurrent Add/Clear cannot invalidate the index, and
- // invoke outside the lock so a handler may register another handler without deadlocking.
- RecivedHandle[] handlers;
- lock (handlerLock)
- {
- handlers = messageRecives.ToArray();
- }
- for (int i = handlers.Length - 1; i >= 0; i--)
- {
- handlers[i]?.Invoke(message);
- }
- }
-
- /// <summary>
- /// Publishes an MQTT message.
- /// </summary>
- /// <param name="topic">Topic</param>
- /// <param name="message">Message body</param>
- public void Publish(string topic, string message)
- {
- mqttProxy.Publish(topic, message);
- }
-
- /// <summary>
- /// Registers a message handler. A handler that is already registered is not added again.
- /// </summary>
- /// <param name="messageRecive">The handler to register</param>
- public void SetMessageReciveHandler(RecivedHandle messageRecive)
- {
- lock (handlerLock)
- {
- if (messageRecives.Contains(messageRecive))
- {
- return;
- }
- messageRecives.Add(messageRecive);
- }
- }
-
- /// <summary>
- /// Stops the dispatch loop, disposes the MQTT proxy and removes all registered handlers.
- /// </summary>
- public void Dispose()
- {
- running = false;
- mqttProxy.Dispose();
- lock (handlerLock)
- {
- messageRecives.Clear();
- }
- }
-
-
- }
- }
|