diff --git a/src/Cap.Consistency/Job/ComputedCronJob.cs b/src/Cap.Consistency/Job/ComputedCronJob.cs
new file mode 100644
index 0000000..b145f1c
--- /dev/null
+++ b/src/Cap.Consistency/Job/ComputedCronJob.cs
@@ -0,0 +1,53 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+using NCrontab;
+
+namespace Cap.Consistency.Job
+{
+ public class ComputedCronJob
+ {
+ private CronJobRegistry.Entry _entry;
+
+ public ComputedCronJob() {
+ }
+
+ public ComputedCronJob(CronJob job) {
+ Job = job;
+
+ Schedule = CrontabSchedule.Parse(job.Cron);
+ if (job.TypeName != null) {
+ JobType = Type.GetType(job.TypeName);
+ }
+ }
+
+ public ComputedCronJob(CronJob job, CronJobRegistry.Entry entry)
+ : this(job) {
+ _entry = entry;
+ }
+
+ public CronJob Job { get; set; }
+
+ public CrontabSchedule Schedule { get; set; }
+
+ public Type JobType { get; set; }
+
+ public DateTime Next { get; set; }
+
+ public int Retries { get; set; }
+
+ public DateTime FirstTry { get; set; }
+
+ public RetryBehavior RetryBehavior => _entry.RetryBehavior;
+
+ public void Update(DateTime baseTime) {
+ Job.LastRun = baseTime;
+ }
+
+ public void UpdateNext(DateTime now) {
+ var next = Schedule.GetNextOccurrence(now);
+ var previousNext = Schedule.GetNextOccurrence(Job.LastRun);
+ Next = next > previousNext ? now : next;
+ }
+ }
+}
diff --git a/src/Cap.Consistency/Job/CronJob.cs b/src/Cap.Consistency/Job/CronJob.cs
new file mode 100644
index 0000000..f85c529
--- /dev/null
+++ b/src/Cap.Consistency/Job/CronJob.cs
@@ -0,0 +1,36 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace Cap.Consistency.Job
+{
+ ///
+ /// Represents a cron job to be executed at specified intervals of time.
+ ///
+ public class CronJob
+ {
+ public CronJob() {
+ Id = Guid.NewGuid().ToString();
+ }
+
+ public CronJob(string cron)
+ : this() {
+ Cron = cron;
+ }
+
+ public CronJob(string cron, DateTime lastRun)
+ : this(cron) {
+ LastRun = lastRun;
+ }
+
+ public string Id { get; set; }
+
+ public string Name { get; set; }
+
+ public string TypeName { get; set; }
+
+ public string Cron { get; set; }
+
+ public DateTime LastRun { get; set; }
+ }
+}
diff --git a/src/Cap.Consistency/Job/CronJobRegistry.cs b/src/Cap.Consistency/Job/CronJobRegistry.cs
new file mode 100644
index 0000000..b87abad
--- /dev/null
+++ b/src/Cap.Consistency/Job/CronJobRegistry.cs
@@ -0,0 +1,64 @@
+using System;
+using System.Collections.Generic;
+using System.Reflection;
+using System.Text;
+using NCrontab;
+
+namespace Cap.Consistency.Job
+{
+ public abstract class CronJobRegistry
+ {
+ private List _entries;
+
+ public CronJobRegistry() {
+ _entries = new List();
+ }
+
+ protected void RegisterJob(string name, string cron, RetryBehavior retryBehavior = null)
+ where T : IJob {
+ RegisterJob(name, typeof(T), cron, retryBehavior);
+ }
+
+ ///
+ /// Registers a cron job.
+ ///
+ /// The name of the job.
+ /// The job's type.
+ /// The cron expression to use.
+ /// The to use.
+ protected void RegisterJob(string name, Type jobType, string cron, RetryBehavior retryBehavior = null) {
+ if (string.IsNullOrWhiteSpace(name)) throw new ArgumentException(nameof(cron));
+ if (jobType == null) throw new ArgumentNullException(nameof(jobType));
+ if (cron == null) throw new ArgumentNullException(nameof(cron));
+ retryBehavior = retryBehavior ?? RetryBehavior.DefaultRetry;
+
+ CrontabSchedule.TryParse(cron);
+
+ if (!typeof(IJob).GetTypeInfo().IsAssignableFrom(jobType)) {
+ throw new ArgumentException(
+ "Cron jobs should extend IJob.", nameof(jobType));
+ }
+
+ _entries.Add(new Entry(name, jobType, cron));
+ }
+
+ public Entry[] Build() => _entries.ToArray();
+
+ public class Entry
+ {
+ public Entry(string name, Type jobType, string cron) {
+ Name = name;
+ JobType = jobType;
+ Cron = cron;
+ }
+
+ public string Name { get; set; }
+
+ public Type JobType { get; set; }
+
+ public string Cron { get; set; }
+
+ public RetryBehavior RetryBehavior { get; set; }
+ }
+ }
+}
diff --git a/src/Cap.Consistency/Job/IJob.CapJob.cs b/src/Cap.Consistency/Job/IJob.CapJob.cs
new file mode 100644
index 0000000..75112e2
--- /dev/null
+++ b/src/Cap.Consistency/Job/IJob.CapJob.cs
@@ -0,0 +1,20 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Cap.Consistency.Job
+{
+ public class CapJob : IJob
+ {
+
+
+
+
+ public Task ExecuteAsync() {
+
+ throw new NotImplementedException();
+
+ }
+ }
+}
diff --git a/src/Cap.Consistency/Job/IJob.cs b/src/Cap.Consistency/Job/IJob.cs
new file mode 100644
index 0000000..f72c614
--- /dev/null
+++ b/src/Cap.Consistency/Job/IJob.cs
@@ -0,0 +1,15 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Cap.Consistency.Job
+{
+ public interface IJob
+ {
+ ///
+ /// Executes the job.
+ ///
+ Task ExecuteAsync();
+ }
+}
diff --git a/src/Cap.Consistency/Job/IProcessor.CronJob.cs b/src/Cap.Consistency/Job/IProcessor.CronJob.cs
new file mode 100644
index 0000000..cd4e1a8
--- /dev/null
+++ b/src/Cap.Consistency/Job/IProcessor.CronJob.cs
@@ -0,0 +1,157 @@
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Logging;
+
+namespace Cap.Consistency.Job
+{
+ public class CronJobProcessor : IProcessor
+ {
+ private ILogger _logger;
+ private IServiceProvider _provider;
+
+ public CronJobProcessor(
+ ILogger logger,
+ IServiceProvider provider) {
+ _logger = logger;
+ _provider = provider;
+ }
+
+ public override string ToString() => nameof(CronJobProcessor);
+
+ public Task ProcessAsync(ProcessingContext context) {
+ if (context == null) throw new ArgumentNullException(nameof(context));
+ return ProcessCoreAsync(context);
+ }
+
+ private async Task ProcessCoreAsync(ProcessingContext context) {
+ //var storage = context.Storage;
+ //var jobs = await GetJobsAsync(storage);
+ var jobs = GetJobsAsync();
+ if (!jobs.Any()) {
+ _logger.CronJobsNotFound();
+
+ // This will cancel this processor.
+ throw new OperationCanceledException();
+ }
+ _logger.CronJobsScheduling(jobs);
+
+ context.ThrowIfStopping();
+
+ var computedJobs = Compute(jobs, context.CronJobRegistry.Build());
+ if (context.IsStopping) {
+ return;
+ }
+
+ await Task.WhenAll(computedJobs.Select(j => RunAsync(j, context)));
+ }
+
+ private async Task RunAsync(ComputedCronJob computedJob, ProcessingContext context) {
+ //var storage = context.Storage;
+ var retryBehavior = computedJob.RetryBehavior;
+
+ while (!context.IsStopping) {
+ var now = DateTime.UtcNow;
+
+ var due = ComputeDue(computedJob, now);
+ var timeSpan = due - now;
+
+ if (timeSpan.TotalSeconds > 0) {
+ await context.WaitAsync(timeSpan);
+ }
+
+ context.ThrowIfStopping();
+
+ using (var scopedContext = context.CreateScope()) {
+ var provider = scopedContext.Provider;
+
+ var job = provider.GetService();
+ var success = true;
+
+ try {
+ var sw = Stopwatch.StartNew();
+ await job.ExecuteAsync();
+ sw.Stop();
+ computedJob.Retries = 0;
+ _logger.CronJobExecuted(computedJob.Job.Name, sw.Elapsed.TotalSeconds);
+ }
+ catch (Exception ex) {
+ success = false;
+ if (computedJob.Retries == 0) {
+ computedJob.FirstTry = DateTime.UtcNow;
+ }
+ computedJob.Retries++;
+ _logger.CronJobFailed(computedJob.Job.Name, ex);
+ }
+
+ if (success) {
+ //var connection = provider.GetRequiredService();
+ //await connection.AttachCronJobAsync(computedJob.Job);
+
+ //computedJob.Update(DateTime.UtcNow);
+
+ //await connection.UpdateCronJobAsync(computedJob.Job);
+ }
+ }
+ }
+ }
+
+ private DateTime ComputeDue(ComputedCronJob computedJob, DateTime now) {
+ computedJob.UpdateNext(now);
+
+ var retryBehavior = computedJob.RetryBehavior ?? RetryBehavior.DefaultRetry;
+ var retries = computedJob.Retries;
+
+ if (retries == 0) {
+ return computedJob.Next;
+ }
+
+ var realNext = computedJob.Schedule.GetNextOccurrence(now);
+
+ if (!retryBehavior.Retry) {
+ // No retry. If job failed before, we don't care, just schedule it next as usual.
+ return realNext;
+ }
+
+ if (retries >= retryBehavior.RetryCount) {
+ // Max retries. Just schedule it for the next occurance.
+ return realNext;
+ }
+
+ // Delay a bit.
+ return computedJob.FirstTry.AddSeconds(retryBehavior.RetryIn(retries));
+ }
+
+ //private async Task GetJobsAsync(IStorage storage) {
+ // using (var scope = _provider.CreateScope()) {
+ // var provider = scope.ServiceProvider;
+ // var connection = provider.GetRequiredService();
+
+ // return await connection.GetCronJobsAsync();
+ // }
+ //}
+
+ private CronJob[] GetJobsAsync() {
+ return new CronJob[] {
+ new CronJob {
+ Id= Guid.NewGuid().ToString(),
+ Cron= "* * * * *",
+ Name="Cap.Messages",
+ TypeName = null
+ }
+ };
+ }
+
+ private ComputedCronJob[] Compute(IEnumerable jobs, CronJobRegistry.Entry[] entries)
+ => jobs.Select(j => CreateComputedCronJob(j, entries)).ToArray();
+
+ private ComputedCronJob CreateComputedCronJob(CronJob job, CronJobRegistry.Entry[] entries) {
+ var entry = entries.First(e => e.Name == job.Name);
+ return new ComputedCronJob(job, entry);
+ }
+ }
+}
diff --git a/src/Cap.Consistency/Job/IProcessor.InfiniteRetry.cs b/src/Cap.Consistency/Job/IProcessor.InfiniteRetry.cs
new file mode 100644
index 0000000..ae7ba5b
--- /dev/null
+++ b/src/Cap.Consistency/Job/IProcessor.InfiniteRetry.cs
@@ -0,0 +1,40 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Logging;
+
+namespace Cap.Consistency.Job
+{
+ public class InfiniteRetryProcessor : IProcessor
+ {
+ private IProcessor _inner;
+ private ILogger _logger;
+
+ public InfiniteRetryProcessor(
+ IProcessor inner,
+ ILoggerFactory loggerFactory) {
+ _inner = inner;
+ _logger = loggerFactory.CreateLogger();
+ }
+
+ public override string ToString() => _inner.ToString();
+
+ public async Task ProcessAsync(ProcessingContext context) {
+ while (!context.IsStopping) {
+ try {
+ await _inner.ProcessAsync(context);
+ }
+ catch (OperationCanceledException) {
+ return;
+ }
+ catch (Exception ex) {
+ _logger.LogWarning(
+ 1,
+ ex,
+ "Prcessor '{ProcessorName}' failed. Retrying...", _inner.ToString());
+ }
+ }
+ }
+ }
+}
diff --git a/src/Cap.Consistency/Job/IProcessor.cs b/src/Cap.Consistency/Job/IProcessor.cs
new file mode 100644
index 0000000..c62887d
--- /dev/null
+++ b/src/Cap.Consistency/Job/IProcessor.cs
@@ -0,0 +1,12 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Cap.Consistency.Job
+{
+ public interface IProcessor
+ {
+ Task ProcessAsync(ProcessingContext context);
+ }
+}
diff --git a/src/Cap.Consistency/Job/ProcessingContext.cs b/src/Cap.Consistency/Job/ProcessingContext.cs
new file mode 100644
index 0000000..65d5fff
--- /dev/null
+++ b/src/Cap.Consistency/Job/ProcessingContext.cs
@@ -0,0 +1,66 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Extensions.DependencyInjection;
+
+namespace Cap.Consistency.Job
+{
+ public class ProcessingContext : IDisposable
+ {
+ private IServiceScope _scope;
+
+ private ProcessingContext(ProcessingContext other) {
+ Provider = other.Provider;
+ //Storage = other.Storage;
+ CronJobRegistry = other.CronJobRegistry;
+ CancellationToken = other.CancellationToken;
+ }
+
+ public ProcessingContext() {
+ }
+
+ public ProcessingContext(
+ IServiceProvider provider,
+ //IStorage storage,
+ CronJobRegistry cronJobRegistry,
+ CancellationToken cancellationToken) {
+ Provider = provider;
+ //Storage = storage;
+ CronJobRegistry = cronJobRegistry;
+ CancellationToken = cancellationToken;
+ }
+
+ public IServiceProvider Provider { get; private set; }
+
+ //public IStorage Storage { get; }
+
+ public CronJobRegistry CronJobRegistry { get; private set; }
+
+ public CancellationToken CancellationToken { get; }
+
+ public bool IsStopping => CancellationToken.IsCancellationRequested;
+
+ public void ThrowIfStopping() => CancellationToken.ThrowIfCancellationRequested();
+
+ public ProcessingContext CreateScope() {
+ var serviceScope = Provider.CreateScope();
+
+ return new ProcessingContext(this) {
+ _scope = serviceScope,
+ Provider = serviceScope.ServiceProvider
+ };
+ }
+
+ public Task WaitAsync(TimeSpan timeout) {
+ return Task.Delay(timeout, CancellationToken);
+ }
+
+ public void Dispose() {
+ if (_scope != null) {
+ _scope.Dispose();
+ }
+ }
+ }
+}
diff --git a/src/Cap.Consistency/Job/RetryBehavior.cs b/src/Cap.Consistency/Job/RetryBehavior.cs
new file mode 100644
index 0000000..14a5e60
--- /dev/null
+++ b/src/Cap.Consistency/Job/RetryBehavior.cs
@@ -0,0 +1,69 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace Cap.Consistency.Job
+{
+ public class RetryBehavior
+ {
+ public static readonly int DefaultRetryCount;
+ public static readonly Func DefaultRetryInThunk;
+
+ public static readonly RetryBehavior DefaultRetry;
+ public static readonly RetryBehavior NoRetry;
+
+ private static Random _random = new Random();
+
+ private Func _retryInThunk;
+
+ static RetryBehavior() {
+ DefaultRetryCount = 25;
+ DefaultRetryInThunk = retries =>
+ (int)Math.Round(Math.Pow(retries - 1, 4) + 15 + (_random.Next(30) * (retries)));
+
+ DefaultRetry = new RetryBehavior(true);
+ NoRetry = new RetryBehavior(false);
+ }
+
+ public RetryBehavior(bool retry)
+ : this(retry, DefaultRetryCount, DefaultRetryInThunk) {
+ }
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// Whether to retry.
+ /// The maximum retry count.
+ /// The retry in function to use.
+ public RetryBehavior(bool retry, int retryCount, Func retryInThunk) {
+ if (retry) {
+ if (retryCount < 0) throw new ArgumentOutOfRangeException(nameof(retryCount), "Can't be negative.");
+ }
+
+ Retry = retry;
+ RetryCount = retryCount;
+ _retryInThunk = retryInThunk ?? DefaultRetryInThunk;
+ }
+
+ public Random Random => _random;
+
+ ///
+ /// Gets whether to retry or disable retrying.
+ ///
+ public bool Retry { get; }
+
+ ///
+ /// Gets the maximum retry count.
+ ///
+ public int RetryCount { get; }
+
+ ///
+ /// Returns the seconds to delay before retrying again.
+ ///
+ /// The current retry count.
+ /// The seconds to delay.
+ public int RetryIn(int retries) {
+ return _retryInThunk(retries);
+ }
+ }
+}