Sfoglia il codice sorgente

add connection pool size config to KafkaOption.

undefined
Savorboard 7 anni fa
parent
commit
0c8313141a
2 ha cambiato i file con 9 aggiunte e 7 eliminazioni
  1. +8
    -2
      src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs
  2. +1
    -5
      src/DotNetCore.CAP.Kafka/ConnectionPool.cs

+ 8
- 2
src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs Vedi File

@@ -1,4 +1,5 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

@@ -16,16 +17,21 @@ namespace DotNetCore.CAP
/// Topic configuration parameters are specified via the "default.topic.config" sub-dictionary config parameter.
/// </para>
/// </summary>
public readonly IDictionary<string, object> MainConfig;
public readonly ConcurrentDictionary<string, object> MainConfig;

private IEnumerable<KeyValuePair<string, object>> _kafkaConfig;


public KafkaOptions()
{
MainConfig = new Dictionary<string, object>();
MainConfig = new ConcurrentDictionary<string, object>();
}

/// <summary>
/// Producer connection pool size, default is 10
/// </summary>
public int ConnectionPoolSize { get; set; } = 10;

/// <summary>
/// The `bootstrap.servers` item config of <see cref="MainConfig" />.
/// <para>


+ 1
- 5
src/DotNetCore.CAP.Kafka/ConnectionPool.cs Vedi File

@@ -8,10 +8,7 @@ namespace DotNetCore.CAP.Kafka
{
public class ConnectionPool : IConnectionPool, IDisposable
{
private const int DefaultPoolSize = 15;

private readonly Func<Producer> _activator;

private readonly ConcurrentQueue<Producer> _pool = new ConcurrentQueue<Producer>();
private int _count;

@@ -19,8 +16,7 @@ namespace DotNetCore.CAP.Kafka

public ConnectionPool(KafkaOptions options)
{
_maxSize = DefaultPoolSize;

_maxSize = options.ConnectionPoolSize;
_activator = CreateActivator(options);
}



Caricamento…
Annulla
Salva