消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。目前在生产环境,使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等。

一、MQ使用场景

  1. 异步处理
  2. 流量削峰填谷(比如:秒杀)
  3. 解耦微服务

二、常见MQ产品对比

参考:https://www.imooc.com/article/290040

三、搭建RocketMQ

参考:https://www.imooc.com/article/290089

第1步:解压

第2步:在mq当前目录打开终端输入命令

        nohup sh bin/mqnamesrv &

第3步:测试是否启动成功,命令:

        tail -f ~/logs/rocketmqlogs/namesrv.log

第4步:启动Broker,命令

        nohup sh bin/mqbroker -n localhost:9876 &

第5步:测试Broker是否启动成功,命令

        tail -f ~/logs/rocketmqlogs/broker.log

       或者直接打开目录中自动生成的nohup.out文件看日志结果

停止:以此执行以下两条命令

# 命令

sh bin/mqshutdown broker

# 输出如下信息说明停止成功

The mqbroker(36695) is running…

Send shutdown request to mqbroker(36695) OK

# 命令

sh bin/mqshutdown namesrv

# 输出如下信息说明停止成功

The mqnamesrv(36664) is running…

Send shutdown request to mqnamesrv(36664) OK

 四、搭建RocketMQ控制台

参考:

https://www.imooc.com/article/290092

下载课程提供的懒人包:rocketmq-console-ng-1.0.1.jar

在命令行中启动jar

java -jar rocketmq-console-ng-1.0.1.jar

访问网页:

http://localhost:17890/

控制台使用说明参考:https://github.com/eacdy/rocketmq-externals/blob/master/rocketmq-console/doc/1_0_0/UserGuide_CN.md

五、RocketMQ实现分布式事务

  1. 半消息(Half(Prepare)Message):暂时无法消费的消息。生产者将消息发送到了MQServer,但这个消息会被标记为“咱不能投递”状态,先储存起来;消费者不会去消费这条消息。
  2. 消息回查(Message Status Check):网络断开或生产者重启可能导致丢失事务消息的第二次确认。当MQServer发现消息长时间出于半消息状态时,将向消息生产者发送请求,询问该消息的最终状态(提交或回滚)。

消息的三种状态

  1. Commit:提交事务消息,消费者可以消费此消息
  2. Rollback:回滚事务消息,broker会删除该消息,消费者不能消费
  3. UNKNOWN:broker需要回查确认该消息的状态

 六、SpringCloudStream

SpringCloudStream是一款用于构建消息驱动的微服务框架

Spring Cloud Stream编程模型

  1. Destination Binder(目标绑定器):与消息中间件通信的组件
  2. Destination Bindings(目标绑定):Binding是连接应用程序跟消息中间件的桥梁,用于消息的消费和上产,由binder创建
  3. Message:消息

 6.1、用SpringCloudStream编写生产者

6.1.1、pom.xml中添加依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.3</version>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>

 6.1.2、BootApplication类添加注解

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
@EnableBinding(Source.class)
public class ContentCenterApplication {
...
}

6.1.3、在application.yml中添加属性

spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
      bindings:
        output:
          #用来指定topic
          destination: stream-test-topic

6.1.4、在TestController中添加GetMapping

@Autowired
private Source source;
@GetMapping("/test-steam")
public String testStream(){
    this.source.output()
            .send(
                    MessageBuilder
                            .withPayload("消息体")
                            .build()
            );
    return "success";
}

6.1.5、运行效果

 发现消息已经被投递到RokectMQ

 修改application.yml配置属性,避免显示SPringCloudStream发送心跳包的日志

#设置日志输出级别
logging:
  level:
    com.itmuch.contentcenter.feignclient.UserCenterFeignClient: debug
    #避免显示SpringCloudStream发送心跳包的日志
    com.alibaba.nacos: error

 6.2、SpringCloudStream编写消费者

6.2.1、在pom.xml中添加依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.3</version>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>

6.2.2、在BootApplication中添加注解

import org.springframework.cloud.stream.messaging.Sink;
@EnableBinding(Sink.class)
public class UserCenterApplication {
...
}

6.2.3、在application.yml中添加属性

spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
      bindings:
        input:
          #用来指定topic,要和content-center微服务的topic匹配
          destination: stream-test-topic
          #一定要设置,必填项,如果用其他MQ,该属性可以不设置
          group: binder-group

 6.2.4、添加测试Service

package personal.qin.usercenter.rocketmq;//package com.itmuch.usercenter.rocketmq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class TestStreamConsumer {

    @StreamListener(Sink.INPUT)
    public void receive(String messageBody){
        log.info("通过stream收到了消息:messageBody={}", messageBody);
//        throw IllegalArgumentException("抛异常")
    }

    /**
     * 全局异常处理
     *
     * @param message 发生异常的消息
     */
    @StreamListener("errorChannel")
    public void error(Message<?> message) {
        ErrorMessage errorMessage = (ErrorMessage) message;
        log.warn("RocketMQ-SpringCloudStream发生异常,errorMessage={}", errorMessage);
    }
}

6.2.5、测试运行效果 ,可以正常消费消息

七、消息过滤

参考:https://www.imooc.com/article/290424

八、SpringCloudStream异常处理

参考:https://www.imooc.com/article/290435

九、SpringCloudStream+RocketMQ实现生产者分布式事务(事务消息)

9.1、修改application.yml

spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
        #添加事务控制时使用
        bindings:
          output:
            producer:
              #事务消息
              transactional: true
              #要和@RocketMQTransactionListener的txProducerGroup的值一致
              group: tx-add-bonus-group

9.2、新建添加积分事务监听类

package com.itmuch.contentcenter.rocketmq;

import com.alibaba.fastjson.JSON;
import com.itmuch.contentcenter.dao.messaging.RocketmqTransactionLogMapper;
import com.itmuch.contentcenter.domain.dto.content.ShareAuditDTO;
import com.itmuch.contentcenter.domain.entity.messaging.RocketmqTransactionLog;
import com.itmuch.contentcenter.service.content.ShareService;
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;

@RocketMQTransactionListener(txProducerGroup = "tx-add-bonus-group")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class AddBonusTransactionListener implements RocketMQLocalTransactionListener {
    private final ShareService shareService;
    private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        MessageHeaders headers = msg.getHeaders();

        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
        Integer shareId = Integer.valueOf((String) headers.get("share_id"));

        String dtoString = (String) headers.get("dto");
        ShareAuditDTO auditDTO = JSON.parseObject(dtoString, ShareAuditDTO.class);

        try {
            this.shareService.auditByIdWithRocketMqLog(shareId, auditDTO, transactionId);
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        MessageHeaders headers = msg.getHeaders();
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);

        // select * from xxx where transaction_id = xxx
        RocketmqTransactionLog transactionLog = this.rocketmqTransactionLogMapper.selectOne(
            RocketmqTransactionLog.builder()
                .transactionId(transactionId)
                .build()
        );
        if (transactionLog != null) {
            return RocketMQLocalTransactionState.COMMIT;
        }
        return RocketMQLocalTransactionState.ROLLBACK;
    }
}

9.3、ShareService类

package com.itmuch.contentcenter.service.content;

import com.alibaba.fastjson.JSON;
import com.itmuch.contentcenter.dao.content.ShareMapper;
import com.itmuch.contentcenter.dao.messaging.RocketmqTransactionLogMapper;
import com.itmuch.contentcenter.domain.dto.content.ShareAuditDTO;
import com.itmuch.contentcenter.domain.dto.content.ShareDTO;
import com.itmuch.contentcenter.domain.dto.messaging.UserAddBonusMsgDTO;
import com.itmuch.contentcenter.domain.dto.user.UserDTO;
import com.itmuch.contentcenter.domain.entity.content.Share;
import com.itmuch.contentcenter.domain.entity.messaging.RocketmqTransactionLog;
import com.itmuch.contentcenter.domain.enums.AuditStatusEnum;
import com.itmuch.contentcenter.feignclient.UserCenterFeignClient;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.http.ResponseEntity;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.client.RestTemplate;

import java.util.Objects;
import java.util.UUID;

@Slf4j
@Service
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class ShareService {
    private final ShareMapper shareMapper;
//    private final RestTemplate restTemplate;
    private final UserCenterFeignClient userCenterFeignClient;
    private final RocketMQTemplate rocketMQTemplate;
    private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;
    private final Source source;

    public ShareDTO findById(Integer id){
        Share share= this.shareMapper.selectByPrimaryKey(id);
        Integer userId = share.getUserId();

        //调用用户微服务的/users/{userId}
        //用HttpGet方法去请求,并返回一个对象
//        ResponseEntity<UserDTO> forEntity = restTemplate.getForEntity(
//                "http://localhost:8080/users/{id}",
//                UserDTO.class,
//                userId);
//        //获取响应的状态码
//        HttpStatus statusCode = forEntity.getStatusCode();

//        UserDTO userDTO = this.restTemplate.getForObject(
//                "http://user-center/users/{userId}", //Ribbon会自动把user-center转换成其在nacos上注册的地址,并且进行负载均衡
//                UserDTO.class, userId);

        UserDTO userDTO = this.userCenterFeignClient.findById(userId);

        //消息的装配
        ShareDTO shareDTO = new ShareDTO();
        BeanUtils.copyProperties(share, shareDTO); //将share对象中的数据拷贝到shareDTO对象
        shareDTO.setWxNickname(userDTO.getWxNickname());
        return shareDTO;
    }

    public Share auditById(Integer id, ShareAuditDTO auditDTO) {
        // 1. 查询share是否存在,不存在或者当前的audit_status != NOT_YET,那么抛异常
        Share share = this.shareMapper.selectByPrimaryKey(id);
        if (share == null) {
            throw new IllegalArgumentException("参数非法!该分享不存在!");
        }
        if (!Objects.equals("NOT_YET", share.getAuditStatus())) {
            throw new IllegalArgumentException("参数非法!该分享已审核通过或审核不通过!");
        }

        // 3. 如果是PASS,那么发送消息给rocketmq,让用户中心去消费,并为发布人添加积分
        if (AuditStatusEnum.PASS.equals(auditDTO.getAuditStatusEnum())) {
            // 发送半消息。。
            String transactionId = UUID.randomUUID().toString();

            this.source.output()
                    .send(
                            MessageBuilder
                                    .withPayload(
                                            UserAddBonusMsgDTO.builder()
                                                    .userId(share.getUserId())
                                                    .bonus(50)
                                                    .build()
                                    )
                                    // header也有妙用...
                                    .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
                                    .setHeader("share_id", id)
                                    .setHeader("dto", JSON.toJSONString(auditDTO))
                                    .build()
                    );
        } else {
            this.auditByIdInDB(id, auditDTO);
        }
        return share;
    }

    @Transactional(rollbackFor = Exception.class)
    public void auditByIdInDB(Integer id, ShareAuditDTO auditDTO) {
        Share share = Share.builder()
                .id(id)
                .auditStatus(auditDTO.getAuditStatusEnum().toString())
                .reason(auditDTO.getReason())
                .build();
        this.shareMapper.updateByPrimaryKeySelective(share);

        // 4. 把share写到缓存
    }

    @Transactional(rollbackFor = Exception.class)
    public void auditByIdWithRocketMqLog(Integer id, ShareAuditDTO auditDTO, String transactionId) {
        this.auditByIdInDB(id, auditDTO);

        this.rocketmqTransactionLogMapper.insertSelective(
                RocketmqTransactionLog.builder()
                        .transactionId(transactionId)
                        .log("审核分享...")
                        .build()
        );
    }
}

9.4、ShareAdminController类

package com.itmuch.contentcenter.controller.content;

//import com.itmuch.contentcenter.auth.CheckAuthorization;
import com.itmuch.contentcenter.domain.dto.content.ShareAuditDTO;
import com.itmuch.contentcenter.domain.entity.content.Share;
import com.itmuch.contentcenter.service.content.ShareService;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/admin/shares")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class ShareAdminController {
    private final ShareService shareService;
    @PutMapping("/audit/{id}")
//    @CheckAuthorization("admin") //认证和授权
    public Share auditById(@PathVariable Integer id, @RequestBody ShareAuditDTO auditDTO) {
        return this.shareService.auditById(id, auditDTO);
    }

}

十、SpringCloudStream+RocketMQ实现消费者分布式事务(事务消息)

10.1、修改application.yml

spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
      bindings:
        input:
          #用来指定topic,要和content-center微服务的topic匹配
          destination: add-bonus
          #一定要设置,必填项,如果用其他MQ,该属性可以不设置
          group: binder-group

10.2、UserService类

package com.itmuch.usercenter.service.user;

import com.itmuch.usercenter.dao.bonus.BonusEventLogMapper;
import com.itmuch.usercenter.dao.user.UserMapper;
import com.itmuch.usercenter.domain.dto.messaging.UserAddBonusMsgDTO;
import com.itmuch.usercenter.domain.entity.bonus.BonusEventLog;
import com.itmuch.usercenter.domain.entity.user.User;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.Date;

@Slf4j
@Service
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class UserService {

    private final UserMapper userMapper;
    private final BonusEventLogMapper bonusEventLogMapper;

    public User findById(Integer id){
        return this.userMapper.selectByPrimaryKey(id);
    }

    @Transactional(rollbackFor = Exception.class)
    public void addBonus(UserAddBonusMsgDTO msgDTO) {
        // 1. 为用户加积分
        Integer userId = msgDTO.getUserId();
        Integer bonus = msgDTO.getBonus();
        User user = this.userMapper.selectByPrimaryKey(userId);

        user.setBonus(user.getBonus() + bonus);
        this.userMapper.updateByPrimaryKeySelective(user);

        // 2. 记录日志到bonus_event_log表里面
        this.bonusEventLogMapper.insert(
                BonusEventLog.builder()
                        .userId(userId)
                        .value(bonus)
                        .event(msgDTO.getEvent())
                        .createTime(new Date())
                        .description(msgDTO.getDescription())
                        .build()
        );
        log.info("积分添加完毕...");
    }
}

10.3、AddBonusStreamConsumer类

package com.itmuch.usercenter.rocketmq;

import com.itmuch.usercenter.dao.bonus.BonusEventLogMapper;
import com.itmuch.usercenter.dao.user.UserMapper;
import com.itmuch.usercenter.domain.dto.messaging.UserAddBonusMsgDTO;
import com.itmuch.usercenter.domain.entity.bonus.BonusEventLog;
import com.itmuch.usercenter.domain.entity.user.User;
import com.itmuch.usercenter.service.user.UserService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
@Slf4j
public class AddBonusStreamConsumer {
    private final UserService userService;

    @StreamListener(Sink.INPUT)
    public void receive(UserAddBonusMsgDTO message) {
        message.setEvent("CONTRIBUTE");
        message.setDescription("投稿加积分..");
        this.userService.addBonus(message);
    }
}

10.4、BonusController类

package com.itmuch.usercenter.controller.user;

import com.itmuch.usercenter.domain.dto.messaging.UserAddBonusMsgDTO;
import com.itmuch.usercenter.domain.dto.user.UserAddBonseDTO;
import com.itmuch.usercenter.domain.entity.user.User;
import com.itmuch.usercenter.service.user.UserService;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/users")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class BonusController {
    private final UserService userService;

    @PutMapping("/add-bonus")
    public User addBonus(@RequestBody UserAddBonseDTO userAddBonseDTO) {
        Integer userId = userAddBonseDTO.getUserId();
        userService.addBonus(
            UserAddBonusMsgDTO.builder()
                .userId(userId)
                .bonus(userAddBonseDTO.getBonus())
                .description("兑换分享...")
                .event("BUY")
                .build()
        );
        return this.userService.findById(userId);
    }
}

十一、知识盘点

参考:https://www.imooc.com/article/290489


版权声明:本文为qzc70919700原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/qzc70919700/article/details/122580384