消息队列RocketMQ版提供三种方式来发送普通消息:同步(Sync)发送、异步(Async)发送和单向(Oneway)发送。本文介绍了每种发送方式的使用方式。
1.在pom.xml引入依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.0</version>
</dependency>
2.创建测试类,使用同步发送
//同步发送,发送消息成功之后,在收到服务端的相应后,才会继续发送下一条消息
//应用场景:发邮件
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
//RocketMQ 分很多消息发送与接收
// 1同步 2异步 3单向
//无论哪一种发送消息的类型,基本上一样
//1.创建一个提供消息的对象,准备发送消息
//2.通过nameserver对象,找到我们要往哪里发送消息(IP 端口)
//3.创建消息(Topic Message key Tag)
//4.通过nameServer找到broker后发送消息
//5.关闭各种对象
DefaultMQProducer producer = new DefaultMQProducer("tm_XBL");
// RocketMQ 的ip地址已经端口号
producer.setNamesrvAddr("192.168.1.7:9876");
//需要将producer 启动起来 (这里有异常可测试阶段直接向上抛啦)
producer.start();
//编辑消息
String body="{userName:'Tom',hobby:'爱吃白菜'}";
//创建消息
Message message=new Message("topicTmXBL","tagsTmXBL","keysTmXBL",body.getBytes());
//发送消息 (这里有异常可测试阶段直接向上抛啦)
SendResult send = producer.send(message);
//返回结果查看
System.out.println(send);
//关闭链接
producer.shutdown();
}
控制台输出结果:ok
3.使用异步发送
//异步发送消息
//一般用在发送方不用等待服务器第一时间的响应,可以直接发送下一条消息
//应用场景:用户上传身份证进行验证信息
//等待到什么时候服务器审核到了用户发送过来的身份信息
//再进行回调
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {
//前两步不变
DefaultMQProducer producer = new DefaultMQProducer("tm_XBL");
producer.setNamesrvAddr("192.168.1.7:9876");
//需要将producer 对象启动
producer.start();
//编辑消息
String body="{userName:'Jerry',hobby:'Tom'}";
//创建消息
Message message = new Message("topicTmXBL","tagsTmXBL","keysTmXBL",body.getBytes());
//发送异步消息 发送结果通过callback返回给客户端
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
//成功回调 同时关闭链接
System.out.println("回调成功");
producer.shutdown();
}
@Override
public void onException(Throwable throwable) {
//失败回调 同时关闭链接
System.out.println("回调失败");
producer.shutdown();
}
});
//不能关闭producer对象
//因为producer要在后续代码执行的同时,同时监控成功/失败回调
}
控制台结果:ok
4.单向发送
//单向发送
//只管发,不管早晚,也不管成败
//应用场景:用于耗时短、且有可靠性,回调要求不高,例如 日志记录
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
//前两步不变
DefaultMQProducer producer = new DefaultMQProducer("tm_XBL");
producer.setNamesrvAddr("192.168.1.7:9876");
//需要将producer对象启动起来
producer.start();
//编辑信息
String body="userName:'Allen',hobby:'Jerry'";
//创建消息
Message message=new Message("topicTmXBL","tagsTmXBL","keysTmXBL",body.getBytes());
//发送信息
producer.send(message);
//关闭链接
producer.shutdown();
}
控制台结果:ok
5.接收消息/消费消息
//消费消息
//注意,同一个消息可以被不同组的消费者消费
//不能被同一组消费者重复消费
public static void main(String[] args) throws MQClientException {
//创建消费者对象
//Push 是在消费者消费消息后可以返回告诉MQ消费状态的对象
//Pull 是只管消费的对象,后续可能没什么处理
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tm_consumer");
//通过NameSrv设置IP+端口
consumer.setNamesrvAddr("192.168.1.7:9876");
//设置一些消息的参数
//是否顺序消费(先进先出/先进后出)
//CONSUME_FROM_FIRST_OFFSET 先进先出
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//设置需要消费哪类信息
consumer.subscribe("topicTmXBL","tagsTmXBL");
//进行消费信息
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
//list就是根据条件得到的数据
//consumeOrderlyContext就是设置各种属性的上下文对象
//是否设置消费的消息在 被消费后被标记为以消费,相当于是删除的意思
consumeOrderlyContext.setAutoCommit(true);
list.forEach(a->{
String msgId = a.getMsgId();
String topic = a.getTopic();
String tags = a.getTags();
byte[] body = a.getBody();
String str=new String(body);
JSONObject jsonObject = JSON.parseObject(str);
System.out.print("用户姓名----"+jsonObject.get("userName")+",爱好----"+jsonObject.get("hobby")+",");
System.out.println(msgId+","+topic+","+tags);
});
return ConsumeOrderlyStatus.SUCCESS;
}
});
//直接消费
consumer.start();
// //消费者需要关闭吗? 不用
// //1、我们在项目中使用消息消费者,是需要持续不断的读取MQ中的消息,
// //所以不用关闭
// //2、在读取消息时,是自动开启另一线程,和当前代码不是同时执行
// //可能造成,上面的consumeMessage方法还没走完,却已经consumer.shutdown了
// //consumer.shutdown();
}
输出结果:
最后总结一下三种发送方式的优缺点
版权声明:本文为m0_73093747原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。