Java事务处理全解析(一至八) 博客分类: jta jta
Java中的事务处理有多简单?在使用EJB时,事务在我们几乎察觉不到的情况下发挥着 作用;而在使用Spring时,也只需要配置一个TransactionManager,然后在需要事务的方法上加上Transactional注解就行 了。Java的事务处理之所以这么简单是因为框架在背后为我们做了太多的工作。这样,我们虽然可以快速地完成开发工作,但是一旦程序出现问题,在一阵 google和*之后,你估计还是一筹莫展。作为一个有技术追求的程序员,你应该了解Java事务的底层工作原理。
这是一个关于Java事务处理的系列文章,请通过以下方式下载github源代码:
git clone https://github.com/davenkin/java_transaction_workshop.git
本系列文章将在不依赖Spring和Java EE容器的条件下讲解Java中事务处理的基本方法与原理,包含以下文章:
(二)失败的案例
(三)丑陋的案例
(四)成功的案例(自己实现一个线程安全的TransactionManager)
(五)Template模式
(七)像Spring一样使用Transactional注解(Annotation)
(八)分布式事务入门例子(Spring+JTA+Atomikos+Hibernate+JMS)
(一)Java事务处理的基本问题
Java通过JDBC与数据库进行交互,这是一个如今多数程序员都不会直接使用的技术,我们更倾向于使用Hibernate和Mybatis,但是,他们在底层都需要JDBC与数据库通信,事务处理亦是如此,那么,我们首先来看看JDBC提供的事务处理API。
(1)JDBC提供的事务处理API
JDBC提供的事务处理API非常少,请不要被Spring中事务处理的那一堆源代码所打击得信心尽失,这些框架提供的事务处理功能归根结底主要通过以Connection类的方法完成:
Connection.setAutoCommit(boolean);
Connection.commit();
Connection.rollback();
在Spring的事务处理源代码中,有很多都是处理多线程的,另外一些使用了一些设计模式。不要惊慌,在本系列中(除了系列八),你将看不到任何 Spring的影子,我们会通过简单的代码来学习Java事务,学完之后,你可以阅读一下Spring的事务处理源代码,然后将本系列中的事务处理原理与 Spring对比,你会发现,Spring要面临与处理的问题也是本系列文章中遇到的问题。
(2)本地事务和分布式事务
本地(Local Transaction)事务指只有一个数据源参与的事务,比如只有数据库或者只有JMS;分布式事务(Distributed Transaction)指有多个数据源同时参与的事务,比如一项操作需要同时访问数据库和通过JMS发送消息,或者一项操作需要同时访问两个不同数据 库。对于分布式事务,Java提供了JTA规范,它的原理与本地事务存在不同。 鉴于多数情况下Java事务为本地事务,本系列主要讲解本地事务,而在系列八中有分布式事务的入门例子。
(3)线程安全
线程安全是Java事务处理的一大难点,比如一个DAO类维护了一个Connection实例变量,两个线程同时使用该DAO类与数据库交互,其中 一个在使用完Connection后将其关闭,而此时另一个线程正在使用该Connection访问数据库,这时另一个线程对数据库的访问将失败。在本系 列的后续文章中,我们将学到如何处理这样的问题并开发线程安全的程序。
(4)Service层和DAO层
通常来说,数据持久化层又分为Service层和DAO层,Service层用于完成与业务逻辑有关的工作,并且Service层包含了工作单元 (Unit of work),也即Service层中的方法为事务作用的边界;DAO层用于完成对数据库的实际操作(增删改查)。有时在使用Hibernate或是JPA 时我们也会直接在Service层访问数据库而省略掉DAO层。在本系列中,我们会用一个BankService例子贯穿始终。该BankService 用于将用户银行账户(Bank Account)中的存款转帐到该用户的保险账户(Insurance Account)中,两个账户对应有不同的数据库表。
BankService需要两个DAO类协同起来工作,一个负责银行账户表的操作,另一个负责保险账户表操作,这是一个典型的事务处理例子。在下一篇文章中,我们将学习一个关于该BankService事务处理失败的案例。
在本系列的上一篇文章中,我们讲到了Java事务处理的基本问题,并且讲到了Service层和DAO层,在本篇文章中,我们将以BankService为例学习一个事务处理失败的案例。
请通过以下方式下载github源代码:
git clone https://github.com/davenkin/java_transaction_workshop.git
BankService的功能为:某个用户有两个账户,分别为银行账户和保险账户,并且有各自的账户号,BankService的transfer方法从该用户的银行账户向保险账户转帐,两个DAO分别用于对两个账户表的存取操作。
定义一个BankService接口如下:
package davenkin; public interface BankService { public void transfer(int fromId, int toId, int amount); }
在两个DAO对象中,我们通过传入的同一个DataSource获得Connection,然后通过JDBC提供的API直接对数据库进行操作。
定义操作银行账户表的DAO类如下:
package davenkin.step1_failure; import javax.sql.DataSource; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; public class FailureBankDao { private DataSource dataSource; public FailureBankDao(DataSource dataSource) { this.dataSource = dataSource; } public void withdraw(int bankId, int amount) throws SQLException { Connection connection = dataSource.getConnection(); PreparedStatement selectStatement = connection.prepareStatement("SELECT BANK_AMOUNT FROM BANK_ACCOUNT WHERE BANK_ID = ?"); selectStatement.setInt(1, bankId); ResultSet resultSet = selectStatement.executeQuery(); resultSet.next(); int previousAmount = resultSet.getInt(1); resultSet.close(); selectStatement.close(); int newAmount = previousAmount - amount; PreparedStatement updateStatement = connection.prepareStatement("UPDATE BANK_ACCOUNT SET BANK_AMOUNT = ? WHERE BANK_ID = ?"); updateStatement.setInt(1, newAmount); updateStatement.setInt(2, bankId); updateStatement.execute(); updateStatement.close(); connection.close(); } }
FailureBankDao的withdraw方法,从银行账户表(BANK_ACCOUNT)中帐号为bankId的用户账户中取出数量为amount的金额。
采用同样的方法,定义保险账户的DAO类如下:
package davenkin.step1_failure; import javax.sql.DataSource; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; public class FailureInsuranceDao { private DataSource dataSource; public FailureInsuranceDao(DataSource dataSource){ this.dataSource = dataSource; } public void deposit(int insuranceId, int amount) throws SQLException { Connection connection = dataSource.getConnection(); PreparedStatement selectStatement = connection.prepareStatement("SELECT INSURANCE_AMOUNT FROM INSURANCE_ACCOUNT WHERE INSURANCE_ID = ?"); selectStatement.setInt(1, insuranceId); ResultSet resultSet = selectStatement.executeQuery(); resultSet.next(); int previousAmount = resultSet.getInt(1); resultSet.close(); selectStatement.close(); int newAmount = previousAmount + amount; PreparedStatement updateStatement = connection.prepareStatement("UPDATE INSURANCE_ACCOUNT SET INSURANCE_AMOUNT = ? WHERE INSURANCE_ID = ?"); updateStatement.setInt(1, newAmount); updateStatement.setInt(2, insuranceId); updateStatement.execute(); updateStatement.close(); connection.close(); } }
FailureInsuranceDao类的deposit方法向保险账户表(INSURANCE_ACCOUNT)存入amount数量的金额, 这样在BankService中,我们可以先调用FailureBankDao的withdraw方法取出一定金额的存款,再调用 FailureInsuranceDao的deposit方法将该笔存款存入保险账户表中,一切看似OK,实现BankService接口如下:
package davenkin.step1_failure; import davenkin.BankService; import javax.sql.DataSource; import java.sql.Connection; import java.sql.SQLException; public class FailureBankService implements BankService{ private FailureBankDao failureBankDao; private FailureInsuranceDao failureInsuranceDao; private DataSource dataSource; public FailureBankService(DataSource dataSource) { this.dataSource = dataSource; } public void transfer(int fromId, int toId, int amount) { Connection connection = null; try { connection = dataSource.getConnection(); connection.setAutoCommit(false); failureBankDao.withdraw(fromId, amount); failureInsuranceDao.deposit(toId, amount); connection.commit(); } catch (Exception e) { try { assert connection != null; connection.rollback(); } catch (SQLException e1) { e1.printStackTrace(); } } finally { try { assert connection != null; connection.close(); } catch (SQLException e) { e.printStackTrace(); } } } public void setFailureBankDao(FailureBankDao failureBankDao) { this.failureBankDao = failureBankDao; } public void setFailureInsuranceDao(FailureInsuranceDao failureInsuranceDao) { this.failureInsuranceDao = failureInsuranceDao; } }
在FailureBankService的transfer方法中,我们首先通过DataSource获得Connection,然后调用 connection.setAutoCommit(false)已开启手动提交模式,如果一切顺利,则commit,如果出现异常,则 rollback。 接下来,开始测试我们的BankService吧。
为了准备测试数据,我们定义个BankFixture类,该类负责在每次测试之前准备测试数据,分别向银行账户(1111)和保险账户(2222) 中均存入1000元。BankFixture还提供了两个helper方法(getBankAmount和getInsuranceAmount)帮助我 们从数据库中取出数据以做数据验证。我们使用HSQL数据库的in-memory模式,这样不用启动数据库server,方便测试。 BankFixture类定义如下:
package davenkin; import org.junit.Before; import javax.sql.DataSource; import java.sql.*; public class BankFixture { protected final DataSource dataSource = DataSourceFactory.createDataSource(); @Before public void setUp() throws SQLException { Connection connection = dataSource.getConnection(); Statement statement = connection.createStatement(); statement.execute("DROP TABLE BANK_ACCOUNT IF EXISTS"); statement.execute("DROP TABLE INSURANCE_ACCOUNT IF EXISTS"); statement.execute("CREATE TABLE BANK_ACCOUNT (\n" + "BANK_ID INT,\n" + "BANK_AMOUNT INT,\n" + "PRIMARY KEY(BANK_ID)\n" + ");"); statement.execute("CREATE TABLE INSURANCE_ACCOUNT (\n" + "INSURANCE_ID INT,\n" + "INSURANCE_AMOUNT INT,\n" + "PRIMARY KEY(INSURANCE_ID)\n" + ");"); statement.execute("INSERT INTO BANK_ACCOUNT VALUES (1111, 1000);"); statement.execute("INSERT INTO INSURANCE_ACCOUNT VALUES (2222, 1000);"); statement.close(); connection.close(); } protected int getBankAmount(int bankId) throws SQLException { Connection connection = dataSource.getConnection(); PreparedStatement selectStatement = connection.prepareStatement("SELECT BANK_AMOUNT FROM BANK_ACCOUNT WHERE BANK_ID = ?"); selectStatement.setInt(1, bankId); ResultSet resultSet = selectStatement.executeQuery(); resultSet.next(); int amount = resultSet.getInt(1); resultSet.close(); selectStatement.close(); connection.close(); return amount; } protected int getInsuranceAmount(int insuranceId) throws SQLException { Connection connection = dataSource.getConnection(); PreparedStatement selectStatement = connection.prepareStatement("SELECT INSURANCE_AMOUNT FROM INSURANCE_ACCOUNT WHERE INSURANCE_ID = ?"); selectStatement.setInt(1, insuranceId); ResultSet resultSet = selectStatement.executeQuery(); resultSet.next(); int amount = resultSet.getInt(1); resultSet.close(); selectStatement.close(); connection.close(); return amount; } }
编写的Junit测试继承自BankFixture类,测试代码如下:
package davenkin.step1_failure; import davenkin.BankFixture; import org.junit.Test; import java.sql.SQLException; import static junit.framework.Assert.assertEquals; public class FailureBankServiceTest extends BankFixture { @Test public void transferSuccess() throws SQLException { FailureBankDao failureBankDao = new FailureBankDao(dataSource); FailureInsuranceDao failureInsuranceDao = new FailureInsuranceDao(dataSource); FailureBankService bankService = new FailureBankService(dataSource); bankService.setFailureBankDao(failureBankDao); bankService.setFailureInsuranceDao(failureInsuranceDao); bankService.transfer(1111, 2222, 200); assertEquals(800, getBankAmount(1111)); assertEquals(1200, getInsuranceAmount(2222)); } @Test public void transferFailure() throws SQLException { FailureBankDao failureBankDao = new FailureBankDao(dataSource); FailureInsuranceDao failureInsuranceDao = new FailureInsuranceDao(dataSource); FailureBankService bankService = new FailureBankService(dataSource); bankService.setFailureBankDao(failureBankDao); bankService.setFailureInsuranceDao(failureInsuranceDao); int toNonExistId = 3333; bankService.transfer(1111, toNonExistId, 200); assertEquals(1000, getInsuranceAmount(2222)); assertEquals(1000, getBankAmount(1111)); } }
运行测试,第一个测试(transferSuccess)成功,第二个测试(transferFailure)失败。
分析错误,原因在于:我们分别从FailureBankService,FailureBankDao和FailureInsuranceDao中 调用了三次dataSource.getConnection(),亦即我们创建了三个不同的Connection对象,而Java事务是作用于 Connection之上的,所以从在三个地方我们开启了三个不同的事务,而不是同一个事务。
第一个测试之所以成功,是因为在此过程中没有任何异常发生。虽然在FailureBankService中将Connection的提交模式改为了 手动提交,但是由于两个DAO使用的是各自的Connection对象,所以两个DAO中的Connection依然为默认的自动提交模式。
在第二个测试中,我们给出一个不存在的保险账户id(toNonExistId),就是为了使程序产生异常,然后在assertion语句中验证两 张表均没有任何变化,但是测试在第二个assertion语句处出错。发生异常时,银行账户中的金额已经减少,而虽然程序发生了rollback,但是调 用的是FailureBankService中Connection的rollback,而不是FailureInsuranceDao中 Connection的,对保险账户的操作根本就没有执行,所以保险账户中依然为1000,而银行账户却变为了800。
因此,为了使两个DAO在同一个事务中,我们应该在整个事务处理过程中使用一个Connection对象,在下一篇文章中,我们将讲到通过共享Connection对象的方式达到事务处理的目的。
在本系列的上一篇文 章中,我们看到了一个典型的事务处理失败的案例,其主要原因在于,service层和各个DAO所使用的Connection是不一样的,而JDBC中事 务处理的作用对象正是Connection对象,所以不同DAO中的操作不在同一个事务里面,从而导致事务失败。从中我们得出了教训:要避免这种失败,我 们可以使所有操作共享一个Connection对象,这样应该就没有问题了。
请通过以下方式下载本系列文章的github源代码:
git clone https://github.com/davenkin/java_transaction_workshop.git
在本篇文章中,我们将看到一个成功的,但是丑陋的事务处理方案,它的基本思路是:在service层创建Connection对象,再将该Connection传给各个DAO类,这样就完成了Connection共享的目的。
修改两个DAO类,使他们都接受一个Connection对象,定义UglyBankDao类如下:
package davenkin.step2_ugly; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; public class UglyBankDao { public void withdraw(int bankId, int amount, Connection connection) throws SQLException { PreparedStatement selectStatement = connection.prepareStatement("SELECT BANK_AMOUNT FROM BANK_ACCOUNT WHERE BANK_ID = ?"); selectStatement.setInt(1, bankId); ResultSet resultSet = selectStatement.executeQuery(); resultSet.next(); int previousAmount = resultSet.getInt(1); resultSet.close(); selectStatement.close(); int newAmount = previousAmount - amount; PreparedStatement updateStatement = connection.prepareStatement("UPDATE BANK_ACCOUNT SET BANK_AMOUNT = ? WHERE BANK_ID = ?"); updateStatement.setInt(1, newAmount); updateStatement.setInt(2, bankId); updateStatement.execute(); updateStatement.close(); } }
使用同样的方法,定义UglyInsuranceDao类:
package davenkin.step2_ugly; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; public class UglyInsuranceDao { public void deposit(int insuranceId, int amount, Connection connection) throws SQLException { PreparedStatement selectStatement = connection.prepareStatement("SELECT INSURANCE_AMOUNT FROM INSURANCE_ACCOUNT WHERE INSURANCE_ID = ?"); selectStatement.setInt(1, insuranceId); ResultSet resultSet = selectStatement.executeQuery(); resultSet.next(); int previousAmount = resultSet.getInt(1); resultSet.close(); selectStatement.close(); int newAmount = previousAmount + amount; PreparedStatement updateStatement = connection.prepareStatement("UPDATE INSURANCE_ACCOUNT SET INSURANCE_AMOUNT = ? WHERE INSURANCE_ID = ?"); updateStatement.setInt(1, newAmount); updateStatement.setInt(2, insuranceId); updateStatement.execute(); updateStatement.close(); } }
然后修改Service类,在UglyBankService类的transfer方法中,首先创建一个Connection对象,然后在将该对象 依次传给UglyBankDao的withdraw方法和UglyInsuranceDao类的deposit方法,这样service层和DAO层使用 相同的Connection对象。定义UglyBankService类如下:
package davenkin.step2_ugly; import davenkin.BankService; import javax.sql.DataSource; import java.sql.Connection; import java.sql.SQLException; public class UglyBankService implements BankService { private DataSource dataSource; private UglyBankDao uglyBankDao; private UglyInsuranceDao uglyInsuranceDao; public UglyBankService(DataSource dataSource) { this.dataSource = dataSource; } public void transfer(int fromId, int toId, int amount) { Connection connection = null; try { connection = dataSource.getConnection(); connection.setAutoCommit(false); uglyBankDao.withdraw(fromId, amount, connection); uglyInsuranceDao.deposit(toId, amount, connection); connection.commit(); } catch (Exception e) { try { assert connection != null; connection.rollback(); } catch (SQLException e1) { e1.printStackTrace(); } } finally { try { assert connection != null; connection.close(); } catch (SQLException e) { e.printStackTrace(); } } } public void setUglyBankDao(UglyBankDao uglyBankDao) { this.uglyBankDao = uglyBankDao; } public void setUglyInsuranceDao(UglyInsuranceDao uglyInsuranceDao) { this.uglyInsuranceDao = uglyInsuranceDao; } }
通过上面共享Connection对象的方法虽然可以完成事务处理的目的,但是这样做法是丑陋的,原因在于:为了完成事务处理的目的,我们需要将一 个底层的Connection类在service层和DAO层之间进行传递,而DAO层的方法也要接受这个Connection对象,这种做法显然是不好 的,这就是典型的API污染。
在下一篇博文中,我们将讲到如何在不传递Connection对象的情况下完成和本文相同的事务处理功能。
在本系列的上一篇文章中我们讲到,要实现在同一个事务中使用相同的Connection对象,我们可以通过传递Connection对象的方式达到共享的目的,但是这种做法是丑陋的。在本篇文章中,我们将引入另外一种机制(ConnectionHolder)来完成事务管理。
这是一个关于Java事务处理的系列文章,请通过以下方式下载github源代码:
git clone https://github.com/davenkin/java_transaction_workshop.git
ConnectionHolder的工作机制是:我们将Connection对象放在一个全局公用的地方,然后在不同的操作中都从这个地方取得 Connection,从而完成Connection共享的目的,这也是一种ServiceLocator模式,有点像JNDI。定义一个 ConnectionHolder类如下:
package davenkin.step3_connection_holder; import javax.sql.DataSource; import java.sql.Connection; import java.sql.SQLException; import java.util.HashMap; import java.util.Map; public class ConnectionHolder { private Map<DataSource, Connection> connectionMap = new HashMap<DataSource, Connection>(); public Connection getConnection(DataSource dataSource) throws SQLException { Connection connection = connectionMap.get(dataSource); if (connection == null || connection.isClosed()) { connection = dataSource.getConnection(); connectionMap.put(dataSource, connection); } return connection; } public void removeConnection(DataSource dataSource) { connectionMap.remove(dataSource); } }
从ConnectionHolder类中可以看出,我们维护了一个键为DataSource、值为Connection的Map,这主要用于使 ConnectionHolder可以服务多个DataSource。在调用getConnection方法时传入了一个DataSource对象,如果 Map里面已经存在该DataSource对应的Connection,则直接返回该Connection,否则,调用DataSource的 getConnection方法获得一个新的Connection,再将其加入到Map中,最后返回该Connection。这样在同一个事务过程中,我 们先后从ConnectionHolder中取得的Connection是相同的,除非在中途我们调用了ConnectionHolder的 removeConnection方法将当前Connection移除掉或者调用了Connection.close()将Connection关闭,然 后在后续的操作中再次调用ConnectionHolder的getConnection方法,此时返回的则是一个新的Connection对象,从而导 致事务处理失败,你应该不会做出这种中途移除或关闭Connection的事情。
然而,虽然我们不会自己手动地在中途移除或者关闭Conncetion对象(当然,在事务处理末尾我们应该关闭Conncetion),我们却无法 阻止其他线程这么做。比如,ConnectionHolder类是可以在多个线程中同时使用的,并且这些线程使用了同一个DataSource,其中一个 线程使用完Connection后便将其关闭,而此时另外一个线程正试图使用这个Connection,问题就出来了。因此,上面的 ConnectionHolder不是线程安全的。
为了获得线程安全的ConnectionHolder类,我们可以引入Java提供的ThreadLocal类,该类保证一个类的实例变量在各个线 程中都有一份单独的拷贝,从而不会影响其他线程中的实例变量。定义一个SingleThreadConnectionHolder类如下:
package davenkin.step3_connection_holder; import javax.sql.DataSource; import java.sql.Connection; import java.sql.SQLException; public class SingleThreadConnectionHolder { private static ThreadLocal<ConnectionHolder> localConnectionHolder = new ThreadLocal<ConnectionHolder>(); public static Connection getConnection(DataSource dataSource) throws SQLException { return getConnectionHolder().getConnection(dataSource); } public static void removeConnection(DataSource dataSource) { getConnectionHolder().removeConnection(dataSource); } private static ConnectionHolder getConnectionHolder() { ConnectionHolder connectionHolder = localConnectionHolder.get(); if (connectionHolder == null) { connectionHolder = new ConnectionHolder(); localConnectionHolder.set(connectionHolder); } return connectionHolder; } }
有了一个线程安全的SingleThreadConnectionHolder类,我们便可以在service层和各个DAO中使用该类来获取Connection对象:
Connection connection = SingleThreadConnectionHolder.getConnection(dataSource);
当然,此时我们需要传入一个DataSource,这个DataSource可以作为DAO类的实例变量存在,所以我们不用像上一篇文 章那样将Connection对象直接传给DAO的方法。这里你可能要问,既然可以将DataSource作为实例变量,那么在上一篇文章中,为什么不可 以将Connection也作为实例变量呢,这样不就不会造成丑陋的API了吗?原因在于:将Connection对象作为实例变量同样会带来线程安全问 题,当多个线程同时使用同一个DAO类时,一个线程关闭了Connection而另一个正在使用,这样的问题和上面讲到的 ConnectionHolder的线程安全问题一样。
关于Bank DAO和Insurance DAO类的源代码这里就不列出了,他们和上篇文章只是获得Connection对象的方法不一样而已,你可以参考github源代码。
接下来,我们再来看看TransactionManager类,在上几篇文章中,我们都是在service类中直接写和事务处理相关的代码,而更好的方式是声明一个TransactionManger类将事务处理相关工作集中管理:
package davenkin.step3_connection_holder; import javax.sql.DataSource; import java.sql.Connection; import java.sql.SQLException; public class TransactionManager { private DataSource dataSource; public TransactionManager(DataSource dataSource) { this.dataSource = dataSource; } public final void start() throws SQLException { Connection connection = getConnection(); connection.setAutoCommit(false); } public final void commit() throws SQLException { Connection connection = getConnection(); connection.commit(); } public final void rollback() { Connection connection = null; try { connection = getConnection(); connection.rollback(); } catch (SQLException e) { throw new RuntimeException("Couldn't rollback on connection[" + connection + "].", e); } } public final void close() { Connection connection = null; try { connection = getConnection(); connection.setAutoCommit(true); connection.setReadOnly(false); connection.close(); SingleThreadConnectionHolder.removeConnection(dataSource); } catch (SQLException e) { throw new RuntimeException("Couldn't close connection[" + connection + "].", e); } } private Connection getConnection() throws SQLException { return SingleThreadConnectionHolder.getConnection(dataSource); } }
可以看出,TransactionManager对象也维护了一个DataSource实例变量,并且也是通过 SingleThreadConnectionHolder来获取Connection对象的。然后我们在service类中使用该 TransactionManager:
package davenkin.step3_connection_holder; import davenkin.BankService; import javax.sql.DataSource; public class ConnectionHolderBankService implements BankService { private TransactionManager transactionManager; private ConnectionHolderBankDao connectionHolderBankDao; private ConnectionHolderInsuranceDao connectionHolderInsuranceDao; public ConnectionHolderBankService(DataSource dataSource) { transactionManager = new TransactionManager(dataSource); connectionHolderBankDao = new ConnectionHolderBankDao(dataSource); connectionHolderInsuranceDao = new ConnectionHolderInsuranceDao(dataSource); } public void transfer(int fromId, int toId, int amount) { try { transactionManager.start(); connectionHolderBankDao.withdraw(fromId, amount); connectionHolderInsuranceDao.deposit(toId, amount); transactionManager.commit(); } catch (Exception e) { transactionManager.rollback(); } finally { transactionManager.close(); } } }
在ConnectionHolderBankService中,我们使用TransactionManager来管理事务,由于 TransactionManger和两个DAO类都是使用SingleThreadConnectionHolder来获取Connection,故他 们在整个事务处理过程中使用了相同的Connection对象,事务处理成功。我们也可以看到,在两个DAO的withdraw和deposit方法没有 接受和业务无关的对象,消除了API污染;另外,使用TransactionManager来管理事务,使Service层代码也变简洁了。
在下一篇文章中,我们将讲到使用Template模式来完成事务处理。
在本系列的上一篇文章中,我们讲到了使用TransactionManger和ConnectionHolder完成线程安全的事务管理,在本篇中,我们将在此基础上引入Template模式进行事务管理。
这是一个关于Java事务处理的系列文章,请通过以下方式下载github源代码:
git clone https://github.com/davenkin/java_transaction_workshop.git
Template模式大家应该都很熟悉,比如Spring就提供了许多Template,像JdbcTemplate和JmsTemplate等。 Template模式的基本思想是:在超类里将完成核心功能的方法声明为抽象方法,留给子类去实现,而在超类中完成一些通用操作,比如JMS的 Session的建立和数据库事务的准备工作等。
在本篇文章中,我们使用一个Template类来帮助管理事务,定义TransactionTemplate类如下:
package davenkin.step4_transaction_template; import davenkin.step3_connection_holder.TransactionManager; import javax.sql.DataSource; public abstract class TransactionTemplate { private TransactionManager transactionManager; protected TransactionTemplate(DataSource dataSource) { transactionManager = new TransactionManager(dataSource); } public void doJobInTransaction() { try { transactionManager.start(); doJob(); transactionManager.commit(); } catch (Exception e) { transactionManager.rollback(); } finally { transactionManager.close(); } } protected abstract void doJob() throws Exception; }
在TransactionTemplate类中定义一个doJobInTransaction方法,在该方法中首先使用 TransactionManager开始事务,然后调用doJob方法完成业务功能,doJob方法为抽象方法,完成业务功能的子类应该实现该方法,最 后,根据doJob方法执行是否成功决定commit事务或是rollback事务。
然后定义使用TransactionTemplate的TransactionTemplateBankService:
package davenkin.step4_transaction_template; import davenkin.BankService; import davenkin.step3_connection_holder.ConnectionHolderBankDao; import davenkin.step3_connection_holder.ConnectionHolderInsuranceDao; import javax.sql.DataSource; public class TransactionTemplateBankService implements BankService { private DataSource dataSource; private ConnectionHolderBankDao connectionHolderBankDao; private ConnectionHolderInsuranceDao connectionHolderInsuranceDao; public TransactionTemplateBankService(DataSource dataSource) { this.dataSource = dataSource; connectionHolderBankDao = new ConnectionHolderBankDao(dataSource); connectionHolderInsuranceDao = new ConnectionHolderInsuranceDao(dataSource); } public void transfer(final int fromId, final int toId, final int amount) { new TransactionTemplate(dataSource) { @Override protected void doJob() throws Exception { connectionHolderBankDao.withdraw(fromId, amount); connectionHolderInsuranceDao.deposit(toId, amount); } }.doJobInTransaction(); } }
在TransactionTemplateBankService的transfer方法中,我们创建了一个匿名的 TtransactionTemplate类,并且实现了doJob方法,在doJob方法中调用两个DAO完成业务操作,然后调用调用 TransactionTemplateBankService的doJobInTransaction方法。
由于TransactionTemplate只是对上一篇文章中事务处理的一层封装,故TransactionManager和两个DAO类都保持和上一篇中的一样,此时他们都使用SingleThreadConnectionHolder获得Connection,故事务处理成功。
在下一篇文章中,我们会讲到使用Java的动态代理来完成事务处理,这也是Spring管理事务的典型方法。
在本系列的上一篇文 章中,我们讲到了使用Template模式进行事务管理,这固然是一种很好的方法,但是不那么完美的地方在于我们依然需要在service层中编写和事务 处理相关的代码,即我们需要在service层中声明一个TransactionTemplate。在本篇文章中,我们将使用Java提供的动态代理 (Dynamic Proxy)功能来完成事务处理,你将看到无论是在service层还是DAO层都不会有事务处理代码,即他们根本就意识不到事务处理的存在。使用动态代 理完成事务处理也是AOP的一种典型应用。
这是一个关于Java事务处理的系列文章,请通过以下方式下载github源代码:
git clone https://github.com/davenkin/java_transaction_workshop.git
Java动态代理的基本原理为:被代理对象需要实现某个接口(这是前提),代理对象会拦截对被代理对象的方法调用,在其中可以全然抛弃被代理对象的 方法实现而完成另外的功能,也可以在被代理对象方法调用的前后增加一些额外的功能。在本篇文章中,我们将拦截service层的transfer方法,在 其调用之前加入事务准备工作,然后调用原来的transfer方法,之后根据transfer方法是否执行成功决定commit还是rollback。
首先定义一个TransactionEnabledProxyManager类:
package davenkin.step5_transaction_proxy; import davenkin.step3_connection_holder.TransactionManager; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; public class TransactionEnabledProxyManager { private TransactionManager transactionManager; public TransactionEnabledProxyManager(TransactionManager transactionManager) { this.transactionManager = transactionManager; } public Object proxyFor(Object object) { return Proxy.newProxyInstance(object.getClass().getClassLoader(), object.getClass().getInterfaces(), new TransactionInvocationHandler(object, transactionManager)); } } class TransactionInvocationHandler implements InvocationHandler { private Object proxy; private TransactionManager transactionManager; TransactionInvocationHandler(Object object, TransactionManager transactionManager) { this.proxy = object; this.transactionManager = transactionManager; } public Object invoke(Object o, Method method, Object[] objects) throws Throwable { transactionManager.start(); Object result = null; try { result = method.invoke(proxy, objects); transactionManager.commit(); } catch (Exception e) { transactionManager.rollback(); } finally { transactionManager.close(); } return result; } }
通过调用该类的proxyFor方法,传入需要被代理的对象(本例中为service对象),返回一个代理对象。此后,在调用代理对象的 transfer方法时,会自动调用TransactionIvocationHandler的invoke方法,在该方法中,我们首先开始事务,然后执 行:
result = method.invoke(proxy, objects);
上面一行代码执行的是原service层的transfer方法,如果方法执行成功则commit,否则rollback事务。
由于与事务处理相关的代码都被转移到了代理对象中,在service层中我们只需调用DAO即可:
package davenkin.step5_transaction_proxy; import davenkin.BankService; import davenkin.step3_connection_holder.ConnectionHolderBankDao; import davenkin.step3_connection_holder.ConnectionHolderInsuranceDao; import javax.sql.DataSource; public class BareBankService implements BankService { private ConnectionHolderBankDao connectionHolderBankDao; private ConnectionHolderInsuranceDao connectionHolderInsuranceDao; public BareBankService(DataSource dataSource) { connectionHolderBankDao = new ConnectionHolderBankDao(dataSource); connectionHolderInsuranceDao = new ConnectionHolderInsuranceDao(dataSource); } public void transfer(final int fromId, final int toId, final int amount) { try { connectionHolderBankDao.withdraw(fromId, amount); connectionHolderInsuranceDao.deposit(toId, amount); } catch (Exception e) { throw new RuntimeException(); } } }
如何,上面的BareBankService中没有任何事务处理的影子,我们只需关注核心业务逻辑即可。
然后在客户代码中,我们需要先创建代理对象(这在Spring中通常是通过配置实现的):
@Test public void transferFailure() throws SQLException { TransactionEnabledProxyManager transactionEnabledProxyManager = new TransactionEnabledProxyManager(new TransactionManager(dataSource)); BankService bankService = new BareBankService(dataSource); BankService proxyBankService = (BankService) transactionEnabledProxyManager.proxyFor(bankService); int toNonExistId = 3333; proxyBankService.transfer(1111, toNonExistId, 200); assertEquals(1000, getBankAmount(1111)); assertEquals(1000, getInsuranceAmount(2222)); }
在上面的测试代码中,我们首先创建一个BareBankService对象,然后调用 transactionEnabledProxyManager的proxyFor方法生成对原BareBankService对象的代理对象,最后在代 理对象上调用transfer方法,测试运行成功。
可以看到,通过以上动态代理实现,BareBankService中的所有public方法都被代理了,即他们都被加入到事务中。这对于 service层中的所有方法都需要和数据库打交道的情况是可以的,本例即如此(有且只有一个transfer方法),然而对于service层中不需要 和数据库打交道的public方法,这样做虽然也不会出错,但是却显得多余。在下一篇文章中,我们将讲到使用Java注解(annotation)的方式来声明一个方法是否需要事务,就像Spring中的Transactional注解一样。
在本系列的上一篇文 章中,我们讲到了使用动态代理的方式完成事务处理,这种方式将service层的所有public方法都加入到事务中,这显然不是我们需要的,需要代理的 只是那些需要操作数据库的方法。在本篇中,我们将讲到如何使用Java注解(Annotation)来标记需要事务处理的方法。
这是一个关于Java事务处理的系列文章,请通过以下方式下载github源代码:
git clone https://github.com/davenkin/java_transaction_workshop.git
首先定义Transactional注解:
package davenkin.step6_annotation; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) public @interface Transactional { }
使用注解标记事务的基本原理为:依然使用上一篇中 讲到的动态代理的方式,只是在InvocationHandler的invoke方法中,首先判断被代理的方法是否标记有Transactional注 解,如果没有则直接调用method.invoke(proxied, objects),否则,先准备事务,在调用method.invoke(proxied, objects),然后根据该方法是否执行成功调用commit或rollback。定义 TransactionEnabledAnnotationProxyManager如下:
package davenkin.step6_annotation; import davenkin.step3_connection_holder.TransactionManager; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; public class TransactionEnabledAnnotationProxyManager { private TransactionManager transactionManager; public TransactionEnabledAnnotationProxyManager(TransactionManager transactionManager) { this.transactionManager = transactionManager; } public Object proxyFor(Object object) { return Proxy.newProxyInstance(object.getClass().getClassLoader(), object.getClass().getInterfaces(), new AnnotationTransactionInvocationHandler(object, transactionManager)); } } class AnnotationTransactionInvocationHandler implements InvocationHandler { private Object proxied; private TransactionManager transactionManager; AnnotationTransactionInvocationHandler(Object object, TransactionManager transactionManager) { this.proxied = object; this.transactionManager = transactionManager; } public Object invoke(Object proxy, Method method, Object[] objects) throws Throwable { Method originalMethod = proxied.getClass().getMethod(method.getName(), method.getParameterTypes()); if (!originalMethod.isAnnotationPresent(Transactional.class)) { return method.invoke(proxied, objects); } transactionManager.start(); Object result = null; try { result = method.invoke(proxied, objects); transactionManager.commit(); } catch (Exception e) { transactionManager.rollback(); } finally { transactionManager.close(); } return result; } }
可以看到,在AnnotationTransactionInvocationHandler的invoke方法中,我们首先获得原service 的transfer方法,然后根据originalMethod.isAnnotationPresent(Transactional.class)判 断该方法是否标记有Transactional注解,如果没有,则任何额外功能都不加,直接调用原来service的transfer方法;否则,将其加 入到事务处理中。
在service层中,我们只需将需要加入事务处理的方法用Transactional注解标记就行了:
package davenkin.step6_annotation; import davenkin.BankService; import davenkin.step3_connection_holder.ConnectionHolderBankDao; import davenkin.step3_connection_holder.ConnectionHolderInsuranceDao; import javax.sql.DataSource; public class AnnotationBankService implements BankService { private ConnectionHolderBankDao connectionHolderBankDao; private ConnectionHolderInsuranceDao connectionHolderInsuranceDao; public AnnotationBankService(DataSource dataSource) { connectionHolderBankDao = new ConnectionHolderBankDao(dataSource); connectionHolderInsuranceDao = new ConnectionHolderInsuranceDao(dataSource); } @Transactional public void transfer(final int fromId, final int toId, final int amount) { try { connectionHolderBankDao.withdraw(fromId, amount); connectionHolderInsuranceDao.deposit(toId, amount); } catch (Exception e) { throw new RuntimeException(); } } }
然后执行测试:
@Test public void transferFailure() throws SQLException { TransactionEnabledAnnotationProxyManager transactionEnabledAnnotationProxyManager = new TransactionEnabledAnnotationProxyManager(new TransactionManager(dataSource)); BankService bankService = new AnnotationBankService(dataSource); BankService proxyBankService = (BankService) transactionEnabledAnnotationProxyManager.proxyFor(bankService); int toNonExistId = 3333; proxyBankService.transfer(1111, toNonExistId, 200); assertEquals(1000, getBankAmount(1111)); assertEquals(1000, getInsuranceAmount(2222)); }
测试运行成功,如果将AnnotationBankService中transfer方法的Transactional注解删除,那么以上测试将抛 出RuntimeException异常,该异常为transfer方法中我们人为抛出的,也即由于此时没有事务来捕捉异常,程序便直接抛出该异常而终止 运行。在下一篇(本系列最后一篇)文章中,我们将讲到分布式事务的一个入门例子。
在本系列先前的文章中,我们主要讲解了JDBC对本地事务的处理,本篇文章将讲到一个分布式事务的例子。
请通过以下方式下载github源代码:
git clone https://github.com/davenkin/jta-atomikos-hibernate-activemq.git
本地事务和分布式事务的区别在于:本地事务只用于处理单一数据源事务(比如单个数据库),分布式事务可以处理多种异构的数据源,比如某个业务操作中同时包含了JDBC和JMS或者某个操作需要访问多个不同的数据库。
Java通过JTA完成分布式事务,JTA本身只是一种规范,不同的应用服务器都包含有自己的实现(比如JbossJTA),同时还存在独立于应用服务器的单独JTA实现,比如本篇中要讲到的Atomikos。对于JTA的原理,这里不细讲,读者可以通过这篇文章了解相关知识。
在本篇文章中,我们将实现以下一个应用场景:你在网上购物,下了订单之后,订单数据将保存在系统的数据库中,同时为了安排物流,订单信息将以消息(Message)的方式发送到物流部门以便送货。
以上操作同时设计到数据库操作和JMS消息发送,为了使整个操作成为一个原子操作,我们只能选择分布式事务。我们首先设计一个service层,定义OrderService接口:
package davenkin; public interface OrderService { public void makeOrder(Order order); }
为了简单起见,我们设计一个非常简单的领域对象Order:
@XmlRootElement(name = "Order") @XmlAccessorType(XmlAccessType.FIELD) public class Order { @XmlElement(name = "Id",required = true) private long id; @XmlElement(name = "ItemName",required = true) private String itemName; @XmlElement(name = "Price",required = true) private double price; @XmlElement(name = "BuyerName",required = true) private String buyerName; @XmlElement(name = "MailAddress",required = true) private String mailAddress; public Order() { }
为了采用JAXB对Order对象进行Marshal和Unmarshal,我们在Order类中加入了JAXB相关的Annotation。 我们将使用Hibernate来完成数据持久化,然后使用Spring提供的JmsTemplate将Order转成xml后以TextMessage的 形式发送到物流部门的ORDER.QUEUE中。
(一)准备数据库
为了方便,我们将采用Spring提供的embedded数据库,默认情况下Spring采用HSQL作为后台数据库,虽然在本例中我们将采用HSQL的非XA的DataSource,但是通过Atomikos包装之后依然可以参与分布式事务。
SQL脚本包含在createDB.sql文件中:
CREATE TABLE USER_ORDER( ID INT NOT NULL, ITEM_NAME VARCHAR (100) NOT NULL UNIQUE, PRICE DOUBLE NOT NULL, BUYER_NAME CHAR (32) NOT NULL, MAIL_ADDRESS VARCHAR(500) NOT NULL, PRIMARY KEY(ID) );
在Spring中配置DataSource如下:
<jdbc:embedded-database id="dataSource"> <jdbc:script location="classpath:createDB.sql"/> </jdbc:embedded-database>
(二)启动ActiveMQ
我们将采用embedded的ActiveMQ,在测试之前启动ActiveMQ提供的BrokerService,在测试执行完之后关闭BrokerService。
@BeforeClass public static void startEmbeddedActiveMq() throws Exception { broker = new BrokerService(); broker.addConnector("tcp://localhost:61616"); broker.start(); } @AfterClass public static void stopEmbeddedActiveMq() throws Exception { broker.stop(); }
(三)实现OrderService
创建一个DefaultOrderService,该类实现了OrderService接口,并维护一个JmsTemplate和一个Hibernate的SessionFactory实例变量,分别用于Message的发送和数据库处理。
package davenkin; import org.hibernate.SessionFactory; import org.hibernate.classic.Session; import org.springframework.beans.factory.annotation.Required; import org.springframework.jms.core.JmsTemplate; import org.springframework.transaction.annotation.Transactional; public class DefaultOrderService implements OrderService{ private JmsTemplate jmsTemplate; private SessionFactory sessionFactory; @Override @Transactional public void makeOrder(Order order) { Session session = sessionFactory.getCurrentSession(); session.save(order); jmsTemplate.convertAndSend(order); } @Required public void setJmsTemplate(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } @Required public void setSessionFactory(SessionFactory sessionFactory) { this.sessionFactory = sessionFactory; } }
(四)创建Order的Mapping配置文件
<?xml version="1.0"?> <!DOCTYPE hibernate-mapping PUBLIC "-//Hibernate/Hibernate Mapping DTD 3.0//EN" "http://hibernate.sourceforge.net/hibernate-mapping-3.0.dtd"> <hibernate-mapping> <class name="davenkin.Order" table="USER_ORDER"> <id name="id" type="long"> <column name="ID" /> <generator class="increment" /> </id> <property name="itemName" type="string"> <column name="ITEM_NAME" /> </property> <property name="price" type="double"> <column name="PRICE"/> </property> <property name="buyerName" type="string"> <column name="BUYER_NAME"/> </property> <property name="mailAddress" type="string"> <column name="MAIL_ADDRESS"/> </property> </class> </hibernate-mapping>
(五)配置Atomikos事务
在Spring的IoC容器中,我们需要配置由Atomikos提供的UserTransaction和TransactionManager,然后再配置Spring的JtaTransactionManager:
<bean id="userTransactionService" class="com.atomikos.icatch.config.UserTransactionServiceImp" init-method="init" destroy-method="shutdownForce"> <constructor-arg> <props> <prop key="com.atomikos.icatch.service">com.atomikos.icatch.standalone.UserTransactionServiceFactory</prop> </props> </constructor-arg> </bean> <bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init" destroy-method="close" depends-on="userTransactionService"> <property name="forceShutdown" value="false" /> </bean> <bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp" depends-on="userTransactionService"> <property name="transactionTimeout" value="300" /> </bean> <bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager" depends-on="userTransactionService"> <property name="transactionManager" ref="atomikosTransactionManager" /> <property name="userTransaction" ref="atomikosUserTransaction" /> </bean> <tx:annotation-driven transaction-manager="jtaTransactionManager" />
(六)配置JMS
对于JMS,为了能使ActiveMQ加入到分布式事务中,我们需要配置ActiveMQXAConnectionFactory,而不是 ActiveMQConnectionFactory,然后再配置JmsTemplate,此外还需要配置MessageConvertor在Order 对象和XML之间互转。
<bean id="jmsXaConnectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616" /> </bean> <bean id="amqConnectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean" init-method="init"> <property name="uniqueResourceName" value="XAactiveMQ" /> <property name="xaConnectionFactory" ref="jmsXaConnectionFactory" /> <property name="poolSize" value="5"/> </bean> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="amqConnectionFactory"/> <property name="receiveTimeout" value="2000" /> <property name="defaultDestination" ref="orderQueue"/> <property name="sessionTransacted" value="true" /> <property name="messageConverter" ref="oxmMessageConverter"/> </bean> <bean id="orderQueue" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="ORDER.QUEUE"/> </bean> <bean id="oxmMessageConverter" class="org.springframework.jms.support.converter.MarshallingMessageConverter"> <property name="marshaller" ref="marshaller"/> <property name="unmarshaller" ref="marshaller"/> </bean> <oxm:jaxb2-marshaller id="marshaller"> <oxm:class-to-be-bound name="davenkin.Order"/> </oxm:jaxb2-marshaller>
(七)测试
在测试中,我们首先通过(二)中的方法启动ActiveMQ,再调用DefaultOrderService,最后对数据库和QUEUE进行验证:
@Test public void makeOrder(){ orderService.makeOrder(createOrder()); JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource); assertEquals(1, jdbcTemplate.queryForInt("SELECT COUNT(*) FROM USER_ORDER")); String dbItemName = jdbcTemplate.queryForObject("SELECT ITEM_NAME FROM USER_ORDER", String.class); String messageItemName = ((Order) jmsTemplate.receiveAndConvert()).getItemName(); assertEquals(dbItemName, messageItemName); } @Test(expected = IllegalArgumentException.class) public void failToMakeOrder() { orderService.makeOrder(null); JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource); assertEquals(0, jdbcTemplate.queryForInt("SELECT COUNT(*) FROM USER_ORDER")); assertNull(jmsTemplate.receiveAndConvert()); }