欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

springboot+jta+atomikos实现分布式事务

程序员文章站 2022-05-23 11:29:16
...

springboot+jta+atomikos实现分布式事务

分布式系统中,会根据不同的业务,拆分出不同的系统和数据库,各个系统之间紧密联系,所以我们会经常遇到一个功能需要操作多个数据库的场景,例如商城下单功能,减库存是在商品信息系统里面,支付又在支付系统里面,我们通常要在商品系统里面进行减库存操作,再在支付系统里面进行支付,记录支付操作信息,整个流程必须在一个事务里面,一个环节出错,则需要全部回滚,这个时候我们就会用到分布式事务来保证事务的一致性。

示例代码:
数据库连接配置1:

@Configuration
@MapperScan(basePackages = "com.test.**.dao.mapper1", sqlSessionTemplateRef = "mysqlSqlSessionTemplate")
public class MysqlDataSourceConfig {
@Bean(name = "mysqlDataSource")
@ConfigurationProperties(prefix = "spring.datasource.baseMysql")
public DataSource setDataSource() throws SQLException {
	MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource(); 
	mysqlXaDataSource.setUrl("jdbc:mysql://192.168.1.203:3306/test?autoReconnect=true&useUnicode=true&characterEncoding=utf8&useSSL=false&allowMultiQueries=true&useAffectedRows=true");
    mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);
    mysqlXaDataSource.setPassword("root");
    mysqlXaDataSource.setUser("root");
    mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);
    AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
    xaDataSource.setXaDataSource(mysqlXaDataSource);
    xaDataSource.setUniqueResourceName("mysqlDataSource");

    xaDataSource.setMinPoolSize(3);
    xaDataSource.setMaxPoolSize(25);
    xaDataSource.setMaxLifetime(20000);
    xaDataSource.setBorrowConnectionTimeout(30);
    xaDataSource.setLoginTimeout(30);
    xaDataSource.setMaintenanceInterval(60);
    xaDataSource.setMaxIdleTime(60);
    xaDataSource.setTestQuery("select 1");
    return xaDataSource;
}

@Bean(name = "mysqlSqlSessionFactory")
public SqlSessionFactory setSqlSessionFactory(@Qualifier("mysqlDataSource") DataSource dataSource) throws Exception {
    SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
    bean.setDataSource(dataSource);
    org.apache.ibatis.session.Configuration configuration = new org.apache.ibatis.session.Configuration();
    configuration.setMapUnderscoreToCamelCase(true);
    bean.setConfiguration(configuration);
    bean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:mybatis/mysql/mapper/**/*Mapper.xml"));
    //添加PageHelper插件
    Interceptor interceptor = new PageInterceptor();
    Properties properties = new Properties();
    //数据库
    properties.setProperty("helperDialect", "mysql");
    //是否将参数offset作为PageNum使用
    properties.setProperty("offsetAsPageNum", "true");
    //是否进行count查询
    properties.setProperty("rowBoundsWithCount", "true");
    //是否分页合理化
    properties.setProperty("reasonable", "false");

    interceptor.setProperties(properties);
    bean.setPlugins(new Interceptor[] {interceptor});
    return bean.getObject();
}

@Bean(name = "mysqlSqlSessionTemplate")
public SqlSessionTemplate setSqlSessionTemplate(@Qualifier("mysqlSqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception {
    return new SqlSessionTemplate(sqlSessionFactory);
}

}

数据库连接配置2:

@Configuration
@MapperScan(basePackages = "com.test.**.dao.mapper2", sqlSessionTemplateRef = "mysqlMallSqlSessionTemplate")
public class MysqlMallDataSourceConfig {
@Bean(name = "mysqlMallDataSource")
@ConfigurationProperties(prefix = "spring.datasource.secondMssql")
@Primary
public DataSource setDataSource() throws SQLException {
	MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource();
    mysqlXaDataSource.setUrl("jdbc:mysql://192.168.1.204:3306/test?autoReconnect=true&useUnicode=true&characterEncoding=utf8&useSSL=false&allowMultiQueries=true");
    mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);
    mysqlXaDataSource.setPassword("root");
    mysqlXaDataSource.setUser("root");
    mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);
    AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
    xaDataSource.setXaDataSource(mysqlXaDataSource);
    xaDataSource.setUniqueResourceName("mysqlMallDataSource");
    xaDataSource.setMinPoolSize(3);
    xaDataSource.setMaxPoolSize(25);
    xaDataSource.setMaxLifetime(20000);
    xaDataSource.setBorrowConnectionTimeout(30);
    xaDataSource.setLoginTimeout(30);
    xaDataSource.setMaintenanceInterval(60);
    xaDataSource.setMaxIdleTime(60);
    xaDataSource.setTestQuery("select 1");
    return xaDataSource;
}

@Bean(name = "mysqlMallSqlSessionFactory")
@Primary
public SqlSessionFactory setSqlSessionFactory(@Qualifier("mysqlMallDataSource") DataSource dataSource) throws Exception {
    SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
    bean.setDataSource(dataSource);
    org.apache.ibatis.session.Configuration configuration = new org.apache.ibatis.session.Configuration();
    configuration.setMapUnderscoreToCamelCase(true);
    bean.setConfiguration(configuration);
    bean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:mybatis/mysqlmall/mapper/**/*Mapper.xml"));
    //添加PageHelper插件
    Interceptor interceptor = new PageInterceptor();
    Properties properties = new Properties();
    //数据库
    properties.setProperty("helperDialect", "mysql");
    //是否将参数offset作为PageNum使用
    properties.setProperty("offsetAsPageNum", "true");
    //是否进行count查询
    properties.setProperty("rowBoundsWithCount", "true");
    //是否分页合理化
    properties.setProperty("reasonable", "false");

    interceptor.setProperties(properties);
    bean.setPlugins(new Interceptor[] {interceptor});
    return bean.getObject();
}

@Bean(name = "mysqlMallSqlSessionTemplate")
@Primary
public SqlSessionTemplate setSqlSessionTemplate(@Qualifier("mysqlMallSqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception {
    return new SqlSessionTemplate(sqlSessionFactory);
}

}

事务管理器

@Configuration
@EnableTransactionManagement
public class TransactionManagerConfig {
@Bean(name = "userTransaction")
public UserTransaction userTransaction() throws Throwable {
    UserTransactionImp userTransactionImp = new UserTransactionImp();
    userTransactionImp.setTransactionTimeout(10000);
    return userTransactionImp;
}
@Bean(name = "atomikosTransactionManager", initMethod = "init", destroyMethod = "close")
public TransactionManager atomikosTransactionManager() throws Throwable {
    UserTransactionManager userTransactionManager = new UserTransactionManager();
    userTransactionManager.setForceShutdown(false);
    return userTransactionManager;
}
@Bean(name = "transactionManager")
@DependsOn({ "userTransaction", "atomikosTransactionManager" })
public PlatformTransactionManager transactionManager() throws Throwable {
    UserTransaction userTransaction = userTransaction();
    JtaTransactionManager manager = new JtaTransactionManager(userTransaction, atomikosTransactionManager());
    return manager;
}

}

userMapper文件(userMapper2操作相同sql):

<update id="updateUserTest">
    UPDATE user
    SET user_name = 'test'
    WHERE
        user_id = 1;
</update>

service层测试代码如下:

@Override
@Transactional
public void updateUserTest() {
	int i = userMapper.updateTest();  //连接数据库1
	int j = userMapper2.updateTest();   //连接数据库2
	logger.info("i:{}, j:{}", i, j);
	i = 1/0;   //抛出一个异常
}

controller层:

//模拟多数据库操作
@RequestMapping(value = "/updateUserTest", method = RequestMethod.POST)
public void updateUserTest(){
	userRemoteService.updateUserTest();
}

结果会发现,抛出异常且2个数据库修改的数据全部被回滚掉了。