From deee0bc74e996fcb10c2aef23d72f7d4a78d528e Mon Sep 17 00:00:00 2001 From: yangxiaodong Date: Mon, 14 Aug 2017 18:31:21 +0800 Subject: [PATCH] add rabbitmq connection pool. --- src/DotNetCore.CAP.RabbitMQ/ConnectionPool.cs | 91 +++++++++++++++++++ .../IConnectionPool.cs | 14 +++ 2 files changed, 105 insertions(+) create mode 100644 src/DotNetCore.CAP.RabbitMQ/ConnectionPool.cs create mode 100644 src/DotNetCore.CAP.RabbitMQ/IConnectionPool.cs diff --git a/src/DotNetCore.CAP.RabbitMQ/ConnectionPool.cs b/src/DotNetCore.CAP.RabbitMQ/ConnectionPool.cs new file mode 100644 index 0000000..83116bf --- /dev/null +++ b/src/DotNetCore.CAP.RabbitMQ/ConnectionPool.cs @@ -0,0 +1,91 @@ +using System; +using System.Collections.Concurrent; +using System.Diagnostics; +using System.Threading; +using RabbitMQ.Client; + +namespace DotNetCore.CAP.RabbitMQ +{ + public class ConnectionPool : IConnectionPool, IDisposable + { + private const int DefaultPoolSize = 32; + + private readonly ConcurrentQueue _pool = new ConcurrentQueue(); + + private readonly Func _activator; + + private int _maxSize; + private int _count; + + public ConnectionPool(RabbitMQOptions options) + { + _maxSize = DefaultPoolSize; + + _activator = CreateActivator(options); + } + + private static Func CreateActivator(RabbitMQOptions options) + { + var factory = new ConnectionFactory() + { + HostName = options.HostName, + UserName = options.UserName, + Port = options.Port, + Password = options.Password, + VirtualHost = options.VirtualHost, + RequestedConnectionTimeout = options.RequestedConnectionTimeout, + SocketReadTimeout = options.SocketReadTimeout, + SocketWriteTimeout = options.SocketWriteTimeout + }; + + return () => factory.CreateConnection(); + } + + public virtual IConnection Rent() + { + if (_pool.TryDequeue(out IConnection connection)) + { + Interlocked.Decrement(ref _count); + + Debug.Assert(_count >= 0); + + return connection; + } + + connection = _activator(); + + return connection; + } + + public virtual bool Return(IConnection connection) + { + if (Interlocked.Increment(ref _count) <= _maxSize) + { + _pool.Enqueue(connection); + + return true; + } + + Interlocked.Decrement(ref _count); + + Debug.Assert(_maxSize == 0 || _pool.Count <= _maxSize); + + return false; + } + + IConnection IConnectionPool.Rent() => Rent(); + + bool IConnectionPool.Return(IConnection connection) => Return(connection); + + public void Dispose() + { + _maxSize = 0; + + IConnection context; + while (_pool.TryDequeue(out context)) + { + context.Dispose(); + } + } + } +} diff --git a/src/DotNetCore.CAP.RabbitMQ/IConnectionPool.cs b/src/DotNetCore.CAP.RabbitMQ/IConnectionPool.cs new file mode 100644 index 0000000..9097f28 --- /dev/null +++ b/src/DotNetCore.CAP.RabbitMQ/IConnectionPool.cs @@ -0,0 +1,14 @@ +using System; +using System.Collections.Generic; +using System.Text; +using RabbitMQ.Client; + +namespace DotNetCore.CAP.RabbitMQ +{ + public interface IConnectionPool + { + IConnection Rent(); + + bool Return(IConnection context); + } +}