文章参考:
https://www.cnblogs.com/hzmark/p/orderly_message.html
https://www.zhihu.com/question/30195969
消息中间件中的顺序消息
什么是顺序消息
有了上述的基础之后,我们回到本篇文章的主题中,聊一聊消息中间件中的顺序消息。
顺序消息(FIFO 消息)是 MQ 提供的一种严格按照顺序进行发布和消费的消息类型。顺序消息由两个部分组成:顺序发布和顺序消费。
顺序消息包含两种类型:
分区顺序:一个Partition内所有的消息按照先进先出的顺序进行发布和消费
全局顺序:一个Topic内所有的消息按照先进先出的顺序进行发布和消费
这是阿里云上对顺序消息的定义,把顺序消息拆分成了顺序发布和顺序消费。那么多线程中发送消息算不算顺序发布?
如上一部分介绍的,多线程中若没有因果关系则没有顺序。那么用户在多线程中去发消息就意味着用户不关心那些在不同线程中被发送的消息的顺序。即多线程发送的消息,不同线程间的消息不是顺序发布的,同一线程的消息是顺序发布的。这是需要用户自己去保障的。
而对于顺序消费,则需要保证哪些来自同一个发送线程的消息在消费时是按照相同的顺序被处理的(为什么不说他们应该在一个线程中被消费呢?)。
全局顺序其实是分区顺序的一个特例,即使Topic只有一个分区(以下不在讨论全局顺序,因为全局顺序将面临性能的问题,而且绝大多数场景都不需要全局顺序)。
如何保证顺序
在MQ的模型中,顺序需要由3个阶段去保障:
-
消息被发送时保持顺序
-
消息被存储时保持和发送的顺序一致
-
消息被消费时保持和存储的顺序一致
发送时保持顺序意味着对于有顺序要求的消息,用户应该在同一个线程中采用同步的方式发送。存储保持和发送的顺序一致则要求在同一线程中被发送出来的消息A和B,存储时在空间上A一定在B之前。而消费保持和存储一致则要求消息A、B到达Consumer之后必须按照先A后B的顺序被处理。
如下图所示:
对于两个订单的消息的原始数据:a1、b1、b2、a2、a3、b3(绝对时间下发生的顺序):
-
在发送时,a订单的消息需要保持a1、a2、a3的顺序,b订单的消息也相同,但是a、b订单之间的消息没有顺序关系,这意味着a、b订单的消息可以在不同的线程中被发送出去
-
在存储时,需要分别保证a、b订单的消息的顺序,但是a、b订单之间的消息的顺序可以不保证
-
a1、b1、b2、a2、a3、b3是可以接受的
-
a1、a2、b1、b2、a3、b3也是可以接受的
-
a1、a3、b1、b2、a2、b3是不能接受的
-
-
消费时保证顺序的简单方式就是“什么都不做”,不对收到的消息的顺序进行调整,即只要一个分区的消息只由一个线程处理即可;当然,如果a、b在一个分区中,在收到消息后也可以将他们拆分到不同线程中处理,不过要权衡一下收益
开源RocketMQ中顺序的实现
上图是RocketMQ顺序消息原理的介绍,将不同订单的消息路由到不同的分区中。文档只是给出了Producer顺序的处理,Consumer消费时通过一个分区只能有一个线程消费的方式来保证消息顺序,具体实现如下。
Producer端
Producer端确保消息顺序唯一要做的事情就是将消息路由到特定的分区,在RocketMQ中,通过MessageQueueSelector来实现分区的选择。
-
List<MessageQueue> mqs:消息要发送的Topic下所有的分区
-
Message msg:消息对象
-
额外的参数:用户可以传递自己的参数
比如如下实现就可以保证相同的订单的消息被路由到相同的分区:
long orderId = ((Order) object).getOrderId;
return mqs.get(orderId % mqs.size());
Consumer端
RocketMQ消费端有两种类型:MQPullConsumer和MQPushConsumer。
MQPullConsumer由用户控制线程,主动从服务端获取消息,每次获取到的是一个MessageQueue中的消息。PullResult中的List msgFoundList自然和存储顺序一致,用户需要再拿到这批消息后自己保证消费的顺序。
对于PushConsumer,由用户注册MessageListener来消费消息,在客户端中需要保证调用MessageListener时消息的顺序性。RocketMQ中的实现如下:
-
PullMessageService单线程的从Broker获取消息
-
PullMessageService将消息添加到ProcessQueue中(ProcessMessage是一个消息的缓存),之后提交一个消费任务到ConsumeMessageOrderService
-
ConsumeMessageOrderService多线程执行,每个线程在消费消息时需要拿到MessageQueue的锁
-
拿到锁之后从ProcessQueue中获取消息
保证消费顺序的核心思想是:
-
获取到消息后添加到ProcessQueue中,单线程执行,所以ProcessQueue中的消息是顺序的
-
提交的消费任务时提交的是“对某个MQ进行一次消费”,这次消费请求是从ProcessQueue中获取消息消费,所以也是顺序的(无论哪个线程获取到锁,都是按照ProcessQueue中消息的顺序进行消费)
顺序和异常的关系
顺序消息需要Producer和Consumer都保证顺序。Producer需要保证消息被路由到正确的分区,消息需要保证每个分区的数据只有一个线程消息,那么就会有一些缺陷:
-
发送顺序消息无法利用集群的Failover特性,因为不能更换MessageQueue进行重试
-
因为发送的路由策略导致的热点问题,可能某一些MessageQueue的数据量特别大
-
消费的并行读依赖于分区数量
-
消费失败时无法跳过
不能更换MessageQueue重试就需要MessageQueue有自己的副本,通过Raft、Paxos之类的算法保证有可用的副本,或者通过其他高可用的存储设备来存储MessageQueue。
热点问题好像没有什么好的解决办法,只能通过拆分MessageQueue和优化路由方法来尽量均衡的将消息分配到不同的MessageQueue。
消费并行度理论上不会有太大问题,因为MessageQueue的数量可以调整。
消费失败的无法跳过是不可避免的,因为跳过可能导致后续的数据处理都是错误的。不过可以提供一些策略,由用户根据错误类型来决定是否跳过,并且提供重试队列之类的功能,在跳过之后用户可以在“其他”地方重新消费到这条消息。
在RocketMQ中提供了基于队列(分区)的顺序消费。
RocketMQ中顺序性主要指的是消息顺序消费。RocketMQ 中每一个消费组一个单独的线程池并发消费拉取到的消息,即消费端是多线程消费。而顺序消费的并发度等于该消费者分配到的队列数。
RokcetMQ的完成顺序性主要是由3把琐来实现的。下图是RocketMQ顺序消费的工作原理:
1、消费端在启动时首先会进行队列负载机制,遵循一个消费者可以分配多个队列,但一个队列只会被一个消费者消费的原则。
2、消费者根据分配的队列,向 Broker 申请琐,如果申请到琐,则拉取消息,否则放弃消息拉取,等到下一个队列负载周期(20s)再试。
3、拉取到消息后会在消费端的线程池中进行消费,但消费的时候,会对消费队列进行加锁,即同一个消费队列中的多条消息会串行执行。
4、在消费的过程中,会对处理队列(ProccessQueue)进行加锁,保证处理中的消息消费完成,发生队列负载后,其他消费者才能继续消费。
ps:(RockerMQ里有个非常重要的数据结构叫ProcessQueue,很多功能,例如消费进度,消费等等功能的底层核心数据保存都是有ProcessQueue提供,ProcessQueue和一个MessageQueue是对应的,即一个队列会有一个ProcessQueue的数据结构)
前面2把琐比较好理解,最后一把琐有什么用呢?
例如队列 q3 目前是分配给消费者C2进行消费,已将拉取了32条消息在线程池中处理,然后对消费者进行了扩容,分配给C2的q3队列,被分配给C3了,由于C2已将处理了一部分,位点信息还没有提交,如果C3立马去消费q3队列中的消息,那存在一部分数据会被重复消费,故在C2消费者在消费q3队列的时候,消息没有消费完成,那负载队列就不能丢弃该队列,就不会在broker端释放琐,其他消费者就无法从该队列消费,尽最大可能保证了消息的重复消费,保证顺序性语义。
再补充一点:要保证顺序消费,其重试次数为Integer.Max_Value。