前言

  • 如题,不多说,直接上操作步骤

代码实现

1.生产者服务和消费者服务的pom文件引入消息驱动整合RabbitMq依赖

<dependency>
        <groupId>org.springframework.cloud</groupId>    
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        <version>2.0.1.RELEASE</version>
    </dependency>

上面展示的是消息驱动+RabbitMQ,如需换成整合Kafuka,替换为如下即可

<dependency>
        <groupId>org.springframework.cloud</groupId>    
        <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        <version>2.0.1.RELEASE</version>
    </dependency>

2.项目创建消息通道接口,自定义通道名称test_channel

  • 生产者服务
public interface SendMessage {
        //创建发送消息通道
        @Output("test_channel")
        SubscribableChannel sendMsg();
    }
  • 消费者服务
public interface ReadMessage {
        //创建接收消息通道
        @Output("test_channel")
        SubscribableChannel readMsg();
    }

3.两个服务启动类都加上@EnableBinding(xxx.class),绑定各自的通道,xxx对应各自定义的通道接口类名

@SpringBootApplication
//绑定通道,如果同一服务中有多个通道,可同时绑定多个
//@EnableBinding(value = {test.class, test2.class,...})
@EnableBinding(SendMessage.class)
public class MetMailApplication {
    public static void main(String[] args) {
        SpringApplication.run(MetMailApplication.class, args);
    }
}

4.消息的发送与接收

  • 生产者服务
@RestController
public class Controller {
	@Autowired
	private SendMessage sendMessage;
	
	@RequestMapping(value = "test")
    public void tset(){
		JSONObject json = new JSONObject();
		json.put("phoneNumber", "123");
		json.put("type", "1");

		Message<String> message = MessageBuilder.withPayload(json.toString()).build();
		sendMessage.sendMsg().send(message);
	}
}
  • 消费者服务
@Component
public class Consumer {
	//该通道名即自定义名称
	@StreamListener("test_channel")
	public void redMsg(String msg) {
		System.out.println(msg);
		//输出打印得到生产者的消息 {"phoneNumber":"123","type":"1"}
	}
}

5.相关配置

  • 在配置文件xxx-prod.properties中
#该配置可在同一组消费通道有多个消费者服务时,可保证一个消息只被消费一次,达到负载均衡的效果,
#名称自定义,且其中通道名保持一致
spring.cloud.stream.bindings.test_channel.group=test-stream
#队列中消息过期时间 ms
spring.cloud.stream.rabbit.bindings.test_channel.consumer.ttl=90000
#队列过期时间 ms 默认不过期
spring.cloud.stream.rabbit.bindings.test_channel.consumer.expires=90000
  • 更多配置可参考点击跳转,或查阅其他相关资料

版权声明:本文为weixin_46603727原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/weixin_46603727/article/details/116758711