yangxiaodong 7 years ago
parent
commit
e56637f599
11 changed files with 166 additions and 71 deletions
  1. +3
    -2
      src/Cap.Consistency.Kafka/Microsoft.Extensions.DependencyInjection/ConsistencyBuilderExtensions.cs
  2. +0
    -1
      src/Cap.Consistency.RabbitMQ/RabbitMQProducerClient.cs
  3. +1
    -0
      src/Cap.Consistency/Cap.Consistency.csproj
  4. +3
    -3
      src/Cap.Consistency/IBootstrapper.Default.cs
  5. +61
    -0
      src/Cap.Consistency/IConsistencyMessageStore.cs
  6. +9
    -2
      src/Cap.Consistency/Infrastructure/ConsistencyMessage.cs
  7. +2
    -0
      src/Cap.Consistency/Infrastructure/ConsistencyOptions.cs
  8. +0
    -59
      src/Cap.Consistency/LoggerExtensions.cs
  9. +7
    -1
      src/Cap.Consistency/Microsoft.Extensions.DependencyInjection/ConsistencyBuilder.cs
  10. +2
    -3
      src/Cap.Consistency/Microsoft.Extensions.DependencyInjection/ServiceCollectionExtensions.cs
  11. +78
    -0
      src/Cap.Consistency/OperateResult.cs

+ 3
- 2
src/Cap.Consistency.Kafka/Microsoft.Extensions.DependencyInjection/ConsistencyBuilderExtensions.cs View File

@@ -1,15 +1,16 @@
using Cap.Consistency.Consumer; using Cap.Consistency.Consumer;
using Cap.Consistency.Job;
using Cap.Consistency.Kafka; using Cap.Consistency.Kafka;
using Cap.Consistency.Producer;


namespace Microsoft.Extensions.DependencyInjection namespace Microsoft.Extensions.DependencyInjection
{ {
public static class ConsistencyBuilderExtensions public static class ConsistencyBuilderExtensions
{ {
public static ConsistencyBuilder AddKafka(this ConsistencyBuilder builder) { public static ConsistencyBuilder AddKafka(this ConsistencyBuilder builder) {

builder.Services.AddSingleton<IConsumerClientFactory, KafkaConsumerClientFactory>(); builder.Services.AddSingleton<IConsumerClientFactory, KafkaConsumerClientFactory>();


builder.Services.AddTransient<IProducerClient, KafkaProducerClient>();
builder.Services.AddTransient<IJobProcessor, KafkaJobProcessor>();


return builder; return builder;
} }


+ 0
- 1
src/Cap.Consistency.RabbitMQ/RabbitMQProducerClient.cs View File

@@ -1,7 +1,6 @@
using System.Text; using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
using Cap.Consistency.Infrastructure; using Cap.Consistency.Infrastructure;
using Cap.Consistency.Producer;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using RabbitMQ.Client; using RabbitMQ.Client;


+ 1
- 0
src/Cap.Consistency/Cap.Consistency.csproj View File

@@ -20,6 +20,7 @@
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="1.1.1" /> <PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="1.1.1" />
<PackageReference Include="ncrontab" Version="3.3.0" /> <PackageReference Include="ncrontab" Version="3.3.0" />
<PackageReference Include="Newtonsoft.Json" Version="10.0.2" /> <PackageReference Include="Newtonsoft.Json" Version="10.0.2" />
<PackageReference Include="System.Threading.ThreadPool" Version="4.3.0" />
</ItemGroup> </ItemGroup>


</Project> </Project>

+ 3
- 3
src/Cap.Consistency/IBootstrapper.Default.cs View File

@@ -4,7 +4,6 @@ using System.Collections.Generic;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Cap.Consistency.Infrastructure; using Cap.Consistency.Infrastructure;
using Cap.Consistency.Store;
using Microsoft.AspNetCore.Hosting; using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
@@ -20,7 +19,7 @@ namespace Cap.Consistency


public DefaultBootstrapper( public DefaultBootstrapper(
IOptions<ConsistencyOptions> options, IOptions<ConsistencyOptions> options,
ConsistencyMessageManager storage,
IConsistencyMessageStore storage,
IApplicationLifetime appLifetime, IApplicationLifetime appLifetime,
IServiceProvider provider) { IServiceProvider provider) {


@@ -42,7 +41,7 @@ namespace Cap.Consistency


protected ConsistencyOptions Options { get; } protected ConsistencyOptions Options { get; }


protected ConsistencyMessageManager Storage { get; }
protected IConsistencyMessageStore Storage { get; }


protected IEnumerable<IProcessingServer> Servers { get; } protected IEnumerable<IProcessingServer> Servers { get; }


@@ -58,6 +57,7 @@ namespace Cap.Consistency
if (_cts.IsCancellationRequested) return; if (_cts.IsCancellationRequested) return;


await BootstrapCoreAsync(); await BootstrapCoreAsync();

if (_cts.IsCancellationRequested) return; if (_cts.IsCancellationRequested) return;


foreach (var item in Servers) { foreach (var item in Servers) {


+ 61
- 0
src/Cap.Consistency/IConsistencyMessageStore.cs View File

@@ -0,0 +1,61 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Cap.Consistency.Infrastructure;

namespace Cap.Consistency
{
/// <summary>
/// Provides an abstraction for a store which manages consistent message.
/// </summary>
/// <typeparam name="ConsistencyMessage"></typeparam>
public interface IConsistencyMessageStore
{
/// <summary>
/// Finds and returns a message, if any, who has the specified <paramref name="messageId"/>.
/// </summary>
/// <param name="messageId">The message ID to search for.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used to propagate notifications that the operation should be canceled.</param>
/// <returns>
/// The <see cref="Task"/> that represents the asynchronous operation, containing the message matching the specified <paramref name="messageId"/> if it exists.
/// </returns>
Task<ConsistencyMessage> FindByIdAsync(string messageId, CancellationToken cancellationToken);

/// <summary>
/// Creates a new message in a store as an asynchronous operation.
/// </summary>
/// <param name="message">The message to create in the store.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used to propagate notifications that the operation should be canceled.</param>
/// <returns>A <see cref="Task{TResult}"/> that represents the <see cref="OperateResult"/> of the asynchronous query.</returns>
Task<OperateResult> CreateAsync(ConsistencyMessage message, CancellationToken cancellationToken);

/// <summary>
/// Updates a message in a store as an asynchronous operation.
/// </summary>
/// <param name="message">The message to update in the store.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used to propagate notifications that the operation should be canceled.</param>
/// <returns>A <see cref="Task{TResult}"/> that represents the <see cref="OperateResult"/> of the asynchronous query.</returns>
Task<OperateResult> UpdateAsync(ConsistencyMessage message, CancellationToken cancellationToken);

/// <summary>
/// Deletes a message from the store as an asynchronous operation.
/// </summary>
/// <param name="message">The message to delete in the store.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used to propagate notifications that the operation should be canceled.</param>
/// <returns>A <see cref="Task{TResult}"/> that represents the <see cref="OperateResult"/> of the asynchronous query.</returns>
Task<OperateResult> DeleteAsync(ConsistencyMessage message, CancellationToken cancellationToken);

/// <summary>
/// Gets the ID for a message from the store as an asynchronous operation.
/// </summary>
/// <param name="message">The message whose ID should be returned.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used to propagate notifications that the operation should be canceled.</param>
/// <returns>A <see cref="Task{TResult}"/> that contains the ID of the message.</returns>
Task<string> GeConsistencyMessageIdAsync(ConsistencyMessage message, CancellationToken cancellationToken);

Task<ConsistencyMessage> GetFirstEnqueuedMessageAsync(CancellationToken cancellationToken);

// void ChangeState(ConsistencyMessage message, MessageStatus status);
}
}

+ 9
- 2
src/Cap.Consistency/Infrastructure/ConsistencyMessage.cs View File

@@ -5,7 +5,7 @@ namespace Cap.Consistency.Infrastructure
/// <summary> /// <summary>
/// The default implementation of <see cref="ConsistencyMessage{TKey}"/> which uses a string as a primary key. /// The default implementation of <see cref="ConsistencyMessage{TKey}"/> which uses a string as a primary key.
/// </summary> /// </summary>
public class ConsistencyMessage
public class ConsistencyMessage
{ {
/// <summary> /// <summary>
/// Initializes a new instance of <see cref="ConsistencyMessage"/>. /// Initializes a new instance of <see cref="ConsistencyMessage"/>.
@@ -24,11 +24,15 @@ namespace Cap.Consistency.Infrastructure


public DateTime SendTime { get; set; } public DateTime SendTime { get; set; }


public string Topic { get; set; }

public string Payload { get; set; } public string Payload { get; set; }


public MessageStatus Status { get; set; } public MessageStatus Status { get; set; }


public virtual DateTime? UpdateTime { get; set; } public virtual DateTime? UpdateTime { get; set; }

public byte[] RowVersion { get; set; }
} }


/// <summary> /// <summary>
@@ -38,7 +42,10 @@ namespace Cap.Consistency.Infrastructure
{ {
Deleted = 0, Deleted = 0,
WaitForSend = 1, WaitForSend = 1,
Processing = 2,
RollbackSuccessed = 3, RollbackSuccessed = 3,
RollbackFailed = 4
RollbackFailed = 4,
Successed = 5,
Received = 6
} }
} }

+ 2
- 0
src/Cap.Consistency/Infrastructure/ConsistencyOptions.cs View File

@@ -10,5 +10,7 @@ namespace Cap.Consistency.Infrastructure
public string BrokerUrlList { get; set; } = "localhost:9092"; public string BrokerUrlList { get; set; } = "localhost:9092";


public string CronExp { get; set; } = Cron.Minutely(); public string CronExp { get; set; } = Cron.Minutely();

public int PollingDelay { get; set; } = 8;
} }
} }

+ 0
- 59
src/Cap.Consistency/LoggerExtensions.cs View File

@@ -17,13 +17,6 @@ namespace Cap.Consistency
private static Action<ILogger, string, double, Exception> _cronJobExecuted; private static Action<ILogger, string, double, Exception> _cronJobExecuted;
private static Action<ILogger, string, Exception> _cronJobFailed; private static Action<ILogger, string, Exception> _cronJobFailed;


private static Action<ILogger, Exception> _jobFailed;
private static Action<ILogger, Exception> _jobFailedWillRetry;
private static Action<ILogger, double, Exception> _jobExecuted;
private static Action<ILogger, int, Exception> _jobRetrying;
private static Action<ILogger, int, Exception> _jobCouldNotBeLoaded;
private static Action<ILogger, int, Exception> _exceptionOccuredWhileExecutingJob;

static LoggerExtensions() { static LoggerExtensions() {
_serverStarting = LoggerMessage.Define<int, int>( _serverStarting = LoggerMessage.Define<int, int>(
LogLevel.Debug, LogLevel.Debug,
@@ -60,36 +53,7 @@ namespace Cap.Consistency
4, 4,
"Cron job '{jobName}' failed to execute."); "Cron job '{jobName}' failed to execute.");


_jobFailed = LoggerMessage.Define(
LogLevel.Warning,
1,
"Job failed to execute.");

_jobFailedWillRetry = LoggerMessage.Define(
LogLevel.Warning,
2,
"Job failed to execute. Will retry.");

_jobRetrying = LoggerMessage.Define<int>(
LogLevel.Debug,
3,
"Retrying a job: {Retries}...");

_jobExecuted = LoggerMessage.Define<double>(
LogLevel.Debug,
4,
"Job executed. Took: {Seconds} secs.");


_jobCouldNotBeLoaded = LoggerMessage.Define<int>(
LogLevel.Warning,
5,
"Could not load a job: '{JobId}'.");

_exceptionOccuredWhileExecutingJob = LoggerMessage.Define<int>(
LogLevel.Error,
6,
"An exception occured while trying to execute a job: '{JobId}'. " +
"Requeuing for another retry.");
} }


public static void ServerStarting(this ILogger logger, int machineProcessorCount, int processorCount) { public static void ServerStarting(this ILogger logger, int machineProcessorCount, int processorCount) {
@@ -120,28 +84,5 @@ namespace Cap.Consistency
_cronJobFailed(logger, name, ex); _cronJobFailed(logger, name, ex);
} }


public static void JobFailed(this ILogger logger, Exception ex) {
_jobFailed(logger, ex);
}

public static void JobFailedWillRetry(this ILogger logger, Exception ex) {
_jobFailedWillRetry(logger, ex);
}

public static void JobRetrying(this ILogger logger, int retries) {
_jobRetrying(logger, retries, null);
}

public static void JobExecuted(this ILogger logger, double seconds) {
_jobExecuted(logger, seconds, null);
}

public static void JobCouldNotBeLoaded(this ILogger logger, int jobId, Exception ex) {
_jobCouldNotBeLoaded(logger, jobId, ex);
}

public static void ExceptionOccuredWhileExecutingJob(this ILogger logger, int jobId, Exception ex) {
_exceptionOccuredWhileExecutingJob(logger, jobId, ex);
}
} }
} }

+ 7
- 1
src/Cap.Consistency/Microsoft.Extensions.DependencyInjection/ConsistencyBuilder.cs View File

@@ -1,6 +1,6 @@
using System; using System;
using Cap.Consistency;
using Cap.Consistency.Job; using Cap.Consistency.Job;
using Cap.Consistency.Store;


namespace Microsoft.Extensions.DependencyInjection namespace Microsoft.Extensions.DependencyInjection
{ {
@@ -45,5 +45,11 @@ namespace Microsoft.Extensions.DependencyInjection


return AddSingleton<IJob, T>(); return AddSingleton<IJob, T>();
} }

public virtual ConsistencyBuilder AddProducerClient<T>()
where T:class, IProducerClient {

return AddScoped(typeof(IProducerClient), typeof(T));
}
} }
} }

+ 2
- 3
src/Cap.Consistency/Microsoft.Extensions.DependencyInjection/ServiceCollectionExtensions.cs View File

@@ -7,7 +7,6 @@ using Cap.Consistency.Consumer;
using Cap.Consistency.Infrastructure; using Cap.Consistency.Infrastructure;
using Cap.Consistency.Internal; using Cap.Consistency.Internal;
using Cap.Consistency.Job; using Cap.Consistency.Job;
using Cap.Consistency.Store;
using Microsoft.Extensions.DependencyInjection.Extensions; using Microsoft.Extensions.DependencyInjection.Extensions;


namespace Microsoft.Extensions.DependencyInjection namespace Microsoft.Extensions.DependencyInjection
@@ -48,8 +47,6 @@ namespace Microsoft.Extensions.DependencyInjection
services.TryAddSingleton<IConsumerInvokerFactory, ConsumerInvokerFactory>(); services.TryAddSingleton<IConsumerInvokerFactory, ConsumerInvokerFactory>();
services.TryAddSingleton<MethodMatcherCache>(); services.TryAddSingleton<MethodMatcherCache>();


services.TryAddScoped<ConsistencyMessageManager>();

services.AddSingleton<IProcessingServer, ConsumerHandler>(); services.AddSingleton<IProcessingServer, ConsumerHandler>();
services.AddSingleton<IProcessingServer, JobProcessingServer>(); services.AddSingleton<IProcessingServer, JobProcessingServer>();
services.AddSingleton<IBootstrapper, DefaultBootstrapper>(); services.AddSingleton<IBootstrapper, DefaultBootstrapper>();
@@ -58,6 +55,8 @@ namespace Microsoft.Extensions.DependencyInjection
services.TryAddSingleton<IJob, CapJob>(); services.TryAddSingleton<IJob, CapJob>();
services.TryAddTransient<DefaultCronJobRegistry>(); services.TryAddTransient<DefaultCronJobRegistry>();


services.TryAddScoped<IProducerClient, DefaultProducerClient>();

return new ConsistencyBuilder(services); return new ConsistencyBuilder(services);
} }




+ 78
- 0
src/Cap.Consistency/OperateResult.cs View File

@@ -0,0 +1,78 @@
using System.Collections.Generic;
using System.Linq;

namespace Cap.Consistency
{
/// <summary>
/// Represents the result of an consistent message operation.
/// </summary>
public class OperateResult
{
// ReSharper disable once InconsistentNaming
private static readonly OperateResult _success = new OperateResult { Succeeded = true };

// ReSharper disable once FieldCanBeMadeReadOnly.Local
private List<OperateError> _errors = new List<OperateError>();

/// <summary>
/// Flag indicating whether if the operation succeeded or not.
/// </summary>
public bool Succeeded { get; set; }

/// <summary>
/// An <see cref="IEnumerable{T}"/> of <see cref="OperateError"/>s containing an errors
/// that occurred during the operation.
/// </summary>
/// <value>An <see cref="IEnumerable{T}"/> of <see cref="OperateError"/>s.</value>
public IEnumerable<OperateError> Errors => _errors;

/// <summary>
/// Returns an <see cref="OperateResult"/> indicating a successful identity operation.
/// </summary>
/// <returns>An <see cref="OperateResult"/> indicating a successful operation.</returns>
public static OperateResult Success => _success;

/// <summary>
/// Creates an <see cref="OperateResult"/> indicating a failed operation, with a list of <paramref name="errors"/> if applicable.
/// </summary>
/// <param name="errors">An optional array of <see cref="OperateError"/>s which caused the operation to fail.</param>
/// <returns>An <see cref="OperateResult"/> indicating a failed operation, with a list of <paramref name="errors"/> if applicable.</returns>
public static OperateResult Failed(params OperateError[] errors) {
var result = new OperateResult { Succeeded = false };
if (errors != null) {
result._errors.AddRange(errors);
}
return result;
}

/// <summary>
/// Converts the value of the current <see cref="OperateResult"/> object to its equivalent string representation.
/// </summary>
/// <returns>A string representation of the current <see cref="OperateResult"/> object.</returns>
/// <remarks>
/// If the operation was successful the ToString() will return "Succeeded" otherwise it returned
/// "Failed : " followed by a comma delimited list of error codes from its <see cref="Errors"/> collection, if any.
/// </remarks>
public override string ToString() {
return Succeeded ?
"Succeeded" :
string.Format("{0} : {1}", "Failed", string.Join(",", Errors.Select(x => x.Code).ToList()));
}
}

/// <summary>
/// Encapsulates an error from the operate subsystem.
/// </summary>
public class OperateError
{
/// <summary>
/// Gets or sets ths code for this error.
/// </summary>
public string Code { get; set; }

/// <summary>
/// Gets or sets the description for this error.
/// </summary>
public string Description { get; set; }
}
}

Loading…
Cancel
Save