diff --git a/docs/content/user-guide/en/cap/messaging.md b/docs/content/user-guide/en/cap/messaging.md index 169549b..f6ca2b1 100644 --- a/docs/content/user-guide/en/cap/messaging.md +++ b/docs/content/user-guide/en/cap/messaging.md @@ -2,6 +2,90 @@ The data sent by using the `ICapPublisher` interface is called `Message`. +## Compensating transaction + +Wiki : +[Compensating transaction](https://en.wikipedia.org/wiki/Compensating_transaction) + +In some cases, consumers need to return the execution value to tell the publisher, so that the publisher can implement some compensation actions, usually we called message compensation. + +Usually you can notify the upstream by republishing a new message in the consumer code. CAP provides a simple way to do this. You can specify `callbackName` parameter when publishing message, usually this only applies to point-to-point consumption. The following is an example. + +For example, in an e-commerce application, the initial status of the order is pending, and the status is marked as succeeded when the product quantity is successfully deducted, otherwise it is failed. + +```C# +// ============= Publisher ================= + +_capBus.Publish("place.order.qty.deducted", new { OrderId = 1234, ProductId = 23255, Qty = 1 }, "place.order.mark.status"); + +// publisher using `callbackName` to subscribe consumer result + +[CapSubscribe("place.order.mark.status")] +public void MarkOrderStatus(JToken param) +{ + var orderId = param.Value("OrderId"); + var isSuccess = param.Value("IsSuccess"); + + if(isSuccess) + //mark order status to succeeded + else + //mark order status to failed +} + +// ============= Consumer =================== + +[CapSubscribe("place.order.qty.deducted")] +public object DeductProductQty(JToken param) +{ + var orderId = param.Value("OrderId"); + var productId = param.Value("ProductId"); + var qty = param.Value("Qty"); + + //business logic + + return new { OrderId = orderId, IsSuccess = true }; +} +``` + +## 异构系统集成 + +在 3.0+ 版本中,我们对消息结构进行了重构,我们利用了消息队列中消息协议中的 Header 来传输一些额外信息,以便于在 Body 中我们可以做到不需要修改或包装使用者的原始消息数据格式和内容进行发送。 + +这样的做法是合理的,它有助于在异构系统中进行更好的集成,相对于以前的版本使用者不需要知道CAP内部使用的消息结构就可以完成集成工作。 + +现在我们将消息划分为 Header 和 Body 来进行传输。 + +Body 中的数据为用户发送的原始消息内容,也就是调用 Publish 方法发送的内容,我们不进行任何包装仅仅是序列化后传递到消息队列。 + +在 Header 中,我们需要传递一些额外信息以便于CAP在收到消息时能够提取到关键特征进行操作。 + +以下是在异构系统中,需要在发消息的时候向消息的Header 中写入的内容: + + 键 | 类型 | 说明 +-- | --| -- +cap-msg-id | string | 消息Id, 由雪花算法生成,也可以是 guid +cap-msg-name | string | 消息名称,即 Topic 名字 +cap-msg-type | string | 消息的类型, 即 typeof(T).FullName (非必须) +cap-senttime | stringg | 发送的时间 (非必须) + +以 Java 系统发送 RabbitMQ 为例: + +```java + +Map headers = new HashMap(); +headers.put("cap-msg-id", UUID.randomUUID().toString()); +headers.put("cap-msg-name", routingKey); + +channel.basicPublish(exchangeName, routingKey, + new AMQP.BasicProperties.Builder() + .headers(headers) + .build(), + messageBodyBytes); +// messageBodyBytes = "发送的json".getBytes(Charset.forName("UTF-8")) +// 注意 messageBody 默认为 json 的 byte[],如果采用其他系列化,需要在CAP侧自定义反序列化器 + +``` + ## Scheduling After CAP receives a message, it sends the message to Transport(RabitMq, Kafka...), which is transported by transport. diff --git a/docs/content/user-guide/zh/cap/messaging.md b/docs/content/user-guide/zh/cap/messaging.md index bd853a6..678520c 100644 --- a/docs/content/user-guide/zh/cap/messaging.md +++ b/docs/content/user-guide/zh/cap/messaging.md @@ -6,6 +6,50 @@ 你可以阅读 [quick-start](../getting-started/quick-start.md#_3) 来学习如何发送和处理消息。 +## 补偿事务 + +[Compensating transaction](https://en.wikipedia.org/wiki/Compensating_transaction) + +某些情况下,消费者需要返回值以告诉发布者执行结果,以便于发布者实施一些动作,通常情况下这属于补偿范围。 + +你可以在消费者执行的代码中通过重新发布一个新消息来通知上游,CAP 提供了一种简单的方式来做到这一点。 你可以在发送的时候指定 `callbackName` 来得到消费者的执行结果,通常这仅适用于点对点的消费。以下是一个示例。 + +例如,在一个电商程序中,订单初始状态为 pending,当商品数量成功扣除时将状态标记为 succeeded ,否则为 failed。 + +```C# +// ============= Publisher ================= + +_capBus.Publish("place.order.qty.deducted", new { OrderId = 1234, ProductId = 23255, Qty = 1 }, "place.order.mark.status"); + +// publisher using `callbackName` to subscribe consumer result + +[CapSubscribe("place.order.mark.status")] +public void MarkOrderStatus(JToken param) +{ + var orderId = param.Value("OrderId"); + var isSuccess = param.Value("IsSuccess"); + + if(isSuccess) + //mark order status to succeeded + else + //mark order status to failed +} + +// ============= Consumer =================== + +[CapSubscribe("place.order.qty.deducted")] +public object DeductProductQty(JToken param) +{ + var orderId = param.Value("OrderId"); + var productId = param.Value("ProductId"); + var qty = param.Value("Qty"); + + //business logic + + return new { OrderId = orderId, IsSuccess = true }; +} +``` + ## 异构系统集成 在 3.0+ 版本中,我们对消息结构进行了重构,我们利用了消息队列中消息协议中的 Header 来传输一些额外信息,以便于在 Body 中我们可以做到不需要修改或包装使用者的原始消息数据格式和内容进行发送。 diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml index 0d2daab..3eaf87d 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -81,7 +81,6 @@ nav: - CAP: - Configuration: user-guide/en/cap/configuration.md - Messaging: user-guide/en/cap/messaging.md - - Sagas: user-guide/en/cap/sagas.md - Serialization: user-guide/en/cap/serialization.md - Transactions: user-guide/en/cap/transactions.md - Idempotence: user-guide/en/cap/idempotence.md @@ -115,7 +114,6 @@ nav: - CAP: - 配置: user-guide/zh/cap/configuration.md - 消息: user-guide/zh/cap/messaging.md - - Sagas: user-guide/zh/cap/sagas.md - 序列化: user-guide/zh/cap/serialization.md - 运输: user-guide/zh/cap/transactions.md - 幂等性: user-guide/zh/cap/idempotence.md