RocketMQ消息的发送与接收
首先第一步就是我们先来通过maven来创建一个父项目并且引入一下的文件:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>你的项目名</artifactId>
<groupId>org.example</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>Rocketmq-produer</artifactId>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
</project>
再在其下面创建除consumer和produer的子项目来进行消息的传送和接收。
consumer的pom文件:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>RocketMQ_spriingboot</artifactId>
<groupId>org.example</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>Rocketmq-consumer</artifactId>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
produer的pom文件:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>RocketMQ_spriingboot</artifactId>
<groupId>org.example</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>Rocketmq-produer</artifactId>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
</project>
创建完成之后,则需要来配置consumer和produer的rockeymq的配置文件
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=组名 //可以随便自己定义
在此之后就可以来测试一下,到底连接成功没有。
在produer这个项目中创建ProducerService来进行消息的发送:
@Component("producerService")
public class ProducerService {
private final RocketMQTemplate rocketMQTemplate;
@Autowired
public ProducerService(RocketMQTemplate rocketMQTemplate) {
this.rocketMQTemplate = rocketMQTemplate;
}
public void sendMessage(){
for (int i = 0; i < 10; i++) {
rocketMQTemplate.convertAndSend("topic","message"+i);
}
}
}
在cunsumer中则创建一个来进行接收:
@Component
@RocketMQMessageListener(topic = "topic",consumerGroup ="${rocketmq.consumer.group}")
public class ConsumerService implements RocketMQListener<String> {
@Override
public void onMessage(String o) {
System.out.println("收到的消息为:"+o);
}
}
RocketMQ发送同步、异步、单向消息
@Component("producerService")
public class ProducerService {
public void sendSyncMessage(){//同步
for (int i = 0; i < 10; i++) {
SendResult topic_1 = rocketMQTemplate.syncSend("topic", "message" + 1);
System.out.println(topic_1);
}
}
public void sendAsyncMessage(){//异步
for (int i = 0; i < 10; i++) {
rocketMQTemplate.asyncSend("topic","message"+i, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}
@Override
public void onException(Throwable throwable) {
throwable.printStackTrace();
}
});
}
}
public void sendOneMessage(){
for (int i = 0; i < 10; i++) {//单向发送
rocketMQTemplate.sendOneWay("topic","message"+1);
}
}
}
同步发送消息
在发送完之后,会返回一个SendResult这个类,在这个类里面可以获取各种的信息,比如消息队列还有消息的id等等。而
异步发送消息
则没有返回,它会直接调用SendCallback()类中的
onSuccess和onException方法,如果成功则会调用onSuccess方法,而在其中就有SendResult,所以是跟同步返回的是同样的效果。
消费者的广播模式和集群模式
在springboot整合的RocketMQ这种形式中,要想实现消费者的集群和广播模式是非常简单的,仅仅只需要在@RocketMQMessageListener这个注解中设置一个参数
messageModel
。messageModel中可以设置两种一种是MessageModel.BROADCASTING和 MessageModel.CLUSTERING。MessageModel.BROADCASTING就是广播模式,MessageModel.CLUSTERING则是集群模式,这也是@RocketMQMessageListener这个注解默认的模式就是集群模式。
接下来就来测试一下啊集群模式和广播模式:
首先我们为了方便直接复制一个原有的consumer这个子项目的运行程序,就改个端口即可:
进行复制,在通过Environment variables来设置自己的端口
从而达到有两个消费者启动类,在启动之后在调用方法发送消息,就能看到集群的消息了。
而广播模式也就只需要把messageModel这个参数修改成MessageModel.BROADCASTING就可以测试广播模式了。
RocketMQ发送顺序、延时、事务消息
发送顺序消息
要想发送顺序消息,RocketMQTemplate给我们提供了同步、异步、单向的顺序消息,比如以下:
syncSendOrderly,同步顺序消息;
asyncSendOrderly,异步顺序消息;
sendOneWayOrderly,单向顺序消息;
发送顺序消息的方法跟平常不同的就在于第三个参数,就是hashKey这个就是用来计算队列的
而想要发送顺序消息在springboot中只需要两步,第一步就是通过RocketMQTemplate调用syncSendOrderly方法发送消息,第二个就是在@RocketMQMessageListener里面设置
consumeMode = ConsumeMode.ORDERLY
。这个consumeMode 参数在注解中默认的是
ConsumeMode.CONCURRENTLY
,这消费模式代表的是并行处理。
下面就是一个案例:
消费者端:
@Component
@RocketMQMessageListener(topic = "topic",consumerGroup ="${rocketmq.consumer.group}",consumeMode = ConsumeMode.ORDERLY)
public class ConsumerService implements RocketMQListener<String> {
@Override
public void onMessage(String o) {
System.out.println("收到的消息为:"+o);
}
}
生产者端:
public void sendSysnOrderMessage(){
rocketMQTemplate.syncSendOrderly("topic","支付1","123123");
rocketMQTemplate.syncSendOrderly("topic","收款1","123123");
rocketMQTemplate.syncSendOrderly("topic","付账1","123123");
rocketMQTemplate.syncSendOrderly("topic","支付2","123124");
rocketMQTemplate.syncSendOrderly("topic","收款2","123124");
rocketMQTemplate.syncSendOrderly("topic","付账2","123124");
}
结果:
发送延迟消息
要想发送延迟消息其实也就是调用RocketMQTemplate调用syncSend方法,syscSend方法有7个重载的样式,其中就有一种的参数里面就有一个设置
delayLevel
的,这是设置延迟消息的等级,等级排序如下:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
在这个等级下,对delayLevel设置1的话就是1s,2就是5s,一次类推下去。
而发送延迟消息则只需要
public void sendMessageTimeOut(){
rocketMQTemplate.syncSend("topic", MessageBuilder.withPayload("延迟消息第二等级,5秒").build(),3000,2);
}
就能发送出一个延迟消息。
发送事务消息
在发送事务消息之前,首先要先了解一下消息状态这个概念,事务消息有三种状态:
-
RocketMQLocalTransactionState.COMMIT
:提交事务消息,消费者可以消费此消息 -
RocketMQLocalTransactionState.ROLLBACK
:回滚事务,它代表该消息将被删除,不允许被消费。 -
RocketMQLocalTransactionState.UNKNOWN
:中间状态,它代表需要检查消息队列来确定状态。
之后就是要理解事务消息的运行的流程:
1、MQ发送方先发送Half消息给MQServer。
2、Half消息发送成功后,应用模块执行本地事务。
3、根据本地事务执行的结果,再返回Commit或Rollback给MQ发送方。
4、如果是Commit,MQ把消息下发给MQ订阅方,如果是Rollback,直接删掉Half消息。
5、执行结果如果没响应,或是超时等各种原因导致消息最终未能到达服MQ订阅方的时候,则会启动消息回查。
而在springboot整合RocketMQ中要想发送事务消息只需要调用
rocketMQTemplate
中的
sendMessageInTransaction()方法
就能实现。
只是在和其他发送消息不同的就是要自己实现本地事务的处理类,也就是需要
RocketMQLocalTransactionListener
接口和**@RocketMQTransactionListener**注解。
RocketMQLocalTransactionListener接口中只有2个方法:
public interface RocketMQLocalTransactionListener {
RocketMQLocalTransactionState executeLocalTransaction(final Message msg, final Object arg);
RocketMQLocalTransactionState checkLocalTransaction(final Message msg);
}
executeLocalTransaction方法则是可以书写自己的业务逻辑,在根据不同的情况返回不同的消息状态,如果返回的是UNKNOWN这个状态的话,则会调用checkLocalTransaction方法,只有当状态成为COMMIT才会消费信息。
下面则是具体的示例:
public void sendTonditionalMessage(){
Message<String> msg = MessageBuilder.withPayload("事务消息").build();
rocketMQTemplate.sendMessageInTransaction("topic",msg,null);
}
本地事务类:
@RocketMQTransactionListener
public class RocketServiceT implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
System.out.println("executeLocalTransaction被调用");
return RocketMQLocalTransactionState.UNKNOWN;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
System.out.println("checkLocalTransaction被调用");
return RocketMQLocalTransactionState.COMMIT;
}
}
结果: