using BPA.Message;
using BPA.Helper;
using BPA.Communication;
using Microsoft.Extensions.Configuration;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace BPASmartClient.Business
{
    /// <summary>
    /// Callback signature for consumers of messages received over MQTT.
    /// </summary>
    /// <param name="message">The message carried by the received package.</param>
    public delegate void RecivedHandle(IMessage message);

    /// <summary>
    /// MQTT manager: connects to the broker, queues the raw payloads it receives,
    /// and dispatches the decoded messages to the registered handlers from a
    /// background worker.
    /// </summary>
    public class MQTTMgr : IPlugin
    {
        // Client id; packages whose ClientId differs from this value are ignored.
        private int clientId;

        // Run flag polled by the worker loop. Volatile because Dispose() clears it
        // from a different thread than the one that reads it.
        private volatile bool running = false;

        // Raw payloads received from the MQTT proxy, waiting to be processed.
        private readonly ConcurrentQueue<string> msg = new ConcurrentQueue<string>();

        // MQTT proxy.
        private MqttHelper mqttProxy = new MqttHelper();

        // Registered message handlers. Guarded by handlersLock.
        private readonly List<RecivedHandle> messageRecives = new List<RecivedHandle>();

        // Protects messageRecives, which is read by the worker loop while
        // SetMessageReciveHandler() and Dispose() may modify it from other threads.
        private readonly object handlersLock = new object();

        /// <summary>
        /// No initialization work is required for this plugin.
        /// </summary>
        public void Initialize()
        {
        }

        /// <summary>
        /// Initializes the topics, wires the proxy callbacks, connects to the MQTT
        /// server and starts the message processing worker.
        /// </summary>
        public void Start()
        {
            running = true;

            // NOTE(review): the GetPlugin() calls in this class carry no type argument
            // in the reviewed text; if generic arguments were lost, restore them.
            // Confirm against the Plugin API.

            // Topic initialization.
            TopicDefine.GetInstance().Initialize(Plugin.GetInstance().GetPlugin().GetDevices());

            // Subscribe to the configured topics once the connection reports success.
            mqttProxy.ConnectOk = () =>
            {
                mqttProxy.Subscrib(TopicDefine.GetInstance().SubscribTopics.ToArray());
            };

            // MQTT data reception: only enqueue here; decoding and dispatch happen
            // on the worker started below.
            mqttProxy.MessageRecive = s =>
            {
                msg.Enqueue(s);
            };

            var MqttServerConfig = Plugin.GetInstance().GetPlugin().MQTT_Config;
            var MqttServerAccount = Plugin.GetInstance().GetPlugin().Mqtt_Account;
            var deviceConfig = Plugin.GetInstance().GetPlugin().deviceConfigModelJsons;
            string deviceId = deviceConfig[0].deviceModels[0].DeviceId;
            clientId = Plugin.GetInstance().GetPlugin().ClientId;

            // MQTT initialization. The GUID suffix makes the connection id unique per start.
            mqttProxy.Connect(new BPA.Communication.Base.ConfigurationOptions()
            {
                UserName = MqttServerAccount.UserName,
                Password = MqttServerAccount.Password,
                IpAddress = MqttServerConfig.Host,
                Port = MqttServerConfig.Port,
                ClientId = $"ClientId-[{clientId}]-DeviceId-[{deviceId}]-{Guid.NewGuid()}"
            });

            ThreadManage.GetInstance().Start(() =>
            {
                ProcessMessages();
            }, "MQTT 消息处理", isRestart: true);
        }

        /// <summary>
        /// Publishes a message through the MQTT proxy.
        /// </summary>
        /// <param name="topic">Topic to publish to.</param>
        /// <param name="message">Message body.</param>
        public void Publish(string topic, string message)
        {
            mqttProxy.Publish(topic, message);
        }

        /// <summary>
        /// Registers a message handler. A handler that is already registered is
        /// not added a second time.
        /// </summary>
        /// <param name="messageRecive">Handler to register.</param>
        public void SetMessageReciveHandler(RecivedHandle messageRecive)
        {
            lock (handlersLock)
            {
                if (messageRecives.Contains(messageRecive))
                {
                    return;
                }

                messageRecives.Add(messageRecive);
            }
        }

        /// <summary>
        /// Clears the run flag, disposes the MQTT proxy and removes all handlers.
        /// </summary>
        public void Dispose()
        {
            running = false;
            mqttProxy.Dispose();

            lock (handlersLock)
            {
                messageRecives.Clear();
            }
        }

        // Worker loop: drains the queue, then sleeps 50 ms, until the run flag is cleared.
        private void ProcessMessages()
        {
            while (running)
            {
                // TryDequeue already reports an empty queue, so no separate Count check.
                while (msg.TryDequeue(out string temp))
                {
                    Dispatch(temp);
                }

                Thread.Sleep(50);
            }
        }

        // Decodes one raw payload and forwards its message to the handlers when the
        // package is addressed to this client.
        private void Dispatch(string raw)
        {
            // The client id may still be 0 at this point; re-read it from the plugin.
            if (0 == clientId)
            {
                clientId = Plugin.GetInstance().GetPlugin().ClientId;
            }

            var package = BPAPackage.Deserialize(raw);
            if (package?.ClientId == clientId && package.Message != null)
            {
                // Dispatch over a snapshot so that a concurrent Add/Clear on the list
                // cannot invalidate the index while handlers are being invoked.
                RecivedHandle[] handlers;
                lock (handlersLock)
                {
                    handlers = messageRecives.ToArray();
                }

                // Same order as before: last registered handler first.
                for (int i = handlers.Length - 1; i >= 0; i--)
                {
                    handlers[i]?.Invoke(package.Message);
                }
            }
        }
    }
}