引言

在MQ中,ProducerConsumer因为各种原因会进行消息重试处理,在消费消息时,会按照一定规则推送消息到消费端进行消息消费。既然有重试,那么就少不了幂等

幂等概念

在消息重试多次时,消费端对该重复消息消费多次与消费一次的结果是相同的,并且多次消费没有对系统产生副作用,那么就称这个过程是消息幂等的。

例如:支付场景下,消费者消费扣款消息,对一笔订单进行扣款操作,该扣款操作需要扣除10元。

这个扣款操作重复多次与执行一次的效果相同,只进行一次真实扣款,用户的扣款记录中对应该笔订单的只有一条扣款流水。不会多扣。那么可以说这个扣款操作是符合要求的,这个消费过程是消息幂等的。

消息幂等的场景

Producer发送消息重复

​ 生产者发送消息时,消息成功投递到broker,但此时发生网络闪断或者生产者down掉,导致broker发送ACK失败。此时生产者由于未能收到消息发送响应,认为发送失败,因此尝试重新发送消息到broker。当消息发送成功后,在broker中就会存在两条相同内容的消息,最终消费者会拉取到两条内容一样并且Message ID也相同的消息,因此造成了消息的重复。

Consumer消费时重复

​ 消费消息时同样会出现重复消费的情况。当消费者在处理业务完成返回消费状态给broker时,由于网络闪断等异常情况导致未能将消费完成的CONSUME_SUCCESS状态返回给brokerbroker为了保证消息被至少消费一次的语义,会在网络环境恢复之后再次投递该条被处理的消息,最终造成消费者多次收到内容一样并且Message ID也相同的消息,造成了消息的重复。

所以,无论是发送时重复还是消费时重复,最终的效果均为消费者消费时收到了重复的消息,可以推论出:只需要在消费者端统一进行幂等处理就能够实现消息幂等。

实现幂等方式

消息幂等两要素

  • 幂等令牌
  • 处理唯一性的确保

必须保证存在幂等令牌的情况下保证业务处理结果的唯一性,才认为幂等实现是成功的。

幂等令牌

幂等令牌是生产者和消费者两者中的既定协议,在业务中通常是具备唯一业务标识的字符串,如:订单号、流水号等。且一般由生产者端生成并传递给消费者端。

处理唯一性的确保

服务端应当采用一定的策略保证同一个业务逻辑一定不会重复执行成功多次。如:使用支付宝进行支付,买一个产品支付多次只会成功一笔。较为常用的方式是采用缓存去重并且通过对业务标识添加数据库的唯一索引实现幂等。

具体的思路为:如支付场景下,支付的发起端生成了一个支付流水号,服务端处理该支付请求成功后,数据持久化成功。由于表中对支付流水添加了唯一索引,因此当重复支付时会因为唯一索引的存在报错 duplicate entry,服务端的业务逻辑捕获该异常并返回调用侧“重复支付”提示。这样就不会重复扣款。

在上面场景的基础上,还可以引入Redis等缓存组件实现去重:当支付请求打到服务端,首先去缓存进行判断,根据 key=“支付流水号” 去get存储的值,如果返回为空,表明是首次进行支付操作同时将当前的支付流水号作为key、value可以为任意字符串通过set(key, value, expireTime)存储在redis中。当重复的支付请求到来时,尝试进行*get(支付流水号)*操作,这个操作会命中缓存,因此可以认为该请求是重复的支付请求,服务端业务将重复支付的业务提示返回给请求方。

由于一般都会在缓存使用过程中设置过期时间,缓存可能会失效从而导致请求穿透到持久化存储中(如:MySQL)。因此不能因为引入缓存而放弃使用唯一索引,将二者结合在一起是一个比较好的方案。

RocketMQ下的消息幂等

RocketMQ作为一款高性能的消息中间件,能够保证消息不丢失但是不能保证消息不重复。

如果RMQ实现消息去重其实也是可以的,但是考虑到对高可用以及高性能的影响就放弃了。如果RMQ做服务端消息去重,就要对消息做额外的rehash、排序等操作,这会花费较大的空间及时间等代价,收益并不明显。所以RMQ就将消息幂等交给了业务方处理。

在RMQ中,每条消息都会有一个MessageID,那么能否用该ID作为去重依据,也就是幂等令牌呢?

答案是否定的,因为MessageID可能会出现冲突的情况,因此不建议通过MessageID作为处理依据,应该使用业务唯一标识如:订单号、流水号等作为幂等处理的关键依据。

幂等令牌由消息生产者生成,在发消息消息时,可以通过消息的key设值为该id。对应API为org.apache.rocketmq.common.message.setKeys(String keys)

1
2
Message sendMessage = new Message("topic", message.getBytes());
sendMessage.setKeys("OD0000000001");

当消息消费者收到该消息时,根据该消息的key做幂等处理,API为 org.apache.rocketmq.common.message.getKeys() 。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
(msgs, context) -> {
try {
// 默认msgs只有一条消息
for (MessageExt msg : msgs) {
String key = msg.getKeys();
return walletCharge(msg);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
LOGGER.error("钱包扣款消费异常。", e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}

消费者通过getKeys()能够读取到生产者设置的幂等依据(如:订单号),然后业务逻辑围绕该id进行幂等处理即可。

如果觉得每次都需要在生产者侧setkey,在消费者侧getkey有点繁琐。也可以将该幂等依据设置在消息协议中,消费者接收到消息后解析该id进行幂等操作。只需要消息的生产者和消费者约定好如何解析id的协议即可。

消费端常见的幂等操作

业务操作前状态查询

消费端开始执行业务操作时,通过幂等id首先进行业务状态的查询,如:修改订单状态环节,当订单状态为成功/失败则不需要再进行处理。那么只需要在消费逻辑执行之前通过订单号进行订单状态查询,一旦获取到确定的订单状态则对消息进行提交,通知broker消息状态为:ConsumeConcurrentlyStatus.CONSUME_SUCCESS

业务操作前数据检索

逻辑与第一点相似,即消费之前进行数据的检索。如果能够通过业务唯一id查询到对应的数据则不需要进行再后续的业务逻辑。如:下单环节中,在消费者执行异步下单之前首先通过订单号查询订单是否已经存在,这里可以查库也可以查缓存。如果存在则直接返回消费成功,否则进行下单操作。

唯一性约束保证最后一道防线

上述第二点操作并不能保证一定不出现重复的数据。如:并发插入的场景下,如果没有乐观锁、分布式锁作为保证的前提下,很有可能出现数据的重复插入,因此务必要对幂等id添加唯一性索引,这样就能够保证在并发场景下也能保证数据的唯一性。

锁机制

上述的第一点中,如果是并发更新的情况,没有使用悲观锁、乐观锁、分布式锁等机制的前提下,进行更新,很可能会出现多次更新导致状态的不准确。如:对订单状态的更新,业务要求订单只能从初始化->处理中,处理中->成功,处理中->失败,不允许跨状态更新。如果没有锁机制,很可能会将初始化的订单更新为成功,成功订单更新为失败等异常的情况。
高并发下,建议通过状态机的方式定义好业务状态的变迁,通过乐观锁、分布式锁机制保证多次更新的结果是确定的,悲观锁在并发环境不利于业务吞吐量的提高因此不建议使用。

消息记录表

这种方案和业务层做的幂等操作类似,由于消息id是唯一的,可以借助该id进行消息的去重操作,间接实现消费的幂等。
首先准备一个消息记录表,在消费成功的同时插入一条已经处理成功的消息id记录到该表中,注意一定要与业务操作处于同一个事务中,当新的消息到达的时候,根据新消息的id在该表中查询是否已经存在该id,如果存在则表明消息已经被消费过,那么丢弃该消息不再进行业务操作即可。

总结

肯定还有更多的场景没有涉及到,这里说到的操作均是互相之间有关联的,将他们配合使用更能够保证消费业务的幂等性。

不论怎样,一定要牢记一个原则:缓存是不可靠的,查询是不可靠的

在高并发的场景下,一定要通过持久化存储的唯一索引以及引入锁机制作为共同保障数据准确性和完整性的最后一道防线

参考

[跟我学RocketMQ之消息幂等]