RocketMQ消息的发送与接收

首先第一步就是我们先来通过maven来创建一个父项目并且引入一下的文件:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        
        <artifactId>你的项目名</artifactId>
        
        <groupId>org.example</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>Rocketmq-produer</artifactId>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
    </properties>
<dependencies>
    <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
    </dependency>
</dependencies>
</project>

再在其下面创建除consumer和produer的子项目来进行消息的传送和接收。

consumer的pom文件:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>RocketMQ_spriingboot</artifactId>
        <groupId>org.example</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>Rocketmq-consumer</artifactId>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>

produer的pom文件:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>RocketMQ_spriingboot</artifactId>
        <groupId>org.example</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>Rocketmq-produer</artifactId>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
    </properties>
<dependencies>
    <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
    </dependency>
</dependencies>
</project>

创建完成之后,则需要来配置consumer和produer的rockeymq的配置文件

rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=组名   //可以随便自己定义

在此之后就可以来测试一下,到底连接成功没有。

在produer这个项目中创建ProducerService来进行消息的发送:

@Component("producerService")
public class ProducerService {

    private final RocketMQTemplate rocketMQTemplate;
   @Autowired
    public ProducerService(RocketMQTemplate rocketMQTemplate) {
        this.rocketMQTemplate = rocketMQTemplate;
    }
        public void sendMessage(){
        for (int i = 0; i < 10; i++) {
        
            rocketMQTemplate.convertAndSend("topic","message"+i);
        }
    }
}

在cunsumer中则创建一个来进行接收:

@Component
@RocketMQMessageListener(topic = "topic",consumerGroup ="${rocketmq.consumer.group}")
public class ConsumerService implements RocketMQListener<String> {
    @Override
    public void onMessage(String o) {
        System.out.println("收到的消息为:"+o);
    }
}



RocketMQ发送同步、异步、单向消息

@Component("producerService")
public class ProducerService {
 
public void sendSyncMessage(){//同步
        for (int i = 0; i < 10; i++) {
            SendResult topic_1 = rocketMQTemplate.syncSend("topic", "message" + 1);
            System.out.println(topic_1);
        }
    }
 
    public void sendAsyncMessage(){//异步
        for (int i = 0; i < 10; i++) {

            rocketMQTemplate.asyncSend("topic","message"+i, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println(sendResult);
                }

                @Override
                public void onException(Throwable throwable) {
                throwable.printStackTrace();
                }
            });
        }
    }
    public void sendOneMessage(){
        for (int i = 0; i < 10; i++) {//单向发送
            rocketMQTemplate.sendOneWay("topic","message"+1);
        }
    }
}


同步发送消息

在发送完之后,会返回一个SendResult这个类,在这个类里面可以获取各种的信息,比如消息队列还有消息的id等等。而

异步发送消息

则没有返回,它会直接调用SendCallback()类中的

onSuccess和onException方法,如果成功则会调用onSuccess方法,而在其中就有SendResult,所以是跟同步返回的是同样的效果。



消费者的广播模式和集群模式

在springboot整合的RocketMQ这种形式中,要想实现消费者的集群和广播模式是非常简单的,仅仅只需要在@RocketMQMessageListener这个注解中设置一个参数

messageModel

。messageModel中可以设置两种一种是MessageModel.BROADCASTING和 MessageModel.CLUSTERING。MessageModel.BROADCASTING就是广播模式,MessageModel.CLUSTERING则是集群模式,这也是@RocketMQMessageListener这个注解默认的模式就是集群模式。

接下来就来测试一下啊集群模式和广播模式:

首先我们为了方便直接复制一个原有的consumer这个子项目的运行程序,就改个端口即可:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-RFUicZZQ-1681395689776)(C:\Users\LENOVO\AppData\Roaming\Typora\typora-user-images\image-20230409172544921.png)]

进行复制,在通过Environment variables来设置自己的端口

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-2J0Sqr3U-1681395484345)(C:\Users\LENOVO\AppData\Roaming\Typora\typora-user-images\image-20230409172626778.png)]

从而达到有两个消费者启动类,在启动之后在调用方法发送消息,就能看到集群的消息了。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-FODVyCmM-1681395484346)(C:\Users\LENOVO\AppData\Roaming\Typora\typora-user-images\image-20230409173850576.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ZrkYurES-1681395484346)(C:\Users\LENOVO\AppData\Roaming\Typora\typora-user-images\image-20230409173902135.png)]

而广播模式也就只需要把messageModel这个参数修改成MessageModel.BROADCASTING就可以测试广播模式了。



RocketMQ发送顺序、延时、事务消息



发送顺序消息

要想发送顺序消息,RocketMQTemplate给我们提供了同步、异步、单向的顺序消息,比如以下:

syncSendOrderly,同步顺序消息;

asyncSendOrderly,异步顺序消息;

sendOneWayOrderly,单向顺序消息;

发送顺序消息的方法跟平常不同的就在于第三个参数,就是hashKey这个就是用来计算队列的

而想要发送顺序消息在springboot中只需要两步,第一步就是通过RocketMQTemplate调用syncSendOrderly方法发送消息,第二个就是在@RocketMQMessageListener里面设置

consumeMode = ConsumeMode.ORDERLY

。这个consumeMode 参数在注解中默认的是

ConsumeMode.CONCURRENTLY

,这消费模式代表的是并行处理。

下面就是一个案例:

消费者端:

@Component
@RocketMQMessageListener(topic = "topic",consumerGroup ="${rocketmq.consumer.group}",consumeMode = ConsumeMode.ORDERLY)
public class ConsumerService implements RocketMQListener<String> {
    @Override
    public void onMessage(String o) {
        System.out.println("收到的消息为:"+o);
    }
}

生产者端:

public void sendSysnOrderMessage(){
   rocketMQTemplate.syncSendOrderly("topic","支付1","123123");
    rocketMQTemplate.syncSendOrderly("topic","收款1","123123");
    rocketMQTemplate.syncSendOrderly("topic","付账1","123123");

    rocketMQTemplate.syncSendOrderly("topic","支付2","123124");
    rocketMQTemplate.syncSendOrderly("topic","收款2","123124");
    rocketMQTemplate.syncSendOrderly("topic","付账2","123124");
}

结果:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-yCmUmtvM-1681395484347)(C:\Users\LENOVO\AppData\Roaming\Typora\typora-user-images\image-20230410222237446.png)]



发送延迟消息

要想发送延迟消息其实也就是调用RocketMQTemplate调用syncSend方法,syscSend方法有7个重载的样式,其中就有一种的参数里面就有一个设置

delayLevel

的,这是设置延迟消息的等级,等级排序如下:

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

在这个等级下,对delayLevel设置1的话就是1s,2就是5s,一次类推下去。

而发送延迟消息则只需要

public void sendMessageTimeOut(){
   rocketMQTemplate.syncSend("topic", MessageBuilder.withPayload("延迟消息第二等级,5秒").build(),3000,2);
}

就能发送出一个延迟消息。



发送事务消息

在发送事务消息之前,首先要先了解一下消息状态这个概念,事务消息有三种状态:


  • RocketMQLocalTransactionState.COMMIT

    :提交事务消息,消费者可以消费此消息

  • RocketMQLocalTransactionState.ROLLBACK

    :回滚事务,它代表该消息将被删除,不允许被消费。

  • RocketMQLocalTransactionState.UNKNOWN

    :中间状态,它代表需要检查消息队列来确定状态。

之后就是要理解事务消息的运行的流程:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Emrh4YrG-1681395484347)(C:\Users\LENOVO\Desktop\d727f030e9af4c13875899a55d029e09.png)]

1、MQ发送方先发送Half消息给MQServer。

2、Half消息发送成功后,应用模块执行本地事务。

3、根据本地事务执行的结果,再返回Commit或Rollback给MQ发送方。

4、如果是Commit,MQ把消息下发给MQ订阅方,如果是Rollback,直接删掉Half消息。

5、执行结果如果没响应,或是超时等各种原因导致消息最终未能到达服MQ订阅方的时候,则会启动消息回查。

而在springboot整合RocketMQ中要想发送事务消息只需要调用

rocketMQTemplate

中的

sendMessageInTransaction()方法

就能实现。

只是在和其他发送消息不同的就是要自己实现本地事务的处理类,也就是需要

RocketMQLocalTransactionListener

接口和**@RocketMQTransactionListener**注解。

RocketMQLocalTransactionListener接口中只有2个方法:

public interface RocketMQLocalTransactionListener {
    RocketMQLocalTransactionState executeLocalTransaction(final Message msg, final Object arg);

    RocketMQLocalTransactionState checkLocalTransaction(final Message msg);
}

executeLocalTransaction方法则是可以书写自己的业务逻辑,在根据不同的情况返回不同的消息状态,如果返回的是UNKNOWN这个状态的话,则会调用checkLocalTransaction方法,只有当状态成为COMMIT才会消费信息。

下面则是具体的示例:

public void sendTonditionalMessage(){
    Message<String> msg = MessageBuilder.withPayload("事务消息").build();
    rocketMQTemplate.sendMessageInTransaction("topic",msg,null);
}

本地事务类:

@RocketMQTransactionListener
public class RocketServiceT implements RocketMQLocalTransactionListener {
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        System.out.println("executeLocalTransaction被调用");
        return RocketMQLocalTransactionState.UNKNOWN;
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        System.out.println("checkLocalTransaction被调用");
        return RocketMQLocalTransactionState.COMMIT;
    }
}

结果:

在这里插入图片描述



版权声明:本文为lixin203171613原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/lixin203171613/article/details/130141697