目录
代码地址
☛链接
方案一:非MQ事务消息
开发流程
- 客户下单
- 生成预订单
- 调用产品服务扣减库存
- 调用客户服务扣减余额
- 2、3、4步骤正常执行完,修改订单状态为正式订单,流程结束
- 3、4步骤出现异常时推送MQ
- 回退扣减产品库存(幂等处理)
- 回退扣减客户余额(幂等处理)
- 取消订单(修改订单状态),流程结束
流程图
伪代码
public class OrderController {
@RequestMapping("/saveOrder")
public R saveOrder(@RequestBody Order order) {
// 1、数据校验、生成预订单
try {
// 2、调用产品服务扣减库存
// 3、调用客户服务扣减余额
// 4、更新订单为正式订单
} catch (Exception e) {
// 5、推送MQ, 回滚余额、库存
return R.error(e.getMessage());
}
return R.ok();
}
}
核心代码
订单服务
@Slf4j
@RestController
@RequestMapping("/order")
public class OrderController {
@Autowired
private IOrderService orderService;
@Autowired
private ProductFeign productFeign;
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Resource(name = "sendRocketMqMsgExecutor")
private ExecutorService executorService;
/**
* 方案一
* @param order
* @return
*/
@RequestMapping("/saveOrder")
public R saveOrder(@RequestBody Order order) {
// 数据校验、生成预订单
orderService.createPreOrder(order);
try {
// 调用产品服务扣减库存
orderService.reductionInventory(order);
// 调用客户服务扣减余额
orderService.reductionAmount(order);
// 更新订单为正式订单
order.setState(OrderState.SUBMIT.getVal());
boolean success = orderService.updateById(order);
Assert.isTrue(success, "更新订单状态失败!");
// 。。。
} catch (Exception e) {
// 通过线程池推送MQ, 回滚余额、回滚库存、取消订单...
executorService.submit(() -> {
rocketMQTemplate.syncSend(MqTopic.ORDER_ROLLBACK, JSONUtil.toJsonStr(order));
});
return R.error(e.getMessage());
}
return R.ok();
}
}
/**
* 取消订单
*/
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = MqGroup.ORDER_CANCEL, topic = MqTopic.ORDER_ROLLBACK, messageModel = MessageModel.BROADCASTING)
public class OrderCancelListener implements RocketMQListener<String> {
@Autowired
private IOrderService orderService;
@Override
public void onMessage(String message) {
log.info("OrderCancelListener 取消订单 start");
Order order = JSONUtil.toBean(message, Order.class);
orderService.cancelOrder(order.getId());
log.info("OrderCancelListener 取消订单 end");
}
}
产品服务
/**
* <p>
* 产品库存 服务实现类
* </p>
*
* @author lik
* @since 2022-02-04
*/
@Slf4j
@Service
public class ProductInventoryServiceImpl extends ServiceImpl<ProductInventoryMapper, ProductInventory> implements IProductInventoryService {
@Autowired
private IProductInventoryLogService productInventoryLogService;
/**
* 扣减库存
*
* @param order
*/
@Override
@Transactional
public void subInventory(Order order) {
// 库存幂等处理
long count = productInventoryLogService.count(Wrappers.<ProductInventoryLog>lambdaQuery()
.eq(ProductInventoryLog::getTargetId, order.getId())
.eq(ProductInventoryLog::getTargetType, BusinessType.ORDER.getVal())
.lt(ProductInventoryLog::getQuantity, 0));
if (count != 0) {
log.info("订单库存日志已经存在,Body:" + JSONUtil.toJsonStr(order));
return;
}
for (OrderItem orderItem : order.getOrderItems()) {
// 更新库存
int count2 = baseMapper.updateInventory(orderItem.getProductId(), orderItem.getQuantity().negate());
String msg = "产品【"+ orderItem.getProductName() +"】";
Assert.isTrue(count2 != 0, msg + "扣减库存失败!");
// 写库存日志
ProductInventoryLog inventoryLog = new ProductInventoryLog();
inventoryLog.setProductId(orderItem.getProductId());
inventoryLog.setQuantity(orderItem.getQuantity().negate());
inventoryLog.setTargetId(order.getId());
inventoryLog.setTargetItemId(orderItem.getId());
inventoryLog.setTargetType(BusinessType.ORDER.getVal());
inventoryLog.setMemo("订单扣减库存");
boolean suc = productInventoryLogService.save(inventoryLog);
Assert.isTrue(suc, msg + "扣减库存写日志失败!");
}
}
/**
* 增加库存
*
* @param order
*/
@Override
@Transactional
public void addInventory(Order order) {
// 库存幂等处理
long count = productInventoryLogService.count(Wrappers.<ProductInventoryLog>lambdaQuery()
.eq(ProductInventoryLog::getTargetId, order.getId())
.eq(ProductInventoryLog::getTargetType, BusinessType.ORDER.getVal())
.lt(ProductInventoryLog::getQuantity, 0));
if (count == 0) {
log.info("订单没有扣过库存,不需要回滚库存,Body:" + JSONUtil.toJsonStr(order));
return;
}
count = productInventoryLogService.count(Wrappers.<ProductInventoryLog>lambdaQuery()
.eq(ProductInventoryLog::getTargetId, order.getId())
.eq(ProductInventoryLog::getTargetType, BusinessType.ORDER.getVal())
.gt(ProductInventoryLog::getQuantity, 0));
if (count != 0) {
log.info("订单库存日志已经存在,Body:" + JSONUtil.toJsonStr(order));
return;
}
for (OrderItem orderItem : order.getOrderItems()) {
// 更新库存
int count2 = baseMapper.updateInventory(orderItem.getProductId(), orderItem.getQuantity());
String msg = "产品【"+ orderItem.getProductName() +"】";
Assert.isTrue(count2 != 0, msg + "回滚库存失败!");
// 写库存日志
ProductInventoryLog inventoryLog = new ProductInventoryLog();
inventoryLog.setProductId(orderItem.getProductId());
inventoryLog.setQuantity(orderItem.getQuantity());
inventoryLog.setTargetId(order.getId());
inventoryLog.setTargetItemId(orderItem.getId());
inventoryLog.setTargetType(BusinessType.ORDER.getVal());
inventoryLog.setMemo("订单回滚库存");
boolean suc = productInventoryLogService.save(inventoryLog);
Assert.isTrue(suc, msg + "回滚库存写日志失败!");
}
}
}
/**
* 回滚产品库存
*/
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = MqGroup.ORDER_CANCEL, topic = MqTopic.ORDER_ROLLBACK, messageModel = MessageModel.BROADCASTING)
public class OrderRollbackProductInventoryListener implements RocketMQListener<String> {
@Autowired
private IProductInventoryService productInventoryService;
@Override
public void onMessage(String message) {
log.info("OrderRollbackProductInventoryListener 回退订单产品库存 start");
productInventoryService.addInventory(JSONUtil.toBean(message, Order.class));
log.info("OrderRollbackProductInventoryListener 回退订单产品库存 end");
}
}
客户服务
/**
* <p>
* 客户 服务实现类
* </p>
*
* @author lik
* @since 2022-02-04
*/
@Slf4j
@Service
public class CustomerServiceImpl extends ServiceImpl<CustomerMapper, Customer> implements ICustomerService {
@Autowired
private ICustomerAmountLogService customerAmountLogService;
/**
* 更新客户余额
* @param customerAmountLog
*/
@Override
@Transactional
public void updateCustomerAmount(CustomerAmountLog customerAmountLog) {
// 余额幂等处理
long count = customerAmountLogService.count(Wrappers.<CustomerAmountLog>lambdaQuery()
.eq(CustomerAmountLog::getCustomerId, customerAmountLog.getCustomerId())
.eq(CustomerAmountLog::getTargetId, customerAmountLog.getTargetId())
.eq(CustomerAmountLog::getTargetType, customerAmountLog.getTargetType())
.eq(CustomerAmountLog::getAmount, customerAmountLog.getAmount())); // 余额
if (count != 0) {
log.info("余额日志已经存在,Body:" + JSONUtil.toJsonStr(customerAmountLog));
return ;
}
// 写日志, 可以对日志表中的customer_id, target_id, target_type, amount 4个字段设置成复合唯一键,避免重复扣
boolean suc = customerAmountLogService.save(customerAmountLog);
Assert.isTrue(suc, "写客户余额日志失败!");
// 更新客户余额
count = baseMapper.updateCustomerAmount(customerAmountLog.getCustomerId(), customerAmountLog.getAmount());
Assert.isTrue(count != 0, "更新客户余额失败");
}
/**
* 回滚客户余额
*
* @param customerAmountLog
*/
@Override
@Transactional
public void rollbackCustomerAmount(CustomerAmountLog customerAmountLog) {
long count = customerAmountLogService.count(Wrappers.<CustomerAmountLog>lambdaQuery()
.eq(CustomerAmountLog::getCustomerId, customerAmountLog.getCustomerId())
.eq(CustomerAmountLog::getTargetId, customerAmountLog.getTargetId())
.eq(CustomerAmountLog::getTargetType, customerAmountLog.getTargetType())
.eq(CustomerAmountLog::getAmount, customerAmountLog.getAmount().negate())); // 余额
if (count == 0) {
log.info("余额不存在扣减,不需要回滚,Body:" + JSONUtil.toJsonStr(customerAmountLog));
return ;
}
this.updateCustomerAmount(customerAmountLog);
}
}
/**
- 回滚客户余额
*/
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = MqGroup.ORDER_CANCEL, topic = MqTopic.ORDER_ROLLBACK, messageModel = MessageModel.BROADCASTING)
public class OrderRollbackCustomerAmountListener implements RocketMQListener<String> {
@Autowired
private ICustomerService customerService;
@Override
public void onMessage(String message) {
log.info("OrderRollbackCustomerAmountListener 回退客户余额 start");
Order order = JSONUtil.toBean(message, Order.class);
CustomerAmountLog customerAmountLog = new CustomerAmountLog();
customerAmountLog.setCustomerId(order.getCustomer());
customerAmountLog.setAmount(order.getAmount());
customerAmountLog.setTargetType(BusinessType.ORDER.getVal());
customerAmountLog.setTargetId(order.getId());
customerAmountLog.setMemo("回滚订单扣减余额");
customerService.rollbackCustomerAmount(customerAmountLog);
log.info("OrderRollbackCustomerAmountListener 回退客户余额 end");
}
}
注意点
- 每个分支服务都要是一个独立的事务,如扣减库存时要保证扣减并且写日志同时成功、或者同时失败;
- 在发消息到MQ时使用了线程池,经过测试如果不使用线程池,当有大量订单创建失败,发送消息到MQ时,会出现发消息到MQ超时异常,导致订单回退库存、客户余额失败,可以适当调下MQ超时时间,再使用线程池基本上能够回避发消息超时的情况;
- 为避免还有其他原因导致发消息到MQ失败,最好是定时去回查失败的订单有没有正常回退,再重新发送消息到MQ进行回退,每个MQ消费者要做好幂等处理;
测试结果
当客户余额、产品库存充足的情况下,100、500、1000并发下可以正常生成订单,扣减客户余额和产品库存的结果也是正确的;
当有个别客户余额或者产品库存不足的情况下,100、500、1000并发下正常订单的扣减结果是正确的,扣减客户余额或者扣减产品库存失败的订单也是能够正确的回退扣减的结果;
方案二:MQ事务消息
开发流程
- 客户下单
- 生成预订单
- 发送预消息到MQ
- 3步骤成功,调用产品服务扣减库存(幂等处理,避免消息重复发送)
- 3步骤成功,调用客户服务扣减余额(幂等处理,避免消息重复发送)
- 3步骤成功,修改订单状态为正式订单
- 4、5、6步骤执行成功时,取消MQ预消息,…,流程结束
- 4、5、6步骤出现异常时,提交MQ预消息进行数据回退
- 回退扣减产品库存(幂等处理)
- 回退扣减客户余额(幂等处理)
- 取消订单(修改订单状态),流程结束
流程图
核心代码
订单服务
@Slf4j
@RestController
@RequestMapping("/order")
public class OrderController {
@Autowired
private IOrderService orderService;
@Autowired
private ProductFeign productFeign;
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Resource(name = "sendRocketMqMsgExecutor")
private ExecutorService executorService;
/**
* 方案二:RocketMQ事务消息
* @param order
* @return
*/
@RequestMapping("/saveOrder2")
public R saveOrder2(@RequestBody Order order) throws Exception {
log.info("saveOrder2 start");
// 数据校验、生成预订单
orderService.createPreOrder(order);
String payload = JSONUtil.toJsonStr(order);
Message<String> message = MessageBuilder.withPayload(payload).build();
// TransactionSendResult sendResult = rocketMQTemplate.sendMessageInTransaction(MqTopic.ORDER_ROLLBACK, message, null);
Future<TransactionSendResult> future = executorService.submit(() -> {
TransactionSendResult sendResult = rocketMQTemplate.sendMessageInTransaction(MqTopic.ORDER_ROLLBACK, message, null);
return sendResult;
});
TransactionSendResult sendResult = future.get();
log.info("sendResult: " + JSONUtil.toJsonStr(sendResult));
log.info("saveOrder2 end");
return R.ok();
}
}
/**
* RocketMQ事务消息,提交本地事务
*/
@Slf4j
@Component
@RocketMQTransactionListener
public class OrderMqTransactionListener implements RocketMQLocalTransactionListener {
@Autowired
private IOrderService orderService;
/**
* 消息预提交成功就会触发该方法的执行,用于完成本地事务
* @param msg
* @param arg
* @return
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
log.info("--------------------- executeLocalTransaction --------------------");
String payload = new String((byte[]) msg.getPayload());
log.info("payload:" + payload);
Order order = JSONUtil.toBean(payload, Order.class);
try {
// 扣减库存
orderService.reductionInventory(order);
// 扣减余额
orderService.reductionAmount(order);
// 更新订单为正式订单
order.setState(OrderState.SUBMIT.getVal());
boolean success = orderService.updateById(order);
Assert.isTrue(success, "更新订单状态失败!");
// 。。。
} catch (Exception e) {
// 推送MQ, 回滚余额、库存
return RocketMQLocalTransactionState.COMMIT;
}
// 取消消息
return RocketMQLocalTransactionState.ROLLBACK;
}
/**
* executeLocalTransaction 返回 RocketMQLocalTransactionState.UNKNOWN 时执行,
* executeLocalTransaction 方法出现异常也是 UNKNOWN
* @param msg
* @return
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
log.info("--------------------- checkLocalTransaction ---------------------");
String payload = new String((byte[]) msg.getPayload());
log.info("payload:" + payload);
Order order = JSONUtil.toBean(payload, Order.class);
Order order2 = orderService.getById(order.getId());
if (order2.getState() == OrderState.SUBMIT.getVal()) {
// 订单已提交,不需要推送Mq进行回滚
return RocketMQLocalTransactionState.ROLLBACK;
}
return RocketMQLocalTransactionState.UNKNOWN;
}
}
/**
* 取消订单
*/
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = MqGroup.ORDER_CANCEL, topic = MqTopic.ORDER_ROLLBACK, messageModel = MessageModel.BROADCASTING)
public class OrderCancelListener implements RocketMQListener<String> {
@Autowired
private IOrderService orderService;
@Override
public void onMessage(String message) {
log.info("OrderCancelListener 取消订单 start");
Order order = JSONUtil.toBean(message, Order.class);
orderService.cancelOrder(order.getId());
log.info("OrderCancelListener 取消订单 end");
}
}
产品服务
/**
* <p>
* 产品库存 服务实现类
* </p>
*
* @author lik
* @since 2022-02-04
*/
@Slf4j
@Service
public class ProductInventoryServiceImpl extends ServiceImpl<ProductInventoryMapper, ProductInventory> implements IProductInventoryService {
@Autowired
private IProductInventoryLogService productInventoryLogService;
/**
* 扣减库存
*
* @param order
*/
@Override
@Transactional
public void subInventory(Order order) {
// 库存幂等处理
long count = productInventoryLogService.count(Wrappers.<ProductInventoryLog>lambdaQuery()
.eq(ProductInventoryLog::getTargetId, order.getId())
.eq(ProductInventoryLog::getTargetType, BusinessType.ORDER.getVal())
.lt(ProductInventoryLog::getQuantity, 0));
if (count != 0) {
log.info("订单库存日志已经存在,Body:" + JSONUtil.toJsonStr(order));
return;
}
for (OrderItem orderItem : order.getOrderItems()) {
// 更新库存
int count2 = baseMapper.updateInventory(orderItem.getProductId(), orderItem.getQuantity().negate());
String msg = "产品【"+ orderItem.getProductName() +"】";
Assert.isTrue(count2 != 0, msg + "扣减库存失败!");
// 写库存日志
ProductInventoryLog inventoryLog = new ProductInventoryLog();
inventoryLog.setProductId(orderItem.getProductId());
inventoryLog.setQuantity(orderItem.getQuantity().negate());
inventoryLog.setTargetId(order.getId());
inventoryLog.setTargetItemId(orderItem.getId());
inventoryLog.setTargetType(BusinessType.ORDER.getVal());
inventoryLog.setMemo("订单扣减库存");
boolean suc = productInventoryLogService.save(inventoryLog);
Assert.isTrue(suc, msg + "扣减库存写日志失败!");
}
}
/**
* 增加库存
*
* @param order
*/
@Override
@Transactional
public void addInventory(Order order) {
// 库存幂等处理
long count = productInventoryLogService.count(Wrappers.<ProductInventoryLog>lambdaQuery()
.eq(ProductInventoryLog::getTargetId, order.getId())
.eq(ProductInventoryLog::getTargetType, BusinessType.ORDER.getVal())
.lt(ProductInventoryLog::getQuantity, 0));
if (count == 0) {
log.info("订单没有扣过库存,不需要回滚库存,Body:" + JSONUtil.toJsonStr(order));
return;
}
count = productInventoryLogService.count(Wrappers.<ProductInventoryLog>lambdaQuery()
.eq(ProductInventoryLog::getTargetId, order.getId())
.eq(ProductInventoryLog::getTargetType, BusinessType.ORDER.getVal())
.gt(ProductInventoryLog::getQuantity, 0));
if (count != 0) {
log.info("订单库存日志已经存在,Body:" + JSONUtil.toJsonStr(order));
return;
}
for (OrderItem orderItem : order.getOrderItems()) {
// 更新库存
int count2 = baseMapper.updateInventory(orderItem.getProductId(), orderItem.getQuantity());
String msg = "产品【"+ orderItem.getProductName() +"】";
Assert.isTrue(count2 != 0, msg + "回滚库存失败!");
// 写库存日志
ProductInventoryLog inventoryLog = new ProductInventoryLog();
inventoryLog.setProductId(orderItem.getProductId());
inventoryLog.setQuantity(orderItem.getQuantity());
inventoryLog.setTargetId(order.getId());
inventoryLog.setTargetItemId(orderItem.getId());
inventoryLog.setTargetType(BusinessType.ORDER.getVal());
inventoryLog.setMemo("订单回滚库存");
boolean suc = productInventoryLogService.save(inventoryLog);
Assert.isTrue(suc, msg + "回滚库存写日志失败!");
}
}
}
/**
* 回滚产品库存
*/
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = MqGroup.ORDER_CANCEL, topic = MqTopic.ORDER_ROLLBACK, messageModel = MessageModel.BROADCASTING)
public class OrderRollbackProductInventoryListener implements RocketMQListener<String> {
@Autowired
private IProductInventoryService productInventoryService;
@Override
public void onMessage(String message) {
log.info("OrderRollbackProductInventoryListener 回退订单产品库存 start");
productInventoryService.addInventory(JSONUtil.toBean(message, Order.class));
log.info("OrderRollbackProductInventoryListener 回退订单产品库存 end");
}
}
客户服务
/**
* <p>
* 客户 服务实现类
* </p>
*
* @author lik
* @since 2022-02-04
*/
@Slf4j
@Service
public class CustomerServiceImpl extends ServiceImpl<CustomerMapper, Customer> implements ICustomerService {
@Autowired
private ICustomerAmountLogService customerAmountLogService;
/**
* 更新客户余额
* @param customerAmountLog
*/
@Override
@Transactional
public void updateCustomerAmount(CustomerAmountLog customerAmountLog) {
// 余额幂等处理
long count = customerAmountLogService.count(Wrappers.<CustomerAmountLog>lambdaQuery()
.eq(CustomerAmountLog::getCustomerId, customerAmountLog.getCustomerId())
.eq(CustomerAmountLog::getTargetId, customerAmountLog.getTargetId())
.eq(CustomerAmountLog::getTargetType, customerAmountLog.getTargetType())
.eq(CustomerAmountLog::getAmount, customerAmountLog.getAmount())); // 余额
if (count != 0) {
log.info("余额日志已经存在,Body:" + JSONUtil.toJsonStr(customerAmountLog));
return ;
}
// 写日志, 可以对日志表中的customer_id, target_id, target_type, amount 4个字段设置成复合唯一键,避免重复扣
boolean suc = customerAmountLogService.save(customerAmountLog);
Assert.isTrue(suc, "写客户余额日志失败!");
// 更新客户余额
count = baseMapper.updateCustomerAmount(customerAmountLog.getCustomerId(), customerAmountLog.getAmount());
Assert.isTrue(count != 0, "更新客户余额失败");
}
/**
* 回滚客户余额
*
* @param customerAmountLog
*/
@Override
@Transactional
public void rollbackCustomerAmount(CustomerAmountLog customerAmountLog) {
long count = customerAmountLogService.count(Wrappers.<CustomerAmountLog>lambdaQuery()
.eq(CustomerAmountLog::getCustomerId, customerAmountLog.getCustomerId())
.eq(CustomerAmountLog::getTargetId, customerAmountLog.getTargetId())
.eq(CustomerAmountLog::getTargetType, customerAmountLog.getTargetType())
.eq(CustomerAmountLog::getAmount, customerAmountLog.getAmount().negate())); // 余额
if (count == 0) {
log.info("余额不存在扣减,不需要回滚,Body:" + JSONUtil.toJsonStr(customerAmountLog));
return ;
}
this.updateCustomerAmount(customerAmountLog);
}
}
/**
- 回滚客户余额
*/
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = MqGroup.ORDER_CANCEL, topic = MqTopic.ORDER_ROLLBACK, messageModel = MessageModel.BROADCASTING)
public class OrderRollbackCustomerAmountListener implements RocketMQListener<String> {
@Autowired
private ICustomerService customerService;
@Override
public void onMessage(String message) {
log.info("OrderRollbackCustomerAmountListener 回退客户余额 start");
Order order = JSONUtil.toBean(message, Order.class);
CustomerAmountLog customerAmountLog = new CustomerAmountLog();
customerAmountLog.setCustomerId(order.getCustomer());
customerAmountLog.setAmount(order.getAmount());
customerAmountLog.setTargetType(BusinessType.ORDER.getVal());
customerAmountLog.setTargetId(order.getId());
customerAmountLog.setMemo("回滚订单扣减余额");
customerService.rollbackCustomerAmount(customerAmountLog);
log.info("OrderRollbackCustomerAmountListener 回退客户余额 end");
}
}
注意点
- 每个分支服务都要是一个独立的事务,如扣减库存时要保证扣减并且写日志同时成功、或者同时失败;
- 在发消息到MQ时使用了线程池,经过测试如果不使用线程池,在高并发时,会出现大量消息发送到MQ超时异常,可以适当调下MQ超时时间,再使用线程池基本上能够回避发消息超时的情况;
测试结果
当客户余额、产品库存充足的情况下,100、500、1000并发下可以正常生成订单,扣减客户余额和产品库存的结果也是正确的;
当有个别客户余额或者产品库存不足的情况下,100、500、1000并发下正常订单的扣减结果是正确的,扣减客户余额或者扣减产品库存失败的订单也是能够正确的回退扣减的结果;
总结
非事务消息不能保证消息一定能够成功发到MQ,导致出现脏数据,需要使用定时任务去处理;
事务消息可以保证正常情况下消息发送到MQ,代码写起来有点麻烦;
版权声明:本文为ZLK1142原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。