Autore | SHA1 | Messaggio | Data |
---|---|---|---|
applelon | 794540881c | Merge branch 'master' of http://10.2.1.24:10244/applelon/BPASmartClient | 2 anni fa |
applelon | 7671f98a50 | 增加心跳上报 | 2 anni fa |
@@ -8,7 +8,7 @@ | |||
<ItemGroup> | |||
<PackageReference Include="BPA.ApolloClient" Version="1.0.9" /> | |||
<PackageReference Include="BPA.Message" Version="1.0.10" /> | |||
<PackageReference Include="BPA.Message" Version="1.0.11" /> | |||
<PackageReference Include="Microsoft.Extensions.Configuration" Version="6.0.1" /> | |||
<PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="6.0.0" /> | |||
<PackageReference Include="System.Configuration.ConfigurationManager" Version="6.0.0" /> | |||
@@ -53,7 +53,7 @@ namespace BPASmartClient.Business | |||
mqttProxy.Connect(Plugin.GetInstance().GetPlugin<ConfigMgr>().MQTT_Config.Host, Plugin.GetInstance().GetPlugin<ConfigMgr>().MQTT_Config.Port, | |||
Guid.NewGuid().ToString()); | |||
ThreadManage.GetInstance().StartLong(() => | |||
ThreadManage.GetInstance().Start(() => | |||
{ | |||
while (running) | |||
{ | |||
@@ -74,7 +74,16 @@ namespace BPASmartClient.Business | |||
} | |||
Thread.Sleep(50); | |||
} | |||
}, "MQTT 消息处理",true); | |||
}, "MQTT 消息处理"); | |||
} | |||
/// <summary> | |||
/// MQTT消息推送 | |||
/// </summary> | |||
/// <param name="topic">主题</param> | |||
/// <param name="message">消息体</param> | |||
public void Publish(string topic, string message) { | |||
mqttProxy.Publish(topic, message); | |||
} | |||
/// <summary> | |||
@@ -21,7 +21,7 @@ namespace BPASmartClient.Business | |||
public class OrderProxy : IPlugin | |||
{ | |||
//订单队列 | |||
private ConcurrentQueue<MorkOrderPush> orders = new ConcurrentQueue<MorkOrderPush>(); | |||
private ConcurrentDictionary<int, ConcurrentQueue<MorkOrderPush>> orders = new ConcurrentDictionary<int, ConcurrentQueue<MorkOrderPush>>(); | |||
//运行标识 | |||
private bool running = false; | |||
//设备管理 | |||
@@ -39,36 +39,44 @@ namespace BPASmartClient.Business | |||
if (orderInfo == null) return; | |||
if (orderInfo is MorkOrderPush morkOrderpush) | |||
{ | |||
orders.Enqueue(morkOrderpush); | |||
if (!orders.ContainsKey(morkOrderpush.DeviceId)) | |||
{ | |||
orders.TryAdd(morkOrderpush.DeviceId, new ConcurrentQueue<MorkOrderPush>()); | |||
StartTargetDeviceOrderJob(morkOrderpush.DeviceId); | |||
} | |||
orders[morkOrderpush.DeviceId].Enqueue(morkOrderpush); | |||
} | |||
}); | |||
EventBus.EventBus.GetInstance().Subscribe<OrderStatusChangedEvent>(0, OrderStatusChangedHandle); | |||
ThreadManage.GetInstance().StartLong(() => | |||
} | |||
private void StartTargetDeviceOrderJob(int deviceId) | |||
{ | |||
ThreadManage.GetInstance().Start(() => | |||
{ | |||
var device = deviceMgr.GetDevices().FirstOrDefault(p => p.DeviceId == deviceId); | |||
while (running) | |||
{ | |||
while (orders.Count > 0) | |||
if (device.IsBusy || !device.IsHealth) | |||
{ | |||
Thread.Sleep(100); | |||
continue; | |||
} | |||
while (orders[deviceId].Count > 0) | |||
{ | |||
var temp = orders.ElementAt(0); | |||
var device = deviceMgr.GetDevices().FirstOrDefault(p => p.DeviceId == temp.DeviceId); | |||
if (null != device) | |||
if (orders[deviceId].TryDequeue(out MorkOrderPush temp)) | |||
{ | |||
if (!device.IsBusy && device.IsHealth) | |||
{ | |||
if (orders.TryDequeue(out temp)) | |||
{ | |||
var orderEvent = DoOrderEvent.Make(temp); | |||
orderEvent.Id = device.DeviceId; | |||
orderEvent.Publish(); | |||
} | |||
} | |||
var orderEvent = DoOrderEvent.Make(temp); | |||
orderEvent.Id = device.DeviceId; | |||
orderEvent.Publish(); | |||
} | |||
} | |||
Thread.Sleep(50); | |||
} | |||
}, "MQTT 消息处理"); | |||
}, $"MQTT 订单接收处理-设备[{deviceId}]"); | |||
} | |||
public void OrderStatusChangedHandle(IEvent @event, EventCallBackHandle callBack) | |||
@@ -1,4 +1,5 @@ | |||
using BPASmartClient.Helper; | |||
using BPA.Message; | |||
using BPASmartClient.Helper; | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Linq; | |||
@@ -16,23 +17,48 @@ namespace BPASmartClient.Business | |||
private bool running = false; | |||
//设备管理 | |||
private DeviceMgr deviceMgr; | |||
private MQTTMgr mqttMgr; | |||
DeviceStatus deviceStatus = new DeviceStatus(); | |||
private bool wholeDeviceHealth; | |||
private Dictionary<int, Dictionary<string, object>> deviceStatus = new Dictionary<int, Dictionary<string, object>>(); | |||
private Dictionary<int, Dictionary<string, object>> wholeDeviceStatus = new Dictionary<int, Dictionary<string, object>>(); | |||
public void Initialize() | |||
{ | |||
deviceMgr = Plugin.GetInstance().GetPlugin<DeviceMgr>(); | |||
ThreadManage.GetInstance().StartLong(() => | |||
mqttMgr = Plugin.GetInstance().GetPlugin<MQTTMgr>(); | |||
ThreadManage.GetInstance().Start(() => | |||
{ | |||
while (running) | |||
{ | |||
wholeDeviceHealth = true; | |||
foreach (var device in deviceMgr.GetDevices()) | |||
{ | |||
deviceStatus[device.DeviceId] = device.Status.GetStatus(); | |||
wholeDeviceStatus[device.DeviceId] = device.Status.GetStatus(); | |||
} | |||
Thread.Sleep(50); | |||
} | |||
}, "MQTT 消息处理"); | |||
}, "设备状态收集"); | |||
deviceStatus.BatchingInfo = new List<BPA.Models.BatchingInfo>(); | |||
ThreadManage.GetInstance().Start(() => | |||
{ | |||
while (running) | |||
{ | |||
wholeDeviceHealth = true; | |||
foreach (var device in deviceMgr.GetDevices()) | |||
{ | |||
wholeDeviceStatus[device.DeviceId] = device.Status.GetStatus(); | |||
deviceStatus.Healthy = device.IsHealth ? BPA.Message.Enum.DeviceHealthy.Health : BPA.Message.Enum.DeviceHealthy.UnHealth; | |||
var msg=BPAPackage.Make(deviceStatus, device.DeviceId, device.DeviceType); | |||
mqttMgr.Publish(TOPIC.GetInstance.GetHeatbeatTopic(device.DeviceType), msg.Serialize()); | |||
} | |||
Thread.Sleep(1000); | |||
} | |||
}, "设备心跳上报"); | |||
} | |||
public void Dispose() | |||