原创

分布式消息中间件(十一)——RocketMQ重复消费

本章,我们来看下RocketMQ中的重复消费问题。在分布式系列中,我已经介绍过了消费的幂等性,读者可以结合那篇文章一起来阅读本章内容。

一、幂等性

所谓幂等,就是当出现消费者对某条消息重复消费的情况时,重复消费的结果与消费一次的结果是相同的,并且多次消费并未对业务系统产生任何负面影响,那么这整个过程就实现可消息幂等。

例如,在支付场景下,消费者消费扣款消息,对一笔订单执行扣款操作,扣款金额为 100 元。如果因网络不稳定等原因导致扣款消息重复投递,消费者重复消费了该扣款消息,但最终的业务结果是只扣款一次,扣费 100 元,且用户的扣款记录中对应的订单只有一条扣款流水,不会多次扣除费用。那么这次扣款操作是符合要求的,整个消费过程实现了消费幂等。

本章针对RocketMQ讲解消息消费的幂等性,本质其实是讲消费时接口设计的幂等性。

二、重复消费

在使用MQ的过程中,生产者可能因为没有收到MQ的响应而重复投递消息,MQ也可能因为没有收到消费者的offset响应而重复投递。特别是消费者端,必须做好消费幂等性的控制。

我们来以订单系统和积分系统举个例子,假设RocketMQ已经收到了订单系统的通知(消息A),此时积分系统也消费到了这个消息,并执行本地事务成功,但是如果此时订单系统没有收到积分系统对该消息的成功响应,就会认为积分系统没有消费成功,所以当分系统下次再拉取消息时,又会把消息A给积分系统重新消费:



所以,积分系统自身必须对重复消费的逻辑做好控制。一般来说,主要就是利用业务的唯一ID进行判断是否已经消费过这条消息。比如,积分系统每消费成功一条消息,就把消息中的订单ID标记为“已消费”,每次给用户增加积分前,先判断下是否已经针对该笔订单增加过积分了。

三、死信队列

我们再来看一种情况,假设积分系统成功消费到了消息A,然后执行本地事务,但是本地事务执行失败了,按照正常的逻辑,积分系统应该返回一个响应告诉RocketMQ:“我现在自身处理逻辑有些异常,这个消息A还没处理成功,请下次再重新投递给我”。如下图:



在RocketMQ中,如果出现消费异常,消费者可以返回RECONSUME_LATER状态, 表示消费者现在没法完成这批消息的处理,麻烦MQ稍后过段时间再次给消费者这批消息 :

consumer.registerMessageListener(
    @Override
    new MessageListenerConcurrently(){
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                       ConsumeConcurrentlyContext ctx){
            try{
                // 对消息进行处理:handle(msgs),执行本地事务等
                return ConsumeConcurrentlyStatus.CONSUME.SUCCESS;
            }catch(Exception ex){
                // 执行本地事务出现异常
                return ConsumeConcurrentlyStatus.CONSUME.RECONSUME_LATER;
            }
        }
    }
);

那么,RocketMQ在收到返回的RECONSUME_LATER状态之后,是如何让你进行消费重试的呢?

简单来说,RocketMQ会有一个针对你这个消费者群组的重试队列,比如对于积分消费者实例,其所属群组为Credit_GROUP,那么在Broker内部会有一个%RETRY%Credit_Group这个名字的重试队列。

对于返回状态为RECONSUME_LATER的消息,都会进入这个重试队列,然后过一段时间之后,Broker会把重试队列中的消息会再次给消费者。如果再次失败,又返回了RECONSUME_LATER,那么会再过一段时间让我们来进行处理,默认最多是重试16次!每次重试之间的间隔时间是不一样的,这个间隔时间可以如下进行配置:

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

上面这段配置的意思是,第一次重试是1秒后,第二次重试是5秒后,第三次重试是10秒后,第四次重试是30秒后,第五次重试是1分钟后,以此类推,最多重试16次

如果重试了16次还是没法成功被消费,就会将这个消息放到死信队列里。 死信队列的名称是是%DLQ%Credit_Group,可以在RocketMQ的管理后台界面中看到。

死信队列里的消息表示死掉的消息,我们可以搞个后台定时任务,订阅死信队列,根据自己的业务需求做相应的处理。

四、总结

本章,我们介绍了RocketMQ中的重复消费场景,并对RocketMQ重复投递消息的原理做了介绍,引出了死信队列这一概念。防止重复消费的关键在于消费者接口的幂等性设计,具体要结合实际业务需求,具体问题具体分析,并没有万金油的方法。

正文到此结束
本文目录