终端一体化运控平台
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

99 lines
2.8 KiB

  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Collections.Generic;
  4. using System.Linq;
  5. using System.Text;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. /* ***********************************************
  9.  * subject Byte类型数据总线,在订阅推送模式基础上
  10. * 增加主动获取
  11.  * author 张原川
  12.  * date   2019/6/3 14:44:36
  13.  * ***********************************************/
  14. namespace BPASmartClient.Bus.DataBus
  15. {
  16. public class DataBus_Byte : DataBus_Currency<byte>, IGivenDataBus<byte>
  17. {
  18. //接收数据缓冲(用以Get)
  19. //protected CircularBuffer<byte> _givenDataPool = new CircularBuffer<byte>(1 * 1024 * 1024);
  20. protected ConcurrentQueue<byte> _givenDataPool = new ConcurrentQueue<byte>();
  21. public new int DataCount { get { return _givenDataPool.Count; } }
  22. /// <summary>
  23. /// 重写Put方法,加入givenDataPool
  24. /// </summary>
  25. public new void Put(byte data)
  26. {
  27. if (!_running)
  28. return;
  29. if (_multDataHandlers.Count > 0 || _singleDataHandlers.Count > 0)
  30. _dataPool.Enqueue(data);
  31. _givenDataPool.Enqueue(data);
  32. }
  33. /// <summary>
  34. /// 重写Put方法,加入givenDataPool
  35. /// </summary>
  36. public new void Put(byte[] data)
  37. {
  38. if (!_running)
  39. return;
  40. if (_multDataHandlers.Count > 0 || _singleDataHandlers.Count > 0)
  41. foreach (var item in data)
  42. _dataPool.Enqueue(item);
  43. foreach (var item in data)
  44. _givenDataPool.Enqueue(item);
  45. }
  46. /// <summary>
  47. /// 数据取出
  48. /// </summary>
  49. public byte Get()
  50. {
  51. agin:
  52. if (_givenDataPool.Count <= 0)
  53. {
  54. Thread.Sleep(5);
  55. goto agin;
  56. }
  57. byte res;
  58. while (!_givenDataPool.TryDequeue(out res)) ;
  59. return res;
  60. }
  61. /// <summary>
  62. /// 数据取出
  63. /// </summary>
  64. public byte[] Get(int count)
  65. {
  66. agin:
  67. if (_givenDataPool.Count < count)
  68. {
  69. Thread.Sleep(5);
  70. goto agin;
  71. }
  72. //Console.WriteLine(_givenDataPool.Size + "===========" + _dataPool.Size);
  73. //for (int i = 0; i < count; i++) {
  74. // _givenDataPool.TryDequeue
  75. //}
  76. int i = 0;
  77. byte[] result = new byte[count];
  78. while (i < count)
  79. {
  80. if (_givenDataPool.TryDequeue(out result[i]))
  81. {
  82. i++;
  83. }
  84. }
  85. return result;// _givenDataPool.Get(count);
  86. }
  87. }
  88. }