springboot+jta+atomikos实现分布式事务
程序员文章站
2022-05-23 11:29:16
...
springboot+jta+atomikos实现分布式事务
分布式系统中,会根据不同的业务,拆分出不同的系统和数据库,各个系统之间紧密联系,所以我们会经常遇到一个功能需要操作多个数据库的场景,例如商城下单功能,减库存是在商品信息系统里面,支付又在支付系统里面,我们通常要在商品系统里面进行减库存操作,再在支付系统里面进行支付,记录支付操作信息,整个流程必须在一个事务里面,一个环节出错,则需要全部回滚,这个时候我们就会用到分布式事务来保证事务的一致性。
示例代码:
数据库连接配置1:
@Configuration
@MapperScan(basePackages = "com.test.**.dao.mapper1", sqlSessionTemplateRef = "mysqlSqlSessionTemplate")
public class MysqlDataSourceConfig {
@Bean(name = "mysqlDataSource")
@ConfigurationProperties(prefix = "spring.datasource.baseMysql")
public DataSource setDataSource() throws SQLException {
MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource();
mysqlXaDataSource.setUrl("jdbc:mysql://192.168.1.203:3306/test?autoReconnect=true&useUnicode=true&characterEncoding=utf8&useSSL=false&allowMultiQueries=true&useAffectedRows=true");
mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);
mysqlXaDataSource.setPassword("root");
mysqlXaDataSource.setUser("root");
mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);
AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
xaDataSource.setXaDataSource(mysqlXaDataSource);
xaDataSource.setUniqueResourceName("mysqlDataSource");
xaDataSource.setMinPoolSize(3);
xaDataSource.setMaxPoolSize(25);
xaDataSource.setMaxLifetime(20000);
xaDataSource.setBorrowConnectionTimeout(30);
xaDataSource.setLoginTimeout(30);
xaDataSource.setMaintenanceInterval(60);
xaDataSource.setMaxIdleTime(60);
xaDataSource.setTestQuery("select 1");
return xaDataSource;
}
@Bean(name = "mysqlSqlSessionFactory")
public SqlSessionFactory setSqlSessionFactory(@Qualifier("mysqlDataSource") DataSource dataSource) throws Exception {
SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
bean.setDataSource(dataSource);
org.apache.ibatis.session.Configuration configuration = new org.apache.ibatis.session.Configuration();
configuration.setMapUnderscoreToCamelCase(true);
bean.setConfiguration(configuration);
bean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:mybatis/mysql/mapper/**/*Mapper.xml"));
//添加PageHelper插件
Interceptor interceptor = new PageInterceptor();
Properties properties = new Properties();
//数据库
properties.setProperty("helperDialect", "mysql");
//是否将参数offset作为PageNum使用
properties.setProperty("offsetAsPageNum", "true");
//是否进行count查询
properties.setProperty("rowBoundsWithCount", "true");
//是否分页合理化
properties.setProperty("reasonable", "false");
interceptor.setProperties(properties);
bean.setPlugins(new Interceptor[] {interceptor});
return bean.getObject();
}
@Bean(name = "mysqlSqlSessionTemplate")
public SqlSessionTemplate setSqlSessionTemplate(@Qualifier("mysqlSqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception {
return new SqlSessionTemplate(sqlSessionFactory);
}
}
数据库连接配置2:
@Configuration
@MapperScan(basePackages = "com.test.**.dao.mapper2", sqlSessionTemplateRef = "mysqlMallSqlSessionTemplate")
public class MysqlMallDataSourceConfig {
@Bean(name = "mysqlMallDataSource")
@ConfigurationProperties(prefix = "spring.datasource.secondMssql")
@Primary
public DataSource setDataSource() throws SQLException {
MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource();
mysqlXaDataSource.setUrl("jdbc:mysql://192.168.1.204:3306/test?autoReconnect=true&useUnicode=true&characterEncoding=utf8&useSSL=false&allowMultiQueries=true");
mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);
mysqlXaDataSource.setPassword("root");
mysqlXaDataSource.setUser("root");
mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);
AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
xaDataSource.setXaDataSource(mysqlXaDataSource);
xaDataSource.setUniqueResourceName("mysqlMallDataSource");
xaDataSource.setMinPoolSize(3);
xaDataSource.setMaxPoolSize(25);
xaDataSource.setMaxLifetime(20000);
xaDataSource.setBorrowConnectionTimeout(30);
xaDataSource.setLoginTimeout(30);
xaDataSource.setMaintenanceInterval(60);
xaDataSource.setMaxIdleTime(60);
xaDataSource.setTestQuery("select 1");
return xaDataSource;
}
@Bean(name = "mysqlMallSqlSessionFactory")
@Primary
public SqlSessionFactory setSqlSessionFactory(@Qualifier("mysqlMallDataSource") DataSource dataSource) throws Exception {
SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
bean.setDataSource(dataSource);
org.apache.ibatis.session.Configuration configuration = new org.apache.ibatis.session.Configuration();
configuration.setMapUnderscoreToCamelCase(true);
bean.setConfiguration(configuration);
bean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:mybatis/mysqlmall/mapper/**/*Mapper.xml"));
//添加PageHelper插件
Interceptor interceptor = new PageInterceptor();
Properties properties = new Properties();
//数据库
properties.setProperty("helperDialect", "mysql");
//是否将参数offset作为PageNum使用
properties.setProperty("offsetAsPageNum", "true");
//是否进行count查询
properties.setProperty("rowBoundsWithCount", "true");
//是否分页合理化
properties.setProperty("reasonable", "false");
interceptor.setProperties(properties);
bean.setPlugins(new Interceptor[] {interceptor});
return bean.getObject();
}
@Bean(name = "mysqlMallSqlSessionTemplate")
@Primary
public SqlSessionTemplate setSqlSessionTemplate(@Qualifier("mysqlMallSqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception {
return new SqlSessionTemplate(sqlSessionFactory);
}
}
事务管理器
@Configuration
@EnableTransactionManagement
public class TransactionManagerConfig {
@Bean(name = "userTransaction")
public UserTransaction userTransaction() throws Throwable {
UserTransactionImp userTransactionImp = new UserTransactionImp();
userTransactionImp.setTransactionTimeout(10000);
return userTransactionImp;
}
@Bean(name = "atomikosTransactionManager", initMethod = "init", destroyMethod = "close")
public TransactionManager atomikosTransactionManager() throws Throwable {
UserTransactionManager userTransactionManager = new UserTransactionManager();
userTransactionManager.setForceShutdown(false);
return userTransactionManager;
}
@Bean(name = "transactionManager")
@DependsOn({ "userTransaction", "atomikosTransactionManager" })
public PlatformTransactionManager transactionManager() throws Throwable {
UserTransaction userTransaction = userTransaction();
JtaTransactionManager manager = new JtaTransactionManager(userTransaction, atomikosTransactionManager());
return manager;
}
}
userMapper文件(userMapper2操作相同sql):
<update id="updateUserTest">
UPDATE user
SET user_name = 'test'
WHERE
user_id = 1;
</update>
service层测试代码如下:
@Override
@Transactional
public void updateUserTest() {
int i = userMapper.updateTest(); //连接数据库1
int j = userMapper2.updateTest(); //连接数据库2
logger.info("i:{}, j:{}", i, j);
i = 1/0; //抛出一个异常
}
controller层:
//模拟多数据库操作
@RequestMapping(value = "/updateUserTest", method = RequestMethod.POST)
public void updateUserTest(){
userRemoteService.updateUserTest();
}
结果会发现,抛出异常且2个数据库修改的数据全部被回滚掉了。
上一篇: 从头开始的Java学习Day03(上)
下一篇: 禁止图片盗链