From e56637f5991b545ed3e44cae09b8817ecee14e7b Mon Sep 17 00:00:00 2001 From: yangxiaodong Date: Fri, 23 Jun 2017 10:21:02 +0800 Subject: [PATCH] --- .../ConsistencyBuilderExtensions.cs | 5 +- .../RabbitMQProducerClient.cs | 1 - src/Cap.Consistency/Cap.Consistency.csproj | 1 + src/Cap.Consistency/IBootstrapper.Default.cs | 6 +- .../IConsistencyMessageStore.cs | 61 +++++++++++++++ .../Infrastructure/ConsistencyMessage.cs | 11 ++- .../Infrastructure/ConsistencyOptions.cs | 2 + src/Cap.Consistency/LoggerExtensions.cs | 59 -------------- .../ConsistencyBuilder.cs | 8 +- .../ServiceCollectionExtensions.cs | 5 +- src/Cap.Consistency/OperateResult.cs | 78 +++++++++++++++++++ 11 files changed, 166 insertions(+), 71 deletions(-) create mode 100644 src/Cap.Consistency/IConsistencyMessageStore.cs create mode 100644 src/Cap.Consistency/OperateResult.cs diff --git a/src/Cap.Consistency.Kafka/Microsoft.Extensions.DependencyInjection/ConsistencyBuilderExtensions.cs b/src/Cap.Consistency.Kafka/Microsoft.Extensions.DependencyInjection/ConsistencyBuilderExtensions.cs index 7eeab30..c46a0d4 100644 --- a/src/Cap.Consistency.Kafka/Microsoft.Extensions.DependencyInjection/ConsistencyBuilderExtensions.cs +++ b/src/Cap.Consistency.Kafka/Microsoft.Extensions.DependencyInjection/ConsistencyBuilderExtensions.cs @@ -1,15 +1,16 @@ using Cap.Consistency.Consumer; +using Cap.Consistency.Job; using Cap.Consistency.Kafka; -using Cap.Consistency.Producer; namespace Microsoft.Extensions.DependencyInjection { public static class ConsistencyBuilderExtensions { public static ConsistencyBuilder AddKafka(this ConsistencyBuilder builder) { + builder.Services.AddSingleton(); - builder.Services.AddTransient(); + builder.Services.AddTransient(); return builder; } diff --git a/src/Cap.Consistency.RabbitMQ/RabbitMQProducerClient.cs b/src/Cap.Consistency.RabbitMQ/RabbitMQProducerClient.cs index 402ec35..ad9e0f9 100644 --- a/src/Cap.Consistency.RabbitMQ/RabbitMQProducerClient.cs +++ b/src/Cap.Consistency.RabbitMQ/RabbitMQProducerClient.cs @@ -1,7 +1,6 @@ using System.Text; using System.Threading.Tasks; using Cap.Consistency.Infrastructure; -using Cap.Consistency.Producer; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using RabbitMQ.Client; diff --git a/src/Cap.Consistency/Cap.Consistency.csproj b/src/Cap.Consistency/Cap.Consistency.csproj index d6307b5..7f13274 100644 --- a/src/Cap.Consistency/Cap.Consistency.csproj +++ b/src/Cap.Consistency/Cap.Consistency.csproj @@ -20,6 +20,7 @@ + diff --git a/src/Cap.Consistency/IBootstrapper.Default.cs b/src/Cap.Consistency/IBootstrapper.Default.cs index 81b3f82..e0361b1 100644 --- a/src/Cap.Consistency/IBootstrapper.Default.cs +++ b/src/Cap.Consistency/IBootstrapper.Default.cs @@ -4,7 +4,6 @@ using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using Cap.Consistency.Infrastructure; -using Cap.Consistency.Store; using Microsoft.AspNetCore.Hosting; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Options; @@ -20,7 +19,7 @@ namespace Cap.Consistency public DefaultBootstrapper( IOptions options, - ConsistencyMessageManager storage, + IConsistencyMessageStore storage, IApplicationLifetime appLifetime, IServiceProvider provider) { @@ -42,7 +41,7 @@ namespace Cap.Consistency protected ConsistencyOptions Options { get; } - protected ConsistencyMessageManager Storage { get; } + protected IConsistencyMessageStore Storage { get; } protected IEnumerable Servers { get; } @@ -58,6 +57,7 @@ namespace Cap.Consistency if (_cts.IsCancellationRequested) return; await BootstrapCoreAsync(); + if (_cts.IsCancellationRequested) return; foreach (var item in Servers) { diff --git a/src/Cap.Consistency/IConsistencyMessageStore.cs b/src/Cap.Consistency/IConsistencyMessageStore.cs new file mode 100644 index 0000000..6a8389c --- /dev/null +++ b/src/Cap.Consistency/IConsistencyMessageStore.cs @@ -0,0 +1,61 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Cap.Consistency.Infrastructure; + +namespace Cap.Consistency +{ + /// + /// Provides an abstraction for a store which manages consistent message. + /// + /// + public interface IConsistencyMessageStore + { + /// + /// Finds and returns a message, if any, who has the specified . + /// + /// The message ID to search for. + /// The used to propagate notifications that the operation should be canceled. + /// + /// The that represents the asynchronous operation, containing the message matching the specified if it exists. + /// + Task FindByIdAsync(string messageId, CancellationToken cancellationToken); + + + /// + /// Creates a new message in a store as an asynchronous operation. + /// + /// The message to create in the store. + /// The used to propagate notifications that the operation should be canceled. + /// A that represents the of the asynchronous query. + Task CreateAsync(ConsistencyMessage message, CancellationToken cancellationToken); + + /// + /// Updates a message in a store as an asynchronous operation. + /// + /// The message to update in the store. + /// The used to propagate notifications that the operation should be canceled. + /// A that represents the of the asynchronous query. + Task UpdateAsync(ConsistencyMessage message, CancellationToken cancellationToken); + + /// + /// Deletes a message from the store as an asynchronous operation. + /// + /// The message to delete in the store. + /// The used to propagate notifications that the operation should be canceled. + /// A that represents the of the asynchronous query. + Task DeleteAsync(ConsistencyMessage message, CancellationToken cancellationToken); + + /// + /// Gets the ID for a message from the store as an asynchronous operation. + /// + /// The message whose ID should be returned. + /// The used to propagate notifications that the operation should be canceled. + /// A that contains the ID of the message. + Task GeConsistencyMessageIdAsync(ConsistencyMessage message, CancellationToken cancellationToken); + + Task GetFirstEnqueuedMessageAsync(CancellationToken cancellationToken); + + // void ChangeState(ConsistencyMessage message, MessageStatus status); + } +} \ No newline at end of file diff --git a/src/Cap.Consistency/Infrastructure/ConsistencyMessage.cs b/src/Cap.Consistency/Infrastructure/ConsistencyMessage.cs index 4accc0f..a3bef62 100644 --- a/src/Cap.Consistency/Infrastructure/ConsistencyMessage.cs +++ b/src/Cap.Consistency/Infrastructure/ConsistencyMessage.cs @@ -5,7 +5,7 @@ namespace Cap.Consistency.Infrastructure /// /// The default implementation of which uses a string as a primary key. /// - public class ConsistencyMessage + public class ConsistencyMessage { /// /// Initializes a new instance of . @@ -24,11 +24,15 @@ namespace Cap.Consistency.Infrastructure public DateTime SendTime { get; set; } + public string Topic { get; set; } + public string Payload { get; set; } public MessageStatus Status { get; set; } public virtual DateTime? UpdateTime { get; set; } + + public byte[] RowVersion { get; set; } } /// @@ -38,7 +42,10 @@ namespace Cap.Consistency.Infrastructure { Deleted = 0, WaitForSend = 1, + Processing = 2, RollbackSuccessed = 3, - RollbackFailed = 4 + RollbackFailed = 4, + Successed = 5, + Received = 6 } } \ No newline at end of file diff --git a/src/Cap.Consistency/Infrastructure/ConsistencyOptions.cs b/src/Cap.Consistency/Infrastructure/ConsistencyOptions.cs index fac9019..963ba89 100644 --- a/src/Cap.Consistency/Infrastructure/ConsistencyOptions.cs +++ b/src/Cap.Consistency/Infrastructure/ConsistencyOptions.cs @@ -10,5 +10,7 @@ namespace Cap.Consistency.Infrastructure public string BrokerUrlList { get; set; } = "localhost:9092"; public string CronExp { get; set; } = Cron.Minutely(); + + public int PollingDelay { get; set; } = 8; } } \ No newline at end of file diff --git a/src/Cap.Consistency/LoggerExtensions.cs b/src/Cap.Consistency/LoggerExtensions.cs index 7957053..77d0e3a 100644 --- a/src/Cap.Consistency/LoggerExtensions.cs +++ b/src/Cap.Consistency/LoggerExtensions.cs @@ -17,13 +17,6 @@ namespace Cap.Consistency private static Action _cronJobExecuted; private static Action _cronJobFailed; - private static Action _jobFailed; - private static Action _jobFailedWillRetry; - private static Action _jobExecuted; - private static Action _jobRetrying; - private static Action _jobCouldNotBeLoaded; - private static Action _exceptionOccuredWhileExecutingJob; - static LoggerExtensions() { _serverStarting = LoggerMessage.Define( LogLevel.Debug, @@ -60,36 +53,7 @@ namespace Cap.Consistency 4, "Cron job '{jobName}' failed to execute."); - _jobFailed = LoggerMessage.Define( - LogLevel.Warning, - 1, - "Job failed to execute."); - - _jobFailedWillRetry = LoggerMessage.Define( - LogLevel.Warning, - 2, - "Job failed to execute. Will retry."); - - _jobRetrying = LoggerMessage.Define( - LogLevel.Debug, - 3, - "Retrying a job: {Retries}..."); - - _jobExecuted = LoggerMessage.Define( - LogLevel.Debug, - 4, - "Job executed. Took: {Seconds} secs."); - _jobCouldNotBeLoaded = LoggerMessage.Define( - LogLevel.Warning, - 5, - "Could not load a job: '{JobId}'."); - - _exceptionOccuredWhileExecutingJob = LoggerMessage.Define( - LogLevel.Error, - 6, - "An exception occured while trying to execute a job: '{JobId}'. " + - "Requeuing for another retry."); } public static void ServerStarting(this ILogger logger, int machineProcessorCount, int processorCount) { @@ -120,28 +84,5 @@ namespace Cap.Consistency _cronJobFailed(logger, name, ex); } - public static void JobFailed(this ILogger logger, Exception ex) { - _jobFailed(logger, ex); - } - - public static void JobFailedWillRetry(this ILogger logger, Exception ex) { - _jobFailedWillRetry(logger, ex); - } - - public static void JobRetrying(this ILogger logger, int retries) { - _jobRetrying(logger, retries, null); - } - - public static void JobExecuted(this ILogger logger, double seconds) { - _jobExecuted(logger, seconds, null); - } - - public static void JobCouldNotBeLoaded(this ILogger logger, int jobId, Exception ex) { - _jobCouldNotBeLoaded(logger, jobId, ex); - } - - public static void ExceptionOccuredWhileExecutingJob(this ILogger logger, int jobId, Exception ex) { - _exceptionOccuredWhileExecutingJob(logger, jobId, ex); - } } } \ No newline at end of file diff --git a/src/Cap.Consistency/Microsoft.Extensions.DependencyInjection/ConsistencyBuilder.cs b/src/Cap.Consistency/Microsoft.Extensions.DependencyInjection/ConsistencyBuilder.cs index 9245cb9..999d8a0 100644 --- a/src/Cap.Consistency/Microsoft.Extensions.DependencyInjection/ConsistencyBuilder.cs +++ b/src/Cap.Consistency/Microsoft.Extensions.DependencyInjection/ConsistencyBuilder.cs @@ -1,6 +1,6 @@ using System; +using Cap.Consistency; using Cap.Consistency.Job; -using Cap.Consistency.Store; namespace Microsoft.Extensions.DependencyInjection { @@ -45,5 +45,11 @@ namespace Microsoft.Extensions.DependencyInjection return AddSingleton(); } + + public virtual ConsistencyBuilder AddProducerClient() + where T:class, IProducerClient { + + return AddScoped(typeof(IProducerClient), typeof(T)); + } } } \ No newline at end of file diff --git a/src/Cap.Consistency/Microsoft.Extensions.DependencyInjection/ServiceCollectionExtensions.cs b/src/Cap.Consistency/Microsoft.Extensions.DependencyInjection/ServiceCollectionExtensions.cs index 561683d..314a8d2 100644 --- a/src/Cap.Consistency/Microsoft.Extensions.DependencyInjection/ServiceCollectionExtensions.cs +++ b/src/Cap.Consistency/Microsoft.Extensions.DependencyInjection/ServiceCollectionExtensions.cs @@ -7,7 +7,6 @@ using Cap.Consistency.Consumer; using Cap.Consistency.Infrastructure; using Cap.Consistency.Internal; using Cap.Consistency.Job; -using Cap.Consistency.Store; using Microsoft.Extensions.DependencyInjection.Extensions; namespace Microsoft.Extensions.DependencyInjection @@ -48,8 +47,6 @@ namespace Microsoft.Extensions.DependencyInjection services.TryAddSingleton(); services.TryAddSingleton(); - services.TryAddScoped(); - services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); @@ -58,6 +55,8 @@ namespace Microsoft.Extensions.DependencyInjection services.TryAddSingleton(); services.TryAddTransient(); + services.TryAddScoped(); + return new ConsistencyBuilder(services); } diff --git a/src/Cap.Consistency/OperateResult.cs b/src/Cap.Consistency/OperateResult.cs new file mode 100644 index 0000000..f51e3dd --- /dev/null +++ b/src/Cap.Consistency/OperateResult.cs @@ -0,0 +1,78 @@ +using System.Collections.Generic; +using System.Linq; + +namespace Cap.Consistency +{ + /// + /// Represents the result of an consistent message operation. + /// + public class OperateResult + { + // ReSharper disable once InconsistentNaming + private static readonly OperateResult _success = new OperateResult { Succeeded = true }; + + // ReSharper disable once FieldCanBeMadeReadOnly.Local + private List _errors = new List(); + + /// + /// Flag indicating whether if the operation succeeded or not. + /// + public bool Succeeded { get; set; } + + /// + /// An of s containing an errors + /// that occurred during the operation. + /// + /// An of s. + public IEnumerable Errors => _errors; + + /// + /// Returns an indicating a successful identity operation. + /// + /// An indicating a successful operation. + public static OperateResult Success => _success; + + /// + /// Creates an indicating a failed operation, with a list of if applicable. + /// + /// An optional array of s which caused the operation to fail. + /// An indicating a failed operation, with a list of if applicable. + public static OperateResult Failed(params OperateError[] errors) { + var result = new OperateResult { Succeeded = false }; + if (errors != null) { + result._errors.AddRange(errors); + } + return result; + } + + /// + /// Converts the value of the current object to its equivalent string representation. + /// + /// A string representation of the current object. + /// + /// If the operation was successful the ToString() will return "Succeeded" otherwise it returned + /// "Failed : " followed by a comma delimited list of error codes from its collection, if any. + /// + public override string ToString() { + return Succeeded ? + "Succeeded" : + string.Format("{0} : {1}", "Failed", string.Join(",", Errors.Select(x => x.Code).ToList())); + } + } + + /// + /// Encapsulates an error from the operate subsystem. + /// + public class OperateError + { + /// + /// Gets or sets ths code for this error. + /// + public string Code { get; set; } + + /// + /// Gets or sets the description for this error. + /// + public string Description { get; set; } + } +} \ No newline at end of file