大家好,在上篇文章《高可用架构及二主二从异步集群部署》中介绍了的 RocketMQ 的历史及安装,为本文开始做实验打下了基础,接下来就正式开始讲解RocketMQ的概念以及Java API的操作使用。
RocketMQ的架构
这是在RocketMQ官网的架构图,http://rocketmq.apache.org/docs/rmq-arc/
一般见到的架构图都是这样的,其中这些重要的角色需要解释下。
Broker
RocketMQ 的服务,或者说一个进程,叫做 Broker,Broker 的作用是存储和转发消息。RocketMQ 单机大约能承受10万 QPS 的请求,为了提升 Broker 的性能(做负载均衡)
以及可用性(防止单点故障)
,通常会做集群部署。
跟 Kafka 或者 Redis Cluster 一样,RocketMQ 集群的每个 Broker 节点保存总数据的一部分,因为可用横向扩展。为了提高可靠性(防止数据丢失)
,每个 Broker 可以有自己的副本(Slave)。
Topic
Topic用于将消息按主题做划分,比如订单消息,支付消息,人员消息,注意,跟kafka不同的是,在RocketMQ 中,Topic是一个逻辑概念,消息不是按Topic划分存储的。
Producer 将消息发往指定的 Topic,Consumer 订阅这个 Topic 就可以收到相应的消息。Topic 跟生产者和消费者都是多对多的关系,一个生产者可以发送消息到多个 Topic,一个消费者也可以订阅多个 Topic。
NameServer
在 rocketmq 的早版本(2.x)的时候,是没有 namesrv 组件的,用的是 zookeeper 做分布式协调和服务发现,但是后期阿里数据根据实际业务需求进行改进和优化,自组研发了轻量级的 namesrv,用于注册 Client 服务与 Broker 的请求路由工作,namesrv 上不做任何消息的位置存储,频繁操作 zookeeper 的位置存储数据会影响整体集群性能。
为了保证高可用,NameServer 自身也可以做集群的部署,节点之间无任何信息同步。
Producer
生产者,拥有相同 Producer Group 的 Producer 组成一个集群, 与 Name Server 集群中的其中一个节点(随机选择)建立长连接,定期从 Name Server 取 Topic 路由信息,并向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。Producer 完全无状态,可集群部署。
RocketMQ 的生产者支持批量发送
Consumer
消费者,接收消息进行消费的实例,拥有相同 Consumer Group 的 Consumer 组成一个集群,与 Name Server 集群中的其中一个节点(随机选择)建立长连接,定期从 Name Server 取Topic 路由信息,并向提供 Topic 服务的Master、Slave
建立长连接,且定时向 Master、Slave
发送心跳。Consumer 既可以从 Master 订阅消息,也可以从 Slave 订阅消息,订阅规则由 Broker 配置决定。
消费者有两种消费方式
:一种是集群消费(消息轮询),一种是广播消费(全部收到相同副本)。
从消费模型
来说,一种是 pull 主动拉去,另一种是 push,被动接收。但实际上 RocketMQ 都是 pull 模式,只是 push 在 pull 模式上做了一层封装,PushConsumer 会注册 MessageListener 监听器,取到消息后,唤醒 MessageListener 的 ConsumeMessage()来消费,对用户而言,感觉消息是被推送过来的。RocketMQ是基于长轮训
来实现消息的 pull。
Message Queue
大家知道发往某一个 Topic 的多条信息,是分布在不同的 Broker 上的,在 Kafka 里面设计了一个partition,一个 Topic 可以拆分成多个 partition,这些 partition 可以分布在不同的 Broker 上,这样就实现了数据的分片,也决定了 kafka 可以实现横向扩展。
在 RocketMQ 中只有一个存储文件,并没有像 kafka 一样按照不同的 Topic 分开存储。所以它设计了一个Message Queue 的逻辑概念,作用跟partition类似。
首先我们在创建 Topic 的时候会让我们指定队列数量,一个叫 writeQueueNums(写队列数量),一个叫readQueueNums(读队列数量),写队列数量决定了有几个 Message Queue,读队列数量决定了有几个线程来消费这些 Message Queue(只是用来负载的)。perm 表示队列权限,2表示W,4表示R,6表示RW。
这里是我们创建 topic 的时候指定的,如果我们由代码自动创建 topic 的时候默认是几个 Message Queue呢?
//服务端创建一个Topic默认8个队列,在BrokerConfig类里面
private int defaultTopicQueueNums = 8;
//topic不存在,生产者发送消息时创建默认4个队列,在DefaultMQProducer类里面
private volatile int defaultTopicQueueNums = 4;
//最终服务端创建的时候有一个判断,取小一点的值,在MQClientInstance类里面
int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
最终计算结果应该是4吧,我们找一个由代码创建的topic看下,确实是4。
写队列数量和读队列数量这两个值需要相等,在集群模式下如果不相等,假如说writeQueueNums=6,readQueueNums=3, 那么每个 broker 上会有3个 queue 的消息是无法消费的。
如果消费者数大于readQueueNumbs,那么会有一些消费者消费不到消息,浪费资源。
Java API
现在开始API的教程使用,官网提供了Java客户端API,只需要引入pom依赖就可以了。
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.1</version>
</dependency>
先写一个生产者Producer类:
/**
* @author jackxu
*/
public class Producer {
public static void main(String[] args) throws MQClientException {
//生产者组
DefaultMQProducer producer = new DefaultMQProducer("jackxu_producer_group");
//生产者需用通过NameServer获取所有broker的路由信息,多个用分号隔开,这个跟Redis哨兵一样
producer.setNamesrvAddr("39.103.144.86:9876;42.192.77.73:9876");
//启动
producer.start();
for (int i = 0; i < 10; i++) {
try {
/*Message(String topic, String tags, String keys, byte[] body)
Message代表一条信息,第一个参数是topic,这是主题
第二个参数是tags,这是可选参数,用于消费端过滤消息
第三个参数是keys,这也是可选参数,如果有多个,用空格隔开。RocketMQ可以根据这些key快速检索到消息,相当于
消息的索引,可以设置为消息的唯一编号(主键)。*/
Message msg = new Message("jackxu_test_topic", "TagA", "6666", ("RocketMQ Test message " + i).getBytes());
//SendResult是发送结果的封装,包括消息状态,消息id,选择的队列等等,只要不抛异常,就代表发送成功
SendResult sendResult = producer.send(msg);
System.out.println("第" + i + "条send结果: " + sendResult);
} catch (Exception e) {
e.printStackTrace();
}
}
producer.shutdown();
}
}
SendResult中,有 一个SendStatus状态,表示消息的发送状态。一共有四种状态。
- FLUSH_DISK_TIMEOUT:表示没有在规定时间内完成刷盘(需要Broker的刷盘策略配置SYNC_FLUSH才会报这个错误)。
- FLUSH_SLAVE_TIMEOUT:表示在主备方式下,并且Broker被设置成SYNC_MASTER方式,没有在规定时间内完成主从同步。
- SLAVE_NOT_AVAILABLE:这个状态产生的场景和FLUSH_SLAVE_TIMEOUT类似,表示在主备方式下,并且Broker被设置成SYNC_MASTER,但是没有找到被配置成Slave的Broker。
- 表示发送成功
再写一个简单消费者类SimpleConsumer:
/**
* @author jackxu
*/
public class SimpleConsumer {
public static void main(String[] args) throws MQClientException {
//消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("jackxu_consumer_group");
//消费者从NameServer拿到topic的queue所在的Broker地址,多个用分号隔开
consumer.setNamesrvAddr("39.103.144.86:9876;42.192.77.73:9876");
//设置Consumer第一次启动是从队列头部开始消费
//如果非第一次启动,那么按照上次消费的位置继续消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//subscribe订阅的第一个参数就是topic,第二个参数为生产者发送时候的tags,*代表匹配所有消息,
//想要接收具体消息时用||隔开,如"TagA||TagB||TagD"
consumer.subscribe("jackxu_test_topic", "*");
//Consumer可以用两种模式启动,广播(Broadcast)和集群(Cluster),广播模式下,一条消息会发送给所有Consumer,
//集群模式下消息只会发送给一个Consumer
consumer.setMessageModel(MessageModel.BROADCASTING);
//批量消费,每次拉取10条
consumer.setConsumeMessageBatchMaxSize(10);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//msgs是一个List,一般是Consumer先启动,所有每次都是一条数据
//如果Producer先启动Consumer端后启动,会积压数据,此时setConsumeMessageBatchMaxSize会生效,
//msgs的数据就是十条
StringBuilder sb = new StringBuilder();
sb.append("msgs条数:" + msgs.size());
MessageExt messageExt = msgs.get(0);
//消息重发了三次
if (messageExt.getReconsumeTimes() == 3) {
//todo 持久化消息记录表
//重试了三次不再重试了,直接签收掉
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
for (MessageExt msg : msgs) {
try {
String topic = msg.getTopic();
String messageBody = new String(msg.getBody(), "utf-8");
String tags = msg.getTags();
//todo 业务逻辑处理
sb.append("topic:" + topic + ",tags:" + tags + ",msg:" + messageBody);
} catch (Exception e) {
e.printStackTrace();
// 重新消费
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
System.out.println(sb.toString());
//签收,这句话告诉broker消费成功,可以更新offset了,也就是发送ack。
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
每行代码的功能作用都写在注释里了,小伙伴要仔细观看下哦。现在测试一下,先启动消费者,然后启动生产者。
这是生产者启动后发送的十条数据。
msgId:生产者生成的唯一编号,全局唯一,也叫uniqId。
offsetMsgId:消息偏移id,该id记录了消息所在集群的物理地址,主要包含所存储Broker服务器的地址(IP与端口号)以及所在commitlog文件的物理偏移量。
在控制台可以通过 msgId 来查询该条消息。
通过Key也一样可以找到
再看下消费者端,十条数据都已经被成功消费了
源码以及官方提供的 rocketmq\example 的示例代码已经上传,需要的小伙伴可以下载下来观看。地址:https://github.com/xuhaoj/rocketmq-javaapi
rocketmq\example中各个包的作用如下:
package | 作用 |
---|---|
batch | 批量消息,用List发送 |
broadcast | 广播消息,setMessageModel(MessageModel.BROADCASTING) |
delay | 延迟消息,msg.setDelayTimeLevel(3) |
filter | 基于tag或者 SQL表达式过滤 |
ordermessage | 顺序消息 |
quickstart | 入门 |
rpc | 实现RPC调用 |
simple | ACL、异步、assign、subscribe |
tracemessage | 消息追踪 |
transaction | 事务消息 |
Spring Boot 集成
在Srping Boot中提供了更简单的配置方式和操作方式,使用起来非常的舒服,干净,简洁。首先还是引入RocketMQ starter 的依赖。
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.4</version>
</dependency>
然后客户端的配置直接写在application.properties中
server.port=9096
spring.application.name=springboot-rocketmq-demo
rocketmq.name-server=39.103.144.86:9876;42.192.77.73:9876
rocketmq.producer.group=jackxu-springboot-rocketmq-group
rocketmq.producer.send-message-timeout=3000
创建一个消费者类Consumer,加上@RocketMQMessageListener注解监听消息
/**
* @author jackxu
*/
@Component
@RocketMQMessageListener(topic = "springboot-topic", consumerGroup = "springboot-consumer-group",
selectorExpression = "tag1", selectorType = SelectorType.TAG,
messageModel = MessageModel.CLUSTERING, consumeMode = ConsumeMode.CONCURRENTLY)
public class Consumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
try {
System.out.println("接收到rocketmq消息:" + message);
} catch (Exception e) {
e.printStackTrace();
}
}
}
注解里面的配置和 java api中的一样,相信大家都能够看懂。
MessageModel有两个选项,BROADCASTING代表所有消费者消费同样的消息,CLUSTERING代表多个消费者轮询消费消息(默认)。
ConsumeMode也有两个选项,CONCURRENTLY代表消费端并发消费(默认),消息顺序得不到保证,到底有多少个线程并发消费,取决于线程池的大小,ORDERLY代表有序消费,也就是生产者发送的顺序跟消费者消费的顺序一致。
两者的区别:顺序消费需要对要处理的队列加锁,确保同一队列,同一时间,只允许一个消费线程处理。很显然并发消费效率更高。
创建一个生产者类MessageSender,生产者的代码更加简单,只需要注入RocketMQTemplate就可以发送消息。
/**
* @author jackxu
*/
@Component
public class MessageSender {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void syncSend() {
/**
* 发送可靠同步消息 ,可以拿到SendResult 返回数据
* 同步发送是指消息发送出去后,会在收到mq发出响应之后才会发送下一个数据包的通讯方式。
* 参数1: topic:tag
* 参数2: 消息体 可以为一个对象
* 参数3: 超时时间 毫秒
*/
SendResult result = rocketMQTemplate.syncSend("springboot-topic:tag", "这是一条同步消息", 10000);
System.out.println(result);
}
public void asyncSend() throws Exception {
/**
* 发送 可靠异步消息
* 发送消息后,不等mq响应,接着发送下一个数据包。发送方通过设置回调接口接收服务器的响应,并可对响应结果进行处理。
* 参数1: topic:tag
* 参数2: 消息体 可以为一个对象
* 参数3: 回调对象
*/
rocketMQTemplate.asyncSend("springboot-topic:tag1", "这是一条异步消息", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("回调sendResult:" + sendResult);
}
@Override
public void onException(Throwable e) {
System.out.println(e.getMessage());
}
});
TimeUnit.SECONDS.sleep(100000);
}
public void sendOneWay() {
/**
* 发送单向消息,特点为只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。
* 此方式发送消息的过程耗时非常短,一般在微秒级别。应用场景:适用于某些耗时非常短,但对可靠性要求并
* 不高的场景,例如日志收集。
* 参数1: topic:tag
* 参数2: 消息体 可以为一个对象
*/
rocketMQTemplate.sendOneWay("springboot-topic:tag1", "这是一条单向消息");
}
public void sendOneWayOrderly() {
/**
* 发送单向的顺序消息
* 参数1: topic:tag
* 参数2: 消息体 可以为一个对象
*/
rocketMQTemplate.sendOneWayOrderly("springboot-topic:tag1", "这是一条顺序消息", "8888");
}
}
一共有三种类型,它们的使用方法和作用已经写在注释上了,它们的选择方案如下:
- 当发送的消息不重要时,采用one-way方式,以提高吞吐量,
效率最高
- 当发送的消息很重要时,且对响应时间不敏感的时候采用sync方式
- 当发送的消息很重要时,且对响应时间非常敏感的时候采用async方式
写一个测试类测试下
/**
* @author jackxu
*/
@SpringBootTest
class SpringbootRocketmqApplicationTests {
@Autowired
private MessageSender sender;
@Test
public void syncSendTest() {
sender.syncSend();
}
@Test
public void asyncSendTest() throws Exception {
sender.asyncSend();
}
@Test
public void sendOneWayTest() {
sender.sendOneWay();
}
@Test
public void sendOneWayOrderlyTest() {
sender.sendOneWayOrderly();
}
}
这里选择异步发送sendOneWayTest,执行一下,查看结果,发送成功并且回调了。
看下控制台,也有这条消息
消费端也消费成功,测试完成。
源码已经上传至 https://github.com/xuhaoj/springboot-rocketmq ,感兴趣的小伙伴可以下载下来观看。
结语
最后推荐一本电子书《RocketMQ实战与原理解析》作为课外读物,下载链接在:https://pan.baidu.com/s/1Ah1Gm3CXeFnbyoBSvCekfw
提取码:jack ,原创不易,觉得写的不错请点一个赞。。