目录
在一些业务场景下,我们希望消息被并发处理,提高处理效率;
另一些业务场景下,为了保证业务数据处理顺序,需要优先保证消息消息的消费顺序。
1、关键点
保证RocketMQ消息顺序消费的关键主要有以下几点:
-
保证生产者消费者用同一topic
-
保证生产者消费者用同一topic下的同一个queue(默认一个topic下有4个queue)
-
发消息的时候用一个线程去发送消息
-
消费的时候 只用一个线程去消费一个queue里的消息(默认MessageListenerConcurrently使用20个线程去消费处理消息
)或者使用MessageListenerOrderly
-
如果多个queue都有消息,只能保证每个单个queue里的消费是顺序的
2、生产者
生产端要保证使用指定topic下的指定queue,顺序消息的使用需要在producer的send()方法中添加MessageQueueSelector接口的实现类(或者使用包里已经有的实现SelectMessageQueueByHash等),并重写select选择使用的队列,因为顺序消息局部顺序,需要将所有消息指定发送到同一队列中。
package rocketmq.FIFO;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.util.List;
/**
* 消息发送者
*/
public class Producer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer producer = new DefaultMQProducer("xxooProducerGroup");
//设置nameserver地址
producer.setNamesrvAddr("192.168.159.130:9876");
producer.start();
for (int i = 0; i < 20; i++) {
Message message = new Message("fifo",("保证消息顺序发送顺序消费"+i).getBytes());
//producer.send(message,new SelectMessageQueueByHash(),1);
/*public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
int value = arg.hashCode();
if (value < 0) {
value = Math.abs(value);
}
value %= mqs.size();
return (MessageQueue)mqs.get(value);
}*/
SendResult sendResult = producer.send(message,
//queue选择器,选择向topic的指定queue中去写消息
new MessageQueueSelector() {
@Override
//手动选择一个queue 此处为选择策略
public MessageQueue select(
//当前topic中包含的所有queue
List<MessageQueue> list,
//要发送的消息
Message message,
//对应到 send()中的args
Object args) {
//选择好的queue 向固定的queue中发送消息
return list.get((Integer) args);
}
}, 1, 2000
);
System.out.println("发送回执:"+sendResult);
}
}
}
3、消费者
如果只有一个queue里有消息,则无需限定消费的线程数,只需要使用MessageListenerOrderly限定顺序消费即可;如果同一topic下有多个queue都有消息并要保证顺序消费,则需要设定消费线程数为1 。
package rocketmq.FIFO;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
* 保证消息的消费顺序
* - 同一topic
*
* - 同一个QUEUE(一个topic默认有4个queue)
*
* - 发消息的时候一个线程去发送消息
*
* - 消费的时候 一个线程 消费一个queue里的消息或者使用MessageListenerOrderly
*
* - 多个queue 只能保证单个queue里的顺序
*/
public class Consumer2 {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("xxooConsumerGroup");
consumer.setNamesrvAddr("192.168.159.130:9876");
//topic
//selector 过滤器 *表示不过滤MessageSelector
//tag selector 在一个group中的消费者,都不能随便变,要保持统一
consumer.subscribe("fifo","*");
//最大开启消费线程数
consumer.setConsumeThreadMax(1);
//最小开启消费线程数
consumer.setConsumeThreadMin(1);
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
for (MessageExt msg: list) {
System.out.println(new String(msg.getBody())+" Thread :"+Thread.currentThread().getName());
}
//默认情况下这条消息只会被一个consumer消费到点对点
//massage 的状态修改
//ack
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
}
}
版权声明:本文为phs999原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。