欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

分布式事务JTA之实践:Spring+ATOMIKOS参考 博客分类: springjta spring 

程序员文章站 2024-03-14 11:53:52
...

分布式事务JTA之实践:Spring+ATOMIKOS

本文的目录结构如下:

  • 一、概述
  • 二、应用场景
  • 三、实验模拟需求
  • 四、实例测试环境
  • 五、源代码下载及配置介绍
  • 六、测试验证

一、概述:

本文主要讲述如何基于Atomikos 和spring在项目中实现分布式事务管理

二、应用场景:

如果项目中的数据源来自多个数据库,同时又需要在多数据源中保证事务,此时就需要用到分布式事务处理了。

三、实验模拟需求:

比如有两个对象:用户信息、用户存款,用户信息存在数据库A、存款信息存在数据库B,若客户甲向乙转账,需要在数据库B中对甲、乙的存款信息修改,同时在数据库A中把甲、乙的备注信息最新为最近一次的操作时间。

、实例测试环境:

  • spring、hibernate3.2
  • mysql5.1.51(需要版本5.0+)
  • AtomikosTransactionsEssentials-3.7.0 (详细可参加它的官网:http://www.atomikos.com  )

说明:

1. 测试的数据库需要支持分布式事务,同时JDBC要支持XA连接驱动。本次测试用的mysql5.1是支持事务的,JDBC驱动版本:mysql-connector-java-5.1.7-bin.jar,包含对 XA连接的支持:com.mysql.jdbc.jdbc2.optional.MysqlXAConnection

2. 附件提供AtomikosTransactionsEssentials 3.7.0 lib包下载:AtomikosTransactionsEssentials-3.7.0-lib.zip。官方下载地址:http://www.atomikos.com/Main/TransactionsEssentialsDownloadForm,需要先注册才能下载。同时这里也提供目前3.7.0的下载链接:http://www.atomikos.com/downloads/transactions-essentials/com/atomikos/AtomikosTransactionsEssentials/3.7.0/AtomikosTransactionsEssentials-3.7.0-bin.zip

五、代码及配置介绍:

源代码下载分布式事务实例演示源代码michael_jta_code.zip

1.代码的目录结构图如下:

分布式事务JTA之实践:Spring+ATOMIKOS参考
            
    
    博客分类: springjta spring 

转账测试的的代码片段:

1 /**
2      * 转账测试
3      * @param srcId
4      * @param destId
5      * @param money
6      * @return boolean
7      */
8     public boolean doTestTransfer(String srcId, String destId, float money) {
9  
10         BankAccount srcAccount = bankAccountDao.getByUserName(srcId);
11         BankAccount destAccount = bankAccountDao.getByUserName(destId);
12         if (srcAccount.getDeposit() < money) {
13             System.out.println("warn :" + srcAccount.getUserName()
14                     + " has not enough money to transfer");
15             return false;
16         }
17         srcAccount.setDeposit(srcAccount.getDeposit() - money);
18         destAccount.setDeposit(destAccount.getDeposit() + money);
19         // 把更新存款信息置于异常发生之前
20         bankAccountDao.update(srcAccount);
21         bankAccountDao.update(destAccount);
22  
23         Date curTime = new Date();
24         UserInfo srcUser = userInfoDao.getById(srcId);
25         UserInfo destUser = userInfoDao.getById(destId);
26  
27         destUser.setRemark1(curTime + "");
28         destUser.setRemark2(curTime + "");
29         // 把更新基本信息置于异常发生之前
30         userInfoDao.update(destUser);
31         srcUser.setRemark1(curTime + "");
32         if (srcAccount.getDeposit() < 18000) {
33             throw new RuntimeException("michael test exception for JTA  ");
34         }
35         srcUser.setRemark2(curTime + "");
36  
37         userInfoDao.update(srcUser);
38         System.out.println("success done:" + srcAccount.getUserName()
39                 + " transfer ¥" + money + " to " + destAccount.getUserName());
40  
41         return true;
42     }

2. 配置文件详细介绍:

jta.jdbc.properties

2 # eg. for mysql
3 jdbc.SDS.class=com.mysql.jdbc.jdbc2.optional.MysqlXADataSource
5  
6 jdbc.SDS2.class=com.mysql.jdbc.jdbc2.optional.MysqlXADataSource

jta.properties

2 com.atomikos.icatch.service=com.atomikos.icatch.standalone.UserTransactionServiceFactory
3 com.atomikos.icatch.console_file_name = tm.out
4 com.atomikos.icatch.log_base_name = tmlog
5 com.atomikos.icatch.tm_unique_name = com.atomikos.spring.jdbc.tm
6 com.atomikos.icatch.console_log_level = INFO

jta1.hibernate.cfg.xml

1 <!DOCTYPE hibernate-configuration
2     PUBLIC "-//Hibernate/Hibernate Configuration DTD//EN"
4  
5 <hibernate-configuration>
6     <session-factory>
7         <property name="dialect">
8             org.hibernate.dialect.MySQL5Dialect
9         </property>
10         <property name="hbm2ddl.auto">update</property>
11         <mapping class="michael.jta.atomikos.domain.UserInfo" />
12     </session-factory>
13  
14 </hibernate-configuration>

jta2.hibernate.cfg.xml

1 <!DOCTYPE hibernate-configuration
2     PUBLIC "-//Hibernate/Hibernate Configuration DTD//EN"
4  
5 <hibernate-configuration>
6  
7     <session-factory>
8         <property name="show_sql">true</property>
9         <property name="dialect">
10             org.hibernate.dialect.MySQL5Dialect
11         </property>
12         <property name="hbm2ddl.auto">update</property>
13         <mapping class="michael.jta.atomikos.domain.BankAccount" />
14     </session-factory>
15  
16 </hibernate-configuration>

jta.spring.xml

 

1 <?xml version="1.0" encoding="UTF-8"?>
2 <!DOCTYPE beans PUBLIC "-//SPRING//DTD BEAN//EN" "http://www.springframework.org/dtd/spring-beans.dtd">
3 <beans>
4     <!-- Configurer that replaces ${...} placeholders with values from properties files -->
5     <bean id="propertyConfigurer"
6         class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
7         <property name="locations">
8             <list>
9                 <value>classpath:jta.jdbc.properties</value>
10             </list>
11         </property>
12     </bean>
13  
14     <!-- 数据源配置 http://sjsky.iteye.com-->
15     <bean id="SDS" class="com.atomikos.jdbc.SimpleDataSourceBean"
16         init-method="init" destroy-method="close">
17         <property name="uniqueResourceName">
18             <value>SDS</value>
19         </property>
20         <property name="xaDataSourceClassName">
21             <value>${jdbc.SDS.class}</value>
22         </property>
23         <property name="xaDataSourceProperties">
24             <value>${jdbc.SDS.properties}</value>
25         </property>
26         <property name="exclusiveConnectionMode">
27             <value>true</value>
28         </property>
29         <property name="connectionPoolSize">
30             <value>3</value>
31         </property>
32         <property name="validatingQuery">
33             <value>SELECT 1</value>
34         </property>
35     </bean>
36  
37     <bean id="SDS2" class="com.atomikos.jdbc.SimpleDataSourceBean"
38         init-method="init" destroy-method="close">
39         <property name="uniqueResourceName">
40             <value>SDS2</value>
41         </property>
42         <property name="xaDataSourceClassName">
43             <value>${jdbc.SDS2.class}</value>
44         </property>
45         <property name="xaDataSourceProperties">
46             <value>${jdbc.SDS2.properties}</value>
47         </property>
48         <property name="exclusiveConnectionMode">
49             <value>true</value>
50         </property>
51         <property name="connectionPoolSize">
52             <value>3</value>
53         </property>
54         <property name="validatingQuery">
55             <value>SELECT 1</value>
56         </property>
57     </bean>
58  
59     <!-- sessionFactory http://www.micmiu.com-->
60     <bean id="sessionFactory1"
61         class="org.springframework.orm.hibernate3.annotation.AnnotationSessionFactoryBean">
62         <property name="dataSource" ref="SDS" />
63         <property name="configLocation"
64             value="classpath:jta1.hibernate.cfg.xml" />
65     </bean>
66  
67     <bean id="sessionFactory2"
68         class="org.springframework.orm.hibernate3.annotation.AnnotationSessionFactoryBean">
69         <property name="dataSource" ref="SDS2" />
70         <property name="configLocation"
71             value="classpath:jta2.hibernate.cfg.xml" />
72     </bean>
73  
74     <!-- TransactionManager http://www.micmiu.com-->
75     <!-- Construct Atomikos UserTransactionManager, needed to configure Spring -->
76     <bean id="atomikosTransactionManager"
77         class="com.atomikos.icatch.jta.UserTransactionManager"
78         init-method="init" destroy-method="close">
79         <!--  when close is called, should we force transactions to terminate or not? -->
80         <property name="forceShutdown">
81             <value>true</value>
82         </property>
83     </bean>
84  
85     <!-- Also use Atomikos UserTransactionImp, needed to configure Spring  -->
86     <bean id="atomikosUserTransaction"
87         class="com.atomikos.icatch.jta.UserTransactionImp">
88         <property name="transactionTimeout">
89             <value>300</value>
90         </property>
91     </bean>
92  
93     <!-- Configure the Spring framework to use JTA transactions from Atomikos -->
94     <bean id="springJTATransactionManager"
95         class="org.springframework.transaction.jta.JtaTransactionManager">
96  
97         <property name="transactionManager">
98             <ref bean="atomikosTransactionManager" />
99         </property>
100         <property name="userTransaction">
101             <ref bean="atomikosUserTransaction" />
102         </property>
103     </bean>
104  
105     <!-- Configure DAO http://www.micmiu.com-->
106     <bean id="userInfoDao"
107         class="michael.jta.atomikos.dao.impl.UserInfoDaoImpl">
108         <property name="sessionFactory" ref="sessionFactory1" />
109     </bean>
110     <bean id="bankAccountDao"
111         class="michael.jta.atomikos.dao.BankAccountDao">
112         <property name="sessionFactory" ref="sessionFactory2" />
113     </bean>
114  
115     <bean id="bankAccountService"
116         class="michael.jta.atomikos.service.impl.BankAccountServiceImpl">
117         <property name="userInfoDao" ref="userInfoDao" />
118         <property name="bankAccountDao" ref="bankAccountDao" />
119     </bean>
120  
121     <!-- 定义事务规则的拦截器 http://www.micmiu.com-->
122     <bean id="transactionInterceptor"
123         class="org.springframework.transaction.interceptor.TransactionInterceptor">
124         <property name="transactionManager"
125             ref="springJTATransactionManager" />
126         <property name="transactionAttributes">
127             <props>
128                 <prop key="*">PROPAGATION_REQUIRED</prop>
129             </props>
130         </property>
131     </bean>
132  
133     <!-- 声明式事务边界配置 所有的bean公用一个代理bean http://sjsky.iteye.com-->
134     <bean id="baseTransactionProxy"
135         class="org.springframework.transaction.interceptor.TransactionProxyFactoryBean"
136         abstract="true">
137         <property name="transactionManager"
138             ref="springJTATransactionManager" />
139         <property name="transactionAttributes">
140             <props>
141                 <!-- 可以根据实际情况细化配置提高性能 -->
142                 <prop key="*">PROPAGATION_REQUIRED</prop>
143             </props>
144         </property>
145     </bean>
146  
147     <bean id="bankAccountServiceProxy" parent="baseTransactionProxy">
148         <property name="target">
149             <ref bean="bankAccountService" />
150         </property>
151     </bean>
152  
153 </beans>

 

六、测试验证

1. 初始化数据:

因为mysql数据库表的类型有事务和非事务之分,建表时一定要注意确保表的类型是事务控制的:InnoDB

数据库A(192.168.8.253):

1 DROP DATABASE IF EXISTS demota;
2 CREATE DATABASE demota;
3 USE demota;
4 DROP TABLE  IF EXISTS tb_user_info;
5  
6 CREATE TABLE tb_user_info (
7     user_name varchar(20),
8     real_name varchar(10),
9     remark1 varchar(50),
10     remark2 varchar(50)
11     ) ENGINE = InnoDB;
12  
13 INSERT INTO tb_user_info (user_name,real_name,remark1,remark2) VALUES ('husband','husband','','');
14 INSERT INTO tb_user_info (user_name,real_name,remark1,remark2) VALUES ('wife','wife','','');

数据库B(192.168.8.150):

1 DROP DATABASE IF EXISTS demota;
2 CREATE DATABASE demota;
3 USE demota;
4 DROP TABLE  IF EXISTS tb_account;
5 CREATE TABLE tb_account (
6  id int AUTO_INCREMENT,
7  user_name varchar(20),
8  deposit float(10,2),
9  PRIMARY KEY(id)
10  ) ENGINE = InnoDB;
11  
12 INSERT INTO tb_account (user_name,deposit) VALUES ('husband',20000.00);
13 INSERT INTO tb_account (user_name,deposit) VALUES ('wife',10000.00);

2. 测试过程:

ps: 代码中模拟了异常出现的条件:如果账户金额<18000会抛出异常

JtaRunMainTest.java

1 package michael.jta.atomikos;
2  
3 import michael.jta.atomikos.service.BankAccountService;
4  
5 import org.springframework.context.ApplicationContext;
6 import org.springframework.context.support.ClassPathXmlApplicationContext;
7  
8 /**
9  * @author michael
10  *
11  */
12 public class JtaRunMainTest {
13  
14     /**
15      * @param args
16      */
17     public static void main(String[] args) {
18         System.out.println("------------start");
19         ApplicationContext appCt = new ClassPathXmlApplicationContext(
20                 "jta.spring.xml");
21         System.out.println("------------finished init xml");
22  
23         Object bean = appCt.getBean("bankAccountServiceProxy");
24         System.out.println(bean.getClass());
25         BankAccountService service = (BankAccountService) bean;
26         service.doTestTransfer("husband", "wife", 2000);
27     }
28 }

运行第一次结果:

1 ------------start
2 ------------finished init xml
3  class $Proxy11
4  Hibernate: select bankaccoun0_.id as id2_, bankaccoun0_.deposit as deposit2_, bankaccoun0_.user_name as user3_2_ from tb_account bankaccoun0_ where bankaccoun0_.user_name=?
5  Hibernate: select bankaccoun0_.id as id2_, bankaccoun0_.deposit as deposit2_, bankaccoun0_.user_name as user3_2_ from tb_account bankaccoun0_ where bankaccoun0_.user_name=?
6 success done:husband transfer ¥2000.0 to wife
7  Hibernate: update tb_account set deposit=?, user_name=? where id=?
8  Hibernate: update tb_account set deposit=?, user_name=? where id=?

运行第二次结果:

1 ------------start
2 ------------finished init xml
3  class $Proxy11
4  Hibernate: select bankaccoun0_.id as id2_, bankaccoun0_.deposit as deposit2_, bankaccoun0_.user_name as user3_2_ from tb_account bankaccoun0_ where bankaccoun0_.user_name=?
5  Hibernate: select bankaccoun0_.id as id2_, bankaccoun0_.deposit as deposit2_, bankaccoun0_.user_name as user3_2_ from tb_account bankaccoun0_ where bankaccoun0_.user_name=?
6 Exception in thread "main" java.lang.RuntimeException: michael test exception for JTA
7 at michael.jta.atomikos.service.impl.BankAccountServiceImpl.doTestTransfer(BankAccountServiceImpl.java:51)
8  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
9  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
10  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
11  at java.lang.reflect.Method.invoke(Method.java:597)
12  at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:299)
13  at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:172)
14  at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:139)
15  at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:107)
16  at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:161)
17  at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:202)
18  at $Proxy11.doTestTransfer(Unknown Source)
19  at michael.jta.atomikos.JtaRunMainTest.main(JtaRunMainTest.java:26)

测试过程中数据库查询的结果截图:

分布式事务JTA之实践:Spring+ATOMIKOS参考
            
    
    博客分类: springjta spring 

从上面的数据库截图可见,第一正常运行时两个数据库同步更新了,第二次运行发生异常后,两个数据库的数据为发生变化,实现了事务回滚

相关标签: spring