SpringBoot-Druid-Atomikos分布式事务
程序员文章站
2022-05-23 12:55:37
...
分布式事务有很多博客,基本都是大同小异,这里只是记录一下,以后可以给自己参考
静态数据源版本
DruidConfig 核心配置
/**
 * Declares the two Atomikos-wrapped data sources ("master" and "slave") and the
 * JTA transaction manager bean used to coordinate work across them.
 */
@Configuration
public class DruidConfig extends AbstractDataSourceConfig {

    /**
     * Primary data source, configured from the {@code spring.datasource.druid.master.*} properties.
     *
     * @param env environment the connection settings are read from
     * @return the data source registered under the unique resource name "master"
     * @throws SQLException propagated from the data source factory method
     */
    @Bean(name = "master")
    @Primary
    public DataSource master(Environment env) throws SQLException {
        return createAtomikosDataSourceBean(env, "spring.datasource.druid.master.", "master");
    }

    /**
     * Secondary data source, configured from the {@code spring.datasource.druid.slave.*} properties.
     *
     * @param env environment the connection settings are read from
     * @return the data source registered under the unique resource name "slave"
     * @throws SQLException propagated from the data source factory method
     */
    @Bean(name = "slave")
    public DataSource slave(Environment env) throws SQLException {
        return createAtomikosDataSourceBean(env, "spring.datasource.druid.slave.", "slave");
    }

    /**
     * Builds the JTA transaction manager from a fresh Atomikos transaction manager
     * and user transaction.
     *
     * @return the transaction manager exposed as bean "xaTransactionManager"
     */
    @Bean(name = "xaTransactionManager")
    public JtaTransactionManager regTransactionManager() {
        UserTransactionManager atomikosTransactionManager = new UserTransactionManager();
        UserTransaction atomikosUserTransaction = new UserTransactionImp();
        return new JtaTransactionManager(atomikosUserTransaction, atomikosTransactionManager);
    }
}
MasterConfig
/**
 * MyBatis wiring for the "master" data source: scans the master mapper package and
 * exposes the session factory and session template those mappers use.
 */
@Configuration
@MapperScan(basePackages = "com.avanty.mapper.master", sqlSessionFactoryRef = "sqlSessionFactoryMaster", sqlSessionTemplateRef = "sqlSessionTemplateMaster")
public class MasterConfig extends AbstractDataSourceConfig {

    @Autowired
    @Qualifier("master")
    private DataSource master;

    /**
     * Creates the session factory bound to the master data source, loading the
     * mapper XML files under {@code classpath:mapper/master/}.
     *
     * @return the session factory with underscore-to-camel-case mapping enabled
     * @throws Exception if the mapper resources cannot be resolved or the factory cannot be built
     */
    @Bean(name = "sqlSessionFactoryMaster")
    @Primary
    public SqlSessionFactory sqlSessionFactoryMaster() throws Exception {
        SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
        factoryBean.setDataSource(master);
        factoryBean.setMapperLocations(
                new PathMatchingResourcePatternResolver().getResources("classpath:mapper/master/*Mapper.xml"));

        SqlSessionFactory masterFactory = factoryBean.getObject();
        // Enable underscore-to-camel-case column mapping on the built factory.
        masterFactory.getConfiguration().setMapUnderscoreToCamelCase(true);
        return masterFactory;
    }

    /**
     * Wraps the master session factory in a session template.
     *
     * @param sqlSessionFactory the factory produced by {@link #sqlSessionFactoryMaster()}
     * @return the session template exposed as bean "sqlSessionTemplateMaster"
     * @throws Exception declared for signature compatibility
     */
    @Bean(name = "sqlSessionTemplateMaster")
    @Primary
    public SqlSessionTemplate sqlSessionTemplateMaster(
            @Qualifier("sqlSessionFactoryMaster") SqlSessionFactory sqlSessionFactory) throws Exception {
        SqlSessionTemplate masterTemplate = new SqlSessionTemplate(sqlSessionFactory);
        return masterTemplate;
    }
}
Slave的配置跟上述代码雷同
测试
/**
 * Demo endpoint: inserts one row through each of the two services inside a single
 * transaction and then fails on purpose.
 *
 * @return "SUCCESS" (never reached, because the division below always throws)
 */
@RequestMapping("/hello2")
@Transactional
public String hello2() {
    // Row for the first service.
    Map firstRow = new HashMap();
    firstRow.put("name", "Zhan");
    firstRow.put("userAge", 18);
    test1Service.insertTest1(firstRow);

    // Row for the second service.
    Map secondRow = new HashMap();
    secondRow.put("num", 6);
    secondRow.put("msg", "HelloTest");
    test2Service.insertTest2(secondRow);

    // Deliberate ArithmeticException; presumably here so the rollback of both
    // inserts can be observed.
    int forcedFailure = 10 / 0;
    return "SUCCESS";
}
配置Druid监控
package com.avanty.app.dataSource;
import com.alibaba.druid.pool.DruidDataSource;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.core.env.Environment;
import javax.sql.DataSource;
import java.sql.SQLException;
import java.util.Properties;
/**
 * Base class for configuration classes that create Atomikos data sources backed by
 * Druid's XA data source.
 */
public abstract class AbstractDataSourceConfig {

    /**
     * Creates an Atomikos data source that delegates to
     * {@code com.alibaba.druid.pool.xa.DruidXADataSource}.
     *
     * @param env                environment the settings are read from
     * @param prefix             property prefix, including the trailing dot
     *                           (for example {@code "spring.datasource.druid.master."})
     * @param uniqueResourceName Atomikos unique resource name for this data source
     * @return the configured data source
     * @throws SQLException declared for signature compatibility
     * @throws IllegalStateException if the {@code url} property is missing
     */
    protected DataSource createAtomikosDataSourceBean(Environment env, String prefix, String uniqueResourceName) throws SQLException {
        Properties xaProperties = build(env, prefix);

        AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
        ds.setUniqueResourceName(uniqueResourceName);
        ds.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource");
        ds.setPoolSize(10);
        ds.setXaProperties(xaProperties);
        return ds;
    }

    /**
     * Collects the Druid settings found under {@code prefix} into a {@link Properties} object.
     *
     * <p>{@code Properties} rejects null values, so optional settings that are not
     * configured are left out instead of being stored as null.
     */
    private Properties build(Environment env, String prefix) {
        Properties prop = new Properties();

        // The JDBC URL is mandatory; fail with a message naming the missing key.
        prop.put("url", env.getRequiredProperty(prefix + "url"));
        putIfPresent(prop, "username", env.getProperty(prefix + "username"));
        putIfPresent(prop, "password", env.getProperty(prefix + "password"));
        prop.put("driverClassName", env.getProperty(prefix + "driverClassName", ""));

        // Pool sizing and wait time.
        putIfPresent(prop, "initialSize", env.getProperty(prefix + "initialSize", Integer.class));
        putIfPresent(prop, "maxActive", env.getProperty(prefix + "maxActive", Integer.class));
        putIfPresent(prop, "minIdle", env.getProperty(prefix + "minIdle", Integer.class));
        putIfPresent(prop, "maxWait", env.getProperty(prefix + "maxWait", Integer.class));

        // Prepared-statement pooling.
        putIfPresent(prop, "poolPreparedStatements",
                env.getProperty(prefix + "poolPreparedStatements", Boolean.class));
        putIfPresent(prop, "maxPoolPreparedStatementPerConnectionSize",
                env.getProperty(prefix + "maxPoolPreparedStatementPerConnectionSize", Integer.class));

        // Connection validation and eviction.
        putIfPresent(prop, "testOnBorrow", env.getProperty(prefix + "testOnBorrow", Boolean.class));
        putIfPresent(prop, "testOnReturn", env.getProperty(prefix + "testOnReturn", Boolean.class));
        putIfPresent(prop, "testWhileIdle", env.getProperty(prefix + "testWhileIdle", Boolean.class));
        putIfPresent(prop, "timeBetweenEvictionRunsMillis",
                env.getProperty(prefix + "timeBetweenEvictionRunsMillis", Integer.class));
        putIfPresent(prop, "minEvictableIdleTimeMillis",
                env.getProperty(prefix + "minEvictableIdleTimeMillis", Integer.class));

        // Druid filter list; presumably this is what enables Druid monitoring.
        putIfPresent(prop, "filters", env.getProperty(prefix + "filters"));
        return prop;
    }

    /** Stores {@code value} under {@code key} unless the value is null. */
    private static void putIfPresent(Properties target, String key, Object value) {
        if (value != null) {
            target.put(key, value);
        }
    }
}
动态数据源版本
代码在《SpringBoot+MyBatis 动态数据源(内附项目地址)》的基础上修改
动态数据源在使用 @Transactional 时会发现数据源无法切换。原因是:开启事务时使用的是 DataSourceTransactionManager,Spring 在 doBegin 中取得数据源连接并缓存到 DataSourceTransactionObject 对象中,用于后续的 commit、rollback 等事务操作。此后即使 AbstractRoutingDataSource 切换了数据源,DataSourceTransactionManager 持有的仍是最初的连接,所以会出现问题。相关源码如下:
/**
 * Starts a JDBC transaction for the given transaction object.
 *
 * <p>The connection is taken from {@code obtainDataSource()} and stored in the
 * transaction object's connection holder; the same holder is then bound to the
 * data source via {@code TransactionSynchronizationManager}.
 *
 * @param transaction the transaction object, cast to {@code DataSourceTransactionObject}
 * @param definition  the transaction definition passed to connection preparation and timeout lookup
 * @throws CannotCreateTransactionException if anything fails while opening or preparing the connection
 */
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionManager.DataSourceTransactionObject txObject = (DataSourceTransactionManager.DataSourceTransactionObject)transaction;
Connection con = null;
try {
// Fetch a new connection when there is no holder yet, or the existing holder is
// already synchronized with a transaction.
if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
Connection newCon = this.obtainDataSource().getConnection();
if (this.logger.isDebugEnabled()) {
this.logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
// The second argument marks the holder as newly created (checked again below).
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
// Remember the value returned by connection preparation on the transaction object.
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
// Switch to manual commit and record that auto-commit has to be restored.
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (this.logger.isDebugEnabled()) {
this.logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
con.setAutoCommit(false);
}
this.prepareTransactionalConnection(con, definition);
txObject.getConnectionHolder().setTransactionActive(true);
// -1 is treated as "no timeout configured".
int timeout = this.determineTimeout(definition);
if (timeout != -1) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
// Bind a newly created holder to the data source.
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(this.obtainDataSource(), txObject.getConnectionHolder());
}
} catch (Throwable var7) {
// Only a holder created in this call is released and detached on failure.
if (txObject.isNewConnectionHolder()) {
DataSourceUtils.releaseConnection(con, this.obtainDataSource());
txObject.setConnectionHolder((ConnectionHolder)null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", var7);
}
}
关键文件修改
MultiDataSourceTransactionFactory
package com.avanty.dds;
/**
* @author: Michael Zhan
* @description: MultiDataSourceTransactionFactory
* @create: 2019-12-09 22:04:30
*/
import org.apache.ibatis.session.TransactionIsolationLevel;
import org.apache.ibatis.transaction.Transaction;
import org.mybatis.spring.transaction.SpringManagedTransactionFactory;
import javax.sql.DataSource;
/**
 * Transaction factory that hands MyBatis a {@link MultiDataSourceTransaction}, so
 * that the data source can be switched inside a single service call.
 *
 * @author lishuangqi
 * @date 2019/5/16 15:09
 */
public class MultiDataSourceTransactionFactory extends SpringManagedTransactionFactory {

    /**
     * Creates the transaction for the given data source. The isolation level and
     * auto-commit arguments are not used.
     *
     * @param dataSource the data source the transaction obtains connections from
     * @param level      ignored
     * @param autoCommit ignored
     * @return a new {@link MultiDataSourceTransaction}
     */
    @Override
    public Transaction newTransaction(
            DataSource dataSource,
            TransactionIsolationLevel level,
            boolean autoCommit) {
        Transaction multiDataSourceTransaction = new MultiDataSourceTransaction(dataSource);
        return multiDataSourceTransaction;
    }
}
MultiDataSourceTransaction
package com.avanty.dds;
/**
* @author: Michael Zhan
* @description: MultiDataSourceTransaction
* @create: 2019-12-09 21:55:32
*/
import com.alibaba.druid.support.logging.Log;
import com.alibaba.druid.support.logging.LogFactory;
import org.apache.ibatis.transaction.Transaction;
import org.springframework.jdbc.CannotGetJdbcConnectionException;
import org.springframework.jdbc.datasource.DataSourceUtils;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
 * MyBatis {@link Transaction} that supports switching between data sources while a
 * transaction is in progress.
 *
 * <p>The data source key that is current when the transaction is created identifies
 * the "main" connection, which is obtained through {@link DataSourceUtils}.
 * Connections for any other key are obtained directly from the data source and
 * cached per key.
 *
 * @author lishuangqi
 * @date 2019/5/16 15:09
 */
public class MultiDataSourceTransaction implements Transaction {
    private static final Log LOGGER = LogFactory.getLog(MultiDataSourceTransaction.class);

    private final DataSource dataSource;
    /** Data source key captured at construction time; may be null if none was set. */
    private final String mainDatabaseIdentification;
    /** Connections opened for keys other than the main one, by key. */
    private final ConcurrentMap<String, Connection> otherConnectionMap;

    private Connection mainConnection;
    private boolean isConnectionTransactional;
    private boolean autoCommit;

    /**
     * @param dataSource the data source connections are obtained from
     */
    public MultiDataSourceTransaction(DataSource dataSource) {
        this.dataSource = dataSource;
        this.otherConnectionMap = new ConcurrentHashMap<>();
        this.mainDatabaseIdentification = DynamicDataSourceContextHolder.getDataSourceKey();
    }

    /**
     * Returns the connection for the data source key that is current right now.
     *
     * @return the main connection when the current key matches the key captured at
     *         construction, otherwise the cached (or newly opened) connection for that key
     * @throws SQLException if the main connection cannot be inspected
     * @throws CannotGetJdbcConnectionException if a connection for another key cannot be opened
     * @throws IllegalStateException if no key is set now although one was set at construction
     */
    @Override
    public Connection getConnection() throws SQLException {
        String databaseIdentification = DynamicDataSourceContextHolder.getDataSourceKey();

        if (isMainDatabase(databaseIdentification)) {
            if (mainConnection == null) {
                openMainConnection();
            }
            return mainConnection;
        }

        if (databaseIdentification == null) {
            // The map cannot hold a null key; fail with a message instead of a bare NPE.
            throw new IllegalStateException(
                    "No data source key is set, but this transaction was started with key ["
                            + mainDatabaseIdentification + "]");
        }

        Connection conn = otherConnectionMap.get(databaseIdentification);
        if (conn == null) {
            try {
                conn = dataSource.getConnection();
            } catch (SQLException ex) {
                throw new CannotGetJdbcConnectionException("Could not get JDBC Connection", ex);
            }
            otherConnectionMap.put(databaseIdentification, conn);
        }
        return conn;
    }

    /** Null-safe comparison of {@code key} with the key captured at construction. */
    private boolean isMainDatabase(String key) {
        return key == null ? mainDatabaseIdentification == null : key.equals(mainDatabaseIdentification);
    }

    /** Opens the main connection and records its auto-commit and transactional state. */
    private void openMainConnection() throws SQLException {
        this.mainConnection = DataSourceUtils.getConnection(this.dataSource);
        this.autoCommit = this.mainConnection.getAutoCommit();
        this.isConnectionTransactional = DataSourceUtils.isConnectionTransactional(this.mainConnection, this.dataSource);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(
                    "JDBC Connection ["
                            + this.mainConnection
                            + "] will"
                            + (this.isConnectionTransactional ? " " : " not ")
                            + "be managed by Spring");
        }
    }

    /**
     * Commits the main connection and then every other connection, but only when the
     * main connection is open, is not reported as transactional by
     * {@link DataSourceUtils}, and is not in auto-commit mode.
     *
     * @throws SQLException on the first commit that fails; later connections are not committed
     */
    @Override
    public void commit() throws SQLException {
        if (!commitsLocally()) {
            return;
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Committing JDBC Connection [" + this.mainConnection + "]");
        }
        this.mainConnection.commit();
        for (Connection connection : otherConnectionMap.values()) {
            connection.commit();
        }
    }

    /**
     * Rolls back the main connection and every other connection, under the same
     * condition as {@link #commit()}.
     *
     * <p>Every connection is attempted even if an earlier rollback fails, so one
     * broken connection does not leave the others un-rolled-back.
     *
     * @throws SQLException the first failure, with any later failures attached as suppressed
     */
    @Override
    public void rollback() throws SQLException {
        if (!commitsLocally()) {
            return;
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Rolling back JDBC Connection [" + this.mainConnection + "]");
        }

        SQLException firstFailure = null;
        try {
            this.mainConnection.rollback();
        } catch (SQLException ex) {
            firstFailure = ex;
        }
        for (Connection connection : otherConnectionMap.values()) {
            try {
                connection.rollback();
            } catch (SQLException ex) {
                if (firstFailure == null) {
                    firstFailure = ex;
                } else {
                    firstFailure.addSuppressed(ex);
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /** True when this object, rather than an outer transaction, must commit or roll back. */
    private boolean commitsLocally() {
        return this.mainConnection != null && !this.isConnectionTransactional && !this.autoCommit;
    }

    /**
     * Releases the main connection and every other connection through
     * {@link DataSourceUtils#releaseConnection}.
     */
    @Override
    public void close() throws SQLException {
        DataSourceUtils.releaseConnection(this.mainConnection, this.dataSource);
        for (Connection connection : otherConnectionMap.values()) {
            DataSourceUtils.releaseConnection(connection, this.dataSource);
        }
    }

    /**
     * @return always {@code null}; no timeout is reported
     */
    @Override
    public Integer getTimeout() throws SQLException {
        return null;
    }
}
DataSourceConfigurer
/**
 * Builds the primary MyBatis session factory on top of the dynamic data source.
 *
 * @param env environment passed on to {@code dynamicDataSource}
 * @return the session factory exposed as bean "sqlSessionFactory"
 * @throws Exception if the mapper resources cannot be resolved or the factory cannot be built
 */
@Bean(name = "sqlSessionFactory")
@Primary
public SqlSessionFactory sqlSessionFactory(Environment env) throws Exception {
    SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();

    // Key setting: switching only works when dynamicDataSource is the data source.
    factoryBean.setDataSource(dynamicDataSource(env));
    // Use the transaction factory that supports switching inside a transaction.
    factoryBean.setTransactionFactory(new MultiDataSourceTransactionFactory());
    factoryBean.setMapperLocations(
            new PathMatchingResourcePatternResolver().getResources("classpath:mapper/*Mapper.xml"));

    SqlSessionFactory dynamicFactory = factoryBean.getObject();
    return dynamicFactory;
}
/**
 * Wraps the primary session factory in a session template.
 *
 * @param sqlSessionFactory the factory registered as bean "sqlSessionFactory"
 * @return the session template exposed as bean "sqlSessionTemplate"
 * @throws Exception declared for signature compatibility
 */
@Bean(name = "sqlSessionTemplate")
@Primary
public SqlSessionTemplate sqlSessionTemplate(
        @Qualifier("sqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception {
    SqlSessionTemplate template = new SqlSessionTemplate(sqlSessionFactory);
    return template;
}
以上就是分布式事务的简单使用了
项目地址: https://gitee.com/ichampion/Public-Project-Demo.git
参考资料:《Spring分布式多动态数据源+事务》