目录

1、关键点

2、生产者

3、消费者


在一些业务场景下,我们希望消息被并发处理,提高处理效率;

另一些业务场景下,为了保证业务数据处理顺序,需要优先保证消息消息的消费顺序。

1、关键点

保证RocketMQ消息顺序消费的关键主要有以下几点:

  • 保证生产者消费者用同一topic

  • 保证生产者消费者用同一topic下的同一个queue(默认一个topic下有4个queue)

  • 发消息的时候用一个线程去发送消息

  • 消费的时候 只用一个线程去消费一个queue里的消息(默认MessageListenerConcurrently使用20个线程去消费处理消息

    )或者使用MessageListenerOrderly

  • 如果多个queue都有消息,只能保证每个单个queue里的消费是顺序的

2、生产者

生产端要保证使用指定topic下的指定queue,顺序消息的使用需要在producer的send()方法中添加MessageQueueSelector接口的实现类(或者使用包里已经有的实现SelectMessageQueueByHash等),并重写select选择使用的队列,因为顺序消息局部顺序,需要将所有消息指定发送到同一队列中。

package rocketmq.FIFO;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.util.List;

/**
 * 消息发送者
 */
public class Producer {
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {

        DefaultMQProducer producer = new DefaultMQProducer("xxooProducerGroup");
        //设置nameserver地址
        producer.setNamesrvAddr("192.168.159.130:9876");

        producer.start();

        for (int i = 0; i < 20; i++) {
            Message message = new Message("fifo",("保证消息顺序发送顺序消费"+i).getBytes());

            //producer.send(message,new SelectMessageQueueByHash(),1);
       /*public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        int value = arg.hashCode();
        if (value < 0) {
            value = Math.abs(value);
        }

        value %= mqs.size();
        return (MessageQueue)mqs.get(value);
    }*/
            SendResult sendResult = producer.send(message,
                    //queue选择器,选择向topic的指定queue中去写消息
                    new MessageQueueSelector() {
                        @Override
                        //手动选择一个queue 此处为选择策略
                        public MessageQueue select(
                                //当前topic中包含的所有queue
                                List<MessageQueue> list,
                                //要发送的消息
                                Message message,
                                //对应到 send()中的args
                                Object args) {

                            //选择好的queue 向固定的queue中发送消息
                            return list.get((Integer) args);
                        }
                    }, 1, 2000
            );
            System.out.println("发送回执:"+sendResult);
        }


    }
}

3、消费者

如果只有一个queue里有消息,则无需限定消费的线程数,只需要使用MessageListenerOrderly限定顺序消费即可;如果同一topic下有多个queue都有消息并要保证顺序消费,则需要设定消费线程数为1 。

package rocketmq.FIFO;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * 保证消息的消费顺序
 * - 同一topic
 *
 * - 同一个QUEUE(一个topic默认有4个queue)
 *
 * - 发消息的时候一个线程去发送消息
 *
 * - 消费的时候 一个线程 消费一个queue里的消息或者使用MessageListenerOrderly
 *
 * - 多个queue 只能保证单个queue里的顺序
 */
public class Consumer2 {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("xxooConsumerGroup");

        consumer.setNamesrvAddr("192.168.159.130:9876");

        //topic
        //selector 过滤器 *表示不过滤MessageSelector
        //tag selector 在一个group中的消费者,都不能随便变,要保持统一
        consumer.subscribe("fifo","*");

        //最大开启消费线程数
        consumer.setConsumeThreadMax(1);
        //最小开启消费线程数
        consumer.setConsumeThreadMin(1);

        consumer.registerMessageListener(new MessageListenerOrderly() {

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
                for (MessageExt msg: list) {
                    System.out.println(new String(msg.getBody())+" Thread :"+Thread.currentThread().getName());
                }
                //默认情况下这条消息只会被一个consumer消费到点对点
                //massage 的状态修改
                //ack
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });



        consumer.start();
    }
}

https://github.com/phs999/MQ/tree/dc38a320fa5fbc08f1e3e0373a36e8f21829ccde/MQ-RocketMQ-01/src/main/java/rocketmq/FIFO


版权声明:本文为phs999原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/phs999/article/details/112505215