Browse Source

添加Options

undefined
Savorboard 7 years ago
parent
commit
0626c1eac5
10 changed files with 57 additions and 49 deletions
  1. +2
    -1
      src/Cap.Consistency/Abstractions/ConsumerInvokerContext.cs
  2. +24
    -3
      src/Cap.Consistency/Consumer/ConsumerHandler.cs
  3. +0
    -28
      src/Cap.Consistency/Consumer/Kafka/KafkaConsumerHandler.cs
  4. +0
    -1
      src/Cap.Consistency/Consumer/Kafka/RdKafkaClient.cs
  5. +3
    -1
      src/Cap.Consistency/Extensions/BuilderExtensions.cs
  6. +3
    -0
      src/Cap.Consistency/Extensions/ConsistencyOptions.cs
  7. +0
    -8
      src/Cap.Consistency/GlobalSuppressions.cs
  8. +2
    -5
      src/Cap.Consistency/Internal/ConsumerExcutorSelector.cs
  9. +20
    -0
      src/Cap.Consistency/Internal/ConsumerInvokerFactory.cs
  10. +3
    -2
      src/Cap.Consistency/KafkaConsistency.cs

+ 2
- 1
src/Cap.Consistency/Abstractions/ConsumerInvokerContext.cs View File

@@ -7,7 +7,8 @@ namespace Cap.Consistency.Abstractions
public class ConsumerInvokerContext
{
public ConsumerInvokerContext(ConsumerContext consumerContext) {
ConsumerContext = consumerContext ?? throw new ArgumentNullException(nameof(consumerContext));
ConsumerContext = consumerContext ??
throw new ArgumentNullException(nameof(consumerContext));

}



+ 24
- 3
src/Cap.Consistency/Consumer/ConsumerHandler.cs View File

@@ -25,11 +25,11 @@ namespace Cap.Consistency.Consumer
IConsumerExcutorSelector selector,
ILoggerFactory loggerFactory) {

_serviceProvider = serviceProvider;
_consumerInvokerFactory = consumerInvokerFactory;
_loggerFactory = loggerFactory;
_selector = selector;
_logger = loggerFactory.CreateLogger<ConsumerHandler>();
_loggerFactory = loggerFactory;
_serviceProvider = serviceProvider;
_consumerInvokerFactory = consumerInvokerFactory;
}

public Task RouteAsync(TopicRouteContext context) {
@@ -42,6 +42,27 @@ namespace Cap.Consistency.Consumer

var matchs = _selector.SelectCandidates(context);



var config = new Dictionary<string, object>
{
{ "group.id", "simple-csharp-consumer" },
{ "bootstrap.servers", brokerList }
};

using (var consumer = new Consumer<Null, string>(config, null, new StringDeserializer(Encoding.UTF8))) {
//consumer.Assign(new List<TopicInfo> { new TopicInfo(topics.First(), 0, 0) });

while (true) {
Message<Null, string> msg;
if (consumer.Consume(out msg)) {
Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}");
}
}
}



if (matchs == null || matchs.Count == 0) {
_logger.LogInformation("can not be fond topic route");
return Task.CompletedTask;


+ 0
- 28
src/Cap.Consistency/Consumer/Kafka/KafkaConsumerHandler.cs View File

@@ -1,28 +0,0 @@
using System;
using System.Linq;
using System.Collections.Generic;
using System.Reflection;
using System.Text;

namespace Cap.Consistency.Consumer.Kafka
{
public class KafkaConsumerHandler : IConsumerHandler
{
public readonly QMessageFinder _finder;

public KafkaConsumerHandler(QMessageFinder finder) {
_finder = finder;
}

public void Start(IEnumerable<IConsumerService> consumers) {

var methods = _finder.GetQMessageMethodInfo(consumers.Select(x => x.GetType()).ToArray());

}

public void Stop() {
throw new NotImplementedException();
}
}
}

+ 0
- 1
src/Cap.Consistency/Consumer/Kafka/RdKafkaClient.cs View File

@@ -2,7 +2,6 @@
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Cap.Consistency.Route;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;



+ 3
- 1
src/Cap.Consistency/Extensions/BuilderExtensions.cs View File

@@ -45,8 +45,10 @@ namespace Microsoft.AspNetCore.Builder

var context = new TopicRouteContext();

router.RouteAsync(context).Wait();

return builder;
}
}
}

+ 3
- 0
src/Cap.Consistency/Extensions/ConsistencyOptions.cs View File

@@ -12,6 +12,9 @@ namespace Microsoft.AspNetCore.Builder
/// </summary>
public BrokerOptions Broker { get; set; } = new BrokerOptions();


public string BrokerUrlList { get; set; }

}
}

+ 0
- 8
src/Cap.Consistency/GlobalSuppressions.cs View File

@@ -1,8 +0,0 @@

// This file is used by Code Analysis to maintain SuppressMessage
// attributes that are applied to this project.
// Project-level suppressions either have no target or are given
// a specific target and scoped to a namespace, type, member, etc.

[assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("Style", "IDE0016:Use 'throw' expression", Justification = "<Pending>", Scope = "member", Target = "~M:Cap.Consistency.Internal.ConsumerInvoker.#ctor(Microsoft.Extensions.Logging.ILogger,Cap.Consistency.Abstractions.ConsumerContext,Cap.Consistency.Internal.ObjectMethodExecutor)")]


+ 2
- 5
src/Cap.Consistency/Internal/ConsumerExcutorSelector.cs View File

@@ -7,7 +7,7 @@ using Cap.Consistency.Abstractions;
using Cap.Consistency.Attributes;
using Cap.Consistency.Consumer;
using Cap.Consistency.Infrastructure;
using Cap.Consistency.Route;
using Cap.Consistency.Routing;
using Microsoft.Extensions.DependencyInjection;

namespace Cap.Consistency.Internal
@@ -20,7 +20,7 @@ namespace Cap.Consistency.Internal
var key = context.Message.MessageKey;
return executeDescriptor.FirstOrDefault(x => x.Topic.Name == key);
}
public IReadOnlyList<ConsumerExecutorDescriptor> SelectCandidates(TopicRouteContext context) {

var consumerServices = context.ServiceProvider.GetServices<IConsumerService>();
@@ -43,7 +43,6 @@ namespace Cap.Consistency.Internal

return executorDescriptorList;
}

private ConsumerExecutorDescriptor InitDescriptor(TopicAttribute attr) {
var descriptor = new ConsumerExecutorDescriptor();

@@ -51,7 +50,5 @@ namespace Cap.Consistency.Internal

return descriptor;
}


}
}

+ 20
- 0
src/Cap.Consistency/Internal/ConsumerInvokerFactory.cs View File

@@ -3,13 +3,33 @@ using System.Collections.Generic;
using System.Text;
using Cap.Consistency.Abstractions;
using Cap.Consistency.Infrastructure;
using Microsoft.Extensions.Logging;

namespace Cap.Consistency.Internal
{
public class ConsumerInvokerFactory : IConsumerInvokerFactory
{
private readonly ILogger _logger;
private readonly IServiceProvider _serviceProvider;
private readonly ObjectMethodExecutor _executor;

public ConsumerInvokerFactory(
ILoggerFactory loggerFactory,
IServiceProvider serviceProvider,
ObjectMethodExecutor executor) {

_logger = loggerFactory.CreateLogger<ConsumerInvokerFactory>();
_serviceProvider = serviceProvider;
_executor = executor;
}

public IConsumerInvoker CreateInvoker(ConsumerContext consumerContext) {

var context = new ConsumerInvokerContext(consumerContext);

context.Result = new ConsumerInvoker(_logger, _serviceProvider,
consumerContext, _executor);

return context.Result;
}
}


+ 3
- 2
src/Cap.Consistency/KafkaConsistency.cs View File

@@ -8,7 +8,7 @@ using System.Threading.Tasks;

namespace Cap.Consistency
{
public class KafkaConsistency:IRoute
public class KafkaConsistency : IRoute
{
private IServiceProvider _serviceProvider;
private IEnumerable<IConsumerHandler> _handlers;
@@ -34,5 +34,6 @@ namespace Cap.Consistency
public async Task Start() {

}

}
}

Loading…
Cancel
Save