diff --git a/src/Cap.Consistency/EventBus/BackgroundWorker.cs b/src/Cap.Consistency/EventBus/BackgroundWorker.cs deleted file mode 100644 index 08a8a1d..0000000 --- a/src/Cap.Consistency/EventBus/BackgroundWorker.cs +++ /dev/null @@ -1,107 +0,0 @@ -using System; -using System.Threading; -using System.Threading.Tasks; -using Microsoft.Extensions.Logging; - -namespace Cap.Consistency.EventBus -{ - public abstract class BackgroundWorker - : IBackgroundWorker, IDisposable - { - protected readonly ILogger _logger; - -#if FEATURE_THREAD - protected Thread _dispatchThread; -#else - protected Task _dispatchThread; -#endif - protected CancellationTokenSource _cancellationTokenSource; - - public virtual bool IsRunning { - get { - return this._dispatchThread != null && -#if FEATURE_THREAD - this._dispatchThread.ThreadState == ThreadState.Running; -#else - this._dispatchThread.Status == TaskStatus.Running; -#endif - } - } - - protected BackgroundWorker(ILoggerFactory loggerFactory) { - this._logger = loggerFactory.CreateLogger(this.GetType().FullName); - } - - public virtual void Start() { - this.Start(false); - } - - public virtual void Start(bool force) { - if (!force) { - if (this.IsRunning) { - return; - } - } - this._cancellationTokenSource = new CancellationTokenSource(); -#if !FEATURE_THREAD - this._dispatchThread = this.ThreadWorker(this._cancellationTokenSource.Token); -#else - this._dispatchThread = new Thread((userObject) => - { - this.ThreadWorker(userObject).GetAwaiter().GetResult(); - }) - { - IsBackground = true, - Name = $"{this.GetType().Name}-Thread-{Guid.NewGuid().ToString()}" - }; - this._dispatchThread.Start(this._cancellationTokenSource.Token); -#endif - } - - public virtual void Stop(int timeout = 2000) { - Task.WaitAny(Task.Run(() => { - this._cancellationTokenSource.Cancel(); - while (this.IsRunning) { - Task.Delay(500).GetAwaiter().GetResult(); - } - }), Task.Delay(timeout)); - } - - protected virtual async Task ThreadWorker(object userObject) { - this._logger.LogInformation($"Background worker {this.GetType().FullName} has been started."); - var token = (CancellationToken)userObject; - while (!token.IsCancellationRequested && await this.Process()) { - } - this._logger.LogInformation($"Background worker {this.GetType().FullName} has been stopped."); - } - - protected abstract Task Process(); - - #region IDisposable - - // Flag: Has Dispose already been called? - private bool disposed = false; - - // Public implementation of Dispose pattern callable by consumers. - public void Dispose() { - Dispose(true); - GC.SuppressFinalize(this); - } - - // Protected implementation of Dispose pattern. - protected virtual void Dispose(bool disposing) { - if (disposed) - return; - - if (disposing) { - // Free any other managed objects here. - this.Stop(); - } - - // Free any unmanaged objects here. - disposed = true; - } - - #endregion IDisposable - } -} \ No newline at end of file diff --git a/src/Cap.Consistency/EventBus/BrokeredMessage.cs b/src/Cap.Consistency/EventBus/BrokeredMessage.cs deleted file mode 100644 index 642a228..0000000 --- a/src/Cap.Consistency/EventBus/BrokeredMessage.cs +++ /dev/null @@ -1,9 +0,0 @@ -namespace Cap.Consistency.EventBus -{ - public class BrokeredMessage - { - public byte[] Body { get; set; } - - public string Type { get; set; } - } -} \ No newline at end of file diff --git a/src/Cap.Consistency/EventBus/EventBusBase.cs b/src/Cap.Consistency/EventBus/EventBusBase.cs deleted file mode 100644 index 4c41053..0000000 --- a/src/Cap.Consistency/EventBus/EventBusBase.cs +++ /dev/null @@ -1,155 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Reflection; -using Microsoft.Extensions.Logging; - -namespace Cap.Consistency.EventBus -{ - /// - /// The EventBusBase class is the base class for all the IEventBus implementations. - /// - public abstract class EventBusBase - : BackgroundWorker, IEventBus, IDisposable - { - public const int DefaultMaxPendingEventNumber = 1024 * 1024; - - public event EventHandler MessageReceieved; - - protected readonly object _eventHandlerLock = new object(); - - protected List _eventHandlerList = new List(); - - /// - /// The pending event number which does not yet dispatched. - /// - public abstract long PendingEventNumber { get; } - - public virtual bool IsDispatcherEnabled { - get { - return base.IsRunning; - } - } - - /// - /// The constructor of EventBusBase. - /// - /// - protected EventBusBase(ILoggerFactory loggerFactory) - : base(loggerFactory) { - this._eventHandlerList = new List(); - } - - /// - /// Post an event to the event bus, dispatched after the specific time. - /// - /// If you do not need the event processed in the delivery order, use SimpleEventBus instead. - /// The event object - /// The delay time before dispatch this event - public abstract void Post(object eventObject, TimeSpan dispatchDelay); - - /// - /// Register event handlers in the handler instance. - /// One handler instance may have many event handler methods. - /// These methods have EventSubscriberAttribute contract and exactly one parameter. - /// - /// If you do not need the event processed in the delivery order, use SimpleEventBus instead. - /// The instance of event handler class - public void Register(object handler) { - if (handler == null) { - return; - } - - var miList = handler.GetType().GetRuntimeMethods(); - lock (_eventHandlerLock) { - // Don't allow register multiple times. - if (_eventHandlerList.Any(record => record.Handler == handler)) { - return; - } - - List newList = null; - foreach (var mi in miList) { - var attribute = mi.GetCustomAttribute(); - if (attribute != null) { - var piList = mi.GetParameters(); - if (piList.Length == 1) { - // OK, we got valid handler, create newList as needed - if (newList == null) { - newList = new List(_eventHandlerList); - } - newList.Add(this.CreateEventHandlerHolder(handler, mi, piList[0].ParameterType)); - } - } - } - - // OK, we have new handler registered - if (newList != null) { - _eventHandlerList = newList; - } - } - } - - /// - /// Unregister event handlers belong to the handler instance. - /// One handler instance may have many event handler methods. - /// These methods have EventSubscriberAttribute contract and exactly one parameter. - /// - /// The instance of event handler class - public void Unregister(object handler) { - if (handler == null) { - return; - } - lock (_eventHandlerLock) { - bool needAction = _eventHandlerList.Any(record => record.Handler == handler); - - if (needAction) { - var newList = new List(); - foreach (var record in this._eventHandlerList) { - if (record.Handler != handler) { - newList.Add(record); - } - else { - record.Dispose(); - } - } - _eventHandlerList = newList; - } - } - } - - protected virtual EventHandlerHolder CreateEventHandlerHolder(object handler, MethodInfo methodInfo, Type parameterType) { - return new EventHandlerHolder(handler, methodInfo, parameterType); - } - - protected virtual void OnMessageReceieved(EventHandlerHolder handler) { - this.MessageReceieved?.Invoke(this, handler); - } - - #region IDisposable - - // Flag: Has Dispose already been called? - private bool disposed = false; - - // Public implementation of Dispose pattern callable by consumers. - public new void Dispose() { - Dispose(true); - GC.SuppressFinalize(this); - } - - // Protected implementation of Dispose pattern. - protected new virtual void Dispose(bool disposing) { - if (disposed) - return; - - if (disposing) { - // Free any other managed objects here. - this.Stop(); - } - - // Free any unmanaged objects here. - disposed = true; - } - - #endregion IDisposable - } -} \ No newline at end of file diff --git a/src/Cap.Consistency/EventBus/EventBusFactory.cs b/src/Cap.Consistency/EventBus/EventBusFactory.cs deleted file mode 100644 index f026649..0000000 --- a/src/Cap.Consistency/EventBus/EventBusFactory.cs +++ /dev/null @@ -1,23 +0,0 @@ -using System; -using Microsoft.Extensions.Logging; - -namespace Cap.Consistency.EventBus -{ - public class EventBusFactory - : IEventBusFactory - { - private readonly ILoggerFactory _loggerFactory; - - public EventBusFactory(ILoggerFactory loggerFactory) { - this._loggerFactory = loggerFactory; - } - - public IEventBus CreateEventBus() where TEventBus : IEventBus { - return this.CreateEventBus(-1); - } - - public IEventBus CreateEventBus(long maxPendingEventNumber) where TEventBus : IEventBus { - return Activator.CreateInstance(typeof(TEventBus), new object[] { this._loggerFactory, maxPendingEventNumber }) as IEventBus; - } - } -} \ No newline at end of file diff --git a/src/Cap.Consistency/EventBus/EventBusOptions.cs b/src/Cap.Consistency/EventBus/EventBusOptions.cs deleted file mode 100644 index afcd032..0000000 --- a/src/Cap.Consistency/EventBus/EventBusOptions.cs +++ /dev/null @@ -1,20 +0,0 @@ -namespace Cap.Consistency.EventBus -{ - public class EventBusOptions - { - public long MaxPendingEventNumber { get; set; } - - public int MaxPendingEventNumber32 { - get { - if (this.MaxPendingEventNumber < int.MaxValue) { - return (int)this.MaxPendingEventNumber; - } - return int.MaxValue; - } - } - - public EventBusOptions() { - this.MaxPendingEventNumber = EventBusBase.DefaultMaxPendingEventNumber; - } - } -} \ No newline at end of file diff --git a/src/Cap.Consistency/EventBus/EventHandlerHolder.cs b/src/Cap.Consistency/EventBus/EventHandlerHolder.cs deleted file mode 100644 index 9e5ecb6..0000000 --- a/src/Cap.Consistency/EventBus/EventHandlerHolder.cs +++ /dev/null @@ -1,49 +0,0 @@ -using System; -using System.Reflection; - -namespace Cap.Consistency.EventBus -{ - public class EventHandlerHolder - : IDisposable - { - public object Handler { get; } - - public MethodInfo MethodInfo { get; } - - public Type ParameterType { get; } - - public EventHandlerHolder(object handler, MethodInfo methodInfo, Type parameterType) { - Handler = handler; - MethodInfo = methodInfo; - ParameterType = parameterType; - } - - #region IDisposable - - // Flag: Has Dispose already been called? - private bool disposed = false; - - // Public implementation of Dispose pattern callable by consumers. - public void Dispose() { - Dispose(true); - GC.SuppressFinalize(this); - } - - // Protected implementation of Dispose pattern. - protected virtual void Dispose(bool disposing) { - if (disposed) - return; - - if (disposing) { - // Free any other managed objects here. - // - } - - // Free any unmanaged objects here. - // - disposed = true; - } - - #endregion IDisposable - } -} \ No newline at end of file diff --git a/src/Cap.Consistency/EventBus/EventSubscriberAttribute.cs b/src/Cap.Consistency/EventBus/EventSubscriberAttribute.cs deleted file mode 100644 index 567b922..0000000 --- a/src/Cap.Consistency/EventBus/EventSubscriberAttribute.cs +++ /dev/null @@ -1,13 +0,0 @@ -using System; - -namespace Cap.Consistency.EventBus -{ - /// - /// The attribute class of the event handler. - /// If a method have this attribue contract and exactly one parameter, then it's event handler. - /// - public class EventSubscriberAttribute - : Attribute - { - } -} \ No newline at end of file diff --git a/src/Cap.Consistency/EventBus/IBackgroundWorker.cs b/src/Cap.Consistency/EventBus/IBackgroundWorker.cs deleted file mode 100644 index 4b37b8a..0000000 --- a/src/Cap.Consistency/EventBus/IBackgroundWorker.cs +++ /dev/null @@ -1,18 +0,0 @@ -namespace Cap.Consistency.EventBus -{ - public interface IBackgroundWorker - { - bool IsRunning { get; } - - /// - /// Start the background worker digest loop. - /// - void Start(); - - /// - /// Stop the background worker digest loop. - /// - /// - void Stop(int timeout = 2000); - } -} \ No newline at end of file diff --git a/src/Cap.Consistency/EventBus/IEventBus.cs b/src/Cap.Consistency/EventBus/IEventBus.cs deleted file mode 100644 index b12a0eb..0000000 --- a/src/Cap.Consistency/EventBus/IEventBus.cs +++ /dev/null @@ -1,35 +0,0 @@ -using System; - -namespace Cap.Consistency.EventBus -{ - /// - /// The event bus interface. - /// - public interface IEventBus : IBackgroundWorker - { - /// - /// Post an event to the event bus, dispatched after the specific time. - /// - /// The event object - /// The delay time before dispatch this event - void Post(object eventObject, TimeSpan dispatchDelay); - - /// - /// Register event handlers in the handler instance. - /// - /// One handler instance may have many event handler methods. - /// These methods have EventSubscriberAttribute contract and exactly one parameter. - /// - /// The instance of event handler class - void Register(object handler); - - /// - /// Unregister event handlers belong to the handler instance. - /// - /// One handler instance may have many event handler methods. - /// These methods have EventSubscriberAttribute contract and exactly one parameter. - /// - /// The instance of event handler class - void Unregister(object handler); - } -} \ No newline at end of file diff --git a/src/Cap.Consistency/EventBus/IEventBusFactory.cs b/src/Cap.Consistency/EventBus/IEventBusFactory.cs deleted file mode 100644 index e4e28ac..0000000 --- a/src/Cap.Consistency/EventBus/IEventBusFactory.cs +++ /dev/null @@ -1,9 +0,0 @@ -namespace Cap.Consistency.EventBus -{ - public interface IEventBusFactory - { - IEventBus CreateEventBus() where TEventBus : IEventBus; - - IEventBus CreateEventBus(long maxPendingEventNumber) where TEventBus : IEventBus; - } -} \ No newline at end of file diff --git a/src/Cap.Consistency/EventBus/Ordered/OrderedEventBus.cs b/src/Cap.Consistency/EventBus/Ordered/OrderedEventBus.cs deleted file mode 100644 index 0932865..0000000 --- a/src/Cap.Consistency/EventBus/Ordered/OrderedEventBus.cs +++ /dev/null @@ -1,119 +0,0 @@ -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Reflection; -using System.Threading.Tasks; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; - -namespace Cap.Consistency.EventBus -{ - /// - /// The OrderedEventBus class is a simple and fast IEventBus implementation which processes event in the delivery order. - /// - /// If you do not need the event processed in the delivery order, use SimpleEventBus instead. - public class OrderedEventBus - : EventBusBase, IEventBus - { - private readonly BlockingCollection _eventQueue; - - /// - /// The pending event number which does not yet dispatched. - /// - public override long PendingEventNumber { - get { - return Math.Max(_eventQueue.Count, 0); - } - } - - public override bool IsDispatcherEnabled { - get { - return true; - } - } - - public OrderedEventBus(ILoggerFactory loggerFactory, IOptions options) - : this(loggerFactory, options?.Value.MaxPendingEventNumber32 ?? 0) { - } - - /// - /// The constructor of OrderedEventBus. - /// - /// The maximum pending event number which does not yet dispatched - public OrderedEventBus(ILoggerFactory loggerFactory, int maxPendingEventNumber, bool shouldStart = true) - : base(loggerFactory) { - this._eventQueue = new BlockingCollection( - maxPendingEventNumber > 0 - ? maxPendingEventNumber - : DefaultMaxPendingEventNumber); - - if (shouldStart) { - this.Start(); - } - } - - /// - /// Post an event to the event bus, dispatched after the specific time. - /// - /// If you do not need the event processed in the delivery order, use SimpleEventBus instead. - /// The event object - /// The delay time before dispatch this event - public override void Post(object eventObject, TimeSpan dispatchDelay) { - int dispatchDelayMs = (int)dispatchDelay.TotalMilliseconds; - - if (dispatchDelayMs >= 1) { - Task.Delay(dispatchDelayMs).ContinueWith(task => _eventQueue.Add(eventObject)); - } - else { - _eventQueue.Add(eventObject); - } - } - - protected override async Task Process() { - object eventObject = null; - try { - eventObject = _eventQueue.Take(); - InvokeEventHandler(eventObject); - } - catch (Exception de) { - if (de is ObjectDisposedException) { - return await Task.FromResult(true); - } - this._logger.LogError("Dispatch event ({0}) failed: {1}{2}{3}", eventObject, de.Message, Environment.NewLine, de.StackTrace); - } - return await Task.FromResult(true); - } - - protected void InvokeEventHandler(object eventObject, Action resultCallback = null) { - List taskList = null; - // ReSharper disable once ForCanBeConvertedToForeach - for (int i = 0; i < _eventHandlerList.Count; i++) { - // ReSharper disable once InconsistentlySynchronizedField - EventHandlerHolder record = _eventHandlerList[i]; - if (eventObject == null || record.ParameterType.IsInstanceOfType(eventObject)) { - Task task = Task.Run(() => { - this.OnMessageReceieved(record); - var isVoid = record.MethodInfo.ReturnType == typeof(void); - try { - var result = record.MethodInfo.Invoke(record.Handler, new[] { eventObject }); - resultCallback?.Invoke(isVoid, null, result, record.MethodInfo.ReturnType); - } - catch (Exception ex) { - this._logger.LogError(ex.Message + Environment.NewLine + ex.StackTrace); - resultCallback?.Invoke(isVoid, ex, null, record.MethodInfo.ReturnType); - } - }); - if (taskList == null) taskList = new List(); - taskList.Add(task); - //record.MethodInfo.Invoke(record.Handler, new[] { eventObject }); - } - } - if (taskList != null) { - Task.WaitAll(taskList.ToArray()); - } - else { - resultCallback?.Invoke(false, null, null, null); - } - } - } -} \ No newline at end of file diff --git a/src/Cap.Consistency/EventBus/ReceiveResult.cs b/src/Cap.Consistency/EventBus/ReceiveResult.cs deleted file mode 100644 index 5053943..0000000 --- a/src/Cap.Consistency/EventBus/ReceiveResult.cs +++ /dev/null @@ -1,25 +0,0 @@ -using System; - -namespace Cap.Consistency.EventBus -{ - public class ReceiveResult - { - public bool IsSucceeded { get; set; } - - public bool IsVoid { get; set; } - - public object Result { get; set; } - - public Type ResultType { get; set; } - - public Exception Exception { get; set; } - - public ReceiveResult(bool isSucceeded, bool isVoid, object result, Exception ex = null, Type resultType = null) { - this.IsSucceeded = isSucceeded; - this.IsVoid = isVoid; - this.Result = result; - this.Exception = ex; - this.ResultType = (resultType ?? result?.GetType()) ?? typeof(object); - } - } -} \ No newline at end of file diff --git a/src/Cap.Consistency/EventBus/Simple/SimpleEventBus.cs b/src/Cap.Consistency/EventBus/Simple/SimpleEventBus.cs deleted file mode 100644 index 0ca2f0f..0000000 --- a/src/Cap.Consistency/EventBus/Simple/SimpleEventBus.cs +++ /dev/null @@ -1,159 +0,0 @@ -//#define UseTotalEventNumber - -using System; -using System.Reflection; -using System.Threading; -using System.Threading.Tasks; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; - -namespace Cap.Consistency.EventBus -{ - /// - /// The SimpleEventBus class is a simple and fast IEventBus implementation. - /// - /// - /// The event may be processed out of the delivery order under heavy load. - /// If you need the event processed in the delivery order, use OrderedEventBus instead. - /// - public class SimpleEventBus - : EventBusBase, IEventBus - { - private readonly long _maxPendingEventNumber; - - // Interlocked operation cause the performance drop at least 10% !. - private long _pendingEventNumber; - - private bool _isDispatcherEnabled; - -#if UseTotalEventNumber - // This counter cause the performance drop at least 5% ! - private long _totalEventNumber; - - // The total event number which post to the event bus. - // This counter cause the performance drop at least 5% ! - public long TotalEventNumber - { - get - { - return Interlocked.Read(ref _totalEventNumber); - } - } -#endif - - /// - /// The pending event number which does not yet dispatched. - /// - public override long PendingEventNumber { - get { - return Math.Max(Interlocked.Read(ref _pendingEventNumber), 0); - } - } - - public override bool IsDispatcherEnabled { - get { - return this._isDispatcherEnabled; - } - } - - public SimpleEventBus(ILoggerFactory loggerFactory, IOptions options) - : this(loggerFactory, options?.Value.MaxPendingEventNumber ?? 0) { - } - - /// - /// The constructor of SimpleEventBus. - /// - /// The maximum pending event number which does not yet dispatched - public SimpleEventBus(ILoggerFactory loggerFactory, long maxPendingEventNumber, bool shouldStart = true) - : base(loggerFactory) { - this._maxPendingEventNumber = maxPendingEventNumber > 0 ? maxPendingEventNumber : DefaultMaxPendingEventNumber; - this._isDispatcherEnabled = false; - - if (shouldStart) { - this.Start(); - } - } - - public override void Start() { - if (this.IsRunning) { - return; - } - this._isDispatcherEnabled = true; - } - - public override void Stop(int timeout = 2000) { - this._isDispatcherEnabled = false; - } - - /// - /// Post an event to the event bus, dispatched after the specific time. - /// - /// - /// The event may be processed out of the delivery order under heavy load. - /// If you need the event processed in the delivery order, use OrderedEventBus instead. - /// - /// The event object - /// The delay time before dispatch this event - public override void Post(object eventObject, TimeSpan dispatchDelay) { - if (!this._isDispatcherEnabled) return; - - int dispatchDelayMs = (int)dispatchDelay.TotalMilliseconds; - - while (Interlocked.Read(ref _pendingEventNumber) >= _maxPendingEventNumber) { - this._logger.LogWarning("Too many events in the EventBus, pendingEventNumber={0}, maxPendingEventNumber={1}{2}PendingEvent='{3}', dispatchDelay={4}ms", - PendingEventNumber, _maxPendingEventNumber, Environment.NewLine, eventObject, dispatchDelayMs); - Task.Delay(16).Wait(); - } - - if (dispatchDelayMs >= 1) { - Task.Delay(dispatchDelayMs).ContinueWith(task => { - DispatchMessage(eventObject); - }); - } - else { - Task.Run(() => DispatchMessage(eventObject)); - } - - Interlocked.Increment(ref _pendingEventNumber); - // Interlocked.Increment(ref _totalEventNumber); - } - - protected override Task ThreadWorker(object userObject) { - throw new NotSupportedException(); - } - - protected override Task Process() { - throw new NotSupportedException(); - } - - protected void DispatchMessage(object eventObject) { - try { - // ReSharper disable once ForCanBeConvertedToForeach - for (int i = 0; i < _eventHandlerList.Count; i++) { - // ReSharper disable once InconsistentlySynchronizedField - EventHandlerHolder record = _eventHandlerList[i]; - if (eventObject == null || record.ParameterType.IsInstanceOfType(eventObject)) { - Task.Run(() => { - try { - this.OnMessageReceieved(record); - record.MethodInfo.Invoke(record.Handler, new[] { eventObject }); - } - catch (Exception ie) { - this._logger.LogWarning("Event handler (class '{0}@{1}', method '{2}') failed: {3}{4}{5}{4}eventObject: {6}", - record.Handler.GetType(), record.Handler.GetHashCode(), record.MethodInfo, - ie.Message, Environment.NewLine, ie.StackTrace, eventObject); - } - }); - } - } - } - catch (Exception de) { - this._logger.LogError("Dispatch event ({0}) failed: {1}{2}{3}", - eventObject, de.Message, Environment.NewLine, de.StackTrace); - } - finally { - Interlocked.Decrement(ref _pendingEventNumber); - } - } - } -} \ No newline at end of file