欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Spring Boot 分布式事物管理

程序员文章站 2022-04-30 13:58:46
...

Spring Boot 分布式事物管理

前言

事务是为了保证数据的一致性而产生的。那么分布式事务,顾名思义,就是我们要保证分布在不同数据库、不同服务器、不同应用之间的数据一致性。

在单体项目下数据是存放在一个数据库上的,采用数据库的事务就能满足我们的要求。
但随着业务的不断扩张,数据的不断增加,单一数据库已经到达了一个瓶颈,因此我们需要对数据库进行分库分表。为了保证数据的一致性,可能需要不同的数据库之间的数据要么同时成功,要么同时失败,不然导致产生一些脏数据,导致数据不一致。

对此,分布式事务就诞生了。

应用场景

在分布式系统中,如,支付系统中,支付成功后需要对买家和卖家同时进行操作,买家减钱,卖家加钱。必须得放在一个事务里执行。不然出现扣钱了,没买着货那就尴尬了哈哈哈哈哈,所以分布式事务这时候就堪大用。还有银行转账等等。

Spring Boot中实现分布式事务

SpringBoot 集成 Atomikos 实现分布式事务

Atomikos 是一个为 Java 平台提供增值服务的开源类事务管理器。

以下是包括在这个开源版本中的一些功能:

  • 全面崩溃 / 重启恢复;
  • 兼容标准的 SUN 公司 JTA API;
  • 嵌套事务;
  • 为 XA 和非 XA 提供内置的 JDBC 适配器。

注释:XA 协议由 Tuxedo 首先提出的,并交给 X/Open 组织,作为资源管理器(数据库)与事务管理器的接口标准。目前,Oracle、Informix、DB2 和 Sybase 等各大数据库厂家都提供对 XA 的支持。XA 协议采用两阶段提交方式来管理分布式事务。XA 接口提供资源管理器与事务管理器之间进行通信的标准接口。XA 协议包括两套函数,以 xa_ 开头的及以 ax_ 开头的。

代码实现

建库

在本地创建两个数据库:test01,test02,并且创建相同的数据库表:

sql脚本

-- ----------------------------
-- Table structure for test_user1
-- ----------------------------
DROP TABLE IF EXISTS `test_user1`;
CREATE TABLE `test_user1`  (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
  `age` int(11) NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 9 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Compact;

SET FOREIGN_KEY_CHECKS = 1;



DROP TABLE IF EXISTS `test_user2`;
CREATE TABLE `test_user2`  (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
  `age` int(11) NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 9 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Compact;

SET FOREIGN_KEY_CHECKS = 1;

pom中 添加 Atomikos 依赖

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jta-atomikos</artifactId>
        </dependency>

        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>1.3.5</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.5</version>
        </dependency>

添加数据库配置


server:
  port: 8080
spring:
  redis:
    host: localhost
    port: 6379
mysql:
  datasource:
    test1:
      url: jdbc:mysql://localhost:3307/test01?useUnicode=true&characterEncoding=utf-8
      username: root
      password: root
      minPoolSize: 3
      maxPoolSize: 25
      maxLifetime: 20000
      borrowConnectionTimeout: 30
      loginTimeout: 30
      maintenanceInterval: 60
      maxIdleTime: 60
      testQuery: select 1
    test2:
      url: jdbc:mysql://localhost:3307/test02?useUnicode=true&characterEncoding=utf-8
      username: root
      password: root
      minPoolSize: 3
      maxPoolSize: 25
      maxLifetime: 20000
      borrowConnectionTimeout: 30
      loginTimeout: 30
      maintenanceInterval: 60
      maxIdleTime: 60
      testQuery: select 1

数据源配置文件读取

DBConfig1

package com.example.springbootatomikos.config.pojo;

import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.context.properties.ConfigurationProperties;

/**
 * @description: 数据源test1
 * @author: Administrator
 * @create: 2020-05-02 19:22
 **/
@ConfigurationProperties(prefix = "mysql.datasource.test1")
@SpringBootConfiguration
public class DBConfig1 {

    private String url;
    private String username;
    private String password;
    private int minPoolSize;
    private int maxPoolSize;
    private int maxLifetime;
    private int borrowConnectionTimeout;
    private int loginTimeout;
    private int maintenanceInterval;
    private int maxIdleTime;
    private String testQuery;


    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public int getMinPoolSize() {
        return minPoolSize;
    }

    public void setMinPoolSize(int minPoolSize) {
        this.minPoolSize = minPoolSize;
    }

    public int getMaxPoolSize() {
        return maxPoolSize;
    }

    public void setMaxPoolSize(int maxPoolSize) {
        this.maxPoolSize = maxPoolSize;
    }

    public int getMaxLifetime() {
        return maxLifetime;
    }

    public void setMaxLifetime(int maxLifetime) {
        this.maxLifetime = maxLifetime;
    }

    public int getBorrowConnectionTimeout() {
        return borrowConnectionTimeout;
    }

    public void setBorrowConnectionTimeout(int borrowConnectionTimeout) {
        this.borrowConnectionTimeout = borrowConnectionTimeout;
    }

    public int getLoginTimeout() {
        return loginTimeout;
    }

    public void setLoginTimeout(int loginTimeout) {
        this.loginTimeout = loginTimeout;
    }

    public int getMaintenanceInterval() {
        return maintenanceInterval;
    }

    public void setMaintenanceInterval(int maintenanceInterval) {
        this.maintenanceInterval = maintenanceInterval;
    }

    public int getMaxIdleTime() {
        return maxIdleTime;
    }

    public void setMaxIdleTime(int maxIdleTime) {
        this.maxIdleTime = maxIdleTime;
    }

    public String getTestQuery() {
        return testQuery;
    }

    public void setTestQuery(String testQuery) {
        this.testQuery = testQuery;
    }
}

DBConfig2

package com.example.springbootatomikos.config.pojo;

import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.context.properties.ConfigurationProperties;

/**
 * @description: 数据源test2
 * @author: Administrator
 * @create: 2020-05-02 19:22
 **/
@ConfigurationProperties(prefix = "mysql.datasource.test2")
@SpringBootConfiguration
public class DBConfig2 {

    private String url;
    private String username;
    private String password;
    private int minPoolSize;
    private int maxPoolSize;
    private int maxLifetime;
    private int borrowConnectionTimeout;
    private int loginTimeout;
    private int maintenanceInterval;
    private int maxIdleTime;
    private String testQuery;

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public int getMinPoolSize() {
        return minPoolSize;
    }

    public void setMinPoolSize(int minPoolSize) {
        this.minPoolSize = minPoolSize;
    }

    public int getMaxPoolSize() {
        return maxPoolSize;
    }

    public void setMaxPoolSize(int maxPoolSize) {
        this.maxPoolSize = maxPoolSize;
    }

    public int getMaxLifetime() {
        return maxLifetime;
    }

    public void setMaxLifetime(int maxLifetime) {
        this.maxLifetime = maxLifetime;
    }

    public int getBorrowConnectionTimeout() {
        return borrowConnectionTimeout;
    }

    public void setBorrowConnectionTimeout(int borrowConnectionTimeout) {
        this.borrowConnectionTimeout = borrowConnectionTimeout;
    }

    public int getLoginTimeout() {
        return loginTimeout;
    }

    public void setLoginTimeout(int loginTimeout) {
        this.loginTimeout = loginTimeout;
    }

    public int getMaintenanceInterval() {
        return maintenanceInterval;
    }

    public void setMaintenanceInterval(int maintenanceInterval) {
        this.maintenanceInterval = maintenanceInterval;
    }

    public int getMaxIdleTime() {
        return maxIdleTime;
    }

    public void setMaxIdleTime(int maxIdleTime) {
        this.maxIdleTime = maxIdleTime;
    }

    public String getTestQuery() {
        return testQuery;
    }

    public void setTestQuery(String testQuery) {
        this.testQuery = testQuery;
    }
}


数据源配置

MyBatisConfig1

package com.example.springbootatomikos.config.one;

import com.example.springbootatomikos.config.pojo.DBConfig1;
import com.mysql.jdbc.jdbc2.optional.MysqlXADataSource;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;

import javax.sql.DataSource;
import java.sql.SQLException;

/**
 * @description: 数据源1配置
 * @author: Administrator
 * @create: 2020-05-02 19:25
 **/
@SpringBootConfiguration
@MapperScan(basePackages = "com.example.springbootatomikos.mapper.one", sqlSessionTemplateRef = "sqlSessionTemplate")
public class MyBatisConfig1 {

    // 配置数据源
    @Primary
    @Bean(name = "dataSource")
    public DataSource dataSource(DBConfig1 config) throws SQLException {
        MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource();
        mysqlXaDataSource.setUrl(config.getUrl());
        mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);
        mysqlXaDataSource.setPassword(config.getPassword());
        mysqlXaDataSource.setUser(config.getUsername());
        mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);

        AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
        xaDataSource.setXaDataSource(mysqlXaDataSource);
        xaDataSource.setUniqueResourceName("dataSource");

        xaDataSource.setMinPoolSize(config.getMinPoolSize());
        xaDataSource.setMaxPoolSize(config.getMaxPoolSize());
        xaDataSource.setMaxLifetime(config.getMaxLifetime());
        xaDataSource.setBorrowConnectionTimeout(config.getBorrowConnectionTimeout());
        xaDataSource.setLoginTimeout(config.getLoginTimeout());
        xaDataSource.setMaintenanceInterval(config.getMaintenanceInterval());
        xaDataSource.setMaxIdleTime(config.getMaxIdleTime());
        xaDataSource.setTestQuery(config.getTestQuery());
        return xaDataSource;
    }
    @Primary
    @Bean(name = "sqlSessionFactory")
    public SqlSessionFactory sqlSessionFactory(@Qualifier("dataSource") DataSource dataSource)
            throws Exception {
        SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
        bean.setDataSource(dataSource);
        return bean.getObject();
    }

    @Primary
    @Bean(name = "sqlSessionTemplate")
    public SqlSessionTemplate sqlSessionTemplate(
            @Qualifier("sqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception {
        return new SqlSessionTemplate(sqlSessionFactory);
    }
}

项目结构图

Spring Boot 分布式事物管理

MyBatisConfig2

package com.example.springbootatomikos.config.two;

import com.example.springbootatomikos.config.pojo.DBConfig2;
import com.mysql.jdbc.jdbc2.optional.MysqlXADataSource;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.annotation.Bean;

import javax.sql.DataSource;
import java.sql.SQLException;

/**
 * @description: 数据源2配置
 * @author: Administrator
 * @create: 2020-05-02 19:26
 **/
@SpringBootConfiguration
//basePackages 最好分开配置 如果放在同一个文件夹可能会报错
@MapperScan(basePackages = "com.example.springbootatomikos.mapper.two", sqlSessionTemplateRef = "sqlSessionTemplate2")
public class MyBatisConfig2 {

    // 配置数据源
    @Bean(name = "dataSource2")
    public DataSource dataSource(DBConfig2 config) throws SQLException {
        MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource();
        mysqlXaDataSource.setUrl(config.getUrl());
        mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);
        mysqlXaDataSource.setPassword(config.getPassword());
        mysqlXaDataSource.setUser(config.getUsername());
        mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);

        AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
        xaDataSource.setXaDataSource(mysqlXaDataSource);
        xaDataSource.setUniqueResourceName("dataSource2");

        xaDataSource.setMinPoolSize(config.getMinPoolSize());
        xaDataSource.setMaxPoolSize(config.getMaxPoolSize());
        xaDataSource.setMaxLifetime(config.getMaxLifetime());
        xaDataSource.setBorrowConnectionTimeout(config.getBorrowConnectionTimeout());
        xaDataSource.setLoginTimeout(config.getLoginTimeout());
        xaDataSource.setMaintenanceInterval(config.getMaintenanceInterval());
        xaDataSource.setMaxIdleTime(config.getMaxIdleTime());
        xaDataSource.setTestQuery(config.getTestQuery());
        return xaDataSource;
    }

    @Bean(name = "sqlSessionFactory2")
    public SqlSessionFactory sqlSessionFactory(@Qualifier("dataSource2") DataSource dataSource)
            throws Exception {
        SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
        bean.setDataSource(dataSource);
        return bean.getObject();
    }

    @Bean(name = "sqlSessionTemplate2")
    public SqlSessionTemplate sqlSessionTemplate(
            @Qualifier("sqlSessionFactory2") SqlSessionFactory sqlSessionFactory) throws Exception {
        return new SqlSessionTemplate(sqlSessionFactory);
    }
}

Mapper配置

UserMapper1

@Mapper
public interface UserMapper1 {

    @Insert("insert into test_user1(name,age) values(#{name},#{age})")
    void addUser(@Param("name")String name, @Param("age") int age);
}


UserMapper2

@Mapper
public interface UserMapper2 {

    @Insert("insert into test_user2(name,age) values(#{name},#{age})")
    void addUser(@Param("name") String name, @Param("age") int age);
}

User类

public class User {

    private Long id;
    private String name;
    private int age;


    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }
}


Service类

@Service
public class UserService {

    @Autowired
    private UserMapper1 userMapper1;
    @Autowired
    private UserMapper2 userMapper2;

    @Transactional
    public void addUser(User user)throws Exception{
        userMapper2.addUser(user.getName(),user.getAge());
        int a= 1/0;
        userMapper1.addUser(user.getName(),user.getAge());
    }
}

启动类和controller


@SpringBootApplication
@RestController
public class SpringbootatomikosApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringbootatomikosApplication.class, args);
    }

    @Autowired
    private UserService userService;

    @RequestMapping("test")
    public void test(){
        User user = new User();
        user.setName("test");
        user.setAge(110);
        try {
            userService.addUser(user);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

测试

请求 http://127.0.0.1:8080/test
后台异常报错,然后查看两个数据库数据都没有入库表示分布式事务执行成功。

然后将service中的 异常去掉,两个数据库数据入库成功。