diff --git a/src/Cap.Consistency/Job/IProcessingServer.Job.cs b/src/Cap.Consistency/Job/IProcessingServer.Job.cs new file mode 100644 index 0000000..0e1df46 --- /dev/null +++ b/src/Cap.Consistency/Job/IProcessingServer.Job.cs @@ -0,0 +1,82 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Cap.Consistency.Job; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.DependencyInjection; +using Cap.Consistency.Infrastructure; +using Microsoft.Extensions.Options; + +namespace Cap.Consistency +{ + public class JobProcessingServer : IProcessingServer, IDisposable + { + private ILogger _logger; + private ILoggerFactory _loggerFactory; + private IServiceProvider _provider; + private CancellationTokenSource _cts; + private IJobProcessor _processor; + private ConsistencyOptions _options; + private ProcessingContext _context; + private DefaultCronJobRegistry _defaultJobRegistry; + private Task _compositeTask; + private bool _disposed; + + public JobProcessingServer( + ILogger logger, + ILoggerFactory loggerFactory, + IServiceProvider provider, + DefaultCronJobRegistry defaultJobRegistry, + IOptions options) { + + _logger = logger; + _loggerFactory = loggerFactory; + _provider = provider; + _defaultJobRegistry = defaultJobRegistry; + _options = options.Value; + _cts = new CancellationTokenSource(); + } + + public void Start() { + + var processorCount = Environment.ProcessorCount; + _processor = _provider.GetService(); + _logger.ServerStarting(processorCount, 1); + + _context = new ProcessingContext( + _provider, + _defaultJobRegistry, + _cts.Token); + + _compositeTask = Task.Run(() => { + InfiniteRetry(_processor).ProcessAsync(_context); + }); + } + + public void Dispose() { + if (_disposed) { + return; + } + _disposed = true; + + _logger.ServerShuttingDown(); + _cts.Cancel(); + try { + _compositeTask.Wait((int)TimeSpan.FromSeconds(60).TotalMilliseconds); + } + catch (AggregateException ex) { + var innerEx = ex.InnerExceptions[0]; + if (!(innerEx is OperationCanceledException)) { + _logger.ExpectedOperationCanceledException(innerEx); + } + } + } + + private IJobProcessor InfiniteRetry(IJobProcessor inner) { + return new InfiniteRetryProcessor(inner, _loggerFactory); + } + + } +} diff --git a/src/Cap.Consistency/Job/IProcessor.InfiniteRetry.cs b/src/Cap.Consistency/Job/IProcessor.InfiniteRetry.cs index ae7ba5b..5147241 100644 --- a/src/Cap.Consistency/Job/IProcessor.InfiniteRetry.cs +++ b/src/Cap.Consistency/Job/IProcessor.InfiniteRetry.cs @@ -6,13 +6,13 @@ using Microsoft.Extensions.Logging; namespace Cap.Consistency.Job { - public class InfiniteRetryProcessor : IProcessor + public class InfiniteRetryProcessor : IJobProcessor { - private IProcessor _inner; + private IJobProcessor _inner; private ILogger _logger; public InfiniteRetryProcessor( - IProcessor inner, + IJobProcessor inner, ILoggerFactory loggerFactory) { _inner = inner; _logger = loggerFactory.CreateLogger();