diff --git a/src/DotNetCore.CAP/CAP.Options.cs b/src/DotNetCore.CAP/CAP.Options.cs index 164d00a..1b18152 100644 --- a/src/DotNetCore.CAP/CAP.Options.cs +++ b/src/DotNetCore.CAP/CAP.Options.cs @@ -1,16 +1,30 @@ -using DotNetCore.CAP.Job; - -namespace DotNetCore.CAP +namespace DotNetCore.CAP { /// /// Represents all the options you can use to configure the system. /// public class CapOptions { + /// + /// Default value for polling delay timeout, in seconds. + /// + public const int DefaultPollingDelay = 8; + + /// + /// Default value for CAP job. + /// + public const string DefaultCronExp = "* * * * *"; + + public CapOptions() + { + CronExp = DefaultCronExp; + PollingDelay = DefaultPollingDelay; + } + /// /// Corn expression for configuring retry cron job. Default is 1 min. /// - public string CronExp { get; set; } = Cron.Minutely(); + public string CronExp { get; set; } /// /// Productor job polling delay time. Default is 8 sec. diff --git a/src/DotNetCore.CAP/IConsumerHandler.Default.cs b/src/DotNetCore.CAP/IConsumerHandler.Default.cs index 968fc0e..a85fc0f 100644 --- a/src/DotNetCore.CAP/IConsumerHandler.Default.cs +++ b/src/DotNetCore.CAP/IConsumerHandler.Default.cs @@ -23,7 +23,7 @@ namespace DotNetCore.CAP private readonly CapOptions _options; private readonly CancellationTokenSource _cts; - public event EventHandler MessageReceieved; + public event EventHandler MessageReceieved; private Task _compositeTask; private bool _disposed; @@ -46,15 +46,14 @@ namespace DotNetCore.CAP _cts = new CancellationTokenSource(); } - protected virtual void OnMessageReceieved(CapMessage message) + protected virtual void OnMessageReceieved(MessageContext message) { MessageReceieved?.Invoke(this, message); } public void Start() { - var matchs = _selector.GetCandidatesMethods(_serviceProvider); - var groupingMatchs = matchs.GroupBy(x => x.Value.Attribute.Group); + var groupingMatchs = _selector.GetCandidatesMethodsOfGroupNameGrouped(_serviceProvider); foreach (var matchGroup in groupingMatchs) { @@ -64,9 +63,9 @@ namespace DotNetCore.CAP { client.MessageReceieved += OnMessageReceieved; - foreach (var item in matchGroup) + foreach (var item in matchGroup.Value) { - client.Subscribe(item.Key); + client.Subscribe(item.Attribute.Name); } client.Listening(TimeSpan.FromSeconds(1)); @@ -76,7 +75,7 @@ namespace DotNetCore.CAP _compositeTask = Task.CompletedTask; } - public virtual void OnMessageReceieved(object sender, MessageBase message) + public virtual void OnMessageReceieved(object sender, MessageContext message) { _logger.EnqueuingReceivedMessage(message.KeyName, message.Content); @@ -88,27 +87,24 @@ namespace DotNetCore.CAP var capMessage = new CapReceivedMessage(message) { StatusName = StatusName.Enqueued, - Added = DateTime.Now }; messageStore.StoreReceivedMessageAsync(capMessage).Wait(); - - ConsumerExecutorDescriptor executeDescriptor = null; - try { - executeDescriptor = _selector.GetTopicExector(message.KeyName); - - var consumerContext = new ConsumerContext(executeDescriptor, message); - - var invoker = _consumerInvokerFactory.CreateInvoker(consumerContext); - - invoker.InvokeAsync(); - - messageStore.ChangeReceivedMessageStateAsync(capMessage, StatusName.Succeeded).Wait(); + var executeDescriptorGroup = _selector.GetTopicExector(message.KeyName); + if (executeDescriptorGroup.ContainsKey(message.Group)) + { + messageStore.ChangeReceivedMessageStateAsync(capMessage, StatusName.Processing).Wait(); + // If there are multiple consumers in the same group, we will take the first + var executeDescriptor = executeDescriptorGroup[message.Group][0]; + var consumerContext = new ConsumerContext(executeDescriptor, message); + _consumerInvokerFactory.CreateInvoker(consumerContext).InvokeAsync(); + messageStore.ChangeReceivedMessageStateAsync(capMessage, StatusName.Succeeded).Wait(); + } } catch (Exception ex) { - _logger.ConsumerMethodExecutingFailed(executeDescriptor?.MethodInfo.Name, ex); + _logger.ConsumerMethodExecutingFailed($"Group:{message.Group}, Topic:{message.KeyName}", ex); } } } @@ -126,7 +122,7 @@ namespace DotNetCore.CAP try { - _compositeTask.Wait((int) TimeSpan.FromSeconds(60).TotalMilliseconds); + _compositeTask.Wait((int)TimeSpan.FromSeconds(60).TotalMilliseconds); } catch (AggregateException ex) { diff --git a/src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs b/src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs index cb924b6..b6e74dc 100644 --- a/src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs +++ b/src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs @@ -9,11 +9,10 @@ namespace DotNetCore.CAP.Internal { public class DefaultConsumerInvoker : IConsumerInvoker { - protected readonly ILogger Logger; - protected readonly IServiceProvider ServiceProvider; - protected readonly ConsumerContext ConsumerContext; - + private readonly ILogger _logger; + private readonly IServiceProvider _serviceProvider; private readonly IModelBinder _modelBinder; + private readonly ConsumerContext _consumerContext; private readonly ObjectMethodExecutor _executor; public DefaultConsumerInvoker(ILogger logger, @@ -22,24 +21,24 @@ namespace DotNetCore.CAP.Internal ConsumerContext consumerContext) { _modelBinder = modelBinder; - _executor = ObjectMethodExecutor.Create(ConsumerContext.ConsumerDescriptor.MethodInfo, - ConsumerContext.ConsumerDescriptor.ImplTypeInfo); + _serviceProvider = serviceProvider; + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - Logger = logger ?? throw new ArgumentNullException(nameof(logger)); - ServiceProvider = serviceProvider; - ConsumerContext = consumerContext ?? throw new ArgumentNullException(nameof(consumerContext)); + _consumerContext = consumerContext ?? throw new ArgumentNullException(nameof(consumerContext)); + _executor = ObjectMethodExecutor.Create(_consumerContext.ConsumerDescriptor.MethodInfo, + _consumerContext.ConsumerDescriptor.ImplTypeInfo); } public Task InvokeAsync() { - using (Logger.BeginScope("consumer invoker begin")) + using (_logger.BeginScope("consumer invoker begin")) { - Logger.LogDebug("Executing consumer Topic: {0}", ConsumerContext.ConsumerDescriptor.MethodInfo.Name); + _logger.LogDebug("Executing consumer Topic: {0}", _consumerContext.ConsumerDescriptor.MethodInfo.Name); - var obj = ActivatorUtilities.GetServiceOrCreateInstance(ServiceProvider, - ConsumerContext.ConsumerDescriptor.ImplTypeInfo.AsType()); + var obj = ActivatorUtilities.GetServiceOrCreateInstance(_serviceProvider, + _consumerContext.ConsumerDescriptor.ImplTypeInfo.AsType()); - var value = ConsumerContext.DeliverMessage.Content; + var value = _consumerContext.DeliverMessage.Content; if (_executor.MethodParameters.Length > 0) { diff --git a/src/DotNetCore.CAP/Internal/MethodMatcherCache.cs b/src/DotNetCore.CAP/Internal/MethodMatcherCache.cs index cf6a214..0483ab7 100644 --- a/src/DotNetCore.CAP/Internal/MethodMatcherCache.cs +++ b/src/DotNetCore.CAP/Internal/MethodMatcherCache.cs @@ -1,5 +1,7 @@ using System; +using System.Linq; using System.Collections.Concurrent; +using System.Collections.Generic; using DotNetCore.CAP.Abstractions; namespace DotNetCore.CAP.Internal @@ -8,35 +10,54 @@ namespace DotNetCore.CAP.Internal { private readonly IConsumerServiceSelector _selector; + private ConcurrentDictionary> Entries { get; } + public MethodMatcherCache(IConsumerServiceSelector selector) { _selector = selector; + Entries = new ConcurrentDictionary>(); } - public ConcurrentDictionary GetCandidatesMethods(IServiceProvider provider) + /// + /// Get a dictionary of candidates.In the dictionary, + /// the Key is the CAPSubscribeAttribute Group, the Value for the current Group of candidates + /// + /// + public ConcurrentDictionary> GetCandidatesMethodsOfGroupNameGrouped(IServiceProvider provider) { if (Entries.Count != 0) return Entries; var executorCollection = _selector.SelectCandidates(provider); - foreach (var item in executorCollection) + var groupedCandidates = executorCollection.GroupBy(x => x.Attribute.Group); + + foreach (var item in groupedCandidates) { - Entries.GetOrAdd(item.Attribute.Name, item); + Entries.TryAdd(item.Key, item.ToList()); } + return Entries; } - public ConsumerExecutorDescriptor GetTopicExector(string topicName) + /// + /// Get a dictionary of specify topic candidates. + /// The Key is Group name, the value is specify topic candidates. + /// + /// message topic name + public IDictionary> GetTopicExector(string topicName) { if (Entries == null) { throw new ArgumentNullException(nameof(Entries)); } - return Entries[topicName]; + var dic = new Dictionary>(); + foreach (var item in Entries) + { + var topicCandidates = item.Value.Where(x => x.Attribute.Name == topicName); + dic.Add(item.Key, topicCandidates.ToList()); + } + return dic; } - - public ConcurrentDictionary Entries { get; } = - new ConcurrentDictionary(); } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/Job/IJob.CapJob.cs b/src/DotNetCore.CAP/Job/IJob.CapJob.cs index ff38e96..138281d 100644 --- a/src/DotNetCore.CAP/Job/IJob.CapJob.cs +++ b/src/DotNetCore.CAP/Job/IJob.CapJob.cs @@ -32,26 +32,25 @@ namespace DotNetCore.CAP.Job public async Task ExecuteAsync() { - var matchs = _selector.GetCandidatesMethods(_serviceProvider); + var groupedCandidates = _selector.GetCandidatesMethodsOfGroupNameGrouped(_serviceProvider); using (var scope = _serviceProvider.CreateScope()) { var provider = scope.ServiceProvider; - var messageStore = provider.GetService(); + var messageStore = provider.GetService(); var nextReceivedMessage = await messageStore.GetNextReceivedMessageToBeExcuted(); - if (nextReceivedMessage != null) + if (nextReceivedMessage != null && groupedCandidates.ContainsKey(nextReceivedMessage.Group)) { try { - var executeDescriptor = matchs[nextReceivedMessage.KeyName]; - var consumerContext = new ConsumerContext(executeDescriptor, nextReceivedMessage); - var invoker = _consumerInvokerFactory.CreateInvoker(consumerContext); - await messageStore.ChangeReceivedMessageStateAsync(nextReceivedMessage, StatusName.Processing); - + // If there are multiple consumers in the same group, we will take the first + var executeDescriptor = groupedCandidates[nextReceivedMessage.Group][0]; + var consumerContext = new ConsumerContext(executeDescriptor, nextReceivedMessage.ToMessageContext()); + var invoker = _consumerInvokerFactory.CreateInvoker(consumerContext); await invoker.InvokeAsync(); - await messageStore.ChangeReceivedMessageStateAsync(nextReceivedMessage, StatusName.Succeeded); + } catch (Exception ex) {