Introduction to Atomikos

程序员文章站 2022-05-23 11:20:23

Motivation

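In Java back-end development, transaction control is very important, and Spring provides a convenient declarative approach with @Transactional. However, a default Spring transaction only supports a single data source, while in practice a system often needs to write to several. In that case we need to work out how to get distributed transaction support through Spring.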

Approach

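For distributed transactions, JTA (Java Transaction API, the Java mapping of XA) is a good solution. JTA usually requires an application server, but the Spring Boot documentation recommends two distributed transaction components that work without one: Atomikos and Bitronix. Of the two, Atomikos is the more favorably reviewed, so I chose Atomikos.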
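
To make the idea behind XA more concrete, here is a minimal toy sketch of two-phase commit in plain Java. It is not Atomikos code and leaves out everything a real transaction manager needs (XAResource start/end, a recovery log, timeouts). The `Participant` interface and the `RecordingParticipant` class are hypothetical names introduced only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of two-phase commit; an illustration only, not Atomikos code. */
class TwoPhaseCommitSketch {

    /** Hypothetical stand-in for an XA-capable resource such as a database. */
    interface Participant {
        boolean prepare();   // phase 1: vote yes (true) or no (false)
        void commit();       // phase 2, if every participant voted yes
        void rollback();     // phase 2, if any participant voted no
    }

    /** In-memory participant that records what the coordinator asked of it. */
    static final class RecordingParticipant implements Participant {
        final String name;
        final boolean voteYes;
        final List<String> calls = new ArrayList<>();

        RecordingParticipant(String name, boolean voteYes) {
            this.name = name;
            this.voteYes = voteYes;
        }

        @Override public boolean prepare() { calls.add("prepare"); return voteYes; }
        @Override public void commit()     { calls.add("commit"); }
        @Override public void rollback()   { calls.add("rollback"); }
    }

    /** Runs both phases and returns true if the transaction committed. */
    static boolean runTransaction(List<? extends Participant> participants) {
        boolean allPrepared = true;
        for (Participant p : participants) {
            if (!p.prepare()) {      // one "no" vote is enough to abort
                allPrepared = false;
                break;
            }
        }
        for (Participant p : participants) {
            if (allPrepared) {
                p.commit();
            } else {
                p.rollback();
            }
        }
        return allPrepared;
    }

    public static void main(String[] args) {
        RecordingParticipant account = new RecordingParticipant("account", true);
        RecordingParticipant storage = new RecordingParticipant("storage", false);
        boolean committed = runTransaction(List.of(account, storage));
        System.out.println("committed = " + committed);
        System.out.println(account.name + " calls = " + account.calls);
        System.out.println(storage.name + " calls = " + storage.calls);
    }
}
```

Because the second participant votes no, the coordinator rolls back both. That all-or-nothing outcome across two resources is what a JTA transaction manager is meant to provide.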

Atomikos in practice

Add the dependency to pom.xml

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>

Configure two data sources in the application file

# account database
spring.datasource.account.url = jdbc:mysql://192.168.190.131:3307/seata-account?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&allowMultiQueries=true
spring.datasource.account.username = root
spring.datasource.account.password = 123456
# storage database
spring.datasource.storage.url = jdbc:mysql://192.168.190.131:3308/seata-storage?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&allowMultiQueries=true
spring.datasource.storage.username = root
spring.datasource.storage.password = 123456

Configure the two data sources in MybatisConfig

package com.yj.atomikos.config;

import java.util.Properties;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.web.servlet.FilterRegistrationBean;
import org.springframework.boot.web.servlet.ServletRegistrationBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import com.alibaba.druid.support.http.StatViewServlet;
import com.alibaba.druid.support.http.WebStatFilter;
import com.atomikos.jdbc.AtomikosDataSourceBean;

@Configuration
public class MybatisConfig {
	@Value("${spring.datasource.account.url}")
	private String accountDbUrl;

	@Value("${spring.datasource.account.username}")
	private String accountUsername;

	@Value("${spring.datasource.account.password}")
	private String accountPassword;

	@Value("${spring.datasource.storage.url}")
	private String storageDbUrl;

	@Value("${spring.datasource.storage.username}")
	private String storageUsername;

	@Value("${spring.datasource.storage.password}")
	private String storagePassword;

	final static Logger logger = LoggerFactory.getLogger(MybatisConfig.class);

	/**
	 * Configures Druid to expose monitoring statistics and enables the Druid console at http://localhost:8080/druid
	 *
	 * @return servlet registration bean
	 */
	@Bean
	public ServletRegistrationBean druidServlet() {
		logger.info("Init Druid Servlet Configuration ");
		ServletRegistrationBean servletRegistrationBean = new ServletRegistrationBean(new StatViewServlet(),
				"/druid/*");
		// IP whitelist; if unset, all IPs are allowed by default
		// servletRegistrationBean.addInitParameter("allow",
		// "192.168.2.25,127.0.0.1");
		// IP blacklist (when both are set, deny takes precedence over allow)
		servletRegistrationBean.addInitParameter("deny", "192.168.190.131");
		// Console admin user
		servletRegistrationBean.addInitParameter("loginUsername", "root");
		servletRegistrationBean.addInitParameter("loginPassword", "123456");
		// Whether statistics can be reset; "false" disables the "Reset All" button on the HTML page
		servletRegistrationBean.addInitParameter("resetEnable", "false");
		return servletRegistrationBean;
	}

	@Bean
	public FilterRegistrationBean filterRegistrationBean() {
		FilterRegistrationBean filterRegistrationBean = new FilterRegistrationBean(new WebStatFilter());
		filterRegistrationBean.addUrlPatterns("/*");
		filterRegistrationBean.addInitParameter("exclusions", "*.js,*.gif,*.jpg,*.png,*.css,*.ico,/druid/*");
		return filterRegistrationBean;
	}

	@Bean(name = "accountDataSource")
	@Primary
	public DataSource accountDataSource() {
		AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
		atomikosDataSourceBean.setUniqueResourceName("accountDataSource");
		atomikosDataSourceBean.setXaDataSourceClassName("com.mysql.jdbc.jdbc2.optional.MysqlXADataSource");
		Properties properties = new Properties();
		properties.put("URL", accountDbUrl);
		properties.put("user", accountUsername);
		properties.put("password", accountPassword);
		atomikosDataSourceBean.setXaProperties(properties);
		return atomikosDataSourceBean;
	}

	@Bean(name = "storageDataSource")
	public DataSource storageDataSource() {
		AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
		atomikosDataSourceBean.setUniqueResourceName("storageDataSource");
		atomikosDataSourceBean.setXaDataSourceClassName("com.mysql.jdbc.jdbc2.optional.MysqlXADataSource");
		Properties properties = new Properties();
		properties.put("URL", storageDbUrl);
		properties.put("user", storageUsername);
		properties.put("password", storagePassword);
		atomikosDataSourceBean.setXaProperties(properties);
		return atomikosDataSourceBean;
	}
}
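
The two data source beans above build their XA properties in exactly the same way, so that step could be pulled into a small helper. The following is only a sketch in plain Java. The class name `XaPropertiesSketch`, the method name `xaProperties` and the sample values in `main` are hypothetical, while the keys `URL`, `user` and `password` are the ones used in the configuration above.

```java
import java.util.Properties;

/** Sketch of a helper that removes the duplicated Properties setup. */
class XaPropertiesSketch {

    /** Builds the property set that each bean passes to setXaProperties. */
    static Properties xaProperties(String url, String user, String password) {
        Properties properties = new Properties();
        properties.setProperty("URL", url);
        properties.setProperty("user", user);
        properties.setProperty("password", password);
        return properties;
    }

    public static void main(String[] args) {
        // Placeholder values for illustration only.
        Properties p = xaProperties("jdbc:mysql://localhost:3306/demo", "demoUser", "demoPassword");
        System.out.println(p.getProperty("URL"));
        System.out.println(p.getProperty("user"));
    }
}
```

With such a helper, each bean method would reduce to setting the unique resource name, the XA data source class name and the result of `xaProperties(...)`.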

AccountDataSourceConfig

package com.yj.atomikos.config;

import javax.sql.DataSource;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;

@Configuration
@MapperScan(basePackages = { "com.yj.atomikos.dao.account" }, sqlSessionFactoryRef = "accountSqlSessionFactory")
public class AccountDataSourceConfig {

	public static final String MAPPER_XML_LOCATION = "classpath:mapper/account/*.xml";

	@Autowired
	@Qualifier("accountDataSource")
	private DataSource accountDataSource;

	@Bean
	public SqlSessionTemplate accountSqlSessionTemplate() throws Exception {
		return new SqlSessionTemplate(accountSqlSessionFactory());
	}

	@Bean
	public SqlSessionFactory accountSqlSessionFactory() throws Exception {
		SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
		factoryBean.setDataSource(accountDataSource);
		factoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(MAPPER_XML_LOCATION));
		return factoryBean.getObject();
	}
}

StorageDataSourceConfig

package com.yj.atomikos.config;

import javax.sql.DataSource;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;

@Configuration
@MapperScan(basePackages = { "com.yj.atomikos.dao.storage" }, sqlSessionFactoryRef = "storageSqlSessionFactory")
public class StorageDataSourceConfig {

	public static final String MAPPER_XML_LOCATION = "classpath:mapper/storage/*.xml";

	@Autowired
	@Qualifier("storageDataSource")
	private DataSource storageDataSource;

	@Bean
	public SqlSessionTemplate storageSqlSessionTemplate() throws Exception {
		return new SqlSessionTemplate(storageSqlSessionFactory());
	}

	@Bean
	public SqlSessionFactory storageSqlSessionFactory() throws Exception {
		SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
		factoryBean.setDataSource(storageDataSource);
		factoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(MAPPER_XML_LOCATION));
		return factoryBean.getObject();
	}
}

BusinessService: simulating a transaction

package com.yj.atomikos.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import com.yj.atomikos.dao.account.AccountDao;
import com.yj.atomikos.dao.storage.StorageDao;

@Service
public class BusinessService {

	private static final Logger log = LoggerFactory.getLogger(BusinessService.class);

	@Autowired
	private AccountDao accountDao;
	
	@Autowired
	private StorageDao storageDao;

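	// Runs inside one JTA transaction that spans both data sources.
	@Transactional
	public String transaction(long userId, long productId, String flag) {
		business(userId, productId, flag);
		return "操作成功"; // "operation succeeded"
	}

	private void business(long userId, long productId, String flag) {
		log.info("==业务开始=="); // "business start"
		accountDao.updateAccount(userId);
		// flag=1 simulates a failure between the two updates to trigger a rollback.
		if ("1".equals(flag)) {
			int i = 1 / 0;
		}
		storageDao.updateStorage(productId);
		log.info("==业务结束=="); // "business end"
	}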
}
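
The simulated failure is an ArithmeticException, which is a RuntimeException. By default, Spring's @Transactional rolls back on unchecked exceptions (RuntimeException and Error) but not on checked ones. The sketch below restates that default rule in plain Java so it can be tried without Spring. The class and method names are hypothetical, and the code is not Spring's implementation.

```java
import java.io.IOException;

/** Plain-Java restatement of Spring's default rollback rule; not Spring code. */
class RollbackRuleSketch {

    /** Returns true for unchecked exceptions, which roll back by default. */
    static boolean rollsBackByDefault(Throwable ex) {
        return ex instanceof RuntimeException || ex instanceof Error;
    }

    public static void main(String[] args) {
        try {
            int zero = 0;
            System.out.println(1 / zero); // same kind of failure as in BusinessService
        } catch (ArithmeticException e) {
            System.out.println("ArithmeticException rolls back: " + rollsBackByDefault(e));
        }
        System.out.println("IOException rolls back: " + rollsBackByDefault(new IOException("demo")));
    }
}
```

If a checked exception should also trigger a rollback, the rollbackFor attribute of @Transactional can be used, for example @Transactional(rollbackFor = Exception.class).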

Verification

Start the project and watch the log output

2020-01-27 15:44:24.471  WARN 10732 --- [           main] c.a.i.config.UserTransactionServiceImp   : No properties path set - looking for transactions.properties in classpath...
2020-01-27 15:44:24.472  WARN 10732 --- [           main] c.a.i.config.UserTransactionServiceImp   : transactions.properties not found - looking for jta.properties in classpath...
2020-01-27 15:44:24.473  WARN 10732 --- [           main] c.a.i.config.UserTransactionServiceImp   : Failed to open transactions properties file - using default values
2020-01-27 15:44:24.506  INFO 10732 --- [           main] c.a.p.imp.StateRecoveryManagerImp        : com.atomikos.persistence.imp.WriteAheadObjectLog instantiation failed - falling back to default
2020-01-27 15:44:24.509  INFO 10732 --- [           main] c.a.persistence.imp.FileLogStream        : Starting read of logfile D:\eclipse\ws\DistributedTransaction\Atomikos\transaction-logs\tmlog25.log
2020-01-27 15:44:24.509  INFO 10732 --- [           main] c.a.persistence.imp.FileLogStream        : Done read of logfile
2020-01-27 15:44:24.509  INFO 10732 --- [           main] c.a.persistence.imp.AbstractLogStream    : Logfile closed: D:\eclipse\ws\DistributedTransaction\Atomikos\transaction-logs\tmlog25.log
2020-01-27 15:44:24.530  INFO 10732 --- [           main] c.a.i.c.i.AbstractUserTransactionService : USING core version: 3.9.3
2020-01-27 15:44:24.530  INFO 10732 --- [           main] c.a.i.c.i.AbstractUserTransactionService : USING com.atomikos.icatch.automatic_resource_registration = true
2020-01-27 15:44:24.530  INFO 10732 --- [           main] c.a.i.c.i.AbstractUserTransactionService : USING com.atomikos.icatch.client_demarcation = false
2020-01-27 15:44:24.530  INFO 10732 --- [           main] c.a.i.c.i.AbstractUserTransactionService : USING com.atomikos.icatch.threaded_2pc = false
2020-01-27 15:44:24.530  INFO 10732 --- [           main] c.a.i.c.i.AbstractUserTransactionService : USING com.atomikos.icatch.serial_jta_transactions = true
2020-01-27 15:44:24.530  INFO 10732 --- [           main] c.a.i.c.i.AbstractUserTransactionService : USING com.atomikos.icatch.serializable_logging = true
2020-01-27 15:44:24.530  INFO 10732 --- [           main] c.a.i.c.i.AbstractUserTransactionService : USING com.atomikos.icatch.log_base_dir = D:\eclipse\ws\DistributedTransaction\Atomikos\transaction-logs
2020-01-27 15:44:24.530  INFO 10732 --- [           main] c.a.i.c.i.AbstractUserTransactionService : USING com.atomikos.icatch.max_actives = 50
2020-01-27 15:44:24.530  INFO 10732 --- [           main] c.a.i.c.i.AbstractUserTransactionService : USING com.atomikos.icatch.checkpoint_interval = 500
2020-01-27 15:44:24.530  INFO 10732 --- [           main] c.a.i.c.i.AbstractUserTransactionService : USING com.atomikos.icatch.enable_logging = true
2020-01-27 15:44:24.530  INFO 10732 --- [           main] c.a.i.c.i.AbstractUserTransactionService : USING com.atomikos.icatch.output_dir = .\
2020-01-27 15:44:24.530  INFO 10732 --- [           main] c.a.i.c.i.AbstractUserTransactionService : USING com.atomikos.icatch.log_base_name = tmlog
2020-01-27 15:44:24.530  INFO 10732 --- [           main] c.a.i.c.i.AbstractUserTransactionService : USING com.atomikos.icatch.max_timeout = 300000
2020-01-27 15:44:24.530  INFO 10732 --- [           main] c.a.i.c.i.AbstractUserTransactionService : USING com.atomikos.icatch.tm_unique_name = 192.168.190.1.tm
2020-01-27 15:44:24.530  INFO 10732 --- [           main] c.a.i.c.i.AbstractUserTransactionService : USING java.naming.factory.initial = com.sun.jndi.rmi.registry.RegistryContextFactory
2020-01-27 15:44:24.530  INFO 10732 --- [           main] c.a.i.c.i.AbstractUserTransactionService : USING java.naming.provider.url = rmi://localhost:1099
2020-01-27 15:44:24.530  INFO 10732 --- [           main] c.a.i.c.i.AbstractUserTransactionService : USING com.atomikos.icatch.service = com.atomikos.icatch.standalone.UserTransactionServiceFactory
2020-01-27 15:44:24.531  INFO 10732 --- [           main] c.a.i.c.i.AbstractUserTransactionService : USING com.atomikos.icatch.force_shutdown_on_vm_exit = false
2020-01-27 15:44:24.531  INFO 10732 --- [           main] c.a.i.c.i.AbstractUserTransactionService : USING com.atomikos.icatch.default_jta_timeout = 10000
2020-01-27 15:44:24.541  INFO 10732 --- [           main] o.s.t.jta.JtaTransactionManager          : Using JTA UserTransaction: [email protected]
2020-01-27 15:44:24.542  INFO 10732 --- [           main] o.s.t.jta.JtaTransactionManager          : Using JTA TransactionManager: [email protected]
2020-01-27 15:44:24.649  INFO 10732 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Registering beans for JMX exposure on startup
2020-01-27 15:44:24.657  INFO 10732 --- [           main] o.a.coyote.http11.Http11NioProtocol      : Starting ProtocolHandler ["http-nio-8001"]
2020-01-27 15:44:24.666  INFO 10732 --- [           main] o.a.tomcat.util.net.NioSelectorPool      : Using a shared selector for servlet write/read
2020-01-27 15:44:24.675  INFO 10732 --- [           main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 8001 (http)

Call the endpoint

http://127.0.0.1:8001/transaction?user_id=1&product_id=1&flag=1

Watch the log

2020-01-27 15:45:24.012  INFO 10732 --- [nio-8001-exec-3] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring FrameworkServlet 'dispatcherServlet'
2020-01-27 15:45:24.013  INFO 10732 --- [nio-8001-exec-3] o.s.web.servlet.DispatcherServlet        : FrameworkServlet 'dispatcherServlet': initialization started
2020-01-27 15:45:24.028  INFO 10732 --- [nio-8001-exec-3] o.s.web.servlet.DispatcherServlet        : FrameworkServlet 'dispatcherServlet': initialization completed in 15 ms
2020-01-27 15:45:24.075  INFO 10732 --- [nio-8001-exec-3] c.a.icatch.imp.thread.TaskManager        : THREADS: using JDK thread pooling...
2020-01-27 15:45:24.081  INFO 10732 --- [nio-8001-exec-3] c.a.icatch.imp.BaseTransactionManager    : createCompositeTransaction ( 10000 ): created new ROOT transaction with id 192.168.190.1.tm0000100027
2020-01-27 15:45:24.089  INFO 10732 --- [nio-8001-exec-3] com.yj.atomikos.service.BusinessService  : ==业务开始==
2020-01-27 15:45:24.108  INFO 10732 --- [nio-8001-exec-3] c.atomikos.jdbc.AbstractDataSourceBean   : AtomikosDataSoureBean 'accountDataSource': getConnection ( null )...
2020-01-27 15:45:24.108  INFO 10732 --- [nio-8001-exec-3] c.atomikos.jdbc.AbstractDataSourceBean   : AtomikosDataSoureBean 'accountDataSource': init...
2020-01-27 15:45:24.108  WARN 10732 --- [nio-8001-exec-3] c.atomikos.jdbc.AbstractDataSourceBean   : AtomikosDataSoureBean 'accountDataSource': poolSize equals default - this may cause performance problems!
2020-01-27 15:45:24.115  INFO 10732 --- [nio-8001-exec-3] c.atomikos.jdbc.AtomikosDataSourceBean   : AtomikosDataSoureBean 'accountDataSource': initializing with [ xaDataSourceClassName=com.mysql.jdbc.jdbc2.optional.MysqlXADataSource, uniqueResourceName=accountDataSource, maxPoolSize=1, minPoolSize=1, borrowConnectionTimeout=30, maxIdleTime=60, reapTimeout=0, maintenanceInterval=60, testQuery=null, xaProperties=[URL=jdbc:mysql://192.168.190.131:3307/seata-account?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&allowMultiQueries=true,user=root,password=123456], loginTimeout=0, maxLifetime=0]
2020-01-27 15:45:24.314  INFO 10732 --- [nio-8001-exec-3] c.a.d.xa.XATransactionalResource         : accountDataSource: refreshed XAResource
2020-01-27 15:45:24.353  INFO 10732 --- [nio-8001-exec-3] c.atomikos.jdbc.AtomikosConnectionProxy  : atomikos connection proxy for [email protected]: calling getAutoCommit...
2020-01-27 15:45:24.356 DEBUG 10732 --- [nio-8001-exec-3] c.y.a.d.a.AccountDao.updateAccount       : ==>  Preparing: update account set money = money-10 where user_id = ? 
2020-01-27 15:45:24.359  INFO 10732 --- [nio-8001-exec-3] c.a.icatch.imp.CompositeTransactionImp   : addParticipant ( XAResourceTransaction: 3139322E3136382E3139302E312E746D30303030313030303237:3139322E3136382E3139302E312E746D31 ) for transaction 192.168.190.1.tm0000100027
2020-01-27 15:45:24.359  INFO 10732 --- [nio-8001-exec-3] c.a.datasource.xa.XAResourceTransaction  : XAResource.start ( 3139322E3136382E3139302E312E746D30303030313030303237:3139322E3136382E3139302E312E746D31 , XAResource.TMNOFLAGS ) on resource accountDataSource represented by XAResource instance [email protected]
2020-01-27 15:45:24.361  INFO 10732 --- [nio-8001-exec-3] c.a.icatch.imp.CompositeTransactionImp   : registerSynchronization ( com.a[email protected]b42ba2df ) for transaction 192.168.190.1.tm0000100027
2020-01-27 15:45:24.361  INFO 10732 --- [nio-8001-exec-3] c.atomikos.jdbc.AtomikosConnectionProxy  : atomikos connection proxy for [email protected]: calling prepareStatement(update account set money = money-10 where user_id = ?)...
2020-01-27 15:45:24.385 DEBUG 10732 --- [nio-8001-exec-3] c.y.a.d.a.AccountDao.updateAccount       : ==> Parameters: 1(Long)
2020-01-27 15:45:24.389 DEBUG 10732 --- [nio-8001-exec-3] c.y.a.d.a.AccountDao.updateAccount       : <==    Updates: 1
2020-01-27 15:45:24.390  INFO 10732 --- [nio-8001-exec-3] c.atomikos.jdbc.AtomikosConnectionProxy  : atomikos connection proxy for [email protected]: close()...
2020-01-27 15:45:24.391  INFO 10732 --- [nio-8001-exec-3] c.a.datasource.xa.XAResourceTransaction  : XAResource.end ( 3139322E3136382E3139302E312E746D30303030313030303237:3139322E3136382E3139302E312E746D31 , XAResource.TMSUCCESS ) on resource accountDataSource represented by XAResource instance [email protected]
2020-01-27 15:45:24.395  INFO 10732 --- [nio-8001-exec-3] c.a.datasource.xa.XAResourceTransaction  : XAResource.rollback ( 3139322E3136382E3139302E312E746D30303030313030303237:3139322E3136382E3139302E312E746D31 ) on resource accountDataSource represented by XAResource instance [email protected]
2020-01-27 15:45:24.400  INFO 10732 --- [nio-8001-exec-3] c.a.icatch.imp.CompositeTransactionImp   : rollback() done of transaction 192.168.190.1.tm0000100027
2020-01-27 15:45:24.409 ERROR 10732 --- [nio-8001-exec-3] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is java.lang.ArithmeticException: / by zero] with root cause

java.lang.ArithmeticException: / by zero
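
The long hexadecimal strings in the XAResource log lines above are easier to read once decoded. Treating each pair of hex digits as one ASCII byte, the first part turns out to be the root transaction id printed by createCompositeTransaction, and the second part starts with the tm_unique_name from the startup log. Reading the two parts as the global transaction id and the branch qualifier of an XA Xid is my assumption, based on how XA identifiers are structured. The class name `XidDecodeSketch` is hypothetical.

```java
import java.nio.charset.StandardCharsets;

/** Decodes the hex-encoded identifiers that appear in the Atomikos log lines. */
class XidDecodeSketch {

    /** Interprets every two hex digits as one ASCII byte. */
    static String hexToAscii(String hex) {
        byte[] bytes = new byte[hex.length() / 2];
        for (int i = 0; i < bytes.length; i++) {
            bytes[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
        }
        return new String(bytes, StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) {
        // The two colon-separated parts copied from the log above.
        String first = "3139322E3136382E3139302E312E746D30303030313030303237";
        String second = "3139322E3136382E3139302E312E746D31";
        System.out.println(hexToAscii(first));  // prints 192.168.190.1.tm0000100027
        System.out.println(hexToAscii(second)); // prints 192.168.190.1.tm1
    }
}
```

This makes it easy to match each XAResource.start, end and rollback line to the transaction that BaseTransactionManager created.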

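Checking the data in both data sources at this point shows that everything is as expected: the log above shows the update on accountDataSource being rolled back, and the storage update was never reached. So Atomikos transaction control across multiple data sources within a single project is working.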