Browse Source

Add a connection pool for the Kafka producer.

undefined
Savorboard 7 years ago
parent
commit
938059668c
4 changed files with 119 additions and 22 deletions
  1. +1
    -0
      src/DotNetCore.CAP.Kafka/CAP.KafkaCapOptionsExtension.cs
  2. +82
    -0
      src/DotNetCore.CAP.Kafka/ConnectionPool.cs
  3. +12
    -0
      src/DotNetCore.CAP.Kafka/IConnectionPool.cs
  4. +24
    -22
      src/DotNetCore.CAP.Kafka/PublishQueueExecutor.cs

+ 1
- 0
src/DotNetCore.CAP.Kafka/CAP.KafkaCapOptionsExtension.cs View File

@@ -25,6 +25,7 @@ namespace DotNetCore.CAP
services.AddSingleton<IConsumerClientFactory, KafkaConsumerClientFactory>(); services.AddSingleton<IConsumerClientFactory, KafkaConsumerClientFactory>();
services.AddSingleton<IQueueExecutor, PublishQueueExecutor>(); services.AddSingleton<IQueueExecutor, PublishQueueExecutor>();
services.AddSingleton<IPublishExecutor, PublishQueueExecutor>(); services.AddSingleton<IPublishExecutor, PublishQueueExecutor>();
services.AddSingleton<ConnectionPool>();
} }
} }
} }

+ 82
- 0
src/DotNetCore.CAP.Kafka/ConnectionPool.cs View File

@@ -0,0 +1,82 @@
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using Confluent.Kafka;

namespace DotNetCore.CAP.Kafka
{
    /// <summary>
    /// A fixed-capacity pool of Kafka <see cref="Producer"/> instances, used to avoid
    /// creating and tearing down a producer connection for every published message.
    /// </summary>
    public class ConnectionPool : IConnectionPool, IDisposable
    {
        // Upper bound on the number of idle producers retained by the pool.
        private const int DefaultPoolSize = 15;

        // Factory that creates a new producer from the configured Kafka options.
        private readonly Func<Producer> _activator;

        // Idle producers available for rent.
        private readonly ConcurrentQueue<Producer> _pool = new ConcurrentQueue<Producer>();

        // Number of producers currently held in _pool, tracked with Interlocked so
        // Rent/Return do not need to query ConcurrentQueue.Count on the hot path.
        private int _count;

        // Capacity of the pool; set to 0 by Dispose() so returned producers are no longer retained.
        private int _maxSize;

        public ConnectionPool(KafkaOptions options)
        {
            _maxSize = DefaultPoolSize;

            _activator = CreateActivator(options);
        }

        Producer IConnectionPool.Rent()
        {
            return Rent();
        }

        bool IConnectionPool.Return(Producer connection)
        {
            return Return(connection);
        }

        public void Dispose()
        {
            // Stop retaining returned producers, then drain and dispose the idle ones.
            _maxSize = 0;

            while (_pool.TryDequeue(out var context))
                context.Dispose();
        }

        private static Func<Producer> CreateActivator(KafkaOptions options)
        {
            return () => new Producer(options.AsKafkaConfig());
        }

        /// <summary>
        /// Takes an idle producer from the pool, or creates a new one when the pool is empty.
        /// </summary>
        public virtual Producer Rent()
        {
            if (_pool.TryDequeue(out var connection))
            {
                Interlocked.Decrement(ref _count);

                Debug.Assert(_count >= 0);

                return connection;
            }

            connection = _activator();

            return connection;
        }

        /// <summary>
        /// Returns a producer to the pool. When the pool is already at capacity (or has
        /// been disposed) the producer is disposed instead of being retained.
        /// </summary>
        /// <returns><c>true</c> if the producer was kept by the pool; otherwise <c>false</c>.</returns>
        public virtual bool Return(Producer connection)
        {
            if (Interlocked.Increment(ref _count) <= _maxSize)
            {
                _pool.Enqueue(connection);

                return true;
            }

            // Roll back the optimistic increment, and dispose the surplus producer so the
            // underlying connection is not leaked — callers ignore the return value, so
            // nobody else will ever dispose a rejected producer.
            Interlocked.Decrement(ref _count);

            connection.Dispose();

            Debug.Assert(_maxSize == 0 || _pool.Count <= _maxSize);

            return false;
        }
    }
}

+ 12
- 0
src/DotNetCore.CAP.Kafka/IConnectionPool.cs View File

@@ -0,0 +1,12 @@

using Confluent.Kafka;

namespace DotNetCore.CAP.Kafka
{
    /// <summary>
    /// A pool of reusable Kafka <see cref="Producer"/> connections.
    /// </summary>
    public interface IConnectionPool
    {
        /// <summary>
        /// Rents a producer from the pool; the implementation creates a new one
        /// when none are idle.
        /// </summary>
        Producer Rent();

        /// <summary>
        /// Gives a producer back to the pool.
        /// </summary>
        /// <returns><c>true</c> when the producer was retained by the pool; otherwise <c>false</c>.</returns>
        bool Return(Producer context);
    }
}

+ 24
- 22
src/DotNetCore.CAP.Kafka/PublishQueueExecutor.cs View File

@@ -1,7 +1,6 @@
using System; using System;
using System.Text; using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
using Confluent.Kafka;
using DotNetCore.CAP.Processor.States; using DotNetCore.CAP.Processor.States;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;


@@ -9,49 +8,52 @@ namespace DotNetCore.CAP.Kafka
{ {
internal class PublishQueueExecutor : BasePublishQueueExecutor internal class PublishQueueExecutor : BasePublishQueueExecutor
{ {
private readonly KafkaOptions _kafkaOptions;
private readonly ConnectionPool _connectionPool;
private readonly ILogger _logger; private readonly ILogger _logger;


public PublishQueueExecutor( public PublishQueueExecutor(
CapOptions options, CapOptions options,
IStateChanger stateChanger, IStateChanger stateChanger,
KafkaOptions kafkaOptions,
ConnectionPool connectionPool,
ILogger<PublishQueueExecutor> logger) ILogger<PublishQueueExecutor> logger)
: base(options, stateChanger, logger) : base(options, stateChanger, logger)
{ {
_logger = logger; _logger = logger;
_kafkaOptions = kafkaOptions;
_connectionPool = connectionPool;
} }


public override Task<OperateResult> PublishAsync(string keyName, string content)
public override async Task<OperateResult> PublishAsync(string keyName, string content)
{ {
var producer = _connectionPool.Rent();
try try
{ {
var config = _kafkaOptions.AsKafkaConfig();
var contentBytes = Encoding.UTF8.GetBytes(content); var contentBytes = Encoding.UTF8.GetBytes(content);
using (var producer = new Producer(config))

var message = await producer.ProduceAsync(keyName, null, contentBytes);

if (!message.Error.HasError)
{ {
var message = producer.ProduceAsync(keyName, null, contentBytes).Result;

if (!message.Error.HasError)
{
_logger.LogDebug($"kafka topic message [{keyName}] has been published.");

return Task.FromResult(OperateResult.Success);
}
return Task.FromResult(OperateResult.Failed(new OperateError
{
Code = message.Error.Code.ToString(),
Description = message.Error.Reason
}));
_logger.LogDebug($"kafka topic message [{keyName}] has been published.");

return OperateResult.Success;
} }
return OperateResult.Failed(new OperateError
{
Code = message.Error.Code.ToString(),
Description = message.Error.Reason
});

} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogError(
_logger.LogError(ex,
$"An error occurred during sending the topic message to kafka. Topic:[{keyName}], Exception: {ex.Message}"); $"An error occurred during sending the topic message to kafka. Topic:[{keyName}], Exception: {ex.Message}");


return Task.FromResult(OperateResult.Failed(ex));
return OperateResult.Failed(ex);
}
finally
{
_connectionPool.Return(producer);
} }
} }
} }

Loading…
Cancel
Save