前言

MQ的三大特性:
1、异步解耦
上游系统对下游系统的调用如果是同步调用,则会大大降低系统的吞吐量与并发度,且系统间耦合度太高。而异步调用则会解决这些问题,一般做法就是,在两层间添加一个MQ层。

2、流量削峰
MQ可以将系统的超量请求暂存其中,以便系统后期可以慢慢进行处理,从而避免了请求的丢失或系统被压垮。

3、数据收集
分布式系统会产生海量级别的数据流,比如业务日志,监控数据,用户行为等。针对这些数据流进行实时或批量采集汇总,然后对这些数据流进行大数据分析,MQ完成此类数据收集是最好的选择。

RocketMQ的特点:
1、亿级消息的堆积能力,单个队列中的百万级消息的累积容量。
2、高可用性:Broker服务器支持多Master多Slave的同步双写,以及异步复制模式,其中同步双写可保证消息不丢失。
3、高可靠性:生产者将消息发送到Broker端有三种方式,同步、异步、单向。其中同步和异步都可以保证消息成功发送。Broker对于消息消息刷盘有2种策略:同步刷盘和异步刷盘,其中同步刷盘可以保证消息成功的存储到磁盘中。消费者的消费模式也有集群消费和广播消费两种,默认是集群消费,如果集群模式中有消费者挂了,一个组里的其他消费者会接替其消费。综上所述,是高可靠的。
4、支持分布式事务消息:这里采用半消息确认和消息回查机制来保证分布式事务消息的。后面详述。
5、支持消息过滤:可采用消费者业务端的tag过滤。
6、支持顺序消息:消息在Broker中采用队列的FIFO模式存储,也就是发送是顺序的,只要保证消费的顺序性即可。
7、支持定时消息和延迟消息:Broker中有定时消息的机制,消息发送到Broker中,不会立即被Consumer消费,会等到特定的时间才被消费,延迟消息也是一样的,延迟一定的时间后才会被Consumer消费。

正文

1、RocketMQ消息模型的术语

a) 消息:消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。
b) 主题:Topic表示一类消息的集合,每个主题包含若干消息,是RocketMQ进行消息订阅的基本单位。
c) 标签:为消息设置的标签,用于将同一个topic下区分不同类型的消息,可以理解为Topic是消息的一级分类,Tag是消息的二级分类。
d) 队列:存储消息的物理实体,一个Topic可以包含多个Queue,Queue也叫消息分区,一个Queue中的消息只能被一个消费者组中的一个消费者消费,一个Queue中的消息不允许同一个消费者组中的多个消费者同时消费。
e) 消息标识
RocketMQ中每个消息都有唯一的消息ID,且可以携带具有业务标识的key,以便对消息的查询。
f) 组(Group),可分为生产者组和消费者组:
生产者组:同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。如果发送的是事务消息,且生产者发送后崩溃,则Broker服务器会联系同一个生产者组的其他生产者实例以提交或者回溯消费。
消费者组:同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面实现了负载均衡和容错。消费者组的消费者实例必须订阅完全相同的topic。
g) 偏移量:队列Queue中的offset,可以认为就是下标,消息队列可看做数组。

2、RocketMQ的架构

在这里插入图片描述
RocketMQ一共有4个组成部分:NameServer、Broker、Producer、Consumer,每一部分都是集群部署的。

1、Producer:生产者通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败且低延迟,RocketMQ中的消息生产者都是以组的形式出现的。生产者组是同一类生产者的集合,这类生产者发送相同的Topic类型。

2、Consumer:RocketMQ中的消息消费者都是以消费者组的形式出现的,消费者组是同一类消费者的集合,消费的是同一个Topic类型。消费者组使得消息消费实现了负载均衡和容错。

负载均衡:将一个Topic中不同的Queue平均分配给组内的不同的Consumer,注意是将队列Queue负载均衡,而不是将消息本身负载均衡。

容错:一个Consumer挂了,组内的其他Consumer可以接着消费原Consumer消费的Queue。

3、NameServer
相当于注册中心的角色,各个角色的机器都要定时向NameServer上报自己的状态,如果超时未上报,NameServer会认为某个机器出现故障不可用了,从而将这个机器从可用列表中删除。

每个NameServer中都保存着Broker集群的整个路由信息和用于客户端查询的队列信息,生产者和消费者通过NameServer去获取整个Broker集群的路由信息,从而进行消息的投递和消费。

4、Broker
负责消息的存储转发

3、RocketMQ工作流程

1、启动NameServer,启动后开始监听端口,等待Producer、Consumer、Broker连接。
2、启动Broker,Broker会与所有的NameServer建立并保持长连接,每隔30s向NameServer定时发送心跳包。
3、Producer发送消息时,先和NameServer集群中的一台建立长连接,并从NameServer中获取路由信息(当前发送的Topic消息的Queue与Broker的地址(IP+Port)的映射关系)。然后,在获取到路由信息后,Producer会首先将路由信息缓存到本地,再每隔30s从Nameserver更新一次路由信息。
4、Consumer消费消息时,跟Producer类似,先和NameServer集群中的一台建立长连接,并从NameServer中获取路由信息(订阅Topic的路由信息),然后从路由信息中获取到要消费的Queue,然后与队列所在的Broker建立长连接,开始消费其中的消息。Consumer在获取到路由信息后,同样每30s从NameServer更新一次路由信息。

4、RocketMQ的刷盘策略

a) 同步刷盘:在消息达到Broker的内存之后,必须刷到commiLog日志文件中才算成功,然后返回Producer数据已经发送成功。

b) 异步刷盘:消息到达Broker内存后就返回发送成功,接着唤醒一个线程去将数据持久化到commitLog日志文件中。

同步刷盘保证了消息不丢失,但是响应时间相对于异步刷盘要多出10%,适用于对消息可靠性要求高的场景。
异步刷盘的吞吐量比较高,但是如果Broker闪断了内存中的部分数据就丢失了,适用于高吞吐量的场景。

5、RocketMQ的复制策略

a) 同步复制:消息写入master后,master会等待slave同步数据成功后才向producer返回成功ack。
b) 异步复制:消息写入master后,master立即向producer返回成功ack,无需等待slave同步数据成功。

6、消息发送

生产者发送消息有3种方式:
a) 同步:同步发送是指发送方发出数据后等待接收方发回响应后再发送下一个数据包。一般用于重要的消息通知,如重要的通知邮件或者短信等。
b) 异步:异步发送是指发送方发出数据后不等待接收方发回响应就发出下一个数据包。一般用于对响应时间比较敏感的场景。如视频上传后通知启动转码服务。
c) 单向:单向发送是指生产者只负责发送消息,不用管接收方有没有收到。适合对可靠性要求不高的场景,比如日志收集。

7、消息存储

消息存储在commitLog文件中

8、消息消费

获取消息有2种方式:
a) 拉取式:Consumer主动从broker中拉去消息,这种方式实时性较弱。
b) 推送式:Broker收到数据后主动推送给Consumer,该方式一般实时性较高。该方式是经典的发布-订阅模式

消费消息有2种方式:
a) 广播消费:即每条消息都会发送到消费者组里的每个Consumer。消费进度保存在consumer端,因为每个Consumer都会收到消息,它们的消费进度是不同的,所以consumer各自保存各自的消费进度。

b) 集群消费:组内的每个Consumer实例平均分摊同一个Topic的消息,即每条消息只会发送到组内某个Consumer。消费进度保存在Broker中,同一条消息只会被消费一次,消费进度会参与到消费的负载均衡中,所以消费进度是需要共享的。

9、至少消费一次原则

RocketMQ有一个原则:每条消息必须被成功消费一次。那么什么是成功消费呢?Consumer在消费完消息后,会向其消费进度记录器提交其消费消息的offset,offset被成功记录到记录器中,那么这条消息就被成功消费了。

什么是消费进度记录器?对于广播消费模式来说,Consumer本身就是消费进度记录器;对于集群消费模式来说,Broker就是消费进度记录器。

10、重试队列

当RocketMQ对消息的消费出现异常时,会将产生异常消息的offset提交到Broker中的重试队列。系统在发生消息消费异常时,会创建一个重试队列,队列名以%RETRY%开头,以topic@group结尾,到达重试时间后进行消费重试。

11、RocketMQ的消息被消费后会被清理掉吗

消息是被顺序存储到commitLog文件的,且消息的大小不定长,所以消息的清理是不可能以消息为单位进行清理的,而是以commitLog文件为单位进行清理,否则会急剧下降清理效率。

commitLog文件存在一个过期时间,默认为72小时。

12、重复消费问题及幂等性

消息重复的原因:
a)发送时消息重复:当一条消息被发送到Broker并完成持久化,此时出现了网络闪断,从而导致Broker对Producer的应答失败,如果此时Producer意识到消息发送失败并尝试再次发送,此时Broker中就有可能出现2条内容相同且消息ID也相同的消息,那么后续Consumer就一定会消费2次这条消息。

b)消费时重复消息:同发送时重复类似,消息已投递到Consumer并完成业务处理,当Consumer给Broker反馈应答时网络闪断,Broker没有收到消费成功的响应。为了保证消息至少被消费一次的原则,Broker将在网络恢复后再次尝试投递之前已经被消费过的消息。

c)Rebalance时消息重复:当消费者组中的Consumer数量发生变化时,或者订阅Topic的Queue数量发生变化时,会触发Rebalance,此时Consumer可能会收到曾经消费过的消息。

什么是幂等性?
当出现消费者对某条消息重复消费的情况时,重复多次消费的结果和消费一次的结果是相同的,并且多次消费并未对业务系统产生负面影响,那么这个消费过程就是幂等性的。

在网络应用中,在网络不稳定的情况下,消息很可能会出现重复发送或者重复消费,如果重复的消息可能会影响业务处理,那么就应该对消息做幂等处理。

怎么解决重复消息的问题?
因为MQ的服务质量是At least once至少一次,因此无法保证消息不重复,因此消息重复的问题要由Consumer消费端来解决。

实现幂等的最好方式是:从业务逻辑设计上入手,将消费的业务逻辑设计成具备幂等性的操作。
1、数据库的唯一主键的实现主要是利用数据库中主键唯一约束的特性,唯一主键比较适用于“插入”时的幂等性,能保证一张表中只存在一条该唯一主键的记录。比如:将订单表中的订单编号作为唯一索引,创建订单时,根据订单编号就可以保证幂等。

2、redis+ 数据库实现
每次操作都直接set到redis里面,下次先查redis里面有没有数据,然后将redis数据同步到数据库中。

13、消息堆积

消息堆积的主要瓶颈在于客户端的消费能力,而消费能力是由消费耗时消费并发度决定,如何避免消息堆积?
1、梳理消息的消费耗时,通过压测获取消息的耗时,并对耗时较高的逻辑进行分析。
2、设置消费并发度:对于消息消费并发度的计算,可以通过以下两部实施:
a)逐步调大单个Consumer节点的线程数,并观测节点的系统指标,得到单个节点最优的消费线程数和消息吞吐量。
b)根据上下游链路的流量峰值计算出需要设置的节点数

14、RocketMQ如何保证消息不丢失

1、Producer发送消息阶段:发送阶段涉及到Producer到broker的网络通信,因此丢失消息的几率一定会有,那么RocketMQ在此阶段用了哪些手段来保证消息不丢失呢?
a)提供SYNC的发送消息方式,等待broker处理结果,RocketMQ提供了3种发送消息发送方式,分别是:同步发送、异步发送、单向发送,我们在使用producer.send方法时,不指定回调方法,则默认采用同步发送的方式,这也是丢失几率最小的一种发送方式。
b)发送消息如果失败或者超时,则重新发送。发送重试的源码其实就是一个for循环,默认重试3次。
c)broker提供多master模式,即使某台broker宕机了,保证消息可以投递到另外一台正常的broker上。

2、Broker处理消息阶段
a)提供同步刷盘的策略
b)提供主从模式,同时主从支持同步双写

3、Consumer消费消息阶段
Consumer默认提供的是At least once机制,从producer投递消息到broker,即使前面的过程保证了消息正常持久化,但如果consumer消费消息没有消费到,也不能理解为消息绝对的可靠。因此RocketMQ默认提供了At least once机制来保证消息的可靠消费。

什么是At least once机制?
Consumer先pull消息到本地,消费完成后,才向服务器返回ACK。通常消费消息的ack机制一般分为2种思路:
1、先提交后消费;
2、先消费,消费成功后再提交
思路一可以解决重复消费的问题但是会丢失消息,因此RcoketMQ默认实现了思路二,由各自consumer业务方来保证幂等性。

15、RocketMQ如何保证消息的顺序性

Producer发送到broker的消息队列是满足FIFO的,所以发送是顺序的,单个Queue里的消息是顺序的。但是多个Queue同时消费时无法保证消息的有序性的。所以,同一个Topic,同一个Queue,发消息的时候一个线程发送消息,消费的时候一个线程去消费一个Queue里的消息。

16、什么是半消息?RocketMQ是怎么实现分布式事务消息的

17、RocketMQ如何保证消息的最终一致性

分布式的系统如何解决分布式事务问题?也就是如何保证数据的一致性?
可以使用RocketMQ的事务消息来保证数据的最终一致性。
在这里插入图片描述
事务消息的实现流程:
1、首先Producer服务A发送一个半消息(也叫half消息)到Broker;为什么先发一个half消息呢?这是为了保证A和Broker之间的通信正常,如果无法正常通信,则服务A可以直接抛一个异常,也就不会处理后面的逻辑了。
2、如果half消息发送成功,Broker会返回响应,此时消息是半消息,被标记为“不可投递”状态,Consumer消费不了。
3、Producer执行本地的业务逻辑,并提交本地事务。
4、如果Producer本地事务提交成功,则会向Broker发送commit,Broker将half消息标记为正常消息,Consumer就能正常消费了;如果是Rollback,Broker将会丢弃此消息。
5、如果cimmit,将half消息写入磁盘。
6、如果Broker长时间没有收到commit或者rollback消息,则Broker会尝试调用Producer提供的一个接口,通过这个接口来判断half消息的状态。如果Broker从这个接口得知half小执行成功了,就将half消息标记为正常消息,如果得知没有成功,那么就会将half消息删除。
7、Consumer从broker中消费到对应的消息。
8、Consumer处理本地业务逻辑,然后提交本地事务。


版权声明:本文为qq_32166627原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/qq_32166627/article/details/126438053