Browse Source

Add jobs module.

master
yangxiaodong 7 years ago
parent
commit
6191e0553a
10 changed files with 532 additions and 0 deletions
  1. +53
    -0
      src/Cap.Consistency/Job/ComputedCronJob.cs
  2. +36
    -0
      src/Cap.Consistency/Job/CronJob.cs
  3. +64
    -0
      src/Cap.Consistency/Job/CronJobRegistry.cs
  4. +20
    -0
      src/Cap.Consistency/Job/IJob.CapJob.cs
  5. +15
    -0
      src/Cap.Consistency/Job/IJob.cs
  6. +157
    -0
      src/Cap.Consistency/Job/IProcessor.CronJob.cs
  7. +40
    -0
      src/Cap.Consistency/Job/IProcessor.InfiniteRetry.cs
  8. +12
    -0
      src/Cap.Consistency/Job/IProcessor.cs
  9. +66
    -0
      src/Cap.Consistency/Job/ProcessingContext.cs
  10. +69
    -0
      src/Cap.Consistency/Job/RetryBehavior.cs

+ 53
- 0
src/Cap.Consistency/Job/ComputedCronJob.cs View File

@@ -0,0 +1,53 @@
using System;
using System.Collections.Generic;
using System.Text;
using NCrontab;

namespace Cap.Consistency.Job
{
public class ComputedCronJob
{
private CronJobRegistry.Entry _entry;

public ComputedCronJob() {
}

public ComputedCronJob(CronJob job) {
Job = job;

Schedule = CrontabSchedule.Parse(job.Cron);
if (job.TypeName != null) {
JobType = Type.GetType(job.TypeName);
}
}

public ComputedCronJob(CronJob job, CronJobRegistry.Entry entry)
: this(job) {
_entry = entry;
}

public CronJob Job { get; set; }

public CrontabSchedule Schedule { get; set; }

public Type JobType { get; set; }

public DateTime Next { get; set; }

public int Retries { get; set; }

public DateTime FirstTry { get; set; }

public RetryBehavior RetryBehavior => _entry.RetryBehavior;

public void Update(DateTime baseTime) {
Job.LastRun = baseTime;
}

public void UpdateNext(DateTime now) {
var next = Schedule.GetNextOccurrence(now);
var previousNext = Schedule.GetNextOccurrence(Job.LastRun);
Next = next > previousNext ? now : next;
}
}
}

+ 36
- 0
src/Cap.Consistency/Job/CronJob.cs View File

@@ -0,0 +1,36 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace Cap.Consistency.Job
{
/// <summary>
/// Represents a cron job to be executed at specified intervals of time.
/// </summary>
public class CronJob
{
public CronJob() {
Id = Guid.NewGuid().ToString();
}

public CronJob(string cron)
: this() {
Cron = cron;
}

public CronJob(string cron, DateTime lastRun)
: this(cron) {
LastRun = lastRun;
}

public string Id { get; set; }

public string Name { get; set; }

public string TypeName { get; set; }

public string Cron { get; set; }

public DateTime LastRun { get; set; }
}
}

+ 64
- 0
src/Cap.Consistency/Job/CronJobRegistry.cs View File

@@ -0,0 +1,64 @@
using System;
using System.Collections.Generic;
using System.Reflection;
using System.Text;
using NCrontab;

namespace Cap.Consistency.Job
{
public abstract class CronJobRegistry
{
private List<Entry> _entries;

public CronJobRegistry() {
_entries = new List<Entry>();
}

protected void RegisterJob<T>(string name, string cron, RetryBehavior retryBehavior = null)
where T : IJob {
RegisterJob(name, typeof(T), cron, retryBehavior);
}

/// <summary>
/// Registers a cron job.
/// </summary>
/// <param name="name">The name of the job.</param>
/// <param name="jobType">The job's type.</param>
/// <param name="cron">The cron expression to use.</param>
/// <param name="retryBehavior">The <see cref="RetryBehavior"/> to use.</param>
protected void RegisterJob(string name, Type jobType, string cron, RetryBehavior retryBehavior = null) {
if (string.IsNullOrWhiteSpace(name)) throw new ArgumentException(nameof(cron));
if (jobType == null) throw new ArgumentNullException(nameof(jobType));
if (cron == null) throw new ArgumentNullException(nameof(cron));
retryBehavior = retryBehavior ?? RetryBehavior.DefaultRetry;

CrontabSchedule.TryParse(cron);

if (!typeof(IJob).GetTypeInfo().IsAssignableFrom(jobType)) {
throw new ArgumentException(
"Cron jobs should extend IJob.", nameof(jobType));
}

_entries.Add(new Entry(name, jobType, cron));
}

public Entry[] Build() => _entries.ToArray();

public class Entry
{
public Entry(string name, Type jobType, string cron) {
Name = name;
JobType = jobType;
Cron = cron;
}

public string Name { get; set; }

public Type JobType { get; set; }

public string Cron { get; set; }

public RetryBehavior RetryBehavior { get; set; }
}
}
}

+ 20
- 0
src/Cap.Consistency/Job/IJob.CapJob.cs View File

@@ -0,0 +1,20 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace Cap.Consistency.Job
{
public class CapJob : IJob
{




public Task ExecuteAsync() {

throw new NotImplementedException();

}
}
}

+ 15
- 0
src/Cap.Consistency/Job/IJob.cs View File

@@ -0,0 +1,15 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace Cap.Consistency.Job
{
public interface IJob
{
/// <summary>
/// Executes the job.
/// </summary>
Task ExecuteAsync();
}
}

+ 157
- 0
src/Cap.Consistency/Job/IProcessor.CronJob.cs View File

@@ -0,0 +1,157 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

namespace Cap.Consistency.Job
{
public class CronJobProcessor : IProcessor
{
private ILogger _logger;
private IServiceProvider _provider;

public CronJobProcessor(
ILogger<CronJobProcessor> logger,
IServiceProvider provider) {
_logger = logger;
_provider = provider;
}

public override string ToString() => nameof(CronJobProcessor);

public Task ProcessAsync(ProcessingContext context) {
if (context == null) throw new ArgumentNullException(nameof(context));
return ProcessCoreAsync(context);
}

private async Task ProcessCoreAsync(ProcessingContext context) {
//var storage = context.Storage;
//var jobs = await GetJobsAsync(storage);
var jobs = GetJobsAsync();
if (!jobs.Any()) {
_logger.CronJobsNotFound();

// This will cancel this processor.
throw new OperationCanceledException();
}
_logger.CronJobsScheduling(jobs);

context.ThrowIfStopping();

var computedJobs = Compute(jobs, context.CronJobRegistry.Build());
if (context.IsStopping) {
return;
}

await Task.WhenAll(computedJobs.Select(j => RunAsync(j, context)));
}

private async Task RunAsync(ComputedCronJob computedJob, ProcessingContext context) {
//var storage = context.Storage;
var retryBehavior = computedJob.RetryBehavior;

while (!context.IsStopping) {
var now = DateTime.UtcNow;

var due = ComputeDue(computedJob, now);
var timeSpan = due - now;

if (timeSpan.TotalSeconds > 0) {
await context.WaitAsync(timeSpan);
}

context.ThrowIfStopping();

using (var scopedContext = context.CreateScope()) {
var provider = scopedContext.Provider;

var job = provider.GetService<IJob>();
var success = true;

try {
var sw = Stopwatch.StartNew();
await job.ExecuteAsync();
sw.Stop();
computedJob.Retries = 0;
_logger.CronJobExecuted(computedJob.Job.Name, sw.Elapsed.TotalSeconds);
}
catch (Exception ex) {
success = false;
if (computedJob.Retries == 0) {
computedJob.FirstTry = DateTime.UtcNow;
}
computedJob.Retries++;
_logger.CronJobFailed(computedJob.Job.Name, ex);
}

if (success) {
//var connection = provider.GetRequiredService<IStorageConnection>();
//await connection.AttachCronJobAsync(computedJob.Job);

//computedJob.Update(DateTime.UtcNow);

//await connection.UpdateCronJobAsync(computedJob.Job);
}
}
}
}

private DateTime ComputeDue(ComputedCronJob computedJob, DateTime now) {
computedJob.UpdateNext(now);

var retryBehavior = computedJob.RetryBehavior ?? RetryBehavior.DefaultRetry;
var retries = computedJob.Retries;

if (retries == 0) {
return computedJob.Next;
}

var realNext = computedJob.Schedule.GetNextOccurrence(now);

if (!retryBehavior.Retry) {
// No retry. If job failed before, we don't care, just schedule it next as usual.
return realNext;
}

if (retries >= retryBehavior.RetryCount) {
// Max retries. Just schedule it for the next occurance.
return realNext;
}

// Delay a bit.
return computedJob.FirstTry.AddSeconds(retryBehavior.RetryIn(retries));
}

//private async Task<CronJob[]> GetJobsAsync(IStorage storage) {
// using (var scope = _provider.CreateScope()) {
// var provider = scope.ServiceProvider;
// var connection = provider.GetRequiredService<IStorageConnection>();

// return await connection.GetCronJobsAsync();
// }
//}

private CronJob[] GetJobsAsync() {
return new CronJob[] {
new CronJob {
Id= Guid.NewGuid().ToString(),
Cron= "* * * * *",
Name="Cap.Messages",
TypeName = null
}
};
}

private ComputedCronJob[] Compute(IEnumerable<CronJob> jobs, CronJobRegistry.Entry[] entries)
=> jobs.Select(j => CreateComputedCronJob(j, entries)).ToArray();

private ComputedCronJob CreateComputedCronJob(CronJob job, CronJobRegistry.Entry[] entries) {
var entry = entries.First(e => e.Name == job.Name);
return new ComputedCronJob(job, entry);
}
}
}

+ 40
- 0
src/Cap.Consistency/Job/IProcessor.InfiniteRetry.cs View File

@@ -0,0 +1,40 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;

namespace Cap.Consistency.Job
{
public class InfiniteRetryProcessor : IProcessor
{
private IProcessor _inner;
private ILogger _logger;

public InfiniteRetryProcessor(
IProcessor inner,
ILoggerFactory loggerFactory) {
_inner = inner;
_logger = loggerFactory.CreateLogger<InfiniteRetryProcessor>();
}

public override string ToString() => _inner.ToString();

public async Task ProcessAsync(ProcessingContext context) {
while (!context.IsStopping) {
try {
await _inner.ProcessAsync(context);
}
catch (OperationCanceledException) {
return;
}
catch (Exception ex) {
_logger.LogWarning(
1,
ex,
"Prcessor '{ProcessorName}' failed. Retrying...", _inner.ToString());
}
}
}
}
}

+ 12
- 0
src/Cap.Consistency/Job/IProcessor.cs View File

@@ -0,0 +1,12 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace Cap.Consistency.Job
{
public interface IProcessor
{
Task ProcessAsync(ProcessingContext context);
}
}

+ 66
- 0
src/Cap.Consistency/Job/ProcessingContext.cs View File

@@ -0,0 +1,66 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;

namespace Cap.Consistency.Job
{
public class ProcessingContext : IDisposable
{
private IServiceScope _scope;

private ProcessingContext(ProcessingContext other) {
Provider = other.Provider;
//Storage = other.Storage;
CronJobRegistry = other.CronJobRegistry;
CancellationToken = other.CancellationToken;
}

public ProcessingContext() {
}

public ProcessingContext(
IServiceProvider provider,
//IStorage storage,
CronJobRegistry cronJobRegistry,
CancellationToken cancellationToken) {
Provider = provider;
//Storage = storage;
CronJobRegistry = cronJobRegistry;
CancellationToken = cancellationToken;
}

public IServiceProvider Provider { get; private set; }

//public IStorage Storage { get; }

public CronJobRegistry CronJobRegistry { get; private set; }

public CancellationToken CancellationToken { get; }

public bool IsStopping => CancellationToken.IsCancellationRequested;

public void ThrowIfStopping() => CancellationToken.ThrowIfCancellationRequested();

public ProcessingContext CreateScope() {
var serviceScope = Provider.CreateScope();

return new ProcessingContext(this) {
_scope = serviceScope,
Provider = serviceScope.ServiceProvider
};
}

public Task WaitAsync(TimeSpan timeout) {
return Task.Delay(timeout, CancellationToken);
}

public void Dispose() {
if (_scope != null) {
_scope.Dispose();
}
}
}
}

+ 69
- 0
src/Cap.Consistency/Job/RetryBehavior.cs View File

@@ -0,0 +1,69 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace Cap.Consistency.Job
{
public class RetryBehavior
{
public static readonly int DefaultRetryCount;
public static readonly Func<int, int> DefaultRetryInThunk;

public static readonly RetryBehavior DefaultRetry;
public static readonly RetryBehavior NoRetry;

private static Random _random = new Random();

private Func<int, int> _retryInThunk;

static RetryBehavior() {
DefaultRetryCount = 25;
DefaultRetryInThunk = retries =>
(int)Math.Round(Math.Pow(retries - 1, 4) + 15 + (_random.Next(30) * (retries)));

DefaultRetry = new RetryBehavior(true);
NoRetry = new RetryBehavior(false);
}

public RetryBehavior(bool retry)
: this(retry, DefaultRetryCount, DefaultRetryInThunk) {
}

/// <summary>
/// Initializes a new instance of the <see cref="RetryBehavior"/> class.
/// </summary>
/// <param name="retry">Whether to retry.</param>
/// <param name="retryCount">The maximum retry count.</param>
/// <param name="retryInThunk">The retry in function to use.</param>
public RetryBehavior(bool retry, int retryCount, Func<int, int> retryInThunk) {
if (retry) {
if (retryCount < 0) throw new ArgumentOutOfRangeException(nameof(retryCount), "Can't be negative.");
}

Retry = retry;
RetryCount = retryCount;
_retryInThunk = retryInThunk ?? DefaultRetryInThunk;
}

public Random Random => _random;

/// <summary>
/// Gets whether to retry or disable retrying.
/// </summary>
public bool Retry { get; }

/// <summary>
/// Gets the maximum retry count.
/// </summary>
public int RetryCount { get; }

/// <summary>
/// Returns the seconds to delay before retrying again.
/// </summary>
/// <param name="retries">The current retry count.</param>
/// <returns>The seconds to delay.</returns>
public int RetryIn(int retries) {
return _retryInThunk(retries);
}
}
}

Loading…
Cancel
Save