分布式事务(一)
事务: 事务是由一组操作构成的可靠的独立的工作单元,事务具备ACID的特性,即原子性、一致性、隔离性和持久性。
我们在MySQL事务中,也详细介绍了事务的概念,在平时的开发中,用的最多的肯定是Spring中的事务,其有关使用方法在Spring声明式事务中介绍过。
但是在分布式系统中,为了保证数据的高可用,通常我们会将数据保留多个副本,这些副本会放置在不同的物理的机器上。或者我们在分布式系统中,通过Dubbo之类的调用别的接口,其中就会操作其他机器上的数据库。这样就涉及到了分布式事务。
分布式事务是指多台数据库的执行SQL,也想要达到一致性的标准,即多台机器上的数据库一起commit或rollback。参照单机事务的模型,分布式事务的思路延袭,也想通过三个标准接口的模式来完成(启副本/commit/rollback)。按这个思路, X/Open组织提出了分布式事务的规范——XA。
- XA是由X/Open组织提出的分布式事务的规范。XA规范主要定义了(全局)事务管理器(TM)和(局部)资源管理器(RM)之间的接口。主流的关系型数据库产品都是实现了XA接口的
- XA接口是双向的系统接口,在事务管理器(TM)以及一个或多个资源管理器(RM)之间形成通信桥梁。
- XA之所以需要引入事务管理器是因为,在分布式系统中,从理论上讲两台机器理论上无法达成一致的状态,需要引入一个单点进行协调。
- 由全局事务管理器管理和协调的事务,可以跨越多个资源(如数据库或JMS队列)和进程。全局事务管理器一般使用XA二阶段提交协议与数据库进行交互。
至于上述的XA协议是X/Open组织提出的分布式事务的规范,根据其规范实现的就有我们比较熟悉的2PC——两阶段提交,如下:
- 两阶段提交协议(Two-phase commit protocol)是XA用于在全局事务中协调多个资源的机制。
- TM和RM间采取两阶段提交(Two-phase Commit)的方案来解决一致性问题。
- 两阶段提交需要一个协调者(TM)来掌控所有参与者节点(RM)的操作结果并且指引这些节点是否需要最终提交。
如果想对2PC协议有更深入的了解,可以查看下 一致性协议——2PC及3PC
这里我们就来实际看看XA规范下的2PC事务是如何使用的,首先我们建立一个SpringBoot的项目,其目录结果及相关pom依赖如下:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jta-atomikos</artifactId>
<version>2.3.2.RELEASE</version>
</dependency>
<!-- mysql驱动 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
<!-- 数据库连接池 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.0.29</version>
</dependency>
然后我们看看其application.yml
配置文件,其中我们配置了两个不同数据库test1、test2数据源,如下:
server:
port: 8080
spring:
jta:
log-dir: ./tx-logs
transaction-manager-id: txManager
datasource:
type: com.alibaba.druid.pool.xa.DruidXADataSource
druid:
test1:
name: test1
driverClassName: com.mysql.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/test1?useUnicode=true&characterEncoding=utf8&useSSL=false
username: root
password: root
# 下面为连接池的补充设置,应用到上面所有数据源中
# 初始化大小,最小,最大
initialSize: 5
minIdle: 5
maxActive: 20
# 配置获取连接等待超时的时间
maxWait: 60000
# 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
timeBetweenEvictionRunsMillis: 60000
# 配置一个连接在池中最小生存的时间,单位是毫秒
minEvictableIdleTimeMillis: 30
validationQuery: SELECT 1
validationQueryTimeout: 10000
testWhileIdle: true
testOnBorrow: false
testOnReturn: false
# 打开PSCache,并且指定每个连接上PSCache的大小
poolPreparedStatements: true
maxPoolPreparedStatementPerConnectionSize: 20
filters: stat,wall
# 通过connectProperties属性来打开mergeSql功能;慢SQL记录
connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
# 合并多个DruidDataSource的监控数据
useGlobalDataSourceStat: true
test2:
name: test2
driverClassName: com.mysql.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/test2?useUnicode=true&characterEncoding=utf8&useSSL=false
username: root
password: root
# 下面为连接池的补充设置,应用到上面所有数据源中
# 初始化大小,最小,最大
initialSize: 5
minIdle: 5
maxActive: 20
# 配置获取连接等待超时的时间
maxWait: 60000
# 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
timeBetweenEvictionRunsMillis: 60000
# 配置一个连接在池中最小生存的时间,单位是毫秒
minEvictableIdleTimeMillis: 30
validationQuery: SELECT 1
validationQueryTimeout: 10000
testWhileIdle: true
testOnBorrow: false
testOnReturn: false
# 打开PSCache,并且指定每个连接上PSCache的大小
poolPreparedStatements: true
maxPoolPreparedStatementPerConnectionSize: 20
filters: stat,wall
# 通过connectProperties属性来打开mergeSql功能;慢SQL记录
connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
# 合并多个DruidDataSource的监控数据
useGlobalDataSourceStat: true
然后根据我们配置的配置文件,我们需要在配置类DataSourceConfig
中配置相关的属性,如dataSource、JdbcTemaplate等,如下:
@Configuration
public class DataSourceConfig {
@Bean(name = "test1DataSource")
@Autowired
public DataSource test1DataSource(Environment env) {
AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
Properties prop = build(env, "spring.datasource.druid.test1.");
ds.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource");
ds.setUniqueResourceName("test1");
ds.setPoolSize(5);
ds.setXaProperties(prop);
return ds;
}
@Autowired
@Bean(name = "test2DataSource")
public AtomikosDataSourceBean test2DataSource(Environment env) {
AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
Properties prop = build(env, "spring.datasource.druid.test2.");
ds.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource");
ds.setUniqueResourceName("test2");
ds.setPoolSize(5);
ds.setXaProperties(prop);
return ds;
}
@Bean("test1JdbcTemplate")
public JdbcTemplate sysJdbcTemplate(@Qualifier("test1DataSource") DataSource ds) {
return new JdbcTemplate(ds);
}
@Bean("test2JdbcTemplate")
public JdbcTemplate busJdbcTemplate(@Qualifier("test2DataSource") DataSource ds) {
return new JdbcTemplate(ds);
}
private Properties build(Environment env, String prefix) {
Properties prop = new Properties();
prop.put("url", env.getProperty(prefix + "url"));
prop.put("username", env.getProperty(prefix + "username"));
prop.put("password", env.getProperty(prefix + "password"));
prop.put("driverClassName", env.getProperty(prefix + "driverClassName", ""));
prop.put("initialSize", env.getProperty(prefix + "initialSize", Integer.class));
prop.put("maxActive", env.getProperty(prefix + "maxActive", Integer.class));
prop.put("minIdle", env.getProperty(prefix + "minIdle", Integer.class));
prop.put("maxWait", env.getProperty(prefix + "maxWait", Integer.class));
prop.put("timeBetweenEvictionRunsMillis", env.getProperty(prefix + "timeBetweenEvictionRunsMillis", Integer.class));
prop.put("minEvictableIdleTimeMillis", env.getProperty(prefix + "minEvictableIdleTimeMillis", Integer.class));
prop.put("validationQuery", env.getProperty(prefix + "validationQuery"));
prop.put("validationQueryTimeout", env.getProperty(prefix + "validationQueryTimeout", Integer.class));
prop.put("testOnBorrow", env.getProperty(prefix + "testOnBorrow", Boolean.class));
prop.put("testOnReturn", env.getProperty(prefix + "testOnReturn", Boolean.class));
prop.put("testWhileIdle", env.getProperty(prefix + "testWhileIdle", Boolean.class));
prop.put("poolPreparedStatements", env.getProperty(prefix + "poolPreparedStatements", Boolean.class));
prop.put("maxPoolPreparedStatementPerConnectionSize", env.getProperty(prefix + "maxPoolPreparedStatementPerConnectionSize", Integer.class));
prop.put("filters", env.getProperty(prefix + "filters", ""));
return prop;
}
}
然后我们就可以在Service直接使用@Transactional
了,和我们之前的使用方式没什么区别,如下
@Service
public class UserService {
@Autowired
private JdbcTemplate test1JdbcTemplate;
@Autowired
private JdbcTemplate test2JdbcTemplate;
@Transactional
public void createUser(Long userId) {
String sql = "INSERT INTO user(id) VALUES (?)";
//添加test1库的user用户
test1JdbcTemplate.update(sql,userId);
//添加test2库的user用户
test2JdbcTemplate.update(sql,userId);
}
}
然后我们就可以进行测试了,其中test1、test2数据库中都有一张user表,其中字段id我们设置成了主键,这样我们就可以手动添加或删除两个数据库中user表的id=1的数据,来组合不同的情况进行测试,其测试类Trans2pcApplicationTest
如下:
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = Trans2pcApplication.class)
class Trans2pcApplicationTest {
@Autowired
private UserService userService;
@Test
void testUserService() {
userService.createUser(1L);
}
}
上述我们采用2PC的协议虽然可以达到分布式下事务的管理,但是其效率非常低,因为要同时锁定两个数据库的数据,事务锁定时间大大延长,导致现实中数据库性能欠佳。另外必须要使用支持XA协议的datasource数据源,如我们上述使用的Druid数据源。
上一篇: 扫码登陆的逻辑实现
下一篇: Zookeeper完全分布式集群的搭建