0%

看Spring事务机制的实现

这几天给项目整合动态多数据源,遇到了事务和connection切换相关的问题,发现对Spring事务这块的代码没有仔细看过,就大体看下了Spring事务管理的实现

以下实现来自于sping-boot 2.6.4(spring-core 5.3.16)

这篇文章曾经发在掘金社区,是本人自己发布。

面向应用层的api

spring提供的操作事务的api:

  • 声明式事务,比如最常用的@Transactional注解
  • 使用TransactionManager、TransactionDefinition等类手动处理事务的开始/提交/回滚等操作
  • 使用TransactionTemplate处理事务

其中,TransactionTemplate的使用比较简单,提供了execute()方法用于执行需要在事务进行的操作,对TransactionManager、TransactionDefinition等类的操作都封装在了TransactionTemplate内部

下面将对比手动处理事务和声明式事务的代码

使用JdbcTemplate手动操作事务

JdbcTemplate是spring封装的JDBC操作工具类。可以看到,JdbcTemplate依靠DataSourceUtils类来获取到事务关联的connection完成数据库操作。当用户代码中开启了事务时,JdbcTemplate使用的这一connection即事务关联的connection

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// JdbcTemplate类核心的execute函数
public <T> T execute(ConnectionCallback<T> action) throws DataAccessException {
Assert.notNull(action, "Callback object must not be null");

Connection con = DataSourceUtils.getConnection(obtainDataSource());
try {
// Create close-suppressing Connection proxy, also preparing returned Statements.
Connection conToUse = createConnectionProxy(con);
return action.doInConnection(conToUse);
}
catch (SQLException ex) {
// Release Connection early, to avoid potential connection pool deadlock
// in the case when the exception translator hasn't been initialized yet.
String sql = getSql(action);
DataSourceUtils.releaseConnection(con, getDataSource());
con = null;
throw translateException("ConnectionCallback", sql, ex);
}
finally {
DataSourceUtils.releaseConnection(con, getDataSource());
}
}

以下伪代码开启了传播级别为PROPAGATION_REQUIRED的事务,并实现了发生YourException异常时回滚事务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 使用 JdbcTemplate 手动处理事务操作的伪代码

// 通过TransactionDefinition配置事务传播属性、隔离级别等
TransactionDefinition yourTxDefinition = new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_REQUIRED);


// 使用TransactionManager手动开启事务
TransactionStatus status = yourTxManagerBean.getTransaction(definition);

try {
// 执行sql
template.update(yourSql1);
template.update(yourSql2);

// 提交事务
manager.commit(status);
} catch (YourException e) {
e.printStackTrace();
// 出现异常时会滚
manager.rollback(status);
} catch (Exception e) {
// do something
}

使用注解@Transactional声明式处理事务

spring提供了@Transactional注解,spring内部自动通过AnnotationTransactionAttributeSource解析@Transactional注解上指定的事务属性;或者通过全局事务的方式,以NameMatchTransactionAttributeSource等方式定义interceptor的匹配规则,并为这些方法指定使用的TransactionAttribute,可全局配置事务

两种方式可以通过spring interceptor自动处理事务,具体的实现类为TransactionInterceptor——不过,这个类中只进行了基本的配置操作,为了方便扩展,aop操作事务的默认实现定义在了其父类TransactionAspectSupport中,核心的函数为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// 以下方法只保留了关键流程
// TransactionAspectSupport切面实现自动化事务处理的方法
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {

// 获取事务配置、事务管理器对象
TransactionAttributeSource tas = getTransactionAttributeSource();
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
final TransactionManager tm = determineTransactionManager(txAttr);

if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
// 响应式事务处理的操作
// ...
}

// 命令式事务处理
PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
// Standard transaction demarcation with getTransaction and commit/rollback calls.
// 请求transactionManager创建并开启事务,TransactionManager将会根据事务传播属性配置确定新建事务或继续使用当前事务等操作
TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

Object retVal;
try {
// 调用方法
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// 方法中抛出异常时,根据事务配置的rollbackOn属性,操作事务提交或回滚
// 异常并不会被捕获,而是继续抛出,方法结束
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
cleanupTransactionInfo(txInfo);
}

// 方法未抛出异常的情况,事务正常提交
commitTransactionAfterReturning(txInfo);
return retVal;
}


}

不难发现,TransactionAspectSupport的处理流程与上文实现的使用JdbcTemplate手动处理事务操作的流程是一致的——spring通过aop的方式,为我们自动完成了事务的操作,使得只需要声明一个@Transactional注解就可以使用事务,为了实现与上文JdbcTemplate手动处理事务操作代码相同的效果,只需要:

1
2
3
4
@Transactional(propagation = Propagation.REQUIRED,rollbackFor = YourException.class)
public void yourSqlExecFunction(){
//...
}

多事务切换与传播机制的实现

使用面向应用的api,可以方便的使用事务。在上面的示例代码以及TransactionAspectSupport的核心处理代码中,只包含了当前的当个事务的处理逻辑,并不涉及spring提供的多个事务之间的传播机制和事务切换。这一部分已经由spring事务管理器做了封装。

事务管理有两套实现,比如核心的事务管理器:

  • PlatformTransactionManager,命令式的实现(imperative)
  • ReactiveTransactionManager,响应式的实现(reactive)

这里只看PlatformTransactionManager在单数据源下的实现

事务管理关键的类

事务管理器AbstractPlatformTransactionManager

Abstract base class that implements Spring’s standard transaction workflow, serving as basis for concrete platform transaction managers。是事务管理器的抽象实现,定义了核心的事务处理流程的函数接口,包括事务各状态处理的关键的函数,如begin(),commit(),rollback()

需要说明的是,这里提到的事务,应当区分出是逻辑上的事务还是真正关联到数据库connection的事务:

  • 逻辑上的事务,这个称呼可能不太准确,应当称之为事务状态——比如,方法A、B都声明了开启事务;A中调用了B;A和B的事务隔离级别配置不同。那么,两个函数关联的事务状态是不同的,具有不同的事务属性,也可能关联到了不同的数据库事务资源。事务状态使用TransactionStatus类记录,由AbstractPlatformTransactionManager负责管理
  • 实际的事务,数据库connection上开启的事务,关联到实际的数据库资源

AbstractPlatformTransactionManager只作为一个基础的模版类,负责管理事务状态(在AbstractPlatformTransactionManager类源码中完全没有出现connection)。作为实现事务传播机制的核心类,存在多个事务的切换时,能否切换、是否开启新事务、是否挂起/恢复、事务状态的保存,都已经由AbstractPlatformTransactionManager封装。

对于多个事务之间的切换,已经由AbstractPlatformTransactionManager完成了事务状态的处理,需要真正操作数据库事务资源的步骤,则留出函数,供子类面向不同的datasource扩展实现具体的事务资源(datasource,connection等)的管理和操作。比如,新建一个connection并操作connection开启事务(set autocommit = false);操作connection提交/回滚一个数据库事务

AbstractPlatformTransactionManager的子类:

  • DataSourceTransactionManager,面向单个jdbc数据源的实现
  • JtaTransactionManager,JTA的实现,即多数据源的分布式事务处理

事务状态TransactionStatus

事务状态的抽象接口,AbstractPlatformTransactionManager使用的事务状态类,基本实现为DefaultTransactionStatus,关键字段:

  • Object transaction,事务对象,比如在DataSourceTransactionManager实现中使用transaction存储事务关联的数据库connection
  • boolean newTransaction,当前是否是新开启的事务
  • boolean newSynchronization,是否是新开始同步的事务
  • boolean readOnly,事务是否是readonly的
  • Object suspendedResources,记录当前事务之前挂起的事务对象,用于suspend后的resume恢复

其中关键的是suspendedResources,用于实现事务的suspend/resume

事务同步管理器TransactionSynchronizationManager

  1. 提供了基于threadlocal的事务状态、事务资源对象存储。将事务状态、事务资源绑定到threadlocal,从而在事务处理的过程中不需要考虑多线程之间的同步问题
  2. 可存储事务同步器TransactionSynchronization实例,其中提供了事务生命周期中部分状态的回调,比如:beforeCommit(boolean readOnly),afterCompletion(int status)

对于TransactionSynchronizationManager管理的事务资源connection,spring提供了工具类DataSourceUtils来操作

一个事务资源管理器的实现:DataSourceTransactionManager

管理事务关联的connection,提供了对单个JDBC数据源的事务资源的操作函数的实现

DataSourceTransactionManager对connection的管理中,会通过TransactionSynchronizationManager在启动事务时绑定一个connection,除非发生事务挂起/恢复,否则不会切换connection,而是一直使用同个connection

为什么AbstractPlatformTransactionManager封装了事务状态的切换,却不负责connection的切换?当然不能,AbstractPlatformTransactionManager是一个顶层的抽象模版,需要支持扩展出单数据源和多数据源,而多数据源的情况下显然不会是DataSourceTransactionManager中这样一直使用同个connection的情况

从事务创建流程看传播机制的处理

对于AbstractPlatformTransactionManager中对事务状态的处理流程,传播机制相关的处理主要是在创建/结束事务时,对应函数为getTransaction()/cleanupAfterCompletion(),抽取getTransaction()关键的部分给出了以下流程图:

新事务创建流程.png

事务的挂起与恢复

当A调用B,B调用C,并且A/B/C都声明开启事务时,那么涉及到多个事务的切换。在上一部分的事务创建流程可以看到,对不同的传播机制,大多是不需要开启新事务的,比如默认的REQUIRED,从始至终加入到同一个connection的同一个事务中,统一提交/回滚

而对REQUIERD_NEW的情况,要求每次开启一个新事务,那么对A->B->C的情况,就需要有一个栈来记录事务,每次开启新事务前,需要先suspend将原事务入栈;每次提交/回滚栈顶事务后,除非是最外层函数关联的事务,否则需要恢复到之前的事务继续进行处理

在事务状态层面,AbstractPlatformTransactionManager使用的TransactionStatus对象通过suspendedResources字段来实现对前一个事务的记录:每次开启一个新事务时,初始化一个TransactionStatus对象B,并将suspendedResources指向挂起的原TransactionStatus对象A;事务B处理完成后,从suspendedResources中恢复A,继续处理A。即通过suspendedResources属性,用前驱链表方便的实现了一个栈用于挂起-恢复

在上一部分的事务创建流程中记录了suspend原事务、启动新事务的过程,对应的可以在AbstractPlatformTransactionManager类的cleanupAfterCompletion()方法中看到,如果栈不为空,则会执行resume操作出栈前一事务继续处理

DataSourceTransactionManager对事务关联的connection的管理

对REQUIRED之类的传播属性,多个函数实际在同一个事务中统一提交/回滚。这种情况下,必须能够保证整个过程是使用的同一个connection

而对REQUIRE_NEW的情况,需要管理多个数据库事务,需要切换多个connection,那么同样的需要一个栈,来记录执行过程中每个事务关联的connection

在DataSourceTransactionManager中,借助TransactionSynchronizationManager提供的threadlocal存储,可以方便的保证线程执行中使用同一个connection,可以参看doBegin()方法:如果TransactionSynchronizationManager(threadlocal)中存储了connection对象,那么直接取用;否则才获取新的connection,以下摘取了关键代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;

// 尝试从TransactionSynchronizationManager获取connection,不存在时新建connection
// 对于前一个事务挂起的情况,在执行挂起操作的suspend函数中,会将TransactionSynchronizationManager绑定的资源清空,此处会新建connection
try {
if (!txObject.hasConnectionHolder() ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
Connection newCon = obtainDataSource().getConnection();
if (logger.isDebugEnabled()) {
logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}

txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();

// 根据TransactionDefinition参数,设置connection属性
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
txObject.setReadOnly(definition.isReadOnly());

// 启动事务
// Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
// so we don't want to do it unnecessarily (for example if we've explicitly
// configured the connection pool to set it already).
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
con.setAutoCommit(false);
}

//...

// 绑定事务connection到TransactionSynchronizationManager
// Bind the connection holder to the thread.
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
}

catch (Throwable ex) {
if (txObject.isNewConnectionHolder()) {
DataSourceUtils.releaseConnection(con, obtainDataSource());
txObject.setConnectionHolder(null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
}
}

而对需要切换connection的情况,那么则需要在父类AbstractPlatformTransactionManager通知切换事务——suspend/resume时,相应的完成connection的切换,如下:

1
2
3
4
5
6
7
8
9
10
11
12
@Override
protected Object doSuspend(Object transaction) {
// 挂起事务时,将TransactionSynchronizationManager绑定的资源清空,之后启动事务时则会新建connection
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
txObject.setConnectionHolder(null);
return TransactionSynchronizationManager.unbindResource(obtainDataSource());
}

@Override
protected void doResume(@Nullable Object transaction, Object suspendedResources) {
TransactionSynchronizationManager.bindResource(obtainDataSource(), suspendedResources);
}