diff --git a/src/DotNetCore.CAP.Kafka/IConnectionPool.Default.cs b/src/DotNetCore.CAP.Kafka/IConnectionPool.Default.cs index 1993661..21b8398 100644 --- a/src/DotNetCore.CAP.Kafka/IConnectionPool.Default.cs +++ b/src/DotNetCore.CAP.Kafka/IConnectionPool.Default.cs @@ -6,23 +6,29 @@ using System.Collections.Concurrent; using System.Diagnostics; using System.Threading; using Confluent.Kafka; +using Microsoft.Extensions.Logging; +using Newtonsoft.Json; namespace DotNetCore.CAP.Kafka { public class ConnectionPool : IConnectionPool, IDisposable { + private readonly ILogger _logger; private readonly Func _activator; - private readonly ConcurrentQueue _pool = new ConcurrentQueue(); + private readonly ConcurrentQueue _pool; private int _count; - private int _maxSize; - public ConnectionPool(KafkaOptions options) + public ConnectionPool(ILogger logger, KafkaOptions options) { + _logger = logger; + _pool = new ConcurrentQueue(); _maxSize = options.ConnectionPoolSize; _activator = CreateActivator(options); - ServersAddress = options.Servers; + + _logger.LogDebug("Kafka configuration of CAP :\r\n {0}", + JsonConvert.SerializeObject(options.AsKafkaConfig(), Formatting.Indented)); } public string ServersAddress { get; } diff --git a/src/DotNetCore.CAP.RabbitMQ/IConnectionChannelPool.Default.cs b/src/DotNetCore.CAP.RabbitMQ/IConnectionChannelPool.Default.cs index e650518..e15fdfa 100644 --- a/src/DotNetCore.CAP.RabbitMQ/IConnectionChannelPool.Default.cs +++ b/src/DotNetCore.CAP.RabbitMQ/IConnectionChannelPool.Default.cs @@ -6,6 +6,7 @@ using System.Collections.Concurrent; using System.Diagnostics; using System.Threading; using Microsoft.Extensions.Logging; +using Newtonsoft.Json; using RabbitMQ.Client; namespace DotNetCore.CAP.RabbitMQ @@ -15,21 +16,24 @@ namespace DotNetCore.CAP.RabbitMQ private const int DefaultPoolSize = 15; private readonly Func _connectionActivator; private readonly ILogger _logger; - private readonly ConcurrentQueue _pool = new ConcurrentQueue(); + private readonly ConcurrentQueue _pool; private IConnection _connection; private int _count; private int _maxSize; - public ConnectionChannelPool(ILogger logger, - RabbitMQOptions options) + public ConnectionChannelPool(ILogger logger, RabbitMQOptions options) { _logger = logger; _maxSize = DefaultPoolSize; - + _pool = new ConcurrentQueue(); _connectionActivator = CreateConnection(options); + HostAddress = options.HostName + ":" + options.Port; Exchange = options.ExchangeName; + + _logger.LogDebug("RabbitMQ configuration of CAP :\r\n {0}", + JsonConvert.SerializeObject(options, Formatting.Indented)); } IModel IConnectionChannelPool.Rent() @@ -87,7 +91,7 @@ namespace DotNetCore.CAP.RabbitMQ private void RabbitMQ_ConnectionShutdown(object sender, ShutdownEventArgs e) { - _logger.LogWarning($"RabbitMQ client connection closed! {e}"); + _logger.LogWarning($"RabbitMQ client connection closed! --> {e.ReplyText}"); } public virtual IModel Rent() diff --git a/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs b/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs index a34054f..972b6dd 100644 --- a/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs +++ b/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs @@ -93,7 +93,7 @@ namespace DotNetCore.CAP.RabbitMQ _connection = _connectionChannelPool.GetConnection(); _channel = _connection.CreateModel(); - + _channel.ExchangeDeclare( _exchageName, RabbitMQOptions.ExchangeType, @@ -155,7 +155,7 @@ namespace DotNetCore.CAP.RabbitMQ var args = new LogMessageEventArgs { LogType = MqLogType.ConsumerShutdown, - Reason = e.ToString() + Reason = e.ReplyText }; OnLog?.Invoke(sender, args); } diff --git a/src/DotNetCore.CAP/IConsumerHandler.Default.cs b/src/DotNetCore.CAP/IConsumerHandler.Default.cs index 1d441ff..e2179ff 100644 --- a/src/DotNetCore.CAP/IConsumerHandler.Default.cs +++ b/src/DotNetCore.CAP/IConsumerHandler.Default.cs @@ -146,22 +146,22 @@ namespace DotNetCore.CAP switch (logmsg.LogType) { case MqLogType.ConsumerCancelled: - _logger.LogWarning("RabbitMQ consumer cancelled. reason: " + logmsg.Reason); + _logger.LogWarning("RabbitMQ consumer cancelled. --> " + logmsg.Reason); break; case MqLogType.ConsumerRegistered: - _logger.LogInformation("RabbitMQ consumer registered. " + logmsg.Reason); + _logger.LogInformation("RabbitMQ consumer registered. --> " + logmsg.Reason); break; case MqLogType.ConsumerUnregistered: - _logger.LogWarning("RabbitMQ consumer unregistered. reason: " + logmsg.Reason); + _logger.LogWarning("RabbitMQ consumer unregistered. --> " + logmsg.Reason); break; case MqLogType.ConsumerShutdown: - _logger.LogWarning("RabbitMQ consumer shutdown. reason:" + logmsg.Reason); + _logger.LogWarning("RabbitMQ consumer shutdown. --> " + logmsg.Reason); break; case MqLogType.ConsumeError: - _logger.LogError("Kakfa client consume error. reason:" + logmsg.Reason); + _logger.LogError("Kakfa client consume error. --> " + logmsg.Reason); break; case MqLogType.ServerConnError: - _logger.LogCritical("Kafka server connection error. reason:" + logmsg.Reason); + _logger.LogCritical("Kafka server connection error. --> " + logmsg.Reason); break; default: throw new ArgumentOutOfRangeException();