diff --git a/src/DotNetCore.CAP.RabbitMQ/ConnectionPool.cs b/src/DotNetCore.CAP.RabbitMQ/ConnectionPool.cs index 83116bf..5db81e7 100644 --- a/src/DotNetCore.CAP.RabbitMQ/ConnectionPool.cs +++ b/src/DotNetCore.CAP.RabbitMQ/ConnectionPool.cs @@ -8,7 +8,7 @@ namespace DotNetCore.CAP.RabbitMQ { public class ConnectionPool : IConnectionPool, IDisposable { - private const int DefaultPoolSize = 32; + private const int DefaultPoolSize = 15; private readonly ConcurrentQueue _pool = new ConcurrentQueue(); diff --git a/src/DotNetCore.CAP.RabbitMQ/PublishQueueExecutor.cs b/src/DotNetCore.CAP.RabbitMQ/PublishQueueExecutor.cs index ee49ce9..0a47501 100644 --- a/src/DotNetCore.CAP.RabbitMQ/PublishQueueExecutor.cs +++ b/src/DotNetCore.CAP.RabbitMQ/PublishQueueExecutor.cs @@ -28,9 +28,10 @@ namespace DotNetCore.CAP.RabbitMQ public override Task PublishAsync(string keyName, string content) { + var connection = _connectionPool.Rent(); + try { - var connection = _connectionPool.Rent(); using (var channel = connection.CreateModel()) { var body = Encoding.UTF8.GetBytes(content); @@ -56,6 +57,10 @@ namespace DotNetCore.CAP.RabbitMQ Description = ex.Message })); } + finally + { + _connectionPool.Return(connection); + } } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs b/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs index a6720fd..0172c2b 100644 --- a/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs +++ b/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs @@ -14,7 +14,7 @@ namespace DotNetCore.CAP.RabbitMQ private readonly string _queueName; private readonly RabbitMQOptions _rabbitMQOptions; - private IConnection _connection; + private ConnectionPool _connectionPool; private IModel _channel; private ulong _deliveryTag; @@ -23,11 +23,11 @@ namespace DotNetCore.CAP.RabbitMQ public event EventHandler OnError; public RabbitMQConsumerClient(string queueName, - IConnection connection, + ConnectionPool connectionPool, RabbitMQOptions options) { _queueName = queueName; - _connection = connection; + _connectionPool = connectionPool; _rabbitMQOptions = options; _exchageName = options.TopicExchangeName; @@ -36,7 +36,9 @@ namespace DotNetCore.CAP.RabbitMQ private void InitClient() { - _channel = _connection.CreateModel(); + var connection = _connectionPool.Rent(); + + _channel = connection.CreateModel(); _channel.ExchangeDeclare( exchange: _exchageName, @@ -49,6 +51,8 @@ namespace DotNetCore.CAP.RabbitMQ exclusive: false, autoDelete: false, arguments: arguments); + + _connectionPool.Return(connection); } public void Subscribe(IEnumerable topics) @@ -81,7 +85,6 @@ namespace DotNetCore.CAP.RabbitMQ public void Dispose() { _channel.Dispose(); - _connection.Dispose(); } private void OnConsumerReceived(object sender, BasicDeliverEventArgs e) diff --git a/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClientFactory.cs b/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClientFactory.cs index 252b865..753fc05 100644 --- a/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClientFactory.cs +++ b/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClientFactory.cs @@ -6,18 +6,18 @@ namespace DotNetCore.CAP.RabbitMQ internal sealed class RabbitMQConsumerClientFactory : IConsumerClientFactory { private readonly RabbitMQOptions _rabbitMQOptions; - private readonly IConnection _connection; + private readonly ConnectionPool _connectionPool; public RabbitMQConsumerClientFactory(RabbitMQOptions rabbitMQOptions, ConnectionPool pool) { _rabbitMQOptions = rabbitMQOptions; - _connection = pool.Rent(); + _connectionPool = pool; } public IConsumerClient Create(string groupId) { - return new RabbitMQConsumerClient(groupId, _connection, _rabbitMQOptions); + return new RabbitMQConsumerClient(groupId, _connectionPool, _rabbitMQOptions); } } } \ No newline at end of file