using BPA.Communication;
using BPA.Communication.Base;
using BPA.Helper;
using BPA.Message;
using BPASmartClient.JXJFoodBigStation.Model.MQTT.Message;
using BPASmartClient.JXJFoodBigStation.Model.Siemens;
using Microsoft.EntityFrameworkCore.ChangeTracking;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Configuration;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace BPASmartClient.JXJFoodBigStation.Model.MQTT
{
public class MqttServer
{
private volatile static MqttServer _Instance;
public static void Dis() => _Instance = null;
public static MqttServer GetInstance => _Instance ?? (_Instance = new MqttServer());
private MqttServer() { }
MqttHelper mqttHelper = new MqttHelper();
///
/// MQTT 消息
///
ConcurrentQueue msg = new ConcurrentQueue();
public Task Init(params object[] pars)
{
return Task.Factory.StartNew(() =>
{
//mqttHelper.IsVerifyNetwork = false;
mqttHelper.MessageRecive = new Action(s => { msg.Enqueue(s); });
mqttHelper.Connected = new Action(() =>
{
if (pars.Length > 0)
{
pars.ToList().ForEach(item => { mqttHelper.Subscrib(item?.ToString()); });
}
else
{
//订阅消息
}
TaskManage.GetInstance.StartLong(new Action(() =>
{
while (msg.Count > 0)
{
try
{
if (msg.TryDequeue(out string str))
{
//解析消息
}
}
catch (Exception ex)
{
MessageLog.GetInstance.ShowEx(ex.ToString());
}
}
Thread.Sleep(10);
}), "MCS工单信息接收", true);
});
mqttHelper.Connect(new ConfigurationOptions()
{
UserName= ConfigurationManager.AppSettings["MQTT_USERNAME"],
Password= ConfigurationManager.AppSettings["MQTT_PASSWORD"],
IpAddress = ConfigurationManager.AppSettings["MQTT_IPADDRESS"],
Port =Convert.ToInt32( ConfigurationManager.AppSettings["MQTT_PORT"]),
ClientId = $"MES系统业务消息,{Guid.NewGuid()}"
});
});
}
}
}