@@ -8,7 +8,7 @@ namespace DotNetCore.CAP.RabbitMQ | |||||
{ | { | ||||
public class ConnectionPool : IConnectionPool, IDisposable | public class ConnectionPool : IConnectionPool, IDisposable | ||||
{ | { | ||||
private const int DefaultPoolSize = 32; | |||||
private const int DefaultPoolSize = 15; | |||||
private readonly ConcurrentQueue<IConnection> _pool = new ConcurrentQueue<IConnection>(); | private readonly ConcurrentQueue<IConnection> _pool = new ConcurrentQueue<IConnection>(); | ||||
@@ -28,9 +28,10 @@ namespace DotNetCore.CAP.RabbitMQ | |||||
public override Task<OperateResult> PublishAsync(string keyName, string content) | public override Task<OperateResult> PublishAsync(string keyName, string content) | ||||
{ | { | ||||
var connection = _connectionPool.Rent(); | |||||
try | try | ||||
{ | { | ||||
var connection = _connectionPool.Rent(); | |||||
using (var channel = connection.CreateModel()) | using (var channel = connection.CreateModel()) | ||||
{ | { | ||||
var body = Encoding.UTF8.GetBytes(content); | var body = Encoding.UTF8.GetBytes(content); | ||||
@@ -56,6 +57,10 @@ namespace DotNetCore.CAP.RabbitMQ | |||||
Description = ex.Message | Description = ex.Message | ||||
})); | })); | ||||
} | } | ||||
finally | |||||
{ | |||||
_connectionPool.Return(connection); | |||||
} | |||||
} | } | ||||
} | } | ||||
} | } |
@@ -14,7 +14,7 @@ namespace DotNetCore.CAP.RabbitMQ | |||||
private readonly string _queueName; | private readonly string _queueName; | ||||
private readonly RabbitMQOptions _rabbitMQOptions; | private readonly RabbitMQOptions _rabbitMQOptions; | ||||
private IConnection _connection; | |||||
private ConnectionPool _connectionPool; | |||||
private IModel _channel; | private IModel _channel; | ||||
private ulong _deliveryTag; | private ulong _deliveryTag; | ||||
@@ -23,11 +23,11 @@ namespace DotNetCore.CAP.RabbitMQ | |||||
public event EventHandler<string> OnError; | public event EventHandler<string> OnError; | ||||
public RabbitMQConsumerClient(string queueName, | public RabbitMQConsumerClient(string queueName, | ||||
IConnection connection, | |||||
ConnectionPool connectionPool, | |||||
RabbitMQOptions options) | RabbitMQOptions options) | ||||
{ | { | ||||
_queueName = queueName; | _queueName = queueName; | ||||
_connection = connection; | |||||
_connectionPool = connectionPool; | |||||
_rabbitMQOptions = options; | _rabbitMQOptions = options; | ||||
_exchageName = options.TopicExchangeName; | _exchageName = options.TopicExchangeName; | ||||
@@ -36,7 +36,9 @@ namespace DotNetCore.CAP.RabbitMQ | |||||
private void InitClient() | private void InitClient() | ||||
{ | { | ||||
_channel = _connection.CreateModel(); | |||||
var connection = _connectionPool.Rent(); | |||||
_channel = connection.CreateModel(); | |||||
_channel.ExchangeDeclare( | _channel.ExchangeDeclare( | ||||
exchange: _exchageName, | exchange: _exchageName, | ||||
@@ -49,6 +51,8 @@ namespace DotNetCore.CAP.RabbitMQ | |||||
exclusive: false, | exclusive: false, | ||||
autoDelete: false, | autoDelete: false, | ||||
arguments: arguments); | arguments: arguments); | ||||
_connectionPool.Return(connection); | |||||
} | } | ||||
public void Subscribe(IEnumerable<string> topics) | public void Subscribe(IEnumerable<string> topics) | ||||
@@ -81,7 +85,6 @@ namespace DotNetCore.CAP.RabbitMQ | |||||
public void Dispose() | public void Dispose() | ||||
{ | { | ||||
_channel.Dispose(); | _channel.Dispose(); | ||||
_connection.Dispose(); | |||||
} | } | ||||
private void OnConsumerReceived(object sender, BasicDeliverEventArgs e) | private void OnConsumerReceived(object sender, BasicDeliverEventArgs e) | ||||
@@ -6,18 +6,18 @@ namespace DotNetCore.CAP.RabbitMQ | |||||
internal sealed class RabbitMQConsumerClientFactory : IConsumerClientFactory | internal sealed class RabbitMQConsumerClientFactory : IConsumerClientFactory | ||||
{ | { | ||||
private readonly RabbitMQOptions _rabbitMQOptions; | private readonly RabbitMQOptions _rabbitMQOptions; | ||||
private readonly IConnection _connection; | |||||
private readonly ConnectionPool _connectionPool; | |||||
public RabbitMQConsumerClientFactory(RabbitMQOptions rabbitMQOptions, ConnectionPool pool) | public RabbitMQConsumerClientFactory(RabbitMQOptions rabbitMQOptions, ConnectionPool pool) | ||||
{ | { | ||||
_rabbitMQOptions = rabbitMQOptions; | _rabbitMQOptions = rabbitMQOptions; | ||||
_connection = pool.Rent(); | |||||
_connectionPool = pool; | |||||
} | } | ||||
public IConsumerClient Create(string groupId) | public IConsumerClient Create(string groupId) | ||||
{ | { | ||||
return new RabbitMQConsumerClient(groupId, _connection, _rabbitMQOptions); | |||||
return new RabbitMQConsumerClient(groupId, _connectionPool, _rabbitMQOptions); | |||||
} | } | ||||
} | } | ||||
} | } |