diff --git a/src/Cap.Consistency/Cap.Consistency.csproj b/src/Cap.Consistency/Cap.Consistency.csproj
index 611f756..d6307b5 100644
--- a/src/Cap.Consistency/Cap.Consistency.csproj
+++ b/src/Cap.Consistency/Cap.Consistency.csproj
@@ -8,11 +8,17 @@
+
+
+
+
+
+
diff --git a/src/Cap.Consistency/Consumer/ConsumerHandler.cs b/src/Cap.Consistency/Consumer/ConsumerHandler.cs
index 9a981ed..640bc0b 100644
--- a/src/Cap.Consistency/Consumer/ConsumerHandler.cs
+++ b/src/Cap.Consistency/Consumer/ConsumerHandler.cs
@@ -1,29 +1,36 @@
using System;
-using System.Text;
using System.Linq;
+using System.Text;
+using System.Threading;
using System.Threading.Tasks;
using Cap.Consistency.Abstractions;
using Cap.Consistency.Infrastructure;
using Cap.Consistency.Internal;
+using Cap.Consistency.Store;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
-using Cap.Consistency.Store;
namespace Cap.Consistency.Consumer
{
- public class ConsumerHandler : IConsumerHandler
+ public class ConsumerHandler : IConsumerHandler, IDisposable
{
private readonly IServiceProvider _serviceProvider;
private readonly IConsumerInvokerFactory _consumerInvokerFactory;
private readonly IConsumerClientFactory _consumerClientFactory;
private readonly ILoggerFactory _loggerFactory;
private readonly ILogger _logger;
+
private readonly MethodMatcherCache _selector;
private readonly ConsistencyOptions _options;
private readonly ConsistencyMessageManager _messageManager;
+ private readonly CancellationTokenSource _cts;
public event EventHandler MessageReceieved;
+ private TopicContext _context;
+ private Task _compositeTask;
+ private bool _disposed;
+
public ConsumerHandler(
IServiceProvider serviceProvider,
IConsumerInvokerFactory consumerInvokerFactory,
@@ -32,7 +39,6 @@ namespace Cap.Consistency.Consumer
ConsistencyMessageManager messageManager,
MethodMatcherCache selector,
IOptions options) {
-
_selector = selector;
_logger = loggerFactory.CreateLogger();
_loggerFactory = loggerFactory;
@@ -41,22 +47,17 @@ namespace Cap.Consistency.Consumer
_consumerClientFactory = consumerClientFactory;
_options = options.Value;
_messageManager = messageManager;
+ _cts = new CancellationTokenSource();
}
-
protected virtual void OnMessageReceieved(ConsistencyMessage message) {
MessageReceieved?.Invoke(this, message);
}
- public Task RouteAsync(TopicRouteContext context) {
-
- if (context == null) {
- throw new ArgumentNullException(nameof(context));
- }
-
- context.ServiceProvider = _serviceProvider;
+ public void Start() {
+ _context = new TopicContext(_serviceProvider, _cts.Token);
- var matchs = _selector.GetCandidatesMethods(context);
+ var matchs = _selector.GetCandidatesMethods(_context);
var groupingMatchs = matchs.GroupBy(x => x.Value.Attribute.GroupOrExchange);
@@ -73,10 +74,10 @@ namespace Cap.Consistency.Consumer
}
}, TaskCreationOptions.LongRunning);
}
- return Task.CompletedTask;
+ _compositeTask = Task.CompletedTask;
}
- private void OnMessageReceieved(object sender, DeliverMessage message) {
+ public virtual void OnMessageReceieved(object sender, DeliverMessage message) {
var consistencyMessage = new ConsistencyMessage() {
Id = message.MessageKey,
Payload = Encoding.UTF8.GetString(message.Body)
@@ -96,12 +97,30 @@ namespace Cap.Consistency.Consumer
invoker.InvokeAsync();
_messageManager.UpdateAsync(consistencyMessage).Wait();
-
}
catch (Exception ex) {
-
_logger.LogError("exception raised when excute method : " + ex.Message);
}
}
+
+ public void Dispose() {
+ if (_disposed) {
+ return;
+ }
+ _disposed = true;
+
+ _logger.ServerShuttingDown();
+ _cts.Cancel();
+
+ try {
+ _compositeTask.Wait((int)TimeSpan.FromSeconds(60).TotalMilliseconds);
+ }
+ catch (AggregateException ex) {
+ var innerEx = ex.InnerExceptions[0];
+ if (!(innerEx is OperationCanceledException)) {
+ _logger.ExpectedOperationCanceledException(innerEx);
+ }
+ }
+ }
}
-}
+}
\ No newline at end of file
diff --git a/src/Cap.Consistency/IBootstrapper.Base.cs b/src/Cap.Consistency/IBootstrapper.Base.cs
new file mode 100644
index 0000000..36e7949
--- /dev/null
+++ b/src/Cap.Consistency/IBootstrapper.Base.cs
@@ -0,0 +1,71 @@
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Cap.Consistency.Infrastructure;
+using Cap.Consistency.Store;
+using Microsoft.AspNetCore.Hosting;
+
+namespace Cap.Consistency
+{
+ public abstract class BootstrapperBase : IBootstrapper
+ {
+ private IApplicationLifetime _appLifetime;
+ private CancellationTokenSource _cts;
+ private CancellationTokenRegistration _ctsRegistration;
+ private Task _bootstrappingTask;
+
+ public BootstrapperBase(
+ ConsistencyOptions options,
+ ConsistencyMessageManager storage,
+ ITopicServer server,
+ IApplicationLifetime appLifetime,
+ IServiceProvider provider) {
+ Options = options;
+ Storage = storage;
+ Server = server;
+ _appLifetime = appLifetime;
+ Provider = provider;
+
+ _cts = new CancellationTokenSource();
+ _ctsRegistration = appLifetime.ApplicationStopping.Register(() => {
+ _cts.Cancel();
+ try {
+ _bootstrappingTask?.Wait();
+ }
+ catch (OperationCanceledException) {
+ }
+ });
+ }
+
+ protected ConsistencyOptions Options { get; }
+
+ protected ConsistencyMessageManager Storage { get; }
+
+ protected ITopicServer Server { get; }
+
+ public IServiceProvider Provider { get; private set; }
+
+ public Task BootstrapAsync() {
+ return (_bootstrappingTask = BootstrapTaskAsync());
+ }
+
+ private async Task BootstrapTaskAsync() {
+ if (_cts.IsCancellationRequested) return;
+
+ if (_cts.IsCancellationRequested) return;
+
+ await BootstrapCoreAsync();
+ if (_cts.IsCancellationRequested) return;
+
+ Server.Start();
+
+ _ctsRegistration.Dispose();
+ _cts.Dispose();
+ }
+
+ public virtual Task BootstrapCoreAsync() {
+ _appLifetime.ApplicationStopping.Register(() => Server.Dispose());
+ return Task.FromResult(0);
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/Cap.Consistency/IBootstrapper.cs b/src/Cap.Consistency/IBootstrapper.cs
new file mode 100644
index 0000000..b47238c
--- /dev/null
+++ b/src/Cap.Consistency/IBootstrapper.cs
@@ -0,0 +1,12 @@
+using System.Threading.Tasks;
+
+namespace Cap.Consistency
+{
+ ///
+ /// Represents bootstrapping logic. For example, adding initial state to the storage or querying certain entities.
+ ///
+ public interface IBootstrapper
+ {
+ Task BootstrapAsync();
+ }
+}
\ No newline at end of file
diff --git a/src/Cap.Consistency/ITopicServer.cs b/src/Cap.Consistency/ITopicServer.cs
new file mode 100644
index 0000000..ae38fa0
--- /dev/null
+++ b/src/Cap.Consistency/ITopicServer.cs
@@ -0,0 +1,12 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Cap.Consistency
+{
+ public interface ITopicServer : IDisposable
+ {
+ void Start();
+ }
+}
diff --git a/src/Cap.Consistency/Infrastructure/ConsistencyOptions.cs b/src/Cap.Consistency/Infrastructure/ConsistencyOptions.cs
index 619d4a6..67e3434 100644
--- a/src/Cap.Consistency/Infrastructure/ConsistencyOptions.cs
+++ b/src/Cap.Consistency/Infrastructure/ConsistencyOptions.cs
@@ -1,6 +1,4 @@
-using Cap.Consistency;
-
-namespace Cap.Consistency.Infrastructure
+namespace Cap.Consistency.Infrastructure
{
///
/// Represents all the options you can use to configure the system.
@@ -8,5 +6,7 @@ namespace Cap.Consistency.Infrastructure
public class ConsistencyOptions
{
public string BrokerUrlList { get; set; } = "localhost:9092";
+
+ public string Cron { get; set; } = "* * * * *";
}
}
\ No newline at end of file
diff --git a/src/Cap.Consistency/Infrastructure/IConsumerExcutorSelector.cs b/src/Cap.Consistency/Infrastructure/IConsumerExcutorSelector.cs
index 321c7c8..d72d621 100644
--- a/src/Cap.Consistency/Infrastructure/IConsumerExcutorSelector.cs
+++ b/src/Cap.Consistency/Infrastructure/IConsumerExcutorSelector.cs
@@ -7,7 +7,7 @@ namespace Cap.Consistency.Infrastructure
{
public interface IConsumerExcutorSelector
{
- IReadOnlyList SelectCandidates(TopicRouteContext context);
+ IReadOnlyList SelectCandidates(TopicContext context);
ConsumerExecutorDescriptor SelectBestCandidate(string key, IReadOnlyList executeDescriptor);
}
diff --git a/src/Cap.Consistency/LoggerExtensions.cs b/src/Cap.Consistency/LoggerExtensions.cs
new file mode 100644
index 0000000..7957053
--- /dev/null
+++ b/src/Cap.Consistency/LoggerExtensions.cs
@@ -0,0 +1,147 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using Cap.Consistency.Job;
+using Microsoft.Extensions.Logging;
+
+namespace Cap.Consistency
+{
+ internal static class LoggerExtensions
+ {
+ private static Action _serverStarting;
+ private static Action _serverShuttingDown;
+ private static Action _expectedOperationCanceledException;
+
+ private static Action _cronJobsNotFound;
+ private static Action _cronJobsScheduling;
+ private static Action _cronJobExecuted;
+ private static Action _cronJobFailed;
+
+ private static Action _jobFailed;
+ private static Action _jobFailedWillRetry;
+ private static Action _jobExecuted;
+ private static Action _jobRetrying;
+ private static Action _jobCouldNotBeLoaded;
+ private static Action _exceptionOccuredWhileExecutingJob;
+
+ static LoggerExtensions() {
+ _serverStarting = LoggerMessage.Define(
+ LogLevel.Debug,
+ 1,
+ "Starting the processing server. Detected {MachineProcessorCount} machine processor(s). Initiating {ProcessorCount} job processor(s).");
+
+ _serverShuttingDown = LoggerMessage.Define(
+ LogLevel.Debug,
+ 2,
+ "Shutting down the processing server...");
+
+ _expectedOperationCanceledException = LoggerMessage.Define(
+ LogLevel.Warning,
+ 3,
+ "Expected an OperationCanceledException, but found '{ExceptionMessage}'.");
+
+ _cronJobsNotFound = LoggerMessage.Define(
+ LogLevel.Debug,
+ 1,
+ "No cron jobs found to schedule, cancelling processing of cron jobs.");
+
+ _cronJobsScheduling = LoggerMessage.Define(
+ LogLevel.Debug,
+ 2,
+ "Found {JobCount} cron job(s) to schedule.");
+
+ _cronJobExecuted = LoggerMessage.Define(
+ LogLevel.Debug,
+ 3,
+ "Cron job '{JobName}' executed succesfully. Took: {Seconds} secs.");
+
+ _cronJobFailed = LoggerMessage.Define(
+ LogLevel.Warning,
+ 4,
+ "Cron job '{jobName}' failed to execute.");
+
+ _jobFailed = LoggerMessage.Define(
+ LogLevel.Warning,
+ 1,
+ "Job failed to execute.");
+
+ _jobFailedWillRetry = LoggerMessage.Define(
+ LogLevel.Warning,
+ 2,
+ "Job failed to execute. Will retry.");
+
+ _jobRetrying = LoggerMessage.Define(
+ LogLevel.Debug,
+ 3,
+ "Retrying a job: {Retries}...");
+
+ _jobExecuted = LoggerMessage.Define(
+ LogLevel.Debug,
+ 4,
+ "Job executed. Took: {Seconds} secs.");
+
+ _jobCouldNotBeLoaded = LoggerMessage.Define(
+ LogLevel.Warning,
+ 5,
+ "Could not load a job: '{JobId}'.");
+
+ _exceptionOccuredWhileExecutingJob = LoggerMessage.Define(
+ LogLevel.Error,
+ 6,
+ "An exception occured while trying to execute a job: '{JobId}'. " +
+ "Requeuing for another retry.");
+ }
+
+ public static void ServerStarting(this ILogger logger, int machineProcessorCount, int processorCount) {
+ _serverStarting(logger, machineProcessorCount, processorCount, null);
+ }
+
+ public static void ServerShuttingDown(this ILogger logger) {
+ _serverShuttingDown(logger, null);
+ }
+
+ public static void ExpectedOperationCanceledException(this ILogger logger, Exception ex) {
+ _expectedOperationCanceledException(logger, ex.Message, ex);
+ }
+
+ public static void CronJobsNotFound(this ILogger logger) {
+ _cronJobsNotFound(logger, null);
+ }
+
+ public static void CronJobsScheduling(this ILogger logger, IEnumerable jobs) {
+ _cronJobsScheduling(logger, jobs.Count(), null);
+ }
+
+ public static void CronJobExecuted(this ILogger logger, string name, double seconds) {
+ _cronJobExecuted(logger, name, seconds, null);
+ }
+
+ public static void CronJobFailed(this ILogger logger, string name, Exception ex) {
+ _cronJobFailed(logger, name, ex);
+ }
+
+ public static void JobFailed(this ILogger logger, Exception ex) {
+ _jobFailed(logger, ex);
+ }
+
+ public static void JobFailedWillRetry(this ILogger logger, Exception ex) {
+ _jobFailedWillRetry(logger, ex);
+ }
+
+ public static void JobRetrying(this ILogger logger, int retries) {
+ _jobRetrying(logger, retries, null);
+ }
+
+ public static void JobExecuted(this ILogger logger, double seconds) {
+ _jobExecuted(logger, seconds, null);
+ }
+
+ public static void JobCouldNotBeLoaded(this ILogger logger, int jobId, Exception ex) {
+ _jobCouldNotBeLoaded(logger, jobId, ex);
+ }
+
+ public static void ExceptionOccuredWhileExecutingJob(this ILogger logger, int jobId, Exception ex) {
+ _exceptionOccuredWhileExecutingJob(logger, jobId, ex);
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/Cap.Consistency/Microsoft.AspNetCore.Builder/BuilderExtensions.cs b/src/Cap.Consistency/Microsoft.AspNetCore.Builder/BuilderExtensions.cs
index da6058f..1e31537 100644
--- a/src/Cap.Consistency/Microsoft.AspNetCore.Builder/BuilderExtensions.cs
+++ b/src/Cap.Consistency/Microsoft.AspNetCore.Builder/BuilderExtensions.cs
@@ -26,14 +26,10 @@ namespace Microsoft.AspNetCore.Builder
throw new InvalidOperationException("Add Consistency must be called on the service collection.");
}
- var router = app.ApplicationServices.GetService();
-
- var context = new TopicRouteContext();
-
- router.RouteAsync(context);
-
+ var provider = app.ApplicationServices;
+ var bootstrapper = provider.GetRequiredService();
+ bootstrapper.Start();
return app;
}
-
}
}
\ No newline at end of file
diff --git a/src/Cap.Consistency/Microsoft.Extensions.DependencyInjection/ServiceCollectionExtensions.cs b/src/Cap.Consistency/Microsoft.Extensions.DependencyInjection/ServiceCollectionExtensions.cs
index 279ee7d..a93893a 100644
--- a/src/Cap.Consistency/Microsoft.Extensions.DependencyInjection/ServiceCollectionExtensions.cs
+++ b/src/Cap.Consistency/Microsoft.Extensions.DependencyInjection/ServiceCollectionExtensions.cs
@@ -37,11 +37,8 @@ namespace Microsoft.Extensions.DependencyInjection
public static ConsistencyBuilder AddConsistency(this IServiceCollection services, Action setupAction) {
services.TryAddSingleton();
- services.TryAddSingleton();
-
services.Configure(setupAction);
-
var IConsumerListenerServices = new Dictionary();
foreach (var rejectedServices in services) {
if (rejectedServices.ImplementationType != null && typeof(IConsumerService).IsAssignableFrom(rejectedServices.ImplementationType))
@@ -64,7 +61,7 @@ namespace Microsoft.Extensions.DependencyInjection
services.TryAddSingleton();
services.TryAddSingleton();
- services.TryAddSingleton(typeof(ITopicRouteHandler), typeof(ConsumerHandler));
+ services.TryAddSingleton(typeof(ITopicServer), typeof(ConsumerHandler));
return new ConsistencyBuilder(services);
}
@@ -73,6 +70,7 @@ namespace Microsoft.Extensions.DependencyInjection
public static ConsistencyBuilder AddMessageStore(this ConsistencyBuilder build)
where T : class, IConsistencyMessageStore {
build.Services.AddScoped();
+ build.Services.TryAddScoped();
return build;
}
}
diff --git a/src/Cap.Consistency/TopicContext.cs b/src/Cap.Consistency/TopicContext.cs
new file mode 100644
index 0000000..d24552a
--- /dev/null
+++ b/src/Cap.Consistency/TopicContext.cs
@@ -0,0 +1,22 @@
+using System;
+using System.Threading;
+
+namespace Cap.Consistency
+{
+ public class TopicContext
+ {
+ public TopicContext() {
+
+ }
+
+ public TopicContext(IServiceProvider provider, CancellationToken cancellationToken) {
+ ServiceProvider = provider;
+ CancellationToken = cancellationToken;
+ }
+
+
+ public IServiceProvider ServiceProvider { get; set; }
+
+ public CancellationToken CancellationToken { get; }
+ }
+}