Browse Source

增加心跳上报

样式分支
applelon 2 years ago
parent
commit
7671f98a50
4 changed files with 68 additions and 25 deletions
  1. +1
    -1
      BPASmartClient.Business/BPASmartClient.Business.csproj
  2. +11
    -2
      BPASmartClient.Business/Plugin/MQTTMgr.cs
  3. +25
    -17
      BPASmartClient.Business/Plugin/OrderProxy.cs
  4. +31
    -5
      BPASmartClient.Business/Plugin/StatusMgr.cs

+ 1
- 1
BPASmartClient.Business/BPASmartClient.Business.csproj View File

@@ -8,7 +8,7 @@

<ItemGroup>
<PackageReference Include="BPA.ApolloClient" Version="1.0.9" />
<PackageReference Include="BPA.Message" Version="1.0.10" />
<PackageReference Include="BPA.Message" Version="1.0.11" />
<PackageReference Include="Microsoft.Extensions.Configuration" Version="6.0.1" />
<PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="6.0.0" />
<PackageReference Include="System.Configuration.ConfigurationManager" Version="6.0.0" />


+ 11
- 2
BPASmartClient.Business/Plugin/MQTTMgr.cs View File

@@ -53,7 +53,7 @@ namespace BPASmartClient.Business
mqttProxy.Connect(Plugin.GetInstance().GetPlugin<ConfigMgr>().MQTT_Config.Host, Plugin.GetInstance().GetPlugin<ConfigMgr>().MQTT_Config.Port,
Guid.NewGuid().ToString());

ThreadManage.GetInstance().StartLong(() =>
ThreadManage.GetInstance().Start(() =>
{
while (running)
{
@@ -74,7 +74,16 @@ namespace BPASmartClient.Business
}
Thread.Sleep(50);
}
}, "MQTT 消息处理",true);
}, "MQTT 消息处理");
}

/// <summary>
/// MQTT消息推送
/// </summary>
/// <param name="topic">主题</param>
/// <param name="message">消息体</param>
public void Publish(string topic, string message) {
mqttProxy.Publish(topic, message);
}

/// <summary>


+ 25
- 17
BPASmartClient.Business/Plugin/OrderProxy.cs View File

@@ -21,7 +21,7 @@ namespace BPASmartClient.Business
public class OrderProxy : IPlugin
{
//订单队列
private ConcurrentQueue<MorkOrderPush> orders = new ConcurrentQueue<MorkOrderPush>();
private ConcurrentDictionary<int, ConcurrentQueue<MorkOrderPush>> orders = new ConcurrentDictionary<int, ConcurrentQueue<MorkOrderPush>>();
//运行标识
private bool running = false;
//设备管理
@@ -39,36 +39,44 @@ namespace BPASmartClient.Business
if (orderInfo == null) return;
if (orderInfo is MorkOrderPush morkOrderpush)
{
orders.Enqueue(morkOrderpush);
if (!orders.ContainsKey(morkOrderpush.DeviceId))
{
orders.TryAdd(morkOrderpush.DeviceId, new ConcurrentQueue<MorkOrderPush>());
StartTargetDeviceOrderJob(morkOrderpush.DeviceId);
}
orders[morkOrderpush.DeviceId].Enqueue(morkOrderpush);
}
});

EventBus.EventBus.GetInstance().Subscribe<OrderStatusChangedEvent>(0, OrderStatusChangedHandle);

ThreadManage.GetInstance().StartLong(() =>

}

private void StartTargetDeviceOrderJob(int deviceId)
{
ThreadManage.GetInstance().Start(() =>
{
var device = deviceMgr.GetDevices().FirstOrDefault(p => p.DeviceId == deviceId);
while (running)
{
while (orders.Count > 0)
if (device.IsBusy || !device.IsHealth)
{
Thread.Sleep(100);
continue;
}
while (orders[deviceId].Count > 0)
{
var temp = orders.ElementAt(0);
var device = deviceMgr.GetDevices().FirstOrDefault(p => p.DeviceId == temp.DeviceId);
if (null != device)
if (orders[deviceId].TryDequeue(out MorkOrderPush temp))
{
if (!device.IsBusy && device.IsHealth)
{
if (orders.TryDequeue(out temp))
{
var orderEvent = DoOrderEvent.Make(temp);
orderEvent.Id = device.DeviceId;
orderEvent.Publish();
}
}
var orderEvent = DoOrderEvent.Make(temp);
orderEvent.Id = device.DeviceId;
orderEvent.Publish();
}
}
Thread.Sleep(50);
}
}, "MQTT 消息处理");
}, $"MQTT 订单接收处理-设备[{deviceId}]");
}

public void OrderStatusChangedHandle(IEvent @event, EventCallBackHandle callBack)


+ 31
- 5
BPASmartClient.Business/Plugin/StatusMgr.cs View File

@@ -1,4 +1,5 @@
using BPASmartClient.Helper;
using BPA.Message;
using BPASmartClient.Helper;
using System;
using System.Collections.Generic;
using System.Linq;
@@ -16,23 +17,48 @@ namespace BPASmartClient.Business
private bool running = false;
//设备管理
private DeviceMgr deviceMgr;
private MQTTMgr mqttMgr;
DeviceStatus deviceStatus = new DeviceStatus();
private bool wholeDeviceHealth;

private Dictionary<int, Dictionary<string, object>> deviceStatus = new Dictionary<int, Dictionary<string, object>>();

private Dictionary<int, Dictionary<string, object>> wholeDeviceStatus = new Dictionary<int, Dictionary<string, object>>();

public void Initialize()
{
deviceMgr = Plugin.GetInstance().GetPlugin<DeviceMgr>();
ThreadManage.GetInstance().StartLong(() =>
mqttMgr = Plugin.GetInstance().GetPlugin<MQTTMgr>();

ThreadManage.GetInstance().Start(() =>
{
while (running)
{
wholeDeviceHealth = true;
foreach (var device in deviceMgr.GetDevices())
{
deviceStatus[device.DeviceId] = device.Status.GetStatus();
wholeDeviceStatus[device.DeviceId] = device.Status.GetStatus();
}
Thread.Sleep(50);
}
}, "MQTT 消息处理");
}, "设备状态收集");

deviceStatus.BatchingInfo = new List<BPA.Models.BatchingInfo>();

ThreadManage.GetInstance().Start(() =>
{
while (running)
{
wholeDeviceHealth = true;
foreach (var device in deviceMgr.GetDevices())
{
wholeDeviceStatus[device.DeviceId] = device.Status.GetStatus();
deviceStatus.Healthy = device.IsHealth ? BPA.Message.Enum.DeviceHealthy.Health : BPA.Message.Enum.DeviceHealthy.UnHealth;
var msg=BPAPackage.Make(deviceStatus, device.DeviceId, device.DeviceType);
mqttMgr.Publish(TOPIC.GetInstance.GetHeatbeatTopic(device.DeviceType), msg.Serialize());
}
Thread.Sleep(1000);
}
}, "设备心跳上报");
}

public void Dispose()


Loading…
Cancel
Save