幂等性(你可以在Wikipedia读到关于幂等性的定义),当我们谈论幂等时,一般是指可以重复处理传递的消息,而不会产生意外的结果。
在说幂等性之前,我们先来说下关于消费端的消息交付。
由于CAP不是使用的 MS DTC 或其他类型的2PC分布式事务机制,所以存在至少消息严格交付一次的问题,具体的说在基于消息的系统中,存在以下三种可能:
带 * 号表示在实际场景中,很难达到。
最多一次交付保证,涵盖了保证一次或根本不接收所有消息的情况。
这种类型的传递保证可能来自你的消息系统,你的代码按以下顺序执行其操作:
1. 从队列移除消息
2. 开始一个工作事务
3. 处理消息 ( 你的代码 )
4. 是否成功 ?
Yes:
1. 提交工作事务
No:
1. 回滚工作事务
2. 将消息发回到队列。
正常情况下,他们工作的很好,工作事务将被提交。
然而,有些时候并不能总是成功,比如在 1 之后出现异常,或者是你在将消息放回到队列中出现网络问题由或者宕机重启等情况。
使用这个协议,你将冒着丢失消息的风险,如果可以接受,那就没有关系。
这个交付保证包含你收到至少一次的消息,当出现故障时,可能会收到多次消息。
它需要稍微改变我们执行步骤的顺序,它要求消息队列系统支持事务或ACK机制,比如传统的 begin-commit-rollback 协议(MSMQ是这样),或者是 receive-ack-nack 协议(RabbitMQ,Azure Service Bus等是这样的)。
大致步骤如下:
1. 抢占队列中的消息。
2. 开始一个工作事务
3. 处理消息 ( 你的代码 )
4. 是否成功 ?
Yes:
1. 提交工作事务
2. 从队列删除消息
No:
1. 回滚工作事务
2. 从队列释放抢占的消息
当出现失败或者抢占消息超时的时候,我们总是能够再次接收到消息以保证我们工作事务提交成功。
上面所说的“工作事务”并不是特指关系型数据库中的事务,这里的工作事务是一个概念,也就是说执行代码的原子性。
比如它可以是传统的RDMS事务,也或者是 MongoDB 事务或者是一个交易等。
在这里它代表一个执行单元,这个执行单元是一个概念性的事实以支持前面提到的仅交付一次的这种问题。
通常,不可能做到消息的事务和工作事务来形成原子性进行提交或者回滚。
在CAP中,我们采用的交付保证为 At Least Once。
由于我们具有临时存储介质(数据库表),也许可以做到 At Most Once, 但是为了严格保证消息不会丢失,我们没有提供相关功能或配置。
1、消息写入成功了,但是此时执行Consumer方法失败了
执行Consumer方法失败的原因有非常多,我如果不知道具体的场景盲目进行重试或者不进行重试都是不正确的选择。 举个例子:假如消费者为扣款服务,如果是执行扣款成功了,但是在写扣款日志的时候失败了,此时CAP会判断为消费者执行失败,进行重试。如果客户端自己没有保证幂等性,框架对其进行重试,这里势必会造成多次扣款出现严重后果。
2、执行Consumer方法成功了,但是又收到了同样的消息
此处场景也是可能存在的,假如开始的时候Consumer已经执行成功了,但是由于某种原因如 Broker 宕机恢复等,又收到了相同的消息,CAP 在收到Broker消息后会认为这个是一个新的消息,会对 Consumer再次执行,由于是新消息,此时 CAP 也是无法做到幂等的。
3、目前的数据存储模式无法做到幂等
由于CAP存消息的表对于成功消费的消息会于1个小时后删除,所以如果对于一些历史性消息无法做到幂等操作。 历史性指的是,假如 Broker由于某种原因维护了或者是人工处理的一些消息。
4、业界做法
许多基于事件驱动的框架都是要求 用户 来保证幂等性操作的,比如 ENode, RocketMQ 等等...
从实现的角度来说,CAP可以做一些比较不严格的幂等,但是严格的幂等无法做到的。
通常情况下,保证消息被执行多次而不会产生意外结果是很自然的一种方式是采用操作对象自带的一些幂等功能。比如:
数据库提供的 INSERT ON DUPLICATE KEY UPDATE
或者是采取类型的程序判断行为。
另外一种处理幂等性的方式就是在消息传递的过程中传递ID,然后由单独的消息跟踪器来处理。
比如你使用具有事务数据存储的 IMessageTracker 来跟踪消息ID,你的代码可能看起来像这样:
readonly IMessageTracker _messageTracker;
public SomeMessageHandler(IMessageTracker messageTracker)
{
_messageTracker = messageTracker;
}
[CapSubscribe]
public async Task Handle(SomeMessage message)
{
if (await _messageTracker.HasProcessed(message.Id))
{
return;
}
// do the work here
// ...
// remember that this message has been processed
await _messageTracker.MarkAsProcessed(messageId);
}
至于 IMessageTracker
的实现,可以使用诸如Redis或者数据库等存储消息Id和对应的处理状态。