事务拦截器(TransactionInterceptor)
事务拦截器的invoke方法如下所示:
@Override
@Nullable
public Object invoke(MethodInvocation invocation) throws Throwable {
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
1: TransactionAttributeSource tas = getTransactionAttributeSource();
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
2: final PlatformTransactionManager tm = determineTransactionManager(txAttr);
3: final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
4: if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
4.1:TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal;
try {
4.2: retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
4.3: completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
4.4: cleanupTransactionInfo(txInfo);
}
4.5:commitTransactionAfterReturning(txInfo);
return retVal;
}
5: else {
final ThrowableHolder throwableHolder = new ThrowableHolder();
try {
5.1: Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr, status -> {
TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
try {
return invocation.proceedWithInvocation();
}
catch (Throwable ex) {
if (txAttr.rollbackOn(ex)) {
if (ex instanceof RuntimeException) {
throw (RuntimeException) ex;
}
else {
throw new ThrowableHolderException(ex);
}
}
else {
// A normal return value: will lead to a commit.
throwableHolder.throwable = ex;
return null;
}
}
finally {
cleanupTransactionInfo(txInfo);
}
});
if (throwableHolder.throwable != null) {
throw throwableHolder.throwable;
}
return result;
}
catch (ThrowableHolderException ex) {
throw ex.getCause();
}
catch (TransactionSystemException ex2) {
if (throwableHolder.throwable != null) {
logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
ex2.initApplicationException(throwableHolder.throwable);
}
throw ex2;
}
catch (Throwable ex2) {
if (throwableHolder.throwable != null) {
logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
}
throw ex2;
}
}
}
执行流程主要包括获取当前方法的事务标签,获取事务管理器,生成方法限定符,判断是声明式事务还是编程式事务。
-
当前方法的事务标签,找不到就去找类和接口的声明式事务标签
@Transactional
注解 -
从当前BeanFactory获取
PlatformTransactionManager
对象 -
生成方法限定符格式:fully qualified interface/class name + "." + method name
-
处理事务标签是空的或者管理器不是
CallbackPreferringPlatformTransactionManager
的声明式事务
4.1 如果有事务标签,则创建事务,并且把当前的事务信息绑定当前线程存放在ThreadLocal
中,没有则不创建事务。
4.2 调用目标方法。
4.3 遇到异常是RuntimeException
或者Error
,或者遇到 TransactionStatus.isRollbackOnly() is true
会回滚,否则都会提交
4.4 重置当事务管理其中当前ThreadLocal
的事务信息
4.5 无异常当前有事务正常进行commit,无事务则不执行
- 处理编程式事务
5.1 绑定TransactionInfo
在当前线程.用于之后的编程式事务提交回滚。
事务管理器
Spring并不直接管理事务,而是提供了多种事务管理器,他们将事务管理的职责委托给Hibernate或者JTA等持久化机制所提供的相关平台框架的事务来实现。
Spring事务管理器的接口是org.springframework.transaction.PlatformTransactionManager,通过这个接口,Spring为各个平台如JDBC、Hibernate等都提供了对应的事务管理器.
PlatformTransactionManager
事务管理器 抽象了提交和回滚的方法,在AbstractPlatformTransactionManager
抽象类中做了模版实现。
TransactionDefinition
该接口定义了事务的传播行为和隔离级别,超时时间,是否只读的常量,提供了4个方法。用于返回隔离级别,超时和是否只读 和传播行为。
传播行为 | 含义 |
---|---|
PROPAGATION_REQUIRED | 表示当前方法必须运行在事务中。如果当前事务存在,方法将会在该事务中运行。否则,会启动一个新的事务 |
PROPAGATION_SUPPORTS | 表示当前方法不需要事务上下文,但是如果存在当前事务的话,那么该方法会在这个事务中运行 |
PROPAGATION_MANDATORY | 表示该方法必须在事务中运行,如果当前事务不存在,则会抛出一个异常 |
PROPAGATION_REQUIRED_NEW | 表示当前方法必须运行在它自己的事务中。一个新的事务将被启动。如果存在当前事务,在该方法执行期间,当前事务会被挂起。如果使用JTATransactionManager的话,则需要访问TransactionManager |
PROPAGATION_NOT_SUPPORTED | 表示该方法不应该运行在事务中。如果存在当前事务,在该方法运行期间,当前事务将被挂起。如果使用JTATransactionManager的话,则需要访问TransactionManager |
PROPAGATION_NEVER | 表示当前方法不应该运行在事务上下文中。如果当前正有一个事务在运行,则会抛出异常 |
PROPAGATION_NESTED | 表示如果当前已经存在一个事务,那么该方法将会在嵌套事务中运行。嵌套的事务可以独立于当前事务进行单独地提交或回滚。如果当前事务不存在,那么其行为与PROPAGATION_REQUIRED一样。注意各厂商对这种传播行为的支持是有所差异的。可以参考资源管理器的文档来确认它们是否支持嵌套事务 |
TransactionStatus
TransactionStatus主要是体现在PlatformTransactionManager的getTranscation
方法的返回。内部定义了6个方法。
-
isNewTransaction用于判断当前返回的是否是新事务
-
hasSavepoint,用于嵌套事务时使用
-
setRollbackOnly,设置事务只回滚,当是遇到异常的情况。设置该属性,管理器事务出发底层提供事务实现的组建进行回滚,
-
isRollbackOnly 返回事务是否标记为只回滚
-
flush 把会话级别的数据持久化道数据存储介质中
-
complete 用于判断commit活着rollback是否完成。
AbstractPlatformTransactionManager
提供了以下功能:
- 确定是否存在现有事务;
- 应用合适的传播行为;
- 在必要时暂停和恢复事务;
- 在提交时检查只回滚标志;
- 对回滚应用适当的修改(实际回滚或仅设置回滚)
- 触发注册的同步回调(如果事务同步是活动的)。
首先来看下AbstractPlatformTransactionManager内部定义的属性字段
-
SYNCHRONIZATION_ALWAYS
就算是在PROPAGATION_NOT_SUPPORTED,PROPAGATION_NEVER,PROPAGATION_NOT_SUPPORT
ED这三个传播行为中,也是需要事务同步
-
SYNCHRONIZATION_ON_ACTUAL_TRANSACTION
只有在当前事务采取进行事务同步,主要应用于PROPAGATION_REQUIRED,PROPAGATION_MANDATORY,PROPAGATION_REQUIRED_NEW这三个传播行为。
-
SYNCHRONIZATION_NEVER
不进行事务同步,即使有事务也不进行
-
transactionSynchronization
-
defaultTimeout
设置事务超时时间,默认-1,不同的事务管理器有不同的默认超时时间,JTA默认是30S;
-
nestedTransactionAllowed
是否允许是嵌套事务
-
validateExistingTransaction
验证当前存在的事务
-
globalRollbackOnParticipationFailure
设置将现有事务全局标记为是否回滚
-
failEarlyOnGlobalRollbackOnly
设置在当前事务被全局标记的情况下是否提前失败。默认false
-
rollbackOnCommitFailure
设置是否应在提交调用失败时执行回滚。通常没有必要,因此可以避免,因为它可能会覆盖提交异常与后续回滚异常
创建事务
事物的创建是在TransactionInterceptor#createTransactionIfNecessary
方法中。主要做了3件事
-
根据传入的事务标签创建DelegatingTransactionAttribute对象
-
获取事务
-
准备事务上下文
获取事务,准备事务上下文
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
Object transaction = doGetTransaction();
// Cache debug flag to avoid repeated checks.
boolean debugEnabled = logger.isDebugEnabled();
if (definition == null) {
// Use defaults if no transaction definition given.
definition = new DefaultTransactionDefinition();
}
if (isExistingTransaction(transaction)) {
// Existing transaction found -> check propagation behavior to find out how to behave.
return handleExistingTransaction(definition, transaction, debugEnabled);
}
// Check definition settings for new transaction.
if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
}
// No existing transaction found -> check propagation behavior to find out how to proceed.
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
}
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException | Error ex) {
resume(null, suspendedResources);
throw ex;
}
}
else {
// Create "empty" transaction: no actual transaction, but potentially synchronization.
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + definition);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
}
}
实际上获取事务最终是通过DataSourceTransactionManager
的doGetTransaction
创建基于JDBC的物理事务。拿到事务之后会去判断当前线程是否存在事务,存在则去处理嵌套事务,如果事务超时 就设置验证,校验事物的传播行为,创建DefaultTransactionStatus
对象,执行prepareTransactionStatus
方法主要是为了填充信息,如果是新事物则需要绑定当前线程,存放在TransactionSynchronizationManager
内部的几个ThreadLocal当中。真正的数据库的事务(物理事物)开启是在doBegin
方法中执行的。如设置隔离级别,超时等作,还有如果数据库默认是自动提交事务autoCommit
,Spring则会更改为有Spring来控制。关闭自动提交。
在默认的传播行为中PROPAGATION_REQUIRED表示当前方法必须运行在事务中。如果当前事务存在,方法将会在该事务中运行。否则,会启动一个新的事务。若当前存在事务则回去执行handleExistingTransaction(definition, transaction, debugEnabled);
private TransactionStatus handleExistingTransaction(
TransactionDefinition definition, Object transaction, boolean debugEnabled)
throws TransactionException {
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation 'never'");
}
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
if (debugEnabled) {
logger.debug("Suspending current transaction");
}
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
if (debugEnabled) {
logger.debug("Suspending current transaction, creating new transaction with name [" +
definition.getName() + "]");
}
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException | Error beginEx) {
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
}
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions by default - " +
"specify 'nestedTransactionAllowed' property with value 'true'");
}
if (debugEnabled) {
logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
}
if (useSavepointForNestedTransaction()) {
// Create savepoint within existing Spring-managed transaction,
// through the SavepointManager API implemented by TransactionStatus.
// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
DefaultTransactionStatus status =
prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
status.createAndHoldSavepoint();
return status;
}
else {
// Nested transaction through nested begin and commit/rollback calls.
// Usually only for JTA: Spring synchronization might get activated here
// in case of a pre-existing JTA transaction.
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, null);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
}
// Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
if (debugEnabled) {
logger.debug("Participating in existing transaction");
}
if (isValidateExistingTransaction()) {
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
Constants isoConstants = DefaultTransactionDefinition.constants;
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] specifies isolation level which is incompatible with existing transaction: " +
(currentIsolationLevel != null ?
isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
"(unknown)"));
}
}
if (!definition.isReadOnly()) {
if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] is not marked as read-only but existing transaction is");
}
}
}
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
-
如果此时事务传播特性是NEVER,则抛出异常。
-
如果此时事务的传播特性是NOT_SUPPORTED,则调用suspend(transaction)挂起当前事务,将被挂起的资源suspendedResources放入事务状态里。
-
如果此时事务状态是REQUIRES_NEW,则调用suspend(transaction)挂起当前事务,将事务对象transaction和被挂起的资源suspendedResources放入事务状态里。然后调用doBegin(transaction, definition)方法去真正打开事务。最后调用prepareSynchronization(status, definition)方法准备一下事务同步。
-
如果此时事务的传播特性是NESTED,又分三种情况:
-
如果不允许嵌套事务,直接抛出异常。
- 如果使用保存点(Savepoint)来实现嵌套事务,那直接使用当前事务,创建一个保存点就可以了。
- 如果使用新的事务来实现嵌套事务,那就调用doBegin(transaction, definition)开启新的事务,此时不需要挂起当前事务。
- 对于剩下三种传播特性REQUIRED/MANDATORY/SUPPORTS,则不需要创建新事务,直接使用当前事务就可以了。
到此为此 我们彻底明白了事务的创建机制了吧。通过@Import
注解导入了扩展点下的AbstractPlatformTransactional
和DatasourceTransactionalManager
进行实例化。最后依托bean的postProcessAfterInitialization来实现创建代理对象,创建的过程中首先找寻对应方法或者类级别的事务标签也就是@Transactional
注解,同时应用对应的增强。由于TransactionInterceptor
的存在。当方法被触发执行时,根据逻辑事务的传播行为来开启数据库的物理事务。同时进行物理事务和逻辑事务进行同步,也就是常说的所谓的把事务托管给Spring。接下来来看对应的回滚和提交事务。
回滚事务
在事务回滚针对异常默认只处理运行时异常或者是Error才会触发回滚,反之则会提交事务,除非我们在@Transactional
注解之中指定rollbackFor=Exception.class
。
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
try {
boolean unexpectedRollback = unexpected;
try {
triggerBeforeCompletion(status);
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Rolling back transaction to savepoint");
}
status.rollbackToHeldSavepoint();
}
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction rollback");
}
doRollback(status);
}
else {
// Participating in larger transaction
if (status.hasTransaction()) {
if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
if (status.isDebug()) {
logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
}
doSetRollbackOnly(status);
}
else {
if (status.isDebug()) {
logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
}
}
}
else {
logger.debug("Should roll back transaction but cannot - no transaction available");
}
// Unexpected rollback only matters here if we're asked to fail early
if (!isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = false;
}
}
}
catch (RuntimeException | Error ex) {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw ex;
}
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
// Raise UnexpectedRollbackException if we had a global rollback-only marker
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction rolled back because it has been marked as rollback-only");
}
}
finally {
cleanupAfterCompletion(status);
}
}
回滚事务的逻辑在processRollback
方法中,在执行回滚操作是首先会触发执行TransactionalManager
中的自定义回调函数triggerBeforeCompletion
,等等会滚前会滚后的扩展。撇开这些之后就是真正回滚处理,如是含有保存点的使用保存点来回滚,如果是新事物直接回退。调用jdbcConnection.rollback
来回退物理事务 .最后在finally语句块中执行清除当前线程的事务信息的工作,如果针对新事务,回去是否当前线程的数据库的连接关系,重置数据库连接。如果是该事物之前还存在之前挂起的事务,则会恢复挂起的资源。
提交事务
protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
try {
boolean unexpectedRollback = false;
prepareForCommit(status);
triggerBeforeCommit(status);
triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Releasing transaction savepoint");
}
unexpectedRollback = status.isGlobalRollbackOnly();
status.releaseHeldSavepoint();
}
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction commit");
}
unexpectedRollback = status.isGlobalRollbackOnly();
doCommit(status);
}
else if (isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = status.isGlobalRollbackOnly();
}
// Throw UnexpectedRollbackException if we have a global rollback-only
// marker but still didn't get a corresponding exception from commit.
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction silently rolled back because it has been marked as rollback-only");
}
}
catch (UnexpectedRollbackException ex) {
// can only be caused by doCommit
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
throw ex;
}
catch (TransactionException ex) {
// can only be caused by doCommit
if (isRollbackOnCommitFailure()) {
doRollbackOnCommitException(status, ex);
}
else {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
}
throw ex;
}
catch (RuntimeException | Error ex) {
if (!beforeCompletionInvoked) {
triggerBeforeCompletion(status);
}
doRollbackOnCommitException(status, ex);
throw ex;
}
// Trigger afterCommit callbacks, with an exception thrown there
// propagated to callers but the transaction still considered as committed.
try {
triggerAfterCommit(status);
}
finally {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}
}
finally {
cleanupAfterCompletion(status);
}
}
同样在提交事务之处也是在在processCommit
方法中。同样会有TransactionalManager
中的自定义回调函数triggerAfterCommit
,等等事务提交前提交后的扩展。独立的新事物,直接调用物理事务进行提交jdbcConnection.commit
,针对保存点的事务处理,则回去清除保存点信息,你是否有写迷惑呢,那我嵌套事务何时提交啊。你仔细一想就会发现如果此时提交了嵌套事务,当外部事务回滚,这个嵌套的在语义下必然是需要回滚的。你倘若这里提交了,不就完犊子了吗。所以嵌套事务会跟随外部事务的提交而提交。