using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

/* ***********************************************
 * subject Byte data bus: adds active (pull-style) retrieval
 *         on top of the subscribe/push model
 * author  Zhang Yuanchuan (张原川)
 * date    2019/6/3 14:44:36
 * ***********************************************/
namespace BPASmartClient.Bus.DataBus
{
    /// <summary>
    /// Byte data bus. Every byte accepted by <c>Put</c> is stored in a second queue
    /// (<see cref="_givenDataPool"/>) so that callers can pull data with <c>Get</c>,
    /// in addition to being forwarded to the inherited push queue when handlers are registered.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the base type names are reproduced exactly as they appear in this file;
    /// if they are declared generic elsewhere in the project, the type arguments need to be
    /// restored here - confirm against their declarations.
    /// </remarks>
    public class DataBus_Byte : DataBus_Currency, IGivenDataBus
    {
        /// <summary>Delay, in milliseconds, between polls while a <c>Get</c> call waits for data.</summary>
        private const int PollIntervalMs = 5;

        /// <summary>
        /// Buffer of received bytes served by <c>Get</c>.
        /// It is filled by every accepted <c>Put</c> and only drained by <c>Get</c>, so it grows
        /// without bound if nothing ever calls <c>Get</c>.
        /// </summary>
        protected ConcurrentQueue<byte> _givenDataPool = new ConcurrentQueue<byte>();

        /// <summary>Number of bytes currently waiting in the pull buffer.</summary>
        public new int DataCount
        {
            get { return _givenDataPool.Count; }
        }

        /// <summary>True when at least one multi-data or single-data handler is registered.</summary>
        private bool HasHandlers
        {
            get { return _multDataHandlers.Count > 0 || _singleDataHandlers.Count > 0; }
        }

        /// <summary>
        /// Adds one byte to the bus. Declared with <c>new</c>, i.e. it hides rather than overrides
        /// any inherited member with the same signature.
        /// </summary>
        /// <param name="data">The byte to add.</param>
        public new void Put(byte data)
        {
            // Data is dropped while the bus is not running.
            if (!_running)
            {
                return;
            }

            // Only feed the push queue when there is a handler registered to consume it.
            if (HasHandlers)
            {
                _dataPool.Enqueue(data);
            }

            _givenDataPool.Enqueue(data);
        }

        /// <summary>
        /// Adds a sequence of bytes to the bus, preserving their order. Declared with <c>new</c>,
        /// i.e. it hides rather than overrides any inherited member with the same signature.
        /// </summary>
        /// <param name="data">The bytes to add.</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="data"/> is null while the bus is running.
        /// </exception>
        public new void Put(byte[] data)
        {
            // Data is dropped while the bus is not running (checked first so that a stopped
            // bus stays a silent no-op for every argument, as before).
            if (!_running)
            {
                return;
            }

            if (data == null)
            {
                throw new ArgumentNullException(nameof(data));
            }

            // Evaluated once per call, before any byte is enqueued.
            bool forwardToHandlers = HasHandlers;

            foreach (byte item in data)
            {
                if (forwardToHandlers)
                {
                    _dataPool.Enqueue(item);
                }

                _givenDataPool.Enqueue(item);
            }
        }

        /// <summary>
        /// Removes and returns the oldest buffered byte, blocking the calling thread
        /// (polling every <see cref="PollIntervalMs"/> ms) until one is available.
        /// </summary>
        /// <returns>The dequeued byte.</returns>
        public byte Get()
        {
            byte value;

            // Sleep between attempts instead of spinning so an empty queue does not burn a core.
            while (!_givenDataPool.TryDequeue(out value))
            {
                Thread.Sleep(PollIntervalMs);
            }

            return value;
        }

        /// <summary>
        /// Removes and returns <paramref name="count"/> bytes in FIFO order, blocking the calling
        /// thread until they have all been obtained. Nothing is dequeued until at least
        /// <paramref name="count"/> bytes are buffered.
        /// </summary>
        /// <param name="count">Number of bytes to take; zero yields an empty array.</param>
        /// <returns>An array of exactly <paramref name="count"/> bytes.</returns>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> is negative.</exception>
        public byte[] Get(int count)
        {
            if (count < 0)
            {
                throw new ArgumentOutOfRangeException(nameof(count), count, "count must not be negative.");
            }

            // Wait until the whole request can be satisfied before taking anything.
            while (_givenDataPool.Count < count)
            {
                Thread.Sleep(PollIntervalMs);
            }

            byte[] result = new byte[count];
            int filled = 0;

            while (filled < count)
            {
                if (_givenDataPool.TryDequeue(out result[filled]))
                {
                    filled++;
                }
                else
                {
                    // Another consumer emptied the queue after the check above;
                    // back off instead of busy-spinning until more data arrives.
                    Thread.Sleep(PollIntervalMs);
                }
            }

            return result;
        }
    }
}