SpringCloud微服务开发基于RocketMQ实现分布式事务管理详解

2023-05-16 0 4,211

目录

消息队列实现分布式事务原理

首先让我们来看一下基于消息队列实现分布式事务的原理方案。

SpringCloud微服务开发基于RocketMQ实现分布式事务管理详解

柔性事务

发送消息的服务有个OUTBOX数据表,在进行INSERT、UPDATE、DELETE 业务操作时也会给OUTBOX数据表INSERT一条消息记录,这样可以保证原子性,因为这是基于本地的ACID事务。

OUTBOX表充当临时消息队列,然后我们在引入一个消息中继(MessageRelay)的服务,由他从OUTBOX表中读取数据并发布消息到消息组件。

消息中继的实现可以很简单,只需要通过定时任务定期从OUTBOX表中拉取最新未发布的数据,获取到数据后将数据发送给消息组件,最后将完成发送的消息从OUTBOX表中删除即可,对于失败的消息可以根据业务规则进行重试。

RocketMQ的事务消息

RocketMQ本身已经支持事务消息,如果你们项目使用了RocketMQ,可以直接借助RocketMQ的事务消息实现分布式事务,我们先看一下RocketMQ事务消息的原理然后再借助RocketMQ来实现分布式事务。

RocketMQ采用了2PC的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败的消息,如下图所示。

SpringCloud微服务开发基于RocketMQ实现分布式事务管理详解

分布式事务

RocketMQ实现事务消息主要分为两个阶段:正常事务的发送及提交、事务信息的补偿流程

整体流程为:

正常事务发送与提交阶段

1、生产者发送一个半消息给MQServer(半消息是指消费者暂时不能消费的消息)

2、服务端响应消息写入结果,半消息发送成功

3、开始执行本地事务

4、根据本地事务的执行状态执行Commit或者Rollback操作

事务信息的补偿流程

1、如果MQServer长时间没收到本地事务的执行状态会向生产者发起一个确认回查的操作请求

2、生产者收到确认回查请求后,检查本地事务的执行状态

3、根据检查后的结果执行Commit或者Rollback操作

补偿阶段主要是用于解决生产者在发送Commit或者Rollback操作时发生超时或失败的情况。

RocketMQ事务流程关键

事务消息在一阶段对用户不可见

事务消息相对普通消息最大的特点就是一阶段发送的消息对用户是不可见的,也就是说消费者不能直接消费。这里RocketMQ的实现方法是原消息的主题与消息消费队列,然后把主题改成RMQ_SYS_TRANS_HALF_TOPIC,这样由于消费者没有订阅这个主题,所以不会被消费。

如何处理第二阶段的失败消息?

在本地事务执行完成后会向MQServer发送Commit或Rollback操作,此时如果在发送消息的时候生产者出故障了,那么要保证这条消息最终被消费,MQServer会像服务端发送回查请求,确认本地事务的执行状态。

当然了rocketmq并不会无休止的的信息事务状态回查,默认回查15次,如果15次回查还是无法得知事务状态,RocketMQ默认回滚该消息。

消息状态 事务消息有三种状态:TransactionStatus.CommitTransaction:提交事务消息,消费者可以消费此消息

TransactionStatus.RollbackTransaction:回滚事务,它代表该消息将被删除,不允许被消费。

TransactionStatus.Unknown:中间状态,它代表需要检查消息队列来确定状态。

代码实现

业务需求:用户请求订单微服务order-service接口删除订单(退货),删除订单时需要调用account-service的方法给账户增加余额,一个典型的分布式事务问题。

SpringCloud微服务开发基于RocketMQ实现分布式事务管理详解

基础配置

在Order-Service和Account-Service中引入Rocket消息组件

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>

在配置中心添加RocketMQ的相关配置

rocketmq:
  name-server: xxx.xx.x.xx:9876
  producer:
    group: cloud-group

在OrderService服务中建立一张事务日志表rocketmq_transaction_log(作用稍后说)

SpringCloud微服务开发基于RocketMQ实现分布式事务管理详解

发送半消息

Order-Service作为分布式事务开始的入口,在Service层我们给RocketMQ发送一条半消息

OrderController入口

/**
 * 根据订单号删除订单
 * @param orderNo 订单编号
 */
@PostMapping(\"/order/delete\")
public ResultData<String> delete(@RequestParam String orderNo){
 log.info(\"delete order id is {}\",orderNo);
 orderService.delete(orderNo);
 return ResultData.success(\"订单删除成功\");
}

直接调用orderService的delete方法

OrderServiceImpl业务逻辑

@Override
public void delete(String orderNo) {
 Order order = orderMapper.selectByNo(orderNo);
 //如果订单存在且状态为有效,进行业务处理
 if (order != null && CloudConstant.VALID_STATUS.equals(order.getStatus())) {
  String transactionId = UUID.randomUUID().toString();
  //如果可以删除订单则发送消息给rocketmq,让用户中心消费消息
  rocketMQTemplate.sendMessageInTransaction(\"add-amount\",
    MessageBuilder.withPayload(
      UserAddMoneyDTO.builder()
        .userCode(order.getAccountCode())
        .amount(order.getAmount())
        .build()
    )
    .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
    .setHeader(\"order_id\",order.getId())
    .build()
    ,order
  );
 
 }
}

首先校验一下订单状态,然后使用rocketMQTemplate.sendMessageInTransaction()发送事务消息。

sendMessageInTransaction方法有三个参数:

  • destination:目的地(主题),这里发送给add-amount这个topic
  • message:发送给消费者的消息体,需要使用MessageBuilder.withPayload()来构建消息
  • arg:参数

注意,这里我们生成了一个transactionId,并放在header中跟消息一起发送(这里实际也可以构造成一个对象,放在arg里进行发送),作用后面再讲!

消息封装实体UserAddMoneyDTO

@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class UserAddMoneyDTO {
    /**
     * 用户编码
     */
    private String userCode;
    /**
     * 金额
     */
    private BigDecimal amount;
}

这个类生产者和消费者都需要用到,所以我直接丢到common包中,大家根据项目实际情况决定放哪。

执行本地事务与回查

MQServer收到半消息后会告诉生产者order-service确认收到半消息,这时候order-service需要执行本地事务,执行完本地事务后再告诉MQServer本地事务的执行状态,确认此消息究竟是Commit还是Rollback。

RocketMQ提供了RocketMQLocalTransactionListener接口,本地事务监听器,这个接口类的实现如下:

SpringCloud微服务开发基于RocketMQ实现分布式事务管理详解

第一个方法executeLocalTransaction为执行本地事务;第二个方法checkLocalTransaction为检查本地事务的执行状态,也就是回查动作。

我们需要实现RocketMQLocalTransactionListener接口,在executeLocalTransaction方法中执行本地事务,在执行checkLocalTransaction回查方法时告诉RocketMQ到底该提交还是回滚。

这里大家思考一个问题,本地事务已经执行完成了,怎么去回查本地事务的执行结果呢?

答案如下:我们可以在执行本地事务的时候同时生成一条事务日志,让本地事务与日志事务在同一个方法中,同时添加@Transactional注解,保证两个操作事务是一个原子操作。

这样如果事务日志表中有这个本地事务的信息,那就代表本地事务执行成功,需要Commit,相反如果没有对应的事务日志,则表示执行失败,需要Rollback。这就是为什么我们上面在OrderService中需要建立一张事务日志表的原因。

实现RocketMQLocalTransactionListener接口,完成事务执行逻辑

/**
 * 监听事务消息
 * @author javadaily
 */
@Slf4j
@RocketMQTransactionListener
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class AddUserAmountListener implements RocketMQLocalTransactionListener {
    private final OrderService orderService;
    private final RocketMqTransactionLogMapper rocketMqTransactionLogMapper;
    /**
     * 执行本地事务
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
        log.info(\"执行本地事务\");
        MessageHeaders headers = message.getHeaders();
        //获取事务ID
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
        Integer orderId = Integer.valueOf((String)headers.get(\"order_id\"));
        log.info(\"transactionId is {}, orderId is {}\",transactionId,orderId);
        try{
            //执行本地事务,并记录日志
            orderService.changeStatuswithRocketMqLog(orderId, CloudConstant.INVALID_STATUS,transactionId);
            //执行成功,可以提交事务
            return RocketMQLocalTransactionState.COMMIT;
        }catch (Exception e){
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
    /**
     * 本地事务的检查,检查本地事务是否成功
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        MessageHeaders headers = message.getHeaders();
        //获取事务ID
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
        log.info(\"检查本地事务,事务ID:{}\",transactionId);
        //根据事务id从日志表检索
        QueryWrapper<RocketmqTransactionLog> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq(\"transaction_id\",transactionId);
        RocketmqTransactionLog rocketmqTransactionLog = rocketMqTransactionLogMapper.selectOne(queryWrapper);
        if(null != rocketmqTransactionLog){
            return RocketMQLocalTransactionState.COMMIT;
        }
        return RocketMQLocalTransactionState.ROLLBACK;
    }
}

本地事务执行逻辑

@Transactional(rollbackFor = RuntimeException.class)
@Override
public void changeStatuswithRocketMqLog(Integer id,String status,String transactionId){
    orderMapper.changeStatus(id,status);
    rocketMqTransactionLogMapper.insert(
        RocketmqTransactionLog.builder()
        .transactionId(transactionId)
        .log(\"执行删除订单操作\")
        .build()
    );
}

修改订单状态为删除状态,同时往事务日志表中插入一条事务日志,用@Transactional注解保证事务。

Account-Service消费消息

监听消息并处理给用户增加余额逻辑

@Slf4j
@Service
@RocketMQMessageListener(topic = \"add-amount\",consumerGroup = \"cloud-group\")
@RequiredArgsConstructor(onConstructor = @__(@Autowired) )
public class AddUserAmountListener implements RocketMQListener<UserAddMoneyDTO> {
    private final AccountMapper accountMapper;
    /**
     * 收到消息的业务逻辑
     */
    @Override
    public void onMessage(UserAddMoneyDTO userAddMoneyDTO) {
        log.info(\"received message: {}\",userAddMoneyDTO);
        accountMapper.increaseAmount(userAddMoneyDTO.getUserCode(),userAddMoneyDTO.getAmount());
        log.info(\"add money success\");
    }
}

测试

测试数据

SpringCloud微服务开发基于RocketMQ实现分布式事务管理详解

订单表

SpringCloud微服务开发基于RocketMQ实现分布式事务管理详解

用户表

SpringCloud微服务开发基于RocketMQ实现分布式事务管理详解

事务日志表

如果事务消息成功消费最终用户表中jianzh5这个用户的amount应该变成300(100+200)

测试准备

SpringCloud微服务开发基于RocketMQ实现分布式事务管理详解

我们在执行本地事务成功并需要通知消息队列提交事务处打个断点,然后在执行到此处时手动模拟异常

模拟异常

SpringCloud微服务开发基于RocketMQ实现分布式事务管理详解

SpringCloud微服务开发基于RocketMQ实现分布式事务管理详解

在准备提交事务时我们通过命令taskkill /pid 10116 -t -f命令强制杀掉OrderService进程。(先通过jps获取OrderService进程ID)

重启服务器,检查是否会执行回查方法

SpringCloud微服务开发基于RocketMQ实现分布式事务管理详解

重启OrderService程序会自动执行回查方法,结合事务日志表判断是否提交事务。

运行后的结果

SpringCloud微服务开发基于RocketMQ实现分布式事务管理详解

小结

我们介绍了使用消息队列实现柔性事务的方案,重点剖析了RocketMQ事务消息的原理,并通过Demo案例实现了分布式事务(柔性事务)。

资源下载此资源下载价格为1小猪币,终身VIP免费,请先
由于本站资源来源于互联网,以研究交流为目的,所有仅供大家参考、学习,不存在任何商业目的与商业用途,如资源存在BUG以及其他任何问题,请自行解决,本站不提供技术服务! 由于资源为虚拟可复制性,下载后不予退积分和退款,谢谢您的支持!如遇到失效或错误的下载链接请联系客服QQ:442469558

:本文采用 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议 进行许可, 转载请附上原文出处链接。
1、本站提供的源码不保证资源的完整性以及安全性,不附带任何技术服务!
2、本站提供的模板、软件工具等其他资源,均不包含技术服务,请大家谅解!
3、本站提供的资源仅供下载者参考学习,请勿用于任何商业用途,请24小时内删除!
4、如需商用,请购买正版,由于未及时购买正版发生的侵权行为,与本站无关。
5、本站部分资源存放于百度网盘或其他网盘中,请提前注册好百度网盘账号,下载安装百度网盘客户端或其他网盘客户端进行下载;
6、本站部分资源文件是经压缩后的,请下载后安装解压软件,推荐使用WinRAR和7-Zip解压软件。
7、如果本站提供的资源侵犯到了您的权益,请邮件联系: 442469558@qq.com 进行处理!

猪小侠源码-最新源码下载平台 Java教程 SpringCloud微服务开发基于RocketMQ实现分布式事务管理详解 http://www.20zxx.cn/705512/xuexijiaocheng/javajc.html

猪小侠源码,优质资源分享网

常见问题
  • 本站所有资源版权均属于原作者所有,均只能用于参考学习,请勿直接商用。若由于商用引起版权纠纷,一切责任均由使用者承担
查看详情
  • 最常见的情况是下载不完整: 可对比下载完压缩包的与网盘上的容量,建议提前注册好百度网盘账号,使用百度网盘客户端下载
查看详情

相关文章

官方客服团队

为您解决烦忧 - 24小时在线 专业服务