Spring JTA分布式事务实现
1.概述
Java Transaction API,通常称为JTA,是用于管理 Java中的事务的API 。它允许我们以资源无关的方式启动,提交和回滚事务。
根据用于管理事务的底层实现,Spring中的事务策略可以分为两个主要部分:
- 单连接器策略(相当于本地事务管理器) - 底层技术使用单连接器。例如,JDBC使用连接级事务、Hibernate以及JDO使用会话级事务。可以应用使用AOP和拦截器的声明式事务管理。
- 多连接器策略(相当于全局事务管理器) - 底层技术具有使用多个连接器的能力。当有这方面需求时,JTA是最好的选择。此策略需要启用JTA的数据源实例。JBossTS、Atomikos、Bitronix都是开源的JTA实现。
JTA的真正强大之处在于它能够在单个事务中管理多个资源(如数据库,消息服务)。
在本文中,我们将从概念层面了解JTA,并了解业务代码通常是如何与JTA交互。
2.通用接口和分布式事务
JTA提供了对业务代码的事务控制(开始,提交和回滚)的抽象。
如果没有这种抽象,我们必须处理每种资源类型的各个API。
例如,我们需要处理JDBC资源。同样,JMS资源可能具有类似但不兼容的模型。
通过JTA,我们可以以一致和协调的方式管理不同类型的多种资源。
作为API,JTA定义了由事务管理器实现的接口和语义 。实现由Atomikos和Bitronix等库提供。
3.示例项目
本例子模拟了银行应用的一个非常简单的转账业务。我们有两个服务:银行账户服务BankAccountService 和操作行为审计服务AuditService,它们使用了两个不同的数据库。
数据库采用的是JAVA内置数据库HSQLDB,这些独立的数据库需要在事务开始,提交或回滚时进行协调。
JTA事务管理器采用Bitronix。
我们的示例项目使用Spring Boot来简化配置:
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>2.1.4.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jta-bitronix</artifactId>
</dependency>
<dependency>
<groupId>org.hsqldb</groupId>
<artifactId>hsqldb</artifactId>
</dependency>
</dependencies>
在服务启动时,建立2个数据源:accountDb,auditDb:
@Bean("dataSourceAccount")
public DataSource dataSource() throws Exception {
return createHsqlXADatasource("jdbc:hsqldb:mem:accountDb");
}
@Bean("dataSourceAudit")
public DataSource dataSourceAudit() throws Exception {
return createHsqlXADatasource("jdbc:hsqldb:mem:auditDb");
}
在每个测试方法之前,我们使用脚本分别在每个库下创建表:ACCOUNT和AUDIT_LOG,并初始化一些数据:
ID | BALANCE |
---|---|
a0000001 | 1000 |
a0000002 | 2000 |
4.声明性事务界定
在JTA中处理事务的第一种方法是使用@Transactional注解。
让我们用@Transactional注解服务方法 executeTranser()。 这表明事务管理器开始事务:
@Transactional
public void executeTransfer(String fromAccontId, String toAccountId,
BigDecimal amount) {
bankAccountService.transfer(fromAccontId, toAccountId, amount);
auditService.log(fromAccontId, toAccountId, amount);
BigDecimal balance = bankAccountService.balanceOf(fromAccontId);
if (balance.compareTo(BigDecimal.ZERO) < 0) {
throw new RuntimeException("余额不足!");
}
}
这里 executeTranser()方法调用了2个不同的服务:AccountService和AuditService。这2个服务使用2个不同的数据库。
当 executeTransfer()返回时,该事务管理器识别出它是事务的结束,将作用于两个数据库。
在方法结束时,如果转账人的资金不足, executeTransfer()会检查帐户余额并抛出 RuntimeException异常。
4.1没有异常全部提交
场景1:当从a0000001账户给a0000002账户转账500元时,由于执行金额小于余额,一切正常。行为审计表里增加一条记录。
@Test
public void givenAnnotationTx_whenNoException_thenAllCommitted()
throws Exception {
tellerService.executeTransfer("a0000001", "a0000002",
BigDecimal.valueOf(500));
assertThat(accountService.balanceOf("a0000001"))
.isEqualByComparingTo(BigDecimal.valueOf(500));
assertThat(accountService.balanceOf("a0000002"))
.isEqualByComparingTo(BigDecimal.valueOf(2500));
TransferLog lastTransferLog = auditService.lastTransferLog();
assertThat(lastTransferLog).isNotNull();
assertThat(lastTransferLog.getFromAccountId()).isEqualTo("a0000001");
assertThat(lastTransferLog.getToAccountId()).isEqualTo("a0000002");
assertThat(lastTransferLog.getAmount())
.isEqualByComparingTo(BigDecimal.valueOf(500));
}
4.2出现异常进行回滚
场景2:当从a0000002账户给a0000001账户转账10000元时,由于执行金额大于余额,抛出异常。两个数据库进行回滚,账户余额不变,行为审计表没有数据。
@Test
public void givenAnnotationTx_whenException_thenAllRolledBack()
throws Exception {
assertThatThrownBy(() -> {
tellerService.executeTransfer("a0000002", "a0000001",
BigDecimal.valueOf(100000));
}).hasMessage("余额不足!");
assertThat(accountService.balanceOf("a0000001"))
.isEqualByComparingTo(BigDecimal.valueOf(1000));
assertThat(accountService.balanceOf("a0000002"))
.isEqualByComparingTo(BigDecimal.valueOf(2000));
assertThat(auditService.lastTransferLog()).isNull();
}
5.编程性事务界定
另一种控制JTA事务的方法是通过调用 javax.transaction.UserTransaction以编程方式实现。
public void executeTransferProgrammaticTx(String fromAccontId,
String toAccountId, BigDecimal amount) throws Exception {
userTransaction.begin();
bankAccountService.transfer(fromAccontId, toAccountId, amount);
auditService.log(fromAccontId, toAccountId, amount);
BigDecimal balance = bankAccountService.balanceOf(fromAccontId);
if (balance.compareTo(BigDecimal.ZERO) < 0) {
userTransaction.rollback();
throw new RuntimeException("余额不足!");
} else {
userTransaction.commit();
}
}
在我们的示例中,begin()方法启动了一个新事务。如果余额验证失败,调用rollback(),它将回滚两个数据库。否则,调用commit() 会将更改提交给两个数据库。
需要注意的是 commit()和 rollback()都 结束当前事务。
6.总结
在本文中,我们讨论了JTA试图解决的问题。通过示例工程说明了使用注释和编程方式来控制事务,涉及需要在单个事务中协调2个事务资源。
示例完整代码可以在GitHub上找到。
下一篇: Python中使用中文的方法
推荐阅读
-
Spring Cloud Config实现分布式配置中心
-
利用Spring Cloud Config结合Bus实现分布式配置中心的步骤
-
spring整合atomikos实现分布式事务的方法示例
-
spring事务Propagation及其实现原理详解
-
Spring Cloud Config实现分布式配置中心
-
spring boot整合redis实现shiro的分布式session共享的方法
-
关于分布式事务的实现梳理
-
spring实现jdbctemplate添加事务支持示例
-
Spring实战之使用TransactionProxyFactoryBean实现声明式事务操作示例
-
spring事务Propagation及其实现原理详解