Browse Source

Add KafkaOptions and remove old options in CAP project.

master
yangxiaodong 7 years ago
parent
commit
f2cfaf165f
7 changed files with 96 additions and 34 deletions
  1. +33
    -0
      src/DotNetCore.CAP.Kafka/CAP.BuilderExtensions.cs
  2. +33
    -0
      src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs
  3. +4
    -0
      src/DotNetCore.CAP.Kafka/DotNetCore.CAP.Kafka.csproj
  4. +10
    -7
      src/DotNetCore.CAP.Kafka/IProcessor.KafkaJobProcessor.cs
  5. +4
    -6
      src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs
  6. +12
    -3
      src/DotNetCore.CAP.Kafka/KafkaConsumerClientFactory.cs
  7. +0
    -18
      src/DotNetCore.CAP.Kafka/Microsoft.Extensions.DependencyInjection/ConsistencyBuilderExtensions.cs

+ 33
- 0
src/DotNetCore.CAP.Kafka/CAP.BuilderExtensions.cs View File

@@ -0,0 +1,33 @@
using System;
using DotNetCore.CAP;
using DotNetCore.CAP.Job;
using DotNetCore.CAP.Kafka;

namespace Microsoft.Extensions.DependencyInjection
{
/// <summary>
/// Contains extension methods to <see cref="CapBuilder"/> for adding kafka service.
/// </summary>
public static class CapBuilderExtensions
{
/// <summary>
/// Adds an Kafka implementation of CAP messages queue.
/// </summary>
/// <param name="builder">The <see cref="CapBuilder"/> instance this method extends</param>
/// <param name="setupAction">An action to configure the <see cref="KafkaOptions"/>.</param>
/// <returns>An <see cref="CapBuilder"/> for creating and configuring the CAP system.</returns>
public static CapBuilder AddKafka(this CapBuilder builder, Action<KafkaOptions> setupAction)
{

if (setupAction == null) throw new ArgumentNullException(nameof(setupAction));

builder.Services.Configure(setupAction);

builder.Services.AddSingleton<IConsumerClientFactory, KafkaConsumerClientFactory>();

builder.Services.AddTransient<IJobProcessor, KafkaJobProcessor>();

return builder;
}
}
}

+ 33
- 0
src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs View File

@@ -0,0 +1,33 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace DotNetCore.CAP.Kafka
{
public class KafkaOptions
{
/// <summary>
/// Gets the Kafka broker id.
/// </summary>
public int BrokerId { get; }

/// <summary>
/// Gets the Kafka broker hostname.
/// </summary>
public string Host { get; }

/// <summary>
/// Gets the Kafka broker port.
/// </summary>
public int Port { get; }

/// <summary>
/// Returns a JSON representation of the BrokerMetadata object.
/// </summary>
/// <returns>
/// A JSON representation of the BrokerMetadata object.
/// </returns>
public override string ToString()
=> $"{{ \"BrokerId\": {BrokerId}, \"Host\": \"{Host}\", \"Port\": {Port} }}";
}
}

+ 4
- 0
src/DotNetCore.CAP.Kafka/DotNetCore.CAP.Kafka.csproj View File

@@ -11,6 +11,10 @@
<GenerateAssemblyProductAttribute>false</GenerateAssemblyProductAttribute>
</PropertyGroup>

<ItemGroup>
<None Include="CAP.KafkaOptions.cs" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="0.9.5" />
</ItemGroup>


src/DotNetCore.CAP.Kafka/IProcessor.Producer.cs → src/DotNetCore.CAP.Kafka/IProcessor.KafkaJobProcessor.cs View File

@@ -16,25 +16,27 @@ namespace DotNetCore.CAP.Kafka
{
public class KafkaJobProcessor : IJobProcessor
{
private readonly CapOptions _options;
private readonly CapOptions _capOptions;
private readonly KafkaOptions _kafkaOptions;
private readonly CancellationTokenSource _cts;

private readonly IServiceProvider _provider;
private readonly ILogger _logger;

//internal static readonly AutoResetEvent PulseEvent = new AutoResetEvent(true);
private TimeSpan _pollingDelay;

public KafkaJobProcessor(
IOptions<CapOptions> options,
IOptions<CapOptions> capOptions,
IOptions<KafkaOptions> kafkaOptions,
ILogger<KafkaJobProcessor> logger,
IServiceProvider provider)
{
_logger = logger;
_options = options.Value;
_capOptions = capOptions.Value;
_kafkaOptions = kafkaOptions.Value;
_provider = provider;
_cts = new CancellationTokenSource();
_pollingDelay = TimeSpan.FromSeconds(_options.PollingDelay);
_pollingDelay = TimeSpan.FromSeconds(_capOptions.PollingDelay);
}

public bool Waiting { get; private set; }
@@ -62,7 +64,8 @@ namespace DotNetCore.CAP.Kafka
var token = GetTokenToWaitOn(context);
}

await WaitHandleEx.WaitAnyAsync(WaitHandleEx.PulseEvent, context.CancellationToken.WaitHandle, _pollingDelay);
await WaitHandleEx.WaitAnyAsync(WaitHandleEx.PulseEvent,
context.CancellationToken.WaitHandle, _pollingDelay);
}
finally
{
@@ -120,7 +123,7 @@ namespace DotNetCore.CAP.Kafka
{
try
{
var config = new Dictionary<string, object> { { "bootstrap.servers", _options.BrokerUrlList } };
var config = new Dictionary<string, object> { { "bootstrap.servers", _kafkaOptions.Host } };
using (var producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8)))
{
var message = producer.ProduceAsync(topic, null, content).Result;

+ 4
- 6
src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs View File

@@ -10,19 +10,17 @@ namespace DotNetCore.CAP.Kafka
public class KafkaConsumerClient : IConsumerClient
{
private readonly string _groupId;
private readonly string _bootstrapServers;

private readonly KafkaOptions _kafkaOptions;
private Consumer<Null, string> _consumerClient;

public event EventHandler<MessageBase> MessageReceieved;

public IDeserializer<string> StringDeserializer { get; set; }

public KafkaConsumerClient(string groupId, string bootstrapServers)
public KafkaConsumerClient(string groupId, KafkaOptions options)
{
_groupId = groupId;
_bootstrapServers = bootstrapServers;

_kafkaOptions = options;
StringDeserializer = new StringDeserializer(Encoding.UTF8);
}

@@ -60,7 +58,7 @@ namespace DotNetCore.CAP.Kafka
{
var config = new Dictionary<string, object>{
{ "group.id", _groupId },
{ "bootstrap.servers", _bootstrapServers }
{ "bootstrap.servers", _kafkaOptions.Host }
};

_consumerClient = new Consumer<Null, string>(config, null, StringDeserializer);


+ 12
- 3
src/DotNetCore.CAP.Kafka/KafkaConsumerClientFactory.cs View File

@@ -1,10 +1,19 @@
namespace DotNetCore.CAP.Kafka
using Microsoft.Extensions.Options;

namespace DotNetCore.CAP.Kafka
{
public class KafkaConsumerClientFactory : IConsumerClientFactory
{
public IConsumerClient Create(string groupId, string clientHostAddress)
private readonly KafkaOptions _kafkaOptions;

public KafkaConsumerClientFactory(IOptions<KafkaOptions> kafkaOptions)
{
return new KafkaConsumerClient(groupId, clientHostAddress);
_kafkaOptions = kafkaOptions.Value;
}

public IConsumerClient Create(string groupId)
{
return new KafkaConsumerClient(groupId, _kafkaOptions);
}
}
}

+ 0
- 18
src/DotNetCore.CAP.Kafka/Microsoft.Extensions.DependencyInjection/ConsistencyBuilderExtensions.cs View File

@@ -1,18 +0,0 @@
using DotNetCore.CAP;
using DotNetCore.CAP.Job;
using DotNetCore.CAP.Kafka;

namespace Microsoft.Extensions.DependencyInjection
{
public static class ConsistencyBuilderExtensions
{
public static CapBuilder AddKafka(this CapBuilder builder)
{
builder.Services.AddSingleton<IConsumerClientFactory, KafkaConsumerClientFactory>();

builder.Services.AddTransient<IJobProcessor, KafkaJobProcessor>();

return builder;
}
}
}

Loading…
Cancel
Save