浏览代码

refactor

undefined
yangxiaodong 7 年前
父节点
当前提交
2ca99909a2
共有 2 个文件被更改,包括 170 次插入0 次删除
  1. +95
    -0
      src/Cap.Consistency/Consumer/Kafka/KafkaConsumerClient.cs
  2. +75
    -0
      src/Cap.Consistency/Consumer/Kafka/KafkaConsumerHandler.cs

+ 95
- 0
src/Cap.Consistency/Consumer/Kafka/KafkaConsumerClient.cs 查看文件

@@ -0,0 +1,95 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Cap.Consistency.Infrastructure;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;

namespace Cap.Consistency.Consumer.Kafka
{
public class KafkaTopicPartition
{
public string Topic { get; set; }

public int Partition { get; set; }
}

public class KafkaConsumerClient : IDisposable
{

private readonly string _groupId;
private readonly string _bootstrapServers;

private Consumer<Null, string> _consumerClient;

public event EventHandler<DeliverMessage> MessageReceieved;

public IDeserializer<string> StringDeserializer { get; set; }

public KafkaConsumerClient(string groupId, string bootstrapServers) {
_groupId = groupId;
_bootstrapServers = bootstrapServers;

StringDeserializer = new StringDeserializer(Encoding.UTF8);
}

public void Subscribe(string topicName, int partition) {

if (_consumerClient == null) {
InitKafkaClient();
}
_consumerClient.Assignment.Add(new TopicPartition(topicName, partition));
}

public void Subscribe(IEnumerable<KafkaTopicPartition> topicList) {

if (_consumerClient == null) {
InitKafkaClient();
}

if (topicList == null || topicList.Count() == 0) {
throw new ArgumentNullException(nameof(topicList));
}

foreach (var item in topicList) {
Subscribe(item.Topic, item.Partition);
}
}


public void Listening(TimeSpan timeout) {
while (true) {
_consumerClient.Poll(timeout);
}
}

public void Dispose() {
_consumerClient.Dispose();
}

#region private methods

private void InitKafkaClient() {
var config = new Dictionary<string, object>{
{ "group.id", "simple-csharp-consumer" },
{ "bootstrap.servers", _bootstrapServers }
};

_consumerClient = new Consumer<Null, string>(config, null, StringDeserializer);
_consumerClient.OnMessage += ConsumerClient_OnMessage;
}

private void ConsumerClient_OnMessage(object sender, Message<Null, string> e) {
var message = new DeliverMessage {
MessageKey = e.Topic,
Value = e.Value
};
MessageReceieved?.Invoke(sender, message);
}
#endregion

//public void

}
}

+ 75
- 0
src/Cap.Consistency/Consumer/Kafka/KafkaConsumerHandler.cs 查看文件

@@ -0,0 +1,75 @@
//using System;
//using System.Collections.Generic;
//using System.Text;
//using System.Linq;
//using System.Threading.Tasks;
//using Cap.Consistency.Abstractions;
//using Cap.Consistency.Infrastructure;
//using Cap.Consistency.Routing;
//using Confluent.Kafka;
//using Confluent.Kafka.Serialization;
//using Microsoft.AspNetCore.Builder;
//using Microsoft.Extensions.Logging;
//using Microsoft.Extensions.Options;

//namespace Cap.Consistency.Consumer.Kafka
//{
// public class KafkaConsumerHandler<T> : ConsumerHandler<T> where T : ConsistencyMessage, new()
// {
// protected override void OnMessageReceieved(T message) {
// }

// public Task RouteAsync(TopicRouteContext context) {

// if (context == null) {
// throw new ArgumentNullException(nameof(context));
// }

// context.ServiceProvider = _serviceProvider;

// var matchs = _selector.SelectCandidates(context);

// var config = new Dictionary<string, object>
// {
// { "group.id", "simple-csharp-consumer" },
// { "bootstrap.servers", _options.BrokerUrlList }
// };

// using (var consumer = new Consumer<Null, string>(config, null, new StringDeserializer(Encoding.UTF8))) {

// var topicList = matchs.Select(item => new TopicPartitionOffset(item.Topic.Name, item.Topic.Partition, new Offset(item.Topic.Offset)));
// consumer.Assign(topicList);

// while (true) {
// if (consumer.Consume(out Message<Null, string> msg)) {

// T consistencyMessage = new T();
// var message = new DeliverMessage() {
// MessageKey = msg.Topic,
// Body = Encoding.UTF8.GetBytes(msg.Value)
// };
// var routeContext = new TopicRouteContext(message);

// var executeDescriptor = _selector.SelectBestCandidate(routeContext, matchs);

// if (executeDescriptor == null) {
// _logger.LogInformation("can not be fond topic execute");
// return Task.CompletedTask;
// }

// var consumerContext = new ConsumerContext(executeDescriptor, message);
// var invoker = _consumerInvokerFactory.CreateInvoker(consumerContext);

// _logger.LogInformation("consumer starting");

// return invoker.InvokeAsync();

// }
// }
// }

// }
// }
//}

正在加载...
取消
保存