From 7d7cd57d5d09eada65cc3ae5ee4e3129ee88bc24 Mon Sep 17 00:00:00 2001 From: Savorboard Date: Wed, 19 Jul 2017 22:08:51 +0800 Subject: [PATCH 01/23] add api_key. --- appveyor.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/appveyor.yml b/appveyor.yml index 203f11c..3d78602 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -17,6 +17,6 @@ deploy: on: appveyor_repo_tag: true api_key: - secure: U62rpGTEqztrUO4ncscm4XSaAoCSmWwT/rOWO/2JJS44psJvl0QpjRL0o0ughMoY + secure: /gak8VxtAbZvOTqON513KwsK5BtDUmoPjjzCMu+tn2i+vkupZbjnIKq/XnP4GGgv skip_symbols: true artifact: /artifacts\/packages\/.+\.nupkg/ From 22362abb10e70203a5893be9c8dde512304aa928 Mon Sep 17 00:00:00 2001 From: Savorboard Date: Wed, 19 Jul 2017 22:10:03 +0800 Subject: [PATCH 02/23] update apikey --- appveyor.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/appveyor.yml b/appveyor.yml index e86c099..3d78602 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -17,6 +17,6 @@ deploy: on: appveyor_repo_tag: true api_key: - secure: BmfEd8I5evTtjtvB6KJsCGei+VVOYwY72pmNCzVA+G7iM0JK/YdIC58hAFoHvSW/ + secure: /gak8VxtAbZvOTqON513KwsK5BtDUmoPjjzCMu+tn2i+vkupZbjnIKq/XnP4GGgv skip_symbols: true artifact: /artifacts\/packages\/.+\.nupkg/ From 303713f17142b34b263b9c0bf32b3b2dc5fda3f0 Mon Sep 17 00:00:00 2001 From: Savorboard Date: Wed, 19 Jul 2017 23:04:02 +0800 Subject: [PATCH 03/23] if not user opend transaction, cap will auto commit transaction. --- src/DotNetCore.CAP.SqlServer/CapPublisher.cs | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/DotNetCore.CAP.SqlServer/CapPublisher.cs b/src/DotNetCore.CAP.SqlServer/CapPublisher.cs index 3cba0a0..6889c27 100644 --- a/src/DotNetCore.CAP.SqlServer/CapPublisher.cs +++ b/src/DotNetCore.CAP.SqlServer/CapPublisher.cs @@ -15,7 +15,10 @@ namespace DotNetCore.CAP.SqlServer private readonly SqlServerOptions _options; private readonly DbContext _dbContext; + protected bool IsCapOpenedTrans { get; set; } + protected bool IsUsingEF { get; } + protected IServiceProvider ServiceProvider { get; } public CapPublisher(IServiceProvider provider, SqlServerOptions options) @@ -56,6 +59,7 @@ namespace DotNetCore.CAP.SqlServer if (dbConnection == null) throw new ArgumentNullException(nameof(dbConnection)); var dbTransaction = dbConnection.BeginTransaction(IsolationLevel.ReadCommitted); + IsCapOpenedTrans = true; return PublishWithTrans(name, content, dbConnection, dbTransaction); } @@ -73,12 +77,13 @@ namespace DotNetCore.CAP.SqlServer { var connection = _dbContext.Database.GetDbConnection(); var transaction = _dbContext.Database.CurrentTransaction; + IsCapOpenedTrans = transaction == null; transaction = transaction ?? await _dbContext.Database.BeginTransactionAsync(IsolationLevel.ReadCommitted); var dbTransaction = transaction.GetDbTransaction(); await PublishWithTrans(name, content, connection, dbTransaction); } - private async Task PublishWithTrans(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction) + protected virtual async Task PublishWithTrans(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction) { var message = new CapPublishedMessage { @@ -89,7 +94,11 @@ namespace DotNetCore.CAP.SqlServer var sql = $"INSERT INTO {_options.Schema}.[Published] ([Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName])VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)"; await dbConnection.ExecuteAsync(sql, message, transaction: dbTransaction); - + if (IsCapOpenedTrans) + { + dbTransaction.Commit(); + dbConnection.Dispose(); + } PublishQueuer.PulseEvent.Set(); } } From f9f6b0bbcf4edc601b5d0f1d7f13d94a3ec326bd Mon Sep 17 00:00:00 2001 From: yangxiaodong Date: Thu, 20 Jul 2017 09:32:53 +0800 Subject: [PATCH 04/23] modify version prefix. --- build/version.cake | 2 +- build/version.props | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/build/version.cake b/build/version.cake index f0db1ac..e1069fc 100644 --- a/build/version.cake +++ b/build/version.cake @@ -77,7 +77,7 @@ public class BuildParameters var suffix = versionQuality; if (!IsTagged) { - suffix += (IsCI ? "ci-" : "dv-") + Util.CreateStamp(); + suffix += (IsCI ? "preview-" : "dv-") + Util.CreateStamp(); } suffix = string.IsNullOrWhiteSpace(suffix) ? null : suffix; diff --git a/build/version.props b/build/version.props index dacf358..a3d04c3 100644 --- a/build/version.props +++ b/build/version.props @@ -2,7 +2,7 @@ 1 0 - 0 + 1 $(VersionMajor).$(VersionMinor).$(VersionPatch) From 0a0cd8a5107680a0ece709c6266ccce17880d3c2 Mon Sep 17 00:00:00 2001 From: yangxiaodong Date: Thu, 20 Jul 2017 09:33:11 +0800 Subject: [PATCH 05/23] update samples. --- samples/Sample.Kafka/Controllers/ValuesController.cs | 1 - samples/Sample.Kafka/Sample.Kafka.csproj | 2 +- samples/Sample.Kafka/Startup.cs | 2 +- 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/samples/Sample.Kafka/Controllers/ValuesController.cs b/samples/Sample.Kafka/Controllers/ValuesController.cs index 13142a5..6783b19 100644 --- a/samples/Sample.Kafka/Controllers/ValuesController.cs +++ b/samples/Sample.Kafka/Controllers/ValuesController.cs @@ -1,7 +1,6 @@ using System; using System.Threading.Tasks; using DotNetCore.CAP; -using DotNetCore.CAP.RabbitMQ; using Microsoft.AspNetCore.Hosting; using Microsoft.AspNetCore.Mvc; diff --git a/samples/Sample.Kafka/Sample.Kafka.csproj b/samples/Sample.Kafka/Sample.Kafka.csproj index 3788806..5c9e38f 100644 --- a/samples/Sample.Kafka/Sample.Kafka.csproj +++ b/samples/Sample.Kafka/Sample.Kafka.csproj @@ -24,7 +24,7 @@ - + diff --git a/samples/Sample.Kafka/Startup.cs b/samples/Sample.Kafka/Startup.cs index 5cb3fd8..add80bb 100644 --- a/samples/Sample.Kafka/Startup.cs +++ b/samples/Sample.Kafka/Startup.cs @@ -29,7 +29,7 @@ namespace Sample.Kafka { x.UseEntityFramework(); //x.UseSqlServer("Server=DESKTOP-M9R8T31;Initial Catalog=Test;User Id=sa;Password=P@ssw0rd;MultipleActiveResultSets=True"); - x.UseRabbitMQ(o => { o.HostName = "192.168.2.206"; o.UserName = "admin"; o.Password = "123123"; }); + x.UseKafka("localhost:9092"); }); // Add framework services. From df7e286fd63862e17bb3fee1983923077765523f Mon Sep 17 00:00:00 2001 From: yangxiaodong Date: Thu, 20 Jul 2017 09:53:03 +0800 Subject: [PATCH 06/23] add nuget preview bagdes. --- README.zh-cn.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/README.zh-cn.md b/README.zh-cn.md index bbb356b..7766df0 100644 --- a/README.zh-cn.md +++ b/README.zh-cn.md @@ -5,7 +5,8 @@ # CAP                        [![Travis branch](https://img.shields.io/travis/dotnetcore/CAP/master.svg?label=travis-ci)](https://travis-ci.org/dotnetcore/CAP) [![AppVeyor](https://ci.appveyor.com/api/projects/status/4mpe0tbu7n126vyw?svg=true)](https://ci.appveyor.com/project/yuleyule66/cap) -[![NuGet](https://img.shields.io/nuget/vpre/DotNetCore.CAP.svg)](https://www.nuget.org/packages/DotNetCore.CAP/) +[![NuGet](https://img.shields.io/nuget/v/DotNetCore.CAP.svg)](https://www.nuget.org/packages/DotNetCore.CAP/) +[![NuGet Preview](https://img.shields.io/nuget/vpre/DotNetCore.CAP.svg?label=nuget-pre)](https://www.nuget.org/packages/DotNetCore.CAP/) [![Member Project Of .NET China Foundation](https://github.com/dotnetcore/Home/raw/master/icons/member-project-of-netchina.png)](https://github.com/dotnetcore) [![GitHub license](https://img.shields.io/badge/license-MIT-blue.svg)](https://raw.githubusercontent.com/dotnetcore/CAP/master/LICENSE.txt) From a03f4952e254e013d8d9c724e5a2ab27775b1841 Mon Sep 17 00:00:00 2001 From: yangxiaodong Date: Thu, 20 Jul 2017 09:58:09 +0800 Subject: [PATCH 07/23] update badges. --- README.md | 8 +++++--- README.zh-cn.md | 4 ++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index bcd07ba..692bdf1 100644 --- a/README.md +++ b/README.md @@ -1,12 +1,14 @@

-   中文 +   中文

# CAP -[![Travis branch](https://img.shields.io/travis/dotnetcore/CAP/master.svg?label=travis-ci)](https://travis-ci.org/dotnetcore/CAP) +[![Travis branch](https://img.shields.io/travis/dotnetcore/CAP/develop.svg?label=travis-ci)](https://travis-ci.org/dotnetcore/CAP) [![AppVeyor](https://ci.appveyor.com/api/projects/status/4mpe0tbu7n126vyw?svg=true)](https://ci.appveyor.com/project/yuleyule66/cap) -[![NuGet](https://img.shields.io/nuget/vpre/DotNetCore.CAP.svg)](https://www.nuget.org/packages/DotNetCore.CAP/) +[![NuGet](https://img.shields.io/nuget/v/DotNetCore.CAP.svg)](https://www.nuget.org/packages/DotNetCore.CAP/) +[![NuGet Preview](https://img.shields.io/nuget/vpre/DotNetCore.CAP.svg?label=nuget-pre)](https://www.nuget.org/packages/DotNetCore.CAP/) +[![Member Project Of .NET China Foundation](https://github.com/dotnetcore/Home/raw/master/icons/member-project-of-netchina.png)](https://github.com/dotnetcore) [![GitHub license](https://img.shields.io/badge/license-MIT-blue.svg)](https://raw.githubusercontent.com/dotnetcore/CAP/master/LICENSE.txt) CAP is a .Net Standard library to achieve eventually consistent in distributed architectures system like SOA,MicroService. It is lightweight,easy to use and efficiently. diff --git a/README.zh-cn.md b/README.zh-cn.md index 7766df0..4968a4f 100644 --- a/README.zh-cn.md +++ b/README.zh-cn.md @@ -1,9 +1,9 @@

-English +English

# CAP                        -[![Travis branch](https://img.shields.io/travis/dotnetcore/CAP/master.svg?label=travis-ci)](https://travis-ci.org/dotnetcore/CAP) +[![Travis branch](https://img.shields.io/travis/dotnetcore/CAP/develop.svg?label=travis-ci)](https://travis-ci.org/dotnetcore/CAP) [![AppVeyor](https://ci.appveyor.com/api/projects/status/4mpe0tbu7n126vyw?svg=true)](https://ci.appveyor.com/project/yuleyule66/cap) [![NuGet](https://img.shields.io/nuget/v/DotNetCore.CAP.svg)](https://www.nuget.org/packages/DotNetCore.CAP/) [![NuGet Preview](https://img.shields.io/nuget/vpre/DotNetCore.CAP.svg?label=nuget-pre)](https://www.nuget.org/packages/DotNetCore.CAP/) From fcfe9741039c529f25b9f4c38df6565922f87f71 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=8F=B6=E7=A4=BC=E9=A3=9E?= <463936274@qq.com> Date: Thu, 20 Jul 2017 10:07:01 +0800 Subject: [PATCH 08/23] Update English version --- README.md | 52 ++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 42 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index 692bdf1..e1048ea 100644 --- a/README.md +++ b/README.md @@ -6,8 +6,12 @@ [![Travis branch](https://img.shields.io/travis/dotnetcore/CAP/develop.svg?label=travis-ci)](https://travis-ci.org/dotnetcore/CAP) [![AppVeyor](https://ci.appveyor.com/api/projects/status/4mpe0tbu7n126vyw?svg=true)](https://ci.appveyor.com/project/yuleyule66/cap) +<<<<<<< HEAD +[![NuGet](https://img.shields.io/nuget/vpre/DotNetCore.CAP.svg)](https://www.nuget.org/packages/DotNetCore.CAP/) +======= [![NuGet](https://img.shields.io/nuget/v/DotNetCore.CAP.svg)](https://www.nuget.org/packages/DotNetCore.CAP/) [![NuGet Preview](https://img.shields.io/nuget/vpre/DotNetCore.CAP.svg?label=nuget-pre)](https://www.nuget.org/packages/DotNetCore.CAP/) +>>>>>>> a03f4952e254e013d8d9c724e5a2ab27775b1841 [![Member Project Of .NET China Foundation](https://github.com/dotnetcore/Home/raw/master/icons/member-project-of-netchina.png)](https://github.com/dotnetcore) [![GitHub license](https://img.shields.io/badge/license-MIT-blue.svg)](https://raw.githubusercontent.com/dotnetcore/CAP/master/LICENSE.txt) @@ -29,26 +33,30 @@ This is a diagram of the CAP working in the ASP.NET Core MicroService architectu ## Getting Started -### NuGet (Coming soon) +### NuGet You can run the following command to install the CAP in your project. +``` +PM> Install-Package DotNetCore.CAP -Pre +``` + If your Message Queue is using Kafka, you can: ``` PM> Install-Package DotNetCore.CAP.Kafka -Pre ``` -or RabbitMQ: +If your Message Queue is using RabbitMQ, you can: ``` PM> Install-Package DotNetCore.CAP.RabbitMQ -Pre ``` -CAP provides EntityFramework as default database store extension : +CAP provides EntityFramework as default database store extension (The MySQL version is under development): ``` -PM> Install-Package DotNetCore.CAP.EntityFrameworkCore -Pre +PM> Install-Package DotNetCore.CAP.SqlServer -Pre ``` ### Configuration @@ -60,11 +68,23 @@ public void ConfigureServices(IServiceCollection services) { ...... - services.AddDbContext(); + services.AddDbContext(); - services.AddCap() - .AddEntityFrameworkStores() - .AddKafka(x => x.Servers = "localhost:9092"); + services.AddCap(x => + { + // If your SqlServer is using EF for data operations, you need to add the following configuration: + // Notice: You don't need to config x.UseSqlServer(""") again! + x.UseEntityFramework(); + + // If you are using Dapper,you need to add the config: + x.UseSqlServer("Your ConnectionStrings"); + + // If your Message Queue is using RabbitMQ you need to add the config: + x.UseRabbitMQ("localhost"); + + // If your Message Queue is using Kafka you need to add the config: + x.UseKafka("localhost"); + }); } public void Configure(IApplicationBuilder app) @@ -94,11 +114,23 @@ public class PublishController : Controller [Route("~/checkAccount")] public async Task PublishMessage() { - //Specifies the message header and content to be sent + // Specifies the message header and content to be sent await _publisher.PublishAsync("xxx.services.account.check", new Person { Name = "Foo", Age = 11 }); return Ok(); } + + [Route("~/checkAccountWithTrans")] + public async Task PublishMessageWithTransaction([FromServices]AppDbContext dbContext) + { + using (var trans = dbContext.Database.BeginTransaction()) + { + await _publisher.PublishAsync("xxx.services.account.check", new Person { Name = "Foo", Age = 11 }); + + trans.Commit(); + } + return Ok(); + } } ``` @@ -148,7 +180,7 @@ namespace xxx.Service public class SubscriberService: ISubscriberService, ICapSubscribe { - [CapSubscribe("xxx.services.account.check")] + [KafkaTopic("xxx.services.account.check")] public void CheckReceivedMessage(Person person) { From 0381a79eb3d5ad19623183494bb391e37cd98da6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=8F=B6=E7=A4=BC=E9=A3=9E?= <463936274@qq.com> Date: Thu, 20 Jul 2017 10:13:21 +0800 Subject: [PATCH 09/23] Update README.md in develop --- README.md | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/README.md b/README.md index e1048ea..000ba1c 100644 --- a/README.md +++ b/README.md @@ -2,16 +2,11 @@   中文

-# CAP - +# CAP                        [![Travis branch](https://img.shields.io/travis/dotnetcore/CAP/develop.svg?label=travis-ci)](https://travis-ci.org/dotnetcore/CAP) [![AppVeyor](https://ci.appveyor.com/api/projects/status/4mpe0tbu7n126vyw?svg=true)](https://ci.appveyor.com/project/yuleyule66/cap) -<<<<<<< HEAD -[![NuGet](https://img.shields.io/nuget/vpre/DotNetCore.CAP.svg)](https://www.nuget.org/packages/DotNetCore.CAP/) -======= [![NuGet](https://img.shields.io/nuget/v/DotNetCore.CAP.svg)](https://www.nuget.org/packages/DotNetCore.CAP/) [![NuGet Preview](https://img.shields.io/nuget/vpre/DotNetCore.CAP.svg?label=nuget-pre)](https://www.nuget.org/packages/DotNetCore.CAP/) ->>>>>>> a03f4952e254e013d8d9c724e5a2ab27775b1841 [![Member Project Of .NET China Foundation](https://github.com/dotnetcore/Home/raw/master/icons/member-project-of-netchina.png)](https://github.com/dotnetcore) [![GitHub license](https://img.shields.io/badge/license-MIT-blue.svg)](https://raw.githubusercontent.com/dotnetcore/CAP/master/LICENSE.txt) From 45963f5e286c3ed01bbaf5db7b7321d22faef1d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=8F=B6=E7=A4=BC=E9=A3=9E?= <463936274@qq.com> Date: Thu, 20 Jul 2017 16:38:48 +0800 Subject: [PATCH 10/23] Fix Service Method attribute and nuget package command in readme --- README.md | 2 +- README.zh-cn.md | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 000ba1c..d06d483 100644 --- a/README.md +++ b/README.md @@ -175,7 +175,7 @@ namespace xxx.Service public class SubscriberService: ISubscriberService, ICapSubscribe { - [KafkaTopic("xxx.services.account.check")] + [CapSubscribe("xxx.services.account.check")] public void CheckReceivedMessage(Person person) { diff --git a/README.zh-cn.md b/README.zh-cn.md index 4968a4f..aad1901 100644 --- a/README.zh-cn.md +++ b/README.zh-cn.md @@ -33,25 +33,25 @@ CAP 具有消息持久化的功能,当你的服务进行重启或者宕机时 你可以运行以下下命令在你的项目中安装 CAP。 ``` -PM> Install-Package DotNetCore.CAP -Pre +PM> Install-Package DotNetCore.CAP ``` 如果你的消息队列使用的是 Kafka 的话,你可以: ``` -PM> Install-Package DotNetCore.CAP.Kafka -Pre +PM> Install-Package DotNetCore.CAP.Kafka ``` 如果你的消息队列使用的是 RabbitMQ 的话,你可以: ``` -PM> Install-Package DotNetCore.CAP.RabbitMQ -Pre +PM> Install-Package DotNetCore.CAP.RabbitMQ ``` CAP 默认提供了 Sql Server 的扩展作为数据库存储(MySql的正在开发中): ``` -PM> Install-Package DotNetCore.CAP.SqlServer -Pre +PM> Install-Package DotNetCore.CAP.SqlServer ``` ### Configuration @@ -175,7 +175,7 @@ namespace xxx.Service public class SubscriberService: ISubscriberService, ICapSubscribe { - [KafkaTopic("xxx.services.account.check")] + [CapSubscribe("xxx.services.account.check")] public void CheckReceivedMessage(Person person) { From 5d22762e49d8fe8328a4dedf072b7db779a4d9a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=8F=B6=E7=A4=BC=E9=A3=9E?= <463936274@qq.com> Date: Thu, 20 Jul 2017 16:40:04 +0800 Subject: [PATCH 11/23] Fix Service Method attribute and nuget package command in readme --- README.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index d06d483..56ca005 100644 --- a/README.md +++ b/README.md @@ -33,25 +33,25 @@ This is a diagram of the CAP working in the ASP.NET Core MicroService architectu You can run the following command to install the CAP in your project. ``` -PM> Install-Package DotNetCore.CAP -Pre +PM> Install-Package DotNetCore.CAP ``` If your Message Queue is using Kafka, you can: ``` -PM> Install-Package DotNetCore.CAP.Kafka -Pre +PM> Install-Package DotNetCore.CAP.Kafka ``` If your Message Queue is using RabbitMQ, you can: ``` -PM> Install-Package DotNetCore.CAP.RabbitMQ -Pre +PM> Install-Package DotNetCore.CAP.RabbitMQ ``` CAP provides EntityFramework as default database store extension (The MySQL version is under development): ``` -PM> Install-Package DotNetCore.CAP.SqlServer -Pre +PM> Install-Package DotNetCore.CAP.SqlServer ``` ### Configuration From 4b33f27810c701e527fd493885d00bd46ccc2b54 Mon Sep 17 00:00:00 2001 From: Savorboard Date: Thu, 20 Jul 2017 20:24:47 +0800 Subject: [PATCH 12/23] Update CODE_OF_CONDUCT.md --- CODE_OF_CONDUCT.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CODE_OF_CONDUCT.md b/CODE_OF_CONDUCT.md index 2ea6e75..7f29ea1 100644 --- a/CODE_OF_CONDUCT.md +++ b/CODE_OF_CONDUCT.md @@ -34,7 +34,7 @@ This Code of Conduct applies both within project spaces and in public spaces whe ## Enforcement -Instances of abusive, harassing, or otherwise unacceptable behavior may be reported by contacting the project team at m.r992@hotmail.com. The project team will review and investigate all complaints, and will respond in a way that it deems appropriate to the circumstances. The project team is obligated to maintain confidentiality with regard to the reporter of an incident. Further details of specific enforcement policies may be posted separately. +Instances of abusive, harassing, or otherwise unacceptable behavior may be reported by contacting the project team at yangxiaodong1214@126.com. The project team will review and investigate all complaints, and will respond in a way that it deems appropriate to the circumstances. The project team is obligated to maintain confidentiality with regard to the reporter of an incident. Further details of specific enforcement policies may be posted separately. Project maintainers who do not follow or enforce the Code of Conduct in good faith may face temporary or permanent repercussions as determined by other members of the project's leadership. From aea31b718c990de642069f137b4b2f6a158a1ffd Mon Sep 17 00:00:00 2001 From: alexinea Date: Fri, 21 Jul 2017 11:41:03 +0800 Subject: [PATCH 13/23] modified .net core badge to svg version --- README.md | 2 +- README.zh-cn.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 56ca005..3b62c9d 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ [![AppVeyor](https://ci.appveyor.com/api/projects/status/4mpe0tbu7n126vyw?svg=true)](https://ci.appveyor.com/project/yuleyule66/cap) [![NuGet](https://img.shields.io/nuget/v/DotNetCore.CAP.svg)](https://www.nuget.org/packages/DotNetCore.CAP/) [![NuGet Preview](https://img.shields.io/nuget/vpre/DotNetCore.CAP.svg?label=nuget-pre)](https://www.nuget.org/packages/DotNetCore.CAP/) -[![Member Project Of .NET China Foundation](https://github.com/dotnetcore/Home/raw/master/icons/member-project-of-netchina.png)](https://github.com/dotnetcore) +[![Member project of .NET China Foundation](https://img.shields.io/badge/member_project_of-.NET_CHINA-red.svg?style=flat&colorB=9E20C8)](https://github.com/dotnetcore) [![GitHub license](https://img.shields.io/badge/license-MIT-blue.svg)](https://raw.githubusercontent.com/dotnetcore/CAP/master/LICENSE.txt) CAP is a .Net Standard library to achieve eventually consistent in distributed architectures system like SOA,MicroService. It is lightweight,easy to use and efficiently. diff --git a/README.zh-cn.md b/README.zh-cn.md index aad1901..cff689e 100644 --- a/README.zh-cn.md +++ b/README.zh-cn.md @@ -7,7 +7,7 @@ [![AppVeyor](https://ci.appveyor.com/api/projects/status/4mpe0tbu7n126vyw?svg=true)](https://ci.appveyor.com/project/yuleyule66/cap) [![NuGet](https://img.shields.io/nuget/v/DotNetCore.CAP.svg)](https://www.nuget.org/packages/DotNetCore.CAP/) [![NuGet Preview](https://img.shields.io/nuget/vpre/DotNetCore.CAP.svg?label=nuget-pre)](https://www.nuget.org/packages/DotNetCore.CAP/) -[![Member Project Of .NET China Foundation](https://github.com/dotnetcore/Home/raw/master/icons/member-project-of-netchina.png)](https://github.com/dotnetcore) +[![Member project of .NET China Foundation](https://img.shields.io/badge/member_project_of-.NET_CHINA-red.svg?style=flat&colorB=9E20C8)](https://github.com/dotnetcore) [![GitHub license](https://img.shields.io/badge/license-MIT-blue.svg)](https://raw.githubusercontent.com/dotnetcore/CAP/master/LICENSE.txt) CAP 是一个在分布式系统(SOA、MicroService)中实现最终一致性的库,它具有轻量级、易使用、高性能等特点。 From 4afd7a737290f5fbb21cc5811fca8d57e78c9b2e Mon Sep 17 00:00:00 2001 From: yangxiaodong Date: Fri, 21 Jul 2017 13:21:52 +0800 Subject: [PATCH 14/23] language version relocation. --- README.md | 6 +----- README.zh-cn.md | 6 +----- 2 files changed, 2 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index 3b62c9d..537f75d 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,4 @@ -

-   中文 -

- -# CAP                        +# CAP                       [中文](https://github.com/dotnetcore/CAP/blob/develop/README.zh-cn.md) [![Travis branch](https://img.shields.io/travis/dotnetcore/CAP/develop.svg?label=travis-ci)](https://travis-ci.org/dotnetcore/CAP) [![AppVeyor](https://ci.appveyor.com/api/projects/status/4mpe0tbu7n126vyw?svg=true)](https://ci.appveyor.com/project/yuleyule66/cap) [![NuGet](https://img.shields.io/nuget/v/DotNetCore.CAP.svg)](https://www.nuget.org/packages/DotNetCore.CAP/) diff --git a/README.zh-cn.md b/README.zh-cn.md index cff689e..732ae68 100644 --- a/README.zh-cn.md +++ b/README.zh-cn.md @@ -1,8 +1,4 @@ -

-English -

- -# CAP                        +# CAP                       [English](https://github.com/dotnetcore/CAP/blob/develop/README.md) [![Travis branch](https://img.shields.io/travis/dotnetcore/CAP/develop.svg?label=travis-ci)](https://travis-ci.org/dotnetcore/CAP) [![AppVeyor](https://ci.appveyor.com/api/projects/status/4mpe0tbu7n126vyw?svg=true)](https://ci.appveyor.com/project/yuleyule66/cap) [![NuGet](https://img.shields.io/nuget/v/DotNetCore.CAP.svg)](https://www.nuget.org/packages/DotNetCore.CAP/) From 33fde860a28f69736e6156ee14c98a99ba4b844e Mon Sep 17 00:00:00 2001 From: yangxiaodong Date: Fri, 21 Jul 2017 15:21:20 +0800 Subject: [PATCH 15/23] WebHook is not supported --- src/DotNetCore.CAP/CAP.Options.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/DotNetCore.CAP/CAP.Options.cs b/src/DotNetCore.CAP/CAP.Options.cs index 334c709..9c4a872 100644 --- a/src/DotNetCore.CAP/CAP.Options.cs +++ b/src/DotNetCore.CAP/CAP.Options.cs @@ -29,7 +29,7 @@ namespace DotNetCore.CAP /// /// We’ll send a POST request to the URL below with details of any subscribed events. /// - public WebHook WebHook { get; set; } + public WebHook WebHook => throw new NotSupportedException(); /// /// Registers an extension that will be executed when building services. From b167dbde9f27dddc6d4f6797f6129d05b1908cbf Mon Sep 17 00:00:00 2001 From: yangxiaodong Date: Fri, 21 Jul 2017 15:21:31 +0800 Subject: [PATCH 16/23] fix error name. --- src/DotNetCore.CAP.Kafka/CAP.Options.Extensions.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/DotNetCore.CAP.Kafka/CAP.Options.Extensions.cs b/src/DotNetCore.CAP.Kafka/CAP.Options.Extensions.cs index 65314f4..fa6dd2b 100644 --- a/src/DotNetCore.CAP.Kafka/CAP.Options.Extensions.cs +++ b/src/DotNetCore.CAP.Kafka/CAP.Options.Extensions.cs @@ -8,13 +8,13 @@ namespace Microsoft.Extensions.DependencyInjection { public static CapOptions UseKafka(this CapOptions options, string bootstrapServers) { - return options.UseRabbitMQ(opt => + return options.UseKafka(opt => { opt.Servers = bootstrapServers; }); } - public static CapOptions UseRabbitMQ(this CapOptions options, Action configure) + public static CapOptions UseKafka(this CapOptions options, Action configure) { if (configure == null) throw new ArgumentNullException(nameof(configure)); From 14320e86e215c8d4d54bc721eeb25ba8233769bf Mon Sep 17 00:00:00 2001 From: yangxiaodong Date: Fri, 21 Jul 2017 15:22:06 +0800 Subject: [PATCH 17/23] Shielding unsupport code. --- .../CAP.SubscribeAttribute.cs | 42 +++++++++---------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/src/DotNetCore.CAP.Kafka/CAP.SubscribeAttribute.cs b/src/DotNetCore.CAP.Kafka/CAP.SubscribeAttribute.cs index a914df6..06ec6e6 100644 --- a/src/DotNetCore.CAP.Kafka/CAP.SubscribeAttribute.cs +++ b/src/DotNetCore.CAP.Kafka/CAP.SubscribeAttribute.cs @@ -6,35 +6,35 @@ namespace DotNetCore.CAP public class CapSubscribeAttribute : TopicAttribute { public CapSubscribeAttribute(string name) - : this(name, 0) + : base(name) { } - /// - /// Not support - /// - public CapSubscribeAttribute(string name, int partition) - : this(name, partition, 0) - { - } + ///// + ///// Not support + ///// + //public CapSubscribeAttribute(string name, int partition) + // : this(name, partition, 0) + //{ + //} - /// - /// Not support - /// - public CapSubscribeAttribute(string name, int partition, long offset) - : base(name) - { - Offset = offset; - Partition = partition; - } + ///// + ///// Not support + ///// + //public CapSubscribeAttribute(string name, int partition, long offset) + // : base(name) + //{ + // Offset = offset; + // Partition = partition; + //} - public int Partition { get; } + //public int Partition { get; } - public long Offset { get; } + //public long Offset { get; } - public bool IsPartition => Partition == 0; + //public bool IsPartition => Partition == 0; - public bool IsOffset => Offset == 0; + //public bool IsOffset => Offset == 0; public override string ToString() { From 2ba74606771cec6afea6f09a2374f3b23cd8843f Mon Sep 17 00:00:00 2001 From: Chandan Rai Date: Fri, 21 Jul 2017 17:06:16 +0530 Subject: [PATCH 18/23] corrected minor typo --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 537f75d..72fe766 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ CAP is a library that used in an ASP.NET Core project, Of Course you can ues it You can think of CAP as an EventBus because it has all the features of EventBus, and CAP provides a easier way to handle the publishing and subscribing than EventBus. -CAP has the function of Message Presistence, and it makes messages reliability when your service is restarted or down. CAP provides a Publish Service based on Microsoft DI that integrates seamlessly with your business services and supports strong consistency transactions. +CAP has the function of Message Persistence, and it makes messages reliability when your service is restarted or down. CAP provides a Publish Service based on Microsoft DI that integrates seamlessly with your business services and supports strong consistency transactions. This is a diagram of the CAP working in the ASP.NET Core MicroService architecture: From 34e90105fa83322e6dbca8c855abb1adc25465df Mon Sep 17 00:00:00 2001 From: yangxiaodong Date: Sat, 22 Jul 2017 18:24:53 +0800 Subject: [PATCH 19/23] add FailedJobProcessor to infinite retry failed messages. --- .../SqlServerStorageConnection.cs | 20 +++++ src/DotNetCore.CAP/CAP.Options.cs | 9 +- .../CAP.ServiceCollectionExtensions.cs | 1 + src/DotNetCore.CAP/IStorageConnection.cs | 10 +++ .../Processor/IProcessingServer.Cap.cs | 1 + .../Processor/IProcessor.FailedJob.cs | 86 +++++++++++++++++++ 6 files changed, 125 insertions(+), 2 deletions(-) create mode 100644 src/DotNetCore.CAP/Processor/IProcessor.FailedJob.cs diff --git a/src/DotNetCore.CAP.SqlServer/SqlServerStorageConnection.cs b/src/DotNetCore.CAP.SqlServer/SqlServerStorageConnection.cs index b5d56c3..a324a98 100644 --- a/src/DotNetCore.CAP.SqlServer/SqlServerStorageConnection.cs +++ b/src/DotNetCore.CAP.SqlServer/SqlServerStorageConnection.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.Data; using System.Data.SqlClient; using System.Threading.Tasks; @@ -55,6 +56,16 @@ OUTPUT DELETED.MessageId,DELETED.[MessageType];"; } } + public async Task> GetFailedPublishedMessages() + { + var sql = $"SELECT * FROM [{_options.Schema}].[Published] WITH (readpast) WHERE StatusName = '{StatusName.Failed}'"; + + using (var connection = new SqlConnection(_options.ConnectionString)) + { + return await connection.QueryAsync(sql); + } + } + // CapReceviedMessage public async Task StoreReceivedMessageAsync(CapReceivedMessage message) @@ -89,6 +100,15 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; } } + public async Task> GetFailedReceviedMessages() + { + var sql = $"SELECT TOP (1) * FROM [{_options.Schema}].[Received] WITH (readpast) WHERE StatusName = '{StatusName.Failed}'"; + using (var connection = new SqlConnection(_options.ConnectionString)) + { + return await connection.QueryAsync(sql); + } + } + public void Dispose() { } diff --git a/src/DotNetCore.CAP/CAP.Options.cs b/src/DotNetCore.CAP/CAP.Options.cs index 9c4a872..8128441 100644 --- a/src/DotNetCore.CAP/CAP.Options.cs +++ b/src/DotNetCore.CAP/CAP.Options.cs @@ -22,9 +22,14 @@ namespace DotNetCore.CAP } /// - /// Productor job polling delay time. Default is 8 sec. + /// Productor job polling delay time. Default is 5 sec. /// - public int PollingDelay { get; set; } = 8; + public int PollingDelay { get; set; } = 5; + + /// + /// Failed messages polling delay time. Default is 2 min. + /// + public TimeSpan FailedMessageWaitingInterval = TimeSpan.FromMinutes(2); /// /// We’ll send a POST request to the URL below with details of any subscribed events. diff --git a/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs b/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs index 7e3908d..60a5fca 100644 --- a/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs +++ b/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs @@ -47,6 +47,7 @@ namespace Microsoft.Extensions.DependencyInjection //Processors services.AddTransient(); services.AddTransient(); + services.AddTransient(); services.AddTransient(); //Executors diff --git a/src/DotNetCore.CAP/IStorageConnection.cs b/src/DotNetCore.CAP/IStorageConnection.cs index 4c16a5f..e481a56 100644 --- a/src/DotNetCore.CAP/IStorageConnection.cs +++ b/src/DotNetCore.CAP/IStorageConnection.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.Threading.Tasks; using DotNetCore.CAP.Models; @@ -27,6 +28,11 @@ namespace DotNetCore.CAP /// Task GetNextPublishedMessageToBeEnqueuedAsync(); + /// + /// Returns executed failed messages. + /// + Task> GetFailedPublishedMessages(); + // Received messages /// @@ -46,6 +52,10 @@ namespace DotNetCore.CAP /// Task GetNextReceviedMessageToBeEnqueuedAsync(); + /// + /// Returns executed failed message. + /// + Task> GetFailedReceviedMessages(); //----------------------------------------- /// diff --git a/src/DotNetCore.CAP/Processor/IProcessingServer.Cap.cs b/src/DotNetCore.CAP/Processor/IProcessingServer.Cap.cs index 5843382..d8328bc 100644 --- a/src/DotNetCore.CAP/Processor/IProcessingServer.Cap.cs +++ b/src/DotNetCore.CAP/Processor/IProcessingServer.Cap.cs @@ -117,6 +117,7 @@ namespace DotNetCore.CAP.Processor returnedProcessors.Add(_provider.GetRequiredService()); returnedProcessors.Add(_provider.GetRequiredService()); + returnedProcessors.Add(_provider.GetRequiredService()); returnedProcessors.Add(_provider.GetRequiredService()); diff --git a/src/DotNetCore.CAP/Processor/IProcessor.FailedJob.cs b/src/DotNetCore.CAP/Processor/IProcessor.FailedJob.cs new file mode 100644 index 0000000..9d5f12d --- /dev/null +++ b/src/DotNetCore.CAP/Processor/IProcessor.FailedJob.cs @@ -0,0 +1,86 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; +using DotNetCore.CAP.Processor.States; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; + +namespace DotNetCore.CAP.Processor +{ + public class FailedJobProcessor : IProcessor + { + private readonly CapOptions _options; + private readonly ILogger _logger; + private readonly IServiceProvider _provider; + private readonly IStateChanger _stateChanger; + + private readonly TimeSpan _delay = TimeSpan.FromSeconds(1); + private readonly TimeSpan _waitingInterval; + + public FailedJobProcessor( + IOptions options, + ILogger logger, + IServiceProvider provider, + IStateChanger stateChanger) + { + _options = options.Value; + _logger = logger; + _provider = provider; + _stateChanger = stateChanger; + _waitingInterval = _options.FailedMessageWaitingInterval; + } + + public async Task ProcessAsync(ProcessingContext context) + { + if (context == null) + throw new ArgumentNullException(nameof(context)); + + using (var scope = _provider.CreateScope()) + { + var provider = scope.ServiceProvider; + var connection = provider.GetRequiredService(); + + await Task.WhenAll( + ProcessPublishedAsync(connection, context), + ProcessReceivededAsync(connection, context)); + + DefaultDispatcher.PulseEvent.Set(); + + await context.WaitAsync(_waitingInterval); + } + } + + private async Task ProcessPublishedAsync(IStorageConnection connection, ProcessingContext context) + { + var messages = await connection.GetFailedPublishedMessages(); + foreach (var message in messages) + { + using (var transaction = connection.CreateTransaction()) + { + _stateChanger.ChangeState(message, new EnqueuedState(), transaction); + await transaction.CommitAsync(); + } + context.ThrowIfStopping(); + await context.WaitAsync(_delay); + } + } + + private async Task ProcessReceivededAsync(IStorageConnection connection, ProcessingContext context) + { + var messages = await connection.GetFailedReceviedMessages(); + foreach (var message in messages) + { + using (var transaction = connection.CreateTransaction()) + { + _stateChanger.ChangeState(message, new EnqueuedState(), transaction); + await transaction.CommitAsync(); + } + context.ThrowIfStopping(); + await context.WaitAsync(_delay); + } + } + } +} From 9cdd4afafe672bb7170132ff595ac957a47d3ee3 Mon Sep 17 00:00:00 2001 From: yangxiaodong Date: Sat, 22 Jul 2017 18:25:27 +0800 Subject: [PATCH 20/23] refactor. --- .../Processor/IProcessor.PublishQueuer.cs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/DotNetCore.CAP/Processor/IProcessor.PublishQueuer.cs b/src/DotNetCore.CAP/Processor/IProcessor.PublishQueuer.cs index 4537c17..e7f261a 100644 --- a/src/DotNetCore.CAP/Processor/IProcessor.PublishQueuer.cs +++ b/src/DotNetCore.CAP/Processor/IProcessor.PublishQueuer.cs @@ -12,11 +12,11 @@ namespace DotNetCore.CAP.Processor { public class PublishQueuer : IProcessor { - private ILogger _logger; - private CapOptions _options; - private IStateChanger _stateChanger; - private IServiceProvider _provider; - private TimeSpan _pollingDelay; + private readonly ILogger _logger; + private readonly CapOptions _options; + private readonly IStateChanger _stateChanger; + private readonly IServiceProvider _provider; + private readonly TimeSpan _pollingDelay; public static readonly AutoResetEvent PulseEvent = new AutoResetEvent(true); From 1c03de4f944ea73fd9f5a19a5e014dba2b65ab3f Mon Sep 17 00:00:00 2001 From: yangxiaodong Date: Sat, 22 Jul 2017 18:25:54 +0800 Subject: [PATCH 21/23] update default retrycount 15 to 25. --- src/DotNetCore.CAP/Processor/RetryBehavior.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/DotNetCore.CAP/Processor/RetryBehavior.cs b/src/DotNetCore.CAP/Processor/RetryBehavior.cs index 1421fae..4a1f3ba 100644 --- a/src/DotNetCore.CAP/Processor/RetryBehavior.cs +++ b/src/DotNetCore.CAP/Processor/RetryBehavior.cs @@ -16,7 +16,7 @@ namespace DotNetCore.CAP.Processor static RetryBehavior() { - DefaultRetryCount = 25; + DefaultRetryCount = 15; DefaultRetryInThunk = retries => (int)Math.Round(Math.Pow(retries - 1, 4) + 15 + (_random.Next(30) * (retries))); From 5842dfd58be832231d300adf30ff56f9e65b882c Mon Sep 17 00:00:00 2001 From: Savorboard Date: Sun, 23 Jul 2017 00:34:12 +0800 Subject: [PATCH 22/23] ICapPublisher add synchronous publish methods. --- src/DotNetCore.CAP.SqlServer/CapPublisher.cs | 159 ++++++++++++++++--- src/DotNetCore.CAP/ICapPublisher.cs | 60 +++++-- 2 files changed, 185 insertions(+), 34 deletions(-) diff --git a/src/DotNetCore.CAP.SqlServer/CapPublisher.cs b/src/DotNetCore.CAP.SqlServer/CapPublisher.cs index 6889c27..5f84093 100644 --- a/src/DotNetCore.CAP.SqlServer/CapPublisher.cs +++ b/src/DotNetCore.CAP.SqlServer/CapPublisher.cs @@ -7,11 +7,13 @@ using DotNetCore.CAP.Models; using DotNetCore.CAP.Processor; using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore.Storage; +using Microsoft.Extensions.Logging; namespace DotNetCore.CAP.SqlServer { public class CapPublisher : ICapPublisher { + private readonly ILogger _logger; private readonly SqlServerOptions _options; private readonly DbContext _dbContext; @@ -21,9 +23,12 @@ namespace DotNetCore.CAP.SqlServer protected IServiceProvider ServiceProvider { get; } - public CapPublisher(IServiceProvider provider, SqlServerOptions options) + public CapPublisher(IServiceProvider provider, + ILogger logger, + SqlServerOptions options) { ServiceProvider = provider; + _logger = logger; _options = options; if (_options.DbContextType != null) @@ -33,57 +38,152 @@ namespace DotNetCore.CAP.SqlServer } } + public void Publish(string name, string content) + { + CheckIsUsingEF(name); + + PublishCore(name, content); + } + public Task PublishAsync(string name, string content) { - if (name == null) throw new ArgumentNullException(nameof(name)); - if (!IsUsingEF) throw new InvalidOperationException("If you are using the EntityFramework, you need to configure the DbContextType first." + - " otherwise you need to use overloaded method with IDbConnection and IDbTransaction."); + CheckIsUsingEF(name); - return Publish(name, content); + return PublishCoreAsync(name, content); + } + + public void Publish(string name, T contentObj) + { + CheckIsUsingEF(name); + + var content = Helper.ToJson(contentObj); + + PublishCore(name, content); } public Task PublishAsync(string name, T contentObj) { - if (name == null) throw new ArgumentNullException(nameof(name)); - if (!IsUsingEF) throw new InvalidOperationException("If you are using the EntityFramework, you need to configure the DbContextType first." + - " otherwise you need to use overloaded method with IDbConnection and IDbTransaction."); + CheckIsUsingEF(name); var content = Helper.ToJson(contentObj); - return Publish(name, content); + + return PublishCoreAsync(name, content); } - public Task PublishAsync(string name, string content, IDbConnection dbConnection) + public void Publish(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction = null) { - if (IsUsingEF) throw new InvalidOperationException("If you are using the EntityFramework, you do not need to use this overloaded."); - if (name == null) throw new ArgumentNullException(nameof(name)); - if (dbConnection == null) throw new ArgumentNullException(nameof(dbConnection)); + CheckIsAdoNet(name); + + if (dbConnection == null) + throw new ArgumentNullException(nameof(dbConnection)); - var dbTransaction = dbConnection.BeginTransaction(IsolationLevel.ReadCommitted); + dbTransaction = dbTransaction ?? dbConnection.BeginTransaction(IsolationLevel.ReadCommitted); IsCapOpenedTrans = true; - return PublishWithTrans(name, content, dbConnection, dbTransaction); + + PublishWithTrans(name, content, dbConnection, dbTransaction); } - public Task PublishAsync(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction) + public Task PublishAsync(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction = null) + { + CheckIsAdoNet(name); + + if (dbConnection == null) + throw new ArgumentNullException(nameof(dbConnection)); + + dbTransaction = dbTransaction ?? dbConnection.BeginTransaction(IsolationLevel.ReadCommitted); + IsCapOpenedTrans = true; + + return PublishWithTransAsync(name, content, dbConnection, dbTransaction); + } + + public void Publish(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null) + { + CheckIsAdoNet(name); + + if (dbConnection == null) + throw new ArgumentNullException(nameof(dbConnection)); + + var content = Helper.ToJson(contentObj); + + dbTransaction = dbTransaction ?? dbConnection.BeginTransaction(IsolationLevel.ReadCommitted); + + PublishWithTrans(name, content, dbConnection, dbTransaction); + } + + public Task PublishAsync(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null) + { + CheckIsAdoNet(name); + + if (dbConnection == null) + throw new ArgumentNullException(nameof(dbConnection)); + + var content = Helper.ToJson(contentObj); + + dbTransaction = dbTransaction ?? dbConnection.BeginTransaction(IsolationLevel.ReadCommitted); + + return PublishWithTransAsync(name, content, dbConnection, dbTransaction); + } + + #region private methods + + private void CheckIsUsingEF(string name) { - if (IsUsingEF) throw new InvalidOperationException("If you are using the EntityFramework, you do not need to use this overloaded."); if (name == null) throw new ArgumentNullException(nameof(name)); - if (dbConnection == null) throw new ArgumentNullException(nameof(dbConnection)); - if (dbTransaction == null) throw new ArgumentNullException(nameof(dbTransaction)); + if (!IsUsingEF) + throw new InvalidOperationException("If you are using the EntityFramework, you need to configure the DbContextType first." + + " otherwise you need to use overloaded method with IDbConnection and IDbTransaction."); + } - return PublishWithTrans(name, content, dbConnection, dbTransaction); + private void CheckIsAdoNet(string name) + { + if (name == null) throw new ArgumentNullException(nameof(name)); + if (IsUsingEF) + throw new InvalidOperationException("If you are using the EntityFramework, you do not need to use this overloaded."); } - private async Task Publish(string name, string content) + private async Task PublishCoreAsync(string name, string content) { var connection = _dbContext.Database.GetDbConnection(); var transaction = _dbContext.Database.CurrentTransaction; IsCapOpenedTrans = transaction == null; transaction = transaction ?? await _dbContext.Database.BeginTransactionAsync(IsolationLevel.ReadCommitted); var dbTransaction = transaction.GetDbTransaction(); - await PublishWithTrans(name, content, connection, dbTransaction); + await PublishWithTransAsync(name, content, connection, dbTransaction); + } + + private void PublishCore(string name, string content) + { + var connection = _dbContext.Database.GetDbConnection(); + var transaction = _dbContext.Database.CurrentTransaction; + IsCapOpenedTrans = transaction == null; + transaction = transaction ?? _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted); + var dbTransaction = transaction.GetDbTransaction(); + PublishWithTrans(name, content, connection, dbTransaction); + } + + private async Task PublishWithTransAsync(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction) + { + var message = new CapPublishedMessage + { + Name = name, + Content = content, + StatusName = StatusName.Scheduled + }; + await dbConnection.ExecuteAsync(PrepareSql(), message, transaction: dbTransaction); + + _logger.LogInformation("Message has been persisted in the database. name:" + name); + + if (IsCapOpenedTrans) + { + dbTransaction.Commit(); + dbTransaction.Dispose(); + dbConnection.Dispose(); + } + + PublishQueuer.PulseEvent.Set(); } - protected virtual async Task PublishWithTrans(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction) + private void PublishWithTrans(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction) { var message = new CapPublishedMessage { @@ -91,15 +191,24 @@ namespace DotNetCore.CAP.SqlServer Content = content, StatusName = StatusName.Scheduled }; + var count = dbConnection.Execute(PrepareSql(), message, transaction: dbTransaction); + + _logger.LogInformation("Message has been persisted in the database. name:" + name); - var sql = $"INSERT INTO {_options.Schema}.[Published] ([Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName])VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)"; - await dbConnection.ExecuteAsync(sql, message, transaction: dbTransaction); if (IsCapOpenedTrans) { dbTransaction.Commit(); + dbTransaction.Dispose(); dbConnection.Dispose(); } PublishQueuer.PulseEvent.Set(); } + + private string PrepareSql() + { + return $"INSERT INTO {_options.Schema}.[Published] ([Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName])VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)"; + } + + #endregion private methods } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/ICapPublisher.cs b/src/DotNetCore.CAP/ICapPublisher.cs index 95f28b0..a45061c 100644 --- a/src/DotNetCore.CAP/ICapPublisher.cs +++ b/src/DotNetCore.CAP/ICapPublisher.cs @@ -9,7 +9,7 @@ namespace DotNetCore.CAP public interface ICapPublisher { /// - /// Publish a string message to specified topic. + /// (EntityFramework) Asynchronous publish a message. /// /// If you are using the EntityFramework, you need to configure the DbContextType first. /// otherwise you need to use overloaded method with IDbConnection and IDbTransaction. @@ -20,7 +20,18 @@ namespace DotNetCore.CAP Task PublishAsync(string name, string content); /// - /// Publis a object message to specified topic. + /// (EntityFramework) Publish a message. + /// + /// If you are using the EntityFramework, you need to configure the DbContextType first. + /// otherwise you need to use overloaded method with IDbConnection and IDbTransaction. + /// + /// + /// the topic name or exchange router key. + /// message body content. + void Publish(string name, string content); + + /// + /// (EntityFramework) Asynchronous publish a object message. /// /// If you are using the EntityFramework, you need to configure the DbContextType first. /// otherwise you need to use overloaded method with IDbConnection and IDbTransaction. @@ -28,24 +39,55 @@ namespace DotNetCore.CAP /// /// The type of conetent object. /// the topic name or exchange router key. - /// object instance that will be serialized of json. + /// message body content, that will be serialized of json. Task PublishAsync(string name, T contentObj); /// - /// Publish a string message to specified topic with transacton. + /// (EntityFramework) Publish a object message. + /// + /// If you are using the EntityFramework, you need to configure the DbContextType first. + /// otherwise you need to use overloaded method with IDbConnection and IDbTransaction. + /// /// + /// The type of conetent object. /// the topic name or exchange router key. - /// message body content. - /// the dbConnection of - Task PublishAsync(string name, string content, IDbConnection dbConnection); + /// message body content, that will be serialized of json. + void Publish(string name, T contentObj); /// - /// Publish a string message to specified topic with transacton. + /// (ado.net) Asynchronous publish a message. + /// + /// the topic name or exchange router key. + /// message body content + /// the connection of + /// the transaction of + Task PublishAsync(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction = null); + + /// + /// (ado.net) Publish a message. /// /// the topic name or exchange router key. /// message body content. /// the connection of /// the transaction of - Task PublishAsync(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction); + void Publish(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction = null); + + /// + /// (ado.net) Asynchronous publish a object message. + /// + /// the topic name or exchange router key. + /// message body content, that will be serialized of json. + /// the connection of + /// the transaction of + Task PublishAsync(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null); + + /// + /// (ado.net) Publish a object message. + /// + /// the topic name or exchange router key. + /// message body content, that will be serialized of json. + /// the connection of + /// the transaction of + void Publish(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null); } } \ No newline at end of file From 6f0ab0b4cb6a52ea5398a10ce6b3a0143735e4ee Mon Sep 17 00:00:00 2001 From: Savorboard Date: Sun, 23 Jul 2017 00:36:59 +0800 Subject: [PATCH 23/23] update unit tests. --- test/DotNetCore.CAP.Test/CAP.BuilderTest.cs | 25 +++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/test/DotNetCore.CAP.Test/CAP.BuilderTest.cs b/test/DotNetCore.CAP.Test/CAP.BuilderTest.cs index 2b18691..0bd3c8f 100644 --- a/test/DotNetCore.CAP.Test/CAP.BuilderTest.cs +++ b/test/DotNetCore.CAP.Test/CAP.BuilderTest.cs @@ -61,6 +61,26 @@ namespace DotNetCore.CAP.Test private class MyProducerService : ICapPublisher { + public void Publish(string name, string content) + { + throw new NotImplementedException(); + } + + public void Publish(string name, T contentObj) + { + throw new NotImplementedException(); + } + + public void Publish(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction = null) + { + throw new NotImplementedException(); + } + + public void Publish(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null) + { + throw new NotImplementedException(); + } + public Task PublishAsync(string topic, string content) { throw new NotImplementedException(); @@ -80,6 +100,11 @@ namespace DotNetCore.CAP.Test { throw new NotImplementedException(); } + + public Task PublishAsync(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null) + { + throw new NotImplementedException(); + } } } } \ No newline at end of file