using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; /* ***********************************************  * subject 通用数据总线,采取订阅推送模式  * author 张原川  * date   2019/6/3 15:03:10  * ***********************************************/ namespace BPASmartClient.Bus.DataBus { public class DataBus_Currency : ISimpleDataBus { //接收数据缓冲 //protected CircularBuffer _dataPool = new CircularBuffer(1 * 1024 * 1024); protected ConcurrentQueue _dataPool = new ConcurrentQueue(); //订阅数据回调集合 protected List> _multDataHandlers = new List>(); protected List> _singleDataHandlers = new List>(); //订阅与推送数据信号量 protected AutoResetEvent _are = new AutoResetEvent(false); //运行标识 protected bool _running = false; public int MultSubscriberCount { get { return _multDataHandlers.Count; } } public int SingleSubscriberCount { get { return _singleDataHandlers.Count; } } public int DataCount { get { return _dataPool.Count; } } /// /// 终止事件总线 /// public void Dispose() { StopBus(); } /// /// 数据存入 /// public void Put(TData data) { if (!_running) return; if (_multDataHandlers.Count > 0 || _singleDataHandlers.Count > 0) _dataPool.Enqueue(data); } /// /// 数据存入 /// public void Put(TData[] data) { if (!_running) return; if (_multDataHandlers.Count > 0 || _singleDataHandlers.Count > 0) foreach (var item in data) _dataPool.Enqueue(item); } /// /// 开启总线 /// public void StartBus() { _running = true; List items = new List(); _are.Set(); while (_running) { int count = _dataPool.Count; items.Clear(); if (_singleDataHandlers.Count > 0) { _are.WaitOne(TimeSpan.FromMilliseconds(10)); for (int i = 0; i < count; i++) { TData data = default(TData); while (!_dataPool.TryDequeue(out data)) ; uint msgId = Convert.ToUInt32(data.GetType().GetProperty("msgid").GetValue(data)); if (msgId == 316) { } //var item = _dataPool.Get(); //Parallel.ForEach(_singleDataHandlers, p => p.Invoke(items[items.Count - 1])); for (int j = 0; j < _singleDataHandlers.Count; j++) { _singleDataHandlers[j].Invoke(data); } items.Add(data); } } if (_multDataHandlers.Count > 0) { if (items.Count <= 0) { TData data = default(TData); for (int i = 0; i < count; i++) { while (!_dataPool.TryDequeue(out data)) ; items.Add(data); } //items.AddRange(_dataPool.Get(count)); } _are.WaitOne(TimeSpan.FromMilliseconds(10)); //Parallel.For(0, _multDataHandlers.Count, (i) => //{ // _multDataHandlers[i].Invoke(items.ToArray()); //}); for (int i = _multDataHandlers.Count - 1; i >= 0; i--) { _multDataHandlers[i].Invoke(items.ToArray()); } } Thread.Sleep(10); _are.Set(); } } public void StopBus() { _running = false; } /// /// 数据订阅 /// /// 接收数据回调 public void Subscribe(Action action) { if (_singleDataHandlers.Contains(action)) return; _are.Reset(); _singleDataHandlers.Add(action); _are.Set(); } /// /// 数据订阅 /// /// 接收数据回调 public void Subscribe(Action action) { if (_multDataHandlers.Contains(action)) return; _are.Reset(); _multDataHandlers.Add(action); _are.Set(); } /// /// 取消数据订阅 /// /// 接收数据回调 public void UnSubcribe(Action action) { if (!_singleDataHandlers.Contains(action)) return; _are.Reset(); _singleDataHandlers.Remove(action); _are.Set(); } /// /// 取消数据订阅 /// /// 接收数据回调 public void UnSubcribe(Action action) { if (!_multDataHandlers.Contains(action)) return; _are.Reset(); _multDataHandlers.Remove(action); _are.Set(); } } }