diff --git a/CODE_OF_CONDUCT.md b/CODE_OF_CONDUCT.md index 2ea6e75..7f29ea1 100644 --- a/CODE_OF_CONDUCT.md +++ b/CODE_OF_CONDUCT.md @@ -34,7 +34,7 @@ This Code of Conduct applies both within project spaces and in public spaces whe ## Enforcement -Instances of abusive, harassing, or otherwise unacceptable behavior may be reported by contacting the project team at m.r992@hotmail.com. The project team will review and investigate all complaints, and will respond in a way that it deems appropriate to the circumstances. The project team is obligated to maintain confidentiality with regard to the reporter of an incident. Further details of specific enforcement policies may be posted separately. +Instances of abusive, harassing, or otherwise unacceptable behavior may be reported by contacting the project team at yangxiaodong1214@126.com. The project team will review and investigate all complaints, and will respond in a way that it deems appropriate to the circumstances. The project team is obligated to maintain confidentiality with regard to the reporter of an incident. Further details of specific enforcement policies may be posted separately. Project maintainers who do not follow or enforce the Code of Conduct in good faith may face temporary or permanent repercussions as determined by other members of the project's leadership. diff --git a/README.md b/README.md index bcd07ba..72fe766 100644 --- a/README.md +++ b/README.md @@ -1,12 +1,9 @@ -

-   中文 -

- -# CAP - -[![Travis branch](https://img.shields.io/travis/dotnetcore/CAP/master.svg?label=travis-ci)](https://travis-ci.org/dotnetcore/CAP) +# CAP                       [中文](https://github.com/dotnetcore/CAP/blob/develop/README.zh-cn.md) +[![Travis branch](https://img.shields.io/travis/dotnetcore/CAP/develop.svg?label=travis-ci)](https://travis-ci.org/dotnetcore/CAP) [![AppVeyor](https://ci.appveyor.com/api/projects/status/4mpe0tbu7n126vyw?svg=true)](https://ci.appveyor.com/project/yuleyule66/cap) -[![NuGet](https://img.shields.io/nuget/vpre/DotNetCore.CAP.svg)](https://www.nuget.org/packages/DotNetCore.CAP/) +[![NuGet](https://img.shields.io/nuget/v/DotNetCore.CAP.svg)](https://www.nuget.org/packages/DotNetCore.CAP/) +[![NuGet Preview](https://img.shields.io/nuget/vpre/DotNetCore.CAP.svg?label=nuget-pre)](https://www.nuget.org/packages/DotNetCore.CAP/) +[![Member project of .NET China Foundation](https://img.shields.io/badge/member_project_of-.NET_CHINA-red.svg?style=flat&colorB=9E20C8)](https://github.com/dotnetcore) [![GitHub license](https://img.shields.io/badge/license-MIT-blue.svg)](https://raw.githubusercontent.com/dotnetcore/CAP/master/LICENSE.txt) CAP is a .Net Standard library to achieve eventually consistent in distributed architectures system like SOA,MicroService. It is lightweight,easy to use and efficiently. @@ -17,7 +14,7 @@ CAP is a library that used in an ASP.NET Core project, Of Course you can ues it You can think of CAP as an EventBus because it has all the features of EventBus, and CAP provides a easier way to handle the publishing and subscribing than EventBus. -CAP has the function of Message Presistence, and it makes messages reliability when your service is restarted or down. CAP provides a Publish Service based on Microsoft DI that integrates seamlessly with your business services and supports strong consistency transactions. +CAP has the function of Message Persistence, and it makes messages reliability when your service is restarted or down. CAP provides a Publish Service based on Microsoft DI that integrates seamlessly with your business services and supports strong consistency transactions. This is a diagram of the CAP working in the ASP.NET Core MicroService architecture: @@ -27,26 +24,30 @@ This is a diagram of the CAP working in the ASP.NET Core MicroService architectu ## Getting Started -### NuGet (Coming soon) +### NuGet You can run the following command to install the CAP in your project. +``` +PM> Install-Package DotNetCore.CAP +``` + If your Message Queue is using Kafka, you can: ``` -PM> Install-Package DotNetCore.CAP.Kafka -Pre +PM> Install-Package DotNetCore.CAP.Kafka ``` -or RabbitMQ: +If your Message Queue is using RabbitMQ, you can: ``` -PM> Install-Package DotNetCore.CAP.RabbitMQ -Pre +PM> Install-Package DotNetCore.CAP.RabbitMQ ``` -CAP provides EntityFramework as default database store extension : +CAP provides EntityFramework as default database store extension (The MySQL version is under development): ``` -PM> Install-Package DotNetCore.CAP.EntityFrameworkCore -Pre +PM> Install-Package DotNetCore.CAP.SqlServer ``` ### Configuration @@ -58,11 +59,23 @@ public void ConfigureServices(IServiceCollection services) { ...... - services.AddDbContext(); + services.AddDbContext(); - services.AddCap() - .AddEntityFrameworkStores() - .AddKafka(x => x.Servers = "localhost:9092"); + services.AddCap(x => + { + // If your SqlServer is using EF for data operations, you need to add the following configuration: + // Notice: You don't need to config x.UseSqlServer(""") again! + x.UseEntityFramework(); + + // If you are using Dapper,you need to add the config: + x.UseSqlServer("Your ConnectionStrings"); + + // If your Message Queue is using RabbitMQ you need to add the config: + x.UseRabbitMQ("localhost"); + + // If your Message Queue is using Kafka you need to add the config: + x.UseKafka("localhost"); + }); } public void Configure(IApplicationBuilder app) @@ -92,11 +105,23 @@ public class PublishController : Controller [Route("~/checkAccount")] public async Task PublishMessage() { - //Specifies the message header and content to be sent + // Specifies the message header and content to be sent await _publisher.PublishAsync("xxx.services.account.check", new Person { Name = "Foo", Age = 11 }); return Ok(); } + + [Route("~/checkAccountWithTrans")] + public async Task PublishMessageWithTransaction([FromServices]AppDbContext dbContext) + { + using (var trans = dbContext.Database.BeginTransaction()) + { + await _publisher.PublishAsync("xxx.services.account.check", new Person { Name = "Foo", Age = 11 }); + + trans.Commit(); + } + return Ok(); + } } ``` diff --git a/README.zh-cn.md b/README.zh-cn.md index bbb356b..732ae68 100644 --- a/README.zh-cn.md +++ b/README.zh-cn.md @@ -1,12 +1,9 @@ -

-English -

- -# CAP                        -[![Travis branch](https://img.shields.io/travis/dotnetcore/CAP/master.svg?label=travis-ci)](https://travis-ci.org/dotnetcore/CAP) +# CAP                       [English](https://github.com/dotnetcore/CAP/blob/develop/README.md) +[![Travis branch](https://img.shields.io/travis/dotnetcore/CAP/develop.svg?label=travis-ci)](https://travis-ci.org/dotnetcore/CAP) [![AppVeyor](https://ci.appveyor.com/api/projects/status/4mpe0tbu7n126vyw?svg=true)](https://ci.appveyor.com/project/yuleyule66/cap) -[![NuGet](https://img.shields.io/nuget/vpre/DotNetCore.CAP.svg)](https://www.nuget.org/packages/DotNetCore.CAP/) -[![Member Project Of .NET China Foundation](https://github.com/dotnetcore/Home/raw/master/icons/member-project-of-netchina.png)](https://github.com/dotnetcore) +[![NuGet](https://img.shields.io/nuget/v/DotNetCore.CAP.svg)](https://www.nuget.org/packages/DotNetCore.CAP/) +[![NuGet Preview](https://img.shields.io/nuget/vpre/DotNetCore.CAP.svg?label=nuget-pre)](https://www.nuget.org/packages/DotNetCore.CAP/) +[![Member project of .NET China Foundation](https://img.shields.io/badge/member_project_of-.NET_CHINA-red.svg?style=flat&colorB=9E20C8)](https://github.com/dotnetcore) [![GitHub license](https://img.shields.io/badge/license-MIT-blue.svg)](https://raw.githubusercontent.com/dotnetcore/CAP/master/LICENSE.txt) CAP 是一个在分布式系统(SOA、MicroService)中实现最终一致性的库,它具有轻量级、易使用、高性能等特点。 @@ -32,25 +29,25 @@ CAP 具有消息持久化的功能,当你的服务进行重启或者宕机时 你可以运行以下下命令在你的项目中安装 CAP。 ``` -PM> Install-Package DotNetCore.CAP -Pre +PM> Install-Package DotNetCore.CAP ``` 如果你的消息队列使用的是 Kafka 的话,你可以: ``` -PM> Install-Package DotNetCore.CAP.Kafka -Pre +PM> Install-Package DotNetCore.CAP.Kafka ``` 如果你的消息队列使用的是 RabbitMQ 的话,你可以: ``` -PM> Install-Package DotNetCore.CAP.RabbitMQ -Pre +PM> Install-Package DotNetCore.CAP.RabbitMQ ``` CAP 默认提供了 Sql Server 的扩展作为数据库存储(MySql的正在开发中): ``` -PM> Install-Package DotNetCore.CAP.SqlServer -Pre +PM> Install-Package DotNetCore.CAP.SqlServer ``` ### Configuration @@ -174,7 +171,7 @@ namespace xxx.Service public class SubscriberService: ISubscriberService, ICapSubscribe { - [KafkaTopic("xxx.services.account.check")] + [CapSubscribe("xxx.services.account.check")] public void CheckReceivedMessage(Person person) { diff --git a/build/version.cake b/build/version.cake index f0db1ac..e1069fc 100644 --- a/build/version.cake +++ b/build/version.cake @@ -77,7 +77,7 @@ public class BuildParameters var suffix = versionQuality; if (!IsTagged) { - suffix += (IsCI ? "ci-" : "dv-") + Util.CreateStamp(); + suffix += (IsCI ? "preview-" : "dv-") + Util.CreateStamp(); } suffix = string.IsNullOrWhiteSpace(suffix) ? null : suffix; diff --git a/build/version.props b/build/version.props index dacf358..a3d04c3 100644 --- a/build/version.props +++ b/build/version.props @@ -2,7 +2,7 @@ 1 0 - 0 + 1 $(VersionMajor).$(VersionMinor).$(VersionPatch) diff --git a/samples/Sample.Kafka/Controllers/ValuesController.cs b/samples/Sample.Kafka/Controllers/ValuesController.cs index 13142a5..6783b19 100644 --- a/samples/Sample.Kafka/Controllers/ValuesController.cs +++ b/samples/Sample.Kafka/Controllers/ValuesController.cs @@ -1,7 +1,6 @@ using System; using System.Threading.Tasks; using DotNetCore.CAP; -using DotNetCore.CAP.RabbitMQ; using Microsoft.AspNetCore.Hosting; using Microsoft.AspNetCore.Mvc; diff --git a/samples/Sample.Kafka/Sample.Kafka.csproj b/samples/Sample.Kafka/Sample.Kafka.csproj index 3788806..5c9e38f 100644 --- a/samples/Sample.Kafka/Sample.Kafka.csproj +++ b/samples/Sample.Kafka/Sample.Kafka.csproj @@ -24,7 +24,7 @@ - + diff --git a/samples/Sample.Kafka/Startup.cs b/samples/Sample.Kafka/Startup.cs index 5cb3fd8..add80bb 100644 --- a/samples/Sample.Kafka/Startup.cs +++ b/samples/Sample.Kafka/Startup.cs @@ -29,7 +29,7 @@ namespace Sample.Kafka { x.UseEntityFramework(); //x.UseSqlServer("Server=DESKTOP-M9R8T31;Initial Catalog=Test;User Id=sa;Password=P@ssw0rd;MultipleActiveResultSets=True"); - x.UseRabbitMQ(o => { o.HostName = "192.168.2.206"; o.UserName = "admin"; o.Password = "123123"; }); + x.UseKafka("localhost:9092"); }); // Add framework services. diff --git a/src/DotNetCore.CAP.Kafka/CAP.Options.Extensions.cs b/src/DotNetCore.CAP.Kafka/CAP.Options.Extensions.cs index 65314f4..fa6dd2b 100644 --- a/src/DotNetCore.CAP.Kafka/CAP.Options.Extensions.cs +++ b/src/DotNetCore.CAP.Kafka/CAP.Options.Extensions.cs @@ -8,13 +8,13 @@ namespace Microsoft.Extensions.DependencyInjection { public static CapOptions UseKafka(this CapOptions options, string bootstrapServers) { - return options.UseRabbitMQ(opt => + return options.UseKafka(opt => { opt.Servers = bootstrapServers; }); } - public static CapOptions UseRabbitMQ(this CapOptions options, Action configure) + public static CapOptions UseKafka(this CapOptions options, Action configure) { if (configure == null) throw new ArgumentNullException(nameof(configure)); diff --git a/src/DotNetCore.CAP.Kafka/CAP.SubscribeAttribute.cs b/src/DotNetCore.CAP.Kafka/CAP.SubscribeAttribute.cs index a914df6..06ec6e6 100644 --- a/src/DotNetCore.CAP.Kafka/CAP.SubscribeAttribute.cs +++ b/src/DotNetCore.CAP.Kafka/CAP.SubscribeAttribute.cs @@ -6,35 +6,35 @@ namespace DotNetCore.CAP public class CapSubscribeAttribute : TopicAttribute { public CapSubscribeAttribute(string name) - : this(name, 0) + : base(name) { } - /// - /// Not support - /// - public CapSubscribeAttribute(string name, int partition) - : this(name, partition, 0) - { - } + ///// + ///// Not support + ///// + //public CapSubscribeAttribute(string name, int partition) + // : this(name, partition, 0) + //{ + //} - /// - /// Not support - /// - public CapSubscribeAttribute(string name, int partition, long offset) - : base(name) - { - Offset = offset; - Partition = partition; - } + ///// + ///// Not support + ///// + //public CapSubscribeAttribute(string name, int partition, long offset) + // : base(name) + //{ + // Offset = offset; + // Partition = partition; + //} - public int Partition { get; } + //public int Partition { get; } - public long Offset { get; } + //public long Offset { get; } - public bool IsPartition => Partition == 0; + //public bool IsPartition => Partition == 0; - public bool IsOffset => Offset == 0; + //public bool IsOffset => Offset == 0; public override string ToString() { diff --git a/src/DotNetCore.CAP.SqlServer/CapPublisher.cs b/src/DotNetCore.CAP.SqlServer/CapPublisher.cs index 3cba0a0..5f84093 100644 --- a/src/DotNetCore.CAP.SqlServer/CapPublisher.cs +++ b/src/DotNetCore.CAP.SqlServer/CapPublisher.cs @@ -7,20 +7,28 @@ using DotNetCore.CAP.Models; using DotNetCore.CAP.Processor; using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore.Storage; +using Microsoft.Extensions.Logging; namespace DotNetCore.CAP.SqlServer { public class CapPublisher : ICapPublisher { + private readonly ILogger _logger; private readonly SqlServerOptions _options; private readonly DbContext _dbContext; + protected bool IsCapOpenedTrans { get; set; } + protected bool IsUsingEF { get; } + protected IServiceProvider ServiceProvider { get; } - public CapPublisher(IServiceProvider provider, SqlServerOptions options) + public CapPublisher(IServiceProvider provider, + ILogger logger, + SqlServerOptions options) { ServiceProvider = provider; + _logger = logger; _options = options; if (_options.DbContextType != null) @@ -30,55 +38,130 @@ namespace DotNetCore.CAP.SqlServer } } + public void Publish(string name, string content) + { + CheckIsUsingEF(name); + + PublishCore(name, content); + } + public Task PublishAsync(string name, string content) { - if (name == null) throw new ArgumentNullException(nameof(name)); - if (!IsUsingEF) throw new InvalidOperationException("If you are using the EntityFramework, you need to configure the DbContextType first." + - " otherwise you need to use overloaded method with IDbConnection and IDbTransaction."); + CheckIsUsingEF(name); + + return PublishCoreAsync(name, content); + } - return Publish(name, content); + public void Publish(string name, T contentObj) + { + CheckIsUsingEF(name); + + var content = Helper.ToJson(contentObj); + + PublishCore(name, content); } public Task PublishAsync(string name, T contentObj) { - if (name == null) throw new ArgumentNullException(nameof(name)); - if (!IsUsingEF) throw new InvalidOperationException("If you are using the EntityFramework, you need to configure the DbContextType first." + - " otherwise you need to use overloaded method with IDbConnection and IDbTransaction."); + CheckIsUsingEF(name); var content = Helper.ToJson(contentObj); - return Publish(name, content); + + return PublishCoreAsync(name, content); } - public Task PublishAsync(string name, string content, IDbConnection dbConnection) + public void Publish(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction = null) { - if (IsUsingEF) throw new InvalidOperationException("If you are using the EntityFramework, you do not need to use this overloaded."); - if (name == null) throw new ArgumentNullException(nameof(name)); - if (dbConnection == null) throw new ArgumentNullException(nameof(dbConnection)); + CheckIsAdoNet(name); + + if (dbConnection == null) + throw new ArgumentNullException(nameof(dbConnection)); + + dbTransaction = dbTransaction ?? dbConnection.BeginTransaction(IsolationLevel.ReadCommitted); + IsCapOpenedTrans = true; - var dbTransaction = dbConnection.BeginTransaction(IsolationLevel.ReadCommitted); - return PublishWithTrans(name, content, dbConnection, dbTransaction); + PublishWithTrans(name, content, dbConnection, dbTransaction); } - public Task PublishAsync(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction) + public Task PublishAsync(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction = null) + { + CheckIsAdoNet(name); + + if (dbConnection == null) + throw new ArgumentNullException(nameof(dbConnection)); + + dbTransaction = dbTransaction ?? dbConnection.BeginTransaction(IsolationLevel.ReadCommitted); + IsCapOpenedTrans = true; + + return PublishWithTransAsync(name, content, dbConnection, dbTransaction); + } + + public void Publish(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null) + { + CheckIsAdoNet(name); + + if (dbConnection == null) + throw new ArgumentNullException(nameof(dbConnection)); + + var content = Helper.ToJson(contentObj); + + dbTransaction = dbTransaction ?? dbConnection.BeginTransaction(IsolationLevel.ReadCommitted); + + PublishWithTrans(name, content, dbConnection, dbTransaction); + } + + public Task PublishAsync(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null) + { + CheckIsAdoNet(name); + + if (dbConnection == null) + throw new ArgumentNullException(nameof(dbConnection)); + + var content = Helper.ToJson(contentObj); + + dbTransaction = dbTransaction ?? dbConnection.BeginTransaction(IsolationLevel.ReadCommitted); + + return PublishWithTransAsync(name, content, dbConnection, dbTransaction); + } + + #region private methods + + private void CheckIsUsingEF(string name) { - if (IsUsingEF) throw new InvalidOperationException("If you are using the EntityFramework, you do not need to use this overloaded."); if (name == null) throw new ArgumentNullException(nameof(name)); - if (dbConnection == null) throw new ArgumentNullException(nameof(dbConnection)); - if (dbTransaction == null) throw new ArgumentNullException(nameof(dbTransaction)); + if (!IsUsingEF) + throw new InvalidOperationException("If you are using the EntityFramework, you need to configure the DbContextType first." + + " otherwise you need to use overloaded method with IDbConnection and IDbTransaction."); + } - return PublishWithTrans(name, content, dbConnection, dbTransaction); + private void CheckIsAdoNet(string name) + { + if (name == null) throw new ArgumentNullException(nameof(name)); + if (IsUsingEF) + throw new InvalidOperationException("If you are using the EntityFramework, you do not need to use this overloaded."); } - private async Task Publish(string name, string content) + private async Task PublishCoreAsync(string name, string content) { var connection = _dbContext.Database.GetDbConnection(); var transaction = _dbContext.Database.CurrentTransaction; + IsCapOpenedTrans = transaction == null; transaction = transaction ?? await _dbContext.Database.BeginTransactionAsync(IsolationLevel.ReadCommitted); var dbTransaction = transaction.GetDbTransaction(); - await PublishWithTrans(name, content, connection, dbTransaction); + await PublishWithTransAsync(name, content, connection, dbTransaction); + } + + private void PublishCore(string name, string content) + { + var connection = _dbContext.Database.GetDbConnection(); + var transaction = _dbContext.Database.CurrentTransaction; + IsCapOpenedTrans = transaction == null; + transaction = transaction ?? _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted); + var dbTransaction = transaction.GetDbTransaction(); + PublishWithTrans(name, content, connection, dbTransaction); } - private async Task PublishWithTrans(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction) + private async Task PublishWithTransAsync(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction) { var message = new CapPublishedMessage { @@ -86,11 +169,46 @@ namespace DotNetCore.CAP.SqlServer Content = content, StatusName = StatusName.Scheduled }; + await dbConnection.ExecuteAsync(PrepareSql(), message, transaction: dbTransaction); - var sql = $"INSERT INTO {_options.Schema}.[Published] ([Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName])VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)"; - await dbConnection.ExecuteAsync(sql, message, transaction: dbTransaction); + _logger.LogInformation("Message has been persisted in the database. name:" + name); + + if (IsCapOpenedTrans) + { + dbTransaction.Commit(); + dbTransaction.Dispose(); + dbConnection.Dispose(); + } PublishQueuer.PulseEvent.Set(); } + + private void PublishWithTrans(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction) + { + var message = new CapPublishedMessage + { + Name = name, + Content = content, + StatusName = StatusName.Scheduled + }; + var count = dbConnection.Execute(PrepareSql(), message, transaction: dbTransaction); + + _logger.LogInformation("Message has been persisted in the database. name:" + name); + + if (IsCapOpenedTrans) + { + dbTransaction.Commit(); + dbTransaction.Dispose(); + dbConnection.Dispose(); + } + PublishQueuer.PulseEvent.Set(); + } + + private string PrepareSql() + { + return $"INSERT INTO {_options.Schema}.[Published] ([Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName])VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)"; + } + + #endregion private methods } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.SqlServer/SqlServerStorageConnection.cs b/src/DotNetCore.CAP.SqlServer/SqlServerStorageConnection.cs index b5d56c3..a324a98 100644 --- a/src/DotNetCore.CAP.SqlServer/SqlServerStorageConnection.cs +++ b/src/DotNetCore.CAP.SqlServer/SqlServerStorageConnection.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.Data; using System.Data.SqlClient; using System.Threading.Tasks; @@ -55,6 +56,16 @@ OUTPUT DELETED.MessageId,DELETED.[MessageType];"; } } + public async Task> GetFailedPublishedMessages() + { + var sql = $"SELECT * FROM [{_options.Schema}].[Published] WITH (readpast) WHERE StatusName = '{StatusName.Failed}'"; + + using (var connection = new SqlConnection(_options.ConnectionString)) + { + return await connection.QueryAsync(sql); + } + } + // CapReceviedMessage public async Task StoreReceivedMessageAsync(CapReceivedMessage message) @@ -89,6 +100,15 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; } } + public async Task> GetFailedReceviedMessages() + { + var sql = $"SELECT TOP (1) * FROM [{_options.Schema}].[Received] WITH (readpast) WHERE StatusName = '{StatusName.Failed}'"; + using (var connection = new SqlConnection(_options.ConnectionString)) + { + return await connection.QueryAsync(sql); + } + } + public void Dispose() { } diff --git a/src/DotNetCore.CAP/CAP.Options.cs b/src/DotNetCore.CAP/CAP.Options.cs index 334c709..8128441 100644 --- a/src/DotNetCore.CAP/CAP.Options.cs +++ b/src/DotNetCore.CAP/CAP.Options.cs @@ -22,14 +22,19 @@ namespace DotNetCore.CAP } /// - /// Productor job polling delay time. Default is 8 sec. + /// Productor job polling delay time. Default is 5 sec. /// - public int PollingDelay { get; set; } = 8; + public int PollingDelay { get; set; } = 5; + + /// + /// Failed messages polling delay time. Default is 2 min. + /// + public TimeSpan FailedMessageWaitingInterval = TimeSpan.FromMinutes(2); /// /// We’ll send a POST request to the URL below with details of any subscribed events. /// - public WebHook WebHook { get; set; } + public WebHook WebHook => throw new NotSupportedException(); /// /// Registers an extension that will be executed when building services. diff --git a/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs b/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs index 7e3908d..60a5fca 100644 --- a/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs +++ b/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs @@ -47,6 +47,7 @@ namespace Microsoft.Extensions.DependencyInjection //Processors services.AddTransient(); services.AddTransient(); + services.AddTransient(); services.AddTransient(); //Executors diff --git a/src/DotNetCore.CAP/ICapPublisher.cs b/src/DotNetCore.CAP/ICapPublisher.cs index 95f28b0..a45061c 100644 --- a/src/DotNetCore.CAP/ICapPublisher.cs +++ b/src/DotNetCore.CAP/ICapPublisher.cs @@ -9,7 +9,7 @@ namespace DotNetCore.CAP public interface ICapPublisher { /// - /// Publish a string message to specified topic. + /// (EntityFramework) Asynchronous publish a message. /// /// If you are using the EntityFramework, you need to configure the DbContextType first. /// otherwise you need to use overloaded method with IDbConnection and IDbTransaction. @@ -20,7 +20,18 @@ namespace DotNetCore.CAP Task PublishAsync(string name, string content); /// - /// Publis a object message to specified topic. + /// (EntityFramework) Publish a message. + /// + /// If you are using the EntityFramework, you need to configure the DbContextType first. + /// otherwise you need to use overloaded method with IDbConnection and IDbTransaction. + /// + /// + /// the topic name or exchange router key. + /// message body content. + void Publish(string name, string content); + + /// + /// (EntityFramework) Asynchronous publish a object message. /// /// If you are using the EntityFramework, you need to configure the DbContextType first. /// otherwise you need to use overloaded method with IDbConnection and IDbTransaction. @@ -28,24 +39,55 @@ namespace DotNetCore.CAP /// /// The type of conetent object. /// the topic name or exchange router key. - /// object instance that will be serialized of json. + /// message body content, that will be serialized of json. Task PublishAsync(string name, T contentObj); /// - /// Publish a string message to specified topic with transacton. + /// (EntityFramework) Publish a object message. + /// + /// If you are using the EntityFramework, you need to configure the DbContextType first. + /// otherwise you need to use overloaded method with IDbConnection and IDbTransaction. + /// /// + /// The type of conetent object. /// the topic name or exchange router key. - /// message body content. - /// the dbConnection of - Task PublishAsync(string name, string content, IDbConnection dbConnection); + /// message body content, that will be serialized of json. + void Publish(string name, T contentObj); /// - /// Publish a string message to specified topic with transacton. + /// (ado.net) Asynchronous publish a message. + /// + /// the topic name or exchange router key. + /// message body content + /// the connection of + /// the transaction of + Task PublishAsync(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction = null); + + /// + /// (ado.net) Publish a message. /// /// the topic name or exchange router key. /// message body content. /// the connection of /// the transaction of - Task PublishAsync(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction); + void Publish(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction = null); + + /// + /// (ado.net) Asynchronous publish a object message. + /// + /// the topic name or exchange router key. + /// message body content, that will be serialized of json. + /// the connection of + /// the transaction of + Task PublishAsync(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null); + + /// + /// (ado.net) Publish a object message. + /// + /// the topic name or exchange router key. + /// message body content, that will be serialized of json. + /// the connection of + /// the transaction of + void Publish(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null); } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/IStorageConnection.cs b/src/DotNetCore.CAP/IStorageConnection.cs index 4c16a5f..e481a56 100644 --- a/src/DotNetCore.CAP/IStorageConnection.cs +++ b/src/DotNetCore.CAP/IStorageConnection.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.Threading.Tasks; using DotNetCore.CAP.Models; @@ -27,6 +28,11 @@ namespace DotNetCore.CAP /// Task GetNextPublishedMessageToBeEnqueuedAsync(); + /// + /// Returns executed failed messages. + /// + Task> GetFailedPublishedMessages(); + // Received messages /// @@ -46,6 +52,10 @@ namespace DotNetCore.CAP /// Task GetNextReceviedMessageToBeEnqueuedAsync(); + /// + /// Returns executed failed message. + /// + Task> GetFailedReceviedMessages(); //----------------------------------------- /// diff --git a/src/DotNetCore.CAP/Processor/IProcessingServer.Cap.cs b/src/DotNetCore.CAP/Processor/IProcessingServer.Cap.cs index 5843382..d8328bc 100644 --- a/src/DotNetCore.CAP/Processor/IProcessingServer.Cap.cs +++ b/src/DotNetCore.CAP/Processor/IProcessingServer.Cap.cs @@ -117,6 +117,7 @@ namespace DotNetCore.CAP.Processor returnedProcessors.Add(_provider.GetRequiredService()); returnedProcessors.Add(_provider.GetRequiredService()); + returnedProcessors.Add(_provider.GetRequiredService()); returnedProcessors.Add(_provider.GetRequiredService()); diff --git a/src/DotNetCore.CAP/Processor/IProcessor.FailedJob.cs b/src/DotNetCore.CAP/Processor/IProcessor.FailedJob.cs new file mode 100644 index 0000000..9d5f12d --- /dev/null +++ b/src/DotNetCore.CAP/Processor/IProcessor.FailedJob.cs @@ -0,0 +1,86 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; +using DotNetCore.CAP.Processor.States; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; + +namespace DotNetCore.CAP.Processor +{ + public class FailedJobProcessor : IProcessor + { + private readonly CapOptions _options; + private readonly ILogger _logger; + private readonly IServiceProvider _provider; + private readonly IStateChanger _stateChanger; + + private readonly TimeSpan _delay = TimeSpan.FromSeconds(1); + private readonly TimeSpan _waitingInterval; + + public FailedJobProcessor( + IOptions options, + ILogger logger, + IServiceProvider provider, + IStateChanger stateChanger) + { + _options = options.Value; + _logger = logger; + _provider = provider; + _stateChanger = stateChanger; + _waitingInterval = _options.FailedMessageWaitingInterval; + } + + public async Task ProcessAsync(ProcessingContext context) + { + if (context == null) + throw new ArgumentNullException(nameof(context)); + + using (var scope = _provider.CreateScope()) + { + var provider = scope.ServiceProvider; + var connection = provider.GetRequiredService(); + + await Task.WhenAll( + ProcessPublishedAsync(connection, context), + ProcessReceivededAsync(connection, context)); + + DefaultDispatcher.PulseEvent.Set(); + + await context.WaitAsync(_waitingInterval); + } + } + + private async Task ProcessPublishedAsync(IStorageConnection connection, ProcessingContext context) + { + var messages = await connection.GetFailedPublishedMessages(); + foreach (var message in messages) + { + using (var transaction = connection.CreateTransaction()) + { + _stateChanger.ChangeState(message, new EnqueuedState(), transaction); + await transaction.CommitAsync(); + } + context.ThrowIfStopping(); + await context.WaitAsync(_delay); + } + } + + private async Task ProcessReceivededAsync(IStorageConnection connection, ProcessingContext context) + { + var messages = await connection.GetFailedReceviedMessages(); + foreach (var message in messages) + { + using (var transaction = connection.CreateTransaction()) + { + _stateChanger.ChangeState(message, new EnqueuedState(), transaction); + await transaction.CommitAsync(); + } + context.ThrowIfStopping(); + await context.WaitAsync(_delay); + } + } + } +} diff --git a/src/DotNetCore.CAP/Processor/IProcessor.PublishQueuer.cs b/src/DotNetCore.CAP/Processor/IProcessor.PublishQueuer.cs index 4537c17..e7f261a 100644 --- a/src/DotNetCore.CAP/Processor/IProcessor.PublishQueuer.cs +++ b/src/DotNetCore.CAP/Processor/IProcessor.PublishQueuer.cs @@ -12,11 +12,11 @@ namespace DotNetCore.CAP.Processor { public class PublishQueuer : IProcessor { - private ILogger _logger; - private CapOptions _options; - private IStateChanger _stateChanger; - private IServiceProvider _provider; - private TimeSpan _pollingDelay; + private readonly ILogger _logger; + private readonly CapOptions _options; + private readonly IStateChanger _stateChanger; + private readonly IServiceProvider _provider; + private readonly TimeSpan _pollingDelay; public static readonly AutoResetEvent PulseEvent = new AutoResetEvent(true); diff --git a/src/DotNetCore.CAP/Processor/RetryBehavior.cs b/src/DotNetCore.CAP/Processor/RetryBehavior.cs index 1421fae..4a1f3ba 100644 --- a/src/DotNetCore.CAP/Processor/RetryBehavior.cs +++ b/src/DotNetCore.CAP/Processor/RetryBehavior.cs @@ -16,7 +16,7 @@ namespace DotNetCore.CAP.Processor static RetryBehavior() { - DefaultRetryCount = 25; + DefaultRetryCount = 15; DefaultRetryInThunk = retries => (int)Math.Round(Math.Pow(retries - 1, 4) + 15 + (_random.Next(30) * (retries))); diff --git a/test/DotNetCore.CAP.Test/CAP.BuilderTest.cs b/test/DotNetCore.CAP.Test/CAP.BuilderTest.cs index 2b18691..0bd3c8f 100644 --- a/test/DotNetCore.CAP.Test/CAP.BuilderTest.cs +++ b/test/DotNetCore.CAP.Test/CAP.BuilderTest.cs @@ -61,6 +61,26 @@ namespace DotNetCore.CAP.Test private class MyProducerService : ICapPublisher { + public void Publish(string name, string content) + { + throw new NotImplementedException(); + } + + public void Publish(string name, T contentObj) + { + throw new NotImplementedException(); + } + + public void Publish(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction = null) + { + throw new NotImplementedException(); + } + + public void Publish(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null) + { + throw new NotImplementedException(); + } + public Task PublishAsync(string topic, string content) { throw new NotImplementedException(); @@ -80,6 +100,11 @@ namespace DotNetCore.CAP.Test { throw new NotImplementedException(); } + + public Task PublishAsync(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null) + { + throw new NotImplementedException(); + } } } } \ No newline at end of file