|
|
@@ -33,25 +33,21 @@ CAP 采用的是和当前数据库集成的本地消息表的方案来解决在 |
|
|
|
PM> Install-Package DotNetCore.CAP |
|
|
|
``` |
|
|
|
|
|
|
|
如果你的消息队列使用的是 Kafka 的话,你可以: |
|
|
|
CAP 支持 Kafka 或者 RabbitMQ 消息队列,你可以选择下面的包进行安装: |
|
|
|
|
|
|
|
``` |
|
|
|
PM> Install-Package DotNetCore.CAP.Kafka |
|
|
|
``` |
|
|
|
|
|
|
|
如果你的消息队列使用的是 RabbitMQ 的话,你可以: |
|
|
|
|
|
|
|
``` |
|
|
|
PM> Install-Package DotNetCore.CAP.RabbitMQ |
|
|
|
``` |
|
|
|
|
|
|
|
CAP 提供了 Sql Server, MySql, PostgreSQL 的扩展作为数据库存储: |
|
|
|
CAP 提供了 Sql Server, MySql, PostgreSQL,MongoDB 的扩展作为数据库存储: |
|
|
|
|
|
|
|
``` |
|
|
|
// 按需选择安装你正在使用的数据库 |
|
|
|
PM> Install-Package DotNetCore.CAP.SqlServer |
|
|
|
PM> Install-Package DotNetCore.CAP.MySql |
|
|
|
PM> Install-Package DotNetCore.CAP.PostgreSql |
|
|
|
PM> Install-Package DotNetCore.CAP.MongoDB |
|
|
|
``` |
|
|
|
|
|
|
|
### Configuration |
|
|
@@ -67,28 +63,23 @@ public void ConfigureServices(IServiceCollection services) |
|
|
|
|
|
|
|
services.AddCap(x => |
|
|
|
{ |
|
|
|
// 如果你的 SqlServer 使用的 EF 进行数据操作,你需要添加如下配置: |
|
|
|
// 注意: 你不需要再次配置 x.UseSqlServer(""") |
|
|
|
x.UseEntityFramework<AppDbContext>(); |
|
|
|
//如果你使用的 EF 进行数据操作,你需要添加如下配置: |
|
|
|
x.UseEntityFramework<AppDbContext>(); //可选项,你不需要再次配置 x.UseSqlServer 了 |
|
|
|
|
|
|
|
// 如果你使用的Dapper,你需要添加如下配置: |
|
|
|
//如果你使用的Ado.Net,根据数据库选择进行配置: |
|
|
|
x.UseSqlServer("数据库连接字符串"); |
|
|
|
x.UseMySql("Your ConnectionStrings"); |
|
|
|
x.UsePostgreSql("Your ConnectionStrings"); |
|
|
|
|
|
|
|
// 如果你使用的 RabbitMQ 作为MQ,你需要添加如下配置: |
|
|
|
//如果你使用的 MongoDB,你可以添加如下配置: |
|
|
|
x.UseMongoDB("Your ConnectionStrings"); //注意,仅支持MongoDB 4.0+集群 |
|
|
|
|
|
|
|
//如果你使用的 RabbitMQ 或者 Kafka 作为MQ,根据使用选择配置: |
|
|
|
x.UseRabbitMQ("localhost"); |
|
|
|
|
|
|
|
//如果你使用的 Kafka 作为MQ,你需要添加如下配置: |
|
|
|
x.UseKafka("localhost"); |
|
|
|
}); |
|
|
|
} |
|
|
|
|
|
|
|
public void Configure(IApplicationBuilder app) |
|
|
|
{ |
|
|
|
..... |
|
|
|
|
|
|
|
app.UseCap(); |
|
|
|
} |
|
|
|
|
|
|
|
``` |
|
|
|
|
|
|
|
### 发布 |
|
|
@@ -96,39 +87,50 @@ public void Configure(IApplicationBuilder app) |
|
|
|
在 Controller 中注入 `ICapPublisher` 然后使用 `ICapPublisher` 进行消息发送 |
|
|
|
|
|
|
|
```c# |
|
|
|
|
|
|
|
public class PublishController : Controller |
|
|
|
{ |
|
|
|
[Route("~/checkAccountWithTrans")] |
|
|
|
public async Task<IActionResult> PublishMessageWithTransaction([FromServices]AppDbContext dbContext, [FromServices]ICapPublisher publisher) |
|
|
|
private readonly ICapPublisher _capBus; |
|
|
|
|
|
|
|
public PublishController(ICapPublisher capPublisher) |
|
|
|
{ |
|
|
|
using (var trans = dbContext.Database.BeginTransaction()) |
|
|
|
{ |
|
|
|
// 此处填写你的业务代码 |
|
|
|
_capBus = capPublisher; |
|
|
|
} |
|
|
|
|
|
|
|
//不使用事务 |
|
|
|
[Route("~/without/transaction")] |
|
|
|
public IActionResult WithoutTransaction() |
|
|
|
{ |
|
|
|
_capBus.Publish("xxx.services.show.time", DateTime.Now); |
|
|
|
|
|
|
|
return Ok(); |
|
|
|
} |
|
|
|
|
|
|
|
//如果你使用的是EF,CAP会自动发现当前环境中的事务,所以你不必显式传递事务参数。 |
|
|
|
//由于本地事务, 当前数据库的业务操作和发布事件日志之间将实现原子性。 |
|
|
|
await publisher.PublishAsync("xxx.services.account.check", new Person { Name = "Foo", Age = 11 }); |
|
|
|
//Ado.Net 中使用事务,自动提交 |
|
|
|
[Route("~/adonet/transaction")] |
|
|
|
public IActionResult AdonetWithTransaction() |
|
|
|
{ |
|
|
|
using (var connection = new MySqlConnection(ConnectionString)) |
|
|
|
{ |
|
|
|
using (var transaction = connection.BeginTransaction(_capBus, autoCommit: true)) |
|
|
|
{ |
|
|
|
//业务代码 |
|
|
|
|
|
|
|
trans.Commit(); |
|
|
|
_capBus.Publish("xxx.services.show.time", DateTime.Now); |
|
|
|
} |
|
|
|
} |
|
|
|
return Ok(); |
|
|
|
} |
|
|
|
|
|
|
|
[Route("~/publishWithTransactionUsingAdonet")] |
|
|
|
public async Task<IActionResult> PublishMessageWithTransactionUsingAdonet([FromServices]ICapPublisher publisher) |
|
|
|
//EntityFramework 中使用事务,自动提交 |
|
|
|
[Route("~/ef/transaction")] |
|
|
|
public IActionResult EntityFrameworkWithTransaction([FromServices]AppDbContext dbContext) |
|
|
|
{ |
|
|
|
var connectionString = ""; |
|
|
|
using (var sqlConnection = new SqlConnection(connectionString)) |
|
|
|
using (var trans = dbContext.Database.BeginTransaction(_capBus, autoCommit: true)) |
|
|
|
{ |
|
|
|
sqlConnection.Open(); |
|
|
|
using (var sqlTransaction = sqlConnection.BeginTransaction()) |
|
|
|
{ |
|
|
|
// 此处填写你的业务代码,通常情况下,你可以将业务代码使用一个委托传递进来进行封装该区域代码。 |
|
|
|
//业务代码 |
|
|
|
|
|
|
|
publisher.Publish("xxx.services.account.check", new Person { Name = "Foo", Age = 11 }, sqlTransaction); |
|
|
|
|
|
|
|
sqlTransaction.Commit(); |
|
|
|
} |
|
|
|
_capBus.Publish("xxx.services.show.time", DateTime.Now); |
|
|
|
} |
|
|
|
return Ok(); |
|
|
|
} |
|
|
@@ -145,12 +147,10 @@ public class PublishController : Controller |
|
|
|
```c# |
|
|
|
public class PublishController : Controller |
|
|
|
{ |
|
|
|
[CapSubscribe("xxx.services.account.check")] |
|
|
|
public async Task CheckReceivedMessage(Person person) |
|
|
|
[CapSubscribe("xxx.services.show.time")] |
|
|
|
public void CheckReceivedMessage(DateTime datetime) |
|
|
|
{ |
|
|
|
Console.WriteLine(person.Name); |
|
|
|
Console.WriteLine(person.Age); |
|
|
|
return Task.CompletedTask; |
|
|
|
Console.WriteLine(datetime); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
@@ -169,13 +169,11 @@ namespace xxx.Service |
|
|
|
public void CheckReceivedMessage(Person person); |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
public class SubscriberService: ISubscriberService, ICapSubscribe |
|
|
|
{ |
|
|
|
[CapSubscribe("xxx.services.account.check")] |
|
|
|
public void CheckReceivedMessage(Person person) |
|
|
|
[CapSubscribe("xxx.services.show.time")] |
|
|
|
public void CheckReceivedMessage(DateTime datetime) |
|
|
|
{ |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|