@@ -21,7 +21,7 @@ namespace DotNetCore.CAP | |||||
services.AddSingleton(kafkaOptions); | services.AddSingleton(kafkaOptions); | ||||
services.AddSingleton<IConsumerClientFactory, KafkaConsumerClientFactory>(); | services.AddSingleton<IConsumerClientFactory, KafkaConsumerClientFactory>(); | ||||
services.AddTransient<IQueueExecutor, PublishQueueExecutor>(); | |||||
services.AddSingleton<IQueueExecutor, PublishQueueExecutor>(); | |||||
} | } | ||||
} | } | ||||
} | } |
@@ -23,9 +23,8 @@ namespace DotNetCore.CAP | |||||
services.AddSingleton<IConsumerClientFactory, RabbitMQConsumerClientFactory>(); | services.AddSingleton<IConsumerClientFactory, RabbitMQConsumerClientFactory>(); | ||||
services.AddSingleton<ConnectionPool>(); | services.AddSingleton<ConnectionPool>(); | ||||
services.AddScoped(x => x.GetService<ConnectionPool>().Rent()); | |||||
services.AddTransient<IQueueExecutor, PublishQueueExecutor>(); | |||||
services.AddSingleton<IQueueExecutor, PublishQueueExecutor>(); | |||||
} | } | ||||
} | } | ||||
} | } |
@@ -10,19 +10,19 @@ namespace DotNetCore.CAP.RabbitMQ | |||||
internal sealed class PublishQueueExecutor : BasePublishQueueExecutor | internal sealed class PublishQueueExecutor : BasePublishQueueExecutor | ||||
{ | { | ||||
private readonly ILogger _logger; | private readonly ILogger _logger; | ||||
private readonly IConnection _connection; | |||||
private readonly ConnectionPool _connectionPool; | |||||
private readonly RabbitMQOptions _rabbitMQOptions; | private readonly RabbitMQOptions _rabbitMQOptions; | ||||
public PublishQueueExecutor( | public PublishQueueExecutor( | ||||
CapOptions options, | CapOptions options, | ||||
IStateChanger stateChanger, | IStateChanger stateChanger, | ||||
IConnection connection, | |||||
ConnectionPool connectionPool, | |||||
RabbitMQOptions rabbitMQOptions, | RabbitMQOptions rabbitMQOptions, | ||||
ILogger<PublishQueueExecutor> logger) | ILogger<PublishQueueExecutor> logger) | ||||
: base(options, stateChanger, logger) | : base(options, stateChanger, logger) | ||||
{ | { | ||||
_logger = logger; | _logger = logger; | ||||
_connection = connection; | |||||
_connectionPool = connectionPool; | |||||
_rabbitMQOptions = rabbitMQOptions; | _rabbitMQOptions = rabbitMQOptions; | ||||
} | } | ||||
@@ -30,7 +30,8 @@ namespace DotNetCore.CAP.RabbitMQ | |||||
{ | { | ||||
try | try | ||||
{ | { | ||||
using (var channel = _connection.CreateModel()) | |||||
var connection = _connectionPool.Rent(); | |||||
using (var channel = connection.CreateModel()) | |||||
{ | { | ||||
var body = Encoding.UTF8.GetBytes(content); | var body = Encoding.UTF8.GetBytes(content); | ||||
@@ -9,10 +9,10 @@ namespace DotNetCore.CAP.RabbitMQ | |||||
private readonly IConnection _connection; | private readonly IConnection _connection; | ||||
public RabbitMQConsumerClientFactory(RabbitMQOptions rabbitMQOptions, IConnection connection) | |||||
public RabbitMQConsumerClientFactory(RabbitMQOptions rabbitMQOptions, ConnectionPool pool) | |||||
{ | { | ||||
_rabbitMQOptions = rabbitMQOptions; | _rabbitMQOptions = rabbitMQOptions; | ||||
_connection = connection; | |||||
_connection = pool.Rent(); | |||||
} | } | ||||
public IConsumerClient Create(string groupId) | public IConsumerClient Create(string groupId) | ||||
@@ -19,8 +19,8 @@ namespace DotNetCore.CAP | |||||
public void AddServices(IServiceCollection services) | public void AddServices(IServiceCollection services) | ||||
{ | { | ||||
services.AddSingleton<IStorage, SqlServerStorage>(); | services.AddSingleton<IStorage, SqlServerStorage>(); | ||||
services.AddScoped<IStorageConnection, SqlServerStorageConnection>(); | |||||
services.AddScoped<ICapPublisher, CapPublisher>(); | |||||
services.AddSingleton<IStorageConnection, SqlServerStorageConnection>(); | |||||
services.AddTransient<ICapPublisher, CapPublisher>(); | |||||
services.AddTransient<ICallbackPublisher, CapPublisher>(); | services.AddTransient<ICallbackPublisher, CapPublisher>(); | ||||
services.AddTransient<IAdditionalProcessor, DefaultAdditionalProcessor>(); | services.AddTransient<IAdditionalProcessor, DefaultAdditionalProcessor>(); | ||||
AddSqlServerOptions(services); | AddSqlServerOptions(services); | ||||