@@ -1,233 +0,0 @@ | |||
CAP 的 API 接口只有一个,就是 `ICapPublisher` 接口,你可以从 DI 容器中获取到该接口的实例进行调用。 | |||
### 发布/发送 | |||
你可以使用 `ICapPublisher` 接口中的 `Publish<T>` 或者 `PublishAsync<T>` 方法来发送消息: | |||
```c# | |||
public class PublishController : Controller | |||
{ | |||
private readonly ICapPublisher _capBus; | |||
public PublishController(ICapPublisher capPublisher) | |||
{ | |||
_capBus = capPublisher; | |||
} | |||
//不使用事务 | |||
[Route("~/without/transaction")] | |||
public IActionResult WithoutTransaction() | |||
{ | |||
_capBus.Publish("xxx.services.show.time", DateTime.Now); | |||
return Ok(); | |||
} | |||
//Ado.Net 中使用事务,自动提交 | |||
[Route("~/adonet/transaction")] | |||
public IActionResult AdonetWithTransaction() | |||
{ | |||
using (var connection = new MySqlConnection(ConnectionString)) | |||
{ | |||
using (var transaction = connection.BeginTransaction(_capBus, autoCommit: true)) | |||
{ | |||
//业务代码 | |||
_capBus.Publish("xxx.services.show.time", DateTime.Now); | |||
} | |||
} | |||
return Ok(); | |||
} | |||
//EntityFramework 中使用事务,自动提交 | |||
[Route("~/ef/transaction")] | |||
public IActionResult EntityFrameworkWithTransaction([FromServices]AppDbContext dbContext) | |||
{ | |||
using (var trans = dbContext.Database.BeginTransaction(_capBus, autoCommit: true)) | |||
{ | |||
//业务代码 | |||
_capBus.Publish("xxx.services.show.time", DateTime.Now); | |||
} | |||
return Ok(); | |||
} | |||
} | |||
``` | |||
下面是PublishAsync这个接口的签名: | |||
**`PublishAsync<T>(string name,T object)`** | |||
默认情况下,在调用此方法的时候 CAP 将在内部创建事务,然后将消息写入到 `Cap.Published` 这个消息表。 | |||
#### 消息补偿 | |||
有时候当发送一条消息出去之后,希望有一个回调可以获得消费方法的通知,用来补偿发送方做的业务操作,那么可以使用下面这个重载。 | |||
**`PublishAsync<T>(string name,T object, string callBackName)`** | |||
这个重载中 `callbackName` 是一个回调的订阅方法名称,当消费端处理完成消息之后CAP会把消费者的处理结果返回并且调用指定的订阅方法。 | |||
> 在一些需要业务补偿的场景中,我们可以利用此特性进行一些还原的补偿操作。例如:电商系统中的付款操作,订单在进行支付调用支付服务的过程中如果发生异常,那么支付服务可以通过返回一个结果来告诉调用方此次业务失败,调用方将支付状态标记为失败。 调用方通过订阅 `callbackName`(订阅参数为消费方方法的返回值) 即可接收到支付服务消费者方法的返回结果,从而进行补偿的业务处理。 | |||
下面是使用方法: | |||
```C# | |||
// 发送方 | |||
_capBus.Publish("xxx.services.show.time",DaateTime.Now,"callback-show-execute-time"); | |||
[CapSubscribe("callback-show-execute-time")] //对应发送的 callbackName | |||
public void ShowPublishTimeAndReturnExecuteTime(DateTime time) | |||
{ | |||
Console.WriteLine(time); // 这是订阅方返回的时间 | |||
} | |||
//-------------------------------------------------------------------------------- | |||
//订阅方 | |||
[CapSubscribe("xxx.services.show.time")] | |||
public DateTime ShowPublishTimeAndReturnExecuteTime(DateTime time) | |||
{ | |||
Console.WriteLine(time); // 这是发送的时间 | |||
return DateTime.Now; // 这是消费者返回的时间,CAP会取该方法的返回值用来传递到发送方的回调订阅里面 | |||
} | |||
``` | |||
#### 事务 | |||
事务在 CAP 具有重要作用,它是保证消息可靠性的一个基石。 在发送一条消息到消息队列的过程中,如果不使用事务,我们是没有办法保证我们的业务代码在执行成功后消息已经成功的发送到了消息队列,或者是消息成功的发送到了消息队列,但是业务代码确执行失败。 | |||
这里的失败原因可能是多种多样的,比如连接异常,网络故障等等。 | |||
*只有业务代码和CAP的Publish代码必须在同一个事务中,才能够保证业务代码和消息代码同时成功或者失败。* | |||
以下是两种使用事务进行Publish的代码: | |||
* EntityFramework | |||
```c# | |||
using (var trans = dbContext.Database.BeginTransaction(_capBus, autoCommit: false) | |||
{ | |||
//业务代码 | |||
_capBus.Publish("xxx.services.show.time", DateTime.Now); | |||
trans.Commit(); | |||
} | |||
``` | |||
在不使用自动提交的时候,你的业务代码可以位于 Publish 之前或者之后,只需要保证在同一个事务。 | |||
当使用自动提交时候,需要确保 `_capBus.Publish` 位于代码的最后。 | |||
其中,发送的内容会序列化为Json存储到消息表中。 | |||
* Dapper | |||
```c# | |||
using (var connection = new MySqlConnection(ConnectionString)) | |||
{ | |||
using (var transaction = connection.BeginTransaction(_capBus, autoCommit: false)) | |||
{ | |||
//your business code | |||
connection.Execute("insert into test(name) values('test')", transaction: (IDbTransaction)transaction.DbTransaction); | |||
_capBus.Publish("sample.rabbitmq.mysql", DateTime.Now); | |||
transaction.Commit(); | |||
} | |||
} | |||
``` | |||
### 订阅/消费 | |||
**注意:框架无法做到100%确保消息只执行一次,所以在一些关键场景消息端在方法实现的过程中自己保证幂等性。** | |||
使用 `CapSubscribeAttribute` 来订阅 CAP 发布出去的消息。 | |||
``` | |||
[CapSubscribe("xxx.services.bar")] | |||
public void BarMessageProcessor() | |||
{ | |||
} | |||
``` | |||
这里,你也可以使用多个 `CapSubscribe[""]` 来同时订阅多个不同的消息 : | |||
``` | |||
[CapSubscribe("xxx.services.bar")] | |||
[CapSubscribe("xxx.services.foo")] | |||
public void BarAndFooMessageProcessor() | |||
{ | |||
} | |||
``` | |||
其中,`xxx.services.bar` 为订阅的消息名称,内部实现上,这个名称在不同的消息队列具有不同的代表。 在 Kafka 中,这个名称即为 Topic Name。 在RabbitMQ 中,为 RouteKey。 | |||
> RabbitMQ 中的 RouteKey 支持绑定键表达式写法,有两种主要的绑定键: | |||
> | |||
> \*(星号)可以代替一个单词. | |||
> | |||
> \# (井号) 可以代替0个或多个单词. | |||
> | |||
> 比如在下面这个图中(P为发送者,X为RabbitMQ中的Exchange,C为消费者,Q为队列) | |||
> | |||
> ![](http://images2017.cnblogs.com/blog/250417/201708/250417-20170807093230268-283915002.png) | |||
> | |||
> 在这个示例中,我们将发送一条关于动物描述的消息,也就是说 Name(routeKey) 字段中的内容包含 3 个单词。第一个单词是描述速度的(celerity),第二个单词是描述颜色的(colour),第三个是描述哪种动物的(species),它们组合起来类似:“<celerity>.<colour>.<species>”。 | |||
> | |||
> 然后在使用 `CapSubscribe` 绑定的时候,Q1绑定为 `CapSubscribe["*.orange.*"]`, Q2 绑定为 `CapSubscribe["*.*.rabbit"]` 和 `[CapSubscribe["lazy.#]`。 | |||
> | |||
> 那么,当发送一个名为 "quick.orange.rabbit" 消息的时候,这两个队列将会同时收到该消息。同样名为 `lazy.orange.elephant`的消息也会被同时收到。另外,名为 "quick.orange.fox" 的消息将仅会被发送到Q1队列,名为 "lazy.brown.fox" 的消息仅会被发送到Q2。"lazy.pink.rabbit" 仅会被发送到Q2一次,即使它被绑定了2次。"quick.brown.fox" 没有匹配到任何绑定的队列,所以它将会被丢弃。 | |||
> | |||
> 另外一种情况,如果你违反约定,比如使用 4个单词进行组合,例如 "quick.orange.male.rabbit",那么它将匹配不到任何的队列,消息将会被丢弃。 | |||
> | |||
> 但是,假如你的消息名为 "lazy.orange.male.rabbit",那么他们将会被发送到Q2,因为 #(井号)可以匹配 0 或者多个单词。 | |||
在 CAP 中,我们把每一个拥有 `CapSubscribe[]`标记的方法叫做**订阅者**,你可以把订阅者进行分组。 | |||
**组(Group)**,是订阅者的一个集合,每一组可以有一个或者多个消费者,但是一个订阅者只能属于某一个组。同一个组内的订阅者订阅的消息只能被消费一次。 | |||
如果你在订阅的时候没有指定组,CAP会将订阅者设置到一个默认的组,默认的组名称为 `cap.queue.{程序集名称}`。 | |||
以下是使用组进行订阅的示例: | |||
```c# | |||
[CapSubscribe("xxx.services.foo", Group = "moduleA")] | |||
public void FooMessageProcessor() | |||
{ | |||
} | |||
``` | |||
#### 例外情况 | |||
这里有几种情况可能需要知道: | |||
**① 消息发布的时候订阅方还未启动** | |||
Kafka: | |||
当 Kafka 中,发布的消息存储于持久化的日志文件中,所以消息不会丢失,当订阅者所在的程序启动的时候会消费掉这些消息。 | |||
RabbitMQ: | |||
在 RabbitMQ 中,应用程序**首次启动**会创建具有持久化的 Exchange 和 Queue,CAP 会针对每一个订阅者Group会新建一个消费者队列,**由于首次启动时候订阅者未启动的所以是没有队列的,消息无法进行持久化,这个时候生产者发的消息会丢失**。 | |||
针对RabbitMQ的消息丢失的问题,有两种解决方式: | |||
i. 部署应用程序之前,在RabbitMQ中手动创建具有durable特性的Exchange和Queue,默认情况他们的名字分别是(cap.default.topic, cap.default.group)。 | |||
ii. 提前运行一遍所有实例,让Exchange和Queue初始化。 | |||
我们建议采用第 ii 种方案,因为很容易做到。 |
@@ -1,158 +0,0 @@ | |||
## 配置 | |||
CAP 使用 Microsoft.Extensions.DependencyInjection 进行配置的注入,你也可以依赖于 DI 从json文件中读取配置。 | |||
### Cap Options | |||
你可以使用如下方式来配置 CAP 中的一些配置项,例如 | |||
```cs | |||
services.AddCap(capOptions => { | |||
capOptions.FailedCallback = //... | |||
}); | |||
``` | |||
`CapOptions` 提供了以下配置项: | |||
NAME | DESCRIPTION | TYPE | DEFAULT | |||
:---|:---|---|:--- | |||
DefaultGroup | 订阅者所属的默认消费者组 | string | cap.queue+程序集名称 | |||
SuccessedMessageExpiredAfter | 成功的消息被删除的过期时间 | int | 3600 秒 | |||
FailedCallback| 执行失败消息时的回调函数,详情见下文 | Action | NULL | |||
FailedRetryInterval | 失败重试间隔时间 | int | 60 秒 | |||
FailedRetryCount | 失败最大重试次数 | int | 50 次 | |||
CapOptions 提供了 `FailedCallback` 为处理失败的消息时的回调函数。当消息多次发送失败后,CAP会将消息状态标记为`Failed`,CAP有一个专门的处理者用来处理这种失败的消息,针对失败的消息会重新放入到队列中发送到MQ,在这之前如果`FailedCallback`具有值,那么将首先调用此回调函数来告诉客户端。 | |||
FailedCallback 的类型为 `Action<MessageType,string,string>`,第一个参数为消息类型(发送的还是接收的),第二个参数为消息的名称(name),第三个参数为消息的内容(content)。 | |||
### RabbitMQ Options | |||
CAP 采用的是针对 CapOptions 进行扩展来实现RabbitMQ的配置功能,所以针对 RabbitMQ 的配置用法如下: | |||
```cs | |||
services.AddCap(capOptions => { | |||
capOptions.UseRabbitMQ(rabbitMQOption=>{ | |||
// rabbitmq options. | |||
}); | |||
}); | |||
``` | |||
`RabbitMQOptions` 提供了有关RabbitMQ相关的配置: | |||
NAME | DESCRIPTION | TYPE | DEFAULT | |||
:---|:---|---|:--- | |||
HostName | 宿主地址 | string | localhost | |||
UserName | 用户名 | string | guest | |||
Password | 密码 | string | guest | |||
VirtualHost | 虚拟主机 | string | / | |||
Port | 端口号 | int | -1 | |||
TopicExchangeName | CAP默认Exchange名称 | string | cap.default.topic | |||
RequestedConnectionTimeout | RabbitMQ连接超时时间 | int | 30,000 毫秒 | |||
SocketReadTimeout | RabbitMQ消息读取超时时间 | int | 30,000 毫秒 | |||
SocketWriteTimeout | RabbitMQ消息写入超时时间 | int | 30,000 毫秒 | |||
QueueMessageExpires | 队列中消息自动删除时间 | int | (10天) 毫秒 | |||
### Kafka Options | |||
CAP 采用的是针对 CapOptions 进行扩展来实现 Kafka 的配置功能,所以针对 Kafka 的配置用法如下: | |||
```cs | |||
services.AddCap(capOptions => { | |||
capOptions.UseKafka(kafkaOption=>{ | |||
// kafka options. | |||
// kafkaOptions.MainConfig.Add("", ""); | |||
}); | |||
}); | |||
``` | |||
`KafkaOptions` 提供了有关 Kafka 相关的配置,由于Kafka的配置比较多,所以此处使用的是提供的 MainConfig 字典来支持进行自定义配置,你可以查看这里来获取对配置项的支持信息。 | |||
[https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) | |||
### EntityFramework Options | |||
如果使用的 Entityframework 来作为消息持久化存储的话,那么你可以在配置 CAP EntityFramework 配置项的时候来自定义一些配置。 | |||
```cs | |||
services.AddCap(x => | |||
{ | |||
x.UseEntityFramework<AppDbContext>(efOption => | |||
{ | |||
// entityframework options. | |||
}); | |||
}); | |||
``` | |||
注意,如果你使用了 `UseEntityFramework` 的配置项,那么你不需要再次配置下面的章节几个针对不同数据库的配置,CAP 将会自动读取 DbContext 中使用的数据库相关配置信息。 | |||
NAME | DESCRIPTION | TYPE | DEFAULT | |||
:---|:---|---|:--- | |||
Schema | Cap表架构 | string | Cap (SQL Server) | |||
Schema | Cap表架构 | string | cap (PostgreSql) | |||
TableNamePrefix | Cap表前缀 | string | cap (MySql) | |||
### SqlServer Options | |||
注意,如果你使用的是 EntityFramewrok,你用不到此配置项。 | |||
CAP 采用的是针对 CapOptions 进行扩展来实现 SqlServer 的配置功能,所以针对 SqlServer 的配置用法如下: | |||
```cs | |||
services.AddCap(capOptions => { | |||
capOptions.UseSqlServer(sqlserverOptions => { | |||
// sqlserverOptions.ConnectionString | |||
}); | |||
}); | |||
``` | |||
NAME | DESCRIPTION | TYPE | DEFAULT | |||
:---|:---|---|:--- | |||
Schema | Cap表架构 | string | Cap | |||
ConnectionString | 数据库连接字符串 | string | null | |||
### MySql Options | |||
注意,如果你使用的是 EntityFramewrok,你用不到此配置项。 | |||
CAP 采用的是针对 CapOptions 进行扩展来实现 MySql 的配置功能,所以针对 MySql 的配置用法如下: | |||
```cs | |||
services.AddCap(capOptions => { | |||
capOptions.UseMySql(mysqlOptions => { | |||
// mysqlOptions.ConnectionString | |||
}); | |||
}); | |||
``` | |||
NAME | DESCRIPTION | TYPE | DEFAULT | |||
:---|:---|---|:--- | |||
TableNamePrefix | Cap表名前缀 | string | cap | |||
ConnectionString | 数据库连接字符串 | string | null | |||
### PostgreSql Options | |||
注意,如果你使用的是 EntityFramewrok,你用不到此配置项。 | |||
CAP 采用的是针对 CapOptions 进行扩展来实现 PostgreSql 的配置功能,所以针对 PostgreSql 的配置用法如下: | |||
```cs | |||
services.AddCap(capOptions => { | |||
capOptions.UsePostgreSql(postgreOptions => { | |||
// postgreOptions.ConnectionString | |||
}); | |||
}); | |||
``` | |||
NAME | DESCRIPTION | TYPE | DEFAULT | |||
:---|:---|---|:--- | |||
Schema | Cap表名前缀 | string | cap | |||
ConnectionString | 数据库连接字符串 | string | null |
@@ -1,57 +0,0 @@ | |||
## 设计原理 | |||
### 动机 | |||
随着微服务架构的流行,越来越多的人在尝试使用微服务来架构他们的系统,而在这其中我们会遇到例如分布式事务的问题,为了解决这些问题,我没有发现简单并且易于使用的解决方案,所以我决定来打造这样一个库来解决这个问题。 | |||
最初 CAP 是为了解决分布式系统中的事务问题,她采用的是 异步确保 这种弱一致性事务机制实现了分布式事务的最终一致性,更多这方面的信息可以查看第6节。 | |||
现在 CAP 除了解决分布式事务的问题外,她另外一个重要的功能就是作为 EventBus 来使用,她具有 EventBus 的所有功能,并且提供了更加简化的方式来处理EventBus中的发布/订阅。 | |||
### 持久化 | |||
CAP 依靠本地数据库实现消息的持久化,CAP 使用这种方式来应对一切环境或者网络异常导致消息丢失的情况,消息的可靠性是分布式事务的基石,所以在任何情况下消息都不能丢失。 | |||
对于消息的持久化分为两种: | |||
**① 消息进入消息队列之前的持久化** | |||
在消息进入到消息队列之前,CAP使用本地数据库表对消息进行持久化,这样可以保证当消息队列出现异常或者网络错误时候消息是没有丢失的。 | |||
为了保证这种机制的可靠性,CAP使用和业务代码相同的数据库事务来保证业务操作和CAP的消息在持久化的过程中是强一致的。也就是说在进行消息持久化的过程中,任何一方发生异常情况数据库都会进行回滚操作。 | |||
**② 消息进入到消息队列之后的持久化** | |||
消息进入到消息队列之后,CAP会启动消息队列的持久化功能,我们需要说明一下在 RabbitMQ 和 Kafka 中CAP的消息是如何持久化的。 | |||
针对于 RabbitMQ 中的消息持久化,CAP 使用的是具有消息持久化功能的消费者队列,但是这里面可能有例外情况,参加 2.2.1 章节。 | |||
由于 Kafka 天生设计的就是使用文件进行的消息持久化,在所以在消息进入到Kafka之后,Kafka会保证消息能够正确被持久化而不丢失。 | |||
### 通讯数据流 | |||
CAP 中消息的流转过程大致如下: | |||
**2.2版本以前** | |||
![](http://images2017.cnblogs.com/blog/250417/201708/250417-20170803174645928-1813351415.png) | |||
> “ P ” 代表消息发送者(生产者)。 “ C ” 代表消息消费者(订阅者)。 | |||
**2.2版本以后** | |||
在2.2以后的版本中,我们调整了一些消息的流转流程,我们移除了数据库中的 Queue 表使用内存队列来代替,详情见:[Improve the implementation mechanism of queue mode](https://github.com/dotnetcore/CAP/issues/96) | |||
### 一致性 | |||
CAP 采用最终一致性作为的一致性方案,此方案是遵循 CAP 理论,以下是CAP理论的描述。 | |||
C(一致性)一致性是指数据的原子性,在经典的数据库中通过事务来保障,事务完成时,无论成功或回滚,数据都会处于一致的状态,在分布式环境下,一致性是指多个节点数据是否一致; | |||
A(可用性)服务一直保持可用的状态,当用户发出一个请求,服务能在一定的时间内返回结果; | |||
P(分区容忍性)在分布式应用中,可能因为一些分布式的原因导致系统无法运转,好的分区容忍性,使应用虽然是一个分布式系统,但是好像一个可以正常运转的整体 | |||
根据 [“CAP”分布式理论](https://en.wikipedia.org/wiki/CAP_theorem), 在一个分布式系统中,我们往往为了可用性和分区容错性,忍痛放弃强一致支持,转而追求最终一致性。大部分业务场景下,我们是可以接受短暂的不一致的。 | |||
第 6 节将对此做进一步介绍。 |
@@ -1,51 +0,0 @@ | |||
**1、有CAP学习QQ群吗?** | |||
CAP没有群。 | |||
原因是我希望培养大家独立思考和遇到问题时候的解决能力。 | |||
使用时候遇到问题先尝试看文档和独立解决,如果解决不了,可以提ISSUE或者发邮件。 | |||
QQ群有效沟通太低,浪费时间。 | |||
**2、CAP要求发送者与接收者必须使用不同的数据库吗?** | |||
没有要求,要求不同的实例使用不同的数据库。不同实例的意思为,不同代码的两套程序。 | |||
但是如果你真的需要在不同的实例使用相同的数据库,那么可以参考下面3的答案。 | |||
**3、CAP如何在不同的实例中使用相同的数据库?** | |||
如果想在不同的实例(程序)中连接相同的数据库,那么你可以在配置CAP的时候通过指定不同的数据库表名前缀来实现。 | |||
你可以通过以下方式来指定数据库表名前缀: | |||
```cs | |||
public void ConfigureServices(IServiceCollection services) | |||
{ | |||
services.AddCap(x => | |||
{ | |||
x.UseKafka(""); | |||
x.UseMySql(opt => | |||
{ | |||
opt.ConnectionString = "connection string"; | |||
opt.TableNamePrefix = "appone"; // 在这里配置不同的实例使用的表名前缀 | |||
}); | |||
}); | |||
} | |||
``` | |||
注意:相同的实例不需要指定不同的表名称前缀,他们在接收消息的时候会进行负载均衡。 | |||
**4、CAP可以不使用数据库吗? 我仅仅是想通过她来传递消息,我可以接受消息丢失的情况** | |||
目前是不可以的。 | |||
CAP 的设计目标即为在不同的微服务或者SOA系统中来保持数据一致性的一种解决方案,保证这种数据一致性方案的前提是利用了传统数据库的 ACID 特性,如果离开了数据库,那么CAP仅仅是一个消息队列的客户端封装,这没有任何意义。 | |||
**5、使用CAP时候,业务出现错误怎么回滚?** | |||
不能回滚,CAP是最终一致性的方案。 | |||
你可以想象你的场景为在调用第三方支付,假如你在进行一项第三方支付的操作,调用支付宝的接口成功后,而你自己的代码出现错误了,支付宝会回滚吗? 如果不回滚那么是又应该怎么处理呢? 这里也是同理。 | |||
@@ -1,81 +0,0 @@ | |||
### 介绍 | |||
CAP 是一个遵循 .NET Standard 标准库的C#库,用来处理分布式事务以及提供EventBus的功能,她具有轻量级,高性能,易使用等特点。 | |||
目前 CAP 使用的是 .NET Standard 1.6 的标准进行开发,目前最新预览版本已经支持 .NET Standard 2.0. | |||
### 应用场景 | |||
CAP 的应用场景主要有以下两个: | |||
* 1. 分布式事务中的最终一致性(异步确保)的方案。 | |||
分布式事务是在分布式系统中不可避免的一个硬性需求,而目前的分布式事务的解决方案也无外乎就那么几种,在了解 CAP 的分布式事务方案前,可以阅读以下 [这篇文章](http://www.infoq.com/cn/articles/solution-of-distributed-system-transaction-consistency)。 | |||
CAP 没有采用两阶段提交(2PC)这种事务机制,而是采用的 本地消息表+MQ 这种经典的实现方式,这种方式又叫做 异步确保。 | |||
* 2. 具有高可用性的 EventBus。 | |||
CAP 实现了 EventBus 中的发布/订阅,它具有 EventBus 的所有功能。也就是说你可以像使用 EventBus 一样来使用 CAP,另外 CAP 的 EventBus 是具有高可用性的,这是什么意思呢? | |||
CAP 借助于本地消息表来对 EventBus 中的消息进行了持久化,这样可以保证 EventBus 发出的消息是可靠的,当消息队列出现宕机或者连接失败的情况时,消息也不会丢失。 | |||
### Quick Start | |||
* **引用 NuGet 包** | |||
使用一下命令来引用CAP的NuGet包: | |||
``` | |||
PM> Install-Package DotNetCore.CAP | |||
``` | |||
根据使用的不同类型的消息队列,来引入不同的扩展包: | |||
``` | |||
PM> Install-Package DotNetCore.CAP.RabbitMQ | |||
PM> Install-Package DotNetCore.CAP.Kafka | |||
``` | |||
根据使用的不同类型的数据库,来引入不同的扩展包: | |||
``` | |||
PM> Install-Package DotNetCore.CAP.SqlServer | |||
PM> Install-Package DotNetCore.CAP.MySql | |||
PM> Install-Package DotNetCore.CAP.PostgreSql | |||
PM> Install-Package DotNetCore.CAP.MongoDB | |||
``` | |||
* **启动配置** | |||
在 ASP.NET Core 程序中,你可以在 `Startup.cs` 文件 `ConfigureServices()` 中配置 CAP 使用到的服务: | |||
```cs | |||
public void ConfigureServices(IServiceCollection services) | |||
{ | |||
//...... | |||
services.AddDbContext<AppDbContext>(); //Options, If you are using EF as the ORM | |||
services.AddSingleton<IMongoClient>(new MongoClient("")); //Options, If you are using MongoDB | |||
services.AddCap(x => | |||
{ | |||
// If you are using EF, you need to add the configuration: | |||
x.UseEntityFramework<AppDbContext>(); //Options, Notice: You don't need to config x.UseSqlServer(""") again! CAP can autodiscovery. | |||
// If you are using Ado.Net, you need to add the configuration: | |||
x.UseSqlServer("Your ConnectionStrings"); | |||
x.UseMySql("Your ConnectionStrings"); | |||
x.UsePostgreSql("Your ConnectionStrings"); | |||
// If you are using MongoDB, you need to add the configuration: | |||
x.UseMongoDB("Your ConnectionStrings"); //MongoDB 4.0+ cluster | |||
// If you are using RabbitMQ, you need to add the configuration: | |||
x.UseRabbitMQ("localhost"); | |||
// If you are using Kafka, you need to add the configuration: | |||
x.UseKafka("localhost"); | |||
}); | |||
} | |||
``` |
@@ -1,88 +0,0 @@ | |||
CAP 封装了在 ASP.NET Core 中的使用依赖注入来获取 Publisher (`ICapPublisher`)的接口。而启动方式类似于 “中间件” 的形式,通过在 Startup.cs 配置 `ConfigureServices` 和 `Configure` 进行启动。 | |||
### 消息表 | |||
当系统引入CAP之后并首次启动后,CAP会在客户端生成 2 个表,分别是 Cap.Published, Cap.Received 。注意表名可能在不同的数据库具有不同的大小写区分,如果你在运行项目的时候没有显式的指定数据库生成架构(Schema)或者表名前缀(TableNamePrefix)的话,默认情况下就是以上的名字。 | |||
**Cap.Published**:这个表主要是用来存储 CAP 发送到MQ(Message Queue)的客户端消息,也就是说你使用 `ICapPublisher` 接口 Publish 的消息内容。 | |||
**Cap.Received**:这个表主要是用来存储 CAP 接收到 MQ(Message Queue) 的客户端订阅的消息,也就是使用 `CapSubscribe[]` 订阅的那些消息。 | |||
> 2.2 版本以前: | |||
> **Cap.Queue**: 这个表主要是CAP内部用来处理发送和接收消息的一个临时表,通常情况下,如果系统不出现问题,这个表将是空的。 | |||
`Published` 和 `Received` 表具有 StatusName 字段,这个字段用来标识当前消息的状态。目前共有 `Scheduled`,`Successed`,`Failed` 等几个状态。 | |||
> 在 2.2 版本以前的所有状态为:`Scheduled`,`Enqueued`,`Processing`,`Successed`,`Failed` | |||
CAP 在处理消息的过程中会依次从`Scheduled` 到 `Successed` 来改变这些消息状态的值。如果是状态值为 `Successed`,代表该消息已经成功的发送到了 MQ 中。如果为 Failed 则代表消息发送失败。 | |||
CAP 2.2 以上版本中会针对 `Scheduled`,`Failed` 状态的消息 CAP 会于消息持久化过后 4 分钟后开始进行重试,重试的间隔默认为 60 秒,你可以在 `CapOptions` 中配置的`FailedRetryInterval` 来调整默认间隔时间。 | |||
> 2.2 版本以前, CAP 会对状态为 `Failed` 的消息默认进行 100 次重试。 | |||
### 消息格式 | |||
CAP 采用 JSON 格式进行消息传输,以下是消息的对象模型: | |||
NAME | DESCRIPTION | TYPE | |||
:---|:---|:--- | |||
Id | 消息编号 | int | |||
Version | 消息版本 | string | |||
Name | 消息名称 | string | |||
Content | 内容 | string | |||
Group | 所属消费组 | string | |||
Added | 创建时间 | DateTime | |||
ExpiresAt | 过期时间 | DateTime | |||
Retries | 重试次数 | int | |||
StatusName | 状态 | string | |||
>对于 Cap.Received 中的消息,会多一个 `Group` 字段来标记所属的消费者组。 | |||
对于消息内容 Content 属性里面的内容CAP 使用 Message 对象进行了一次二次包装。一下为Message对象的信息 | |||
NAME | DESCRIPTION | TYPE | |||
:---|:---|:--- | |||
Id | CAP生成的消息编号 | string | |||
Timestamp | 消息创建时间 | string | |||
Content | 内容 | string | |||
CallbackName | 回调的订阅者名称 | string | |||
其中 Id 字段,CAP 采用的 MongoDB 中的 ObjectId 分布式Id生成算法生成。 | |||
### EventBus | |||
EventBus 采用 发布-订阅 风格进行组件之间的通讯,它不需要显式在组件中进行注册。 | |||
![](http://images2017.cnblogs.com/blog/250417/201708/250417-20170804153901240-1774287236.png) | |||
上图是EventBus的一个Event的流程,关于 EventBus 的更多信息就不在这里介绍了... | |||
在 CAP 中,为什么说 CAP 实现了 EventBus 中的全部特性,因为 EventBus 具有的两个大功能就是发布和订阅, 在 CAP 中 使用了另外一种优雅的方式来实现的,另外一个 CAP 提供的强大功能就是消息的持久化,以及在任何异常情况下消息的可靠性,这是EventBus不具有的功能。 | |||
![](https://camo.githubusercontent.com/452505edb71d41f2c1bd18907275b76291621e46/687474703a2f2f696d61676573323031352e636e626c6f67732e636f6d2f626c6f672f3235303431372f3230313730372f3235303431372d32303137303730353137353832373132382d313230333239313436392e706e67) | |||
CAP 里面发送一个消息可以看做是一个 “Event”,一个使用了CAP的ASP.NET Core 应用程序既可以进行发送也可以进行订阅接收。 | |||
### 重试 | |||
重试在整个CAP架构设计中具有重要作用,CAP 中会针对发送失败或者执行失败的消息进行重试。在整个 CAP 的设计过程中有以下几处采用的重试策略。 | |||
**1、 发送重试** | |||
在消息发送过程中,当出现 Broker 宕机或者连接失败的情况亦或者出现异常的情况下,这个时候 CAP 会对发送的重试,第一次重试次数为 3,4分钟后以后每分钟重试一次,进行次数 +1,当总次数达到50次后,CAP将不对其进行重试。 | |||
> 你可以在 `CapOptions` 中设置`FailedRetryCount`来调整默认重试的总次数。 | |||
当失败总次数达到默认失败总次数后,就不会进行重试了,你可以在 Dashboard 中查看消息失败的原因,然后进行人工重试处理。 | |||
**2、 消费重试** | |||
当 Consumer 接收到消息时,会执行消费者方法,在执行消费者方法出现异常时,会进行重试。这个重试策略和上面的 `发送重试` 是相同的。 | |||
### 数据清理 | |||
数据库消息表中具有一个 `ExpiresAt` 字段表示消息的过期时间,当消息发送成功或者消费成功后,CAP会将消息状态为 `Successed` 的 `ExpiresAt` 设置为 **1小时** 后过期,会将消息状态为 `Failed` 的 `ExpiresAt` 设置为 **15天** 后过期。 | |||
CAP 默认情况下会每隔一个小时将消息表的数据进行清理删除,避免数据量过多导致性能的降低。清理规则为 ExpiresAt 不为空并且小于当前时间的数据。 也就是说状态为`Failed`的消息(正常情况他们已经被重试了 50 次),如果你15天没有人工介入处理,同样会被清理掉。 |
@@ -1,13 +0,0 @@ | |||
针对于分布式事务的处理,CAP 采用的是“异步确保”这种方案。 | |||
### 异步确保 | |||
异步确保这种方案又叫做本地消息表,这是一种经典的方案,方案最初来源于 eBay,参考资料见段末链接。这种方案目前也是企业中使用最多的方案之一。 | |||
相对于 TCC 或者 2PC/3PC 来说,这个方案对于分布式事务来说是最简单的,而且它是去中心化的。在TCC 或者 2PC 的方案中,必须具有事务协调器来处理每个不同服务之间的状态,而此种方案不需要事务协调器。 | |||
另外 2PC/TCC 这种方案如果服务依赖过多,会带来管理复杂性增加和稳定性风险增大的问题。试想如果我们强依赖 10 个服务,9 个都执行成功了,最后一个执行失败了,那么是不是前面 9 个都要回滚掉?这个成本还是非常高的。 | |||
但是,并不是说 2PC 或者 TCC 这种方案不好,因为每一种方案都有其相对优势的使用场景和优缺点,这里就不做过多介绍了。 | |||
> 中文:[http://www.cnblogs.com/savorboard/p/base-an-acid-alternative.html](http://www.cnblogs.com/savorboard/p/base-an-acid-alternative.html) | |||
> 英文:[http://queue.acm.org/detail.cfm?id=1394128](http://queue.acm.org/detail.cfm?id=1394128) |
@@ -1,195 +0,0 @@ | |||
# Interfaces | |||
CAP only has one interface,It is `ICapPublisher`, You can get its instance from the DI container and then call it. | |||
## Publish & Send | |||
You can use the `Publish<T>` or `PublishAsync<T>` methods defined in the `ICapPublisher` interface to send the event messages. | |||
```c# hl_lines="19 33" | |||
public class PublishController : Controller | |||
{ | |||
private readonly ICapPublisher _capBus; | |||
public PublishController(ICapPublisher capPublisher) | |||
{ | |||
_capBus = capPublisher; | |||
} | |||
[Route("~/adonet/transaction")] | |||
public IActionResult AdonetWithTransaction() | |||
{ | |||
using (var connection = new MySqlConnection(ConnectionString)) | |||
{ | |||
using (var transaction = connection.BeginTransaction(_capBus, autoCommit: true)) | |||
{ | |||
//your business code | |||
_capBus.Publish("xxx.services.show.time", DateTime.Now); | |||
} | |||
} | |||
return Ok(); | |||
} | |||
[Route("~/ef/transaction")] | |||
public IActionResult EntityFrameworkWithTransaction([FromServices]AppDbContext dbContext) | |||
{ | |||
using (var trans = dbContext.Database.BeginTransaction(_capBus, autoCommit: true)) | |||
{ | |||
//your business code | |||
_capBus.Publish("xxx.services.show.time", DateTime.Now); | |||
} | |||
return Ok(); | |||
} | |||
} | |||
``` | |||
The following is the signature of the of the PublishAsync method | |||
```c# | |||
PublishAsync<T>(string name, T object) | |||
``` | |||
By default,when this method(PublishAsync<T>) is called,CAP will create a transaction internally, | |||
and then write messages into the `Cap.Published` message table. | |||
In some situations,you may need a callback when a message is sent out, you can use the follwing | |||
overload of the `PublishAsync<T>` method: | |||
```c# | |||
PublishAsync<T>(string name, T object, string callBackName) | |||
``` | |||
In this overload method, `callbackName` is the callback name of the subscription method,when the consumption-side finished processing messages,CAP will return the processed result and also call the specified subscription method | |||
### Transactions | |||
Transaction plays a very import role in CAP, It is a main factor to ensure the reliability of messaging. | |||
In the process of sending a message to the message queue without transaction we can not ensure that messages are sent to the message queue successfully after we finish dealing the business logic,or messages are send to the message queque successfully but our bussiness logic is failed. | |||
There is a variety of reasons that causing failure,eg:connection errors,network errors,etc. | |||
!!! note | |||
Only by putting the business logic and logic in the Publish of CAP in the same transaction so that we can enssure both them to be success or fail | |||
The following two blocks of code snippet demonstrate how to use transactions in EntityFramework and dapper when publishing messages. | |||
#### EntityFramework | |||
```c# | |||
using (var trans = dbContext.Database.BeginTransaction(_capBus, autoCommit: false) | |||
{ | |||
// Your business logic。 | |||
_capBus.Publish("xxx.services.show.time", DateTime.Now); | |||
trans.Commit(); | |||
} | |||
``` | |||
When you set the `autoCommit: false`, you can put your business logic before or after the Publish logic,the only thing you need to do is to ensure that they are in the same transaction. | |||
If you set the `autoCommit: true`, you need publish message `_capBus.Publish` at the last. | |||
During the course,the message content will be serialized as json and stored in the message table. | |||
#### Dapper | |||
```c# | |||
using (var connection = new MySqlConnection(ConnectionString)) | |||
{ | |||
using (var transaction = connection.BeginTransaction(_capBus, autoCommit: false)) | |||
{ | |||
//your business code | |||
connection.Execute("insert into test(name) values('test')", transaction: (IDbTransaction)transaction.DbTransaction); | |||
_capBus.Publish("sample.rabbitmq.mysql", DateTime.Now); | |||
transaction.Commit(); | |||
} | |||
} | |||
``` | |||
## Subscribe & Consume | |||
!!! warning | |||
The businsess logics in the subscription side should be keep idempotent. | |||
You can view more details in this [ISSUE](https://github.com/dotnetcore/CAP/issues/29#issuecomment-451841287). | |||
Use `CapSubscribe[""]` to decorate a method so that it can subscribe messages published by CAP. | |||
```c# | |||
[CapSubscribe("xxx.services.bar")] | |||
public void BarMessageProcessor() | |||
{ | |||
} | |||
``` | |||
You can also use multiple `CapSubscribe[""]` to decorate a method so that you can subscribe messages from different sources accordingly. | |||
```c# | |||
[CapSubscribe("xxx.services.bar")] | |||
[CapSubscribe("xxx.services.foo")] | |||
public void BarAndFooMessageProcessor() | |||
{ | |||
} | |||
``` | |||
`xxx.services.bar` is the name of the message to be subscribed.And it has different name in different message queque Clients.for example,in kafka the name is called Topic Name and in RAbbitMQ it is called RouteKey. | |||
In RabbitMQ you can use regular expression in RouteKey: | |||
<blockquote> | |||
<p> <cite title="Source Title">\*</cite> (Asterisk) stands for a single word.</p> | |||
<p> <cite title="Source Title">#</cite> (hash sign) standards for zero or more words.</p> | |||
<p class="small">See the following picture(P for Publisher,X for Exchange,C for consumer and Q for Queue)</p> | |||
<p><image src='../../img/rabbitmq-route.png'></image></p> | |||
<p class="small">In this example, we're going to send messages which all describe animals. The messages will be sent with a routing key that consists of three words (two dots). The first word in the routing key will describe a celerity, second a colour and third a species: "<celerity>.<colour>.<species>".</p> | |||
<p class="small">We created three bindings: Q1 is bound with binding key "*.orange.*" and Q2 with "*.*.rabbit" and "lazy.#".</p> | |||
<p class="small">These bindings can be summarised as:</p> | |||
<p class="small">Q1 is interested in all the orange animals.Q2 wants to hear everything about rabbits, and everything about lazy animals.A message with a routing key set to "quick.orange.rabbit" will be delivered to both queues. Message "lazy.orange.elephant" also will go to both of them. On the other hand "quick.orange.fox" will only go to the first queue, and "lazy.brown.fox" only to the second. "lazy.pink.rabbit" will be delivered to the second queue only once, even though it matches two bindings. "quick.brown.fox" doesn't match any binding so it will be discarded.</p> | |||
<p class="small">What happens if we break our contract and send a message with one or four words, like "orange" or "quick.orange.male.rabbit"? Well, these messages won't match any bindings and will be lost.</p> | |||
<p class="small">On the other hand "lazy.orange.male.rabbit", even though it has four words, will match the last binding and will be delivered to the second queue.</p> | |||
</blockquote> | |||
In CAP, we called a method decorated by `CapSubscribe[]` a **subscriber**, you can group different subscribers. | |||
**Group** is a collection of subscribers,each group can have one or multiple consumers,but a subscriber can only belongs to a certain group(you can not put a subscriber into multiple groups).Messages subscribed by members in a certain group can only be consumed once. | |||
If you do not specify any group when subscribing,CAP will put the subscriber into a default group named `cap.default.group` | |||
the following is a demo shows how to use group when subscribing. | |||
```c# | |||
[CapSubscribe("xxx.services.foo", Group = "moduleA")] | |||
public void FooMessageProcessor() | |||
{ | |||
} | |||
``` | |||
### Exceptional case | |||
The following situations you shoud be aware of. | |||
**① the subscription side has not started yet when publishing a message** | |||
#### Kafka | |||
In Kafka,published messages stored in the Persistent log files,so messages will not lost.when the subscription side started,it can still consume the message. | |||
#### RabbitMQ | |||
In RabbitMQ, the application will create Persistent Exchange and Queue at the **first start**, CAP will create a new consumer queue for each consumer group,**because the application started but the subscription side hasn's start yet so there has no queue,thus the message can not be persited,and the published messages will lost** | |||
There are two ways to solve this `message lost` issue in RamitMQ: | |||
* Before the deployment of your application,you can create durable Exchange and Queue in RabbitMQ by hand,the default names them are (cap.default.topic, cap.default.group). | |||
* Run all instances in advance to ensure that both Exchange and Queue are initialized. | |||
It is highly recommanded that users adopt the second way,because it is easier to achieve. |
@@ -1,95 +0,0 @@ | |||
# 配置 | |||
默认情况下,你在向IoC容器中注册CAP服务的时候指定配置。 | |||
```c# | |||
services.AddCap(config=> { | |||
// config.XXX | |||
}); | |||
``` | |||
其中 `services` 代表的是 `IServiceCollection` 接口对象,它位于 `Microsoft.Extensions.DependencyInjection` 下面。 | |||
如果你不想使用微软的IoC容器,那么你可以查看 [ASP.NET Core 这里的文档](https://docs.microsoft.com/en-us/aspnet/core/fundamentals/dependency-injection?view=aspnetcore-2.2#default-service-container-replacement) 来了解如何替换默认的容器实现。 | |||
## 什么是最低配置? | |||
最简单的回答就是,至少你要配置一个消息队列和一个事件存储,如果你想快速开始你可以使用下面的配置: | |||
```C# | |||
services.AddCap(config => | |||
{ | |||
config.UseInMemoryQueue(); | |||
config.UseInmemoryStorage(); | |||
}); | |||
``` | |||
有关具体的消息队列的配置和存储的配置,你可以查看 Transports 章节和 Persistent 章节中具体组件提供的配置项。 | |||
## CAP 中的自定义配置 | |||
在 `AddCap` 中 `CapOptions` 对象是用来存储配置相关信息,默认情况下它们都具有一些默认值,有些时候你可能需要自定义。 | |||
#### DefaultGroup | |||
默认值:cap.queue.{程序集名称} | |||
默认的消费者组的名字,在不同的 Transports 中对应不同的名字,可以通过自定义此值来自定义不同 Transports 中的名字,以便于查看。 | |||
> 在 RabbitMQ 中映射到 [Queue Names](https://www.rabbitmq.com/queues.html#names)。 | |||
> 在 Apache Kafka 中映射到 Topic Name。 | |||
> 在 Azure Service Bus 中映射到 Subscription Name。 | |||
#### Version | |||
默认值:v1 | |||
这是在CAP v2.4 版本中引入的新配置项,用于给消息指定版本来隔离不同版本服务的消息,常用于A/B测试或者多服务版本的场景。以下是其应用场景: | |||
!!! info "业务快速迭代,需要向前兼容" | |||
由于业务的快速迭代,在各个服务集成的过程中,消息的数据结构并不是固定不变的,有些时候我们为了适应新引入的需求,会添加或者修改一些数据结构。如果你是一套全新的系统这没有什么问题,但是如果你的系统已经部署到生产环境了并且正在服务客户,这就会导致新的功能在上线的时候和旧的数据结构发生不兼容,那么这些改变可能会导致出现严重的问题,要想解决这个问题,只能把消息队列和持久化的消息全部清空,然后才能启动应用程序,这对于生产环境来说显然是致命的。 | |||
!!! info "多个版本的服务端" | |||
有些时候,App的服务端需要提供多套接口,来支持不同版本的App,这些不同版本的App相同的接口和服务端交互的数据结构可能是不一样的,所以通常情况下服务端提供不用的路由地址来适配不同版本的App调用。 | |||
!!! info "不同实例,使用相同的持久化表/集合" | |||
希望多个不同实例的程序可以公用相同的数据库,在 2.4 之前的版本,我们可以通过指定不同的表名来隔离不同实例的数据库表,即在CAP配置的时候通过配置不同的表名前缀来实现。 | |||
> 查看博客来了解更多关于 Version 的信息: https://www.cnblogs.com/savorboard/p/cap-2-4.html | |||
#### FailedRetryInterval | |||
默认值:60 秒 | |||
在消息发送的时候,如果发送失败,CAP将会对消息进行重试,此配置项用来配置每次重试的间隔时间。 | |||
在消息消费的过程中,如果消费失败,CAP将会对消息进行重试消费,此配置项用来配置每次重试的间隔时间。 | |||
!!! WARNING "重试 & 间隔" | |||
在默认情况下,重试将在发送和消费消息失败的 **4分钟后** 开始,这是为了避免设置消息状态延迟导致可能出现的问题。 | |||
发送和消费消息的过程中失败会立即重试 3 次,在 3 次以后将进入重试轮询,此时 FailedRetryInterval 配置才会生效。 | |||
#### FailedRetryCount | |||
默认值:50 | |||
重试的最大次数。当达到此设置值时,将不会再继续重试,通过改变此参数来设置重试的最大次数。 | |||
#### FailedThresholdCallback | |||
默认值:NULL | |||
类型:`Action<MessageType, string, string>` | |||
> | |||
T1 : Message Type | |||
T2 : Message Name | |||
T3 : Message Content | |||
重试阈值的失败回调。当重试达到 FailedRetryCount 设置的值的时候,将调用此 Action 回调,你可以通过指定此回调来接收失败达到最大的通知,以做出人工介入。例如发送邮件或者短信。 | |||
#### SucceedMessageExpiredAfter | |||
默认值:24*3600 秒(1天后) | |||
成功消息的过期时间(秒)。 当消息发送或者消费成功时候,在时间达到 `SucceedMessageExpiredAfter` 秒时候将会从 Persistent 中删除,你可以通过指定此值来设置过期的时间。 |
@@ -1,137 +0,0 @@ | |||
# Idempotence | |||
幂等性(你可以在[Wikipedia](https://en.wikipedia.org/wiki/Idempotence)读到关于幂等性的定义),当我们谈论幂等时,一般是指可以重复处理传毒的消息,而不是产生意外的结果。 | |||
## 交付保证 | |||
在说幂等性之前,我们先来说下关于消费端的消息交付。 | |||
由于CAP不是使用的 MS DTC 或其他类型的2PC分布式事务机制,所以存在至少消息严格交付一次的问题,具体的说在基于消息的系统中,存在一下三种可能: | |||
* Exactly Once(*) (仅有一次) | |||
* At Most Once (最多一次) | |||
* At Least Once (最少一次) | |||
带 * 号表示在实际场景中,很难达到。 | |||
### At Most Once | |||
最多一次交付保证,涵盖了保证一次或根本不接收所有消息的情况。 | |||
这种类型的传递保证可能来自你的消息系统,你的代码按以下顺序执行其操作: | |||
``` | |||
1. 从队列移除消息 | |||
2. 开始一个工作事务 | |||
3. 处理消息 ( 你的代码 ) | |||
4. 是否成功 ? | |||
Yes: | |||
1. 提交工作事务 | |||
No: | |||
1. 回滚工作事务 | |||
2. 将消息发回到队列。 | |||
``` | |||
正常情况下,他们工作的很好,工作事务将被提交。 | |||
然而,有些时候并不能总是成功,比如在 1 之后出现异常,或者是你在将消息放回到队列中出现网络问题由或者宕机重启等情况。 | |||
使用这个协议,你将冒着丢失消息的风险,如果可以接受,那就没有关系。 | |||
### At Least Once | |||
这个交付保证包含你收到至少一次的消息,当出现故障时,可能会收到多次消息。 | |||
它需要稍微改变我们执行步骤的顺序,它要求消息队列系统支持事务或ACK机制,比如传统的 begin-commit-rollback 协议(MSMQ是这样),或者是 receive-ack-nack 协议(RabbitMQ,Azure Service Bus等是这样的)。 | |||
大致步骤如下: | |||
``` | |||
1. 抢占队列中的消息。 | |||
2. 开始一个工作事务 | |||
3. 处理消息 ( 你的代码 ) | |||
4. 是否成功 ? | |||
Yes: | |||
1. 提交工作事务 | |||
2. 从队列删除消息 | |||
No: | |||
1. 回滚工作事务 | |||
2. 从队列释放抢占的消息 | |||
``` | |||
当出现失败或者抢占消息超时的时候,我们总是能够再次接收到消息以保证我们工作事务提交成功。 | |||
### 什么是 “工作事务” ? | |||
上面所说的“工作事务”并不是特指关系型数据库中的事务,这里的工作事务是一个概念,也就是说执行代码的原子性。 | |||
比如它可以是传统的RDMS事务,也或者是 MongoDB 事务或者是一个交易等。 | |||
在这里它代表一个执行单元,这个执行单元是一个概念性的事实以支持前面提到的仅交付一次的这种问题。 | |||
通常,不可能做到消息的事务和工作事务来形成原子性进行提交或者回滚。 | |||
## CAP 中的幂等性 | |||
在CAP中,我们采用的交付保证为 At Least Once。 | |||
由于我们具有临时存储介质(数据库表),也许可以做到 At Most Once, 但是为了严格保证消息不会丢失,我们没有提供相关功能或配置。 | |||
### 为什么没有实现幂等? | |||
1、消息写入成功了,但是此时执行Consumer方法失败了 | |||
执行Consumer方法失败的原因有非常多,我如果不知道具体的场景盲目进行重试或者不进行重试都是不正确的选择。 | |||
举个例子:假如消费者为扣款服务,如果是执行扣款成功了,但是在写扣款日志的时候失败了,此时CAP会判断为消费者执行失败,进行重试。如果客户端自己没有保证幂等性,框架对其进行重试,这里势必会造成多次扣款出现严重后果。 | |||
2、执行Consumer方法成功了,但是又收到了同样的消息 | |||
此处场景也是可能存在的,假如开始的时候Consumer已经执行成功了,但是由于某种原因如 Broker 宕机恢复等,又收到了相同的消息,CAP 在收到Broker消息后会认为这个是一个新的消息,会对 Consumer再次执行,由于是新消息,此时 CAP 也是无法做到幂等的。 | |||
3、目前的数据存储模式无法做到幂等 | |||
由于CAP存消息的表对于成功消费的消息会于1个小时后删除,所以如果对于一些历史性消息无法做到幂等操作。 历史性指的是,假如 Broker由于某种原因维护了或者是人工处理的一些消息。 | |||
4、业界做法 | |||
许多基于事件驱动的框架都是要求 用户 来保证幂等性操作的,比如 ENode, RocketMQ 等等... | |||
从实现的角度来说,CAP可以做一些比较不严格的幂等,但是严格的幂等无法做到的。 | |||
### 以自然的方式处理幂等消息 | |||
通常情况下,保证消息被执行多次而不会产生意外结果是很自然的一种方式是采用操作对象自带的一些幂等功能。比如: | |||
数据库提供的 `INSERT ON DUPLICATE KEY UPDATE` 或者是采取类型的程序判断行为。 | |||
### 显式处理幂等消息 | |||
另外一种处理幂等性的方式就是在消息传递的过程中传递ID,然后由单独的消息跟踪器来处理。 | |||
比如你使用具有事务数据存储的 IMessageTracker 来跟踪消息ID,你的代码可能看起来像这样: | |||
```c# | |||
readonly IMessageTracker _messageTracker; | |||
public SomeMessageHandler(IMessageTracker messageTracker) | |||
{ | |||
_messageTracker = messageTracker; | |||
} | |||
[CapSubscribe] | |||
public async Task Handle(SomeMessage message) | |||
{ | |||
if (await _messageTracker.HasProcessed(message.Id)) | |||
{ | |||
return; | |||
} | |||
// do the work here | |||
// ... | |||
// remember that this message has been processed | |||
await _messageTracker.MarkAsProcessed(messageId); | |||
} | |||
``` | |||
至于 `IMessageTracker` 的实现,可以使用诸如Redis或者数据库等存储消息Id和对应的处理状态。 |
@@ -1,11 +0,0 @@ | |||
# License | |||
**MIT License** | |||
Copyright © 2016 - 2019 Savorboard | |||
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: | |||
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. | |||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. |
@@ -1,37 +0,0 @@ | |||
# 消息 | |||
使用 `ICapPublisher` 接口发送出去的数据称之为 Message (`消息`)。 | |||
## 消息调度 | |||
CAP 接收到消息之后会将消息发送到 Transport, 由 Transport 进行运输。 | |||
当你使用 `ICapPublisher` 接口发送时,CAP将会将消息调度到相应的 Transport中去,目前还不支持批量发送消息。 | |||
有关 Transports 的更多信息,可以查看 [Transports](../transports/general.md) 章节。 | |||
## 消息存储 | |||
CAP 接收到消息之后会将消息进行 Persistent(持久化), 有关 Persistent 的更多信息,可以查看 [Persistent](../persistent/general.md) 章节。 | |||
## 消息重试 | |||
重试在整个CAP架构设计中具有重要作用,CAP 中会针对发送失败或者执行失败的消息进行重试。在整个 CAP 的设计过程中有以下几处采用的重试策略。 | |||
1、 发送重试 | |||
在消息发送过程中,当出现 Broker 宕机或者连接失败的情况亦或者出现异常的情况下,这个时候 CAP 会对发送的重试,第一次重试次数为 3,4分钟后以后每分钟重试一次,进行次数 +1,当总次数达到50次后,CAP将不对其进行重试。 | |||
你可以在 CapOptions 中设置FailedRetryCount来调整默认重试的总次数。 | |||
当失败总次数达到默认失败总次数后,就不会进行重试了,你可以在 Dashboard 中查看消息失败的原因,然后进行人工重试处理。 | |||
2、 消费重试 | |||
当 Consumer 接收到消息时,会执行消费者方法,在执行消费者方法出现异常时,会进行重试。这个重试策略和上面的 发送重试 是相同的。 | |||
## 消息数据清理 | |||
数据库消息表中具有一个 ExpiresAt 字段表示消息的过期时间,当消息发送成功或者消费成功后,CAP会将消息状态为 Successed 的 ExpiresAt 设置为 1小时 后过期,会将消息状态为 Failed 的 ExpiresAt 设置为 15天 后过期。 | |||
CAP 默认情况下会每隔一个小时将消息表的数据进行清理删除,避免数据量过多导致性能的降低。清理规则为 ExpiresAt 不为空并且小于当前时间的数据。 也就是说状态为Failed的消息(正常情况他们已经被重试了 50 次),如果你15天没有人工介入处理,同样会被清理掉。 |
@@ -1,7 +0,0 @@ | |||
# Sagas | |||
Sagas (also known in the literature as "process managers") are stateful services. You can think of them as state machines whose transitions are driven by messages. | |||
## Sagas on CAP | |||
TODO |
@@ -1 +0,0 @@ | |||
# 序列化 |
@@ -1,21 +0,0 @@ | |||
# 事务 | |||
## 分布式事务? | |||
CAP 不直接提供开箱即用的基于 DTC 或者 2PC 的分布式事务,相反我们提供一种可以用于解决在分布式事务遇到的问题的一种解决方案。 | |||
在分布式环境中,由于涉及通讯的开销,使用基于2PC或DTC的分布式事务将非常昂贵,在性能方面也同样如此。另外由于基于2PC或DTC的分布式事务同样受**CAP定理**的约束,当发生网络分区时它将不得不放弃可用性(CAP中的A)。 | |||
针对于分布式事务的处理,CAP 采用的是“异步确保”这种方案。 | |||
### 异步确保 | |||
异步确保这种方案又叫做本地消息表,这是一种经典的方案,方案最初来源于 eBay,参考资料见段末链接。这种方案目前也是企业中使用最多的方案之一。 | |||
相对于 TCC 或者 2PC/3PC 来说,这个方案对于分布式事务来说是最简单的,而且它是去中心化的。在TCC 或者 2PC 的方案中,必须具有事务协调器来处理每个不同服务之间的状态,而此种方案不需要事务协调器。 | |||
另外 2PC/TCC 这种方案如果服务依赖过多,会带来管理复杂性增加和稳定性风险增大的问题。试想如果我们强依赖 10 个服务,9 个都执行成功了,最后一个执行失败了,那么是不是前面 9 个都要回滚掉?这个成本还是非常高的。 | |||
但是,并不是说 2PC 或者 TCC 这种方案不好,因为每一种方案都有其相对优势的使用场景和优缺点,这里就不做过多介绍了。 | |||
> 中文:[http://www.cnblogs.com/savorboard/p/base-an-acid-alternative.html](http://www.cnblogs.com/savorboard/p/base-an-acid-alternative.html) | |||
> 英文:[http://queue.acm.org/detail.cfm?id=1394128](http://queue.acm.org/detail.cfm?id=1394128) |
@@ -1,3 +0,0 @@ | |||
# 升级指导 | |||
请参阅 Github Release 页面: [https://github.com/dotnetcore/CAP/releases](https://github.com/dotnetcore/CAP/releases) |
@@ -1,21 +0,0 @@ | |||
# 贡献 | |||
One of the easiest ways to contribute is to participate in discussions and discuss issues. | |||
If you have any question or problems, please report them on the CAP repository: | |||
<a href="https://github.com/dotnetcore/cap/issues/new"><button data-md-color-primary="purple"><i class="fa fa-github fa-2x"></i> Report Issue</button></a> | |||
<a href="https://github.com/dotnetcore/cap/issues"><button data-md-color-primary="purple" type="submit"> Active Issues <i class="fa fa-github fa-2x"></i></button></a> | |||
## Submitting Changes | |||
You can also contribute by submitting pull requests with code changes. | |||
> | |||
Pull requests let you tell others about changes you've pushed to a repository on GitHub. Once a pull request is opened, you can discuss and review the potential changes with collaborators and add follow-up commits before the changes are merged into the repository. | |||
## Additional Resources | |||
* [Filtering issues and pull requests](https://help.github.com/articles/filtering-issues-and-pull-requests/) | |||
* [Using search to filter issues and pull requests](https://help.github.com/articles/using-search-to-filter-issues-and-pull-requests/) |
@@ -1,34 +0,0 @@ | |||
# 介绍 | |||
CAP 是一个EventBus,同时也是一个在微服务或者SOA系统中解决分布式事务问题的一个框架。它有助于创建可扩展,可靠并且易于更改的微服务系统。 | |||
在微软的 [eShopOnContainer](https://github.com/dotnet-architecture/eShopOnContainers) 微服务示例项目中,推荐使用 CAP 作为生产环境可用的 EventBus。 | |||
!!! question "什么是 EventBus?" | |||
An Eventbus is a mechanism that allows different components to communicate with each other without knowing about each other. A component can send an Event to the Eventbus without knowing who will pick it up or how many others will pick it up. Components can also listen to Events on an Eventbus, without knowing who sent the Events. That way, components can communicate without depending on each other. Also, it is very easy to substitute a component. As long as the new component understands the Events that are being sent and received, the other components will never know. | |||
相对于其他的 Service Bus 或者 Event Bus, CAP 拥有自己的特色,它不要求使用者发送消息或者处理消息的时候实现或者继承任何接口,拥有非常高的灵活性。我们一直坚信约定大于配置,所以CAP使用起来非常简单,对于新手非常友好,并且拥有轻量级。 | |||
CAP 采用模块化设计,具有高度的可扩展性。你有许多选项可以选择,包括消息队列,存储,序列化方式等,系统的许多元素内容可以替换为自定义实现。 | |||
## 相关视频 | |||
[Video: bilibili 教程](https://www.bilibili.com/video/av31582401/) | |||
[Video: Youtube 教程](https://youtu.be/K1e4e0eddNE) | |||
[Video: 腾讯视频教程](https://www.cnblogs.com/savorboard/p/7243609.html) | |||
## 相关文章 | |||
[Article: CAP 介绍及使用](http://www.cnblogs.com/savorboard/p/cap.html) | |||
[Article: CAP 2.5 版本中的新特性](https://www.cnblogs.com/savorboard/p/cap-2-5.html) | |||
[Article: CAP 2.4 版本中的新特性](http://www.cnblogs.com/savorboard/p/cap-2-4.html) | |||
[Article: CAP 2.3 版本中的新特性用](http://www.cnblogs.com/savorboard/p/cap-2-3.html) | |||
[Article: .NET Core Community 首个千星项目诞生:CAP](https://www.cnblogs.com/forerunner/p/ncc-cap-with-over-thousand-stars.html) |
@@ -1,64 +0,0 @@ | |||
# 快速开始 | |||
了解如何使用 CAP 构建微服务事件总线架构,它比直接集成消息队列提供了哪些优势,它提供了哪些开箱即用的功能。 | |||
## 安装 | |||
```powershell | |||
PM> Install-Package DotNetCore.CAP | |||
``` | |||
## 在 Asp.Net Core 中集成 | |||
以便于快速启动,我们使用基于内存的事件存储和消息队列。 | |||
```powershell | |||
PM> Install-Package DotNetCore.CAP.InMemoryStorage | |||
PM> Install-Package Savorboard.CAP.InMemoryMessageQueue | |||
``` | |||
在 `Startup.cs` 中,添加以下配置: | |||
```c# | |||
public void ConfigureServices(IServiceCollection services) | |||
{ | |||
services.AddCap(x => | |||
{ | |||
x.UseInMemoryStorage(); | |||
x.UseInMemoryMessageQueue(); | |||
}); | |||
} | |||
``` | |||
## 发送消息 | |||
```c# | |||
public class PublishController : Controller | |||
{ | |||
[Route("~/send")] | |||
public IActionResult SendMessage([FromService]ICapPublisher capBus) | |||
{ | |||
capBus.Publish("test.show.time", DateTime.Now); | |||
return Ok(); | |||
} | |||
} | |||
``` | |||
## 处理消息 | |||
```C# | |||
public class ConsumerController : Controller | |||
{ | |||
[NonAction] | |||
[CapSubscribe("test.show.time")] | |||
public void ReceiveMessage(DateTime time) | |||
{ | |||
Console.WriteLine("message time is:" + time); | |||
} | |||
} | |||
``` | |||
## 摘要 | |||
相对于直接集成消息队列,异步消息传递最强大的优势之一是可靠性,系统的一个部分中的故障不会传播,也不会导致整个系统崩溃。 在 CAP 内部会将消息进行存储,以保证消息的可靠性,并配合重试等策略以达到各个服务之间的数据最终一致性。 |
@@ -1,4 +0,0 @@ | |||
# Consul | |||
Consul is a distributed service mesh to connect, secure, and configure services across any runtime platform and public or private cloud. | |||
@@ -1,70 +0,0 @@ | |||
# Dashboard | |||
CAP 原生提供了 Dashboard 供查看消息,利用 Dashboard 提供的功能可以很方便的查看和管理消息。 | |||
## 启用 Dashboard | |||
默认情况下,不会启动Dashboard中间件,要开启Dashboard功能你需要在配置中添加如下代码: | |||
```C# | |||
services.AddCap(x => | |||
{ | |||
//... | |||
// Register Dashboard | |||
x.UseDashboard(); | |||
}); | |||
``` | |||
默认情况下,你可以访问 `http://localhost:xxx/cap` 这个地址打开Dashboard。 | |||
### Dashboard 配置项 | |||
* PathMatch | |||
默认值:'/cap' | |||
你可以通过修改此配置项来更改Dashboard的访问路径。 | |||
* StatsPollingInterval | |||
默认值:2000 毫秒 | |||
此配置项用来配置Dashboard 前端 获取状态接口(/stats)的轮询时间 | |||
* Authorization | |||
此配置项用来配置访问 Dashboard 时的授权过滤器,默认过滤器允许局域网访问,当你的应用想提供外网访问时候,可以通过设置此配置来自定义认证规则。详细参看下一节 | |||
### 自定义认证 | |||
通过实现 `IDashboardAuthorizationFilter` 接口可以自定义Dashboard认证。 | |||
以下是一个示例代码,通过从url请求参数中读取 accesskey 判断是否允许访问。 | |||
```C# | |||
public class TestAuthorizationFilter : IDashboardAuthorizationFilter | |||
{ | |||
public bool Authorize(DashboardContext context) | |||
{ | |||
if(context.Request.GetQuery("accesskey")=="xxxxxx"){ | |||
return true; | |||
} | |||
return false; | |||
} | |||
} | |||
``` | |||
然后在修改注册 Dashboard 时候配置此过滤对象。 | |||
```C# | |||
services.AddCap(x => | |||
{ | |||
//... | |||
// Register Dashboard | |||
x.UseDashboard(opt => { | |||
opt.Authorization = new[] {new TestAuthorizationFilter()}; | |||
}); | |||
}); | |||
``` |
@@ -1,24 +0,0 @@ | |||
# Diagnostics | |||
Diagnostics 提供一组功能使我们能够很方便的可以记录在应用程序运行期间发生的关键性操作以及他们的执行时间等,使管理员可以查找特别是生产环境中出现问题所在的根本原因。 | |||
## CAP 中的 Diagnostics | |||
在 CAP 中,对 `DiagnosticSource` 提供了支持,监听器名称为 `CapDiagnosticListener`。 | |||
Diagnostics 提供对外提供的事件信息有: | |||
* 消息持久化之前 | |||
* 消息持久化之后 | |||
* 消息持久化异常 | |||
* 消息向MQ发送之前 | |||
* 消息向MQ发送之后 | |||
* 消息向MQ发送异常 | |||
* 消息从MQ消费保存之前 | |||
* 消息从MQ消费保存之后 | |||
* 订阅者方法执行之前 | |||
* 订阅者方法执行之后 | |||
* 订阅者方法执行异常 | |||
相关涉及到的对象,你可以在 `DotNetCore.CAP.Diagnostics` 命名空间下看到。 |
@@ -1 +0,0 @@ | |||
# 健康检查 |
@@ -1 +0,0 @@ | |||
# Metrics |
@@ -1,61 +0,0 @@ | |||
# 基本 | |||
CAP 需要使用具有持久化功能的存储介质来存储事件消息,例如通过数据库或者其他NoSql设施。CAP 使用这种方式来应对一切环境或者网络异常导致消息丢失的情况,消息的可靠性是分布式事务的基石,所以在任何情况下消息都不能丢失。 | |||
对于消息的持久化分为两种: | |||
**① 消息进入消息队列之前的持久化** | |||
在消息进入到消息队列之前,CAP使用本地数据库表对消息进行持久化,这样可以保证当消息队列出现异常或者网络错误时候消息是没有丢失的。 | |||
为了保证这种机制的可靠性,CAP使用和业务代码相同的数据库事务来保证业务操作和CAP的消息在持久化的过程中是强一致的。也就是说在进行消息持久化的过程中,任何一方发生异常情况数据库都会进行回滚操作。 | |||
在 CAP 启动后,会向持久化介质中生成两个表,默认情况下名称为:`Cap.Published` `Cap.Received`。 | |||
## 消息存储格式 | |||
**Published** 表结构: | |||
NAME | DESCRIPTION | TYPE | |||
:---|:---|:--- | |||
Id | Message Id | int | |||
Version | Message Version | string | |||
Name | Topic Name | string | |||
Content | Json Content | string | |||
Added | Added Time | DateTime | |||
ExpiresAt | Expire time | DateTime | |||
Retries | Retry times | int | |||
StatusName | Status Name | string | |||
**Received** 表结构: | |||
NAME | DESCRIPTION | TYPE | |||
:---|:---|:--- | |||
Id | Message Id | int | |||
Version | Message Version | string | |||
Name | Topic Name | string | |||
Group | Group Name | string | |||
Content | Json Content | string | |||
Added | Added Time | DateTime | |||
ExpiresAt | Expire time | DateTime | |||
Retries | Retry times | int | |||
StatusName | Status Name | string | |||
CAP 在进行消息发送到时候,会对原始消息对象进行一个二次包装存储到 `Content` 字段中,以下为包装 Content 的 Message 对象数据结构: | |||
NAME | DESCRIPTION | TYPE | |||
:---|:---|:--- | |||
Id | CAP生成的消息编号 | string | |||
Timestamp | 消息创建时间 | string | |||
Content | 内容 | string | |||
CallbackName | 回调的订阅者名称 | string | |||
其中 Id 字段,CAP 采用的 MongoDB 中的 ObjectId 分布式Id生成算法生成。 | |||
**② 消息进入到消息队列之后的持久化** | |||
消息进入到消息队列之后,CAP会启动消息队列的持久化功能,我们需要说明一下在 RabbitMQ 和 Kafka 中CAP的消息是如何持久化的。 | |||
针对于 RabbitMQ 中的消息持久化,CAP 使用的是具有消息持久化功能的消费者队列,但是这里面可能有例外情况,参加 2.2.1 章节。 | |||
由于 Kafka 天生设计的就是使用文件进行的消息持久化,在所以在消息进入到Kafka之后,Kafka会保证消息能够正确被持久化而不丢失。 |
@@ -1 +0,0 @@ | |||
# In-Memory Storage |
@@ -1 +0,0 @@ | |||
# MongoDB |
@@ -1,22 +0,0 @@ | |||
# MySQL | |||
### MySql Options | |||
注意,如果你使用的是 EntityFramewrok,你用不到此配置项。 | |||
CAP 采用的是针对 CapOptions 进行扩展来实现 MySql 的配置功能,所以针对 MySql 的配置用法如下: | |||
```cs | |||
services.AddCap(capOptions => { | |||
capOptions.UseMySql(mysqlOptions => { | |||
// mysqlOptions.ConnectionString | |||
}); | |||
}); | |||
``` | |||
NAME | DESCRIPTION | TYPE | DEFAULT | |||
:---|:---|---|:--- | |||
TableNamePrefix | Cap表名前缀 | string | cap | |||
ConnectionString | 数据库连接字符串 | string | null |
@@ -1,22 +0,0 @@ | |||
# Postgre SQL | |||
### PostgreSql Configs | |||
Note that if you are using EntityFramewrok, you do not use this configuration item. | |||
CAP uses PostgreSql configuration functions for CapOptions extensions, so the configuration usage for PostgreSql is as follows: | |||
```c# | |||
services.AddCap(capOptions => { | |||
capOptions.UsePostgreSql(postgreOptions => { | |||
// postgreOptions.ConnectionString | |||
}); | |||
}); | |||
``` | |||
NAME | DESCRIPTION | TYPE | DEFAULT | |||
:---|:---|---|:------ | |||
Schema | Cap table name prefix | string | cap | |||
ConnectionString | Database connection string | string | null |
@@ -1,22 +0,0 @@ | |||
# SQLServer | |||
### SqlServer Options | |||
注意,如果你使用的是 EntityFramewrok,你用不到此配置项。 | |||
CAP 采用的是针对 CapOptions 进行扩展来实现 SqlServer 的配置功能,所以针对 SqlServer 的配置用法如下: | |||
```cs | |||
services.AddCap(capOptions => { | |||
capOptions.UseSqlServer(sqlserverOptions => { | |||
// sqlserverOptions.ConnectionString | |||
}); | |||
}); | |||
``` | |||
NAME | DESCRIPTION | TYPE | DEFAULT | |||
:---|:---|---|:--- | |||
Schema | Cap表架构 | string | Cap | |||
ConnectionString | 数据库连接字符串 | string | null |
@@ -1,2 +0,0 @@ | |||
# eShop On Containers | |||
@@ -1,44 +0,0 @@ | |||
# FAQ | |||
!!! faq "Any IM group(e.g Tencent QQ group) to learn and chat about CAP?" | |||
None for that. Better than wasting much time in IM group, I hope developers could be capable of independent thinking more, and solve problems yourselves with referenced documents, even create issues or send emails when errors are remaining present. | |||
!!! faq "Does it require certain different databases, one each for productor and resumer in CAP?" | |||
Not requird differences necessary, a given advice is that using a special database for each program. | |||
Otherwise, look at Q&A below. | |||
!!! faq "How to use the same database for different applications?" | |||
defining a prefix name of table in `ConfigureServices` method。 | |||
codes exsample: | |||
```c# | |||
public void ConfigureServices(IServiceCollection services) | |||
{ | |||
services.AddCap(x => | |||
{ | |||
x.UseKafka(""); | |||
x.UseMySql(opt => | |||
{ | |||
opt.ConnectionString = "connection string"; | |||
opt.TableNamePrefix = "appone"; // different table name prefix here | |||
}); | |||
}); | |||
} | |||
``` | |||
!!! faq "Can CAP not use the database as event storage? I just want to sent the message" | |||
Not yet. | |||
The purpose of CAP is that ensure consistency principle right in microservice or SOA architechtrues. The solution is based on ACID features of database, there is no sense about a single client wapper of message queue without database. | |||
!!! faq "If the consumer is abnormal, can I roll back the database executed sql that the producer has executed?" | |||
Can't roll back, CAP is the ultimate consistency solution. | |||
You can imagine your scenario is to call a third party payment. If you are doing a third-party payment operation, after calling Alipay's interface successfully, and your own code is wrong, will Alipay roll back? If you don't roll back, what should you do? The same is true here. |
@@ -1 +0,0 @@ | |||
# Github 上的示例 |
@@ -1 +0,0 @@ | |||
# Azure Service Bus |
@@ -1,20 +0,0 @@ | |||
# Transports | |||
Transports move data from one place to another – between acquisition programs and pipelines, between pipelines and the entity database, and even between pipelines and external systems. | |||
## Supported transports | |||
CAP supports several transport methods: | |||
* [RabbitMQ](rabbitmq.md) | |||
* [Kafka](kafka.md) | |||
* [Azure Service Bus](azure-service-bus.md) | |||
* [In-Memory Queue](in-memory-queue.md) | |||
## How to select a transport | |||
TODO: | |||
@@ -1 +0,0 @@ | |||
# In-Memory Message Queue |
@@ -1,20 +0,0 @@ | |||
# Kafka | |||
### Kafka Options | |||
CAP 采用的是针对 CapOptions 进行扩展来实现 Kafka 的配置功能,所以针对 Kafka 的配置用法如下: | |||
```cs | |||
services.AddCap(capOptions => { | |||
capOptions.UseKafka(kafkaOption=>{ | |||
// kafka options. | |||
// kafkaOptions.MainConfig.Add("", ""); | |||
}); | |||
}); | |||
``` | |||
`KafkaOptions` 提供了有关 Kafka 相关的配置,由于Kafka的配置比较多,所以此处使用的是提供的 MainConfig 字典来支持进行自定义配置,你可以查看这里来获取对配置项的支持信息。 | |||
[https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) | |||
@@ -1,28 +0,0 @@ | |||
# RabbitMQ | |||
## RabbitMQ Configs | |||
The CAP uses the CapOptions extension to implement the RabbitMQ configuration function. Therefore, the configuration of the RabbitMQ is used as follows: | |||
```cs | |||
services.AddCap(capOptions => { | |||
capOptions.UseRabbitMQ(rabbitMQOption=>{ | |||
// rabbitmq options. | |||
}); | |||
}); | |||
``` | |||
`RabbitMQOptions` provides related RabbitMQ configuration: | |||
NAME | DESCRIPTION | TYPE | DEFAULT | |||
:---|:---|---|:------ | |||
HostName | Host Address | string | localhost | |||
UserName | username | string | guest | |||
Password | Password | string | guest | |||
VirtualHost | Virtual Host | string | / | |||
Port | Port number | int | -1 | |||
TopicExchangeName | CAP Default Exchange Name | string | cap.default.topic | |||
RequestedConnectionTimeout | RabbitMQ Connection Timeout | int | 30,000 milliseconds | |||
SocketReadTimeout | RabbitMQ message read timeout | int | 30,000 milliseconds | |||
SocketWriteTimeout | RabbitMQ message write timeout | int | 30,000 milliseconds | |||
QueueMessageExpires | Automatic deletion of messages in queue | int | (10 days) ms |
@@ -1,155 +0,0 @@ | |||
# Configuration | |||
CAP uses Microsoft.Extensions.DependencyInjection for configuration injection. | |||
## CAP Configs | |||
You can use the following methods to configure some configuration items in the CAP, for example: | |||
```cs | |||
services.AddCap(capOptions => { | |||
capOptions.FailedCallback = //... | |||
}); | |||
``` | |||
`CapOptions` provides the following configuration items:: | |||
NAME | DESCRIPTION | TYPE | DEFAULT | |||
:---|:---|---|:------ | |||
DefaultGroup | Default consumer group to which the subscriber belongs | string | cap.queue+{assembly name} | |||
SuccessedMessageExpiredAfter | Expiration date after successful message was deleted | int | 3600 seconds | |||
FailedCallback|Callback function when the failed message is executed. See below for details | Action | NULL | |||
FailedRetryInterval | Failed Retry Interval | int | 60 seconds | |||
FailedRetryCount | Failed RetryCount | int | 50th | |||
CapOptions provides a callback function for `FailedCallback` to handle failed messages. When the message fails to be sent multiple times, the CAP will mark the message state as `Failed`. The CAP has a special handler to handle this failed message. The failed message will be put back into the queue and sent to MQ. Prior to this, if `FailedCallback` has a value, this callback function will be called first to tell the client. | |||
The type of FailedCallback is `Action<MessageType,string,string>`. The first parameter is the message type (send or receive), the second parameter is the name of the message, and the third parameter is the content of the message. | |||
## RabbitMQ Configs | |||
The CAP uses the CapOptions extension to implement the RabbitMQ configuration function. Therefore, the configuration of the RabbitMQ is used as follows: | |||
```cs | |||
services.AddCap(capOptions => { | |||
capOptions.UseRabbitMQ(rabbitMQOption=>{ | |||
// rabbitmq options. | |||
}); | |||
}); | |||
``` | |||
`RabbitMQOptions` provides related RabbitMQ configuration: | |||
NAME | DESCRIPTION | TYPE | DEFAULT | |||
:---|:---|---|:------ | |||
HostName | Host Address | string | localhost | |||
UserName | username | string | guest | |||
Password | Password | string | guest | |||
VirtualHost | Virtual Host | string | / | |||
Port | Port number | int | -1 | |||
TopicExchangeName | CAP Default Exchange Name | string | cap.default.topic | |||
RequestedConnectionTimeout | RabbitMQ Connection Timeout | int | 30,000 milliseconds | |||
SocketReadTimeout | RabbitMQ message read timeout | int | 30,000 milliseconds | |||
SocketWriteTimeout | RabbitMQ message write timeout | int | 30,000 milliseconds | |||
QueueMessageExpires | Automatic deletion of messages in queue | int | (10 days) ms | |||
### Kafka Configs | |||
CAP adopts Kafka's configuration function to expand CapOptions, so the configuration usage for Kafka is as follows: | |||
```cs | |||
services.AddCap(capOptions => { | |||
capOptions.UseKafka(kafkaOption=>{ | |||
// kafka options. | |||
// kafkaOptions.MainConfig.Add("", ""); | |||
}); | |||
}); | |||
``` | |||
`KafkaOptions` provides Kafka-related configurations. Because Kafka has more configurations, the MainConfig dictionary provided here is used to support custom configurations. You can check here to get support information for configuration items. | |||
[https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) | |||
### EntityFramework Configs | |||
If you are using Entityframework as a message persistence store, then you can customize some configuration when configuring the CAP EntityFramework configuration item. | |||
```cs | |||
services.AddCap(x => | |||
{ | |||
x.UseEntityFramework<AppDbContext>(efOption => | |||
{ | |||
// entityframework options. | |||
}); | |||
}); | |||
``` | |||
Note that if you use the `UseEntityFramework` configuration item, then you do not need to reconfigure the following sections for several different database configurations. The CAP will automatically read the database configuration information used in DbContext. | |||
NAME | DESCRIPTION | TYPE | DEFAULT | |||
:---|:---|---|:------ | |||
Schema | Cap table schema | string | Cap (SQL Server) | |||
Schema | Cap table schema | string | cap (PostgreSql) | |||
TableNamePrefix | Cap table name prefix | string | cap (MySql) | |||
### SqlServer Configs | |||
Note that if you are using EntityFramewrok, you do not use this configuration item. | |||
CAP adopts the configuration function of SqlServer for extending CapOptions. Therefore, the configuration usage of SqlServer is as follows: | |||
```cs | |||
services.AddCap(capOptions => { | |||
capOptions.UseSqlServer(sqlserverOptions => { | |||
// sqlserverOptions.ConnectionString | |||
}); | |||
}); | |||
``` | |||
NAME | DESCRIPTION | TYPE | DEFAULT | |||
:---|:---|---|:------ | |||
Schema | Cap Table Schema | string | Cap | |||
ConnectionString | Database connection string | string | null | |||
### MySql Configs | |||
Note that if you are using EntityFramewrok, you do not use this configuration item. | |||
CAP uses the configuration function for MySql that extends for CapOptions, so the configuration usage for MySql is as follows: | |||
```cs | |||
services.AddCap(capOptions => { | |||
capOptions.UseMySql(mysqlOptions => { | |||
// mysqlOptions.ConnectionString | |||
}); | |||
}); | |||
``` | |||
NAME | DESCRIPTION | TYPE | DEFAULT | |||
:---|:---|---|:------ | |||
TableNamePrefix | Cap table name prefix | string | cap | |||
ConnectionString | Database connection string | string | null | |||
### PostgreSql Configs | |||
Note that if you are using EntityFramewrok, you do not use this configuration item. | |||
CAP uses PostgreSql configuration functions for CapOptions extensions, so the configuration usage for PostgreSql is as follows: | |||
```c# | |||
services.AddCap(capOptions => { | |||
capOptions.UsePostgreSql(postgreOptions => { | |||
// postgreOptions.ConnectionString | |||
}); | |||
}); | |||
``` | |||
NAME | DESCRIPTION | TYPE | DEFAULT | |||
:---|:---|---|:------ | |||
Schema | Cap table name prefix | string | cap | |||
ConnectionString | Database connection string | string | null |
@@ -1,57 +0,0 @@ | |||
# Design | |||
## Motivation | |||
With the popularity of microservices architecture, more and more people are trying to use microservices to architect their systems. In this we encounter problems such as distributed transactions. To solve these problems, I did not find simplicity and Easy to use solution, so I decided to create such a library to solve this problem. | |||
The original CAP was to solve the transaction problems in the distributed system. She used asynchronous to ensure that this weak consistency transaction mechanism achieved the eventual consistency of the distributed transaction. For more information, see section 6. | |||
Now in addition to solving distributed transaction problems, CAP's other important function is to use it as an EventBus. It has all of the features of EventBus and provides a more simplified way to handle publish/subscribe in EventBus. | |||
## Persistence | |||
The CAP relies on the local database for persistence of messages. The CAP uses this method to deal with situations in which all messages are lost due to environmental or network anomalies. The reliability of messages is the cornerstone of distributed transactions, so messages cannot be lost under any circumstances. | |||
There are two types of persistence for messages: | |||
**1 Persistence before the message enters the message queue** | |||
Before the message enters the message queue, the CAP uses the local database table to persist the message. This ensures that the message is not lost when the message queue is abnormal or the network error occurs. | |||
In order to ensure the reliability of this mechanism, CAP uses database transactions with the same business code to ensure that business operations and CAP messages are strongly consistent throughout the persistence process. That is to say, in the process of message persistence, the database of any abnormal situation will be rolled back. | |||
**2 Persistence after messages enter the message queue** | |||
After the message enters the message queue, the CAP starts the persistence function of the message queue. We need to explain how the message of the CAP in RabbitMQ and Kafka is persistent. | |||
For message persistence in RabbitMQ, CAP uses a consumer queue with message persistence, but there may be exceptions to this and take part in 2.2.1. | |||
Since Kafka is inherently designed to persist messages using files, Kafka ensures that messages are correctly persisted without loss after the message enters Kafka. | |||
## Communication Data Streams | |||
The flow of messages in the CAP is roughly as follows: | |||
>2.2 version before | |||
![](http://images2017.cnblogs.com/blog/250417/201708/250417-20170803174645928-1813351415.png) | |||
> "P" represents the sender of the message (producer). "C" stands for message consumer (subscriber). | |||
**After version 2.2** | |||
In the 2.2 and later versions, we adjusted the flow of some messages. We removed the Queue table in the database and used the memory queue instead. For details, see: [Improve the implementation mechanism of queue mode](https://github.com/dotnetcore/CAP/issues/96) | |||
## Consistency | |||
The CAP uses the ultimate consistency as a consistent solution. This solution follows the CAP theory. The following is the description of the CAP theory. | |||
C (consistent) consistency refers to the atomicity of data. It is guaranteed by transactions in a classic database. When a transaction completes, the data will be in a consistent state regardless of success or rollback. In a distributed environment, consistency is Indicates whether the data of multiple nodes is consistent; | |||
A (availability) service is always available, when the user sends a request, the service can return the result within a certain time; | |||
P (Partition Tolerance) In distributed applications, the system may not operate due to some distributed reasons. The good partition tolerance makes the application a distributed system but it seems to be a functioning whole. | |||
According to ["CAP" distributed theory](https://en.wikipedia.org/wiki/CAP_theorem), in a distributed system, we often reluctantly give up strong consensus support for availability and partition fault tolerance, and instead pursue Ultimate consistency. In most business scenarios, we can accept short-term inconsistencies. | |||
Section 6 will introduce this further. |
@@ -1,44 +0,0 @@ | |||
# FAQ | |||
!!! faq "Any IM group(e.g Tencent QQ group) to learn and chat about CAP?" | |||
None for that. Better than wasting much time in IM group, I hope developers could be capable of independent thinking more, and solve problems yourselves with referenced documents, even create issues or send emails when errors are remaining present. | |||
!!! faq "Does it require certain different databases, one each for productor and resumer in CAP?" | |||
Not requird differences necessary, a given advice is that using a special database for each program. | |||
Otherwise, look at Q&A below. | |||
!!! faq "How to use the same database for different applications?" | |||
defining a prefix name of table in `ConfigureServices` method。 | |||
codes exsample: | |||
```c# | |||
public void ConfigureServices(IServiceCollection services) | |||
{ | |||
services.AddCap(x => | |||
{ | |||
x.UseKafka(""); | |||
x.UseMySql(opt => | |||
{ | |||
opt.ConnectionString = "connection string"; | |||
opt.TableNamePrefix = "appone"; // different table name prefix here | |||
}); | |||
}); | |||
} | |||
``` | |||
!!! faq "Can CAP not use the database as event storage? I just want to sent the message" | |||
Not yet. | |||
The purpose of CAP is that ensure consistency principle right in microservice or SOA architechtrues. The solution is based on ACID features of database, there is no sense about a single client wapper of message queue without database. | |||
!!! faq "If the consumer is abnormal, can I roll back the database executed sql that the producer has executed?" | |||
Can't roll back, CAP is the ultimate consistency solution. | |||
You can imagine your scenario is to call a third party payment. If you are doing a third-party payment operation, after calling Alipay's interface successfully, and your own code is wrong, will Alipay roll back? If you don't roll back, what should you do? The same is true here. |
@@ -1,195 +0,0 @@ | |||
# Getting Stared | |||
## Usage | |||
#### 1. Distributed transaction alternative solution in micro-service base on eventually consistency | |||
A distributed transaction is a very complex process with a lot of moving parts that can fail. Also, if these parts run on different machines or even in different data centers, the process of committing a transaction could become very long and unreliable. | |||
This could seriously affect the user experience and overall system bandwidth. So one of the best ways to solve the problem of distributed transactions is to avoid them completely. | |||
Usually, a microservice is designed in such way as to be independent and useful on its own. It should be able to solve some atomic business task. | |||
If we could split our system in such microservices, there’s a good chance we wouldn’t need to implement transactions between them at all. | |||
By far, one of the most feasible models of handling consistency across microservices is eventual consistency. This model doesn’t enforce distributed ACID transactions across microservices. Instead, it proposes to use some mechanisms of ensuring that the system would be eventually consistent at some point in the future. | |||
CAP ia an alternative solution without transactions, it comply the eventually consistency and implement base on message queue. | |||
#### 2. EventBus with Outbox pattern | |||
CAP is an event bus that implements the Outbox pattern, Outbox is an infrastructure feature which simulates the reliability of distributed transactions without requiring use of the Distributed Transaction Coordinator(DTC). | |||
The outbox feature can be used instead of the DTC to mimic the same level of consistency without using distributed transactions. | |||
!!! Tip "CAP implements the Outbox Pattern described in the [eShop ebook](https://docs.microsoft.com/en-us/dotnet/standard/microservices-architecture/multi-container-microservice-net-applications/subscribe-events#designing-atomicity-and-resiliency-when-publishing-to-the-event-bus)" | |||
<img src="https://docs.microsoft.com/en-us/dotnet/standard/microservices-architecture/multi-container-microservice-net-applications/media/image24.png"> | |||
> Atomicity when publishing events to the event bus with a worker microservice | |||
## Quick Start | |||
### NuGet Package | |||
Use the following command to reference the CAP NuGet package: | |||
``` | |||
PM> Install-Package DotNetCore.CAP | |||
``` | |||
According to the different types of message queues used, different extension packages are introduced: | |||
``` text | |||
PM> Install-Package DotNetCore.CAP.RabbitMQ | |||
PM> Install-Package DotNetCore.CAP.Kafka | |||
PM> Install-Package DotNetCore.CAP.AzureServiceBus | |||
``` | |||
According to the different types of databases used, different extension packages are introduced: | |||
``` text | |||
PM> Install-Package DotNetCore.CAP.SqlServer | |||
PM> Install-Package DotNetCore.CAP.MySql | |||
PM> Install-Package DotNetCore.CAP.PostgreSql | |||
PM> Install-Package DotNetCore.CAP.MongoDB | |||
``` | |||
### Startup Configuration | |||
In an ASP.NET Core program, you can configure the services used by the CAP in the `Startup.cs` file `ConfigureServices()`: | |||
```c# | |||
public void ConfigureServices(IServiceCollection services) | |||
{ | |||
//...... | |||
services.AddDbContext<AppDbContext>(); //Options, If you are using EF as the ORM | |||
services.AddSingleton<IMongoClient>(new MongoClient("")); //Options, If you are using MongoDB | |||
services.AddCap(x => | |||
{ | |||
// If you are using EF, you need to add the configuration: | |||
//Options, Notice: You don't need to config x.UseSqlServer(""") again! CAP can autodiscovery. | |||
x.UseEntityFramework<AppDbContext>(); | |||
// If you are using Ado.Net, you need to add the configuration: | |||
x.UseSqlServer("Your ConnectionStrings"); | |||
x.UseMySql("Your ConnectionStrings"); | |||
x.UsePostgreSql("Your ConnectionStrings"); | |||
// If you are using MongoDB, you need to add the configuration: | |||
x.UseMongoDB("Your ConnectionStrings"); //MongoDB 4.0+ cluster | |||
// If you are using RabbitMQ, you need to add the configuration: | |||
x.UseRabbitMQ("localhost"); | |||
// If you are using Kafka, you need to add the configuration: | |||
x.UseKafka("localhost"); | |||
}); | |||
} | |||
``` | |||
### Usage | |||
#### Publish | |||
Inject `ICapPublisher` in your Controller, then use the `ICapPublisher` to send message | |||
```c# | |||
public class PublishController : Controller | |||
{ | |||
private readonly ICapPublisher _capBus; | |||
public PublishController(ICapPublisher capPublisher) | |||
{ | |||
_capBus = capPublisher; | |||
} | |||
[Route("~/adonet/transaction")] | |||
public IActionResult AdonetWithTransaction() | |||
{ | |||
using (var connection = new MySqlConnection(ConnectionString)) | |||
{ | |||
using (var transaction = connection.BeginTransaction(_capBus, autoCommit: true)) | |||
{ | |||
//your business logic code | |||
_capBus.Publish("xxx.services.show.time", DateTime.Now); | |||
} | |||
} | |||
return Ok(); | |||
} | |||
[Route("~/ef/transaction")] | |||
public IActionResult EntityFrameworkWithTransaction([FromServices]AppDbContext dbContext) | |||
{ | |||
using (var trans = dbContext.Database.BeginTransaction(_capBus, autoCommit: true)) | |||
{ | |||
//your business logic code | |||
_capBus.Publish("xxx.services.show.time", DateTime.Now); | |||
} | |||
return Ok(); | |||
} | |||
} | |||
``` | |||
#### Subscribe | |||
**In Controller Action** | |||
Add the Attribute `[CapSubscribe()]` on Action to subscribe message: | |||
```c# | |||
public class PublishController : Controller | |||
{ | |||
[CapSubscribe("xxx.services.show.time")] | |||
public void CheckReceivedMessage(DateTime datetime) | |||
{ | |||
Console.WriteLine(datetime); | |||
} | |||
} | |||
``` | |||
**In Business Logic Service** | |||
If your subscribe method is not in the Controller, the service class you need to Inheritance `ICapSubscribe`: | |||
```c# | |||
namespace BusinessCode.Service | |||
{ | |||
public interface ISubscriberService | |||
{ | |||
public void CheckReceivedMessage(DateTime datetime); | |||
} | |||
public class SubscriberService: ISubscriberService, ICapSubscribe | |||
{ | |||
[CapSubscribe("xxx.services.show.time")] | |||
public void CheckReceivedMessage(DateTime datetime) | |||
{ | |||
} | |||
} | |||
} | |||
``` | |||
Then inject your `ISubscriberService` class in `Startup.cs` | |||
```c# | |||
public void ConfigureServices(IServiceCollection services) | |||
{ | |||
//Note: The injection of services needs before of `services.AddCap()` | |||
services.AddTransient<ISubscriberService,SubscriberService>(); | |||
services.AddCap(x=> | |||
{ | |||
//... | |||
}); | |||
} | |||
``` |
@@ -1,83 +0,0 @@ | |||
# IMPLEMENTATION | |||
Users can get a ICapPublisher interface from the ASP.NET Core DI container to publish a message .It is initialized by configurations in the `ConfigureServices` and `configure` method in the Startup.cs file,just like the way to initialize a `MiddleWare` in ASP.NET Core. | |||
## Message Table | |||
After initialized, CAP will create two tables in the client side,they are `Cap.Published` and `Cap.Received`. Please noted that different databases may deal letter case differently,if you do not explicitly specify the Schema or the TableName Prefix before project startup,the default names are the ones mentioned above. | |||
**Cap.Published**:Used to store messages(Published by the `ICapPublisher` service) that CAP published to the MQ(Message Queue)Client side | |||
**Cap.Received**:Used to Store messages(subscribed by the `CapSubscribe[]`) subscribed by the MQ(message Queue) client side that CAP received. | |||
Both `Published` and `Received` tables have a `StatusName` field,which is used to mark the status of the current message.Until now it has `Scheduled`,`Successed` and `Failed` statuses. | |||
In the process of dealing with messages,CAP will change the status from `Scheduled` to `Successed`(or `Failed` ).if the final status is `Successed`,it means that the message is sent to MQ successfully,and `Failed` means the message is failed to sent to MQ. | |||
Version later than 2.2, CAP will retry after 4 minutes if the status is `Scheduled` or `Failed`,the retry interval is default to 60 seconds.You can change it by modify `FailedRetryInterval` in `CapOptions`. | |||
## Message format | |||
CAP use JSON to transfer message,the following is CAP's messaging object model: | |||
NAME | DESCRIPTION | TYPE | |||
:---|:---|:--- | |||
Id | Message Id | int | |||
Version | Message Version | string | |||
Name | Name | string | |||
Content | Content | string | |||
Group | Group a message belongs to | string | |||
Added |add time | DateTime | |||
ExpiresAt | expire time | DateTime | |||
Retries | retry times | int | |||
StatusName | Status Name | string | |||
>for `Cap.Received`,there is an extra `Group` filed to mark which group the mesage belongs to. | |||
>for the `Content` property CAP will use a Messsage object to wrap all the contents.The following shows details of the Message Object: | |||
NAME | DESCRIPTION | TYPE | |||
:---|:---|:--- | |||
Id | Generated by CAP | string | |||
Timestamp | message create time | string | |||
Content | content | string | |||
CallbackName | the subscriber which is used to call back | string | |||
CAP use the same algorithms as MongoDB ObjectId's distributed Id generation algorithms. | |||
## EventBus | |||
EventBus adopt the publish-subscribe messaging style to communicate with different components,and there is no need to register it in component explicitly. | |||
![](http://images2017.cnblogs.com/blog/250417/201708/250417-20170804153901240-1774287236.png) | |||
the diagram in the above link shows Eventbus's event flowchart,about EventBus,users can refer to other meterials to learn about it. | |||
We say that CAP implement all the features in Eventbus,EventBus has two features:publish and subscribe,In CAP we implement them in an elegant way.Besides,CAP also has two very robust feature,they are message persistence and messaging reliability under any circumstances,But EventBus don't have such features. | |||
![](https://camo.githubusercontent.com/452505edb71d41f2c1bd18907275b76291621e46/687474703a2f2f696d61676573323031352e636e626c6f67732e636f6d2f626c6f672f3235303431372f3230313730372f3235303431372d32303137303730353137353832373132382d313230333239313436392e706e67) | |||
In CAP,send a message can be regarded as an "Event",When CAP is used in an ASP.NET Core applicaiton,the application has the ablity to publish as well as receive messages. | |||
## Retry | |||
Retry plays a very important role in CAP's infrastructure,CAP will retry for Failed messages.CAP has the following retry strategies: | |||
**1、 Retry on sending** | |||
in the process of sending a message,when the Broker crashed or connection failed or exceptions are thrown,CAP will retry,it will retry 3 times for the first time,if still failed,then it will retry every 1 minute after 4 minutes,the retry the retry count +1,when the retry count come to 50,CAP will not retry any more. | |||
>You can modify `FailedRetryCount` in `CapOptions` to change the default retry count. | |||
As metioned above,when the retry count comes to a certain number,CAP will not retry anymore,this time,you can find out the fail reason in the Dashboard and they deal with it manually. | |||
**2、 Retry on Consuming** | |||
When consumer received messages,specified method in the consumer will be executed,if exceptions are thrown during this course,CAP will retry,the retry strategy is the same as above `Retry on sending`. | |||
## Data clean out | |||
table to store messages in database has an `ExpiresAt` field to mark the expiration time of the message. CAP will set `ExpiresAt` value as **1 hour** for `Successed` messages and **15days** for `Failed` messages. | |||
To avoid performance slow down caused by a large amount of data,CAP will delete expired data every hour by default,the deletion rule is that `ExpiresAt` field's value isn't null and samller than current time.That is, `Failed` messages(it has been retried 50 times by default),if you do not deal with it manually,will also be deleted after 15 days as well,you have to pay attention to it. |
@@ -1,16 +0,0 @@ | |||
# Transaction | |||
For the processing of distributed transactions, this CAP library matches the "Asynchronous recovery events" scenario. | |||
## Asynchronous recovery events | |||
As known as the name "native message table", this is a classic solution, originally from EBay, and referenced links about it are at the end of this section. This is also one of the most popular solutions in the business development. | |||
Compared to TCC or 2pc/3pc, this solution is the simplest one for distributed transactions, and is decentralized. In TCC or 2PC solutions, the common transaction handlers synchronize the state among different services with a transaction coordinator, but it's not much required in this CAP solution. In addition, the deeper references of other conditions these services have, the more management complexity and stability risk may be increased in 2PC/TCC. Imagine that if we have 9 services committed successfully of all 10 whitch relied heavily, though the last one execute fail, should we roll back transactions of those 9 service? In fact, the cost is still very high. | |||
However, it's not mean that 2PC or TCC are at a disadvantage, each has its own suitability and matched scenarios, here won't introduce more. | |||
> cn: [base-an-acid-alternative](http://www.cnblogs.com/savorboard/p/base-an-acid-alternative.html) | |||
> | |||
> en: [Base: An Acid Alternative](http://queue.acm.org/detail.cfm?id=1394128) |