Browse Source

Add Kafka options to support custom header to add offset and partition into CapHeader. #374

master
Savorboard 5 years ago
parent
commit
e15f518532
2 changed files with 15 additions and 0 deletions
  1. +6
    -0
      src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs
  2. +9
    -0
      src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs

+ 6
- 0
src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs View File

@@ -5,6 +5,7 @@ using System;
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using Confluent.Kafka;


// ReSharper disable once CheckNamespace // ReSharper disable once CheckNamespace
namespace DotNetCore.CAP namespace DotNetCore.CAP
@@ -43,6 +44,11 @@ namespace DotNetCore.CAP
/// </summary> /// </summary>
public string Servers { get; set; } public string Servers { get; set; }


/// <summary>
/// If you need to get offset and partition and so on.., you can use this function to write additional header into <see cref="CapHeader"/>
/// </summary>
public Func<ConsumeResult<string, byte[]>, List<KeyValuePair<string, string>>> CustomHeaders { get; set; }

internal IEnumerable<KeyValuePair<string, string>> AsKafkaConfig() internal IEnumerable<KeyValuePair<string, string>> AsKafkaConfig()
{ {
if (_kafkaConfig == null) if (_kafkaConfig == null)


+ 9
- 0
src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs View File

@@ -62,6 +62,15 @@ namespace DotNetCore.CAP.Kafka
} }
headers.Add(Messages.Headers.Group, _groupId); headers.Add(Messages.Headers.Group, _groupId);


if (_kafkaOptions.CustomHeaders != null)
{
var customHeaders = _kafkaOptions.CustomHeaders(consumerResult);
foreach (var customHeader in customHeaders)
{
headers.Add(customHeader.Key, customHeader.Value);
}
}

var message = new TransportMessage(headers, consumerResult.Value); var message = new TransportMessage(headers, consumerResult.Value);


OnMessageReceived?.Invoke(consumerResult, message); OnMessageReceived?.Invoke(consumerResult, message);


Loading…
Cancel
Save