From 276b8ee8d936fd18d08a83aa2844c6ed343c8ed8 Mon Sep 17 00:00:00 2001 From: Savorboard Date: Sat, 20 May 2017 00:28:14 +0800 Subject: [PATCH] 1 --- ...Cap.Consistency.EntityFrameworkCore.csproj | 4 +- .../Cap.Consistency.Server.csproj | 24 +- .../Abstractions/ConsumerContext.cs | 2 + .../Abstractions/ConsumerInvokerContext.cs | 1 + src/Cap.Consistency/BrokerOptions.cs | 10 - .../Builder/ConsistencyMiddleware.cs | 24 ++ src/Cap.Consistency/BuilderExtensions.cs | 31 -- src/Cap.Consistency/Cap.Consistency.csproj | 10 +- src/Cap.Consistency/ConsistencyBuilder.cs | 81 ----- .../ConsistencyMarkerService.cs | 7 - .../ConsistencyMessageManager.cs | 147 --------- src/Cap.Consistency/ConsistencyOptions.cs | 26 -- .../Consumer/ConsumerHandler.cs | 25 +- .../Consumer/IConsumerHandler.cs | 7 +- .../Consumer/Kafka/RdKafkaClient.cs | 1 - .../Extensions/BuilderExtensions.cs | 21 ++ src/Cap.Consistency/GlobalSuppressions.cs | 8 + .../IConsistencyMessageStore.cs | 55 ---- .../IConsumerExcutorSelector.cs | 2 +- .../Internal/ConsumerInvoker.cs | 139 +++++++++ .../Internal/ConsumerMethodExecutor.cs | 33 ++ .../Internal/ObjectMethodExecutor.cs | 285 ++++++++++++++++++ src/Cap.Consistency/KafkaConsistency.cs | 9 +- src/Cap.Consistency/QMessageAttribute.cs | 14 - src/Cap.Consistency/QMessageFinder.cs | 48 --- src/Cap.Consistency/QMessageMethodInfo.cs | 17 -- src/Cap.Consistency/Routing/ITopicRoute.cs | 12 + .../{Route => Routing}/TopicRouteContext.cs | 9 +- .../ServiceCollectionExtensions.cs | 47 --- ...onsistency.EntityFrameworkCore.Test.csproj | 28 +- .../Cap.Consistency.Test.csproj | 14 +- 31 files changed, 602 insertions(+), 539 deletions(-) delete mode 100644 src/Cap.Consistency/BrokerOptions.cs create mode 100644 src/Cap.Consistency/Builder/ConsistencyMiddleware.cs delete mode 100644 src/Cap.Consistency/BuilderExtensions.cs delete mode 100644 src/Cap.Consistency/ConsistencyBuilder.cs delete mode 100644 src/Cap.Consistency/ConsistencyMarkerService.cs delete mode 100644 src/Cap.Consistency/ConsistencyMessageManager.cs delete mode 100644 src/Cap.Consistency/ConsistencyOptions.cs create mode 100644 src/Cap.Consistency/GlobalSuppressions.cs delete mode 100644 src/Cap.Consistency/IConsistencyMessageStore.cs create mode 100644 src/Cap.Consistency/Internal/ConsumerInvoker.cs create mode 100644 src/Cap.Consistency/Internal/ConsumerMethodExecutor.cs create mode 100644 src/Cap.Consistency/Internal/ObjectMethodExecutor.cs delete mode 100644 src/Cap.Consistency/QMessageAttribute.cs delete mode 100644 src/Cap.Consistency/QMessageFinder.cs delete mode 100644 src/Cap.Consistency/QMessageMethodInfo.cs create mode 100644 src/Cap.Consistency/Routing/ITopicRoute.cs rename src/Cap.Consistency/{Route => Routing}/TopicRouteContext.cs (84%) delete mode 100644 src/Cap.Consistency/ServiceCollectionExtensions.cs diff --git a/src/Cap.Consistency.EntityFrameworkCore/Cap.Consistency.EntityFrameworkCore.csproj b/src/Cap.Consistency.EntityFrameworkCore/Cap.Consistency.EntityFrameworkCore.csproj index b486e4b..72bfcb6 100644 --- a/src/Cap.Consistency.EntityFrameworkCore/Cap.Consistency.EntityFrameworkCore.csproj +++ b/src/Cap.Consistency.EntityFrameworkCore/Cap.Consistency.EntityFrameworkCore.csproj @@ -16,8 +16,8 @@ - - + + diff --git a/src/Cap.Consistency.Server/Cap.Consistency.Server.csproj b/src/Cap.Consistency.Server/Cap.Consistency.Server.csproj index 176cb07..f5f73da 100644 --- a/src/Cap.Consistency.Server/Cap.Consistency.Server.csproj +++ b/src/Cap.Consistency.Server/Cap.Consistency.Server.csproj @@ -16,9 +16,27 @@ - - - + + + + + + + + + + + + + + + + + + + + + diff --git a/src/Cap.Consistency/Abstractions/ConsumerContext.cs b/src/Cap.Consistency/Abstractions/ConsumerContext.cs index 6b4aab6..0959680 100644 --- a/src/Cap.Consistency/Abstractions/ConsumerContext.cs +++ b/src/Cap.Consistency/Abstractions/ConsumerContext.cs @@ -13,6 +13,8 @@ namespace Cap.Consistency.Abstractions } public ConsumerExecutorDescriptor ConsumerDescriptor { get; set; } + + } } diff --git a/src/Cap.Consistency/Abstractions/ConsumerInvokerContext.cs b/src/Cap.Consistency/Abstractions/ConsumerInvokerContext.cs index e2fdedc..5792d6c 100644 --- a/src/Cap.Consistency/Abstractions/ConsumerInvokerContext.cs +++ b/src/Cap.Consistency/Abstractions/ConsumerInvokerContext.cs @@ -8,6 +8,7 @@ namespace Cap.Consistency.Abstractions { public ConsumerInvokerContext(ConsumerContext consumerContext) { ConsumerContext = consumerContext ?? throw new ArgumentNullException(nameof(consumerContext)); + } public ConsumerContext ConsumerContext { get; set; } diff --git a/src/Cap.Consistency/BrokerOptions.cs b/src/Cap.Consistency/BrokerOptions.cs deleted file mode 100644 index 04b4d8f..0000000 --- a/src/Cap.Consistency/BrokerOptions.cs +++ /dev/null @@ -1,10 +0,0 @@ -namespace Cap.Consistency -{ - public class BrokerOptions - { - public string HostName { get; set; } - - - - } -} \ No newline at end of file diff --git a/src/Cap.Consistency/Builder/ConsistencyMiddleware.cs b/src/Cap.Consistency/Builder/ConsistencyMiddleware.cs new file mode 100644 index 0000000..48f7272 --- /dev/null +++ b/src/Cap.Consistency/Builder/ConsistencyMiddleware.cs @@ -0,0 +1,24 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; +using Cap.Consistency.Routing; + +namespace Cap.Consistency.Builder +{ + public class ConsistencyMiddleware + { + private readonly ITopicRoute _router; + + public ConsistencyMiddleware(ITopicRoute router) { + _router = router; + } + + public async Task Invoke() { + var context = new TopicRouteContext(); + context.Routes.Add(_router); + + await _router.RouteAsync(context); + } + } +} diff --git a/src/Cap.Consistency/BuilderExtensions.cs b/src/Cap.Consistency/BuilderExtensions.cs deleted file mode 100644 index 96822dc..0000000 --- a/src/Cap.Consistency/BuilderExtensions.cs +++ /dev/null @@ -1,31 +0,0 @@ -using System; -using Cap.Consistency; -using Microsoft.Extensions.DependencyInjection; - -// ReSharper disable once CheckNamespace -namespace Microsoft.AspNetCore.Builder -{ - /// - /// Consistence extensions for - /// - public static class BuilderExtensions - { - /// - /// Enables Consistence for the current application - /// - /// The instance this method extends. - /// The instance this method extends. - public static IApplicationBuilder UseConsistency(this IApplicationBuilder app) { - if (app == null) { - throw new ArgumentNullException(nameof(app)); - } - - var marker = app.ApplicationServices.GetService(); - if (marker == null) { - throw new InvalidOperationException("Add Consistency must be called on the service collection."); - } - - return app; - } - } -} \ No newline at end of file diff --git a/src/Cap.Consistency/Cap.Consistency.csproj b/src/Cap.Consistency/Cap.Consistency.csproj index f50a78e..d5f9640 100644 --- a/src/Cap.Consistency/Cap.Consistency.csproj +++ b/src/Cap.Consistency/Cap.Consistency.csproj @@ -13,11 +13,11 @@ - - - - - + + + + + diff --git a/src/Cap.Consistency/ConsistencyBuilder.cs b/src/Cap.Consistency/ConsistencyBuilder.cs deleted file mode 100644 index 4663d72..0000000 --- a/src/Cap.Consistency/ConsistencyBuilder.cs +++ /dev/null @@ -1,81 +0,0 @@ -using System; -using System.Reflection; -using System.Collections.Concurrent; -using System.Linq; -using Microsoft.Extensions.DependencyInjection; -using System.Collections.Generic; - -namespace Cap.Consistency -{ - /// - /// Helper functions for configuring consistency services. - /// - public class ConsistencyBuilder - { - /// - /// Creates a new instance of . - /// - /// The to use for the message. - /// The to attach to. - public ConsistencyBuilder(Type message, IServiceCollection service) { - MessageType = message; - Services = service; - } - - /// - /// Gets the services are attached to. - /// - /// - /// The services are attached to. - /// - public IServiceCollection Services { get; private set; } - - /// - /// Gets the used for messages. - /// - /// - /// The used for messages. - /// - public Type MessageType { get; private set; } - - /// - /// Adds a for the . - /// - /// The role type held in the store. - /// The current instance. - public virtual ConsistencyBuilder AddMessageStore() where T : class { - return AddScoped(typeof(IConsistencyMessageStore<>).MakeGenericType(MessageType), typeof(T)); - } - - public virtual ConsistencyBuilder AddMessageMethodTable() { - var provider = Services.BuildServiceProvider(); - - var finder = provider.GetRequiredService(); - finder.GetQMessageMethods(Services); - return null; - // Services.AddSingleton(serviceType, concreteType); - // return Add(typeof(IConsistencyMessageStore<>).MakeGenericType(MessageType), typeof(T)); - } - - /// - /// Adds a for the . - /// - /// The type of the message manager to add. - /// The current instance. - public virtual ConsistencyBuilder AddConsistencyMessageManager() where TMessageManager : class { - var messageManagerType = typeof(ConsistencyMessageManager<>).MakeGenericType(MessageType); - var customType = typeof(TMessageManager); - if (messageManagerType == customType || - !messageManagerType.GetTypeInfo().IsAssignableFrom(customType.GetTypeInfo())) { - throw new InvalidOperationException($"Type {customType.Name} must be derive from ConsistencyMessageManager<{MessageType.Name}>"); - } - Services.AddScoped(customType, services => services.GetRequiredService(messageManagerType)); - return AddScoped(messageManagerType, customType); - } - - private ConsistencyBuilder AddScoped(Type serviceType, Type concreteType) { - Services.AddScoped(serviceType, concreteType); - return this; - } - } -} \ No newline at end of file diff --git a/src/Cap.Consistency/ConsistencyMarkerService.cs b/src/Cap.Consistency/ConsistencyMarkerService.cs deleted file mode 100644 index 3ec6cbf..0000000 --- a/src/Cap.Consistency/ConsistencyMarkerService.cs +++ /dev/null @@ -1,7 +0,0 @@ -namespace Cap.Consistency -{ - /// - /// Used to verify Consistency service was called on a ServiceCollection - /// - public class ConsistencyMarkerService { } -} \ No newline at end of file diff --git a/src/Cap.Consistency/ConsistencyMessageManager.cs b/src/Cap.Consistency/ConsistencyMessageManager.cs deleted file mode 100644 index 7bcc947..0000000 --- a/src/Cap.Consistency/ConsistencyMessageManager.cs +++ /dev/null @@ -1,147 +0,0 @@ -using System; -using System.Threading; -using System.Threading.Tasks; -using Microsoft.AspNetCore.Http; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Logging; - -namespace Cap.Consistency -{ - /// - /// Provides the APIs for managing message in a persistence store. - /// - /// The type encapsulating a message. - public class ConsistencyMessageManager : IDisposable where TMessage : class - { - private bool _disposed; - private readonly HttpContext _context; - private CancellationToken CancellationToken => _context?.RequestAborted ?? CancellationToken.None; - - /// - /// Constructs a new instance of . - /// - /// The persistence store the manager will operate over. - /// The used to resolve services. - /// The logger used to log messages, warnings and errors. - public ConsistencyMessageManager(IConsistencyMessageStore store, - IServiceProvider services, - ILogger> logger) { - if (store == null) { - throw new ArgumentNullException(nameof(store)); - } - - Store = store; - Logger = logger; - - if (services != null) { - _context = services.GetService()?.HttpContext; - } - } - - /// - /// Gets or sets the persistence store the manager operates over. - /// - /// The persistence store the manager operates over. - protected internal IConsistencyMessageStore Store { get; set; } - - /// - /// Gets the used to log messages from the manager. - /// - /// - /// The used to log messages from the manager. - /// - protected internal virtual ILogger Logger { get; set; } - - /// - /// Creates the specified in the backing store. - /// - /// The message to create. - /// - /// The that represents the asynchronous operation, containing the - /// of the operation. - /// - public virtual Task CreateAsync(TMessage message) { - ThrowIfDisposed(); - //todo: validation message fileds is correct - - return Store.CreateAsync(message, CancellationToken); - } - - /// - /// Updates the specified in the backing store. - /// - /// The message to update. - /// - /// The that represents the asynchronous operation, containing the - /// of the operation. - /// - public virtual Task UpdateAsync(TMessage message) { - ThrowIfDisposed(); - //todo: validation message fileds is correct - - return Store.UpdateAsync(message, CancellationToken); - } - - /// - /// Deletes the specified in the backing store. - /// - /// The message to delete. - /// - /// The that represents the asynchronous operation, containing the - /// of the operation. - /// - public virtual Task DeleteAsync(TMessage message) { - ThrowIfDisposed(); - - if (message == null) { - throw new ArgumentNullException(nameof(message)); - } - - return Store.DeleteAsync(message, CancellationToken); - } - - /// - /// Finds and returns a message, if any, who has the specified . - /// - /// The message ID to search for. - /// - /// The that represents the asynchronous operation, containing the user matching the specified if it exists. - /// - public virtual Task FindByIdAsync(string messageId) { - ThrowIfDisposed(); - return Store.FindByIdAsync(messageId, CancellationToken); - } - - /// - /// Gets the message identifier for the specified . - /// - /// The message whose identifier should be retrieved. - /// The that represents the asynchronous operation, containing the identifier for the specified . - public virtual async Task GetMessageIdAsync(TMessage message) { - ThrowIfDisposed(); - return await Store.GetMessageIdAsync(message, CancellationToken); - } - - public void Dispose() { - Dispose(true); - GC.SuppressFinalize(this); - } - - /// - /// Releases the unmanaged resources used by the message manager and optionally releases the managed resources. - /// - /// true to release both managed and unmanaged resources; false to release only unmanaged resources. - protected virtual void Dispose(bool disposing) { - if (disposing && !_disposed) { - Store.Dispose(); - _disposed = true; - } - } - - protected void ThrowIfDisposed() { - if (_disposed) { - throw new ObjectDisposedException(GetType().Name); - } - } - } -} \ No newline at end of file diff --git a/src/Cap.Consistency/ConsistencyOptions.cs b/src/Cap.Consistency/ConsistencyOptions.cs deleted file mode 100644 index 2017e4d..0000000 --- a/src/Cap.Consistency/ConsistencyOptions.cs +++ /dev/null @@ -1,26 +0,0 @@ -using Cap.Consistency; - -namespace Microsoft.AspNetCore.Builder -{ - /// - /// Represents all the options you can use to configure the system. - /// - public class ConsistencyOptions - { - /// - /// Gets or sets the for the consistency system. - /// - public BrokerOptions Broker { get; set; } = new BrokerOptions(); - - public long MaxPendingEventNumber { get; set; } - - public int MaxPendingEventNumber32 { - get { - if (this.MaxPendingEventNumber < int.MaxValue) { - return (int)this.MaxPendingEventNumber; - } - return int.MaxValue; - } - } - } -} \ No newline at end of file diff --git a/src/Cap.Consistency/Consumer/ConsumerHandler.cs b/src/Cap.Consistency/Consumer/ConsumerHandler.cs index 4a5eb1f..8990dc4 100644 --- a/src/Cap.Consistency/Consumer/ConsumerHandler.cs +++ b/src/Cap.Consistency/Consumer/ConsumerHandler.cs @@ -4,7 +4,7 @@ using System.Text; using System.Threading.Tasks; using Cap.Consistency.Abstractions; using Cap.Consistency.Infrastructure; -using Cap.Consistency.Route; +using Cap.Consistency.Routing; using Microsoft.Extensions.Logging; namespace Cap.Consistency.Consumer @@ -12,6 +12,7 @@ namespace Cap.Consistency.Consumer public class ConsumerHandler : IConsumerHandler { + private readonly IServiceProvider _serviceProvider; private readonly IConsumerInvokerFactory _consumerInvokerFactory; private readonly IConsumerExcutorSelector _selector; private readonly ILoggerFactory _loggerFactory; @@ -19,30 +20,35 @@ namespace Cap.Consistency.Consumer public ConsumerHandler( + IServiceProvider serviceProvider, IConsumerInvokerFactory consumerInvokerFactory, IConsumerExcutorSelector selector, ILoggerFactory loggerFactory) { + _serviceProvider = serviceProvider; _consumerInvokerFactory = consumerInvokerFactory; _loggerFactory = loggerFactory; _selector = selector; _logger = loggerFactory.CreateLogger(); } - public Task Start(TopicRouteContext context) { + public Task RouteAsync(TopicRouteContext context) { + if (context == null) { throw new ArgumentNullException(nameof(context)); } + context.ServiceProvider = _serviceProvider; + var matchs = _selector.SelectCandidates(context); - if (matchs == null || matchs.Count==0) { + if (matchs == null || matchs.Count == 0) { _logger.LogInformation("can not be fond topic route"); return Task.CompletedTask; } var executeDescriptor = _selector.SelectBestCandidate(context, matchs); - + context.Handler = c => { var consumerContext = new ConsumerContext(executeDescriptor); @@ -52,17 +58,8 @@ namespace Cap.Consistency.Consumer return invoker.InvokeAsync(); }; - + return Task.CompletedTask; } - - - public void Start(IEnumerable consumers) { - throw new NotImplementedException(); - } - - public void Stop() { - throw new NotImplementedException(); - } } } diff --git a/src/Cap.Consistency/Consumer/IConsumerHandler.cs b/src/Cap.Consistency/Consumer/IConsumerHandler.cs index 81514b8..8a727ff 100644 --- a/src/Cap.Consistency/Consumer/IConsumerHandler.cs +++ b/src/Cap.Consistency/Consumer/IConsumerHandler.cs @@ -1,15 +1,12 @@ using System; using System.Collections.Generic; using System.Text; +using Cap.Consistency.Routing; namespace Cap.Consistency.Consumer { - public interface IConsumerHandler + public interface IConsumerHandler : ITopicRoute { - void Start(IEnumerable consumers); - void Stop(); } - - } diff --git a/src/Cap.Consistency/Consumer/Kafka/RdKafkaClient.cs b/src/Cap.Consistency/Consumer/Kafka/RdKafkaClient.cs index bc1efb6..7602617 100644 --- a/src/Cap.Consistency/Consumer/Kafka/RdKafkaClient.cs +++ b/src/Cap.Consistency/Consumer/Kafka/RdKafkaClient.cs @@ -17,7 +17,6 @@ namespace Cap.Consistency.Consumer.Kafka } - public void Start(TopicRouteContext routeContext ) { string brokerList = null;// args[0]; diff --git a/src/Cap.Consistency/Extensions/BuilderExtensions.cs b/src/Cap.Consistency/Extensions/BuilderExtensions.cs index 96822dc..0b716cf 100644 --- a/src/Cap.Consistency/Extensions/BuilderExtensions.cs +++ b/src/Cap.Consistency/Extensions/BuilderExtensions.cs @@ -1,5 +1,6 @@ using System; using Cap.Consistency; +using Cap.Consistency.Routing; using Microsoft.Extensions.DependencyInjection; // ReSharper disable once CheckNamespace @@ -27,5 +28,25 @@ namespace Microsoft.AspNetCore.Builder return app; } + + public static IApplicationBuilder UserRouter(this IApplicationBuilder builder, ITopicRoute router) { + if (builder == null) { + throw new ArgumentNullException(nameof(builder)); + } + + if (router == null) { + throw new ArgumentNullException(nameof(router)); + } + + var marker = builder.ApplicationServices.GetService(); + if (marker == null) { + throw new InvalidOperationException("Add Consistency must be called on the service collection."); + } + + var context = new TopicRouteContext(); + + + } + } } \ No newline at end of file diff --git a/src/Cap.Consistency/GlobalSuppressions.cs b/src/Cap.Consistency/GlobalSuppressions.cs new file mode 100644 index 0000000..52ebaa2 --- /dev/null +++ b/src/Cap.Consistency/GlobalSuppressions.cs @@ -0,0 +1,8 @@ + +// This file is used by Code Analysis to maintain SuppressMessage +// attributes that are applied to this project. +// Project-level suppressions either have no target or are given +// a specific target and scoped to a namespace, type, member, etc. + +[assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("Style", "IDE0016:Use 'throw' expression", Justification = "", Scope = "member", Target = "~M:Cap.Consistency.Internal.ConsumerInvoker.#ctor(Microsoft.Extensions.Logging.ILogger,Cap.Consistency.Abstractions.ConsumerContext,Cap.Consistency.Internal.ObjectMethodExecutor)")] + diff --git a/src/Cap.Consistency/IConsistencyMessageStore.cs b/src/Cap.Consistency/IConsistencyMessageStore.cs deleted file mode 100644 index 80f54a3..0000000 --- a/src/Cap.Consistency/IConsistencyMessageStore.cs +++ /dev/null @@ -1,55 +0,0 @@ -using System; -using System.Threading; -using System.Threading.Tasks; - -namespace Cap.Consistency -{ - /// - /// Provides an abstraction for a store which manages consistent message. - /// - /// - public interface IConsistencyMessageStore : IDisposable where TMessage : class - { - /// - /// Finds and returns a message, if any, who has the specified . - /// - /// The message ID to search for. - /// The used to propagate notifications that the operation should be canceled. - /// - /// The that represents the asynchronous operation, containing the message matching the specified if it exists. - /// - Task FindByIdAsync(string messageId, CancellationToken cancellationToken); - - /// - /// Creates a new message in a store as an asynchronous operation. - /// - /// The message to create in the store. - /// The used to propagate notifications that the operation should be canceled. - /// A that represents the of the asynchronous query. - Task CreateAsync(TMessage message, CancellationToken cancellationToken); - - /// - /// Updates a message in a store as an asynchronous operation. - /// - /// The message to update in the store. - /// The used to propagate notifications that the operation should be canceled. - /// A that represents the of the asynchronous query. - Task UpdateAsync(TMessage message, CancellationToken cancellationToken); - - /// - /// Deletes a message from the store as an asynchronous operation. - /// - /// The message to delete in the store. - /// The used to propagate notifications that the operation should be canceled. - /// A that represents the of the asynchronous query. - Task DeleteAsync(TMessage message, CancellationToken cancellationToken); - - /// - /// Gets the ID for a message from the store as an asynchronous operation. - /// - /// The message whose ID should be returned. - /// The used to propagate notifications that the operation should be canceled. - /// A that contains the ID of the message. - Task GetMessageIdAsync(TMessage message, CancellationToken cancellationToken); - } -} \ No newline at end of file diff --git a/src/Cap.Consistency/Infrastructure/IConsumerExcutorSelector.cs b/src/Cap.Consistency/Infrastructure/IConsumerExcutorSelector.cs index 7099b24..714a955 100644 --- a/src/Cap.Consistency/Infrastructure/IConsumerExcutorSelector.cs +++ b/src/Cap.Consistency/Infrastructure/IConsumerExcutorSelector.cs @@ -2,7 +2,7 @@ using System.Collections.Generic; using System.Text; using Cap.Consistency.Abstractions; -using Cap.Consistency.Route; +using Cap.Consistency.Routing; namespace Cap.Consistency.Infrastructure { diff --git a/src/Cap.Consistency/Internal/ConsumerInvoker.cs b/src/Cap.Consistency/Internal/ConsumerInvoker.cs new file mode 100644 index 0000000..0a6222b --- /dev/null +++ b/src/Cap.Consistency/Internal/ConsumerInvoker.cs @@ -0,0 +1,139 @@ +using System; +using System.Collections.Generic; +using System.Reflection; +using System.Text; +using System.Threading.Tasks; +using Cap.Consistency.Abstractions; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; + +namespace Cap.Consistency.Internal +{ + public class ConsumerInvoker : IConsumerInvoker + { + protected readonly ILogger _logger; + protected readonly IServiceProvider _serviceProvider; + private readonly ObjectMethodExecutor _executor; + protected readonly ConsumerContext _consumerContext; + + private Dictionary _arguments; + + public ConsumerInvoker(ILogger logger, + IServiceProvider serviceProvider, + ConsumerContext consumerContext, + ObjectMethodExecutor objectMethodExecutor) { + if (logger == null) { + throw new ArgumentNullException(nameof(logger)); + } + + if (consumerContext == null) { + throw new ArgumentNullException(nameof(consumerContext)); + } + + if (objectMethodExecutor == null) { + throw new ArgumentNullException(nameof(objectMethodExecutor)); + } + + _logger = logger; + _serviceProvider = serviceProvider; + _consumerContext = consumerContext; + _executor = ObjectMethodExecutor.Create(_consumerContext.ConsumerDescriptor.MethodInfo, + _consumerContext.ConsumerDescriptor.ImplType.GetTypeInfo()); + } + + + public Task InvokeAsync() { + try { + using (_logger.BeginScope("consumer invoker begin")) { + + _logger.LogDebug("Executing consumer Topic: {0}", _consumerContext.ConsumerDescriptor.Topic); + + try { + + var obj = ActivatorUtilities.GetServiceOrCreateInstance(_serviceProvider, _consumerContext.ConsumerDescriptor.ImplType); + _executor.Execute(obj, null); + return Task.CompletedTask; + } + finally { + + _logger.LogDebug("Executed consumer method ."); + } + } + } + finally { + + } + } + + private object _controller; + + private async Task InvokeConsumerMethodAsync() { + var controllerContext = _consumerContext; + var executor = _executor; + var controller = _controller; + var arguments = _arguments; + var orderedArguments = ConsumerMethodExecutor.PrepareArguments(arguments, executor); + + var logger = _logger; + + object result = null; + try { + + var returnType = executor.MethodReturnType; + if (returnType == typeof(void)) { + executor.Execute(controller, orderedArguments); + result = new object(); + } + else if (returnType == typeof(Task)) { + await (Task)executor.Execute(controller, orderedArguments); + result = new object(); + } + //else if (executor.TaskGenericType == typeof(IActionResult)) { + // result = await (Task)executor.Execute(controller, orderedArguments); + // if (result == null) { + // throw new InvalidOperationException( + // Resources.FormatActionResult_ActionReturnValueCannotBeNull(typeof(IActionResult))); + // } + //} + //else if (executor.IsTypeAssignableFromIActionResult) { + // if (_executor.IsMethodAsync) { + // result = (IActionResult)await _executor.ExecuteAsync(controller, orderedArguments); + // } + // else { + // result = (IActionResult)_executor.Execute(controller, orderedArguments); + // } + + // if (result == null) { + // throw new InvalidOperationException( + // Resources.FormatActionResult_ActionReturnValueCannotBeNull(_executor.TaskGenericType ?? returnType)); + // } + //} + //else if (!executor.IsMethodAsync) { + // var resultAsObject = executor.Execute(controller, orderedArguments); + // result = resultAsObject as IActionResult ?? new ObjectResult(resultAsObject) { + // DeclaredType = returnType, + // }; + //} + //else if (executor.TaskGenericType != null) { + // var resultAsObject = await executor.ExecuteAsync(controller, orderedArguments); + // result = resultAsObject as IActionResult ?? new ObjectResult(resultAsObject) { + // DeclaredType = executor.TaskGenericType, + // }; + //} + //else { + // // This will be the case for types which have derived from Task and Task or non Task types. + // throw new InvalidOperationException(Resources.FormatActionExecutor_UnexpectedTaskInstance( + // executor.MethodInfo.Name, + // executor.MethodInfo.DeclaringType)); + //} + + //_result = result; + // logger.ActionMethodExecuted(controllerContext, result); + } + finally { + + } + } + + } +} diff --git a/src/Cap.Consistency/Internal/ConsumerMethodExecutor.cs b/src/Cap.Consistency/Internal/ConsumerMethodExecutor.cs new file mode 100644 index 0000000..50240d1 --- /dev/null +++ b/src/Cap.Consistency/Internal/ConsumerMethodExecutor.cs @@ -0,0 +1,33 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Cap.Consistency.Internal +{ + public class ConsumerMethodExecutor + { + public static object[] PrepareArguments( + IDictionary actionParameters, + ObjectMethodExecutor actionMethodExecutor) { + var declaredParameterInfos = actionMethodExecutor.ActionParameters; + var count = declaredParameterInfos.Length; + if (count == 0) { + return null; + } + + var arguments = new object[count]; + for (var index = 0; index < count; index++) { + var parameterInfo = declaredParameterInfos[index]; + object value; + + if (!actionParameters.TryGetValue(parameterInfo.Name, out value)) { + value = actionMethodExecutor.GetDefaultValueForParameter(index); + } + + arguments[index] = value; + } + + return arguments; + } + } +} diff --git a/src/Cap.Consistency/Internal/ObjectMethodExecutor.cs b/src/Cap.Consistency/Internal/ObjectMethodExecutor.cs new file mode 100644 index 0000000..33134fd --- /dev/null +++ b/src/Cap.Consistency/Internal/ObjectMethodExecutor.cs @@ -0,0 +1,285 @@ +using System; +using System.Collections.Generic; +using System.ComponentModel; +using System.Linq; +using System.Linq.Expressions; +using System.Reflection; +using System.Text; +using System.Threading.Tasks; + +namespace Cap.Consistency.Internal +{ + public class ObjectMethodExecutor + { + private readonly object[] _parameterDefaultValues; + private readonly ConsumerMethodExecutorAsync _executorAsync; + private readonly ConsumerMethodExecutor _executor; + + private static readonly MethodInfo _convertOfTMethod = + typeof(ObjectMethodExecutor).GetRuntimeMethods().Single(methodInfo => methodInfo.Name == nameof(ObjectMethodExecutor.Convert)); + + private ObjectMethodExecutor(MethodInfo methodInfo, TypeInfo targetTypeInfo) { + if (methodInfo == null) { + throw new ArgumentNullException(nameof(methodInfo)); + } + + MethodInfo = methodInfo; + TargetTypeInfo = targetTypeInfo; + ActionParameters = methodInfo.GetParameters(); + MethodReturnType = methodInfo.ReturnType; + IsMethodAsync = typeof(Task).IsAssignableFrom(MethodReturnType); + TaskGenericType = IsMethodAsync ? GetTaskInnerTypeOrNull(MethodReturnType) : null; + //IsTypeAssignableFromIActionResult = typeof(IActionResult).IsAssignableFrom(TaskGenericType ?? MethodReturnType); + + if (IsMethodAsync && TaskGenericType != null) { + // For backwards compatibility we're creating a sync-executor for an async method. This was + // supported in the past even though MVC wouldn't have called it. + _executor = GetExecutor(methodInfo, targetTypeInfo); + _executorAsync = GetExecutorAsync(TaskGenericType, methodInfo, targetTypeInfo); + } + else { + _executor = GetExecutor(methodInfo, targetTypeInfo); + } + + _parameterDefaultValues = GetParameterDefaultValues(ActionParameters); + } + + private delegate Task ConsumerMethodExecutorAsync(object target, object[] parameters); + + private delegate object ConsumerMethodExecutor(object target, object[] parameters); + + private delegate void VoidActionExecutor(object target, object[] parameters); + + public MethodInfo MethodInfo { get; } + + public ParameterInfo[] ActionParameters { get; } + + public TypeInfo TargetTypeInfo { get; } + + public Type TaskGenericType { get; } + + // This field is made internal set because it is set in unit tests. + public Type MethodReturnType { get; internal set; } + + public bool IsMethodAsync { get; } + + //public bool IsTypeAssignableFromIActionResult { get; } + + public static ObjectMethodExecutor Create(MethodInfo methodInfo, TypeInfo targetTypeInfo) { + var executor = new ObjectMethodExecutor(methodInfo, targetTypeInfo); + return executor; + } + + public Task ExecuteAsync(object target, object[] parameters) { + return _executorAsync(target, parameters); + } + + public object Execute(object target, object[] parameters) { + return _executor(target, parameters); + } + + public object GetDefaultValueForParameter(int index) { + if (index < 0 || index > ActionParameters.Length - 1) { + throw new ArgumentOutOfRangeException(nameof(index)); + } + + return _parameterDefaultValues[index]; + } + + private static ConsumerMethodExecutor GetExecutor(MethodInfo methodInfo, TypeInfo targetTypeInfo) { + // Parameters to executor + var targetParameter = Expression.Parameter(typeof(object), "target"); + var parametersParameter = Expression.Parameter(typeof(object[]), "parameters"); + + // Build parameter list + var parameters = new List(); + var paramInfos = methodInfo.GetParameters(); + for (int i = 0; i < paramInfos.Length; i++) { + var paramInfo = paramInfos[i]; + var valueObj = Expression.ArrayIndex(parametersParameter, Expression.Constant(i)); + var valueCast = Expression.Convert(valueObj, paramInfo.ParameterType); + + // valueCast is "(Ti) parameters[i]" + parameters.Add(valueCast); + } + + // Call method + var instanceCast = Expression.Convert(targetParameter, targetTypeInfo.AsType()); + var methodCall = Expression.Call(instanceCast, methodInfo, parameters); + + // methodCall is "((Ttarget) target) method((T0) parameters[0], (T1) parameters[1], ...)" + // Create function + if (methodCall.Type == typeof(void)) { + var lambda = Expression.Lambda(methodCall, targetParameter, parametersParameter); + var voidExecutor = lambda.Compile(); + return WrapVoidAction(voidExecutor); + } + else { + // must coerce methodCall to match ActionExecutor signature + var castMethodCall = Expression.Convert(methodCall, typeof(object)); + var lambda = Expression.Lambda(castMethodCall, targetParameter, parametersParameter); + return lambda.Compile(); + } + } + + private static ConsumerMethodExecutor WrapVoidAction(VoidActionExecutor executor) { + return delegate (object target, object[] parameters) { + executor(target, parameters); + return null; + }; + } + + private static ConsumerMethodExecutorAsync GetExecutorAsync(Type taskInnerType, MethodInfo methodInfo, TypeInfo targetTypeInfo) { + // Parameters to executor + var targetParameter = Expression.Parameter(typeof(object), "target"); + var parametersParameter = Expression.Parameter(typeof(object[]), "parameters"); + + // Build parameter list + var parameters = new List(); + var paramInfos = methodInfo.GetParameters(); + for (int i = 0; i < paramInfos.Length; i++) { + var paramInfo = paramInfos[i]; + var valueObj = Expression.ArrayIndex(parametersParameter, Expression.Constant(i)); + var valueCast = Expression.Convert(valueObj, paramInfo.ParameterType); + + // valueCast is "(Ti) parameters[i]" + parameters.Add(valueCast); + } + + // Call method + var instanceCast = Expression.Convert(targetParameter, targetTypeInfo.AsType()); + var methodCall = Expression.Call(instanceCast, methodInfo, parameters); + + var coerceMethodCall = GetCoerceMethodCallExpression(taskInnerType, methodCall, methodInfo); + var lambda = Expression.Lambda(coerceMethodCall, targetParameter, parametersParameter); + return lambda.Compile(); + } + + // We need to CoerceResult as the object value returned from methodInfo.Invoke has to be cast to a Task. + // This is necessary to enable calling await on the returned task. + // i.e we need to write the following var result = await (Task)mInfo.Invoke. + // Returning Task enables us to await on the result. + private static Expression GetCoerceMethodCallExpression( + Type taskValueType, + MethodCallExpression methodCall, + MethodInfo methodInfo) { + var castMethodCall = Expression.Convert(methodCall, typeof(object)); + // for: public Task Action() + // constructs: return (Task)Convert((Task)result) + var genericMethodInfo = _convertOfTMethod.MakeGenericMethod(taskValueType); + var genericMethodCall = Expression.Call(null, genericMethodInfo, castMethodCall); + var convertedResult = Expression.Convert(genericMethodCall, typeof(Task)); + return convertedResult; + } + + /// + /// Cast Task of T to Task of object + /// + private static async Task CastToObject(Task task) { + return (object)await task; + } + + private static Type GetTaskInnerTypeOrNull(Type type) { + var genericType = ExtractGenericInterface(type, typeof(Task<>)); + + return genericType?.GenericTypeArguments[0]; + } + + public static Type ExtractGenericInterface(Type queryType, Type interfaceType) { + if (queryType == null) { + throw new ArgumentNullException(nameof(queryType)); + } + + if (interfaceType == null) { + throw new ArgumentNullException(nameof(interfaceType)); + } + + if (IsGenericInstantiation(queryType, interfaceType)) { + // queryType matches (i.e. is a closed generic type created from) the open generic type. + return queryType; + } + + // Otherwise check all interfaces the type implements for a match. + // - If multiple different generic instantiations exists, we want the most derived one. + // - If that doesn't break the tie, then we sort alphabetically so that it's deterministic. + // + // We do this by looking at interfaces on the type, and recursing to the base type + // if we don't find any matches. + return GetGenericInstantiation(queryType, interfaceType); + } + + private static bool IsGenericInstantiation(Type candidate, Type interfaceType) { + return + candidate.GetTypeInfo().IsGenericType && + candidate.GetGenericTypeDefinition() == interfaceType; + } + + private static Type GetGenericInstantiation(Type queryType, Type interfaceType) { + Type bestMatch = null; + var interfaces = queryType.GetInterfaces(); + foreach (var @interface in interfaces) { + if (IsGenericInstantiation(@interface, interfaceType)) { + if (bestMatch == null) { + bestMatch = @interface; + } + else if (StringComparer.Ordinal.Compare(@interface.FullName, bestMatch.FullName) < 0) { + bestMatch = @interface; + } + else { + // There are two matches at this level of the class hierarchy, but @interface is after + // bestMatch in the sort order. + } + } + } + + if (bestMatch != null) { + return bestMatch; + } + + // BaseType will be null for object and interfaces, which means we've reached 'bottom'. + var baseType = queryType?.GetTypeInfo().BaseType; + if (baseType == null) { + return null; + } + else { + return GetGenericInstantiation(baseType, interfaceType); + } + } + + + private static Task Convert(object taskAsObject) { + var task = (Task)taskAsObject; + return CastToObject(task); + } + + private static object[] GetParameterDefaultValues(ParameterInfo[] parameters) { + var values = new object[parameters.Length]; + + for (var i = 0; i < parameters.Length; i++) { + var parameterInfo = parameters[i]; + object defaultValue; + + if (parameterInfo.HasDefaultValue) { + defaultValue = parameterInfo.DefaultValue; + } + else { + var defaultValueAttribute = parameterInfo + .GetCustomAttribute(inherit: false); + + if (defaultValueAttribute?.Value == null) { + defaultValue = parameterInfo.ParameterType.GetTypeInfo().IsValueType + ? Activator.CreateInstance(parameterInfo.ParameterType) + : null; + } + else { + defaultValue = defaultValueAttribute.Value; + } + } + + values[i] = defaultValue; + } + + return values; + } + } +} diff --git a/src/Cap.Consistency/KafkaConsistency.cs b/src/Cap.Consistency/KafkaConsistency.cs index 12ca288..b0851b6 100644 --- a/src/Cap.Consistency/KafkaConsistency.cs +++ b/src/Cap.Consistency/KafkaConsistency.cs @@ -4,10 +4,11 @@ using System.Text; using System.Linq; using Cap.Consistency.Consumer; using Microsoft.Extensions.DependencyInjection; +using System.Threading.Tasks; namespace Cap.Consistency { - public class KafkaConsistency + public class KafkaConsistency:IRoute { private IServiceProvider _serviceProvider; private IEnumerable _handlers; @@ -29,5 +30,9 @@ namespace Cap.Consistency handler.Stop(); } } - } + + public async Task Start() { + + } + } diff --git a/src/Cap.Consistency/QMessageAttribute.cs b/src/Cap.Consistency/QMessageAttribute.cs deleted file mode 100644 index f9e63a8..0000000 --- a/src/Cap.Consistency/QMessageAttribute.cs +++ /dev/null @@ -1,14 +0,0 @@ -using System; - -namespace Cap.Consistency -{ - [AttributeUsage(AttributeTargets.Method, Inherited = true, AllowMultiple = true)] - sealed class QMessageAttribute : Attribute - { - public QMessageAttribute(string messageName) { - MessageName = messageName; - } - - public string MessageName { get; private set; } - } -} diff --git a/src/Cap.Consistency/QMessageFinder.cs b/src/Cap.Consistency/QMessageFinder.cs deleted file mode 100644 index 3553313..0000000 --- a/src/Cap.Consistency/QMessageFinder.cs +++ /dev/null @@ -1,48 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Reflection; -using System.Threading.Tasks; -using System.Collections.Concurrent; -using Cap.Consistency.Extensions; -using Microsoft.Extensions.DependencyInjection; - -namespace Cap.Consistency -{ - public class QMessageFinder - { - - public ConcurrentDictionary GetQMessageMethods(IServiceCollection serviceColloection) { - - if (serviceColloection == null) { - throw new ArgumentNullException(nameof(serviceColloection)); - } - - var qMessageTypes = new ConcurrentDictionary(); - - foreach (var serviceDescriptor in serviceColloection) { - - foreach (var method in serviceDescriptor.ServiceType.GetTypeInfo().DeclaredMethods) { - - var messageMethodInfo = new QMessageMethodInfo(); - - if (method.IsPropertyBinding()) { - continue; - } - - var qMessageAttr = method.GetCustomAttribute(); - if (qMessageAttr == null) { - continue; - } - - messageMethodInfo.MessageName = qMessageAttr.MessageName; - messageMethodInfo.ImplType = method.DeclaringType; - messageMethodInfo.MethodInfo = method; - - qMessageTypes.AddOrUpdate(qMessageAttr.MessageName, messageMethodInfo, (x, y) => y); - } - } - return qMessageTypes; - } - } -} diff --git a/src/Cap.Consistency/QMessageMethodInfo.cs b/src/Cap.Consistency/QMessageMethodInfo.cs deleted file mode 100644 index 7bd3068..0000000 --- a/src/Cap.Consistency/QMessageMethodInfo.cs +++ /dev/null @@ -1,17 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Reflection; -using System.Threading.Tasks; - -namespace Cap.Consistency -{ - public class QMessageMethodInfo - { - public MethodInfo MethodInfo { get; set; } - - public Type ImplType { get; set; } - - public string MessageName { get; set; } - } -} diff --git a/src/Cap.Consistency/Routing/ITopicRoute.cs b/src/Cap.Consistency/Routing/ITopicRoute.cs new file mode 100644 index 0000000..b4a00e0 --- /dev/null +++ b/src/Cap.Consistency/Routing/ITopicRoute.cs @@ -0,0 +1,12 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Cap.Consistency.Routing +{ + public interface ITopicRoute + { + Task RouteAsync(TopicRouteContext context); + } +} diff --git a/src/Cap.Consistency/Route/TopicRouteContext.cs b/src/Cap.Consistency/Routing/TopicRouteContext.cs similarity index 84% rename from src/Cap.Consistency/Route/TopicRouteContext.cs rename to src/Cap.Consistency/Routing/TopicRouteContext.cs index 984e5a6..5bc73f5 100644 --- a/src/Cap.Consistency/Route/TopicRouteContext.cs +++ b/src/Cap.Consistency/Routing/TopicRouteContext.cs @@ -6,12 +6,15 @@ using Cap.Consistency.Abstractions; using Cap.Consistency.Consumer; using Cap.Consistency.Infrastructure; -namespace Cap.Consistency.Route +namespace Cap.Consistency.Routing { public delegate Task HandlerConsumer(ConsumerExecutorDescriptor context); public class TopicRouteContext { + public TopicRouteContext() { + + } public TopicRouteContext(DeliverMessage message) { Message = message; @@ -23,9 +26,11 @@ namespace Cap.Consistency.Route public HandlerConsumer Handler { get; set; } + public IList Consumers { get; set; } + public IServiceProvider ServiceProvider { get; set; } - public IList Consumers { get; set; } + public IList Routes { get; set; } } } diff --git a/src/Cap.Consistency/ServiceCollectionExtensions.cs b/src/Cap.Consistency/ServiceCollectionExtensions.cs deleted file mode 100644 index 058fcc0..0000000 --- a/src/Cap.Consistency/ServiceCollectionExtensions.cs +++ /dev/null @@ -1,47 +0,0 @@ -using System; -using Cap.Consistency; -using Microsoft.AspNetCore.Builder; -using Microsoft.Extensions.DependencyInjection.Extensions; - -// ReSharper disable once CheckNamespace -namespace Microsoft.Extensions.DependencyInjection -{ - /// - /// Contains extension methods to for configuring consistence services. - /// - public static class ServiceCollectionExtensions - { - /// - /// Adds and configures the consistence services for the consitence. - /// - /// The services available in the application. - /// An for application services. - public static ConsistencyBuilder AddConsistency(this IServiceCollection services) - where TMessage : class { - return services.AddConsistency(setupAction: null); - } - - /// - /// Adds and configures the consistence services for the consitence. - /// - /// The services available in the application. - /// An action to configure the . - /// An for application services. - public static ConsistencyBuilder AddConsistency(this IServiceCollection services, Action setupAction) - where TMessage : class { - services.TryAddSingleton(); - - services.TryAddScoped, ConsistencyMessageManager>(); - - - services.AddSingleton(); - - - if (setupAction != null) { - services.Configure(setupAction); - } - - return new ConsistencyBuilder(typeof(TMessage), services); - } - } -} \ No newline at end of file diff --git a/test/Cap.Consistency.EntityFrameworkCore.Test/Cap.Consistency.EntityFrameworkCore.Test.csproj b/test/Cap.Consistency.EntityFrameworkCore.Test/Cap.Consistency.EntityFrameworkCore.Test.csproj index 70bc1b8..6352b19 100644 --- a/test/Cap.Consistency.EntityFrameworkCore.Test/Cap.Consistency.EntityFrameworkCore.Test.csproj +++ b/test/Cap.Consistency.EntityFrameworkCore.Test/Cap.Consistency.EntityFrameworkCore.Test.csproj @@ -23,20 +23,20 @@ - - - - - - - - - - - - - - + + + + + + + + + + + + + + diff --git a/test/Cap.Consistency.Test/Cap.Consistency.Test.csproj b/test/Cap.Consistency.Test/Cap.Consistency.Test.csproj index 148bb6a..1ad1b6c 100644 --- a/test/Cap.Consistency.Test/Cap.Consistency.Test.csproj +++ b/test/Cap.Consistency.Test/Cap.Consistency.Test.csproj @@ -19,13 +19,13 @@ - - - - - - - + + + + + + +