Browse Source

rename

master
yangxiaodong 7 years ago
parent
commit
8595d896e0
2 changed files with 85 additions and 3 deletions
  1. +82
    -0
      src/Cap.Consistency/Job/IProcessingServer.Job.cs
  2. +3
    -3
      src/Cap.Consistency/Job/IProcessor.InfiniteRetry.cs

+ 82
- 0
src/Cap.Consistency/Job/IProcessingServer.Job.cs View File

@@ -0,0 +1,82 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Cap.Consistency.Job;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.DependencyInjection;
using Cap.Consistency.Infrastructure;
using Microsoft.Extensions.Options;

namespace Cap.Consistency
{
public class JobProcessingServer : IProcessingServer, IDisposable
{
private ILogger _logger;
private ILoggerFactory _loggerFactory;
private IServiceProvider _provider;
private CancellationTokenSource _cts;
private IJobProcessor _processor;
private ConsistencyOptions _options;
private ProcessingContext _context;
private DefaultCronJobRegistry _defaultJobRegistry;
private Task _compositeTask;
private bool _disposed;

public JobProcessingServer(
ILogger<JobProcessingServer> logger,
ILoggerFactory loggerFactory,
IServiceProvider provider,
DefaultCronJobRegistry defaultJobRegistry,
IOptions<ConsistencyOptions> options) {

_logger = logger;
_loggerFactory = loggerFactory;
_provider = provider;
_defaultJobRegistry = defaultJobRegistry;
_options = options.Value;
_cts = new CancellationTokenSource();
}

public void Start() {

var processorCount = Environment.ProcessorCount;
_processor = _provider.GetService<IJobProcessor>();
_logger.ServerStarting(processorCount, 1);

_context = new ProcessingContext(
_provider,
_defaultJobRegistry,
_cts.Token);

_compositeTask = Task.Run(() => {
InfiniteRetry(_processor).ProcessAsync(_context);
});
}

public void Dispose() {
if (_disposed) {
return;
}
_disposed = true;

_logger.ServerShuttingDown();
_cts.Cancel();
try {
_compositeTask.Wait((int)TimeSpan.FromSeconds(60).TotalMilliseconds);
}
catch (AggregateException ex) {
var innerEx = ex.InnerExceptions[0];
if (!(innerEx is OperationCanceledException)) {
_logger.ExpectedOperationCanceledException(innerEx);
}
}
}

private IJobProcessor InfiniteRetry(IJobProcessor inner) {
return new InfiniteRetryProcessor(inner, _loggerFactory);
}

}
}

+ 3
- 3
src/Cap.Consistency/Job/IProcessor.InfiniteRetry.cs View File

@@ -6,13 +6,13 @@ using Microsoft.Extensions.Logging;

namespace Cap.Consistency.Job
{
public class InfiniteRetryProcessor : IProcessor
public class InfiniteRetryProcessor : IJobProcessor
{
private IProcessor _inner;
private IJobProcessor _inner;
private ILogger _logger;

public InfiniteRetryProcessor(
IProcessor inner,
IJobProcessor inner,
ILoggerFactory loggerFactory) {
_inner = inner;
_logger = loggerFactory.CreateLogger<InfiniteRetryProcessor>();


Loading…
Cancel
Save