@@ -6,23 +6,29 @@ using System.Collections.Concurrent; | |||||
using System.Diagnostics; | using System.Diagnostics; | ||||
using System.Threading; | using System.Threading; | ||||
using Confluent.Kafka; | using Confluent.Kafka; | ||||
using Microsoft.Extensions.Logging; | |||||
using Newtonsoft.Json; | |||||
namespace DotNetCore.CAP.Kafka | namespace DotNetCore.CAP.Kafka | ||||
{ | { | ||||
public class ConnectionPool : IConnectionPool, IDisposable | public class ConnectionPool : IConnectionPool, IDisposable | ||||
{ | { | ||||
private readonly ILogger<ConnectionPool> _logger; | |||||
private readonly Func<Producer> _activator; | private readonly Func<Producer> _activator; | ||||
private readonly ConcurrentQueue<Producer> _pool = new ConcurrentQueue<Producer>(); | |||||
private readonly ConcurrentQueue<Producer> _pool; | |||||
private int _count; | private int _count; | ||||
private int _maxSize; | private int _maxSize; | ||||
public ConnectionPool(KafkaOptions options) | |||||
public ConnectionPool(ILogger<ConnectionPool> logger, KafkaOptions options) | |||||
{ | { | ||||
_logger = logger; | |||||
_pool = new ConcurrentQueue<Producer>(); | |||||
_maxSize = options.ConnectionPoolSize; | _maxSize = options.ConnectionPoolSize; | ||||
_activator = CreateActivator(options); | _activator = CreateActivator(options); | ||||
ServersAddress = options.Servers; | ServersAddress = options.Servers; | ||||
_logger.LogDebug("Kafka configuration of CAP :\r\n {0}", | |||||
JsonConvert.SerializeObject(options.AsKafkaConfig(), Formatting.Indented)); | |||||
} | } | ||||
public string ServersAddress { get; } | public string ServersAddress { get; } | ||||
@@ -6,6 +6,7 @@ using System.Collections.Concurrent; | |||||
using System.Diagnostics; | using System.Diagnostics; | ||||
using System.Threading; | using System.Threading; | ||||
using Microsoft.Extensions.Logging; | using Microsoft.Extensions.Logging; | ||||
using Newtonsoft.Json; | |||||
using RabbitMQ.Client; | using RabbitMQ.Client; | ||||
namespace DotNetCore.CAP.RabbitMQ | namespace DotNetCore.CAP.RabbitMQ | ||||
@@ -15,21 +16,24 @@ namespace DotNetCore.CAP.RabbitMQ | |||||
private const int DefaultPoolSize = 15; | private const int DefaultPoolSize = 15; | ||||
private readonly Func<IConnection> _connectionActivator; | private readonly Func<IConnection> _connectionActivator; | ||||
private readonly ILogger<ConnectionChannelPool> _logger; | private readonly ILogger<ConnectionChannelPool> _logger; | ||||
private readonly ConcurrentQueue<IModel> _pool = new ConcurrentQueue<IModel>(); | |||||
private readonly ConcurrentQueue<IModel> _pool; | |||||
private IConnection _connection; | private IConnection _connection; | ||||
private int _count; | private int _count; | ||||
private int _maxSize; | private int _maxSize; | ||||
public ConnectionChannelPool(ILogger<ConnectionChannelPool> logger, | |||||
RabbitMQOptions options) | |||||
public ConnectionChannelPool(ILogger<ConnectionChannelPool> logger, RabbitMQOptions options) | |||||
{ | { | ||||
_logger = logger; | _logger = logger; | ||||
_maxSize = DefaultPoolSize; | _maxSize = DefaultPoolSize; | ||||
_pool = new ConcurrentQueue<IModel>(); | |||||
_connectionActivator = CreateConnection(options); | _connectionActivator = CreateConnection(options); | ||||
HostAddress = options.HostName + ":" + options.Port; | HostAddress = options.HostName + ":" + options.Port; | ||||
Exchange = options.ExchangeName; | Exchange = options.ExchangeName; | ||||
_logger.LogDebug("RabbitMQ configuration of CAP :\r\n {0}", | |||||
JsonConvert.SerializeObject(options, Formatting.Indented)); | |||||
} | } | ||||
IModel IConnectionChannelPool.Rent() | IModel IConnectionChannelPool.Rent() | ||||
@@ -87,7 +91,7 @@ namespace DotNetCore.CAP.RabbitMQ | |||||
private void RabbitMQ_ConnectionShutdown(object sender, ShutdownEventArgs e) | private void RabbitMQ_ConnectionShutdown(object sender, ShutdownEventArgs e) | ||||
{ | { | ||||
_logger.LogWarning($"RabbitMQ client connection closed! {e}"); | |||||
_logger.LogWarning($"RabbitMQ client connection closed! --> {e.ReplyText}"); | |||||
} | } | ||||
public virtual IModel Rent() | public virtual IModel Rent() | ||||
@@ -93,7 +93,7 @@ namespace DotNetCore.CAP.RabbitMQ | |||||
_connection = _connectionChannelPool.GetConnection(); | _connection = _connectionChannelPool.GetConnection(); | ||||
_channel = _connection.CreateModel(); | _channel = _connection.CreateModel(); | ||||
_channel.ExchangeDeclare( | _channel.ExchangeDeclare( | ||||
_exchageName, | _exchageName, | ||||
RabbitMQOptions.ExchangeType, | RabbitMQOptions.ExchangeType, | ||||
@@ -155,7 +155,7 @@ namespace DotNetCore.CAP.RabbitMQ | |||||
var args = new LogMessageEventArgs | var args = new LogMessageEventArgs | ||||
{ | { | ||||
LogType = MqLogType.ConsumerShutdown, | LogType = MqLogType.ConsumerShutdown, | ||||
Reason = e.ToString() | |||||
Reason = e.ReplyText | |||||
}; | }; | ||||
OnLog?.Invoke(sender, args); | OnLog?.Invoke(sender, args); | ||||
} | } | ||||
@@ -146,22 +146,22 @@ namespace DotNetCore.CAP | |||||
switch (logmsg.LogType) | switch (logmsg.LogType) | ||||
{ | { | ||||
case MqLogType.ConsumerCancelled: | case MqLogType.ConsumerCancelled: | ||||
_logger.LogWarning("RabbitMQ consumer cancelled. reason: " + logmsg.Reason); | |||||
_logger.LogWarning("RabbitMQ consumer cancelled. --> " + logmsg.Reason); | |||||
break; | break; | ||||
case MqLogType.ConsumerRegistered: | case MqLogType.ConsumerRegistered: | ||||
_logger.LogInformation("RabbitMQ consumer registered. " + logmsg.Reason); | |||||
_logger.LogInformation("RabbitMQ consumer registered. --> " + logmsg.Reason); | |||||
break; | break; | ||||
case MqLogType.ConsumerUnregistered: | case MqLogType.ConsumerUnregistered: | ||||
_logger.LogWarning("RabbitMQ consumer unregistered. reason: " + logmsg.Reason); | |||||
_logger.LogWarning("RabbitMQ consumer unregistered. --> " + logmsg.Reason); | |||||
break; | break; | ||||
case MqLogType.ConsumerShutdown: | case MqLogType.ConsumerShutdown: | ||||
_logger.LogWarning("RabbitMQ consumer shutdown. reason:" + logmsg.Reason); | |||||
_logger.LogWarning("RabbitMQ consumer shutdown. --> " + logmsg.Reason); | |||||
break; | break; | ||||
case MqLogType.ConsumeError: | case MqLogType.ConsumeError: | ||||
_logger.LogError("Kakfa client consume error. reason:" + logmsg.Reason); | |||||
_logger.LogError("Kakfa client consume error. --> " + logmsg.Reason); | |||||
break; | break; | ||||
case MqLogType.ServerConnError: | case MqLogType.ServerConnError: | ||||
_logger.LogCritical("Kafka server connection error. reason:" + logmsg.Reason); | |||||
_logger.LogCritical("Kafka server connection error. --> " + logmsg.Reason); | |||||
break; | break; | ||||
default: | default: | ||||
throw new ArgumentOutOfRangeException(); | throw new ArgumentOutOfRangeException(); | ||||