diff --git a/BPASmartClient.Business/BPASmartClient.Business.csproj b/BPASmartClient.Business/BPASmartClient.Business.csproj index d1f2ff3b..2c62ade2 100644 --- a/BPASmartClient.Business/BPASmartClient.Business.csproj +++ b/BPASmartClient.Business/BPASmartClient.Business.csproj @@ -8,7 +8,7 @@ - + diff --git a/BPASmartClient.Business/Plugin/MQTTMgr.cs b/BPASmartClient.Business/Plugin/MQTTMgr.cs index fd61a0bb..1f507263 100644 --- a/BPASmartClient.Business/Plugin/MQTTMgr.cs +++ b/BPASmartClient.Business/Plugin/MQTTMgr.cs @@ -53,7 +53,7 @@ namespace BPASmartClient.Business mqttProxy.Connect(Plugin.GetInstance().GetPlugin().MQTT_Config.Host, Plugin.GetInstance().GetPlugin().MQTT_Config.Port, Guid.NewGuid().ToString()); - ThreadManage.GetInstance().StartLong(() => + ThreadManage.GetInstance().Start(() => { while (running) { @@ -74,7 +74,16 @@ namespace BPASmartClient.Business } Thread.Sleep(50); } - }, "MQTT 消息处理",true); + }, "MQTT 消息处理"); + } + + /// + /// MQTT消息推送 + /// + /// 主题 + /// 消息体 + public void Publish(string topic, string message) { + mqttProxy.Publish(topic, message); } /// diff --git a/BPASmartClient.Business/Plugin/OrderProxy.cs b/BPASmartClient.Business/Plugin/OrderProxy.cs index 05d9287e..34b6cbee 100644 --- a/BPASmartClient.Business/Plugin/OrderProxy.cs +++ b/BPASmartClient.Business/Plugin/OrderProxy.cs @@ -21,7 +21,7 @@ namespace BPASmartClient.Business public class OrderProxy : IPlugin { //订单队列 - private ConcurrentQueue orders = new ConcurrentQueue(); + private ConcurrentDictionary> orders = new ConcurrentDictionary>(); //运行标识 private bool running = false; //设备管理 @@ -39,36 +39,44 @@ namespace BPASmartClient.Business if (orderInfo == null) return; if (orderInfo is MorkOrderPush morkOrderpush) { - orders.Enqueue(morkOrderpush); + if (!orders.ContainsKey(morkOrderpush.DeviceId)) + { + orders.TryAdd(morkOrderpush.DeviceId, new ConcurrentQueue()); + StartTargetDeviceOrderJob(morkOrderpush.DeviceId); + } + orders[morkOrderpush.DeviceId].Enqueue(morkOrderpush); } }); EventBus.EventBus.GetInstance().Subscribe(0, OrderStatusChangedHandle); - ThreadManage.GetInstance().StartLong(() => + + } + + private void StartTargetDeviceOrderJob(int deviceId) + { + ThreadManage.GetInstance().Start(() => { + var device = deviceMgr.GetDevices().FirstOrDefault(p => p.DeviceId == deviceId); while (running) { - while (orders.Count > 0) + if (device.IsBusy || !device.IsHealth) + { + Thread.Sleep(100); + continue; + } + while (orders[deviceId].Count > 0) { - var temp = orders.ElementAt(0); - var device = deviceMgr.GetDevices().FirstOrDefault(p => p.DeviceId == temp.DeviceId); - if (null != device) + if (orders[deviceId].TryDequeue(out MorkOrderPush temp)) { - if (!device.IsBusy && device.IsHealth) - { - if (orders.TryDequeue(out temp)) - { - var orderEvent = DoOrderEvent.Make(temp); - orderEvent.Id = device.DeviceId; - orderEvent.Publish(); - } - } + var orderEvent = DoOrderEvent.Make(temp); + orderEvent.Id = device.DeviceId; + orderEvent.Publish(); } } Thread.Sleep(50); } - }, "MQTT 消息处理"); + }, $"MQTT 订单接收处理-设备[{deviceId}]"); } public void OrderStatusChangedHandle(IEvent @event, EventCallBackHandle callBack) diff --git a/BPASmartClient.Business/Plugin/StatusMgr.cs b/BPASmartClient.Business/Plugin/StatusMgr.cs index e3ee4cea..d6902bdf 100644 --- a/BPASmartClient.Business/Plugin/StatusMgr.cs +++ b/BPASmartClient.Business/Plugin/StatusMgr.cs @@ -1,4 +1,5 @@ -using BPASmartClient.Helper; +using BPA.Message; +using BPASmartClient.Helper; using System; using System.Collections.Generic; using System.Linq; @@ -16,23 +17,48 @@ namespace BPASmartClient.Business private bool running = false; //设备管理 private DeviceMgr deviceMgr; + private MQTTMgr mqttMgr; + DeviceStatus deviceStatus = new DeviceStatus(); + private bool wholeDeviceHealth; - private Dictionary> deviceStatus = new Dictionary>(); + + private Dictionary> wholeDeviceStatus = new Dictionary>(); public void Initialize() { deviceMgr = Plugin.GetInstance().GetPlugin(); - ThreadManage.GetInstance().StartLong(() => + mqttMgr = Plugin.GetInstance().GetPlugin(); + + ThreadManage.GetInstance().Start(() => { while (running) { + wholeDeviceHealth = true; foreach (var device in deviceMgr.GetDevices()) { - deviceStatus[device.DeviceId] = device.Status.GetStatus(); + wholeDeviceStatus[device.DeviceId] = device.Status.GetStatus(); } Thread.Sleep(50); } - }, "MQTT 消息处理"); + }, "设备状态收集"); + + deviceStatus.BatchingInfo = new List(); + + ThreadManage.GetInstance().Start(() => + { + while (running) + { + wholeDeviceHealth = true; + foreach (var device in deviceMgr.GetDevices()) + { + wholeDeviceStatus[device.DeviceId] = device.Status.GetStatus(); + deviceStatus.Healthy = device.IsHealth ? BPA.Message.Enum.DeviceHealthy.Health : BPA.Message.Enum.DeviceHealthy.UnHealth; + var msg=BPAPackage.Make(deviceStatus, device.DeviceId, device.DeviceType); + mqttMgr.Publish(TOPIC.GetInstance.GetHeatbeatTopic(device.DeviceType), msg.Serialize()); + } + Thread.Sleep(1000); + } + }, "设备心跳上报"); } public void Dispose()