Przeglądaj źródła

remove eventbus.

master
yangxiaodong 7 lat temu
rodzic
commit
be5e406dd8
13 zmienionych plików z 0 dodań i 741 usunięć
  1. +0
    -107
      src/Cap.Consistency/EventBus/BackgroundWorker.cs
  2. +0
    -9
      src/Cap.Consistency/EventBus/BrokeredMessage.cs
  3. +0
    -155
      src/Cap.Consistency/EventBus/EventBusBase.cs
  4. +0
    -23
      src/Cap.Consistency/EventBus/EventBusFactory.cs
  5. +0
    -20
      src/Cap.Consistency/EventBus/EventBusOptions.cs
  6. +0
    -49
      src/Cap.Consistency/EventBus/EventHandlerHolder.cs
  7. +0
    -13
      src/Cap.Consistency/EventBus/EventSubscriberAttribute.cs
  8. +0
    -18
      src/Cap.Consistency/EventBus/IBackgroundWorker.cs
  9. +0
    -35
      src/Cap.Consistency/EventBus/IEventBus.cs
  10. +0
    -9
      src/Cap.Consistency/EventBus/IEventBusFactory.cs
  11. +0
    -119
      src/Cap.Consistency/EventBus/Ordered/OrderedEventBus.cs
  12. +0
    -25
      src/Cap.Consistency/EventBus/ReceiveResult.cs
  13. +0
    -159
      src/Cap.Consistency/EventBus/Simple/SimpleEventBus.cs

+ 0
- 107
src/Cap.Consistency/EventBus/BackgroundWorker.cs Wyświetl plik

@@ -1,107 +0,0 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;

namespace Cap.Consistency.EventBus
{
public abstract class BackgroundWorker
: IBackgroundWorker, IDisposable
{
protected readonly ILogger _logger;

#if FEATURE_THREAD
protected Thread _dispatchThread;
#else
protected Task _dispatchThread;
#endif
protected CancellationTokenSource _cancellationTokenSource;

public virtual bool IsRunning {
get {
return this._dispatchThread != null &&
#if FEATURE_THREAD
this._dispatchThread.ThreadState == ThreadState.Running;
#else
this._dispatchThread.Status == TaskStatus.Running;
#endif
}
}

protected BackgroundWorker(ILoggerFactory loggerFactory) {
this._logger = loggerFactory.CreateLogger(this.GetType().FullName);
}

public virtual void Start() {
this.Start(false);
}

public virtual void Start(bool force) {
if (!force) {
if (this.IsRunning) {
return;
}
}
this._cancellationTokenSource = new CancellationTokenSource();
#if !FEATURE_THREAD
this._dispatchThread = this.ThreadWorker(this._cancellationTokenSource.Token);
#else
this._dispatchThread = new Thread((userObject) =>
{
this.ThreadWorker(userObject).GetAwaiter().GetResult();
})
{
IsBackground = true,
Name = $"{this.GetType().Name}-Thread-{Guid.NewGuid().ToString()}"
};
this._dispatchThread.Start(this._cancellationTokenSource.Token);
#endif
}

public virtual void Stop(int timeout = 2000) {
Task.WaitAny(Task.Run(() => {
this._cancellationTokenSource.Cancel();
while (this.IsRunning) {
Task.Delay(500).GetAwaiter().GetResult();
}
}), Task.Delay(timeout));
}

protected virtual async Task ThreadWorker(object userObject) {
this._logger.LogInformation($"Background worker {this.GetType().FullName} has been started.");
var token = (CancellationToken)userObject;
while (!token.IsCancellationRequested && await this.Process()) {
}
this._logger.LogInformation($"Background worker {this.GetType().FullName} has been stopped.");
}

protected abstract Task<bool> Process();

#region IDisposable

// Flag: Has Dispose already been called?
private bool disposed = false;

// Public implementation of Dispose pattern callable by consumers.
public void Dispose() {
Dispose(true);
GC.SuppressFinalize(this);
}

// Protected implementation of Dispose pattern.
protected virtual void Dispose(bool disposing) {
if (disposed)
return;

if (disposing) {
// Free any other managed objects here.
this.Stop();
}

// Free any unmanaged objects here.
disposed = true;
}

#endregion IDisposable
}
}

+ 0
- 9
src/Cap.Consistency/EventBus/BrokeredMessage.cs Wyświetl plik

@@ -1,9 +0,0 @@
namespace Cap.Consistency.EventBus
{
public class BrokeredMessage
{
public byte[] Body { get; set; }

public string Type { get; set; }
}
}

+ 0
- 155
src/Cap.Consistency/EventBus/EventBusBase.cs Wyświetl plik

@@ -1,155 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using Microsoft.Extensions.Logging;

namespace Cap.Consistency.EventBus
{
/// <summary>
/// The EventBusBase class is the base class for all the IEventBus implementations.
/// </summary>
public abstract class EventBusBase
: BackgroundWorker, IEventBus, IDisposable
{
public const int DefaultMaxPendingEventNumber = 1024 * 1024;

public event EventHandler<EventHandlerHolder> MessageReceieved;

protected readonly object _eventHandlerLock = new object();

protected List<EventHandlerHolder> _eventHandlerList = new List<EventHandlerHolder>();

/// <summary>
/// The pending event number which does not yet dispatched.
/// </summary>
public abstract long PendingEventNumber { get; }

public virtual bool IsDispatcherEnabled {
get {
return base.IsRunning;
}
}

/// <summary>
/// The constructor of EventBusBase.
/// </summary>
/// <param name="loggerFactory"></param>
protected EventBusBase(ILoggerFactory loggerFactory)
: base(loggerFactory) {
this._eventHandlerList = new List<EventHandlerHolder>();
}

/// <summary>
/// Post an event to the event bus, dispatched after the specific time.
/// </summary>
/// <remarks>If you do not need the event processed in the delivery order, use SimpleEventBus instead.</remarks>
/// <param name="eventObject">The event object</param>
/// <param name="dispatchDelay">The delay time before dispatch this event</param>
public abstract void Post(object eventObject, TimeSpan dispatchDelay);

/// <summary>
/// Register event handlers in the handler instance.
/// One handler instance may have many event handler methods.
/// These methods have EventSubscriberAttribute contract and exactly one parameter.
/// </summary>
/// <remarks>If you do not need the event processed in the delivery order, use SimpleEventBus instead.</remarks>
/// <param name="handler">The instance of event handler class</param>
public void Register(object handler) {
if (handler == null) {
return;
}

var miList = handler.GetType().GetRuntimeMethods();
lock (_eventHandlerLock) {
// Don't allow register multiple times.
if (_eventHandlerList.Any(record => record.Handler == handler)) {
return;
}

List<EventHandlerHolder> newList = null;
foreach (var mi in miList) {
var attribute = mi.GetCustomAttribute<EventSubscriberAttribute>();
if (attribute != null) {
var piList = mi.GetParameters();
if (piList.Length == 1) {
// OK, we got valid handler, create newList as needed
if (newList == null) {
newList = new List<EventHandlerHolder>(_eventHandlerList);
}
newList.Add(this.CreateEventHandlerHolder(handler, mi, piList[0].ParameterType));
}
}
}

// OK, we have new handler registered
if (newList != null) {
_eventHandlerList = newList;
}
}
}

/// <summary>
/// Unregister event handlers belong to the handler instance.
/// One handler instance may have many event handler methods.
/// These methods have EventSubscriberAttribute contract and exactly one parameter.
/// </summary>
/// <param name="handler">The instance of event handler class</param>
public void Unregister(object handler) {
if (handler == null) {
return;
}
lock (_eventHandlerLock) {
bool needAction = _eventHandlerList.Any(record => record.Handler == handler);

if (needAction) {
var newList = new List<EventHandlerHolder>();
foreach (var record in this._eventHandlerList) {
if (record.Handler != handler) {
newList.Add(record);
}
else {
record.Dispose();
}
}
_eventHandlerList = newList;
}
}
}

protected virtual EventHandlerHolder CreateEventHandlerHolder(object handler, MethodInfo methodInfo, Type parameterType) {
return new EventHandlerHolder(handler, methodInfo, parameterType);
}

protected virtual void OnMessageReceieved(EventHandlerHolder handler) {
this.MessageReceieved?.Invoke(this, handler);
}

#region IDisposable

// Flag: Has Dispose already been called?
private bool disposed = false;

// Public implementation of Dispose pattern callable by consumers.
public new void Dispose() {
Dispose(true);
GC.SuppressFinalize(this);
}

// Protected implementation of Dispose pattern.
protected new virtual void Dispose(bool disposing) {
if (disposed)
return;

if (disposing) {
// Free any other managed objects here.
this.Stop();
}

// Free any unmanaged objects here.
disposed = true;
}

#endregion IDisposable
}
}

+ 0
- 23
src/Cap.Consistency/EventBus/EventBusFactory.cs Wyświetl plik

@@ -1,23 +0,0 @@
using System;
using Microsoft.Extensions.Logging;

namespace Cap.Consistency.EventBus
{
public class EventBusFactory
: IEventBusFactory
{
private readonly ILoggerFactory _loggerFactory;

public EventBusFactory(ILoggerFactory loggerFactory) {
this._loggerFactory = loggerFactory;
}

public IEventBus CreateEventBus<TEventBus>() where TEventBus : IEventBus {
return this.CreateEventBus<TEventBus>(-1);
}

public IEventBus CreateEventBus<TEventBus>(long maxPendingEventNumber) where TEventBus : IEventBus {
return Activator.CreateInstance(typeof(TEventBus), new object[] { this._loggerFactory, maxPendingEventNumber }) as IEventBus;
}
}
}

+ 0
- 20
src/Cap.Consistency/EventBus/EventBusOptions.cs Wyświetl plik

@@ -1,20 +0,0 @@
namespace Cap.Consistency.EventBus
{
public class EventBusOptions
{
public long MaxPendingEventNumber { get; set; }

public int MaxPendingEventNumber32 {
get {
if (this.MaxPendingEventNumber < int.MaxValue) {
return (int)this.MaxPendingEventNumber;
}
return int.MaxValue;
}
}

public EventBusOptions() {
this.MaxPendingEventNumber = EventBusBase.DefaultMaxPendingEventNumber;
}
}
}

+ 0
- 49
src/Cap.Consistency/EventBus/EventHandlerHolder.cs Wyświetl plik

@@ -1,49 +0,0 @@
using System;
using System.Reflection;

namespace Cap.Consistency.EventBus
{
public class EventHandlerHolder
: IDisposable
{
public object Handler { get; }

public MethodInfo MethodInfo { get; }

public Type ParameterType { get; }

public EventHandlerHolder(object handler, MethodInfo methodInfo, Type parameterType) {
Handler = handler;
MethodInfo = methodInfo;
ParameterType = parameterType;
}

#region IDisposable

// Flag: Has Dispose already been called?
private bool disposed = false;

// Public implementation of Dispose pattern callable by consumers.
public void Dispose() {
Dispose(true);
GC.SuppressFinalize(this);
}

// Protected implementation of Dispose pattern.
protected virtual void Dispose(bool disposing) {
if (disposed)
return;

if (disposing) {
// Free any other managed objects here.
//
}

// Free any unmanaged objects here.
//
disposed = true;
}

#endregion IDisposable
}
}

+ 0
- 13
src/Cap.Consistency/EventBus/EventSubscriberAttribute.cs Wyświetl plik

@@ -1,13 +0,0 @@
using System;

namespace Cap.Consistency.EventBus
{
/// <summary>
/// The attribute class of the event handler.
/// If a method have this attribue contract and exactly one parameter, then it's event handler.
/// </summary>
public class EventSubscriberAttribute
: Attribute
{
}
}

+ 0
- 18
src/Cap.Consistency/EventBus/IBackgroundWorker.cs Wyświetl plik

@@ -1,18 +0,0 @@
namespace Cap.Consistency.EventBus
{
public interface IBackgroundWorker
{
bool IsRunning { get; }

/// <summary>
/// Start the background worker digest loop.
/// </summary>
void Start();

/// <summary>
/// Stop the background worker digest loop.
/// </summary>
/// <param name="timeout"></param>
void Stop(int timeout = 2000);
}
}

+ 0
- 35
src/Cap.Consistency/EventBus/IEventBus.cs Wyświetl plik

@@ -1,35 +0,0 @@
using System;

namespace Cap.Consistency.EventBus
{
/// <summary>
/// The event bus interface.
/// </summary>
public interface IEventBus : IBackgroundWorker
{
/// <summary>
/// Post an event to the event bus, dispatched after the specific time.
/// </summary>
/// <param name="eventObject">The event object</param>
/// <param name="dispatchDelay">The delay time before dispatch this event</param>
void Post(object eventObject, TimeSpan dispatchDelay);

/// <summary>
/// Register event handlers in the handler instance.
///
/// One handler instance may have many event handler methods.
/// These methods have EventSubscriberAttribute contract and exactly one parameter.
/// </summary>
/// <param name="handler">The instance of event handler class</param>
void Register(object handler);

/// <summary>
/// Unregister event handlers belong to the handler instance.
///
/// One handler instance may have many event handler methods.
/// These methods have EventSubscriberAttribute contract and exactly one parameter.
/// </summary>
/// <param name="handler">The instance of event handler class</param>
void Unregister(object handler);
}
}

+ 0
- 9
src/Cap.Consistency/EventBus/IEventBusFactory.cs Wyświetl plik

@@ -1,9 +0,0 @@
namespace Cap.Consistency.EventBus
{
public interface IEventBusFactory
{
IEventBus CreateEventBus<TEventBus>() where TEventBus : IEventBus;

IEventBus CreateEventBus<TEventBus>(long maxPendingEventNumber) where TEventBus : IEventBus;
}
}

+ 0
- 119
src/Cap.Consistency/EventBus/Ordered/OrderedEventBus.cs Wyświetl plik

@@ -1,119 +0,0 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Reflection;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace Cap.Consistency.EventBus
{
/// <summary>
/// The OrderedEventBus class is a simple and fast IEventBus implementation which processes event in the delivery order.
/// </summary>
/// <remarks>If you do not need the event processed in the delivery order, use SimpleEventBus instead.</remarks>
public class OrderedEventBus
: EventBusBase, IEventBus
{
private readonly BlockingCollection<object> _eventQueue;

/// <summary>
/// The pending event number which does not yet dispatched.
/// </summary>
public override long PendingEventNumber {
get {
return Math.Max(_eventQueue.Count, 0);
}
}

public override bool IsDispatcherEnabled {
get {
return true;
}
}

public OrderedEventBus(ILoggerFactory loggerFactory, IOptions<EventBusOptions> options)
: this(loggerFactory, options?.Value.MaxPendingEventNumber32 ?? 0) {
}

/// <summary>
/// The constructor of OrderedEventBus.
/// </summary>
/// <param name="maxPendingEventNumber">The maximum pending event number which does not yet dispatched</param>
public OrderedEventBus(ILoggerFactory loggerFactory, int maxPendingEventNumber, bool shouldStart = true)
: base(loggerFactory) {
this._eventQueue = new BlockingCollection<object>(
maxPendingEventNumber > 0
? maxPendingEventNumber
: DefaultMaxPendingEventNumber);

if (shouldStart) {
this.Start();
}
}

/// <summary>
/// Post an event to the event bus, dispatched after the specific time.
/// </summary>
/// <remarks>If you do not need the event processed in the delivery order, use SimpleEventBus instead.</remarks>
/// <param name="eventObject">The event object</param>
/// <param name="dispatchDelay">The delay time before dispatch this event</param>
public override void Post(object eventObject, TimeSpan dispatchDelay) {
int dispatchDelayMs = (int)dispatchDelay.TotalMilliseconds;

if (dispatchDelayMs >= 1) {
Task.Delay(dispatchDelayMs).ContinueWith(task => _eventQueue.Add(eventObject));
}
else {
_eventQueue.Add(eventObject);
}
}

protected override async Task<bool> Process() {
object eventObject = null;
try {
eventObject = _eventQueue.Take();
InvokeEventHandler(eventObject);
}
catch (Exception de) {
if (de is ObjectDisposedException) {
return await Task.FromResult(true);
}
this._logger.LogError("Dispatch event ({0}) failed: {1}{2}{3}", eventObject, de.Message, Environment.NewLine, de.StackTrace);
}
return await Task.FromResult(true);
}

protected void InvokeEventHandler(object eventObject, Action<bool, Exception, object, Type> resultCallback = null) {
List<Task> taskList = null;
// ReSharper disable once ForCanBeConvertedToForeach
for (int i = 0; i < _eventHandlerList.Count; i++) {
// ReSharper disable once InconsistentlySynchronizedField
EventHandlerHolder record = _eventHandlerList[i];
if (eventObject == null || record.ParameterType.IsInstanceOfType(eventObject)) {
Task task = Task.Run(() => {
this.OnMessageReceieved(record);
var isVoid = record.MethodInfo.ReturnType == typeof(void);
try {
var result = record.MethodInfo.Invoke(record.Handler, new[] { eventObject });
resultCallback?.Invoke(isVoid, null, result, record.MethodInfo.ReturnType);
}
catch (Exception ex) {
this._logger.LogError(ex.Message + Environment.NewLine + ex.StackTrace);
resultCallback?.Invoke(isVoid, ex, null, record.MethodInfo.ReturnType);
}
});
if (taskList == null) taskList = new List<Task>();
taskList.Add(task);
//record.MethodInfo.Invoke(record.Handler, new[] { eventObject });
}
}
if (taskList != null) {
Task.WaitAll(taskList.ToArray());
}
else {
resultCallback?.Invoke(false, null, null, null);
}
}
}
}

+ 0
- 25
src/Cap.Consistency/EventBus/ReceiveResult.cs Wyświetl plik

@@ -1,25 +0,0 @@
using System;

namespace Cap.Consistency.EventBus
{
public class ReceiveResult
{
public bool IsSucceeded { get; set; }

public bool IsVoid { get; set; }

public object Result { get; set; }

public Type ResultType { get; set; }

public Exception Exception { get; set; }

public ReceiveResult(bool isSucceeded, bool isVoid, object result, Exception ex = null, Type resultType = null) {
this.IsSucceeded = isSucceeded;
this.IsVoid = isVoid;
this.Result = result;
this.Exception = ex;
this.ResultType = (resultType ?? result?.GetType()) ?? typeof(object);
}
}
}

+ 0
- 159
src/Cap.Consistency/EventBus/Simple/SimpleEventBus.cs Wyświetl plik

@@ -1,159 +0,0 @@
//#define UseTotalEventNumber

using System;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace Cap.Consistency.EventBus
{
/// <summary>
/// The SimpleEventBus class is a simple and fast IEventBus implementation.
/// </summary>
/// <remarks>
/// <para>The event may be processed out of the delivery order under heavy load.</para>
/// <para>If you need the event processed in the delivery order, use OrderedEventBus instead.</para>
/// </remarks>
public class SimpleEventBus
: EventBusBase, IEventBus
{
private readonly long _maxPendingEventNumber;

// Interlocked operation cause the performance drop at least 10% !.
private long _pendingEventNumber;

private bool _isDispatcherEnabled;

#if UseTotalEventNumber
// This counter cause the performance drop at least 5% !
private long _totalEventNumber;

// The total event number which post to the event bus.
// This counter cause the performance drop at least 5% !
public long TotalEventNumber
{
get
{
return Interlocked.Read(ref _totalEventNumber);
}
}
#endif

/// <summary>
/// The pending event number which does not yet dispatched.
/// </summary>
public override long PendingEventNumber {
get {
return Math.Max(Interlocked.Read(ref _pendingEventNumber), 0);
}
}

public override bool IsDispatcherEnabled {
get {
return this._isDispatcherEnabled;
}
}

public SimpleEventBus(ILoggerFactory loggerFactory, IOptions<EventBusOptions> options)
: this(loggerFactory, options?.Value.MaxPendingEventNumber ?? 0) {
}

/// <summary>
/// The constructor of SimpleEventBus.
/// </summary>
/// <param name="maxPendingEventNumber">The maximum pending event number which does not yet dispatched</param>
public SimpleEventBus(ILoggerFactory loggerFactory, long maxPendingEventNumber, bool shouldStart = true)
: base(loggerFactory) {
this._maxPendingEventNumber = maxPendingEventNumber > 0 ? maxPendingEventNumber : DefaultMaxPendingEventNumber;
this._isDispatcherEnabled = false;

if (shouldStart) {
this.Start();
}
}

public override void Start() {
if (this.IsRunning) {
return;
}
this._isDispatcherEnabled = true;
}

public override void Stop(int timeout = 2000) {
this._isDispatcherEnabled = false;
}

/// <summary>
/// Post an event to the event bus, dispatched after the specific time.
/// </summary>
/// <remarks>
/// <para>The event may be processed out of the delivery order under heavy load.</para>
/// <para>If you need the event processed in the delivery order, use OrderedEventBus instead.</para>
/// </remarks>
/// <param name="eventObject">The event object</param>
/// <param name="dispatchDelay">The delay time before dispatch this event</param>
public override void Post(object eventObject, TimeSpan dispatchDelay) {
if (!this._isDispatcherEnabled) return;

int dispatchDelayMs = (int)dispatchDelay.TotalMilliseconds;

while (Interlocked.Read(ref _pendingEventNumber) >= _maxPendingEventNumber) {
this._logger.LogWarning("Too many events in the EventBus, pendingEventNumber={0}, maxPendingEventNumber={1}{2}PendingEvent='{3}', dispatchDelay={4}ms",
PendingEventNumber, _maxPendingEventNumber, Environment.NewLine, eventObject, dispatchDelayMs);
Task.Delay(16).Wait();
}

if (dispatchDelayMs >= 1) {
Task.Delay(dispatchDelayMs).ContinueWith(task => {
DispatchMessage(eventObject);
});
}
else {
Task.Run(() => DispatchMessage(eventObject));
}

Interlocked.Increment(ref _pendingEventNumber);
// Interlocked.Increment(ref _totalEventNumber);
}

protected override Task ThreadWorker(object userObject) {
throw new NotSupportedException();
}

protected override Task<bool> Process() {
throw new NotSupportedException();
}

protected void DispatchMessage(object eventObject) {
try {
// ReSharper disable once ForCanBeConvertedToForeach
for (int i = 0; i < _eventHandlerList.Count; i++) {
// ReSharper disable once InconsistentlySynchronizedField
EventHandlerHolder record = _eventHandlerList[i];
if (eventObject == null || record.ParameterType.IsInstanceOfType(eventObject)) {
Task.Run(() => {
try {
this.OnMessageReceieved(record);
record.MethodInfo.Invoke(record.Handler, new[] { eventObject });
}
catch (Exception ie) {
this._logger.LogWarning("Event handler (class '{0}@{1}', method '{2}') failed: {3}{4}{5}{4}eventObject: {6}",
record.Handler.GetType(), record.Handler.GetHashCode(), record.MethodInfo,
ie.Message, Environment.NewLine, ie.StackTrace, eventObject);
}
});
}
}
}
catch (Exception de) {
this._logger.LogError("Dispatch event ({0}) failed: {1}{2}{3}",
eventObject, de.Message, Environment.NewLine, de.StackTrace);
}
finally {
Interlocked.Decrement(ref _pendingEventNumber);
}
}
}
}

Ładowanie…
Anuluj
Zapisz