欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

分布式事务(一)

程序员文章站 2022-06-21 17:41:13
...

事务: 事务是由一组操作构成的可靠的独立的工作单元,事务具备ACID的特性,即原子性、一致性、隔离性和持久性。


我们在MySQL事务中,也详细介绍了事务的概念,在平时的开发中,用的最多的肯定是Spring中的事务,其有关使用方法在Spring声明式事务中介绍过。


但是在分布式系统中,为了保证数据的高可用,通常我们会将数据保留多个副本,这些副本会放置在不同的物理的机器上。或者我们在分布式系统中,通过Dubbo之类的调用别的接口,其中就会操作其他机器上的数据库。这样就涉及到了分布式事务。


分布式事务是指多台数据库的执行SQL,也想要达到一致性的标准,即多台机器上的数据库一起commit或rollback。参照单机事务的模型,分布式事务的思路延袭,也想通过三个标准接口的模式来完成(启副本/commit/rollback)。按这个思路, X/Open组织提出了分布式事务的规范——XA

  • XA是由X/Open组织提出的分布式事务的规范。XA规范主要定义了(全局)事务管理器(TM)和(局部)资源管理器(RM)之间的接口。主流的关系型数据库产品都是实现了XA接口的
  • XA接口是双向的系统接口,在事务管理器(TM)以及一个或多个资源管理器(RM)之间形成通信桥梁。
  • XA之所以需要引入事务管理器是因为,在分布式系统中,从理论上讲两台机器理论上无法达成一致的状态,需要引入一个单点进行协调。
  • 由全局事务管理器管理和协调的事务,可以跨越多个资源(如数据库或JMS队列)和进程。全局事务管理器一般使用XA二阶段提交协议与数据库进行交互。
    分布式事务(一)

至于上述的XA协议是X/Open组织提出的分布式事务的规范,根据其规范实现的就有我们比较熟悉的2PC——两阶段提交,如下:
分布式事务(一)

  • 两阶段提交协议(Two-phase commit protocol)是XA用于在全局事务中协调多个资源的机制。
  • TM和RM间采取两阶段提交(Two-phase Commit)的方案来解决一致性问题。
  • 两阶段提交需要一个协调者(TM)来掌控所有参与者节点(RM)的操作结果并且指引这些节点是否需要最终提交。

如果想对2PC协议有更深入的了解,可以查看下 一致性协议——2PC及3PC



这里我们就来实际看看XA规范下的2PC事务是如何使用的,首先我们建立一个SpringBoot的项目,其目录结果及相关pom依赖如下:
分布式事务(一)

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jta-atomikos</artifactId>
    <version>2.3.2.RELEASE</version>
</dependency>
<!-- mysql驱动 -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.38</version>
</dependency>
<!-- 数据库连接池 -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid</artifactId>
    <version>1.0.29</version>
</dependency>

然后我们看看其application.yml配置文件,其中我们配置了两个不同数据库test1、test2数据源,如下:

server:
  port: 8080

spring:
  jta:
    log-dir: ./tx-logs
    transaction-manager-id: txManager
  datasource:
    type: com.alibaba.druid.pool.xa.DruidXADataSource
    druid:
      test1:
        name: test1
        driverClassName: com.mysql.jdbc.Driver
        url: jdbc:mysql://127.0.0.1:3306/test1?useUnicode=true&characterEncoding=utf8&useSSL=false
        username: root
        password: root
        
        # 下面为连接池的补充设置,应用到上面所有数据源中
        # 初始化大小,最小,最大
        initialSize: 5
        minIdle: 5
        maxActive: 20
        
        # 配置获取连接等待超时的时间
        maxWait: 60000
        
        # 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
        timeBetweenEvictionRunsMillis: 60000
        
        # 配置一个连接在池中最小生存的时间,单位是毫秒
        minEvictableIdleTimeMillis: 30
        
        validationQuery: SELECT 1
        validationQueryTimeout: 10000
        testWhileIdle: true
        testOnBorrow: false
        testOnReturn: false
        
        # 打开PSCache,并且指定每个连接上PSCache的大小
        poolPreparedStatements: true
        maxPoolPreparedStatementPerConnectionSize: 20
        
        filters: stat,wall
        
        # 通过connectProperties属性来打开mergeSql功能;慢SQL记录
        connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
        
        # 合并多个DruidDataSource的监控数据
        useGlobalDataSourceStat: true

      test2:
        name: test2
        driverClassName: com.mysql.jdbc.Driver
        url: jdbc:mysql://127.0.0.1:3306/test2?useUnicode=true&characterEncoding=utf8&useSSL=false
        username: root
        password: root
        
        # 下面为连接池的补充设置,应用到上面所有数据源中
        # 初始化大小,最小,最大
        initialSize: 5
        minIdle: 5
        maxActive: 20
        
        # 配置获取连接等待超时的时间
        maxWait: 60000
        
        # 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
        timeBetweenEvictionRunsMillis: 60000
        
        # 配置一个连接在池中最小生存的时间,单位是毫秒
        minEvictableIdleTimeMillis: 30
        
        validationQuery: SELECT 1
        validationQueryTimeout: 10000
        testWhileIdle: true
        testOnBorrow: false
        testOnReturn: false
        
        # 打开PSCache,并且指定每个连接上PSCache的大小
        poolPreparedStatements: true
        maxPoolPreparedStatementPerConnectionSize: 20
        
        filters: stat,wall
        
        # 通过connectProperties属性来打开mergeSql功能;慢SQL记录
        connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
        
        # 合并多个DruidDataSource的监控数据
        useGlobalDataSourceStat: true

然后根据我们配置的配置文件,我们需要在配置类DataSourceConfig中配置相关的属性,如dataSource、JdbcTemaplate等,如下:

@Configuration
public class DataSourceConfig {

    @Bean(name = "test1DataSource")
    @Autowired
    public DataSource test1DataSource(Environment env) {
        AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
        Properties prop = build(env, "spring.datasource.druid.test1.");
        ds.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource");
        ds.setUniqueResourceName("test1");
        ds.setPoolSize(5);
        ds.setXaProperties(prop);
        return ds;

    }

    @Autowired
    @Bean(name = "test2DataSource")
    public AtomikosDataSourceBean test2DataSource(Environment env) {

        AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
        Properties prop = build(env, "spring.datasource.druid.test2.");
        ds.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource");
        ds.setUniqueResourceName("test2");
        ds.setPoolSize(5);
        ds.setXaProperties(prop);
        return ds;
    }

    @Bean("test1JdbcTemplate")
    public JdbcTemplate sysJdbcTemplate(@Qualifier("test1DataSource") DataSource ds) {
        return new JdbcTemplate(ds);
    }

    @Bean("test2JdbcTemplate")
    public JdbcTemplate busJdbcTemplate(@Qualifier("test2DataSource") DataSource ds) {
        return new JdbcTemplate(ds);
    }

    private Properties build(Environment env, String prefix) {
        Properties prop = new Properties();
        prop.put("url", env.getProperty(prefix + "url"));
        prop.put("username", env.getProperty(prefix + "username"));
        prop.put("password", env.getProperty(prefix + "password"));
        prop.put("driverClassName", env.getProperty(prefix + "driverClassName", ""));

        prop.put("initialSize", env.getProperty(prefix + "initialSize", Integer.class));
        prop.put("maxActive", env.getProperty(prefix + "maxActive", Integer.class));
        prop.put("minIdle", env.getProperty(prefix + "minIdle", Integer.class));

        prop.put("maxWait", env.getProperty(prefix + "maxWait", Integer.class));

        prop.put("timeBetweenEvictionRunsMillis", env.getProperty(prefix + "timeBetweenEvictionRunsMillis", Integer.class));
        prop.put("minEvictableIdleTimeMillis", env.getProperty(prefix + "minEvictableIdleTimeMillis", Integer.class));

        prop.put("validationQuery", env.getProperty(prefix + "validationQuery"));
        prop.put("validationQueryTimeout", env.getProperty(prefix + "validationQueryTimeout", Integer.class));
        prop.put("testOnBorrow", env.getProperty(prefix + "testOnBorrow", Boolean.class));
        prop.put("testOnReturn", env.getProperty(prefix + "testOnReturn", Boolean.class));
        prop.put("testWhileIdle", env.getProperty(prefix + "testWhileIdle", Boolean.class));

        prop.put("poolPreparedStatements", env.getProperty(prefix + "poolPreparedStatements", Boolean.class));
        prop.put("maxPoolPreparedStatementPerConnectionSize", env.getProperty(prefix + "maxPoolPreparedStatementPerConnectionSize", Integer.class));

        prop.put("filters", env.getProperty(prefix + "filters", ""));

        return prop;
    }
}

然后我们就可以在Service直接使用@Transactional了,和我们之前的使用方式没什么区别,如下

@Service
public class UserService {

    @Autowired
    private JdbcTemplate test1JdbcTemplate;

    @Autowired
    private JdbcTemplate test2JdbcTemplate;

    @Transactional
    public void createUser(Long userId) {

        String sql = "INSERT INTO user(id) VALUES (?)";

        //添加test1库的user用户
        test1JdbcTemplate.update(sql,userId);

        //添加test2库的user用户
        test2JdbcTemplate.update(sql,userId);
    }
}

然后我们就可以进行测试了,其中test1、test2数据库中都有一张user表,其中字段id我们设置成了主键,这样我们就可以手动添加或删除两个数据库中user表的id=1的数据,来组合不同的情况进行测试,其测试类Trans2pcApplicationTest如下:

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = Trans2pcApplication.class)
class Trans2pcApplicationTest {

	@Autowired
	private UserService userService;

	@Test
	void testUserService() {
		userService.createUser(1L);
	}
}

上述我们采用2PC的协议虽然可以达到分布式下事务的管理,但是其效率非常低,因为要同时锁定两个数据库的数据,事务锁定时间大大延长,导致现实中数据库性能欠佳。另外必须要使用支持XA协议的datasource数据源,如我们上述使用的Druid数据源。

相关标签: 分布式架构