@@ -4,7 +4,7 @@ Subscriber filters are similar to ASP.NET MVC filters and are mainly used to pro | |||||
## Create subscribe filter | ## Create subscribe filter | ||||
1. Create filter | |||||
### Create Filter | |||||
Create a new filter class and inherit the `SubscribeFilter` abstract class. | Create a new filter class and inherit the `SubscribeFilter` abstract class. | ||||
@@ -32,7 +32,15 @@ In some scenarios, if you want to terminate the subscriber method execution, you | |||||
To ignore exceptions, you can setting `context.ExceptionHandled = true` in `ExceptionContext` | To ignore exceptions, you can setting `context.ExceptionHandled = true` in `ExceptionContext` | ||||
2. Configuration | |||||
```C# | |||||
public override void OnSubscribeException(ExceptionContext context) | |||||
{ | |||||
context.ExceptionHandled = true; | |||||
} | |||||
``` | |||||
### Configuration Filter | |||||
Use `AddSubscribeFilter<>` to add a filter. | Use `AddSubscribeFilter<>` to add a filter. | ||||
@@ -23,25 +23,27 @@ _capBus.Publish("place.order.qty.deducted", | |||||
// publisher using `callbackName` to subscribe consumer result | // publisher using `callbackName` to subscribe consumer result | ||||
[CapSubscribe("place.order.mark.status")] | [CapSubscribe("place.order.mark.status")] | ||||
public void MarkOrderStatus(JToken param) | |||||
public void MarkOrderStatus(JsonElement param) | |||||
{ | { | ||||
var orderId = param.Value<int>("OrderId"); | |||||
var isSuccess = param.Value<bool>("IsSuccess"); | |||||
var orderId = param.GetProperty("OrderId").GetInt32(); | |||||
var isSuccess = param.GetProperty("IsSuccess").GetBoolean(); | |||||
if(isSuccess) | |||||
//mark order status to succeeded | |||||
else | |||||
//mark order status to failed | |||||
if(isSuccess){ | |||||
// mark order status to succeeded | |||||
} | |||||
else{ | |||||
// mark order status to failed | |||||
} | |||||
} | } | ||||
// ============= Consumer =================== | // ============= Consumer =================== | ||||
[CapSubscribe("place.order.qty.deducted")] | [CapSubscribe("place.order.qty.deducted")] | ||||
public object DeductProductQty(JToken param) | |||||
public object DeductProductQty(JsonElement param) | |||||
{ | { | ||||
var orderId = param.Value<int>("OrderId"); | |||||
var productId = param.Value<int>("ProductId"); | |||||
var qty = param.Value<int>("Qty"); | |||||
var orderId = param.GetProperty("OrderId").GetInt32(); | |||||
var productId = param.GetProperty("ProductId").GetInt32(); | |||||
var qty = param.GetProperty("Qty").GetInt32(); | |||||
//business logic | //business logic | ||||
@@ -109,7 +111,7 @@ Retrying plays an important role in the overall CAP architecture design, CAP ret | |||||
During the message sending process, when the broker crashes or the connection fails or an abnormality occurs, CAP will retry the sending. Retry 3 times for the first time, retry every minute after 4 minutes, and +1 retry. When the total number of retries reaches 50,CAP will stop retrying. | During the message sending process, when the broker crashes or the connection fails or an abnormality occurs, CAP will retry the sending. Retry 3 times for the first time, retry every minute after 4 minutes, and +1 retry. When the total number of retries reaches 50,CAP will stop retrying. | ||||
You can adjust the total number of retries by setting `FailedRetryCount` in CapOptions. | |||||
You can adjust the total number of retries by setting [FailedRetryCount](../configuration#failedretrycount) in CapOptions. | |||||
It will stop when the maximum number of times is reached. You can see the reason for the failure in Dashboard and choose whether to manually retry. | It will stop when the maximum number of times is reached. You can see the reason for the failure in Dashboard and choose whether to manually retry. | ||||
@@ -123,6 +125,8 @@ There is an `ExpiresAt` field in the database message table indicating the expir | |||||
Consuming failure will change the message status to `Failed` and `ExpiresAt` will be set to **15 days** later. | Consuming failure will change the message status to `Failed` and `ExpiresAt` will be set to **15 days** later. | ||||
By default, the data of the message in the table is deleted **every hour** to avoid performance degradation caused by too much data. The cleanup strategy `ExpiresAt` is performed when field is not empty and is less than the current time. | |||||
By default, the data of the message in the table is deleted **5 minutes** to avoid performance degradation caused by too much data. The cleanup strategy `ExpiresAt` is performed when field is not empty and is less than the current time. | |||||
That is to say, the message with the status Failed (by default they have been retried 50 times), if you do not have manual intervention for 15 days, it will **also be** cleaned up. | |||||
That is to say, the message with the status Failed (by default they have been retried 50 times), if you do not have manual intervention for 15 days, it will **also be** cleaned up. | |||||
You can use [CollectorCleaningInterval](../configuration#collectorcleaninginterval) configuration items to custom the interval time. |
@@ -2,6 +2,10 @@ | |||||
[NATS](https://nats.io/) is a simple, secure and performant communications system for digital systems, services and devices. NATS is part of the Cloud Native Computing Foundation (CNCF). | [NATS](https://nats.io/) is a simple, secure and performant communications system for digital systems, services and devices. NATS is part of the Cloud Native Computing Foundation (CNCF). | ||||
!!! warning | |||||
We currently implement NATS provider based on Request/Response mode, and we plan to replace it with JetStream in future version. | |||||
see https://github.com/dotnetcore/CAP/issues/983 for more information. | |||||
## Configuration | ## Configuration | ||||
To use NATS transporter, you need to install the following package from NuGet: | To use NATS transporter, you need to install the following package from NuGet: | ||||
@@ -6,7 +6,9 @@ | |||||
## 自定义过滤器 | ## 自定义过滤器 | ||||
1、创建一个过滤器类,并继承 `SubscribeFilter` 抽象类。 | |||||
### 添加过滤器 | |||||
创建一个过滤器类,并继承 `SubscribeFilter` 抽象类。 | |||||
```C# | ```C# | ||||
public class MyCapFilter: SubscribeFilter | public class MyCapFilter: SubscribeFilter | ||||
@@ -32,7 +34,14 @@ public class MyCapFilter: SubscribeFilter | |||||
通过在 `ExceptionContext` 中设置 `context.ExceptionHandled = true` 来忽略异常。 | 通过在 `ExceptionContext` 中设置 `context.ExceptionHandled = true` 来忽略异常。 | ||||
2、集成 | |||||
```C# | |||||
public override void OnSubscribeException(ExceptionContext context) | |||||
{ | |||||
context.ExceptionHandled = true; | |||||
} | |||||
``` | |||||
### 配置过滤器 | |||||
```C# | ```C# | ||||
services.AddCap(opt => | services.AddCap(opt => | ||||
@@ -19,30 +19,34 @@ | |||||
```C# | ```C# | ||||
// ============= Publisher ================= | // ============= Publisher ================= | ||||
_capBus.Publish("place.order.qty.deducted", new { OrderId = 1234, ProductId = 23255, Qty = 1 }, "place.order.mark.status"); | |||||
_capBus.Publish("place.order.qty.deducted", | |||||
contentObj: new { OrderId = 1234, ProductId = 23255, Qty = 1 }, | |||||
callbackName: "place.order.mark.status"); | |||||
// publisher using `callbackName` to subscribe consumer result | // publisher using `callbackName` to subscribe consumer result | ||||
[CapSubscribe("place.order.mark.status")] | [CapSubscribe("place.order.mark.status")] | ||||
public void MarkOrderStatus(JToken param) | |||||
public void MarkOrderStatus(JsonElement param) | |||||
{ | { | ||||
var orderId = param.Value<int>("OrderId"); | |||||
var isSuccess = param.Value<bool>("IsSuccess"); | |||||
var orderId = param.GetProperty("OrderId").GetInt32(); | |||||
var isSuccess = param.GetProperty("IsSuccess").GetBoolean(); | |||||
if(isSuccess) | |||||
//mark order status to succeeded | |||||
else | |||||
//mark order status to failed | |||||
if(isSuccess){ | |||||
// mark order status to succeeded | |||||
} | |||||
else{ | |||||
// mark order status to failed | |||||
} | |||||
} | } | ||||
// ============= Consumer =================== | // ============= Consumer =================== | ||||
[CapSubscribe("place.order.qty.deducted")] | [CapSubscribe("place.order.qty.deducted")] | ||||
public object DeductProductQty(JToken param) | |||||
public object DeductProductQty(JsonElement param) | |||||
{ | { | ||||
var orderId = param.Value<int>("OrderId"); | |||||
var productId = param.Value<int>("ProductId"); | |||||
var qty = param.Value<int>("Qty"); | |||||
var orderId = param.GetProperty("OrderId").GetInt32(); | |||||
var productId = param.GetProperty("ProductId").GetInt32(); | |||||
var qty = param.GetProperty("Qty").GetInt32(); | |||||
//business logic | //business logic | ||||
@@ -109,7 +113,7 @@ CAP 接收到消息之后会将消息进行 Persistent(持久化), 有关 | |||||
在消息发送过程中,当出现 Broker 宕机或者连接失败的情况亦或者出现异常的情况下,这个时候 CAP 会对发送的重试,第一次重试次数为 3,4分钟后以后每分钟重试一次,进行次数 +1,当总次数达到50次后,CAP将不对其进行重试。 | 在消息发送过程中,当出现 Broker 宕机或者连接失败的情况亦或者出现异常的情况下,这个时候 CAP 会对发送的重试,第一次重试次数为 3,4分钟后以后每分钟重试一次,进行次数 +1,当总次数达到50次后,CAP将不对其进行重试。 | ||||
你可以在 CapOptions 中设置FailedRetryCount来调整默认重试的总次数。 | |||||
你可以在 CapOptions 中设置 [FailedRetryCount](../configuration#failedretrycount) 来调整默认重试的总次数。 | |||||
当失败总次数达到默认失败总次数后,就不会进行重试了,你可以在 Dashboard 中查看消息失败的原因,然后进行人工重试处理。 | 当失败总次数达到默认失败总次数后,就不会进行重试了,你可以在 Dashboard 中查看消息失败的原因,然后进行人工重试处理。 | ||||
@@ -121,4 +125,5 @@ CAP 接收到消息之后会将消息进行 Persistent(持久化), 有关 | |||||
数据库消息表中具有一个 ExpiresAt 字段表示消息的过期时间,当消息发送成功或者消费成功后,CAP会将消息状态为 Successed 的 ExpiresAt 设置为 1天 后过期,会将消息状态为 Failed 的 ExpiresAt 设置为 15天 后过期。 | 数据库消息表中具有一个 ExpiresAt 字段表示消息的过期时间,当消息发送成功或者消费成功后,CAP会将消息状态为 Successed 的 ExpiresAt 设置为 1天 后过期,会将消息状态为 Failed 的 ExpiresAt 设置为 15天 后过期。 | ||||
CAP 默认情况下会每隔一个小时将消息表的数据进行清理删除,避免数据量过多导致性能的降低。清理规则为 ExpiresAt 不为空并且小于当前时间的数据。 也就是说状态为Failed的消息(正常情况他们已经被重试了 50 次),如果你15天没有人工介入处理,同样会被清理掉。 | |||||
CAP 默认情况下会每隔**5分钟**将消息表的数据进行清理删除,避免数据量过多导致性能的降低。清理规则为 ExpiresAt 不为空并且小于当前时间的数据。 也就是说状态为Failed的消息(正常情况他们已经被重试了 50 次),如果你15天没有人工介入处理,同样会被清理掉。你可以通过 [CollectorCleaningInterval](../configuration#collectorcleaninginterval) 配置项来自定义间隔时间。 | |||||
@@ -2,6 +2,10 @@ | |||||
[NATS](https://nats.io/)是一个简单、安全、高性能的数字系统、服务和设备通信系统。NATS 是 CNCF 的一部分。 | [NATS](https://nats.io/)是一个简单、安全、高性能的数字系统、服务和设备通信系统。NATS 是 CNCF 的一部分。 | ||||
!!! warning | |||||
我们当前基于 Request/Response 实现,我们计划将来版本中替换为 JetStream 。 | |||||
查看 https://github.com/dotnetcore/CAP/issues/983 了解更多。 | |||||
## 配置 | ## 配置 | ||||
要使用NATS 传输器,你需要安装下面的NuGet包: | 要使用NATS 传输器,你需要安装下面的NuGet包: | ||||