using BPA.Message;
using BPASmartClient.Helper;
using BPASmartClient.Message;
using BPASmartClient.MQTT;
using HBLConsole.Communication;
using Microsoft.Extensions.Configuration;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace BPASmartClient.Business
{
///
/// 消息处理接口
///
public delegate void RecivedHandle(IMessage message);
///
/// MQTT 管理类
///
public class MQTTMgr : IPlugin
{
//客户端ID
private int clientId;
//运行标识
private bool running = false;
//消息缓存
private ConcurrentQueue msg = new ConcurrentQueue();
//MQTT 代理
private MQTTProxy mqttProxy = new MQTTProxy();
//消息处理者
private List messageRecives = new List();
public void Initialize()
{
}
public void Start()
{
running = true;
//主题初始化
TopicDefine.GetInstance().Initialize(Plugin.GetInstance().GetPlugin().GetDevices());
//MQTT 连接成功
mqttProxy.Connected = new Action(() =>
{
mqttProxy.Subscrib(TopicDefine.GetInstance().SubscribTopics.ToArray());
});
//MQTT 连接成功
mqttProxy.LostConnect = new Action(() =>
{
mqttProxy.Subscrib(TopicDefine.GetInstance().SubscribTopics.ToArray());
});
//MQTT 数据接收
mqttProxy.MessageRecive = new Action((message) =>
{
msg.Enqueue(message);
});
var MqttServerConfig = Plugin.GetInstance().GetPlugin().MQTT_Config;
var MqttServerAccount = Plugin.GetInstance().GetPlugin().Mqtt_Account;
var deviceConfig = Plugin.GetInstance().GetPlugin().deviceConfigModelJsons;
string deviceId = deviceConfig[0].deviceModels[0].DeviceId;
clientId = Plugin.GetInstance().GetPlugin().ClientId;
//MQTT 初始化
mqttProxy.Connect(MqttServerAccount.UserName, MqttServerAccount.Password, MqttServerConfig.Host, MqttServerConfig.Port, "ClientId:" + clientId + "DeviceId:" + deviceId + Guid.NewGuid().ToString());
ThreadManage.GetInstance().Start(() =>
{
while (running)
{
while (msg.Count > 0 && msg.TryDequeue(out string temp))
{
if (0 == clientId)
clientId = Plugin.GetInstance().GetPlugin().ClientId;
var package = BPAPackage.Deserialize(temp);
if (package?.ClientId == clientId)
{
if (package.Message != null)
{
for (int i = messageRecives.Count - 1; i >= 0; i--)
{
messageRecives[i]?.Invoke(package.Message);
}
}
}
}
Thread.Sleep(50);
}
}, "MQTT 消息处理", true);
}
///
/// MQTT消息推送
///
/// 主题
/// 消息体
public void Publish(string topic, string message)
{
mqttProxy.Publish(topic, message);
}
///
/// 设置消息处理者
///
/// 消息处理者
public void SetMessageReciveHandler(RecivedHandle messageRecive)
{
if (messageRecives.Contains(messageRecive))
return;
messageRecives.Add(messageRecive);
}
public void Dispose()
{
running = false;
mqttProxy.CloseConnect();
messageRecives.Clear();
}
}
}