终端一体化运控平台
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

121 lines
4.5 KiB

  1. using Aop.Api.Domain;
  2. using BPA.Communication;
  3. using BPA.Communication.Base;
  4. using BPA.Helper;
  5. using BPASmartClient.DosingSystem.Model.Dto;
  6. using BPASmartClient.DosingSystem.Model.MQTT.Messages;
  7. using BPASmartClient.DosingSystem.Service;
  8. using BPASmartClient.Model;
  9. using Microsoft.EntityFrameworkCore.Infrastructure;
  10. using System;
  11. using System.Collections.Concurrent;
  12. using System.Collections.Generic;
  13. using System.Collections.Specialized;
  14. using System.Linq;
  15. using System.Text;
  16. using System.Threading;
  17. using System.Threading.Tasks;
  18. namespace BPASmartClient.DosingSystem.Model.MQTT
  19. {
  /// <summary>
  /// Singleton MQTT client for the dosing system. It connects to the broker, subscribes to
  /// topics, queues every received payload, and turns work-order publish messages into
  /// entries of <see cref="WorkOrders"/>.
  /// </summary>
  public class MqttServer : IServer
  {
      // Fallback connection settings used when the "FlexBatchSystem/MQTTParam" section or one
      // of its keys is missing.
      // NOTE(review): credentials and a broker address are hard-coded in source; this is a
      // security risk. Values are kept unchanged for compatibility — move them to protected
      // configuration.
      private const string DefaultUserName = "admin";
      private const string DefaultPassword = "admin8765490789";
      private const string DefaultIpAddress = "111.9.47.105";
      private const int DefaultPort = 18883;

      // Serialises creation of the singleton so concurrent first callers share one instance
      // (and therefore one MQTT connection).
      private static readonly object _instanceLock = new object();

      private volatile static MqttServer _Instance;

      /// <summary>
      /// Gets the shared instance, creating it on first access (or on the first access after
      /// <see cref="Dis"/> cleared it).
      /// </summary>
      public static MqttServer GetInstance
      {
          get
          {
              MqttServer instance = _Instance;
              if (instance != null)
              {
                  return instance;
              }

              lock (_instanceLock)
              {
                  return _Instance ?? (_Instance = new MqttServer());
              }
          }
      }

      private MqttServer() { }

      /// <summary>
      /// Raw MQTT payloads waiting to be deserialized.
      /// </summary>
      private readonly ConcurrentQueue<string> msg = new ConcurrentQueue<string>();

      private readonly MqttHelper mqttHelper = new MqttHelper();

      // Contents of the "FlexBatchSystem/MQTTParam" configuration section; null when absent.
      private NameValueCollection mqttParam;

      /// <summary>
      /// Work orders fetched in response to work-order publish messages.
      /// </summary>
      public ConcurrentQueue<WorkOrderPusht> WorkOrders { get; set; } = new ConcurrentQueue<WorkOrderPusht>();

      /// <summary>
      /// Loads the MQTT settings, wires the receive/connected callbacks and connects to the broker.
      /// </summary>
      /// <param name="pars">
      /// Topics to subscribe to once connected (each item is converted with <c>ToString()</c>).
      /// When null or empty, only <c>Topics.WorkOrderDown</c> is subscribed.
      /// </param>
      /// <returns>The task that performs the setup and the connect call.</returns>
      public Task Init(params object[] pars)
      {
          return Task.Factory.StartNew(() =>
          {
              mqttParam = (NameValueCollection)System.Configuration.ConfigurationManager.GetSection("FlexBatchSystem/MQTTParam");

              // Received payloads are only queued here; parsing happens in DrainPendingMessages.
              mqttHelper.MessageRecive = new Action<string>(s => msg.Enqueue(s));

              mqttHelper.Connected = new Action(() =>
              {
                  SubscribeTopics(pars);

                  // NOTE(review): this runs on every Connected event. Whether a second StartLong
                  // call with the same name creates a duplicate consumer depends on TaskManage,
                  // which is not visible here — confirm.
                  TaskManage.GetInstance.StartLong(new Action(DrainPendingMessages), "MCS工单信息接收", true);
              });

              mqttHelper.Connect(CreateConnectionOptions());
          });
      }

      /// <summary>
      /// Subscribes to the caller-supplied topics, or to the default work-order topic when none
      /// were supplied. Null or blank topics are skipped.
      /// </summary>
      private void SubscribeTopics(object[] pars)
      {
          // pars is null when Init(null) is called; treat it the same as "no topics given".
          if (pars == null || pars.Length == 0)
          {
              // Other topics (MakeDown, StatusChange, DataActualTimeUpdate) are currently not subscribed.
              mqttHelper.Subscrib(Topics.WorkOrderDown);
              return;
          }

          foreach (object item in pars)
          {
              string topic = item?.ToString();
              if (!string.IsNullOrWhiteSpace(topic))
              {
                  mqttHelper.Subscrib(topic);
              }
          }
      }

      /// <summary>
      /// Dequeues and handles every queued payload, then pauses briefly.
      /// Presumably invoked repeatedly by the long-running task started in <see cref="Init"/> — confirm against TaskManage.
      /// </summary>
      private void DrainPendingMessages()
      {
          while (msg.TryDequeue(out string str))
          {
              try
              {
                  var res = str.Deserialize();
                  if (res?.Message is WorkOrderPublishOutput wod)
                  {
                      WorkOrderPublish(wod);
                  }
              }
              catch (Exception ex)
              {
                  // A malformed payload must not stop the remaining messages from being processed.
                  MessageLog.GetInstance.ShowEx(ex.ToString());
              }
          }

          Thread.Sleep(10);
      }

      /// <summary>
      /// Builds the broker connection options from configuration, falling back to the defaults
      /// for any missing or unparsable value. A fresh GUID is appended to the client id on every call.
      /// </summary>
      private ConfigurationOptions CreateConnectionOptions()
      {
          return new ConfigurationOptions()
          {
              UserName = mqttParam?.Get("UserName") ?? DefaultUserName,
              Password = mqttParam?.Get("Password") ?? DefaultPassword,
              IpAddress = mqttParam?.Get("IpAddress") ?? DefaultIpAddress,
              Port = int.TryParse(mqttParam?.Get("Port"), out int port) ? port : DefaultPort,
              ClientId = $"WCS系统业务消息,{Guid.NewGuid()}"
          };
      }

      /// <summary>
      /// Fetches the work order named by <paramref name="wod"/> on a background task, sorts its
      /// product list by name and enqueues it into <see cref="WorkOrders"/>.
      /// </summary>
      private void WorkOrderPublish(WorkOrderPublishOutput wod)
      {
          Task.Factory.StartNew(new Action(() =>
          {
              // The task is fire-and-forget, so failures are logged here; otherwise they would
              // be lost as unobserved task exceptions.
              try
              {
                  var x = ServerFactory.GetInstance.Get().GetWorkOrder(wod.WorkOrderId);
                  if (!x.IsSuccess || x.Content == null || x.Content.Count == 0)
                  {
                      return;
                  }

                  var wop = x.Content.ElementAt(0);
                  if (wop.ProductList != null)
                  {
                      wop.ProductList = wop.ProductList.OrderBy(p => p.Name).ToList();
                  }

                  WorkOrders.Enqueue(wop);
              }
              catch (Exception ex)
              {
                  MessageLog.GetInstance.ShowEx(ex.ToString());
              }
          }));
      }

      /// <summary>
      /// Disposes the underlying MQTT helper.
      /// </summary>
      public void Dispose()
      {
          mqttHelper.Dispose();
      }

      /// <summary>
      /// Clears the singleton reference so the next <see cref="GetInstance"/> access creates a
      /// new instance. The previous instance is not disposed by this call.
      /// </summary>
      public static void Dis() => _Instance = null;
  }
  108. }