终端一体化运控平台
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

191 lines
6.0 KiB

  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Collections.Generic;
  4. using System.Linq;
  5. using System.Text;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. /* ***********************************************
  9.  * subject 通用数据总线,采取订阅推送模式
  10.  * author 张原川
  11.  * date   2019/6/3 15:03:10
  12.  * ***********************************************/
  13. namespace BPASmartClient.Bus.DataBus
  14. {
  /// <summary>
  /// General-purpose in-process data bus using a subscribe/push model.
  /// Producers enqueue items with <c>Put</c>; the thread that calls <see cref="StartBus"/>
  /// drains the queue and pushes every item to the per-item subscribers and every drained
  /// batch to the batch subscribers.
  /// </summary>
  /// <typeparam name="TData">Type of the items carried by the bus.</typeparam>
  public class DataBus_Currency<TData> : ISimpleDataBus<TData>
  {
      // Buffer holding items that were put on the bus but not dispatched yet.
      protected ConcurrentQueue<TData> _dataPool = new ConcurrentQueue<TData>();

      // Subscribed callbacks: batch subscribers receive arrays, single subscribers receive one item at a time.
      protected List<Action<TData[]>> _multDataHandlers = new List<Action<TData[]>>();
      protected List<Action<TData>> _singleDataHandlers = new List<Action<TData>>();

      // Signal used to pace the dispatch loop; it is set after every loop pass and after every
      // subscription change.
      protected AutoResetEvent _are = new AutoResetEvent(false);

      // Run flag. Volatile because StopBus() writes it from a different thread than the one
      // polling it inside StartBus().
      protected volatile bool _running = false;

      // Guards mutation of the two handler lists. List<T> is not safe for concurrent
      // read/write, and Subscribe/UnSubcribe run on other threads than the dispatch loop.
      private readonly object _handlersLock = new object();

      /// <summary>Number of batch (array) subscribers currently registered.</summary>
      public int MultSubscriberCount { get { return _multDataHandlers.Count; } }

      /// <summary>Number of per-item subscribers currently registered.</summary>
      public int SingleSubscriberCount { get { return _singleDataHandlers.Count; } }

      /// <summary>Number of items waiting in the queue.</summary>
      public int DataCount { get { return _dataPool.Count; } }

      /// <summary>
      /// Stops the bus. Only clears the run flag (same as <see cref="StopBus"/>); the wait
      /// handle is deliberately not closed here because the dispatch loop may still be
      /// using it until it observes the flag.
      /// </summary>
      public void Dispose()
      {
          StopBus();
      }

      /// <summary>
      /// Enqueues a single item. The item is dropped when the bus is not running or when
      /// nobody is subscribed.
      /// </summary>
      /// <param name="data">Item to publish.</param>
      public void Put(TData data)
      {
          if (!_running)
              return;

          if (_multDataHandlers.Count > 0 || _singleDataHandlers.Count > 0)
              _dataPool.Enqueue(data);
      }

      /// <summary>
      /// Enqueues every element of <paramref name="data"/> in order. The items are dropped
      /// when the bus is not running or when nobody is subscribed. A null array is ignored.
      /// </summary>
      /// <param name="data">Items to publish.</param>
      public void Put(TData[] data)
      {
          if (!_running || data == null)
              return;

          if (_multDataHandlers.Count > 0 || _singleDataHandlers.Count > 0)
          {
              foreach (var item in data)
                  _dataPool.Enqueue(item);
          }
      }

      /// <summary>
      /// Starts the bus and runs the dispatch loop on the calling thread. The call blocks
      /// until <see cref="StopBus"/> (or <see cref="Dispose"/>) clears the run flag.
      /// </summary>
      /// <remarks>
      /// Each pass drains at most the number of items that were queued when the pass began,
      /// invokes every per-item subscriber for each item, then invokes every batch subscriber
      /// with an array of the drained items (an empty array when nothing was queued).
      /// An exception thrown by a subscriber is not caught: it propagates out of this method
      /// and the run flag stays set.
      /// </remarks>
      public void StartBus()
      {
          _running = true;
          List<TData> items = new List<TData>();
          _are.Set();

          while (_running)
          {
              int count = _dataPool.Count;
              items.Clear();

              // Work on snapshots so a concurrent Subscribe/UnSubcribe cannot invalidate an
              // index while handlers are being invoked. A handler removed during a pass may
              // therefore still receive the data of that pass.
              Action<TData>[] singleHandlers;
              Action<TData[]>[] multHandlers;
              lock (_handlersLock)
              {
                  singleHandlers = _singleDataHandlers.ToArray();
                  multHandlers = _multDataHandlers.ToArray();
              }

              if (singleHandlers.Length > 0)
              {
                  _are.WaitOne(TimeSpan.FromMilliseconds(10));
                  for (int i = 0; i < count; i++)
                  {
                      TData data;
                      // Stop draining instead of spinning if the queue turned out to hold
                      // fewer items than the snapshot suggested.
                      if (!_dataPool.TryDequeue(out data))
                          break;

                      for (int j = 0; j < singleHandlers.Length; j++)
                          singleHandlers[j].Invoke(data);

                      items.Add(data);
                  }
              }

              if (multHandlers.Length > 0)
              {
                  // Nothing collected above (no per-item subscribers, or an empty queue):
                  // drain the queue here so batch subscribers still get the pending items.
                  if (items.Count <= 0)
                  {
                      for (int i = 0; i < count; i++)
                      {
                          TData data;
                          if (!_dataPool.TryDequeue(out data))
                              break;

                          items.Add(data);
                      }
                  }

                  _are.WaitOne(TimeSpan.FromMilliseconds(10));

                  // Last subscribed handler is called first; every handler gets its own array copy.
                  for (int i = multHandlers.Length - 1; i >= 0; i--)
                      multHandlers[i].Invoke(items.ToArray());
              }

              Thread.Sleep(10);
              _are.Set();
          }
      }

      /// <summary>
      /// Requests the dispatch loop to stop. The loop exits after finishing its current pass.
      /// </summary>
      public void StopBus()
      {
          _running = false;
      }

      /// <summary>
      /// Subscribes a per-item callback. Subscribing the same delegate twice has no effect.
      /// </summary>
      /// <param name="action">Callback invoked once for every dispatched item.</param>
      /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
      public void Subscribe(Action<TData> action)
      {
          if (action == null)
              throw new ArgumentNullException(nameof(action));

          lock (_handlersLock)
          {
              if (_singleDataHandlers.Contains(action))
                  return;

              _singleDataHandlers.Add(action);
          }

          // Leave the event signalled, as the original Reset/Set pair did.
          _are.Set();
      }

      /// <summary>
      /// Subscribes a batch callback. Subscribing the same delegate twice has no effect.
      /// </summary>
      /// <param name="action">Callback invoked with the array of items drained in one loop pass.</param>
      /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
      public void Subscribe(Action<TData[]> action)
      {
          if (action == null)
              throw new ArgumentNullException(nameof(action));

          lock (_handlersLock)
          {
              if (_multDataHandlers.Contains(action))
                  return;

              _multDataHandlers.Add(action);
          }

          // Leave the event signalled, as the original Reset/Set pair did.
          _are.Set();
      }

      /// <summary>
      /// Removes a per-item callback. Does nothing when the callback is not subscribed.
      /// </summary>
      /// <param name="action">Callback to remove.</param>
      public void UnSubcribe(Action<TData> action)
      {
          lock (_handlersLock)
          {
              // Remove returns false when the delegate was not registered.
              if (!_singleDataHandlers.Remove(action))
                  return;
          }

          // Leave the event signalled, as the original Reset/Set pair did.
          _are.Set();
      }

      /// <summary>
      /// Removes a batch callback. Does nothing when the callback is not subscribed.
      /// </summary>
      /// <param name="action">Callback to remove.</param>
      public void UnSubcribe(Action<TData[]> action)
      {
          lock (_handlersLock)
          {
              // Remove returns false when the delegate was not registered.
              if (!_multDataHandlers.Remove(action))
                  return;
          }

          // Leave the event signalled, as the original Reset/Set pair did.
          _are.Set();
      }
  }
  169. }