Browse Source

add rabbitmq connection pool.

master
yangxiaodong 7 years ago
parent
commit
deee0bc74e
2 changed files with 105 additions and 0 deletions
  1. +91
    -0
      src/DotNetCore.CAP.RabbitMQ/ConnectionPool.cs
  2. +14
    -0
      src/DotNetCore.CAP.RabbitMQ/IConnectionPool.cs

+ 91
- 0
src/DotNetCore.CAP.RabbitMQ/ConnectionPool.cs View File

@@ -0,0 +1,91 @@
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using RabbitMQ.Client;

namespace DotNetCore.CAP.RabbitMQ
{
public class ConnectionPool : IConnectionPool, IDisposable
{
private const int DefaultPoolSize = 32;

private readonly ConcurrentQueue<IConnection> _pool = new ConcurrentQueue<IConnection>();

private readonly Func<IConnection> _activator;

private int _maxSize;
private int _count;

public ConnectionPool(RabbitMQOptions options)
{
_maxSize = DefaultPoolSize;

_activator = CreateActivator(options);
}

private static Func<IConnection> CreateActivator(RabbitMQOptions options)
{
var factory = new ConnectionFactory()
{
HostName = options.HostName,
UserName = options.UserName,
Port = options.Port,
Password = options.Password,
VirtualHost = options.VirtualHost,
RequestedConnectionTimeout = options.RequestedConnectionTimeout,
SocketReadTimeout = options.SocketReadTimeout,
SocketWriteTimeout = options.SocketWriteTimeout
};

return () => factory.CreateConnection();
}

public virtual IConnection Rent()
{
if (_pool.TryDequeue(out IConnection connection))
{
Interlocked.Decrement(ref _count);

Debug.Assert(_count >= 0);

return connection;
}

connection = _activator();

return connection;
}

public virtual bool Return(IConnection connection)
{
if (Interlocked.Increment(ref _count) <= _maxSize)
{
_pool.Enqueue(connection);

return true;
}

Interlocked.Decrement(ref _count);

Debug.Assert(_maxSize == 0 || _pool.Count <= _maxSize);

return false;
}

IConnection IConnectionPool.Rent() => Rent();

bool IConnectionPool.Return(IConnection connection) => Return(connection);

public void Dispose()
{
_maxSize = 0;

IConnection context;
while (_pool.TryDequeue(out context))
{
context.Dispose();
}
}
}
}

+ 14
- 0
src/DotNetCore.CAP.RabbitMQ/IConnectionPool.cs View File

@@ -0,0 +1,14 @@
using System;
using System.Collections.Generic;
using System.Text;
using RabbitMQ.Client;

namespace DotNetCore.CAP.RabbitMQ
{
public interface IConnectionPool
{
IConnection Rent();

bool Return(IConnection context);
}
}

Loading…
Cancel
Save