@@ -0,0 +1,95 @@ | |||
# 配置 | |||
默认情况下,你在向IoC容器中注册CAP服务的时候指定配置。 | |||
```c# | |||
services.AddCap(config=> { | |||
// config.XXX | |||
}); | |||
``` | |||
其中 `services` 代表的是 `IServiceCollection` 接口对象,它位于 `Microsoft.Extensions.DependencyInjection` 下面。 | |||
如果你不想使用微软的IoC容器,那么你可以查看 [ASP.NET Core 这里的文档](https://docs.microsoft.com/en-us/aspnet/core/fundamentals/dependency-injection?view=aspnetcore-2.2#default-service-container-replacement) 来了解如何替换默认的容器实现。 | |||
## 什么是最低配置? | |||
最简单的回答就是,至少你要配置一个消息队列和一个事件存储,如果你想快速开始你可以使用下面的配置: | |||
```C# | |||
services.AddCap(config => | |||
{ | |||
config.UseInMemoryQueue(); | |||
config.UseInmemoryStorage(); | |||
}); | |||
``` | |||
有关具体的消息队列的配置和存储的配置,你可以查看 Transports 章节和 Persistent 章节中具体组件提供的配置项。 | |||
## CAP 中的自定义配置 | |||
在 `AddCap` 中 `CapOptions` 对象是用来存储配置相关信息,默认情况下它们都具有一些默认值,有些时候你可能需要自定义。 | |||
#### DefaultGroup | |||
默认值:cap.queue.{程序集名称} | |||
默认的消费者组的名字,在不同的 Transports 中对应不同的名字,可以通过自定义此值来自定义不同 Transports 中的名字,以便于查看。 | |||
> 在 RabbitMQ 中映射到 [Queue Names](https://www.rabbitmq.com/queues.html#names)。 | |||
> 在 Apache Kafka 中映射到 Topic Name。 | |||
> 在 Azure Service Bus 中映射到 Subscription Name。 | |||
#### Version | |||
默认值:v1 | |||
这是在CAP v2.4 版本中引入的新配置项,用于给消息指定版本来隔离不同版本服务的消息,常用于A/B测试或者多服务版本的场景。以下是其应用场景: | |||
!!! info "业务快速迭代,需要向前兼容" | |||
由于业务的快速迭代,在各个服务集成的过程中,消息的数据结构并不是固定不变的,有些时候我们为了适应新引入的需求,会添加或者修改一些数据结构。如果你是一套全新的系统这没有什么问题,但是如果你的系统已经部署到生产环境了并且正在服务客户,这就会导致新的功能在上线的时候和旧的数据结构发生不兼容,那么这些改变可能会导致出现严重的问题,要想解决这个问题,只能把消息队列和持久化的消息全部清空,然后才能启动应用程序,这对于生产环境来说显然是致命的。 | |||
!!! info "多个版本的服务端" | |||
有些时候,App的服务端需要提供多套接口,来支持不同版本的App,这些不同版本的App相同的接口和服务端交互的数据结构可能是不一样的,所以通常情况下服务端提供不用的路由地址来适配不同版本的App调用。 | |||
!!! info "不同实例,使用相同的持久化表/集合" | |||
希望多个不同实例的程序可以公用相同的数据库,在 2.4 之前的版本,我们可以通过指定不同的表名来隔离不同实例的数据库表,即在CAP配置的时候通过配置不同的表名前缀来实现。 | |||
> 查看博客来了解更多关于 Version 的信息: https://www.cnblogs.com/savorboard/p/cap-2-4.html | |||
#### FailedRetryInterval | |||
默认值:60 秒 | |||
在消息发送的时候,如果发送失败,CAP将会对消息进行重试,此配置项用来配置每次重试的间隔时间。 | |||
在消息消费的过程中,如果消费失败,CAP将会对消息进行重试消费,此配置项用来配置每次重试的间隔时间。 | |||
!!! WARNING "重试 & 间隔" | |||
在默认情况下,重试将在发送和消费消息失败的 **4分钟后** 开始,这是为了避免设置消息状态延迟导致可能出现的问题。 | |||
发送和消费消息的过程中失败会立即重试 3 次,在 3 次以后将进入重试轮询,此时 FailedRetryInterval 配置才会生效。 | |||
#### FailedRetryCount | |||
默认值:50 | |||
重试的最大次数。当达到此设置值时,将不会再继续重试,通过改变此参数来设置重试的最大次数。 | |||
#### FailedThresholdCallback | |||
默认值:NULL | |||
类型:`Action<MessageType, string, string>` | |||
> | |||
T1 : Message Type | |||
T2 : Message Name | |||
T3 : Message Content | |||
重试阈值的失败回调。当重试达到 FailedRetryCount 设置的值的时候,将调用此 Action 回调,你可以通过指定此回调来接收失败达到最大的通知,以做出人工介入。例如发送邮件或者短信。 | |||
#### SucceedMessageExpiredAfter | |||
默认值:24*3600 秒(1天后) | |||
成功消息的过期时间(秒)。 当消息发送或者消费成功时候,在时间达到 `SucceedMessageExpiredAfter` 秒时候将会从 Persistent 中删除,你可以通过指定此值来设置过期的时间。 |
@@ -0,0 +1,137 @@ | |||
# Idempotence | |||
幂等性(你可以在[Wikipedia](https://en.wikipedia.org/wiki/Idempotence)读到关于幂等性的定义),当我们谈论幂等时,一般是指可以重复处理传毒的消息,而不是产生意外的结果。 | |||
## 交付保证 | |||
在说幂等性之前,我们先来说下关于消费端的消息交付。 | |||
由于CAP不是使用的 MS DTC 或其他类型的2PC分布式事务机制,所以存在至少消息严格交付一次的问题,具体的说在基于消息的系统中,存在一下三种可能: | |||
* Exactly Once(*) (仅有一次) | |||
* At Most Once (最多一次) | |||
* At Least Once (最少一次) | |||
带 * 号表示在实际场景中,很难达到。 | |||
### At Most Once | |||
最多一次交付保证,涵盖了保证一次或根本不接收所有消息的情况。 | |||
这种类型的传递保证可能来自你的消息系统,你的代码按以下顺序执行其操作: | |||
``` | |||
1. 从队列移除消息 | |||
2. 开始一个工作事务 | |||
3. 处理消息 ( 你的代码 ) | |||
4. 是否成功 ? | |||
Yes: | |||
1. 提交工作事务 | |||
No: | |||
1. 回滚工作事务 | |||
2. 将消息发回到队列。 | |||
``` | |||
正常情况下,他们工作的很好,工作事务将被提交。 | |||
然而,有些时候并不能总是成功,比如在 1 之后出现异常,或者是你在将消息放回到队列中出现网络问题由或者宕机重启等情况。 | |||
使用这个协议,你将冒着丢失消息的风险,如果可以接受,那就没有关系。 | |||
### At Least Once | |||
这个交付保证包含你收到至少一次的消息,当出现故障时,可能会收到多次消息。 | |||
它需要稍微改变我们执行步骤的顺序,它要求消息队列系统支持事务或ACK机制,比如传统的 begin-commit-rollback 协议(MSMQ是这样),或者是 receive-ack-nack 协议(RabbitMQ,Azure Service Bus等是这样的)。 | |||
大致步骤如下: | |||
``` | |||
1. 抢占队列中的消息。 | |||
2. 开始一个工作事务 | |||
3. 处理消息 ( 你的代码 ) | |||
4. 是否成功 ? | |||
Yes: | |||
1. 提交工作事务 | |||
2. 从队列删除消息 | |||
No: | |||
1. 回滚工作事务 | |||
2. 从队列释放抢占的消息 | |||
``` | |||
当出现失败或者抢占消息超时的时候,我们总是能够再次接收到消息以保证我们工作事务提交成功。 | |||
### 什么是 “工作事务” ? | |||
上面所说的“工作事务”并不是特指关系型数据库中的事务,这里的工作事务是一个概念,也就是说执行代码的原子性。 | |||
比如它可以是传统的RDMS事务,也或者是 MongoDB 事务或者是一个交易等。 | |||
在这里它代表一个执行单元,这个执行单元是一个概念性的事实以支持前面提到的仅交付一次的这种问题。 | |||
通常,不可能做到消息的事务和工作事务来形成原子性进行提交或者回滚。 | |||
## CAP 中的幂等性 | |||
在CAP中,我们采用的交付保证为 At Least Once。 | |||
由于我们具有临时存储介质(数据库表),也许可以做到 At Most Once, 但是为了严格保证消息不会丢失,我们没有提供相关功能或配置。 | |||
### 为什么没有实现幂等? | |||
1、消息写入成功了,但是此时执行Consumer方法失败了 | |||
执行Consumer方法失败的原因有非常多,我如果不知道具体的场景盲目进行重试或者不进行重试都是不正确的选择。 | |||
举个例子:假如消费者为扣款服务,如果是执行扣款成功了,但是在写扣款日志的时候失败了,此时CAP会判断为消费者执行失败,进行重试。如果客户端自己没有保证幂等性,框架对其进行重试,这里势必会造成多次扣款出现严重后果。 | |||
2、执行Consumer方法成功了,但是又收到了同样的消息 | |||
此处场景也是可能存在的,假如开始的时候Consumer已经执行成功了,但是由于某种原因如 Broker 宕机恢复等,又收到了相同的消息,CAP 在收到Broker消息后会认为这个是一个新的消息,会对 Consumer再次执行,由于是新消息,此时 CAP 也是无法做到幂等的。 | |||
3、目前的数据存储模式无法做到幂等 | |||
由于CAP存消息的表对于成功消费的消息会于1个小时后删除,所以如果对于一些历史性消息无法做到幂等操作。 历史性指的是,假如 Broker由于某种原因维护了或者是人工处理的一些消息。 | |||
4、业界做法 | |||
许多基于事件驱动的框架都是要求 用户 来保证幂等性操作的,比如 ENode, RocketMQ 等等... | |||
从实现的角度来说,CAP可以做一些比较不严格的幂等,但是严格的幂等无法做到的。 | |||
### 以自然的方式处理幂等消息 | |||
通常情况下,保证消息被执行多次而不会产生意外结果是很自然的一种方式是采用操作对象自带的一些幂等功能。比如: | |||
数据库提供的 `INSERT ON DUPLICATE KEY UPDATE` 或者是采取类型的程序判断行为。 | |||
### 显式处理幂等消息 | |||
另外一种处理幂等性的方式就是在消息传递的过程中传递ID,然后由单独的消息跟踪器来处理。 | |||
比如你使用具有事务数据存储的 IMessageTracker 来跟踪消息ID,你的代码可能看起来像这样: | |||
```c# | |||
readonly IMessageTracker _messageTracker; | |||
public SomeMessageHandler(IMessageTracker messageTracker) | |||
{ | |||
_messageTracker = messageTracker; | |||
} | |||
[CapSubscribe] | |||
public async Task Handle(SomeMessage message) | |||
{ | |||
if (await _messageTracker.HasProcessed(message.Id)) | |||
{ | |||
return; | |||
} | |||
// do the work here | |||
// ... | |||
// remember that this message has been processed | |||
await _messageTracker.MarkAsProcessed(messageId); | |||
} | |||
``` | |||
至于 `IMessageTracker` 的实现,可以使用诸如Redis或者数据库等存储消息Id和对应的处理状态。 |
@@ -0,0 +1,11 @@ | |||
# License | |||
**MIT License** | |||
Copyright © 2016 - 2019 Savorboard | |||
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: | |||
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. | |||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. |
@@ -0,0 +1,37 @@ | |||
# 消息 | |||
使用 `ICapPublisher` 接口发送出去的数据称之为 Message (`消息`)。 | |||
## 消息调度 | |||
CAP 接收到消息之后会将消息发送到 Transport, 由 Transport 进行运输。 | |||
当你使用 `ICapPublisher` 接口发送时,CAP将会将消息调度到相应的 Transport中去,目前还不支持批量发送消息。 | |||
有关 Transports 的更多信息,可以查看 [Transports](../transports/general.md) 章节。 | |||
## 消息存储 | |||
CAP 接收到消息之后会将消息进行 Persistent(持久化), 有关 Persistent 的更多信息,可以查看 [Persistent](../persistent/general.md) 章节。 | |||
## 消息重试 | |||
重试在整个CAP架构设计中具有重要作用,CAP 中会针对发送失败或者执行失败的消息进行重试。在整个 CAP 的设计过程中有以下几处采用的重试策略。 | |||
1、 发送重试 | |||
在消息发送过程中,当出现 Broker 宕机或者连接失败的情况亦或者出现异常的情况下,这个时候 CAP 会对发送的重试,第一次重试次数为 3,4分钟后以后每分钟重试一次,进行次数 +1,当总次数达到50次后,CAP将不对其进行重试。 | |||
你可以在 CapOptions 中设置FailedRetryCount来调整默认重试的总次数。 | |||
当失败总次数达到默认失败总次数后,就不会进行重试了,你可以在 Dashboard 中查看消息失败的原因,然后进行人工重试处理。 | |||
2、 消费重试 | |||
当 Consumer 接收到消息时,会执行消费者方法,在执行消费者方法出现异常时,会进行重试。这个重试策略和上面的 发送重试 是相同的。 | |||
## 消息数据清理 | |||
数据库消息表中具有一个 ExpiresAt 字段表示消息的过期时间,当消息发送成功或者消费成功后,CAP会将消息状态为 Successed 的 ExpiresAt 设置为 1小时 后过期,会将消息状态为 Failed 的 ExpiresAt 设置为 15天 后过期。 | |||
CAP 默认情况下会每隔一个小时将消息表的数据进行清理删除,避免数据量过多导致性能的降低。清理规则为 ExpiresAt 不为空并且小于当前时间的数据。 也就是说状态为Failed的消息(正常情况他们已经被重试了 50 次),如果你15天没有人工介入处理,同样会被清理掉。 |
@@ -0,0 +1,7 @@ | |||
# Sagas | |||
Sagas (also known in the literature as "process managers") are stateful services. You can think of them as state machines whose transitions are driven by messages. | |||
## Sagas on CAP | |||
TODO |
@@ -0,0 +1,81 @@ | |||
# 序列化 | |||
CAP 目前还不支持消息本身的序列化,在将消息发送到消息队列之前 CAP 使用 json 对消息对象进行序列化。 | |||
## 内容序列化 | |||
CAP 支持对消息的 Content 字段进行序列化,你可以自定义 `IContentSerializer` 接口来做到这一点。 | |||
目前由于消息对象需要进行数据库存储,所以只支持 string 的序列化和反序例化。 | |||
```csharp | |||
class MyContentSerializer : IContentSerializer | |||
{ | |||
public T DeSerialize<T>(string messageObjStr) | |||
{ | |||
} | |||
public object DeSerialize(string content, Type type) | |||
{ | |||
} | |||
public string Serialize<T>(T messageObj) | |||
{ | |||
} | |||
} | |||
``` | |||
## 消息适配器 | |||
在异构系统中,有时候需要和其他系统进行通讯,但是其他系统使用的消息对象可能和 CAP 的[**包装器对象**](../persistent/general.md#_7)不一样,这个时候就需要对消息进行自定义适配。 | |||
CAP 提供了 `IMessagePacker` 接口用于对 [**包装器对象**](../persistent/general.md#_7) 进行自定义,自定义的 MessagePacker 通常是将 `CapMessage` 进行打包和解包操作,在这个过程中可以添加自己的业务对象。 | |||
使用方法: | |||
```csharp | |||
class MyMessagePacker : IMessagePacker | |||
{ | |||
private readonly IContentSerializer _serializer; | |||
public DefaultMessagePacker(IContentSerializer serializer) | |||
{ | |||
_serializer = serializer; | |||
} | |||
public string Pack(CapMessage obj) | |||
{ | |||
var myStructure = new | |||
{ | |||
Id = obj.Id, | |||
Body = obj.Content, | |||
Date = obj.Timestamp, | |||
Callback = obj.CallbackName | |||
}; | |||
return _serializer.Serialize(myStructure); | |||
} | |||
public CapMessage UnPack(string packingMessage) | |||
{ | |||
var myStructure = _serializer.DeSerialize<MyStructure>(packingMessage); | |||
return new CapMessageDto | |||
{ | |||
Id = myStructure.Id, | |||
Timestamp = myStructure.Date, | |||
Content = myStructure.Body, | |||
CallbackName = myStructure.Callback | |||
}; | |||
} | |||
} | |||
``` | |||
接下来,配置自定义的 `MyMessagePacker` 到服务中。 | |||
```csharp | |||
services.AddCap(x =>{ }).AddMessagePacker<MyMessagePacker>(); | |||
``` |
@@ -0,0 +1,21 @@ | |||
# 事务 | |||
## 分布式事务? | |||
CAP 不直接提供开箱即用的基于 DTC 或者 2PC 的分布式事务,相反我们提供一种可以用于解决在分布式事务遇到的问题的一种解决方案。 | |||
在分布式环境中,由于涉及通讯的开销,使用基于2PC或DTC的分布式事务将非常昂贵,在性能方面也同样如此。另外由于基于2PC或DTC的分布式事务同样受**CAP定理**的约束,当发生网络分区时它将不得不放弃可用性(CAP中的A)。 | |||
针对于分布式事务的处理,CAP 采用的是“异步确保”这种方案。 | |||
### 异步确保 | |||
异步确保这种方案又叫做本地消息表,这是一种经典的方案,方案最初来源于 eBay,参考资料见段末链接。这种方案目前也是企业中使用最多的方案之一。 | |||
相对于 TCC 或者 2PC/3PC 来说,这个方案对于分布式事务来说是最简单的,而且它是去中心化的。在TCC 或者 2PC 的方案中,必须具有事务协调器来处理每个不同服务之间的状态,而此种方案不需要事务协调器。 | |||
另外 2PC/TCC 这种方案如果服务依赖过多,会带来管理复杂性增加和稳定性风险增大的问题。试想如果我们强依赖 10 个服务,9 个都执行成功了,最后一个执行失败了,那么是不是前面 9 个都要回滚掉?这个成本还是非常高的。 | |||
但是,并不是说 2PC 或者 TCC 这种方案不好,因为每一种方案都有其相对优势的使用场景和优缺点,这里就不做过多介绍了。 | |||
> 中文:[http://www.cnblogs.com/savorboard/p/base-an-acid-alternative.html](http://www.cnblogs.com/savorboard/p/base-an-acid-alternative.html) | |||
> 英文:[http://queue.acm.org/detail.cfm?id=1394128](http://queue.acm.org/detail.cfm?id=1394128) |
@@ -0,0 +1,3 @@ | |||
# 升级指导 | |||
请参阅 Github Release 页面: [https://github.com/dotnetcore/CAP/releases](https://github.com/dotnetcore/CAP/releases) |
@@ -0,0 +1,21 @@ | |||
# 贡献 | |||
One of the easiest ways to contribute is to participate in discussions and discuss issues. | |||
If you have any question or problems, please report them on the CAP repository: | |||
<a href="https://github.com/dotnetcore/cap/issues/new"><button data-md-color-primary="purple"><i class="fa fa-github fa-2x"></i> Report Issue</button></a> | |||
<a href="https://github.com/dotnetcore/cap/issues"><button data-md-color-primary="purple" type="submit"> Active Issues <i class="fa fa-github fa-2x"></i></button></a> | |||
## Submitting Changes | |||
You can also contribute by submitting pull requests with code changes. | |||
> | |||
Pull requests let you tell others about changes you've pushed to a repository on GitHub. Once a pull request is opened, you can discuss and review the potential changes with collaborators and add follow-up commits before the changes are merged into the repository. | |||
## Additional Resources | |||
* [Filtering issues and pull requests](https://help.github.com/articles/filtering-issues-and-pull-requests/) | |||
* [Using search to filter issues and pull requests](https://help.github.com/articles/using-search-to-filter-issues-and-pull-requests/) |
@@ -0,0 +1,34 @@ | |||
# 介绍 | |||
CAP 是一个EventBus,同时也是一个在微服务或者SOA系统中解决分布式事务问题的一个框架。它有助于创建可扩展,可靠并且易于更改的微服务系统。 | |||
在微软的 [eShopOnContainer](https://github.com/dotnet-architecture/eShopOnContainers) 微服务示例项目中,推荐使用 CAP 作为生产环境可用的 EventBus。 | |||
!!! question "什么是 EventBus?" | |||
An Eventbus is a mechanism that allows different components to communicate with each other without knowing about each other. A component can send an Event to the Eventbus without knowing who will pick it up or how many others will pick it up. Components can also listen to Events on an Eventbus, without knowing who sent the Events. That way, components can communicate without depending on each other. Also, it is very easy to substitute a component. As long as the new component understands the Events that are being sent and received, the other components will never know. | |||
相对于其他的 Service Bus 或者 Event Bus, CAP 拥有自己的特色,它不要求使用者发送消息或者处理消息的时候实现或者继承任何接口,拥有非常高的灵活性。我们一直坚信约定大于配置,所以CAP使用起来非常简单,对于新手非常友好,并且拥有轻量级。 | |||
CAP 采用模块化设计,具有高度的可扩展性。你有许多选项可以选择,包括消息队列,存储,序列化方式等,系统的许多元素内容可以替换为自定义实现。 | |||
## 相关视频 | |||
[Video: bilibili 教程](https://www.bilibili.com/video/av31582401/) | |||
[Video: Youtube 教程](https://youtu.be/K1e4e0eddNE) | |||
[Video: 腾讯视频教程](https://www.cnblogs.com/savorboard/p/7243609.html) | |||
## 相关文章 | |||
[Article: CAP 介绍及使用](http://www.cnblogs.com/savorboard/p/cap.html) | |||
[Article: CAP 2.5 版本中的新特性](https://www.cnblogs.com/savorboard/p/cap-2-5.html) | |||
[Article: CAP 2.4 版本中的新特性](http://www.cnblogs.com/savorboard/p/cap-2-4.html) | |||
[Article: CAP 2.3 版本中的新特性用](http://www.cnblogs.com/savorboard/p/cap-2-3.html) | |||
[Article: .NET Core Community 首个千星项目诞生:CAP](https://www.cnblogs.com/forerunner/p/ncc-cap-with-over-thousand-stars.html) |
@@ -0,0 +1,64 @@ | |||
# 快速开始 | |||
了解如何使用 CAP 构建微服务事件总线架构,它比直接集成消息队列提供了哪些优势,它提供了哪些开箱即用的功能。 | |||
## 安装 | |||
```powershell | |||
PM> Install-Package DotNetCore.CAP | |||
``` | |||
## 在 Asp.Net Core 中集成 | |||
以便于快速启动,我们使用基于内存的事件存储和消息队列。 | |||
```powershell | |||
PM> Install-Package DotNetCore.CAP.InMemoryStorage | |||
PM> Install-Package Savorboard.CAP.InMemoryMessageQueue | |||
``` | |||
在 `Startup.cs` 中,添加以下配置: | |||
```c# | |||
public void ConfigureServices(IServiceCollection services) | |||
{ | |||
services.AddCap(x => | |||
{ | |||
x.UseInMemoryStorage(); | |||
x.UseInMemoryMessageQueue(); | |||
}); | |||
} | |||
``` | |||
## 发送消息 | |||
```c# | |||
public class PublishController : Controller | |||
{ | |||
[Route("~/send")] | |||
public IActionResult SendMessage([FromService]ICapPublisher capBus) | |||
{ | |||
capBus.Publish("test.show.time", DateTime.Now); | |||
return Ok(); | |||
} | |||
} | |||
``` | |||
## 处理消息 | |||
```C# | |||
public class ConsumerController : Controller | |||
{ | |||
[NonAction] | |||
[CapSubscribe("test.show.time")] | |||
public void ReceiveMessage(DateTime time) | |||
{ | |||
Console.WriteLine("message time is:" + time); | |||
} | |||
} | |||
``` | |||
## 摘要 | |||
相对于直接集成消息队列,异步消息传递最强大的优势之一是可靠性,系统的一个部分中的故障不会传播,也不会导致整个系统崩溃。 在 CAP 内部会将消息进行存储,以保证消息的可靠性,并配合重试等策略以达到各个服务之间的数据最终一致性。 |
@@ -0,0 +1,4 @@ | |||
# Consul | |||
Consul is a distributed service mesh to connect, secure, and configure services across any runtime platform and public or private cloud. | |||
@@ -0,0 +1,70 @@ | |||
# Dashboard | |||
CAP 原生提供了 Dashboard 供查看消息,利用 Dashboard 提供的功能可以很方便的查看和管理消息。 | |||
## 启用 Dashboard | |||
默认情况下,不会启动Dashboard中间件,要开启Dashboard功能你需要在配置中添加如下代码: | |||
```C# | |||
services.AddCap(x => | |||
{ | |||
//... | |||
// Register Dashboard | |||
x.UseDashboard(); | |||
}); | |||
``` | |||
默认情况下,你可以访问 `http://localhost:xxx/cap` 这个地址打开Dashboard。 | |||
### Dashboard 配置项 | |||
* PathMatch | |||
默认值:'/cap' | |||
你可以通过修改此配置项来更改Dashboard的访问路径。 | |||
* StatsPollingInterval | |||
默认值:2000 毫秒 | |||
此配置项用来配置Dashboard 前端 获取状态接口(/stats)的轮询时间 | |||
* Authorization | |||
此配置项用来配置访问 Dashboard 时的授权过滤器,默认过滤器允许局域网访问,当你的应用想提供外网访问时候,可以通过设置此配置来自定义认证规则。详细参看下一节 | |||
### 自定义认证 | |||
通过实现 `IDashboardAuthorizationFilter` 接口可以自定义Dashboard认证。 | |||
以下是一个示例代码,通过从url请求参数中读取 accesskey 判断是否允许访问。 | |||
```C# | |||
public class TestAuthorizationFilter : IDashboardAuthorizationFilter | |||
{ | |||
public bool Authorize(DashboardContext context) | |||
{ | |||
if(context.Request.GetQuery("accesskey")=="xxxxxx"){ | |||
return true; | |||
} | |||
return false; | |||
} | |||
} | |||
``` | |||
然后在修改注册 Dashboard 时候配置此过滤对象。 | |||
```C# | |||
services.AddCap(x => | |||
{ | |||
//... | |||
// Register Dashboard | |||
x.UseDashboard(opt => { | |||
opt.Authorization = new[] {new TestAuthorizationFilter()}; | |||
}); | |||
}); | |||
``` |
@@ -0,0 +1,24 @@ | |||
# Diagnostics | |||
Diagnostics 提供一组功能使我们能够很方便的可以记录在应用程序运行期间发生的关键性操作以及他们的执行时间等,使管理员可以查找特别是生产环境中出现问题所在的根本原因。 | |||
## CAP 中的 Diagnostics | |||
在 CAP 中,对 `DiagnosticSource` 提供了支持,监听器名称为 `CapDiagnosticListener`。 | |||
Diagnostics 提供对外提供的事件信息有: | |||
* 消息持久化之前 | |||
* 消息持久化之后 | |||
* 消息持久化异常 | |||
* 消息向MQ发送之前 | |||
* 消息向MQ发送之后 | |||
* 消息向MQ发送异常 | |||
* 消息从MQ消费保存之前 | |||
* 消息从MQ消费保存之后 | |||
* 订阅者方法执行之前 | |||
* 订阅者方法执行之后 | |||
* 订阅者方法执行异常 | |||
相关涉及到的对象,你可以在 `DotNetCore.CAP.Diagnostics` 命名空间下看到。 |
@@ -0,0 +1,3 @@ | |||
# 健康检查 | |||
TODO |
@@ -0,0 +1,3 @@ | |||
# Metrics | |||
TODO: |
@@ -0,0 +1,65 @@ | |||
# 基本 | |||
CAP 需要使用具有持久化功能的存储介质来存储事件消息,例如通过数据库或者其他NoSql设施。CAP 使用这种方式来应对一切环境或者网络异常导致消息丢失的情况,消息的可靠性是分布式事务的基石,所以在任何情况下消息都不能丢失。 | |||
## 持久化 | |||
### 发送前 | |||
在消息进入到消息队列之前,CAP使用本地数据库表对消息进行持久化,这样可以保证当消息队列出现异常或者网络错误时候消息是没有丢失的。 | |||
为了保证这种机制的可靠性,CAP使用和业务代码相同的数据库事务来保证业务操作和CAP的消息在持久化的过程中是强一致的。也就是说在进行消息持久化的过程中,任何一方发生异常情况数据库都会进行回滚操作。 | |||
### 发送后 | |||
消息进入到消息队列之后,CAP会启动消息队列的持久化功能,我们需要说明一下在 RabbitMQ 和 Kafka 中CAP的消息是如何持久化的。 | |||
针对于 RabbitMQ 中的消息持久化,CAP 使用的是具有消息持久化功能的消费者队列,但是这里面可能有例外情况,参加 2.2.1 章节。 | |||
由于 Kafka 天生设计的就是使用文件进行的消息持久化,在所以在消息进入到Kafka之后,Kafka会保证消息能够正确被持久化而不丢失。 | |||
## 消息存储 | |||
在 CAP 启动后,会向持久化介质中生成两个表,默认情况下名称为:`Cap.Published` `Cap.Received`。 | |||
### 存储格式 | |||
**Published** 表结构: | |||
NAME | DESCRIPTION | TYPE | |||
:---|:---|:--- | |||
Id | Message Id | int | |||
Version | Message Version | string | |||
Name | Topic Name | string | |||
Content | Json Content | string | |||
Added | Added Time | DateTime | |||
ExpiresAt | Expire time | DateTime | |||
Retries | Retry times | int | |||
StatusName | Status Name | string | |||
**Received** 表结构: | |||
NAME | DESCRIPTION | TYPE | |||
:---|:---|:--- | |||
Id | Message Id | int | |||
Version | Message Version | string | |||
Name | Topic Name | string | |||
Group | Group Name | string | |||
Content | Json Content | string | |||
Added | Added Time | DateTime | |||
ExpiresAt | Expire time | DateTime | |||
Retries | Retry times | int | |||
StatusName | Status Name | string | |||
### 包装器对象 | |||
CAP 在进行消息发送到时候,会对原始消息对象进行一个二次包装存储到 `Content` 字段中,以下为包装 Content 的 Message 对象数据结构: | |||
NAME | DESCRIPTION | TYPE | |||
:---|:---|:--- | |||
Id | CAP生成的消息编号 | string | |||
Timestamp | 消息创建时间 | string | |||
Content | 内容 | string | |||
CallbackName | 回调的订阅者名称 | string | |||
其中 Id 字段,CAP 采用的 MongoDB 中的 ObjectId 分布式Id生成算法生成。 |
@@ -0,0 +1,36 @@ | |||
# In-Memory Storage | |||
内存消息的持久化存储常用于开发和测试环境,如果使用基于内存的存储则你会失去本地事务消息可靠性保证。 | |||
## 配置 | |||
如果要使用内存存储,你需要从 NuGet 安装以下扩展包: | |||
``` | |||
Install-Package DotNetCore.CAP.InMemoryStorage | |||
``` | |||
然后,你可以在 `Startup.cs` 的 `ConfigureServices` 方法中添加基于内存的配置项。 | |||
```csharp | |||
public void ConfigureServices(IServiceCollection services) | |||
{ | |||
// ... | |||
services.AddCap(x => | |||
{ | |||
x.UseInMemoryStorage(); | |||
// x.UseXXX ... | |||
}); | |||
} | |||
``` | |||
内存中的发送成功消息,CAP 将会每 5分钟 进行一次清理。 | |||
## Publish with transaction | |||
In-Memory 存储 **不支持** 事务方式发送消息。 | |||
@@ -0,0 +1,68 @@ | |||
# MongoDB | |||
MongoDB 是一个跨平台的面向文档型的数据库程序,它被归为 NOSQL 数据库,CAP 从 2.3 版本开始支持 MongoDB 作为消息存储。 | |||
MongoDB 从 4.0 版本开始支持 ACID 事务,所以 CAP 也只支持 4.0 以上的 MongoDB,并且 MongoDB 需要部署为集群,因为 MongoDB 的 ACID 事务需要集群才可以使用。 | |||
有关开发环境如何快速搭建 MongoDB 4.0+ 集群,你可以我的参考 [这篇文章](https://www.cnblogs.com/savorboard/p/mongodb-4-cluster-install.html)。 | |||
## 配置 | |||
要使用 MongoDB 存储,你需要从 NuGet 安装以下扩展包: | |||
```shell | |||
Install-Package DotNetCore.CAP.MongoDB | |||
``` | |||
然后,你可以在 `Startup.cs` 的 `ConfigureServices` 方法中添加基于内存的配置项。 | |||
```csharp | |||
public void ConfigureServices(IServiceCollection services) | |||
{ | |||
// ... | |||
services.AddCap(x => | |||
{ | |||
x.UseMongoDB(opt=>{ | |||
//MongoDBOptions | |||
}); | |||
// x.UseXXX ... | |||
}); | |||
} | |||
``` | |||
#### MongoDBOptions | |||
NAME | DESCRIPTION | TYPE | DEFAULT | |||
:---|:---|---|:--- | |||
DatabaseName | 数据库名称 | string | cap | |||
DatabaseConnection | 数据库连接字符串 | string | mongodb://localhost:27017 | |||
ReceivedCollection | 接收消息集合名称 | string | cap.received | |||
PublishedCollection | 发送消息集合名称 | string | cap.published | |||
## Publish with transaction | |||
下面的示例展示了如何利用 CAP 和 MongoDB 进行本地事务集成。 | |||
```csharp | |||
//NOTE: before your test, your need to create database and collection at first | |||
//注意:MongoDB 不能在事务中创建数据库和集合,所以你需要单独创建它们,模拟一条记录插入则会自动创建 | |||
//var mycollection = _client.GetDatabase("test").GetCollection<BsonDocument>("test.collection"); | |||
//mycollection.InsertOne(new BsonDocument { { "test", "test" } }); | |||
using (var session = _client.StartTransaction(_capBus, autoCommit: false)) | |||
{ | |||
var collection = _client.GetDatabase("test").GetCollection<BsonDocument>("test.collection"); | |||
collection.InsertOne(session, new BsonDocument { { "hello", "world" } }); | |||
_capBus.Publish("sample.rabbitmq.mongodb", DateTime.Now); | |||
session.CommitTransaction(); | |||
} | |||
``` |
@@ -0,0 +1,78 @@ | |||
# MySQL | |||
MySQL 是一个开源的关系型数据库,你可以使用 MySQL 来作为 CAP 消息的持久化。 | |||
## 配置 | |||
要使用 MySQL 存储,你需要从 NuGet 安装以下扩展包: | |||
```shell | |||
Install-Package DotNetCore.CAP.MySql | |||
``` | |||
然后,你可以在 `Startup.cs` 的 `ConfigureServices` 方法中添加基于内存的配置项。 | |||
```csharp | |||
public void ConfigureServices(IServiceCollection services) | |||
{ | |||
// ... | |||
services.AddCap(x => | |||
{ | |||
x.UseMySql(opt=>{ | |||
//MySqlOptions | |||
}); | |||
// x.UseXXX ... | |||
}); | |||
} | |||
``` | |||
#### MySqlOptions | |||
NAME | DESCRIPTION | TYPE | DEFAULT | |||
:---|:---|---|:--- | |||
TableNamePrefix | Cap表名前缀 | string | cap | |||
ConnectionString | 数据库连接字符串 | string | null | |||
## Publish with transaction | |||
### ADO.NET with transaction | |||
```csharp | |||
private readonly ICapPublisher _capBus; | |||
using (var connection = new MySqlConnection(AppDbContext.ConnectionString)) | |||
{ | |||
using (var transaction = connection.BeginTransaction(_capBus, autoCommit: false)) | |||
{ | |||
//your business code | |||
connection.Execute("insert into test(name) values('test')", | |||
transaction: (IDbTransaction)transaction.DbTransaction); | |||
_capBus.Publish("sample.rabbitmq.mysql", DateTime.Now); | |||
transaction.Commit(); | |||
} | |||
} | |||
``` | |||
### EntityFramework with transaction | |||
```csharp | |||
private readonly ICapPublisher _capBus; | |||
using (var trans = dbContext.Database.BeginTransaction(_capBus, autoCommit: false)) | |||
{ | |||
dbContext.Persons.Add(new Person() { Name = "ef.transaction" }); | |||
_capBus.Publish("sample.rabbitmq.mysql", DateTime.Now); | |||
dbContext.SaveChanges(); | |||
trans.Commit(); | |||
} | |||
``` |
@@ -0,0 +1,80 @@ | |||
# Postgre SQL | |||
PostgreSQL 是一个开源的关系型数据库,它已经变得越来越流行,你可以使用 Postgre SQL 来作为 CAP 消息的持久化。 | |||
## 配置 | |||
要使用 PostgreSQL 存储,你需要从 NuGet 安装以下扩展包: | |||
```shell | |||
Install-Package DotNetCore.CAP.PostgreSql | |||
``` | |||
然后,你可以在 `Startup.cs` 的 `ConfigureServices` 方法中添加基于内存的配置项。 | |||
```csharp | |||
public void ConfigureServices(IServiceCollection services) | |||
{ | |||
// ... | |||
services.AddCap(x => | |||
{ | |||
x.UsePostgreSql(opt=>{ | |||
//PostgreSqlOptions | |||
}); | |||
// x.UseXXX ... | |||
}); | |||
} | |||
``` | |||
#### PostgreSqlOptions | |||
NAME | DESCRIPTION | TYPE | DEFAULT | |||
:---|:---|---|:--- | |||
Schema | 数据库架构 | string | cap | |||
ConnectionString | 数据库连接字符串 | string | | |||
## Publish with transaction | |||
### ADO.NET with transaction | |||
```csharp | |||
private readonly ICapPublisher _capBus; | |||
using (var connection = new NpgsqlConnection("ConnectionString")) | |||
{ | |||
using (var transaction = connection.BeginTransaction(_capBus, autoCommit: false)) | |||
{ | |||
//your business code | |||
connection.Execute("insert into test(name) values('test')", | |||
transaction: (IDbTransaction)transaction.DbTransaction); | |||
_capBus.Publish("sample.rabbitmq.mysql", DateTime.Now); | |||
transaction.Commit(); | |||
} | |||
} | |||
``` | |||
### EntityFramework with transaction | |||
```csharp | |||
private readonly ICapPublisher _capBus; | |||
using (var trans = dbContext.Database.BeginTransaction(_capBus, autoCommit: false)) | |||
{ | |||
dbContext.Persons.Add(new Person() { Name = "ef.transaction" }); | |||
_capBus.Publish("sample.rabbitmq.mysql", DateTime.Now); | |||
dbContext.SaveChanges(); | |||
trans.Commit(); | |||
} | |||
``` |
@@ -0,0 +1,80 @@ | |||
# SQL Server | |||
SQL Server 是由微软开发的一个关系型数据库,你可以使用 SQL Server 来作为 CAP 消息的持久化。 | |||
## 配置 | |||
要使用 SQL Server 存储,你需要从 NuGet 安装以下扩展包: | |||
```shell | |||
Install-Package DotNetCore.CAP.SqlServer | |||
``` | |||
然后,你可以在 `Startup.cs` 的 `ConfigureServices` 方法中添加基于内存的配置项。 | |||
```csharp | |||
public void ConfigureServices(IServiceCollection services) | |||
{ | |||
// ... | |||
services.AddCap(x => | |||
{ | |||
x.UsePostgreSql(opt=>{ | |||
//SqlServerOptions | |||
}); | |||
// x.UseXXX ... | |||
}); | |||
} | |||
``` | |||
#### SqlServerOptions | |||
NAME | DESCRIPTION | TYPE | DEFAULT | |||
:---|:---|---|:--- | |||
Schema | 数据库架构 | string | Cap | |||
ConnectionString | 数据库连接字符串 | string | | |||
## Publish with transaction | |||
### ADO.NET with transaction | |||
```csharp | |||
private readonly ICapPublisher _capBus; | |||
using (var connection = new SqlConnection("ConnectionString")) | |||
{ | |||
using (var transaction = connection.BeginTransaction(_capBus, autoCommit: false)) | |||
{ | |||
//your business code | |||
connection.Execute("insert into test(name) values('test')", | |||
transaction: (IDbTransaction)transaction.DbTransaction); | |||
_capBus.Publish("sample.rabbitmq.mysql", DateTime.Now); | |||
transaction.Commit(); | |||
} | |||
} | |||
``` | |||
### EntityFramework with transaction | |||
```csharp | |||
private readonly ICapPublisher _capBus; | |||
using (var trans = dbContext.Database.BeginTransaction(_capBus, autoCommit: false)) | |||
{ | |||
dbContext.Persons.Add(new Person() { Name = "ef.transaction" }); | |||
_capBus.Publish("sample.rabbitmq.mysql", DateTime.Now); | |||
dbContext.SaveChanges(); | |||
trans.Commit(); | |||
} | |||
``` |
@@ -0,0 +1,15 @@ | |||
# eShopOnContainers | |||
eShopOnContainers is a sample application written in C# running on .NET Core using a microservice architecture, Domain Driven Design. | |||
> .NET Core reference application, powered by Microsoft, based on a simplified microservices architecture and Docker containers. | |||
> This reference application is cross-platform at the server and client side, thanks to .NET Core services capable of running on Linux or Windows containers depending on your Docker host, and to Xamarin for mobile apps running on Android, iOS or Windows/UWP plus any browser for the client web apps. | |||
> The architecture proposes a microservice oriented architecture implementation with multiple autonomous microservices (each one owning its own data/db) and implementing different approaches within each microservice (simple CRUD vs. DDD/CQRS patterns) using Http as the communication protocol between the client apps and the microservices and supports asynchronous communication for data updates propagation across multiple services based on Integration Events and an Event Bus (a light message broker, to choose between RabbitMQ or Azure Service Bus, underneath) plus other features defined at the roadmap. | |||
## eShopOnContainers with CAP | |||
你可以在下面的地址看到如何在 eShopOnContainers 中使用 CAP。 | |||
https://github.com/yang-xiaodong/eShopOnContainers |
@@ -0,0 +1,44 @@ | |||
# FAQ | |||
!!! faq "Any IM group(e.g Tencent QQ group) to learn and chat about CAP?" | |||
None for that. Better than wasting much time in IM group, I hope developers could be capable of independent thinking more, and solve problems yourselves with referenced documents, even create issues or send emails when errors are remaining present. | |||
!!! faq "Does it require certain different databases, one each for productor and resumer in CAP?" | |||
Not requird differences necessary, a given advice is that using a special database for each program. | |||
Otherwise, look at Q&A below. | |||
!!! faq "How to use the same database for different applications?" | |||
defining a prefix name of table in `ConfigureServices` method。 | |||
codes exsample: | |||
```c# | |||
public void ConfigureServices(IServiceCollection services) | |||
{ | |||
services.AddCap(x => | |||
{ | |||
x.UseKafka(""); | |||
x.UseMySql(opt => | |||
{ | |||
opt.ConnectionString = "connection string"; | |||
opt.TableNamePrefix = "appone"; // different table name prefix here | |||
}); | |||
}); | |||
} | |||
``` | |||
!!! faq "Can CAP not use the database as event storage? I just want to sent the message" | |||
Not yet. | |||
The purpose of CAP is that ensure consistency principle right in microservice or SOA architechtrues. The solution is based on ACID features of database, there is no sense about a single client wapper of message queue without database. | |||
!!! faq "If the consumer is abnormal, can I roll back the database executed sql that the producer has executed?" | |||
Can't roll back, CAP is the ultimate consistency solution. | |||
You can imagine your scenario is to call a third party payment. If you are doing a third-party payment operation, after calling Alipay's interface successfully, and your own code is wrong, will Alipay roll back? If you don't roll back, what should you do? The same is true here. |
@@ -0,0 +1,5 @@ | |||
# Github 上的示例 | |||
你可以在下面的地址找到相关示例代码: | |||
https://github.com/dotnetcore/CAP/tree/master/samples |
@@ -0,0 +1,48 @@ | |||
# Azure Service Bus | |||
Azure 服务总线是一种多租户云消息服务,可用于在应用程序和服务之间发送信息。 异步操作可实现灵活的中转消息传送、结构化的先进先出 (FIFO) 消息传送以及发布/订阅功能。 | |||
CAP 支持使用 Azure Service Bus 作为消息传输器。 | |||
## Configuration | |||
!!! warning "必要条件" | |||
针对 Service Bus 定价层, CAP 要求使用 “标准” 或者 “高级” 以支持 Topic 功能。 | |||
要使用 Azure Service Bus 作为消息传输器,你需要从 NuGet 安装以下扩展包: | |||
```shell | |||
Install-Package DotNetCore.CAP.AzureServiceBus | |||
``` | |||
然后,你可以在 `Startup.cs` 的 `ConfigureServices` 方法中添加基于内存的配置项。 | |||
```csharp | |||
public void ConfigureServices(IServiceCollection services) | |||
{ | |||
// ... | |||
services.AddCap(x => | |||
{ | |||
x.UseAzureServiceBus(opt=> | |||
{ | |||
//AzureServiceBusOptions | |||
}); | |||
// x.UseXXX ... | |||
}); | |||
} | |||
``` | |||
#### AzureServiceBus Options | |||
CAP 直接对外提供的 Kafka 配置参数如下: | |||
NAME | DESCRIPTION | TYPE | DEFAULT | |||
:---|:---|---|:--- | |||
ConnectionString | Endpoint 地址 | string | | |||
TopicPath | Topic entity path | string | cap | |||
ManagementTokenProvider | Token提供 | ITokenProvider | null |
@@ -0,0 +1,29 @@ | |||
# Transports | |||
Transports move data from one place to another – between acquisition programs and pipelines, between pipelines and the entity database, and even between pipelines and external systems. | |||
## Supported transports | |||
CAP supports several transport methods: | |||
* [RabbitMQ](rabbitmq.md) | |||
* [Kafka](kafka.md) | |||
* [Azure Service Bus](azure-service-bus.md) | |||
* [In-Memory Queue](in-memory-queue.md) | |||
## How to select a transport | |||
🏳🌈 | RabbitMQ | Kafka | Azure Service Bus | In-Memory | |||
:-- | :--: | :--: | :--: | :-- : | |||
**定位** | 可靠消息传输 | 实时数据处理 | 云 | 内存型,测试 | |||
**分布式** | ✔ | ✔ | ✔ |❌ | |||
**持久化** | ✔ | ✔ | ✔ | ❌ | |||
**性能** | Medium | High | Medium | High | |||
> `Azure Service Bus` vs `RabbitMQ` : | |||
> http://geekswithblogs.net/michaelstephenson/archive/2012/08/12/150399.aspx | |||
>`Kafka` vs `RabbitMQ` : | |||
> https://stackoverflow.com/questions/42151544/is-there-any-reason-to-use-rabbitmq-over-kafka | |||
@@ -0,0 +1,30 @@ | |||
# In-Memory Queue | |||
In Memory Queue 为基于内存的消息队列,该扩展由 [社区](https://github.com/yang-xiaodong/Savorboard.CAP.InMemoryMessageQueue) 进行提供。 | |||
## 配置 | |||
要使用 In Memory Queue 作为消息传输器,你需要从 NuGet 安装以下扩展包: | |||
```shell | |||
Install-Package Savorboard.CAP.InMemoryMessageQueue | |||
``` | |||
然后,你可以在 `Startup.cs` 的 `ConfigureServices` 方法中添加基于内存的配置项。 | |||
```csharp | |||
public void ConfigureServices(IServiceCollection services) | |||
{ | |||
// ... | |||
services.AddCap(x => | |||
{ | |||
x.UseInMemoryMessageQueue(); | |||
// x.UseXXX ... | |||
}); | |||
} | |||
``` |
@@ -0,0 +1,64 @@ | |||
# Apache Kafka® | |||
[Apache Kafka®](https://kafka.apache.org/) 是一个开源流处理软件平台,由 LinkedIn 开发并捐赠给 Apache Software Foundation,用 Scala 和 Java 编写。 | |||
CAP 支持使用 Apache Kafka® 作为消息传输器。 | |||
## Configuration | |||
要使用 Kafka 作为消息传输器,你需要从 NuGet 安装以下扩展包: | |||
```shell | |||
Install-Package DotNetCore.CAP.Kafka | |||
``` | |||
然后,你可以在 `Startup.cs` 的 `ConfigureServices` 方法中添加基于内存的配置项。 | |||
```csharp | |||
public void ConfigureServices(IServiceCollection services) | |||
{ | |||
// ... | |||
services.AddCap(x => | |||
{ | |||
x.UseKafka(opt=>{ | |||
//KafkaOptions | |||
}); | |||
// x.UseXXX ... | |||
}); | |||
} | |||
``` | |||
#### Kafka Options | |||
CAP 直接对外提供的 Kafka 配置参数如下: | |||
NAME | DESCRIPTION | TYPE | DEFAULT | |||
:---|:---|---|:--- | |||
Servers | Broker 地址 | string | | |||
ConnectionPoolSize | 用户名 | int | 10 | |||
#### Kafka MainConfig Options | |||
如果你需要 **更多** 原生 Kakfa 相关的配置项,可以通过 `MainConfig` 配置项进行设定: | |||
```csharp | |||
services.AddCap(capOptions => | |||
{ | |||
capOptions.UseKafka(kafkaOption=> | |||
{ | |||
// kafka options. | |||
// kafkaOptions.MainConfig.Add("", ""); | |||
}); | |||
}); | |||
``` | |||
MainConfig 为配置字典,你可以通过以下链接找到其支持的配置项列表。 | |||
[https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) |
@@ -0,0 +1,68 @@ | |||
# RabbitMQ | |||
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ 服务器是用 Erlang 语言编写的,而聚类和故障转移是构建在开源的通讯平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。 | |||
CAP 支持使用 RabbitMQ 作为消息传输器。 | |||
## 配置 | |||
要使用 RabbitMQ 作为消息传输器,你需要从 NuGet 安装以下扩展包: | |||
```shell | |||
Install-Package DotNetCore.CAP.RabbitMQ | |||
``` | |||
然后,你可以在 `Startup.cs` 的 `ConfigureServices` 方法中添加基于内存的配置项。 | |||
```csharp | |||
public void ConfigureServices(IServiceCollection services) | |||
{ | |||
// ... | |||
services.AddCap(x => | |||
{ | |||
x.UseRabbitMQ(opt=> | |||
{ | |||
//RabbitMQOptions | |||
}); | |||
// x.UseXXX ... | |||
}); | |||
} | |||
``` | |||
#### RabbitMQ Options | |||
CAP 直接对外提供的 RabbitMQ 配置参数如下: | |||
NAME | DESCRIPTION | TYPE | DEFAULT | |||
:---|:---|---|:--- | |||
HostName | 宿主地址 | string | localhost | |||
UserName | 用户名 | string | guest | |||
Password | 密码 | string | guest | |||
VirtualHost | 虚拟主机 | string | / | |||
Port | 端口号 | int | -1 | |||
TopicExchangeName | CAP默认Exchange名称 | string | cap.default.topic | |||
QueueMessageExpires | 队列中消息自动删除时间 | int | (10天) 毫秒 | |||
#### ConnectionFactory Options | |||
如果你需要 **更多** 原生 `ConnectionFactory` 相关的配置项,可以通过 `ConnectionFactoryOptions` 配置项进行设定: | |||
```csharp | |||
services.AddCap(x => | |||
{ | |||
x.UseRabbitMQ(o => | |||
{ | |||
o.HostName = "localhost"; | |||
o.ConnectionFactoryOptions = opt => { | |||
//rabbitmq client ConnectionFactory config | |||
}; | |||
}); | |||
}); | |||
``` |