diff --git a/src/DotNetCore.CAP.EntityFrameworkCore/EFFetchedMessage.cs b/src/DotNetCore.CAP.EntityFrameworkCore/EFFetchedMessage.cs index 5724656..8e75f3f 100644 --- a/src/DotNetCore.CAP.EntityFrameworkCore/EFFetchedMessage.cs +++ b/src/DotNetCore.CAP.EntityFrameworkCore/EFFetchedMessage.cs @@ -4,6 +4,7 @@ using System.Data; using System.Text; using System.Threading; using Dapper; +using DotNetCore.CAP.Models; using Microsoft.EntityFrameworkCore.Storage; namespace DotNetCore.CAP.EntityFrameworkCore @@ -17,7 +18,7 @@ namespace DotNetCore.CAP.EntityFrameworkCore private readonly object _lockObject = new object(); public EFFetchedMessage(string messageId, - int type, + MessageType type, IDbConnection connection, IDbContextTransaction transaction) { @@ -30,7 +31,7 @@ namespace DotNetCore.CAP.EntityFrameworkCore public string MessageId { get; } - public int Type { get; } + public MessageType Type { get; } public void RemoveFromQueue() { diff --git a/src/DotNetCore.CAP.EntityFrameworkCore/EFStorageTransaction.cs b/src/DotNetCore.CAP.EntityFrameworkCore/EFStorageTransaction.cs index 47cb24c..ed38d9e 100644 --- a/src/DotNetCore.CAP.EntityFrameworkCore/EFStorageTransaction.cs +++ b/src/DotNetCore.CAP.EntityFrameworkCore/EFStorageTransaction.cs @@ -46,7 +46,7 @@ namespace DotNetCore.CAP.EntityFrameworkCore _connection.Context.Add(new CapQueue { MessageId = message.Id, - Type = 1 + Type = MessageType.Subscribe }); } diff --git a/src/DotNetCore.CAP.EntityFrameworkCore/FetchedMessage.cs b/src/DotNetCore.CAP.EntityFrameworkCore/FetchedMessage.cs index 98dcae3..b1827e1 100644 --- a/src/DotNetCore.CAP.EntityFrameworkCore/FetchedMessage.cs +++ b/src/DotNetCore.CAP.EntityFrameworkCore/FetchedMessage.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Text; +using DotNetCore.CAP.Models; namespace DotNetCore.CAP.EntityFrameworkCore { @@ -8,6 +9,6 @@ namespace DotNetCore.CAP.EntityFrameworkCore { public string MessageId { get; set; } - public int Type { get; set; } + public MessageType Type { get; set; } } } diff --git a/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs b/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs index 9d2cbe4..208b635 100644 --- a/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs +++ b/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs @@ -53,9 +53,10 @@ namespace Microsoft.Extensions.DependencyInjection services.AddSingleton(); //Processors services.AddTransient(); - //services.AddTransient<> + + services.AddSingleton(); + services.AddSingleton(); - //services.TryAddSingleton(); services.TryAddScoped(); diff --git a/src/DotNetCore.CAP/DotNetCore.CAP.csproj b/src/DotNetCore.CAP/DotNetCore.CAP.csproj index b497c57..ffbf70e 100644 --- a/src/DotNetCore.CAP/DotNetCore.CAP.csproj +++ b/src/DotNetCore.CAP/DotNetCore.CAP.csproj @@ -13,6 +13,11 @@ False + + + + + diff --git a/src/DotNetCore.CAP/IConsumerHandler.Default.cs b/src/DotNetCore.CAP/IConsumerHandler.Default.cs index 89aed8d..1322b17 100644 --- a/src/DotNetCore.CAP/IConsumerHandler.Default.cs +++ b/src/DotNetCore.CAP/IConsumerHandler.Default.cs @@ -125,7 +125,7 @@ namespace DotNetCore.CAP public void Pulse() { - throw new NotImplementedException(); + WaitHandleEx.ReceviedPulseEvent.Set(); } //private void ProcessMessage(IServiceScope serviceScope, CapReceivedMessage receivedMessage) diff --git a/src/DotNetCore.CAP/IFetchedMessage.cs b/src/DotNetCore.CAP/IFetchedMessage.cs index f6b4291..ce5897c 100644 --- a/src/DotNetCore.CAP/IFetchedMessage.cs +++ b/src/DotNetCore.CAP/IFetchedMessage.cs @@ -1,4 +1,5 @@ using System; +using DotNetCore.CAP.Models; namespace DotNetCore.CAP { @@ -6,7 +7,7 @@ namespace DotNetCore.CAP { string MessageId { get; } - int Type { get; } + MessageType Type { get; } void RemoveFromQueue(); diff --git a/src/DotNetCore.CAP/IQueueExecutor.Publish.Base.cs b/src/DotNetCore.CAP/IQueueExecutor.Publish.Base.cs new file mode 100644 index 0000000..521b9f8 --- /dev/null +++ b/src/DotNetCore.CAP/IQueueExecutor.Publish.Base.cs @@ -0,0 +1,105 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Text; +using System.Threading.Tasks; +using DotNetCore.CAP.Job; +using DotNetCore.CAP.Job.States; +using DotNetCore.CAP.Models; +using Microsoft.Extensions.Logging; + +namespace DotNetCore.CAP +{ + public abstract class BasePublishQueueExecutor : IQueueExecutor + { + private readonly IStateChanger _stateChanger; + private readonly ILogger _logger; + + public BasePublishQueueExecutor(IStateChanger stateChanger, + ILogger logger) + { + _stateChanger = stateChanger; + _logger = logger; + } + + public abstract Task PublishAsync(string keyName, string content); + + public async Task ExecuteAsync(IStorageConnection connection, IFetchedMessage fetched) + { + using (fetched) + { + + var message = await connection.GetSentMessageAsync(fetched.MessageId); + try + { + var sp = Stopwatch.StartNew(); + await _stateChanger.ChangeStateAsync(message, new ProcessingState(), connection); + + if (message.Retries > 0) + { + _logger.JobRetrying(message.Retries); + } + var result = await PublishAsync(message.KeyName, message.Content); + sp.Stop(); + + var newState = default(IState); + if (!result.Succeeded) + { + var shouldRetry = await UpdateJobForRetryAsync(message, connection); + if (shouldRetry) + { + newState = new ScheduledState(); + _logger.JobFailedWillRetry(result.Exception); + } + else + { + newState = new FailedState(); + _logger.JobFailed(result.Exception); + } + } + else + { + newState = new SucceededState(); + } + await _stateChanger.ChangeStateAsync(message, newState, connection); + + fetched.RemoveFromQueue(); + + if (result.Succeeded) + { + _logger.JobExecuted(sp.Elapsed.TotalSeconds); + } + + return OperateResult.Success; + } + + catch (Exception ex) + { + _logger.ExceptionOccuredWhileExecutingJob(message?.KeyName, ex); + return OperateResult.Failed(ex); + } + } + } + + private async Task UpdateJobForRetryAsync(CapSentMessage message, IStorageConnection connection) + { + var retryBehavior = RetryBehavior.DefaultRetry; + + var now = DateTime.UtcNow; + var retries = ++message.Retries; + if (retries >= retryBehavior.RetryCount) + { + return false; + } + + var due = message.Added.AddSeconds(retryBehavior.RetryIn(retries)); + message.LastRun = due; + using (var transaction = connection.CreateTransaction()) + { + transaction.UpdateMessage(message); + await transaction.CommitAsync(); + } + return true; + } + } +} diff --git a/src/DotNetCore.CAP/IQueueExecutor.Subscibe.cs b/src/DotNetCore.CAP/IQueueExecutor.Subscibe.cs new file mode 100644 index 0000000..31cc863 --- /dev/null +++ b/src/DotNetCore.CAP/IQueueExecutor.Subscibe.cs @@ -0,0 +1,153 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using DotNetCore.CAP.Abstractions; +using DotNetCore.CAP.Infrastructure; +using DotNetCore.CAP.Internal; +using DotNetCore.CAP.Job; +using DotNetCore.CAP.Job.States; +using DotNetCore.CAP.Models; +using Microsoft.Extensions.Logging; + +namespace DotNetCore.CAP +{ + public class SubscibeQueueExecutor : IQueueExecutor + { + private readonly IConsumerInvokerFactory _consumerInvokerFactory; + private readonly IConsumerClientFactory _consumerClientFactory; + private readonly IStateChanger _stateChanger; + private readonly ILogger _logger; + + private readonly MethodMatcherCache _selector; + private readonly CapOptions _options; + + public SubscibeQueueExecutor( + IStateChanger stateChanger, + MethodMatcherCache selector, + IConsumerInvokerFactory consumerInvokerFactory, + IConsumerClientFactory consumerClientFactory, + ILogger logger) + { + _selector = selector; + _consumerInvokerFactory = consumerInvokerFactory; + _consumerClientFactory = consumerClientFactory; + _stateChanger = stateChanger; + _logger = logger; + } + + public async Task ExecuteAsync(IStorageConnection connection, IFetchedMessage fetched) + { + using (fetched) + { + var message = await connection.GetReceivedMessageAsync(fetched.MessageId); + try + { + var sp = Stopwatch.StartNew(); + await _stateChanger.ChangeStateAsync(message, new ProcessingState(), connection); + + if (message.Retries > 0) + { + _logger.JobRetrying(message.Retries); + } + var result = await ExecuteSubscribeAsync(message); + sp.Stop(); + + var newState = default(IState); + if (!result.Succeeded) + { + var shouldRetry = await UpdateJobForRetryAsync(message, connection); + if (shouldRetry) + { + newState = new ScheduledState(); + _logger.JobFailedWillRetry(result.Exception); + } + else + { + newState = new FailedState(); + _logger.JobFailed(result.Exception); + } + } + else + { + newState = new SucceededState(); + } + await _stateChanger.ChangeStateAsync(message, newState, connection); + + fetched.RemoveFromQueue(); + + if (result.Succeeded) + { + _logger.JobExecuted(sp.Elapsed.TotalSeconds); + } + + return OperateResult.Success; + } + catch (SubscriberNotFoundException ex) + { + _logger.LogError(ex.Message); + return OperateResult.Failed(ex); + } + catch (Exception ex) + { + _logger.ExceptionOccuredWhileExecutingJob(message?.KeyName, ex); + return OperateResult.Failed(ex); + } + } + } + + protected virtual async Task ExecuteSubscribeAsync(CapReceivedMessage receivedMessage) + { + try + { + var executeDescriptorGroup = _selector.GetTopicExector(receivedMessage.KeyName); + + if (!executeDescriptorGroup.ContainsKey(receivedMessage.Group)) + { + throw new SubscriberNotFoundException(receivedMessage.KeyName + " has not been found."); + } + + // If there are multiple consumers in the same group, we will take the first + var executeDescriptor = executeDescriptorGroup[receivedMessage.Group][0]; + var consumerContext = new ConsumerContext(executeDescriptor, receivedMessage.ToMessageContext()); + + await _consumerInvokerFactory.CreateInvoker(consumerContext).InvokeAsync(); + + return OperateResult.Success; + } + catch (SubscriberNotFoundException ex) + { + throw ex; + } + catch (Exception ex) + { + _logger.ConsumerMethodExecutingFailed($"Group:{receivedMessage.Group}, Topic:{receivedMessage.KeyName}", ex); + return OperateResult.Failed(ex); + } + } + + private async Task UpdateJobForRetryAsync(CapReceivedMessage message, IStorageConnection connection) + { + var retryBehavior = RetryBehavior.DefaultRetry; + + var now = DateTime.UtcNow; + var retries = ++message.Retries; + if (retries >= retryBehavior.RetryCount) + { + return false; + } + + var due = message.Added.AddSeconds(retryBehavior.RetryIn(retries)); + message.LastRun = due; + using (var transaction = connection.CreateTransaction()) + { + transaction.UpdateMessage(message); + await transaction.CommitAsync(); + } + return true; + } + + } +} diff --git a/src/DotNetCore.CAP/IQueueExecutor.cs b/src/DotNetCore.CAP/IQueueExecutor.cs new file mode 100644 index 0000000..7135eed --- /dev/null +++ b/src/DotNetCore.CAP/IQueueExecutor.cs @@ -0,0 +1,12 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace DotNetCore.CAP +{ + public interface IQueueExecutor + { + Task ExecuteAsync(IStorageConnection connection, IFetchedMessage message); + } +} diff --git a/src/DotNetCore.CAP/IQueueExecutorFactory.cs b/src/DotNetCore.CAP/IQueueExecutorFactory.cs new file mode 100644 index 0000000..5b46ac6 --- /dev/null +++ b/src/DotNetCore.CAP/IQueueExecutorFactory.cs @@ -0,0 +1,9 @@ +using DotNetCore.CAP.Models; + +namespace DotNetCore.CAP +{ + public interface IQueueExecutorFactory + { + IQueueExecutor GetInstance(MessageType messageType); + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP/Internal/SubscriberNotFoundException.cs b/src/DotNetCore.CAP/Internal/SubscriberNotFoundException.cs new file mode 100644 index 0000000..4bf9844 --- /dev/null +++ b/src/DotNetCore.CAP/Internal/SubscriberNotFoundException.cs @@ -0,0 +1,13 @@ +using System; +namespace DotNetCore.CAP.Internal +{ + public class SubscriberNotFoundException : Exception + { + public SubscriberNotFoundException() { } + + public SubscriberNotFoundException(string message) : base(message) { } + + public SubscriberNotFoundException(string message, Exception inner) : + base(message, inner) { } + } +} diff --git a/src/DotNetCore.CAP/Job/IJobProcessor.JobQueuer.cs b/src/DotNetCore.CAP/Job/IJobProcessor.JobQueuer.cs index ae5a644..8973c17 100644 --- a/src/DotNetCore.CAP/Job/IJobProcessor.JobQueuer.cs +++ b/src/DotNetCore.CAP/Job/IJobProcessor.JobQueuer.cs @@ -37,7 +37,6 @@ namespace DotNetCore.CAP.Job using (var scope = _provider.CreateScope()) { CapSentMessage sentMessage; - // CapReceivedMessage receivedMessage; var provider = scope.ServiceProvider; var connection = provider.GetRequiredService(); @@ -46,7 +45,6 @@ namespace DotNetCore.CAP.Job (sentMessage = await connection.GetNextSentMessageToBeEnqueuedAsync()) != null) { - System.Diagnostics.Debug.WriteLine("JobQueuer 执行 内部循环: " + DateTime.Now); var state = new EnqueuedState(); using (var transaction = connection.CreateTransaction()) @@ -54,10 +52,9 @@ namespace DotNetCore.CAP.Job _stateChanger.ChangeState(sentMessage, state, transaction); await transaction.CommitAsync(); } - } + } } - System.Diagnostics.Debug.WriteLine("JobQueuer 执行: " + DateTime.Now); context.ThrowIfStopping(); WaitHandleEx.SentPulseEvent.Set(); diff --git a/src/DotNetCore.CAP/Job/IJobProcessor.MessageJob.Default.cs b/src/DotNetCore.CAP/Job/IJobProcessor.MessageJob.Default.cs new file mode 100644 index 0000000..e880c36 --- /dev/null +++ b/src/DotNetCore.CAP/Job/IJobProcessor.MessageJob.Default.cs @@ -0,0 +1,96 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using DotNetCore.CAP.Infrastructure; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; + +namespace DotNetCore.CAP.Job +{ + public class DefaultMessageJobProcessor : IMessageJobProcessor + { + private readonly IQueueExecutorFactory _queueExecutorFactory; + private readonly IServiceProvider _provider; + private readonly ILogger _logger; + + private readonly CancellationTokenSource _cts; + private readonly TimeSpan _pollingDelay; + + internal static readonly AutoResetEvent PulseEvent = new AutoResetEvent(true); + + public DefaultMessageJobProcessor( + IServiceProvider provider, + IQueueExecutorFactory queueExecutorFactory, + IOptions capOptions, + ILogger logger) + { + _logger = logger; + _queueExecutorFactory = queueExecutorFactory; + _provider = provider; + _cts = new CancellationTokenSource(); + _pollingDelay = TimeSpan.FromSeconds(capOptions.Value.PollingDelay); + } + + public bool Waiting { get; private set; } + + public Task ProcessAsync(ProcessingContext context) + { + if (context == null) + throw new ArgumentNullException(nameof(context)); + + context.ThrowIfStopping(); + + return ProcessCoreAsync(context); + } + + public async Task ProcessCoreAsync(ProcessingContext context) + { + try + { + _logger.LogInformation("BaseMessageJobProcessor processing ..."); + + var worked = await Step(context); + + context.ThrowIfStopping(); + + Waiting = true; + + if (!worked) + { + var token = GetTokenToWaitOn(context); + await WaitHandleEx.WaitAnyAsync(PulseEvent, token.WaitHandle, _pollingDelay); + } + } + finally + { + Waiting = false; + } + } + + protected virtual CancellationToken GetTokenToWaitOn(ProcessingContext context) + { + return context.CancellationToken; + } + + private async Task Step(ProcessingContext context) + { + var fetched = default(IFetchedMessage); + using (var scopedContext = context.CreateScope()) + { + var provider = scopedContext.Provider; + var connection = provider.GetRequiredService(); + + if ((fetched = await connection.FetchNextMessageAsync()) != null) + { + using (fetched) + { + var queueExecutor = _queueExecutorFactory.GetInstance(fetched.Type); + await queueExecutor.ExecuteAsync(connection, fetched); + } + } + } + return fetched != null; + } + } +} diff --git a/src/DotNetCore.CAP/LoggerExtensions.cs b/src/DotNetCore.CAP/LoggerExtensions.cs index 9a4fa9e..0d1894a 100644 --- a/src/DotNetCore.CAP/LoggerExtensions.cs +++ b/src/DotNetCore.CAP/LoggerExtensions.cs @@ -18,6 +18,12 @@ namespace DotNetCore.CAP private static readonly Action _executingConsumerMethod; private static readonly Action _receivedMessageRetryExecuting; + private static Action _jobFailed; + private static Action _jobFailedWillRetry; + private static Action _jobExecuted; + private static Action _jobRetrying; + private static Action _exceptionOccuredWhileExecutingJob; + static LoggerExtensions() { _serverStarting = LoggerMessage.Define( @@ -59,8 +65,58 @@ namespace DotNetCore.CAP LogLevel.Error, 5, "Received message topic method '{topicName}' failed to execute."); + + + _jobRetrying = LoggerMessage.Define( + LogLevel.Debug, + 3, + "Retrying a job: {Retries}..."); + + _jobExecuted = LoggerMessage.Define( + LogLevel.Debug, + 4, + "Job executed. Took: {Seconds} secs."); + + _jobFailed = LoggerMessage.Define( + LogLevel.Warning, + 1, + "Job failed to execute."); + + _jobFailedWillRetry = LoggerMessage.Define( + LogLevel.Warning, + 2, + "Job failed to execute. Will retry."); + + _exceptionOccuredWhileExecutingJob = LoggerMessage.Define( + LogLevel.Error, + 6, + "An exception occured while trying to execute a job: '{JobId}'. " + + "Requeuing for another retry."); + } + + public static void JobFailed(this ILogger logger, Exception ex) + { + _jobFailed(logger, ex); } + public static void JobFailedWillRetry(this ILogger logger, Exception ex) + { + _jobFailedWillRetry(logger, ex); + } + + + public static void JobRetrying(this ILogger logger, int retries) + { + _jobRetrying(logger, retries, null); + } + + + public static void JobExecuted(this ILogger logger, double seconds) + { + _jobExecuted(logger, seconds, null); + } + + public static void ConsumerMethodExecutingFailed(this ILogger logger, string methodName, Exception ex) { _executingConsumerMethod(logger, methodName, ex); @@ -100,5 +156,10 @@ namespace DotNetCore.CAP { _expectedOperationCanceledException(logger, ex.Message, ex); } + + public static void ExceptionOccuredWhileExecutingJob(this ILogger logger, string jobId, Exception ex) + { + _exceptionOccuredWhileExecutingJob(logger, jobId, ex); + } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/Models/CapQueue.cs b/src/DotNetCore.CAP/Models/CapQueue.cs index bb64de1..229ecef 100644 --- a/src/DotNetCore.CAP/Models/CapQueue.cs +++ b/src/DotNetCore.CAP/Models/CapQueue.cs @@ -9,6 +9,6 @@ /// /// 0 is CapSentMessage, 1 is CapReceviedMessage /// - public int Type { get; set; } + public MessageType Type { get; set; } } } diff --git a/src/DotNetCore.CAP/Models/MessageType.cs b/src/DotNetCore.CAP/Models/MessageType.cs new file mode 100644 index 0000000..1ddfc74 --- /dev/null +++ b/src/DotNetCore.CAP/Models/MessageType.cs @@ -0,0 +1,8 @@ +namespace DotNetCore.CAP.Models +{ + public enum MessageType + { + Publish, + Subscribe + } +} diff --git a/src/DotNetCore.CAP/QueueExecutorFactory.cs b/src/DotNetCore.CAP/QueueExecutorFactory.cs new file mode 100644 index 0000000..1c71e93 --- /dev/null +++ b/src/DotNetCore.CAP/QueueExecutorFactory.cs @@ -0,0 +1,32 @@ +using System; +using System.Linq; +using System.Reflection; +using DotNetCore.CAP.Models; +using Microsoft.Extensions.DependencyInjection; + +namespace DotNetCore.CAP +{ + public class QueueExecutorFactory : IQueueExecutorFactory + { + private readonly IServiceProvider _serviceProvider; + + public QueueExecutorFactory(IServiceProvider serviceProvider) + { + _serviceProvider = serviceProvider; + } + + public IQueueExecutor GetInstance(MessageType messageType) + { + var _queueExectors = _serviceProvider.GetServices(); + + if (messageType== MessageType.Publish) + { + return _queueExectors.FirstOrDefault(x => typeof(BasePublishQueueExecutor).IsAssignableFrom(x.GetType())); + } + else + { + return _queueExectors.FirstOrDefault(x => !typeof(BasePublishQueueExecutor).IsAssignableFrom(x.GetType())); + } + } + } +}