Browse Source

添加 Consumer相关

undefined
yangxiaodong 7 years ago
parent
commit
5d69ab018a
37 changed files with 1076 additions and 22 deletions
  1. +14
    -15
      src/Cap.Consistency.Server/ConsistencyServer.cs
  2. +18
    -0
      src/Cap.Consistency/Abstractions/ConsumerContext.cs
  3. +18
    -0
      src/Cap.Consistency/Abstractions/ConsumerExecutorDescriptor.cs
  4. +18
    -0
      src/Cap.Consistency/Abstractions/ConsumerInvokerContext.cs
  5. +12
    -0
      src/Cap.Consistency/Abstractions/IConsumerInvoker.cs
  6. +33
    -0
      src/Cap.Consistency/Attributes/KafkaTopicAttribute.cs
  7. +14
    -0
      src/Cap.Consistency/Attributes/QMessageAttribute.cs
  8. +25
    -0
      src/Cap.Consistency/Attributes/TopicAttribute.cs
  9. +8
    -0
      src/Cap.Consistency/Builder/BrokerOptions.cs
  10. +104
    -0
      src/Cap.Consistency/Builder/ConsistencyBuilder.cs
  11. +7
    -0
      src/Cap.Consistency/Builder/ConsistencyMarkerService.cs
  12. +2
    -0
      src/Cap.Consistency/Cap.Consistency.csproj
  13. +68
    -0
      src/Cap.Consistency/Consumer/ConsumerHandler.cs
  14. +15
    -0
      src/Cap.Consistency/Consumer/IConsumerHandler.cs
  15. +10
    -0
      src/Cap.Consistency/Consumer/IConsumerService.cs
  16. +15
    -0
      src/Cap.Consistency/Consumer/ITaskSchedule.cs
  17. +31
    -0
      src/Cap.Consistency/Consumer/Kafka/IKafkaTaskSchedule.cs
  18. +28
    -0
      src/Cap.Consistency/Consumer/Kafka/KafkaConsumerHandler.cs
  19. +45
    -0
      src/Cap.Consistency/Consumer/Kafka/RdKafkaClient.cs
  20. +31
    -0
      src/Cap.Consistency/Extensions/BuilderExtensions.cs
  21. +17
    -0
      src/Cap.Consistency/Extensions/ConsistencyOptions.cs
  22. +46
    -0
      src/Cap.Consistency/Extensions/ServiceCollectionExtensions.cs
  23. +32
    -0
      src/Cap.Consistency/Infrastructure/DeliverMessage.cs
  24. +15
    -0
      src/Cap.Consistency/Infrastructure/IConsumerExcutorSelector.cs
  25. +12
    -0
      src/Cap.Consistency/Infrastructure/IConsumerInvokerFactory.cs
  26. +57
    -0
      src/Cap.Consistency/Internal/ConsumerExcutorSelector.cs
  27. +16
    -0
      src/Cap.Consistency/Internal/ConsumerInvokerFactory.cs
  28. +43
    -0
      src/Cap.Consistency/Internal/QMessageFinder.cs
  29. +36
    -0
      src/Cap.Consistency/Internal/QMessageMethodInfo.cs
  30. +33
    -0
      src/Cap.Consistency/KafkaConsistency.cs
  31. +12
    -0
      src/Cap.Consistency/Models/IConsumerModel.cs
  32. +31
    -0
      src/Cap.Consistency/Route/TopicRouteContext.cs
  33. +6
    -5
      src/Cap.Consistency/RouteTable.cs
  34. +147
    -0
      src/Cap.Consistency/Store/ConsistencyMessageManager.cs
  35. +55
    -0
      src/Cap.Consistency/Store/IConsistencyMessageStore.cs
  36. +1
    -1
      test/Cap.Consistency.EntityFrameworkCore.Test/Cap.Consistency.EntityFrameworkCore.Test.csproj
  37. +1
    -1
      test/Cap.Consistency.Test/Cap.Consistency.Test.csproj

+ 14
- 15
src/Cap.Consistency.Server/ConsistencyServer.cs View File

@@ -5,7 +5,6 @@ using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using RdKafka;
using System.Text; using System.Text;


namespace Cap.Consistency.Server namespace Cap.Consistency.Server
@@ -40,25 +39,25 @@ namespace Cap.Consistency.Server


public void Run() { public void Run() {
//配置消费者组 //配置消费者组
var config = new Config() { GroupId = "example-csharp-consumer" };
using (var consumer = new EventConsumer(config, "127.0.0.1:9092")) {
//var config = new Config() { GroupId = "example-csharp-consumer" };
//using (var consumer = new EventConsumer(config, "127.0.0.1:9092")) {


//注册一个事件
consumer.OnMessage += (obj, msg) =>
{
string text = Encoding.UTF8.GetString(msg.Payload, 0, msg.Payload.Length);
Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {text}");
};
// //注册一个事件
// consumer.OnMessage += (obj, msg) =>
// {
// string text = Encoding.UTF8.GetString(msg.Payload, 0, msg.Payload.Length);
// Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {text}");
// };


//订阅一个或者多个Topic
consumer.Subscribe(new[] { "testtopic" });
// //订阅一个或者多个Topic
// consumer.Subscribe(new[] { "testtopic" });


//启动
consumer.Start();
// //启动
// consumer.Start();


_logger.LogInformation("Started consumer...");
// _logger.LogInformation("Started consumer...");
}
//}
} }
} }
} }

+ 18
- 0
src/Cap.Consistency/Abstractions/ConsumerContext.cs View File

@@ -0,0 +1,18 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace Cap.Consistency.Abstractions
{
public class ConsumerContext
{
public ConsumerContext(ConsumerExecutorDescriptor descriptor) {
ConsumerDescriptor = descriptor ?? throw new ArgumentNullException(nameof(descriptor));
}

public ConsumerExecutorDescriptor ConsumerDescriptor { get; set; }
}
}

+ 18
- 0
src/Cap.Consistency/Abstractions/ConsumerExecutorDescriptor.cs View File

@@ -0,0 +1,18 @@
using System;
using System.Collections.Generic;
using System.Reflection;
using System.Text;

namespace Cap.Consistency.Abstractions
{
public class ConsumerExecutorDescriptor
{
public MethodInfo MethodInfo { get; set; }

public Type ImplType { get; set; }

public TopicInfo Topic { get; set; }

public int GroupId { get; set; }
}
}

+ 18
- 0
src/Cap.Consistency/Abstractions/ConsumerInvokerContext.cs View File

@@ -0,0 +1,18 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace Cap.Consistency.Abstractions
{
public class ConsumerInvokerContext
{
public ConsumerInvokerContext(ConsumerContext consumerContext) {
ConsumerContext = consumerContext ?? throw new ArgumentNullException(nameof(consumerContext));
}

public ConsumerContext ConsumerContext { get; set; }

public IConsumerInvoker Result { get; set; }

}
}

+ 12
- 0
src/Cap.Consistency/Abstractions/IConsumerInvoker.cs View File

@@ -0,0 +1,12 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace Cap.Consistency.Abstractions
{
public interface IConsumerInvoker
{
Task InvokeAsync();
}
}

+ 33
- 0
src/Cap.Consistency/Attributes/KafkaTopicAttribute.cs View File

@@ -0,0 +1,33 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace Cap.Consistency.Attributes
{
public class KafkaTopicAttribute : TopicAttribute
{
public KafkaTopicAttribute(string topicName)
: this(topicName, 0) { }

public KafkaTopicAttribute(string topicName, int partition)
: this(topicName, partition, 0) { }

public KafkaTopicAttribute(string topicName, int partition, long offset)
: base(topicName) {
Offset = offset;
Partition = partition;
}

public int Partition { get; }

public long Offset { get; }

public bool IsPartition { get { return Partition == 0; } }

public bool IsOffset { get { return Offset == 0; } }

public override string ToString() {
return Name;
}
}
}

+ 14
- 0
src/Cap.Consistency/Attributes/QMessageAttribute.cs View File

@@ -0,0 +1,14 @@
using System;

namespace Cap.Consistency
{
[AttributeUsage(AttributeTargets.Method, Inherited = true, AllowMultiple = true)]
sealed class QMessageAttribute : Attribute
{
public QMessageAttribute(string messageName) {
MessageName = messageName;
}
public string MessageName { get; private set; }
}
}

+ 25
- 0
src/Cap.Consistency/Attributes/TopicAttribute.cs View File

@@ -0,0 +1,25 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace Cap.Consistency.Attributes
{

[AttributeUsage(AttributeTargets.Method | AttributeTargets.Class, Inherited = true, AllowMultiple = true)]
public abstract class TopicAttribute : Attribute
{
readonly string _name;

public TopicAttribute(string topicName) {
this._name = topicName;
}

public string Name {
get { return _name; }
}


public bool IsOneWay { get; set; }

}
}

+ 8
- 0
src/Cap.Consistency/Builder/BrokerOptions.cs View File

@@ -0,0 +1,8 @@
namespace Cap.Consistency
{
public class BrokerOptions
{
public string HostName { get; set; }

}
}

+ 104
- 0
src/Cap.Consistency/Builder/ConsistencyBuilder.cs View File

@@ -0,0 +1,104 @@
using System;
using System.Reflection;
using System.Collections.Concurrent;
using System.Linq;
using Microsoft.Extensions.DependencyInjection;
using System.Collections.Generic;
using Cap.Consistency.Consumer;
using Cap.Consistency.Consumer.Kafka;

namespace Cap.Consistency
{
/// <summary>
/// Helper functions for configuring consistency services.
/// </summary>
public class ConsistencyBuilder
{
/// <summary>
/// Creates a new instance of <see cref="ConsistencyBuilder"/>.
/// </summary>
/// <param name="message">The <see cref="Type"/> to use for the message.</param>
/// <param name="service">The <see cref="IServiceCollection"/> to attach to.</param>
public ConsistencyBuilder(Type message, IServiceCollection service) {
MessageType = message;
Services = service;

AddConsumerServices();
}

/// <summary>
/// Gets the <see cref="IServiceCollection"/> services are attached to.
/// </summary>
/// <value>
/// The <see cref="IServiceCollection"/> services are attached to.
/// </value>
public IServiceCollection Services { get; private set; }

/// <summary>
/// Gets the <see cref="Type"/> used for messages.
/// </summary>
/// <value>
/// The <see cref="Type"/> used for messages.
/// </value>
public Type MessageType { get; private set; }

public virtual ConsistencyBuilder AddConsumerServices() {

var IConsumerListenerServices = new Dictionary<Type, Type>();
foreach (var rejectedServices in Services) {
if (rejectedServices.ImplementationType != null && typeof(IConsumerService).IsAssignableFrom(rejectedServices.ImplementationType))
IConsumerListenerServices.Add(typeof(IConsumerService), rejectedServices.ImplementationType);
}

foreach (var service in IConsumerListenerServices) {
Services.AddSingleton(service.Key, service.Value);
}

var types = Assembly.GetEntryAssembly().ExportedTypes;
foreach (var type in types) {
if (typeof(IConsumerService).IsAssignableFrom(type)) {
Services.AddSingleton(typeof(IConsumerService), type);
}
}
return this;
}

public virtual ConsistencyBuilder AddKafkaServices() {


Services.AddSingleton<IConsumerHandler, KafkaConsumerHandler>();
return this;
}


/// <summary>
/// Adds a <see cref="IConsistencyMessageStore{TMessage}"/> for the <seealso cref="MessageType"/>.
/// </summary>
/// <typeparam name="T">The role type held in the store.</typeparam>
/// <returns>The current <see cref="ConsistencyBuilder"/> instance.</returns>
public virtual ConsistencyBuilder AddMessageStore<T>() where T : class {
return AddScoped(typeof(IConsistencyMessageStore<>).MakeGenericType(MessageType), typeof(T));
}

/// <summary>
/// Adds a <see cref="ConsistencyMessageManager{TUser}"/> for the <seealso cref="MessageType"/>.
/// </summary>
/// <typeparam name="TMessageManager">The type of the message manager to add.</typeparam>
/// <returns>The current <see cref="ConsistencyBuilder"/> instance.</returns>
public virtual ConsistencyBuilder AddConsistencyMessageManager<TMessageManager>() where TMessageManager : class {
var messageManagerType = typeof(ConsistencyMessageManager<>).MakeGenericType(MessageType);
var customType = typeof(TMessageManager);
if (messageManagerType == customType ||
!messageManagerType.GetTypeInfo().IsAssignableFrom(customType.GetTypeInfo())) {
throw new InvalidOperationException($"Type {customType.Name} must be derive from ConsistencyMessageManager<{MessageType.Name}>");
}
Services.AddScoped(customType, services => services.GetRequiredService(messageManagerType));
return AddScoped(messageManagerType, customType);
}

private ConsistencyBuilder AddScoped(Type serviceType, Type concreteType) {
Services.AddScoped(serviceType, concreteType);
return this;
}
}
}

+ 7
- 0
src/Cap.Consistency/Builder/ConsistencyMarkerService.cs View File

@@ -0,0 +1,7 @@
namespace Cap.Consistency
{
/// <summary>
/// Used to verify Consistency service was called on a ServiceCollection
/// </summary>
public class ConsistencyMarkerService { }
}

+ 2
- 0
src/Cap.Consistency/Cap.Consistency.csproj View File

@@ -12,11 +12,13 @@
</PropertyGroup> </PropertyGroup>


<ItemGroup> <ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="0.9.5" />
<PackageReference Include="Microsoft.AspNetCore.Http.Abstractions" Version="1.1.1" /> <PackageReference Include="Microsoft.AspNetCore.Http.Abstractions" Version="1.1.1" />
<PackageReference Include="Microsoft.Extensions.Options" Version="1.1.1" /> <PackageReference Include="Microsoft.Extensions.Options" Version="1.1.1" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="1.1.0" /> <PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="1.1.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="1.1.1" /> <PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="1.1.1" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="1.1.0" /> <PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="1.1.0" />
<PackageReference Include="Newtonsoft.Json" Version="10.0.2" />
</ItemGroup> </ItemGroup>


</Project> </Project>

+ 68
- 0
src/Cap.Consistency/Consumer/ConsumerHandler.cs View File

@@ -0,0 +1,68 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Cap.Consistency.Abstractions;
using Cap.Consistency.Infrastructure;
using Cap.Consistency.Route;
using Microsoft.Extensions.Logging;

namespace Cap.Consistency.Consumer
{
public class ConsumerHandler : IConsumerHandler
{

private readonly IConsumerInvokerFactory _consumerInvokerFactory;
private readonly IConsumerExcutorSelector _selector;
private readonly ILoggerFactory _loggerFactory;
private readonly ILogger _logger;


public ConsumerHandler(
IConsumerInvokerFactory consumerInvokerFactory,
IConsumerExcutorSelector selector,
ILoggerFactory loggerFactory) {

_consumerInvokerFactory = consumerInvokerFactory;
_loggerFactory = loggerFactory;
_selector = selector;
_logger = loggerFactory.CreateLogger<ConsumerHandler>();
}

public Task Start(TopicRouteContext context) {
if (context == null) {
throw new ArgumentNullException(nameof(context));
}

var matchs = _selector.SelectCandidates(context);

if (matchs == null || matchs.Count==0) {
_logger.LogInformation("can not be fond topic route");
return Task.CompletedTask;
}

var executeDescriptor = _selector.SelectBestCandidate(context, matchs);
context.Handler = c => {

var consumerContext = new ConsumerContext(executeDescriptor);
var invoker = _consumerInvokerFactory.CreateInvoker(consumerContext);

_logger.LogInformation("consumer starting");

return invoker.InvokeAsync();
};

return Task.CompletedTask;
}


public void Start(IEnumerable<IConsumerService> consumers) {
throw new NotImplementedException();
}

public void Stop() {
throw new NotImplementedException();
}
}
}

+ 15
- 0
src/Cap.Consistency/Consumer/IConsumerHandler.cs View File

@@ -0,0 +1,15 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace Cap.Consistency.Consumer
{
public interface IConsumerHandler
{
void Start(IEnumerable<IConsumerService> consumers);

void Stop();
}

}

+ 10
- 0
src/Cap.Consistency/Consumer/IConsumerService.cs View File

@@ -0,0 +1,10 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace Cap.Consistency.Consumer
{
public interface IConsumerService
{
}
}

+ 15
- 0
src/Cap.Consistency/Consumer/ITaskSchedule.cs View File

@@ -0,0 +1,15 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Text;
using Cap.Consistency.Abstractions;

namespace Cap.Consistency.Consumer
{
public interface ITaskSchedule
{
void Start(IReadOnlyList<ConsumerExecutorDescriptor> methods);

void Stop();
}
}

+ 31
- 0
src/Cap.Consistency/Consumer/Kafka/IKafkaTaskSchedule.cs View File

@@ -0,0 +1,31 @@
using System;
using System.Collections.Generic;
using System.Text;
using Cap.Consistency.Abstractions;
using Microsoft.Extensions.Logging;

namespace Cap.Consistency.Consumer.Kafka
{

public interface IKafkaTaskSchedule : ITaskSchedule { }


public class KafkaTaskSchedule : IKafkaTaskSchedule
{

private readonly ILogger _logger;


public KafkaTaskSchedule(ILoggerFactory loggerFactory) {
_logger = loggerFactory.CreateLogger<KafkaTaskSchedule>();

}
public void Start(IReadOnlyList<ConsumerExecutorDescriptor> methods) {
throw new NotImplementedException();
}

public void Stop() {
throw new NotImplementedException();
}
}
}

+ 28
- 0
src/Cap.Consistency/Consumer/Kafka/KafkaConsumerHandler.cs View File

@@ -0,0 +1,28 @@
using System;
using System.Linq;
using System.Collections.Generic;
using System.Reflection;
using System.Text;

namespace Cap.Consistency.Consumer.Kafka
{
public class KafkaConsumerHandler : IConsumerHandler
{
public readonly QMessageFinder _finder;

public KafkaConsumerHandler(QMessageFinder finder) {
_finder = finder;
}

public void Start(IEnumerable<IConsumerService> consumers) {

var methods = _finder.GetQMessageMethodInfo(consumers.Select(x => x.GetType()).ToArray());

}

public void Stop() {
throw new NotImplementedException();
}
}
}

+ 45
- 0
src/Cap.Consistency/Consumer/Kafka/RdKafkaClient.cs View File

@@ -0,0 +1,45 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Cap.Consistency.Route;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;

namespace Cap.Consistency.Consumer.Kafka
{
public class RdKafkaClient
{

private Consumer<string, string> _client;

public RdKafkaClient() {

}


public void Start(TopicRouteContext routeContext ) {

string brokerList = null;// args[0];
var topics = new List<string>();// args.Skip(1).ToList();

var config = new Dictionary<string, object>
{
{ "group.id", "simple-csharp-consumer" },
{ "bootstrap.servers", brokerList }
};

using (var consumer = new Consumer<Null, string>(config, null, new StringDeserializer(Encoding.UTF8))) {
//consumer.Assign(new List<TopicInfo> { new TopicInfo(topics.First(), 0, 0) });

while (true) {
Message<Null, string> msg;
if (consumer.Consume(out msg)) {
Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}");
}
}
}

}
}
}

+ 31
- 0
src/Cap.Consistency/Extensions/BuilderExtensions.cs View File

@@ -0,0 +1,31 @@
using System;
using Cap.Consistency;
using Microsoft.Extensions.DependencyInjection;

// ReSharper disable once CheckNamespace
namespace Microsoft.AspNetCore.Builder
{
/// <summary>
/// Consistence extensions for <see cref="IApplicationBuilder"/>
/// </summary>
public static class BuilderExtensions
{
/// <summary>
/// Enables Consistence for the current application
/// </summary>
/// <param name="app">The <see cref="IApplicationBuilder"/> instance this method extends.</param>
/// <returns>The <see cref="IApplicationBuilder"/> instance this method extends.</returns>
public static IApplicationBuilder UseConsistency(this IApplicationBuilder app) {
if (app == null) {
throw new ArgumentNullException(nameof(app));
}

var marker = app.ApplicationServices.GetService<ConsistencyMarkerService>();
if (marker == null) {
throw new InvalidOperationException("Add Consistency must be called on the service collection.");
}

return app;
}
}
}

+ 17
- 0
src/Cap.Consistency/Extensions/ConsistencyOptions.cs View File

@@ -0,0 +1,17 @@
using Cap.Consistency;

namespace Microsoft.AspNetCore.Builder
{
/// <summary>
/// Represents all the options you can use to configure the system.
/// </summary>
public class ConsistencyOptions
{
/// <summary>
/// Gets or sets the <see cref="BrokerOptions"/> for the consistency system.
/// </summary>
public BrokerOptions Broker { get; set; } = new BrokerOptions();

}
}

+ 46
- 0
src/Cap.Consistency/Extensions/ServiceCollectionExtensions.cs View File

@@ -0,0 +1,46 @@
using System;
using Cap.Consistency;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection.Extensions;

// ReSharper disable once CheckNamespace
namespace Microsoft.Extensions.DependencyInjection
{
/// <summary>
/// Contains extension methods to <see cref="IServiceCollection"/> for configuring consistence services.
/// </summary>
public static class ServiceCollectionExtensions
{
/// <summary>
/// Adds and configures the consistence services for the consitence.
/// </summary>
/// <param name="services">The services available in the application.</param>
/// <returns>An <see cref="ConsistencyBuilder"/> for application services.</returns>
public static ConsistencyBuilder AddConsistency<TMessage>(this IServiceCollection services)
where TMessage : class {
return services.AddConsistency<TMessage>(setupAction: null);
}

/// <summary>
/// Adds and configures the consistence services for the consitence.
/// </summary>
/// <param name="services">The services available in the application.</param>
/// <param name="setupAction">An action to configure the <see cref="ConsistencyOptions"/>.</param>
/// <returns>An <see cref="ConsistencyBuilder"/> for application services.</returns>
public static ConsistencyBuilder AddConsistency<TMessage>(this IServiceCollection services, Action<ConsistencyOptions> setupAction)
where TMessage : class {
services.TryAddSingleton<ConsistencyMarkerService>();

services.TryAddScoped<ConsistencyMessageManager<TMessage>, ConsistencyMessageManager<TMessage>>();

services.AddSingleton<KafkaConsistency>();

if (setupAction != null) {
services.Configure(setupAction);
}

return new ConsistencyBuilder(typeof(TMessage), services);
}

}
}

+ 32
- 0
src/Cap.Consistency/Infrastructure/DeliverMessage.cs View File

@@ -0,0 +1,32 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace Cap.Consistency.Infrastructure
{
public class DeliverMessage
{
/// <summary>
/// Kafka 对应 Topic name
/// <para>
/// RabbitMQ 对应 RoutingKey
/// </para>
/// </summary>
public string MessageKey { get; set; }


public byte[] Body { get; set; }
}


public class KafkaDeliverMessage : DeliverMessage
{

public int Partition { get; set; }

public long Offset { get; set; }

public string MessageId { get; set; }

}
}

+ 15
- 0
src/Cap.Consistency/Infrastructure/IConsumerExcutorSelector.cs View File

@@ -0,0 +1,15 @@
using System;
using System.Collections.Generic;
using System.Text;
using Cap.Consistency.Abstractions;
using Cap.Consistency.Route;

namespace Cap.Consistency.Infrastructure
{
public interface IConsumerExcutorSelector
{
IReadOnlyList<ConsumerExecutorDescriptor> SelectCandidates(TopicRouteContext context);

ConsumerExecutorDescriptor SelectBestCandidate(TopicRouteContext context, IReadOnlyList<ConsumerExecutorDescriptor> executeDescriptor);
}
}

+ 12
- 0
src/Cap.Consistency/Infrastructure/IConsumerInvokerFactory.cs View File

@@ -0,0 +1,12 @@
using System;
using System.Collections.Generic;
using System.Text;
using Cap.Consistency.Abstractions;

namespace Cap.Consistency.Infrastructure
{
public interface IConsumerInvokerFactory
{
IConsumerInvoker CreateInvoker(ConsumerContext actionContext);
}
}

+ 57
- 0
src/Cap.Consistency/Internal/ConsumerExcutorSelector.cs View File

@@ -0,0 +1,57 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
using Cap.Consistency.Abstractions;
using Cap.Consistency.Attributes;
using Cap.Consistency.Consumer;
using Cap.Consistency.Infrastructure;
using Cap.Consistency.Route;
using Microsoft.Extensions.DependencyInjection;

namespace Cap.Consistency.Internal
{
public class ConsumerExcutorSelector : IConsumerExcutorSelector
{
public ConsumerExecutorDescriptor SelectBestCandidate(TopicRouteContext context,
IReadOnlyList<ConsumerExecutorDescriptor> executeDescriptor) {

var key = context.Message.MessageKey;
return executeDescriptor.FirstOrDefault(x => x.Topic.Name == key);
}

public IReadOnlyList<ConsumerExecutorDescriptor> SelectCandidates(TopicRouteContext context) {

var consumerServices = context.ServiceProvider.GetServices<IConsumerService>();

var executorDescriptorList = new List<ConsumerExecutorDescriptor>();
foreach (var service in consumerServices) {
var typeInfo = service.GetType().GetTypeInfo();
if (!typeof(IConsumerService).GetTypeInfo().IsAssignableFrom(typeInfo)) {
continue;
}

foreach (var method in typeInfo.DeclaredMethods) {

var topicAttr = method.GetCustomAttribute<TopicAttribute>(true);
if (topicAttr == null) continue;

executorDescriptorList.Add(InitDescriptor(topicAttr));
}
}

return executorDescriptorList;
}

private ConsumerExecutorDescriptor InitDescriptor(TopicAttribute attr) {
var descriptor = new ConsumerExecutorDescriptor();

descriptor.Topic = new TopicInfo(attr.Name);

return descriptor;
}


}
}

+ 16
- 0
src/Cap.Consistency/Internal/ConsumerInvokerFactory.cs View File

@@ -0,0 +1,16 @@
using System;
using System.Collections.Generic;
using System.Text;
using Cap.Consistency.Abstractions;
using Cap.Consistency.Infrastructure;

namespace Cap.Consistency.Internal
{
public class ConsumerInvokerFactory : IConsumerInvokerFactory
{
public IConsumerInvoker CreateInvoker(ConsumerContext consumerContext) {
var context = new ConsumerInvokerContext(consumerContext);
return context.Result;
}
}
}

+ 43
- 0
src/Cap.Consistency/Internal/QMessageFinder.cs View File

@@ -0,0 +1,43 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;
using System.Collections.Concurrent;
using Cap.Consistency.Extensions;
using Cap.Consistency.Abstractions;

namespace Cap.Consistency
{
public class QMessageFinder
{
public ConcurrentDictionary<string, ConsumerExecutorDescriptor> GetQMessageMethodInfo(params Type[] serviceType) {

var qMessageTypes = new ConcurrentDictionary<string, ConsumerExecutorDescriptor>();

foreach (var type in serviceType) {

foreach (var method in type.GetTypeInfo().DeclaredMethods) {

var messageMethodInfo = new ConsumerExecutorDescriptor();

if (method.IsPropertyBinding()) {
continue;
}

var qMessageAttr = method.GetCustomAttribute<QMessageAttribute>();
if (qMessageAttr == null) {
continue;
}
messageMethodInfo.ImplType = method.DeclaringType;
messageMethodInfo.MethodInfo = method;

qMessageTypes.AddOrUpdate(qMessageAttr.MessageName, messageMethodInfo, (x, y) => y);
}
}

return qMessageTypes;
}
}
}

+ 36
- 0
src/Cap.Consistency/Internal/QMessageMethodInfo.cs View File

@@ -0,0 +1,36 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;

namespace Cap.Consistency
{
public class TopicInfo
{
public TopicInfo(string topicName) : this(topicName, 0) {}

public TopicInfo(string topicName, int partition) : this(topicName, partition, 0) {}

public TopicInfo(string topicName, int partition, long offset) {
Name = topicName;
Offset = offset;
Partition = partition;
}

public string Name { get; }
public int Partition { get; }

public long Offset { get; }

public bool IsPartition { get { return Partition == 0; } }

public bool IsOffset { get { return Offset == 0; } }

public override string ToString() {
return Name;
}

}
}

+ 33
- 0
src/Cap.Consistency/KafkaConsistency.cs View File

@@ -0,0 +1,33 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Linq;
using Cap.Consistency.Consumer;
using Microsoft.Extensions.DependencyInjection;

namespace Cap.Consistency
{
public class KafkaConsistency
{
private IServiceProvider _serviceProvider;
private IEnumerable<IConsumerHandler> _handlers;

public KafkaConsistency(IServiceProvider serviceProvider) {
_serviceProvider = serviceProvider;
}

public void Start() {
_handlers = _serviceProvider.GetServices<IConsumerHandler>();
var services = _serviceProvider.GetServices<IConsumerService>();
foreach (var handler in _handlers) {
handler.Start(services);
}
}

public void Stop() {
foreach (var handler in _handlers) {
handler.Stop();
}
}
}
}

+ 12
- 0
src/Cap.Consistency/Models/IConsumerModel.cs View File

@@ -0,0 +1,12 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace Cap.Consistency.Models
{
public interface IConsumerModel
{
string TopicName { get; set; }

}
}

+ 31
- 0
src/Cap.Consistency/Route/TopicRouteContext.cs View File

@@ -0,0 +1,31 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Cap.Consistency.Abstractions;
using Cap.Consistency.Consumer;
using Cap.Consistency.Infrastructure;

namespace Cap.Consistency.Route
{
public delegate Task HandlerConsumer(ConsumerExecutorDescriptor context);

public class TopicRouteContext
{

public TopicRouteContext(DeliverMessage message) {
Message = message;
}

public DeliverMessage Message { get; }

// public event EventHandler<ConsumerExecutorDescriptor> OnMessage;

public HandlerConsumer Handler { get; set; }

public IServiceProvider ServiceProvider { get; set; }

public IList<IConsumerHandler> Consumers { get; set; }

}
}

+ 6
- 5
src/Cap.Consistency/RouteTable.cs View File

@@ -1,21 +1,22 @@
using System; using System;
using System.Collections; using System.Collections;
using System.Collections.Generic; using System.Collections.Generic;
using Cap.Consistency.Abstractions;


namespace Cap.Consistency namespace Cap.Consistency
{ {
public class RouteTable : IReadOnlyList<QMessageMethodInfo>
public class RouteTable : IReadOnlyList<ConsumerExecutorDescriptor>
{ {


public RouteTable() { public RouteTable() {


} }


public RouteTable(List<QMessageMethodInfo> messageMethods) {
public RouteTable(List<ConsumerExecutorDescriptor> messageMethods) {
QMessageMethods = messageMethods; QMessageMethods = messageMethods;
} }


public QMessageMethodInfo this[int index] {
public ConsumerExecutorDescriptor this[int index] {
get { get {
throw new NotImplementedException(); throw new NotImplementedException();
} }
@@ -27,9 +28,9 @@ namespace Cap.Consistency
} }
} }


public List<QMessageMethodInfo> QMessageMethods { get; set; }
public List<ConsumerExecutorDescriptor> QMessageMethods { get; set; }


public IEnumerator<QMessageMethodInfo> GetEnumerator() {
public IEnumerator<ConsumerExecutorDescriptor> GetEnumerator() {
throw new NotImplementedException(); throw new NotImplementedException();
} }




+ 147
- 0
src/Cap.Consistency/Store/ConsistencyMessageManager.cs View File

@@ -0,0 +1,147 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

namespace Cap.Consistency
{
/// <summary>
/// Provides the APIs for managing message in a persistence store.
/// </summary>
/// <typeparam name="TMessage">The type encapsulating a message.</typeparam>
public class ConsistencyMessageManager<TMessage> : IDisposable where TMessage : class
{
private bool _disposed;
private readonly HttpContext _context;
private CancellationToken CancellationToken => _context?.RequestAborted ?? CancellationToken.None;

/// <summary>
/// Constructs a new instance of <see cref="ConsistencyMessageManager{TMessage}"/>.
/// </summary>
/// <param name="store">The persistence store the manager will operate over.</param>
/// <param name="services">The <see cref="IServiceProvider"/> used to resolve services.</param>
/// <param name="logger">The logger used to log messages, warnings and errors.</param>
public ConsistencyMessageManager(IConsistencyMessageStore<TMessage> store,
IServiceProvider services,
ILogger<ConsistencyMessageManager<TMessage>> logger) {
if (store == null) {
throw new ArgumentNullException(nameof(store));
}

Store = store;
Logger = logger;

if (services != null) {
_context = services.GetService<IHttpContextAccessor>()?.HttpContext;
}
}

/// <summary>
/// Gets or sets the persistence store the manager operates over.
/// </summary>
/// <value>The persistence store the manager operates over.</value>
protected internal IConsistencyMessageStore<TMessage> Store { get; set; }

/// <summary>
/// Gets the <see cref="ILogger"/> used to log messages from the manager.
/// </summary>
/// <value>
/// The <see cref="ILogger"/> used to log messages from the manager.
/// </value>
protected internal virtual ILogger Logger { get; set; }

/// <summary>
/// Creates the specified <paramref name="message"/> in the backing store.
/// </summary>
/// <param name="message">The message to create.</param>
/// <returns>
/// The <see cref="Task"/> that represents the asynchronous operation, containing the <see cref="OperateResult"/>
/// of the operation.
/// </returns>
public virtual Task<OperateResult> CreateAsync(TMessage message) {
ThrowIfDisposed();
//todo: validation message fileds is correct

return Store.CreateAsync(message, CancellationToken);
}

/// <summary>
/// Updates the specified <paramref name="message"/> in the backing store.
/// </summary>
/// <param name="message">The message to update.</param>
/// <returns>
/// The <see cref="Task"/> that represents the asynchronous operation, containing the <see cref="OperateResult"/>
/// of the operation.
/// </returns>
public virtual Task<OperateResult> UpdateAsync(TMessage message) {
ThrowIfDisposed();
//todo: validation message fileds is correct

return Store.UpdateAsync(message, CancellationToken);
}

/// <summary>
/// Deletes the specified <paramref name="message"/> in the backing store.
/// </summary>
/// <param name="message">The message to delete.</param>
/// <returns>
/// The <see cref="Task"/> that represents the asynchronous operation, containing the <see cref="OperateResult"/>
/// of the operation.
/// </returns>
public virtual Task<OperateResult> DeleteAsync(TMessage message) {
ThrowIfDisposed();

if (message == null) {
throw new ArgumentNullException(nameof(message));
}

return Store.DeleteAsync(message, CancellationToken);
}

/// <summary>
/// Finds and returns a message, if any, who has the specified <paramref name="messageId"/>.
/// </summary>
/// <param name="messageId">The message ID to search for.</param>
/// <returns>
/// The <see cref="Task"/> that represents the asynchronous operation, containing the user matching the specified <paramref name="messageId"/> if it exists.
/// </returns>
public virtual Task<TMessage> FindByIdAsync(string messageId) {
ThrowIfDisposed();
return Store.FindByIdAsync(messageId, CancellationToken);
}

/// <summary>
/// Gets the message identifier for the specified <paramref name="message"/>.
/// </summary>
/// <param name="message">The message whose identifier should be retrieved.</param>
/// <returns>The <see cref="Task"/> that represents the asynchronous operation, containing the identifier for the specified <paramref name="message"/>.</returns>
public virtual async Task<string> GetMessageIdAsync(TMessage message) {
ThrowIfDisposed();
return await Store.GetMessageIdAsync(message, CancellationToken);
}

public void Dispose() {
Dispose(true);
GC.SuppressFinalize(this);
}

/// <summary>
/// Releases the unmanaged resources used by the message manager and optionally releases the managed resources.
/// </summary>
/// <param name="disposing">true to release both managed and unmanaged resources; false to release only unmanaged resources.</param>
protected virtual void Dispose(bool disposing) {
if (disposing && !_disposed) {
Store.Dispose();
_disposed = true;
}
}

protected void ThrowIfDisposed() {
if (_disposed) {
throw new ObjectDisposedException(GetType().Name);
}
}
}
}

+ 55
- 0
src/Cap.Consistency/Store/IConsistencyMessageStore.cs View File

@@ -0,0 +1,55 @@
using System;
using System.Threading;
using System.Threading.Tasks;

namespace Cap.Consistency
{
/// <summary>
/// Provides an abstraction for a store which manages consistent message.
/// </summary>
/// <typeparam name="TMessage"></typeparam>
public interface IConsistencyMessageStore<TMessage> : IDisposable where TMessage : class
{
/// <summary>
/// Finds and returns a message, if any, who has the specified <paramref name="messageId"/>.
/// </summary>
/// <param name="messageId">The message ID to search for.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used to propagate notifications that the operation should be canceled.</param>
/// <returns>
/// The <see cref="Task"/> that represents the asynchronous operation, containing the message matching the specified <paramref name="messageId"/> if it exists.
/// </returns>
Task<TMessage> FindByIdAsync(string messageId, CancellationToken cancellationToken);

/// <summary>
/// Creates a new message in a store as an asynchronous operation.
/// </summary>
/// <param name="message">The message to create in the store.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used to propagate notifications that the operation should be canceled.</param>
/// <returns>A <see cref="Task{TResult}"/> that represents the <see cref="OperateResult"/> of the asynchronous query.</returns>
Task<OperateResult> CreateAsync(TMessage message, CancellationToken cancellationToken);

/// <summary>
/// Updates a message in a store as an asynchronous operation.
/// </summary>
/// <param name="message">The message to update in the store.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used to propagate notifications that the operation should be canceled.</param>
/// <returns>A <see cref="Task{TResult}"/> that represents the <see cref="OperateResult"/> of the asynchronous query.</returns>
Task<OperateResult> UpdateAsync(TMessage message, CancellationToken cancellationToken);

/// <summary>
/// Deletes a message from the store as an asynchronous operation.
/// </summary>
/// <param name="message">The message to delete in the store.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used to propagate notifications that the operation should be canceled.</param>
/// <returns>A <see cref="Task{TResult}"/> that represents the <see cref="OperateResult"/> of the asynchronous query.</returns>
Task<OperateResult> DeleteAsync(TMessage message, CancellationToken cancellationToken);

/// <summary>
/// Gets the ID for a message from the store as an asynchronous operation.
/// </summary>
/// <param name="message">The message whose ID should be returned.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used to propagate notifications that the operation should be canceled.</param>
/// <returns>A <see cref="Task{TResult}"/> that contains the ID of the message.</returns>
Task<string> GetMessageIdAsync(TMessage message, CancellationToken cancellationToken);
}
}

+ 1
- 1
test/Cap.Consistency.EntityFrameworkCore.Test/Cap.Consistency.EntityFrameworkCore.Test.csproj View File

@@ -27,7 +27,7 @@
<PackageReference Include="xunit.runner.visualstudio" Version="2.2.0-beta5-build1225" /> <PackageReference Include="xunit.runner.visualstudio" Version="2.2.0-beta5-build1225" />
<PackageReference Include="xunit" Version="2.2.0-beta5-build3474" /> <PackageReference Include="xunit" Version="2.2.0-beta5-build3474" />
<PackageReference Include="Microsoft.AspNetCore.Http" Version="1.1.1" /> <PackageReference Include="Microsoft.AspNetCore.Http" Version="1.1.1" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="1.1.0-*" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="1.1.0" />
<PackageReference Include="Moq" Version="4.6.36-*" /> <PackageReference Include="Moq" Version="4.6.36-*" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="1.1.1" /> <PackageReference Include="Microsoft.Extensions.Logging" Version="1.1.1" />
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="1.1.1" /> <PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="1.1.1" />


+ 1
- 1
test/Cap.Consistency.Test/Cap.Consistency.Test.csproj View File

@@ -23,7 +23,7 @@
<PackageReference Include="xunit.runner.visualstudio" Version="2.2.0-beta5-build1225" /> <PackageReference Include="xunit.runner.visualstudio" Version="2.2.0-beta5-build1225" />
<PackageReference Include="xunit" Version="2.2.0-beta5-build3474" /> <PackageReference Include="xunit" Version="2.2.0-beta5-build3474" />
<PackageReference Include="Microsoft.AspNetCore.Http" Version="1.1.1" /> <PackageReference Include="Microsoft.AspNetCore.Http" Version="1.1.1" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="1.1.0-*" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="1.1.0" />
<PackageReference Include="Moq" Version="4.6.36-*" /> <PackageReference Include="Moq" Version="4.6.36-*" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="1.1.1" /> <PackageReference Include="Microsoft.Extensions.Logging" Version="1.1.1" />
</ItemGroup> </ItemGroup>


Loading…
Cancel
Save