@@ -6,23 +6,29 @@ using System.Collections.Concurrent; | |||
using System.Diagnostics; | |||
using System.Threading; | |||
using Confluent.Kafka; | |||
using Microsoft.Extensions.Logging; | |||
using Newtonsoft.Json; | |||
namespace DotNetCore.CAP.Kafka | |||
{ | |||
public class ConnectionPool : IConnectionPool, IDisposable | |||
{ | |||
private readonly ILogger<ConnectionPool> _logger; | |||
private readonly Func<Producer> _activator; | |||
private readonly ConcurrentQueue<Producer> _pool = new ConcurrentQueue<Producer>(); | |||
private readonly ConcurrentQueue<Producer> _pool; | |||
private int _count; | |||
private int _maxSize; | |||
public ConnectionPool(KafkaOptions options) | |||
public ConnectionPool(ILogger<ConnectionPool> logger, KafkaOptions options) | |||
{ | |||
_logger = logger; | |||
_pool = new ConcurrentQueue<Producer>(); | |||
_maxSize = options.ConnectionPoolSize; | |||
_activator = CreateActivator(options); | |||
ServersAddress = options.Servers; | |||
_logger.LogDebug("Kafka configuration of CAP :\r\n {0}", | |||
JsonConvert.SerializeObject(options.AsKafkaConfig(), Formatting.Indented)); | |||
} | |||
public string ServersAddress { get; } | |||
@@ -6,6 +6,7 @@ using System.Collections.Concurrent; | |||
using System.Diagnostics; | |||
using System.Threading; | |||
using Microsoft.Extensions.Logging; | |||
using Newtonsoft.Json; | |||
using RabbitMQ.Client; | |||
namespace DotNetCore.CAP.RabbitMQ | |||
@@ -15,21 +16,24 @@ namespace DotNetCore.CAP.RabbitMQ | |||
private const int DefaultPoolSize = 15; | |||
private readonly Func<IConnection> _connectionActivator; | |||
private readonly ILogger<ConnectionChannelPool> _logger; | |||
private readonly ConcurrentQueue<IModel> _pool = new ConcurrentQueue<IModel>(); | |||
private readonly ConcurrentQueue<IModel> _pool; | |||
private IConnection _connection; | |||
private int _count; | |||
private int _maxSize; | |||
public ConnectionChannelPool(ILogger<ConnectionChannelPool> logger, | |||
RabbitMQOptions options) | |||
public ConnectionChannelPool(ILogger<ConnectionChannelPool> logger, RabbitMQOptions options) | |||
{ | |||
_logger = logger; | |||
_maxSize = DefaultPoolSize; | |||
_pool = new ConcurrentQueue<IModel>(); | |||
_connectionActivator = CreateConnection(options); | |||
HostAddress = options.HostName + ":" + options.Port; | |||
Exchange = options.ExchangeName; | |||
_logger.LogDebug("RabbitMQ configuration of CAP :\r\n {0}", | |||
JsonConvert.SerializeObject(options, Formatting.Indented)); | |||
} | |||
IModel IConnectionChannelPool.Rent() | |||
@@ -87,7 +91,7 @@ namespace DotNetCore.CAP.RabbitMQ | |||
private void RabbitMQ_ConnectionShutdown(object sender, ShutdownEventArgs e) | |||
{ | |||
_logger.LogWarning($"RabbitMQ client connection closed! {e}"); | |||
_logger.LogWarning($"RabbitMQ client connection closed! --> {e.ReplyText}"); | |||
} | |||
public virtual IModel Rent() | |||
@@ -93,7 +93,7 @@ namespace DotNetCore.CAP.RabbitMQ | |||
_connection = _connectionChannelPool.GetConnection(); | |||
_channel = _connection.CreateModel(); | |||
_channel.ExchangeDeclare( | |||
_exchageName, | |||
RabbitMQOptions.ExchangeType, | |||
@@ -155,7 +155,7 @@ namespace DotNetCore.CAP.RabbitMQ | |||
var args = new LogMessageEventArgs | |||
{ | |||
LogType = MqLogType.ConsumerShutdown, | |||
Reason = e.ToString() | |||
Reason = e.ReplyText | |||
}; | |||
OnLog?.Invoke(sender, args); | |||
} | |||
@@ -146,22 +146,22 @@ namespace DotNetCore.CAP | |||
switch (logmsg.LogType) | |||
{ | |||
case MqLogType.ConsumerCancelled: | |||
_logger.LogWarning("RabbitMQ consumer cancelled. reason: " + logmsg.Reason); | |||
_logger.LogWarning("RabbitMQ consumer cancelled. --> " + logmsg.Reason); | |||
break; | |||
case MqLogType.ConsumerRegistered: | |||
_logger.LogInformation("RabbitMQ consumer registered. " + logmsg.Reason); | |||
_logger.LogInformation("RabbitMQ consumer registered. --> " + logmsg.Reason); | |||
break; | |||
case MqLogType.ConsumerUnregistered: | |||
_logger.LogWarning("RabbitMQ consumer unregistered. reason: " + logmsg.Reason); | |||
_logger.LogWarning("RabbitMQ consumer unregistered. --> " + logmsg.Reason); | |||
break; | |||
case MqLogType.ConsumerShutdown: | |||
_logger.LogWarning("RabbitMQ consumer shutdown. reason:" + logmsg.Reason); | |||
_logger.LogWarning("RabbitMQ consumer shutdown. --> " + logmsg.Reason); | |||
break; | |||
case MqLogType.ConsumeError: | |||
_logger.LogError("Kakfa client consume error. reason:" + logmsg.Reason); | |||
_logger.LogError("Kakfa client consume error. --> " + logmsg.Reason); | |||
break; | |||
case MqLogType.ServerConnError: | |||
_logger.LogCritical("Kafka server connection error. reason:" + logmsg.Reason); | |||
_logger.LogCritical("Kafka server connection error. --> " + logmsg.Reason); | |||
break; | |||
default: | |||
throw new ArgumentOutOfRangeException(); | |||