diff --git a/src/DotNetCore.CAP.EntityFrameworkCore/CapMessageStore.cs b/src/DotNetCore.CAP.EntityFrameworkCore/CapMessageStore.cs index 641c4c7..6050d20 100644 --- a/src/DotNetCore.CAP.EntityFrameworkCore/CapMessageStore.cs +++ b/src/DotNetCore.CAP.EntityFrameworkCore/CapMessageStore.cs @@ -41,12 +41,31 @@ namespace DotNetCore.CAP.EntityFrameworkCore return OperateResult.Success; } + public async Task ChangeSentMessageStateAsync(CapSentMessage message, string status, bool autoSaveChanges = true) + { + Context.Attach(message); + message.LastRun = DateTime.Now; + message.StatusName = status; + try + { + if (autoSaveChanges) + { + await Context.SaveChangesAsync(); + } + } + catch (DbUpdateConcurrencyException ex) + { + return OperateResult.Failed(new OperateError() { Code = "DbUpdateConcurrencyException", Description = ex.Message }); + } + return OperateResult.Success; + } + /// /// First Enqueued Message. /// public async Task GetNextSentMessageToBeEnqueuedAsync() { - return await SentMessages.FirstOrDefaultAsync(x => x.StateName == StateName.Enqueued); + return await SentMessages.FirstOrDefaultAsync(x => x.StatusName == StatusName.Enqueued); } /// @@ -105,6 +124,30 @@ namespace DotNetCore.CAP.EntityFrameworkCore return OperateResult.Success; } + public async Task ChangeReceivedMessageStateAsync(CapReceivedMessage message, string status, bool autoSaveChanges = true) + { + Context.Attach(message); + message.LastRun = DateTime.Now; + message.StatusName = status; + try + { + if (autoSaveChanges) + { + await Context.SaveChangesAsync(); + } + } + catch (DbUpdateConcurrencyException ex) + { + return OperateResult.Failed(new OperateError() { Code = "DbUpdateConcurrencyException", Description = ex.Message }); + } + return OperateResult.Success; + } + + public async Task GetNextReceivedMessageToBeExcuted() + { + return await ReceivedMessages.FirstOrDefaultAsync(x => x.StatusName == StatusName.Enqueued); + } + /// /// Updates the specified in the message store. /// @@ -127,5 +170,6 @@ namespace DotNetCore.CAP.EntityFrameworkCore return OperateResult.Failed(new OperateError() { Code = "DbUpdateConcurrencyException", Description = ex.Message }); } } + } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.EntityFrameworkCore/ConsistencyDbContext.cs b/src/DotNetCore.CAP.EntityFrameworkCore/ConsistencyDbContext.cs index 517eee5..526f090 100644 --- a/src/DotNetCore.CAP.EntityFrameworkCore/ConsistencyDbContext.cs +++ b/src/DotNetCore.CAP.EntityFrameworkCore/ConsistencyDbContext.cs @@ -39,12 +39,12 @@ namespace DotNetCore.CAP.EntityFrameworkCore modelBuilder.Entity(b => { b.HasKey(m => m.Id); - b.Property(p => p.StateName).HasMaxLength(50); + b.Property(p => p.StatusName).HasMaxLength(50); }); modelBuilder.Entity(b => { - b.Property(p => p.StateName).HasMaxLength(50); + b.Property(p => p.StatusName).HasMaxLength(50); }); } } diff --git a/src/DotNetCore.CAP.Kafka/IProcessor.KafkaJobProcessor.cs b/src/DotNetCore.CAP.Kafka/IProcessor.KafkaJobProcessor.cs index 8a9e039..5489c58 100644 --- a/src/DotNetCore.CAP.Kafka/IProcessor.KafkaJobProcessor.cs +++ b/src/DotNetCore.CAP.Kafka/IProcessor.KafkaJobProcessor.cs @@ -90,7 +90,7 @@ namespace DotNetCore.CAP.Kafka if (message != null) { var sp = Stopwatch.StartNew(); - message.StateName = StateName.Processing; + message.StatusName = StatusName.Processing; await messageStore.UpdateSentMessageAsync(message); var jobResult = ExecuteJob(message.KeyName, message.Content); @@ -104,7 +104,7 @@ namespace DotNetCore.CAP.Kafka else { //TODO : the state will be deleted when release. - message.StateName = StateName.Succeeded; + message.StatusName = StatusName.Succeeded; await messageStore.UpdateSentMessageAsync(message); _logger.JobExecuted(sp.Elapsed.TotalSeconds); diff --git a/src/DotNetCore.CAP.RabbitMQ/IProcessor.RabbitJobProcessor.cs b/src/DotNetCore.CAP.RabbitMQ/IProcessor.RabbitJobProcessor.cs index 4e72ad2..43280e8 100644 --- a/src/DotNetCore.CAP.RabbitMQ/IProcessor.RabbitJobProcessor.cs +++ b/src/DotNetCore.CAP.RabbitMQ/IProcessor.RabbitJobProcessor.cs @@ -26,7 +26,6 @@ namespace DotNetCore.CAP.RabbitMQ public RabbitJobProcessor( IOptions capOptions, IOptions rabbitMQOptions, - IOptions options, ILogger logger, IServiceProvider provider) { @@ -88,7 +87,7 @@ namespace DotNetCore.CAP.RabbitMQ if (message != null) { var sp = Stopwatch.StartNew(); - message.StateName = StateName.Processing; + message.StatusName = StatusName.Processing; await messageStore.UpdateSentMessageAsync(message); var jobResult = ExecuteJob(message.KeyName, message.Content); @@ -102,7 +101,7 @@ namespace DotNetCore.CAP.RabbitMQ else { //TODO : the state will be deleted when release. - message.StateName = StateName.Succeeded; + message.StatusName = StatusName.Succeeded; await messageStore.UpdateSentMessageAsync(message); _logger.JobExecuted(sp.Elapsed.TotalSeconds); diff --git a/src/DotNetCore.CAP/Abstractions/IConsumerServiceSelector.cs b/src/DotNetCore.CAP/Abstractions/IConsumerServiceSelector.cs index b3e48aa..a150e2d 100644 --- a/src/DotNetCore.CAP/Abstractions/IConsumerServiceSelector.cs +++ b/src/DotNetCore.CAP/Abstractions/IConsumerServiceSelector.cs @@ -1,4 +1,5 @@ -using System.Collections.Generic; +using System; +using System.Collections.Generic; namespace DotNetCore.CAP.Abstractions { @@ -9,11 +10,11 @@ namespace DotNetCore.CAP.Abstractions { /// /// Selects a set of candidates for the current message associated with - /// . + /// . /// - /// The associated with the current message. + /// . /// A set of candidates or null. - IReadOnlyList SelectCandidates(CapStartContext context); + IReadOnlyList SelectCandidates(IServiceProvider provider); /// /// Selects the best candidate from for the diff --git a/src/DotNetCore.CAP/IBootstrapper.Default.cs b/src/DotNetCore.CAP/IBootstrapper.Default.cs index 6738e6e..a1d2b87 100644 --- a/src/DotNetCore.CAP/IBootstrapper.Default.cs +++ b/src/DotNetCore.CAP/IBootstrapper.Default.cs @@ -5,6 +5,7 @@ using System.Threading.Tasks; using DotNetCore.CAP.Infrastructure; using Microsoft.AspNetCore.Hosting; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; namespace DotNetCore.CAP @@ -14,20 +15,23 @@ namespace DotNetCore.CAP /// public class DefaultBootstrapper : IBootstrapper { - private IApplicationLifetime _appLifetime; - private CancellationTokenSource _cts; - private CancellationTokenRegistration _ctsRegistration; + private readonly ILogger _logger; + private readonly IApplicationLifetime _appLifetime; + private readonly CancellationTokenSource _cts; + private readonly CancellationTokenRegistration _ctsRegistration; private Task _bootstrappingTask; public DefaultBootstrapper( + ILogger logger, IOptions options, ICapMessageStore storage, IApplicationLifetime appLifetime, IServiceProvider provider) { + _logger = logger; + _appLifetime = appLifetime; Options = options.Value; Storage = storage; - _appLifetime = appLifetime; Provider = provider; Servers = Provider.GetServices(); @@ -39,8 +43,9 @@ namespace DotNetCore.CAP { _bootstrappingTask?.Wait(); } - catch (OperationCanceledException) + catch (OperationCanceledException ex) { + _logger.ExpectedOperationCanceledException(ex); } }); } @@ -74,8 +79,9 @@ namespace DotNetCore.CAP { item.Start(); } - catch (Exception) + catch (Exception ex) { + _logger.ServerStartedError(ex); } } diff --git a/src/DotNetCore.CAP/ICapMessageStore.cs b/src/DotNetCore.CAP/ICapMessageStore.cs index 74dceb2..ac1339e 100644 --- a/src/DotNetCore.CAP/ICapMessageStore.cs +++ b/src/DotNetCore.CAP/ICapMessageStore.cs @@ -15,6 +15,14 @@ namespace DotNetCore.CAP /// The message to create in the store. Task StoreSentMessageAsync(CapSentMessage message); + /// + /// Change model status name. + /// + /// The type of . + /// The status name. + /// + Task ChangeSentMessageStateAsync(CapSentMessage message, string statusName, bool autoSaveChanges = true); + /// /// Fetches the next message to be executed. /// @@ -33,6 +41,9 @@ namespace DotNetCore.CAP /// The message to delete in the store. Task RemoveSentMessageAsync(CapSentMessage message); + + + /// /// Creates a new message in a store as an asynchronous operation. /// @@ -40,6 +51,19 @@ namespace DotNetCore.CAP /// Task StoreReceivedMessageAsync(CapReceivedMessage message); + /// + /// Change model status name. + /// + /// The type of . + /// The status name. + /// + Task ChangeReceivedMessageStateAsync(CapReceivedMessage message, string statusName, bool autoSaveChanges = true); + + /// + /// Fetches the next message to be executed. + /// + Task GetNextReceivedMessageToBeExcuted(); + /// /// Updates a message in a store as an asynchronous operation. /// diff --git a/src/DotNetCore.CAP/ICapProducerService.Default.cs b/src/DotNetCore.CAP/ICapProducerService.Default.cs index 9ac7e6b..915609c 100644 --- a/src/DotNetCore.CAP/ICapProducerService.Default.cs +++ b/src/DotNetCore.CAP/ICapProducerService.Default.cs @@ -48,7 +48,7 @@ namespace DotNetCore.CAP Content = content }; - message.StateName = StateName.Enqueued; + message.StatusName = StatusName.Enqueued; await _store.StoreSentMessageAsync(message); WaitHandleEx.PulseEvent.Set(); diff --git a/src/DotNetCore.CAP/IConsumerHandler.Default.cs b/src/DotNetCore.CAP/IConsumerHandler.Default.cs index 74d01ba..9305792 100644 --- a/src/DotNetCore.CAP/IConsumerHandler.Default.cs +++ b/src/DotNetCore.CAP/IConsumerHandler.Default.cs @@ -5,6 +5,7 @@ using System.Threading.Tasks; using DotNetCore.CAP.Abstractions; using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Internal; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; @@ -20,12 +21,10 @@ namespace DotNetCore.CAP private readonly MethodMatcherCache _selector; private readonly CapOptions _options; - private readonly ICapMessageStore _messageStore; private readonly CancellationTokenSource _cts; public event EventHandler MessageReceieved; - private CapStartContext _context; private Task _compositeTask; private bool _disposed; @@ -34,7 +33,6 @@ namespace DotNetCore.CAP IConsumerInvokerFactory consumerInvokerFactory, IConsumerClientFactory consumerClientFactory, ILoggerFactory loggerFactory, - ICapMessageStore messageStore, MethodMatcherCache selector, IOptions options) { @@ -45,7 +43,6 @@ namespace DotNetCore.CAP _consumerInvokerFactory = consumerInvokerFactory; _consumerClientFactory = consumerClientFactory; _options = options.Value; - _messageStore = messageStore; _cts = new CancellationTokenSource(); } @@ -56,9 +53,7 @@ namespace DotNetCore.CAP public void Start() { - _context = new CapStartContext(_serviceProvider, _cts.Token); - - var matchs = _selector.GetCandidatesMethods(_context); + var matchs = _selector.GetCandidatesMethods(_serviceProvider); var groupingMatchs = matchs.GroupBy(x => x.Value.Attribute.GroupOrExchange); @@ -86,30 +81,36 @@ namespace DotNetCore.CAP { _logger.EnqueuingReceivedMessage(message.KeyName, message.Content); - var capMessage = new CapReceivedMessage(message) + using (var scope = _serviceProvider.CreateScope()) { - StateName = StateName.Enqueued, - Added = DateTime.Now - }; - _messageStore.StoreReceivedMessageAsync(capMessage).Wait(); + var provider = scope.ServiceProvider; + var messageStore = provider.GetRequiredService(); - ConsumerExecutorDescriptor executeDescriptor = null; + var capMessage = new CapReceivedMessage(message) + { + StatusName = StatusName.Enqueued, + Added = DateTime.Now + }; + messageStore.StoreReceivedMessageAsync(capMessage).Wait(); - try - { - executeDescriptor = _selector.GetTopicExector(message.KeyName); + ConsumerExecutorDescriptor executeDescriptor = null; - var consumerContext = new ConsumerContext(executeDescriptor, message); + try + { + executeDescriptor = _selector.GetTopicExector(message.KeyName); - var invoker = _consumerInvokerFactory.CreateInvoker(consumerContext); + var consumerContext = new ConsumerContext(executeDescriptor, message); - invoker.InvokeAsync(); + var invoker = _consumerInvokerFactory.CreateInvoker(consumerContext); - _messageStore.UpdateReceivedMessageAsync(capMessage).Wait(); - } - catch (Exception ex) - { - _logger.ConsumerMethodExecutingFailed(executeDescriptor.MethodInfo.Name, ex); + invoker.InvokeAsync(); + + messageStore.ChangeReceivedMessageStateAsync(capMessage,StatusName.Succeeded).Wait(); + } + catch (Exception ex) + { + _logger.ConsumerMethodExecutingFailed(executeDescriptor.MethodInfo.Name, ex); + } } } diff --git a/src/DotNetCore.CAP/Infrastructure/CapMessage.cs b/src/DotNetCore.CAP/Infrastructure/CapMessage.cs index c41c29a..00d93b1 100644 --- a/src/DotNetCore.CAP/Infrastructure/CapMessage.cs +++ b/src/DotNetCore.CAP/Infrastructure/CapMessage.cs @@ -33,13 +33,13 @@ namespace DotNetCore.CAP.Infrastructure public int Retries { get; set; } - public string StateName { get; set; } + public string StatusName { get; set; } } /// - /// The message state name. + /// The message status name. /// - public struct StateName + public struct StatusName { public const string Enqueued = nameof(Enqueued); public const string Processing = nameof(Processing); diff --git a/src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Default.cs b/src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Default.cs index 625bb5f..74c19c9 100644 --- a/src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Default.cs +++ b/src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Default.cs @@ -36,9 +36,9 @@ namespace DotNetCore.CAP.Internal return executeDescriptor.FirstOrDefault(x => x.Attribute.Name == key); } - public IReadOnlyList SelectCandidates(CapStartContext context) + public IReadOnlyList SelectCandidates(IServiceProvider provider) { - var consumerServices = context.ServiceProvider.GetServices(); + var consumerServices = provider.GetServices(); var executorDescriptorList = new List(); foreach (var service in consumerServices) diff --git a/src/DotNetCore.CAP/Internal/MethodMatcherCache.cs b/src/DotNetCore.CAP/Internal/MethodMatcherCache.cs index 1a5616d..c03883e 100644 --- a/src/DotNetCore.CAP/Internal/MethodMatcherCache.cs +++ b/src/DotNetCore.CAP/Internal/MethodMatcherCache.cs @@ -13,11 +13,11 @@ namespace DotNetCore.CAP.Internal _selector = selector; } - public ConcurrentDictionary GetCandidatesMethods(CapStartContext routeContext) + public ConcurrentDictionary GetCandidatesMethods(IServiceProvider provider) { if (Entries.Count == 0) { - var executorCollection = _selector.SelectCandidates(routeContext); + var executorCollection = _selector.SelectCandidates(provider); foreach (var item in executorCollection) { diff --git a/src/DotNetCore.CAP/Job/IJob.CapJob.cs b/src/DotNetCore.CAP/Job/IJob.CapJob.cs index 08a3c59..8c06a66 100644 --- a/src/DotNetCore.CAP/Job/IJob.CapJob.cs +++ b/src/DotNetCore.CAP/Job/IJob.CapJob.cs @@ -1,15 +1,57 @@ using System; using System.Threading.Tasks; +using DotNetCore.CAP.Internal; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.DependencyInjection; +using DotNetCore.CAP.Abstractions; +using DotNetCore.CAP.Infrastructure; namespace DotNetCore.CAP.Job { public class CapJob : IJob { - public Task ExecuteAsync() + private readonly MethodMatcherCache _selector; + private readonly IConsumerInvokerFactory _consumerInvokerFactory; + private readonly IServiceProvider _serviceProvider; + private readonly ILogger _logger; + private readonly ICapMessageStore _messageStore; + + public CapJob( + ILogger logger, + IServiceProvider serviceProvider, + IConsumerInvokerFactory consumerInvokerFactory, + ICapMessageStore messageStore, + MethodMatcherCache selector) + { + _logger = logger; + _serviceProvider = serviceProvider; + _consumerInvokerFactory = consumerInvokerFactory; + _messageStore = messageStore; + _selector = selector; + } + + public async Task ExecuteAsync() { - Console.WriteLine("当前时间:" + DateTime.Now.ToString()); + var matchs = _selector.GetCandidatesMethods(_serviceProvider); + using (var scope = _serviceProvider.CreateScope()) + { + var provider = scope.ServiceProvider; + var messageStore = provider.GetService(); + + var nextReceivedMessage = await messageStore.GetNextReceivedMessageToBeExcuted(); + if (nextReceivedMessage != null) + { + var executeDescriptor = matchs[nextReceivedMessage.KeyName]; + var consumerContext = new ConsumerContext(executeDescriptor, nextReceivedMessage); + var invoker = _consumerInvokerFactory.CreateInvoker(consumerContext); + + await messageStore.ChangeReceivedMessageStateAsync(nextReceivedMessage, StatusName.Processing); + + await invoker.InvokeAsync(); - return Task.CompletedTask; + await messageStore.ChangeReceivedMessageStateAsync(nextReceivedMessage,StatusName.Succeeded); + } + } } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/LoggerExtensions.cs b/src/DotNetCore.CAP/LoggerExtensions.cs index 11dcef1..9cff8d7 100644 --- a/src/DotNetCore.CAP/LoggerExtensions.cs +++ b/src/DotNetCore.CAP/LoggerExtensions.cs @@ -9,6 +9,7 @@ namespace DotNetCore.CAP internal static class LoggerExtensions { private static Action _serverStarting; + private static Action _serverStartingError; private static Action _serverShuttingDown; private static Action _expectedOperationCanceledException; @@ -28,6 +29,11 @@ namespace DotNetCore.CAP 1, "Starting the processing server. Detected {MachineProcessorCount} machine processor(s). Initiating {ProcessorCount} job processor(s)."); + _serverStartingError = LoggerMessage.Define( + LogLevel.Error, + 5, + "Starting the processing server throw an exception."); + _serverShuttingDown = LoggerMessage.Define( LogLevel.Debug, 2, @@ -94,6 +100,11 @@ namespace DotNetCore.CAP _serverStarting(logger, machineProcessorCount, processorCount, null); } + public static void ServerStartedError(this ILogger logger, Exception ex) + { + _serverStartingError(logger, ex); + } + public static void ServerShuttingDown(this ILogger logger) { _serverShuttingDown(logger, null);