Browse Source

refactor

master
yangxiaodong 7 years ago
parent
commit
30e52966a7
11 changed files with 316 additions and 33 deletions
  1. +6
    -0
      src/Cap.Consistency/Cap.Consistency.csproj
  2. +37
    -18
      src/Cap.Consistency/Consumer/ConsumerHandler.cs
  3. +71
    -0
      src/Cap.Consistency/IBootstrapper.Base.cs
  4. +12
    -0
      src/Cap.Consistency/IBootstrapper.cs
  5. +12
    -0
      src/Cap.Consistency/ITopicServer.cs
  6. +3
    -3
      src/Cap.Consistency/Infrastructure/ConsistencyOptions.cs
  7. +1
    -1
      src/Cap.Consistency/Infrastructure/IConsumerExcutorSelector.cs
  8. +147
    -0
      src/Cap.Consistency/LoggerExtensions.cs
  9. +3
    -7
      src/Cap.Consistency/Microsoft.AspNetCore.Builder/BuilderExtensions.cs
  10. +2
    -4
      src/Cap.Consistency/Microsoft.Extensions.DependencyInjection/ServiceCollectionExtensions.cs
  11. +22
    -0
      src/Cap.Consistency/TopicContext.cs

+ 6
- 0
src/Cap.Consistency/Cap.Consistency.csproj View File

@@ -8,11 +8,17 @@
</PropertyGroup>

<ItemGroup>
<None Include="Job\IProcessor.CronJob.cs" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.Hosting.Abstractions" Version="1.1.2" />
<PackageReference Include="Microsoft.AspNetCore.Http.Abstractions" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.Options" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="1.1.1" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="1.1.1" />
<PackageReference Include="ncrontab" Version="3.3.0" />
<PackageReference Include="Newtonsoft.Json" Version="10.0.2" />
</ItemGroup>



+ 37
- 18
src/Cap.Consistency/Consumer/ConsumerHandler.cs View File

@@ -1,29 +1,36 @@
using System;
using System.Text;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Cap.Consistency.Abstractions;
using Cap.Consistency.Infrastructure;
using Cap.Consistency.Internal;
using Cap.Consistency.Store;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Cap.Consistency.Store;

namespace Cap.Consistency.Consumer
{
public class ConsumerHandler : IConsumerHandler
public class ConsumerHandler : IConsumerHandler, IDisposable
{
private readonly IServiceProvider _serviceProvider;
private readonly IConsumerInvokerFactory _consumerInvokerFactory;
private readonly IConsumerClientFactory _consumerClientFactory;
private readonly ILoggerFactory _loggerFactory;
private readonly ILogger _logger;

private readonly MethodMatcherCache _selector;
private readonly ConsistencyOptions _options;
private readonly ConsistencyMessageManager _messageManager;
private readonly CancellationTokenSource _cts;

public event EventHandler<ConsistencyMessage> MessageReceieved;

private TopicContext _context;
private Task _compositeTask;
private bool _disposed;

public ConsumerHandler(
IServiceProvider serviceProvider,
IConsumerInvokerFactory consumerInvokerFactory,
@@ -32,7 +39,6 @@ namespace Cap.Consistency.Consumer
ConsistencyMessageManager messageManager,
MethodMatcherCache selector,
IOptions<ConsistencyOptions> options) {

_selector = selector;
_logger = loggerFactory.CreateLogger<ConsumerHandler>();
_loggerFactory = loggerFactory;
@@ -41,22 +47,17 @@ namespace Cap.Consistency.Consumer
_consumerClientFactory = consumerClientFactory;
_options = options.Value;
_messageManager = messageManager;
_cts = new CancellationTokenSource();
}


protected virtual void OnMessageReceieved(ConsistencyMessage message) {
MessageReceieved?.Invoke(this, message);
}

public Task RouteAsync(TopicRouteContext context) {

if (context == null) {
throw new ArgumentNullException(nameof(context));
}

context.ServiceProvider = _serviceProvider;
public void Start() {
_context = new TopicContext(_serviceProvider, _cts.Token);

var matchs = _selector.GetCandidatesMethods(context);
var matchs = _selector.GetCandidatesMethods(_context);

var groupingMatchs = matchs.GroupBy(x => x.Value.Attribute.GroupOrExchange);

@@ -73,10 +74,10 @@ namespace Cap.Consistency.Consumer
}
}, TaskCreationOptions.LongRunning);
}
return Task.CompletedTask;
_compositeTask = Task.CompletedTask;
}

private void OnMessageReceieved(object sender, DeliverMessage message) {
public virtual void OnMessageReceieved(object sender, DeliverMessage message) {
var consistencyMessage = new ConsistencyMessage() {
Id = message.MessageKey,
Payload = Encoding.UTF8.GetString(message.Body)
@@ -96,12 +97,30 @@ namespace Cap.Consistency.Consumer
invoker.InvokeAsync();

_messageManager.UpdateAsync(consistencyMessage).Wait();

}
catch (Exception ex) {

_logger.LogError("exception raised when excute method : " + ex.Message);
}
}

public void Dispose() {
if (_disposed) {
return;
}
_disposed = true;

_logger.ServerShuttingDown();
_cts.Cancel();

try {
_compositeTask.Wait((int)TimeSpan.FromSeconds(60).TotalMilliseconds);
}
catch (AggregateException ex) {
var innerEx = ex.InnerExceptions[0];
if (!(innerEx is OperationCanceledException)) {
_logger.ExpectedOperationCanceledException(innerEx);
}
}
}
}
}
}

+ 71
- 0
src/Cap.Consistency/IBootstrapper.Base.cs View File

@@ -0,0 +1,71 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Cap.Consistency.Infrastructure;
using Cap.Consistency.Store;
using Microsoft.AspNetCore.Hosting;

namespace Cap.Consistency
{
public abstract class BootstrapperBase : IBootstrapper
{
private IApplicationLifetime _appLifetime;
private CancellationTokenSource _cts;
private CancellationTokenRegistration _ctsRegistration;
private Task _bootstrappingTask;

public BootstrapperBase(
ConsistencyOptions options,
ConsistencyMessageManager storage,
ITopicServer server,
IApplicationLifetime appLifetime,
IServiceProvider provider) {
Options = options;
Storage = storage;
Server = server;
_appLifetime = appLifetime;
Provider = provider;

_cts = new CancellationTokenSource();
_ctsRegistration = appLifetime.ApplicationStopping.Register(() => {
_cts.Cancel();
try {
_bootstrappingTask?.Wait();
}
catch (OperationCanceledException) {
}
});
}

protected ConsistencyOptions Options { get; }

protected ConsistencyMessageManager Storage { get; }

protected ITopicServer Server { get; }

public IServiceProvider Provider { get; private set; }

public Task BootstrapAsync() {
return (_bootstrappingTask = BootstrapTaskAsync());
}

private async Task BootstrapTaskAsync() {
if (_cts.IsCancellationRequested) return;

if (_cts.IsCancellationRequested) return;

await BootstrapCoreAsync();
if (_cts.IsCancellationRequested) return;

Server.Start();

_ctsRegistration.Dispose();
_cts.Dispose();
}

public virtual Task BootstrapCoreAsync() {
_appLifetime.ApplicationStopping.Register(() => Server.Dispose());
return Task.FromResult(0);
}
}
}

+ 12
- 0
src/Cap.Consistency/IBootstrapper.cs View File

@@ -0,0 +1,12 @@
using System.Threading.Tasks;

namespace Cap.Consistency
{
/// <summary>
/// Represents bootstrapping logic. For example, adding initial state to the storage or querying certain entities.
/// </summary>
public interface IBootstrapper
{
Task BootstrapAsync();
}
}

+ 12
- 0
src/Cap.Consistency/ITopicServer.cs View File

@@ -0,0 +1,12 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace Cap.Consistency
{
public interface ITopicServer : IDisposable
{
void Start();
}
}

+ 3
- 3
src/Cap.Consistency/Infrastructure/ConsistencyOptions.cs View File

@@ -1,6 +1,4 @@
using Cap.Consistency;

namespace Cap.Consistency.Infrastructure
namespace Cap.Consistency.Infrastructure
{
/// <summary>
/// Represents all the options you can use to configure the system.
@@ -8,5 +6,7 @@ namespace Cap.Consistency.Infrastructure
public class ConsistencyOptions
{
public string BrokerUrlList { get; set; } = "localhost:9092";

public string Cron { get; set; } = "* * * * *";
}
}

+ 1
- 1
src/Cap.Consistency/Infrastructure/IConsumerExcutorSelector.cs View File

@@ -7,7 +7,7 @@ namespace Cap.Consistency.Infrastructure
{
public interface IConsumerExcutorSelector
{
IReadOnlyList<ConsumerExecutorDescriptor> SelectCandidates(TopicRouteContext context);
IReadOnlyList<ConsumerExecutorDescriptor> SelectCandidates(TopicContext context);

ConsumerExecutorDescriptor SelectBestCandidate(string key, IReadOnlyList<ConsumerExecutorDescriptor> executeDescriptor);
}


+ 147
- 0
src/Cap.Consistency/LoggerExtensions.cs View File

@@ -0,0 +1,147 @@
using System;
using System.Collections.Generic;
using System.Linq;
using Cap.Consistency.Job;
using Microsoft.Extensions.Logging;

namespace Cap.Consistency
{
internal static class LoggerExtensions
{
private static Action<ILogger, int, int, Exception> _serverStarting;
private static Action<ILogger, Exception> _serverShuttingDown;
private static Action<ILogger, string, Exception> _expectedOperationCanceledException;

private static Action<ILogger, Exception> _cronJobsNotFound;
private static Action<ILogger, int, Exception> _cronJobsScheduling;
private static Action<ILogger, string, double, Exception> _cronJobExecuted;
private static Action<ILogger, string, Exception> _cronJobFailed;

private static Action<ILogger, Exception> _jobFailed;
private static Action<ILogger, Exception> _jobFailedWillRetry;
private static Action<ILogger, double, Exception> _jobExecuted;
private static Action<ILogger, int, Exception> _jobRetrying;
private static Action<ILogger, int, Exception> _jobCouldNotBeLoaded;
private static Action<ILogger, int, Exception> _exceptionOccuredWhileExecutingJob;

static LoggerExtensions() {
_serverStarting = LoggerMessage.Define<int, int>(
LogLevel.Debug,
1,
"Starting the processing server. Detected {MachineProcessorCount} machine processor(s). Initiating {ProcessorCount} job processor(s).");

_serverShuttingDown = LoggerMessage.Define(
LogLevel.Debug,
2,
"Shutting down the processing server...");

_expectedOperationCanceledException = LoggerMessage.Define<string>(
LogLevel.Warning,
3,
"Expected an OperationCanceledException, but found '{ExceptionMessage}'.");

_cronJobsNotFound = LoggerMessage.Define(
LogLevel.Debug,
1,
"No cron jobs found to schedule, cancelling processing of cron jobs.");

_cronJobsScheduling = LoggerMessage.Define<int>(
LogLevel.Debug,
2,
"Found {JobCount} cron job(s) to schedule.");

_cronJobExecuted = LoggerMessage.Define<string, double>(
LogLevel.Debug,
3,
"Cron job '{JobName}' executed succesfully. Took: {Seconds} secs.");

_cronJobFailed = LoggerMessage.Define<string>(
LogLevel.Warning,
4,
"Cron job '{jobName}' failed to execute.");

_jobFailed = LoggerMessage.Define(
LogLevel.Warning,
1,
"Job failed to execute.");

_jobFailedWillRetry = LoggerMessage.Define(
LogLevel.Warning,
2,
"Job failed to execute. Will retry.");

_jobRetrying = LoggerMessage.Define<int>(
LogLevel.Debug,
3,
"Retrying a job: {Retries}...");

_jobExecuted = LoggerMessage.Define<double>(
LogLevel.Debug,
4,
"Job executed. Took: {Seconds} secs.");

_jobCouldNotBeLoaded = LoggerMessage.Define<int>(
LogLevel.Warning,
5,
"Could not load a job: '{JobId}'.");

_exceptionOccuredWhileExecutingJob = LoggerMessage.Define<int>(
LogLevel.Error,
6,
"An exception occured while trying to execute a job: '{JobId}'. " +
"Requeuing for another retry.");
}

public static void ServerStarting(this ILogger logger, int machineProcessorCount, int processorCount) {
_serverStarting(logger, machineProcessorCount, processorCount, null);
}

public static void ServerShuttingDown(this ILogger logger) {
_serverShuttingDown(logger, null);
}

public static void ExpectedOperationCanceledException(this ILogger logger, Exception ex) {
_expectedOperationCanceledException(logger, ex.Message, ex);
}

public static void CronJobsNotFound(this ILogger logger) {
_cronJobsNotFound(logger, null);
}

public static void CronJobsScheduling(this ILogger logger, IEnumerable<CronJob> jobs) {
_cronJobsScheduling(logger, jobs.Count(), null);
}

public static void CronJobExecuted(this ILogger logger, string name, double seconds) {
_cronJobExecuted(logger, name, seconds, null);
}

public static void CronJobFailed(this ILogger logger, string name, Exception ex) {
_cronJobFailed(logger, name, ex);
}

public static void JobFailed(this ILogger logger, Exception ex) {
_jobFailed(logger, ex);
}

public static void JobFailedWillRetry(this ILogger logger, Exception ex) {
_jobFailedWillRetry(logger, ex);
}

public static void JobRetrying(this ILogger logger, int retries) {
_jobRetrying(logger, retries, null);
}

public static void JobExecuted(this ILogger logger, double seconds) {
_jobExecuted(logger, seconds, null);
}

public static void JobCouldNotBeLoaded(this ILogger logger, int jobId, Exception ex) {
_jobCouldNotBeLoaded(logger, jobId, ex);
}

public static void ExceptionOccuredWhileExecutingJob(this ILogger logger, int jobId, Exception ex) {
_exceptionOccuredWhileExecutingJob(logger, jobId, ex);
}
}
}

+ 3
- 7
src/Cap.Consistency/Microsoft.AspNetCore.Builder/BuilderExtensions.cs View File

@@ -26,14 +26,10 @@ namespace Microsoft.AspNetCore.Builder
throw new InvalidOperationException("Add Consistency must be called on the service collection.");
}

var router = app.ApplicationServices.GetService<ITopicRouteHandler>();

var context = new TopicRouteContext();

router.RouteAsync(context);

var provider = app.ApplicationServices;
var bootstrapper = provider.GetRequiredService<ITopicServer>();
bootstrapper.Start();
return app;
}

}
}

+ 2
- 4
src/Cap.Consistency/Microsoft.Extensions.DependencyInjection/ServiceCollectionExtensions.cs View File

@@ -37,11 +37,8 @@ namespace Microsoft.Extensions.DependencyInjection
public static ConsistencyBuilder AddConsistency(this IServiceCollection services, Action<ConsistencyOptions> setupAction) {
services.TryAddSingleton<ConsistencyMarkerService>();

services.TryAddSingleton<ConsistencyMessageManager>();

services.Configure(setupAction);


var IConsumerListenerServices = new Dictionary<Type, Type>();
foreach (var rejectedServices in services) {
if (rejectedServices.ImplementationType != null && typeof(IConsumerService).IsAssignableFrom(rejectedServices.ImplementationType))
@@ -64,7 +61,7 @@ namespace Microsoft.Extensions.DependencyInjection
services.TryAddSingleton<IConsumerInvokerFactory, ConsumerInvokerFactory>();
services.TryAddSingleton<MethodMatcherCache>();

services.TryAddSingleton(typeof(ITopicRouteHandler), typeof(ConsumerHandler));
services.TryAddSingleton(typeof(ITopicServer), typeof(ConsumerHandler));

return new ConsistencyBuilder(services);
}
@@ -73,6 +70,7 @@ namespace Microsoft.Extensions.DependencyInjection
public static ConsistencyBuilder AddMessageStore<T>(this ConsistencyBuilder build)
where T : class, IConsistencyMessageStore {
build.Services.AddScoped<IConsistencyMessageStore, T>();
build.Services.TryAddScoped<ConsistencyMessageManager>();
return build;
}
}

+ 22
- 0
src/Cap.Consistency/TopicContext.cs View File

@@ -0,0 +1,22 @@
using System;
using System.Threading;

namespace Cap.Consistency
{
public class TopicContext
{
public TopicContext() {

}

public TopicContext(IServiceProvider provider, CancellationToken cancellationToken) {
ServiceProvider = provider;
CancellationToken = cancellationToken;
}


public IServiceProvider ServiceProvider { get; set; }

public CancellationToken CancellationToken { get; }
}
}

Loading…
Cancel
Save