Browse Source

Allows the overriding of the Kafka ProviderBuilder and ConsumerBuilder (#978)

* Allows the overriding of the ProviderBuilder and ConsumerBuilder creation functions.

* Fix BuildProducer method layout
master
danielblackwellkb 3 years ago
committed by GitHub
parent
commit
df9a8cdd38
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 17 additions and 7 deletions
  1. +6
    -1
      src/DotNetCore.CAP.Kafka/IConnectionPool.Default.cs
  2. +9
    -4
      src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs
  3. +2
    -2
      src/DotNetCore.CAP.Kafka/KafkaConsumerClientFactory.cs

+ 6
- 1
src/DotNetCore.CAP.Kafka/IConnectionPool.Default.cs View File

@@ -46,11 +46,16 @@ namespace DotNetCore.CAP.Kafka
RequestTimeoutMs = 3000
};

producer = new ProducerBuilder<string, byte[]>(config).Build();
producer = BuildProducer(config);

return producer;
}

protected virtual IProducer<string, byte[]> BuildProducer(ProducerConfig config)
{
return new ProducerBuilder<string, byte[]>(config).Build();
}

public bool Return(IProducer<string, byte[]> producer)
{
if (Interlocked.Increment(ref _pCount) <= _maxSize)


+ 9
- 4
src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs View File

@@ -15,7 +15,7 @@ using Microsoft.Extensions.Options;

namespace DotNetCore.CAP.Kafka
{
internal sealed class KafkaConsumerClient : IConsumerClient
public class KafkaConsumerClient : IConsumerClient
{
private static readonly SemaphoreSlim ConnectionLock = new SemaphoreSlim(initialCount: 1, maxCount: 1);

@@ -153,9 +153,7 @@ namespace DotNetCore.CAP.Kafka
config.EnableAutoCommit ??= false;
config.LogConnectionClose ??= false;

_consumerClient = new ConsumerBuilder<string, byte[]>(config)
.SetErrorHandler(ConsumerClient_OnConsumeError)
.Build();
BuildConsumer(config);
}
}
finally
@@ -164,6 +162,13 @@ namespace DotNetCore.CAP.Kafka
}
}

protected virtual void BuildConsumer(ConsumerConfig config)
{
_consumerClient = new ConsumerBuilder<string, byte[]>(config)
.SetErrorHandler(ConsumerClient_OnConsumeError)
.Build();
}

private void ConsumerClient_OnConsumeError(IConsumer<string, byte[]> consumer, Error e)
{
var logArgs = new LogMessageEventArgs


+ 2
- 2
src/DotNetCore.CAP.Kafka/KafkaConsumerClientFactory.cs View File

@@ -6,7 +6,7 @@ using Microsoft.Extensions.Options;

namespace DotNetCore.CAP.Kafka
{
internal sealed class KafkaConsumerClientFactory : IConsumerClientFactory
public class KafkaConsumerClientFactory : IConsumerClientFactory
{
private readonly IOptions<KafkaOptions> _kafkaOptions;

@@ -15,7 +15,7 @@ namespace DotNetCore.CAP.Kafka
_kafkaOptions = kafkaOptions;
}

public IConsumerClient Create(string groupId)
public virtual IConsumerClient Create(string groupId)
{
try
{


Loading…
Cancel
Save