From 6191e0553aad3931f02f7548a9ea93ebb88f32ce Mon Sep 17 00:00:00 2001 From: yangxiaodong Date: Sun, 18 Jun 2017 15:51:31 +0800 Subject: [PATCH] Add jobs module. --- src/Cap.Consistency/Job/ComputedCronJob.cs | 53 ++++++ src/Cap.Consistency/Job/CronJob.cs | 36 ++++ src/Cap.Consistency/Job/CronJobRegistry.cs | 64 +++++++ src/Cap.Consistency/Job/IJob.CapJob.cs | 20 +++ src/Cap.Consistency/Job/IJob.cs | 15 ++ src/Cap.Consistency/Job/IProcessor.CronJob.cs | 157 ++++++++++++++++++ .../Job/IProcessor.InfiniteRetry.cs | 40 +++++ src/Cap.Consistency/Job/IProcessor.cs | 12 ++ src/Cap.Consistency/Job/ProcessingContext.cs | 66 ++++++++ src/Cap.Consistency/Job/RetryBehavior.cs | 69 ++++++++ 10 files changed, 532 insertions(+) create mode 100644 src/Cap.Consistency/Job/ComputedCronJob.cs create mode 100644 src/Cap.Consistency/Job/CronJob.cs create mode 100644 src/Cap.Consistency/Job/CronJobRegistry.cs create mode 100644 src/Cap.Consistency/Job/IJob.CapJob.cs create mode 100644 src/Cap.Consistency/Job/IJob.cs create mode 100644 src/Cap.Consistency/Job/IProcessor.CronJob.cs create mode 100644 src/Cap.Consistency/Job/IProcessor.InfiniteRetry.cs create mode 100644 src/Cap.Consistency/Job/IProcessor.cs create mode 100644 src/Cap.Consistency/Job/ProcessingContext.cs create mode 100644 src/Cap.Consistency/Job/RetryBehavior.cs diff --git a/src/Cap.Consistency/Job/ComputedCronJob.cs b/src/Cap.Consistency/Job/ComputedCronJob.cs new file mode 100644 index 0000000..b145f1c --- /dev/null +++ b/src/Cap.Consistency/Job/ComputedCronJob.cs @@ -0,0 +1,53 @@ +using System; +using System.Collections.Generic; +using System.Text; +using NCrontab; + +namespace Cap.Consistency.Job +{ + public class ComputedCronJob + { + private CronJobRegistry.Entry _entry; + + public ComputedCronJob() { + } + + public ComputedCronJob(CronJob job) { + Job = job; + + Schedule = CrontabSchedule.Parse(job.Cron); + if (job.TypeName != null) { + JobType = Type.GetType(job.TypeName); + } + } + + public ComputedCronJob(CronJob job, CronJobRegistry.Entry entry) + : this(job) { + _entry = entry; + } + + public CronJob Job { get; set; } + + public CrontabSchedule Schedule { get; set; } + + public Type JobType { get; set; } + + public DateTime Next { get; set; } + + public int Retries { get; set; } + + public DateTime FirstTry { get; set; } + + public RetryBehavior RetryBehavior => _entry.RetryBehavior; + + public void Update(DateTime baseTime) { + Job.LastRun = baseTime; + } + + public void UpdateNext(DateTime now) { + var next = Schedule.GetNextOccurrence(now); + var previousNext = Schedule.GetNextOccurrence(Job.LastRun); + Next = next > previousNext ? now : next; + } + } +} diff --git a/src/Cap.Consistency/Job/CronJob.cs b/src/Cap.Consistency/Job/CronJob.cs new file mode 100644 index 0000000..f85c529 --- /dev/null +++ b/src/Cap.Consistency/Job/CronJob.cs @@ -0,0 +1,36 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Cap.Consistency.Job +{ + /// + /// Represents a cron job to be executed at specified intervals of time. + /// + public class CronJob + { + public CronJob() { + Id = Guid.NewGuid().ToString(); + } + + public CronJob(string cron) + : this() { + Cron = cron; + } + + public CronJob(string cron, DateTime lastRun) + : this(cron) { + LastRun = lastRun; + } + + public string Id { get; set; } + + public string Name { get; set; } + + public string TypeName { get; set; } + + public string Cron { get; set; } + + public DateTime LastRun { get; set; } + } +} diff --git a/src/Cap.Consistency/Job/CronJobRegistry.cs b/src/Cap.Consistency/Job/CronJobRegistry.cs new file mode 100644 index 0000000..b87abad --- /dev/null +++ b/src/Cap.Consistency/Job/CronJobRegistry.cs @@ -0,0 +1,64 @@ +using System; +using System.Collections.Generic; +using System.Reflection; +using System.Text; +using NCrontab; + +namespace Cap.Consistency.Job +{ + public abstract class CronJobRegistry + { + private List _entries; + + public CronJobRegistry() { + _entries = new List(); + } + + protected void RegisterJob(string name, string cron, RetryBehavior retryBehavior = null) + where T : IJob { + RegisterJob(name, typeof(T), cron, retryBehavior); + } + + /// + /// Registers a cron job. + /// + /// The name of the job. + /// The job's type. + /// The cron expression to use. + /// The to use. + protected void RegisterJob(string name, Type jobType, string cron, RetryBehavior retryBehavior = null) { + if (string.IsNullOrWhiteSpace(name)) throw new ArgumentException(nameof(cron)); + if (jobType == null) throw new ArgumentNullException(nameof(jobType)); + if (cron == null) throw new ArgumentNullException(nameof(cron)); + retryBehavior = retryBehavior ?? RetryBehavior.DefaultRetry; + + CrontabSchedule.TryParse(cron); + + if (!typeof(IJob).GetTypeInfo().IsAssignableFrom(jobType)) { + throw new ArgumentException( + "Cron jobs should extend IJob.", nameof(jobType)); + } + + _entries.Add(new Entry(name, jobType, cron)); + } + + public Entry[] Build() => _entries.ToArray(); + + public class Entry + { + public Entry(string name, Type jobType, string cron) { + Name = name; + JobType = jobType; + Cron = cron; + } + + public string Name { get; set; } + + public Type JobType { get; set; } + + public string Cron { get; set; } + + public RetryBehavior RetryBehavior { get; set; } + } + } +} diff --git a/src/Cap.Consistency/Job/IJob.CapJob.cs b/src/Cap.Consistency/Job/IJob.CapJob.cs new file mode 100644 index 0000000..75112e2 --- /dev/null +++ b/src/Cap.Consistency/Job/IJob.CapJob.cs @@ -0,0 +1,20 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Cap.Consistency.Job +{ + public class CapJob : IJob + { + + + + + public Task ExecuteAsync() { + + throw new NotImplementedException(); + + } + } +} diff --git a/src/Cap.Consistency/Job/IJob.cs b/src/Cap.Consistency/Job/IJob.cs new file mode 100644 index 0000000..f72c614 --- /dev/null +++ b/src/Cap.Consistency/Job/IJob.cs @@ -0,0 +1,15 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Cap.Consistency.Job +{ + public interface IJob + { + /// + /// Executes the job. + /// + Task ExecuteAsync(); + } +} diff --git a/src/Cap.Consistency/Job/IProcessor.CronJob.cs b/src/Cap.Consistency/Job/IProcessor.CronJob.cs new file mode 100644 index 0000000..cd4e1a8 --- /dev/null +++ b/src/Cap.Consistency/Job/IProcessor.CronJob.cs @@ -0,0 +1,157 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; + +namespace Cap.Consistency.Job +{ + public class CronJobProcessor : IProcessor + { + private ILogger _logger; + private IServiceProvider _provider; + + public CronJobProcessor( + ILogger logger, + IServiceProvider provider) { + _logger = logger; + _provider = provider; + } + + public override string ToString() => nameof(CronJobProcessor); + + public Task ProcessAsync(ProcessingContext context) { + if (context == null) throw new ArgumentNullException(nameof(context)); + return ProcessCoreAsync(context); + } + + private async Task ProcessCoreAsync(ProcessingContext context) { + //var storage = context.Storage; + //var jobs = await GetJobsAsync(storage); + var jobs = GetJobsAsync(); + if (!jobs.Any()) { + _logger.CronJobsNotFound(); + + // This will cancel this processor. + throw new OperationCanceledException(); + } + _logger.CronJobsScheduling(jobs); + + context.ThrowIfStopping(); + + var computedJobs = Compute(jobs, context.CronJobRegistry.Build()); + if (context.IsStopping) { + return; + } + + await Task.WhenAll(computedJobs.Select(j => RunAsync(j, context))); + } + + private async Task RunAsync(ComputedCronJob computedJob, ProcessingContext context) { + //var storage = context.Storage; + var retryBehavior = computedJob.RetryBehavior; + + while (!context.IsStopping) { + var now = DateTime.UtcNow; + + var due = ComputeDue(computedJob, now); + var timeSpan = due - now; + + if (timeSpan.TotalSeconds > 0) { + await context.WaitAsync(timeSpan); + } + + context.ThrowIfStopping(); + + using (var scopedContext = context.CreateScope()) { + var provider = scopedContext.Provider; + + var job = provider.GetService(); + var success = true; + + try { + var sw = Stopwatch.StartNew(); + await job.ExecuteAsync(); + sw.Stop(); + computedJob.Retries = 0; + _logger.CronJobExecuted(computedJob.Job.Name, sw.Elapsed.TotalSeconds); + } + catch (Exception ex) { + success = false; + if (computedJob.Retries == 0) { + computedJob.FirstTry = DateTime.UtcNow; + } + computedJob.Retries++; + _logger.CronJobFailed(computedJob.Job.Name, ex); + } + + if (success) { + //var connection = provider.GetRequiredService(); + //await connection.AttachCronJobAsync(computedJob.Job); + + //computedJob.Update(DateTime.UtcNow); + + //await connection.UpdateCronJobAsync(computedJob.Job); + } + } + } + } + + private DateTime ComputeDue(ComputedCronJob computedJob, DateTime now) { + computedJob.UpdateNext(now); + + var retryBehavior = computedJob.RetryBehavior ?? RetryBehavior.DefaultRetry; + var retries = computedJob.Retries; + + if (retries == 0) { + return computedJob.Next; + } + + var realNext = computedJob.Schedule.GetNextOccurrence(now); + + if (!retryBehavior.Retry) { + // No retry. If job failed before, we don't care, just schedule it next as usual. + return realNext; + } + + if (retries >= retryBehavior.RetryCount) { + // Max retries. Just schedule it for the next occurance. + return realNext; + } + + // Delay a bit. + return computedJob.FirstTry.AddSeconds(retryBehavior.RetryIn(retries)); + } + + //private async Task GetJobsAsync(IStorage storage) { + // using (var scope = _provider.CreateScope()) { + // var provider = scope.ServiceProvider; + // var connection = provider.GetRequiredService(); + + // return await connection.GetCronJobsAsync(); + // } + //} + + private CronJob[] GetJobsAsync() { + return new CronJob[] { + new CronJob { + Id= Guid.NewGuid().ToString(), + Cron= "* * * * *", + Name="Cap.Messages", + TypeName = null + } + }; + } + + private ComputedCronJob[] Compute(IEnumerable jobs, CronJobRegistry.Entry[] entries) + => jobs.Select(j => CreateComputedCronJob(j, entries)).ToArray(); + + private ComputedCronJob CreateComputedCronJob(CronJob job, CronJobRegistry.Entry[] entries) { + var entry = entries.First(e => e.Name == job.Name); + return new ComputedCronJob(job, entry); + } + } +} diff --git a/src/Cap.Consistency/Job/IProcessor.InfiniteRetry.cs b/src/Cap.Consistency/Job/IProcessor.InfiniteRetry.cs new file mode 100644 index 0000000..ae7ba5b --- /dev/null +++ b/src/Cap.Consistency/Job/IProcessor.InfiniteRetry.cs @@ -0,0 +1,40 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; + +namespace Cap.Consistency.Job +{ + public class InfiniteRetryProcessor : IProcessor + { + private IProcessor _inner; + private ILogger _logger; + + public InfiniteRetryProcessor( + IProcessor inner, + ILoggerFactory loggerFactory) { + _inner = inner; + _logger = loggerFactory.CreateLogger(); + } + + public override string ToString() => _inner.ToString(); + + public async Task ProcessAsync(ProcessingContext context) { + while (!context.IsStopping) { + try { + await _inner.ProcessAsync(context); + } + catch (OperationCanceledException) { + return; + } + catch (Exception ex) { + _logger.LogWarning( + 1, + ex, + "Prcessor '{ProcessorName}' failed. Retrying...", _inner.ToString()); + } + } + } + } +} diff --git a/src/Cap.Consistency/Job/IProcessor.cs b/src/Cap.Consistency/Job/IProcessor.cs new file mode 100644 index 0000000..c62887d --- /dev/null +++ b/src/Cap.Consistency/Job/IProcessor.cs @@ -0,0 +1,12 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Cap.Consistency.Job +{ + public interface IProcessor + { + Task ProcessAsync(ProcessingContext context); + } +} diff --git a/src/Cap.Consistency/Job/ProcessingContext.cs b/src/Cap.Consistency/Job/ProcessingContext.cs new file mode 100644 index 0000000..65d5fff --- /dev/null +++ b/src/Cap.Consistency/Job/ProcessingContext.cs @@ -0,0 +1,66 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; + +namespace Cap.Consistency.Job +{ + public class ProcessingContext : IDisposable + { + private IServiceScope _scope; + + private ProcessingContext(ProcessingContext other) { + Provider = other.Provider; + //Storage = other.Storage; + CronJobRegistry = other.CronJobRegistry; + CancellationToken = other.CancellationToken; + } + + public ProcessingContext() { + } + + public ProcessingContext( + IServiceProvider provider, + //IStorage storage, + CronJobRegistry cronJobRegistry, + CancellationToken cancellationToken) { + Provider = provider; + //Storage = storage; + CronJobRegistry = cronJobRegistry; + CancellationToken = cancellationToken; + } + + public IServiceProvider Provider { get; private set; } + + //public IStorage Storage { get; } + + public CronJobRegistry CronJobRegistry { get; private set; } + + public CancellationToken CancellationToken { get; } + + public bool IsStopping => CancellationToken.IsCancellationRequested; + + public void ThrowIfStopping() => CancellationToken.ThrowIfCancellationRequested(); + + public ProcessingContext CreateScope() { + var serviceScope = Provider.CreateScope(); + + return new ProcessingContext(this) { + _scope = serviceScope, + Provider = serviceScope.ServiceProvider + }; + } + + public Task WaitAsync(TimeSpan timeout) { + return Task.Delay(timeout, CancellationToken); + } + + public void Dispose() { + if (_scope != null) { + _scope.Dispose(); + } + } + } +} diff --git a/src/Cap.Consistency/Job/RetryBehavior.cs b/src/Cap.Consistency/Job/RetryBehavior.cs new file mode 100644 index 0000000..14a5e60 --- /dev/null +++ b/src/Cap.Consistency/Job/RetryBehavior.cs @@ -0,0 +1,69 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Cap.Consistency.Job +{ + public class RetryBehavior + { + public static readonly int DefaultRetryCount; + public static readonly Func DefaultRetryInThunk; + + public static readonly RetryBehavior DefaultRetry; + public static readonly RetryBehavior NoRetry; + + private static Random _random = new Random(); + + private Func _retryInThunk; + + static RetryBehavior() { + DefaultRetryCount = 25; + DefaultRetryInThunk = retries => + (int)Math.Round(Math.Pow(retries - 1, 4) + 15 + (_random.Next(30) * (retries))); + + DefaultRetry = new RetryBehavior(true); + NoRetry = new RetryBehavior(false); + } + + public RetryBehavior(bool retry) + : this(retry, DefaultRetryCount, DefaultRetryInThunk) { + } + + /// + /// Initializes a new instance of the class. + /// + /// Whether to retry. + /// The maximum retry count. + /// The retry in function to use. + public RetryBehavior(bool retry, int retryCount, Func retryInThunk) { + if (retry) { + if (retryCount < 0) throw new ArgumentOutOfRangeException(nameof(retryCount), "Can't be negative."); + } + + Retry = retry; + RetryCount = retryCount; + _retryInThunk = retryInThunk ?? DefaultRetryInThunk; + } + + public Random Random => _random; + + /// + /// Gets whether to retry or disable retrying. + /// + public bool Retry { get; } + + /// + /// Gets the maximum retry count. + /// + public int RetryCount { get; } + + /// + /// Returns the seconds to delay before retrying again. + /// + /// The current retry count. + /// The seconds to delay. + public int RetryIn(int retries) { + return _retryInThunk(retries); + } + } +}