using Aop.Api.Domain;
using BPA.Communication;
using BPA.Communication.Base;
using BPA.Helper;
using BPASmartClient.DosingSystem.Model.Dto;
using BPASmartClient.DosingSystem.Model.MQTT.Messages;
using BPASmartClient.DosingSystem.Service;
using BPASmartClient.Model;
using Microsoft.EntityFrameworkCore.Infrastructure;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.Specialized;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace BPASmartClient.DosingSystem.Model.MQTT
{
public class MqttServer : IServer
{
private volatile static MqttServer _Instance;
public static MqttServer GetInstance => _Instance ?? (_Instance = new MqttServer());
private MqttServer() { }
///
/// MQTT 消息
///
ConcurrentQueue msg= new ConcurrentQueue();
MqttHelper mqttHelper = new MqttHelper();
NameValueCollection mqttParam;
///
/// 工单信息
///
public ConcurrentQueue WorkOrders { get; set; } = new ConcurrentQueue();
public Task Init(params object[] pars)
{
return Task.Factory.StartNew(() =>
{
mqttParam = (NameValueCollection)System.Configuration.ConfigurationManager.GetSection("FlexBatchSystem/MQTTParam");
//mqttHelper.IsVerifyNetwork = false;
mqttHelper.MessageRecive = new Action(
s => {
msg.Enqueue(s); });
mqttHelper.Connected = new Action(() =>
{
if (pars.Length > 0)
{
pars.ToList().ForEach(item => { mqttHelper.Subscrib(item?.ToString()); });
}
else
{
//mqttHelper.Subscrib(Topics.MakeDown);
//mqttHelper.Subscrib(Topics.StatusChange);
mqttHelper.Subscrib(Topics.WorkOrderDown);
//mqttHelper.Subscrib(Topics.DataActualTimeUpdate);
}
TaskManage.GetInstance.StartLong(new Action(() =>
{
while (msg.Count > 0)
{
try
{
if (msg.TryDequeue(out string str))
{
var res = str.Deserialize();
if (res?.Message is WorkOrderPublishOutput wod)
WorkOrderPublish(wod);
}
}
catch (Exception ex)
{
MessageLog.GetInstance.ShowEx(ex.ToString());
}
}
Thread.Sleep(10);
}), "MCS工单信息接收", true);
});
//mqttHelper.Connect(Json.Data.MqttUser, Json.Data.MqttPas, Json.Data.MqttIp, Json.Data.MqttPort, $"WCS系统,{Guid.NewGuid()}");
mqttHelper.Connect(new ConfigurationOptions()
{
UserName = mqttParam?.Get("UserName")?? "admin",
Password = mqttParam?.Get("Password")?? "admin8765490789",
IpAddress =mqttParam?.Get("IpAddress")?? "111.9.47.105",
Port =int.TryParse( mqttParam?.Get("Port"),out int port)?port:18883,
ClientId = $"WCS系统业务消息,{Guid.NewGuid()}"
});
});
}
private void WorkOrderPublish(WorkOrderPublishOutput wod)
{
Task.Factory.StartNew(new Action(() =>
{
var x = ServerFactory.GetInstance.Get().GetWorkOrder(wod.WorkOrderId);
if (x.IsSuccess && x.Content.Count > 0)
{
var wop = x.Content.ElementAt(0);
wop.ProductList = wop.ProductList.OrderBy(p => p.Name).ToList();
WorkOrders.Enqueue(wop);
}
}));
}
public void Dispose()
{
mqttHelper.Dispose();
}
public static void Dis() => _Instance = null;
}
}