diff --git a/samples/Sample.Kafka/Controllers/ValuesController.cs b/samples/Sample.Kafka/Controllers/ValuesController.cs index 44ddc00..fe787d4 100644 --- a/samples/Sample.Kafka/Controllers/ValuesController.cs +++ b/samples/Sample.Kafka/Controllers/ValuesController.cs @@ -24,7 +24,7 @@ namespace Sample.Kafka.Controllers } public string ServerPath => ((IHostingEnvironment)HttpContext.RequestServices.GetService(typeof(IHostingEnvironment))).ContentRootPath; - [KafkaTopic("zzwl.topic.finace.callBack", GroupOrExchange = "test")] + [KafkaTopic("zzwl.topic.finace.callBack", Group = "test")] public void KafkaTest(Person person) { Console.WriteLine(person.Name); diff --git a/src/DotNetCore.CAP.Kafka/DotNetCore.CAP.Kafka.csproj b/src/DotNetCore.CAP.Kafka/DotNetCore.CAP.Kafka.csproj index cc0c788..81686f3 100644 --- a/src/DotNetCore.CAP.Kafka/DotNetCore.CAP.Kafka.csproj +++ b/src/DotNetCore.CAP.Kafka/DotNetCore.CAP.Kafka.csproj @@ -12,11 +12,7 @@ false false - - - - - + diff --git a/src/DotNetCore.CAP.Kafka/IProcessor.KafkaJobProcessor.cs b/src/DotNetCore.CAP.Kafka/IProcessor.KafkaJobProcessor.cs index 5489c58..97f38f8 100644 --- a/src/DotNetCore.CAP.Kafka/IProcessor.KafkaJobProcessor.cs +++ b/src/DotNetCore.CAP.Kafka/IProcessor.KafkaJobProcessor.cs @@ -84,64 +84,44 @@ namespace DotNetCore.CAP.Kafka { var provider = scopedContext.Provider; var messageStore = provider.GetRequiredService(); - try + var message = await messageStore.GetNextSentMessageToBeEnqueuedAsync(); + if (message != null) { - var message = await messageStore.GetNextSentMessageToBeEnqueuedAsync(); - if (message != null) + try { + var sp = Stopwatch.StartNew(); message.StatusName = StatusName.Processing; await messageStore.UpdateSentMessageAsync(message); - var jobResult = ExecuteJob(message.KeyName, message.Content); + await ExecuteJobAsync(message.KeyName, message.Content); sp.Stop(); - if (!jobResult) - { - _logger.JobFailed(new Exception("topic send failed")); - } - else - { - //TODO : the state will be deleted when release. - message.StatusName = StatusName.Succeeded; - await messageStore.UpdateSentMessageAsync(message); - - _logger.JobExecuted(sp.Elapsed.TotalSeconds); - } - } - } - catch (Exception) - { - return false; - } - } - return true; - } + message.StatusName = StatusName.Succeeded; + await messageStore.UpdateSentMessageAsync(message); - private bool ExecuteJob(string topic, string content) - { - try - { - var config = _kafkaOptions.AsRdkafkaConfig(); - using (var producer = new Producer(config, null, new StringSerializer(Encoding.UTF8))) - { - var message = producer.ProduceAsync(topic, null, content).Result; - if (message.Error.Code == ErrorCode.NoError) - { - return true; + _logger.JobExecuted(sp.Elapsed.TotalSeconds); } - else + catch (Exception ex) { + _logger.ExceptionOccuredWhileExecutingJob(message.KeyName, ex); return false; } } } - catch (Exception ex) + return true; + } + + private Task ExecuteJobAsync(string topic, string content) + { + var config = _kafkaOptions.AsRdkafkaConfig(); + using (var producer = new Producer(config, null, new StringSerializer(Encoding.UTF8))) { - _logger.ExceptionOccuredWhileExecutingJob(topic, ex); - return false; + producer.ProduceAsync(topic, null, content); + producer.Flush(); } + return Task.CompletedTask; } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/Abstractions/TopicAttribute.cs b/src/DotNetCore.CAP/Abstractions/TopicAttribute.cs index a4a8cb0..0b82a29 100644 --- a/src/DotNetCore.CAP/Abstractions/TopicAttribute.cs +++ b/src/DotNetCore.CAP/Abstractions/TopicAttribute.cs @@ -24,10 +24,14 @@ namespace DotNetCore.CAP.Abstractions } /// - /// the consumer group. + /// kafak --> groups.id + /// rabbitmq --> queue.name /// - public string GroupOrExchange { get; set; } + public string Group { get; set; } = "cap.default.group"; + /// + /// unused now + /// public bool IsOneWay { get; set; } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs b/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs index d99efb7..49a6630 100644 --- a/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs +++ b/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs @@ -80,9 +80,9 @@ namespace Microsoft.Extensions.DependencyInjection var types = Assembly.GetEntryAssembly().ExportedTypes; foreach (var type in types) { - if (typeof(IConsumerService).IsAssignableFrom(type)) + if (Helper.IsController(type.GetTypeInfo())) { - services.AddSingleton(typeof(IConsumerService), type); + services.AddSingleton(typeof(object), type); } } } diff --git a/src/DotNetCore.CAP/IConsumerHandler.Default.cs b/src/DotNetCore.CAP/IConsumerHandler.Default.cs index 9305792..d8c96ba 100644 --- a/src/DotNetCore.CAP/IConsumerHandler.Default.cs +++ b/src/DotNetCore.CAP/IConsumerHandler.Default.cs @@ -54,8 +54,7 @@ namespace DotNetCore.CAP public void Start() { var matchs = _selector.GetCandidatesMethods(_serviceProvider); - - var groupingMatchs = matchs.GroupBy(x => x.Value.Attribute.GroupOrExchange); + var groupingMatchs = matchs.GroupBy(x => x.Value.Attribute.Group); foreach (var matchGroup in groupingMatchs) { diff --git a/src/DotNetCore.CAP/Infrastructure/Helper.cs b/src/DotNetCore.CAP/Infrastructure/Helper.cs index eb841f7..105b5ad 100644 --- a/src/DotNetCore.CAP/Infrastructure/Helper.cs +++ b/src/DotNetCore.CAP/Infrastructure/Helper.cs @@ -1,4 +1,5 @@ using System; +using System.Reflection; using Newtonsoft.Json; namespace DotNetCore.CAP.Infrastructure @@ -46,5 +47,35 @@ namespace DotNetCore.CAP.Infrastructure { return Epoch.AddSeconds(value); } + + public static bool IsController(TypeInfo typeInfo) + { + if (!typeInfo.IsClass) + { + return false; + } + + if (typeInfo.IsAbstract) + { + return false; + } + + if (!typeInfo.IsPublic) + { + return false; + } + + if (typeInfo.ContainsGenericParameters) + { + return false; + } + + if (!typeInfo.Name.EndsWith("Controller", StringComparison.OrdinalIgnoreCase)) + { + return false; + } + + return true; + } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Default.cs b/src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Default.cs index 74c19c9..d80d205 100644 --- a/src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Default.cs +++ b/src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Default.cs @@ -3,6 +3,7 @@ using System.Collections.Generic; using System.Linq; using System.Reflection; using DotNetCore.CAP.Abstractions; +using DotNetCore.CAP.Infrastructure; using Microsoft.Extensions.DependencyInjection; namespace DotNetCore.CAP.Internal @@ -38,9 +39,22 @@ namespace DotNetCore.CAP.Internal public IReadOnlyList SelectCandidates(IServiceProvider provider) { - var consumerServices = provider.GetServices(); + var executorDescriptorList = new List(); + + executorDescriptorList.AddRange(FindConsumersFromInterfaceTypes(provider)); + executorDescriptorList.AddRange(FindConsumersFromControllerTypes(provider)); + + return executorDescriptorList; + } + + + private IReadOnlyList FindConsumersFromInterfaceTypes( + IServiceProvider provider) + { var executorDescriptorList = new List(); + + var consumerServices = provider.GetServices(); foreach (var service in consumerServices) { var typeInfo = service.GetType().GetTypeInfo(); @@ -57,6 +71,31 @@ namespace DotNetCore.CAP.Internal executorDescriptorList.Add(InitDescriptor(topicAttr, method, typeInfo)); } } + return executorDescriptorList; + } + + private IReadOnlyList FindConsumersFromControllerTypes( + IServiceProvider provider) + { + var executorDescriptorList = new List(); + // at cap startup time, find all Controller into the DI container,the type is object. + var controllers = provider.GetServices(); + foreach (var controller in controllers) + { + var typeInfo = controller.GetType().GetTypeInfo(); + //double check + if (Helper.IsController(typeInfo)) + { + foreach (var method in typeInfo.DeclaredMethods) + { + var topicAttr = method.GetCustomAttribute(true); + if (topicAttr == null) continue; + + executorDescriptorList.Add(InitDescriptor(topicAttr, method, typeInfo)); + } + } + continue; + } return executorDescriptorList; }