前言
- 如题,不多说,直接上操作步骤
代码实现
1.生产者服务和消费者服务的pom文件引入消息驱动整合RabbitMq依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
<version>2.0.1.RELEASE</version>
</dependency>
上面展示的是消息驱动+RabbitMQ,如需换成整合Kafuka,替换为如下即可
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<version>2.0.1.RELEASE</version>
</dependency>
2.项目创建消息通道接口,自定义通道名称test_channel
- 生产者服务
public interface SendMessage {
//创建发送消息通道
@Output("test_channel")
SubscribableChannel sendMsg();
}
- 消费者服务
public interface ReadMessage {
//创建接收消息通道
@Output("test_channel")
SubscribableChannel readMsg();
}
3.两个服务启动类都加上@EnableBinding(xxx.class),绑定各自的通道,xxx对应各自定义的通道接口类名
@SpringBootApplication
//绑定通道,如果同一服务中有多个通道,可同时绑定多个
//@EnableBinding(value = {test.class, test2.class,...})
@EnableBinding(SendMessage.class)
public class MetMailApplication {
public static void main(String[] args) {
SpringApplication.run(MetMailApplication.class, args);
}
}
4.消息的发送与接收
- 生产者服务
@RestController
public class Controller {
@Autowired
private SendMessage sendMessage;
@RequestMapping(value = "test")
public void tset(){
JSONObject json = new JSONObject();
json.put("phoneNumber", "123");
json.put("type", "1");
Message<String> message = MessageBuilder.withPayload(json.toString()).build();
sendMessage.sendMsg().send(message);
}
}
- 消费者服务
@Component
public class Consumer {
//该通道名即自定义名称
@StreamListener("test_channel")
public void redMsg(String msg) {
System.out.println(msg);
//输出打印得到生产者的消息 {"phoneNumber":"123","type":"1"}
}
}
5.相关配置
- 在配置文件xxx-prod.properties中
#该配置可在同一组消费通道有多个消费者服务时,可保证一个消息只被消费一次,达到负载均衡的效果,
#名称自定义,且其中通道名保持一致
spring.cloud.stream.bindings.test_channel.group=test-stream
#队列中消息过期时间 ms
spring.cloud.stream.rabbit.bindings.test_channel.consumer.ttl=90000
#队列过期时间 ms 默认不过期
spring.cloud.stream.rabbit.bindings.test_channel.consumer.expires=90000
- 更多配置可参考点击跳转,或查阅其他相关资料
版权声明:本文为weixin_46603727原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。