RocketMQ顺序消费实现原理分析
RocketMQ顺序消费实现原理简要分析(基于最新的4.7.1
)
顺序消费 VS 并发消费
假如生产者投递了三条消息(M1,M2,M3)到topicM
并发消费
并发消费时如下图所示
消费顺序是乱序的,可能是M1被先消费,或者M2…
因为消费者是多线程的,每个线程得到CPU调度的时序得不到保证,每个线程都有可能消费到队列中拉取到的任意消息.
顺序消费
顺序消费时如下图
针对同一批次消息的不同状态,投递到同一队列.
拉取消息之前对队列加锁,保证同一消费组的多个消费实例,只能有一个实例能够拉取到消息.
拉取消息成功后,对消息队列使用synchronized
关键字同步,消息只能由一个线程消费.
源码分析
一段简短的消费端代码,摘抄自rocketmq官网
生产者代码
|
消费者代码
public class OrderedConsumer { |
拉消息流程
上述代码在给consumer
注册消息监听器时,使用了MessageListenerOrderly
(关键)
consumer#start()
启动逻辑
|
调用org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start
public synchronized void start() throws MQClientException { |
上述RebalanceImpl的实现类为(RebalancePushImpl)
rebalanceImmediately
|
唤醒RebalanceService
RebalanceService#run
|
mqClientFactory.doRebalance()
public void doRebalance() { |
MQConsumerInner#doRebalance
此处MQConsumerInner
的实现类为DefaultMqPushConsumerImpl
|
rebalanceImpl.doRebalance
此处的rebalanceImpl
实现类为RebalancePushImpl
|
rebalanceImpl.rebalanceByTopic
private void rebalanceByTopic(final String topic, final boolean isOrder) { |
rebalanceImpl#updateProcessQueueTableInRebalance
|
实际拉消息的逻辑DefaultMQPushConsumerImpl#pullMessage
public void pullMessage(final PullRequest pullRequest) { |
消息消费
定位到ConsumeMessageOrderlyService
|
ConsumeRequest
该类实现了Runnable
接口,为实际的消息消费处理入口
run
方法逻辑
|
总结
大致流程图
值得注意的是,Consumer
在对broker
端的消息队列加锁时,在Consumer
实际发出拉取消息的请求后,broker
此时收到请求后,并未对请求的消息队列做是否加锁验证.
所以,总体而言,rocketmq为了保证消息的局部消费有序,做了以下工作
首先
Producer
得保证同一批次的不同状态的消息得投递到相同的MessageQueueConsumer
在拉取消息之前对要拉取的MessageQueue
加锁,成功后才拉取消息Consumer
在拉取消息成功后,保证同一MessageQueue
的消息只能由线程池中的某个线程处理.