Spring源码分析-SpringBoot事务(二)

Scroll Down

事务拦截器(TransactionInterceptor)

事务拦截器的invoke方法如下所示:

    @Override
	@Nullable
	public Object invoke(MethodInvocation invocation) throws Throwable {
		Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
		return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
	}
	protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
			final InvocationCallback invocation) throws Throwable {
	1:	TransactionAttributeSource tas = getTransactionAttributeSource();
		final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
	2:	final PlatformTransactionManager tm = determineTransactionManager(txAttr);
	3:	final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
	4:  if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
		4.1:TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
			Object retVal;
			try {
		4.2:	retVal = invocation.proceedWithInvocation();
			}
			catch (Throwable ex) {
		4.3:	completeTransactionAfterThrowing(txInfo, ex);
				throw ex;
			}
			finally {
		4.4:	cleanupTransactionInfo(txInfo);
			}
		4.5:commitTransactionAfterReturning(txInfo);
			return retVal;
		}
	5: else {
			final ThrowableHolder throwableHolder = new ThrowableHolder();
			try {
			5.1: Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr, status -> {
					TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
					try {
						return invocation.proceedWithInvocation();
					}
					catch (Throwable ex) {
						if (txAttr.rollbackOn(ex)) {
							if (ex instanceof RuntimeException) {
								throw (RuntimeException) ex;
							}
							else {
								throw new ThrowableHolderException(ex);
							}
						}
						else {
							// A normal return value: will lead to a commit.
							throwableHolder.throwable = ex;
							return null;
						}
					}
					finally {
						cleanupTransactionInfo(txInfo);
					}
				});
				if (throwableHolder.throwable != null) {
					throw throwableHolder.throwable;
				}
				return result;
			}
			catch (ThrowableHolderException ex) {
				throw ex.getCause();
			}
			catch (TransactionSystemException ex2) {
				if (throwableHolder.throwable != null) {
					logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
					ex2.initApplicationException(throwableHolder.throwable);
				}
				throw ex2;
			}
			catch (Throwable ex2) {
				if (throwableHolder.throwable != null) {
					logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
				}
				throw ex2;
			}
		}
	}

执行流程主要包括获取当前方法的事务标签,获取事务管理器,生成方法限定符,判断是声明式事务还是编程式事务。

  1. 当前方法的事务标签,找不到就去找类和接口的声明式事务标签@Transactional注解

  2. 从当前BeanFactory获取PlatformTransactionManager对象

  3. 生成方法限定符格式:fully qualified interface/class name + "." + method name

  4. 处理事务标签是空的或者管理器不是CallbackPreferringPlatformTransactionManager的声明式事务

4.1 如果有事务标签,则创建事务,并且把当前的事务信息绑定当前线程存放在ThreadLocal中,没有则不创建事务。

4.2 调用目标方法。

4.3 遇到异常是RuntimeException或者Error,或者遇到 TransactionStatus.isRollbackOnly() is true 会回滚,否则都会提交

4.4 重置当事务管理其中当前ThreadLocal的事务信息

4.5 无异常当前有事务正常进行commit,无事务则不执行

  1. 处理编程式事务

5.1 绑定TransactionInfo在当前线程.用于之后的编程式事务提交回滚。

事务管理器

Spring并不直接管理事务,而是提供了多种事务管理器,他们将事务管理的职责委托给Hibernate或者JTA等持久化机制所提供的相关平台框架的事务来实现。
Spring事务管理器的接口是org.springframework.transaction.PlatformTransactionManager,通过这个接口,Spring为各个平台如JDBC、Hibernate等都提供了对应的事务管理器.

PlatformTransactionManager

事务管理器 抽象了提交和回滚的方法,在AbstractPlatformTransactionManager抽象类中做了模版实现。

TransactionDefinition

该接口定义了事务的传播行为和隔离级别,超时时间,是否只读的常量,提供了4个方法。用于返回隔离级别,超时和是否只读 和传播行为。

传播行为含义
PROPAGATION_REQUIRED表示当前方法必须运行在事务中。如果当前事务存在,方法将会在该事务中运行。否则,会启动一个新的事务
PROPAGATION_SUPPORTS表示当前方法不需要事务上下文,但是如果存在当前事务的话,那么该方法会在这个事务中运行
PROPAGATION_MANDATORY表示该方法必须在事务中运行,如果当前事务不存在,则会抛出一个异常
PROPAGATION_REQUIRED_NEW表示当前方法必须运行在它自己的事务中。一个新的事务将被启动。如果存在当前事务,在该方法执行期间,当前事务会被挂起。如果使用JTATransactionManager的话,则需要访问TransactionManager
PROPAGATION_NOT_SUPPORTED表示该方法不应该运行在事务中。如果存在当前事务,在该方法运行期间,当前事务将被挂起。如果使用JTATransactionManager的话,则需要访问TransactionManager
PROPAGATION_NEVER表示当前方法不应该运行在事务上下文中。如果当前正有一个事务在运行,则会抛出异常
PROPAGATION_NESTED表示如果当前已经存在一个事务,那么该方法将会在嵌套事务中运行。嵌套的事务可以独立于当前事务进行单独地提交或回滚。如果当前事务不存在,那么其行为与PROPAGATION_REQUIRED一样。注意各厂商对这种传播行为的支持是有所差异的。可以参考资源管理器的文档来确认它们是否支持嵌套事务

TransactionStatus

TransactionStatus主要是体现在PlatformTransactionManager的getTranscation方法的返回。内部定义了6个方法。

  • isNewTransaction用于判断当前返回的是否是新事务

  • hasSavepoint,用于嵌套事务时使用

  • setRollbackOnly,设置事务只回滚,当是遇到异常的情况。设置该属性,管理器事务出发底层提供事务实现的组建进行回滚,

  • isRollbackOnly 返回事务是否标记为只回滚

  • flush 把会话级别的数据持久化道数据存储介质中

  • complete 用于判断commit活着rollback是否完成。

AbstractPlatformTransactionManager

提供了以下功能:

  • 确定是否存在现有事务;
  • 应用合适的传播行为;
  • 在必要时暂停和恢复事务;
  • 在提交时检查只回滚标志;
  • 对回滚应用适当的修改(实际回滚或仅设置回滚)
  • 触发注册的同步回调(如果事务同步是活动的)。

首先来看下AbstractPlatformTransactionManager内部定义的属性字段

  • SYNCHRONIZATION_ALWAYS

    就算是在PROPAGATION_NOT_SUPPORTED,PROPAGATION_NEVER,PROPAGATION_NOT_SUPPORT

    ED这三个传播行为中,也是需要事务同步

  • SYNCHRONIZATION_ON_ACTUAL_TRANSACTION

    只有在当前事务采取进行事务同步,主要应用于PROPAGATION_REQUIRED,PROPAGATION_MANDATORY,PROPAGATION_REQUIRED_NEW这三个传播行为。

  • SYNCHRONIZATION_NEVER

    不进行事务同步,即使有事务也不进行

  • transactionSynchronization

  • defaultTimeout

    设置事务超时时间,默认-1,不同的事务管理器有不同的默认超时时间,JTA默认是30S;

  • nestedTransactionAllowed

    是否允许是嵌套事务

  • validateExistingTransaction

    验证当前存在的事务

  • globalRollbackOnParticipationFailure

    设置将现有事务全局标记为是否回滚

  • failEarlyOnGlobalRollbackOnly

    设置在当前事务被全局标记的情况下是否提前失败。默认false

  • rollbackOnCommitFailure

    设置是否应在提交调用失败时执行回滚。通常没有必要,因此可以避免,因为它可能会覆盖提交异常与后续回滚异常

创建事务

事物的创建是在TransactionInterceptor#createTransactionIfNecessary方法中。主要做了3件事

  • 根据传入的事务标签创建DelegatingTransactionAttribute对象

  • 获取事务

  • 准备事务上下文

获取事务,准备事务上下文

	public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
		Object transaction = doGetTransaction();
		// Cache debug flag to avoid repeated checks.
		boolean debugEnabled = logger.isDebugEnabled();
		if (definition == null) {
			// Use defaults if no transaction definition given.
			definition = new DefaultTransactionDefinition();
		}
		if (isExistingTransaction(transaction)) {
			// Existing transaction found -> check propagation behavior to find out how to behave.
			return handleExistingTransaction(definition, transaction, debugEnabled);
		}
		// Check definition settings for new transaction.
		if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
			throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
		}
		// No existing transaction found -> check propagation behavior to find out how to proceed.
		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
			throw new IllegalTransactionStateException(
					"No existing transaction found for transaction marked with propagation 'mandatory'");
		}
		else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
				definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
				definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
			SuspendedResourcesHolder suspendedResources = suspend(null);
			if (debugEnabled) {
				logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
			}
			try {
				boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
				DefaultTransactionStatus status = newTransactionStatus(
						definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
				doBegin(transaction, definition);
				prepareSynchronization(status, definition);
				return status;
			}
			catch (RuntimeException | Error ex) {
				resume(null, suspendedResources);
				throw ex;
			}
		}
		else {
			// Create "empty" transaction: no actual transaction, but potentially synchronization.
			if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
				logger.warn("Custom isolation level specified but no actual transaction initiated; " +
						"isolation level will effectively be ignored: " + definition);
			}
			boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
			return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
		}
	}

AnnotationAwareAspectJAutoProxyCreator
实际上获取事务最终是通过DataSourceTransactionManagerdoGetTransaction创建基于JDBC的物理事务。拿到事务之后会去判断当前线程是否存在事务,存在则去处理嵌套事务,如果事务超时 就设置验证,校验事物的传播行为,创建DefaultTransactionStatus对象,执行prepareTransactionStatus方法主要是为了填充信息,如果是新事物则需要绑定当前线程,存放在TransactionSynchronizationManager内部的几个ThreadLocal当中。真正的数据库的事务(物理事物)开启是在doBegin方法中执行的。如设置隔离级别,超时等作,还有如果数据库默认是自动提交事务autoCommit,Spring则会更改为有Spring来控制。关闭自动提交。

在默认的传播行为中PROPAGATION_REQUIRED表示当前方法必须运行在事务中。如果当前事务存在,方法将会在该事务中运行。否则,会启动一个新的事务。若当前存在事务则回去执行handleExistingTransaction(definition, transaction, debugEnabled);

private TransactionStatus handleExistingTransaction(
			TransactionDefinition definition, Object transaction, boolean debugEnabled)
			throws TransactionException {

		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
			throw new IllegalTransactionStateException(
					"Existing transaction found for transaction marked with propagation 'never'");
		}

		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
			if (debugEnabled) {
				logger.debug("Suspending current transaction");
			}
			Object suspendedResources = suspend(transaction);
			boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
			return prepareTransactionStatus(
					definition, null, false, newSynchronization, debugEnabled, suspendedResources);
		}

		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
			if (debugEnabled) {
				logger.debug("Suspending current transaction, creating new transaction with name [" +
						definition.getName() + "]");
			}
			SuspendedResourcesHolder suspendedResources = suspend(transaction);
			try {
				boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
				DefaultTransactionStatus status = newTransactionStatus(
						definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
				doBegin(transaction, definition);
				prepareSynchronization(status, definition);
				return status;
			}
			catch (RuntimeException | Error beginEx) {
				resumeAfterBeginException(transaction, suspendedResources, beginEx);
				throw beginEx;
			}
		}

		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
			if (!isNestedTransactionAllowed()) {
				throw new NestedTransactionNotSupportedException(
						"Transaction manager does not allow nested transactions by default - " +
						"specify 'nestedTransactionAllowed' property with value 'true'");
			}
			if (debugEnabled) {
				logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
			}
			if (useSavepointForNestedTransaction()) {
				// Create savepoint within existing Spring-managed transaction,
				// through the SavepointManager API implemented by TransactionStatus.
				// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
				DefaultTransactionStatus status =
						prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
				status.createAndHoldSavepoint();
				return status;
			}
			else {
				// Nested transaction through nested begin and commit/rollback calls.
				// Usually only for JTA: Spring synchronization might get activated here
				// in case of a pre-existing JTA transaction.
				boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
				DefaultTransactionStatus status = newTransactionStatus(
						definition, transaction, true, newSynchronization, debugEnabled, null);
				doBegin(transaction, definition);
				prepareSynchronization(status, definition);
				return status;
			}
		}

		// Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
		if (debugEnabled) {
			logger.debug("Participating in existing transaction");
		}
		if (isValidateExistingTransaction()) {
			if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
				Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
				if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
					Constants isoConstants = DefaultTransactionDefinition.constants;
					throw new IllegalTransactionStateException("Participating transaction with definition [" +
							definition + "] specifies isolation level which is incompatible with existing transaction: " +
							(currentIsolationLevel != null ?
									isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
									"(unknown)"));
				}
			}
			if (!definition.isReadOnly()) {
				if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
					throw new IllegalTransactionStateException("Participating transaction with definition [" +
							definition + "] is not marked as read-only but existing transaction is");
				}
			}
		}
		boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
		return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
	}
  • 如果此时事务传播特性是NEVER,则抛出异常。

  • 如果此时事务的传播特性是NOT_SUPPORTED,则调用suspend(transaction)挂起当前事务,将被挂起的资源suspendedResources放入事务状态里。

  • 如果此时事务状态是REQUIRES_NEW,则调用suspend(transaction)挂起当前事务,将事务对象transaction和被挂起的资源suspendedResources放入事务状态里。然后调用doBegin(transaction, definition)方法去真正打开事务。最后调用prepareSynchronization(status, definition)方法准备一下事务同步。

  • 如果此时事务的传播特性是NESTED,又分三种情况:

  • 如果不允许嵌套事务,直接抛出异常。

    • 如果使用保存点(Savepoint)来实现嵌套事务,那直接使用当前事务,创建一个保存点就可以了。
    • 如果使用新的事务来实现嵌套事务,那就调用doBegin(transaction, definition)开启新的事务,此时不需要挂起当前事务。
    • 对于剩下三种传播特性REQUIRED/MANDATORY/SUPPORTS,则不需要创建新事务,直接使用当前事务就可以了。

到此为此 我们彻底明白了事务的创建机制了吧。通过@Import注解导入了扩展点下的AbstractPlatformTransactionalDatasourceTransactionalManager进行实例化。最后依托bean的postProcessAfterInitialization来实现创建代理对象,创建的过程中首先找寻对应方法或者类级别的事务标签也就是@Transactional注解,同时应用对应的增强。由于TransactionInterceptor的存在。当方法被触发执行时,根据逻辑事务的传播行为来开启数据库的物理事务。同时进行物理事务和逻辑事务进行同步,也就是常说的所谓的把事务托管给Spring。接下来来看对应的回滚和提交事务。

回滚事务

在事务回滚针对异常默认只处理运行时异常或者是Error才会触发回滚,反之则会提交事务,除非我们在@Transactional注解之中指定rollbackFor=Exception.class

	private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
		try {
			boolean unexpectedRollback = unexpected;

			try {
				triggerBeforeCompletion(status);

				if (status.hasSavepoint()) {
					if (status.isDebug()) {
						logger.debug("Rolling back transaction to savepoint");
					}
					status.rollbackToHeldSavepoint();
				}
				else if (status.isNewTransaction()) {
					if (status.isDebug()) {
						logger.debug("Initiating transaction rollback");
					}
					doRollback(status);
				}
				else {
					// Participating in larger transaction
					if (status.hasTransaction()) {
						if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
							if (status.isDebug()) {
								logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
							}
							doSetRollbackOnly(status);
						}
						else {
							if (status.isDebug()) {
								logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
							}
						}
					}
					else {
						logger.debug("Should roll back transaction but cannot - no transaction available");
					}
					// Unexpected rollback only matters here if we're asked to fail early
					if (!isFailEarlyOnGlobalRollbackOnly()) {
						unexpectedRollback = false;
					}
				}
			}
			catch (RuntimeException | Error ex) {
				triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
				throw ex;
			}

			triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);

			// Raise UnexpectedRollbackException if we had a global rollback-only marker
			if (unexpectedRollback) {
				throw new UnexpectedRollbackException(
						"Transaction rolled back because it has been marked as rollback-only");
			}
		}
		finally {
			cleanupAfterCompletion(status);
		}
	}

回滚事务的逻辑在processRollback方法中,在执行回滚操作是首先会触发执行TransactionalManager中的自定义回调函数triggerBeforeCompletion,等等会滚前会滚后的扩展。撇开这些之后就是真正回滚处理,如是含有保存点的使用保存点来回滚,如果是新事物直接回退。调用jdbcConnection.rollback来回退物理事务 .最后在finally语句块中执行清除当前线程的事务信息的工作,如果针对新事务,回去是否当前线程的数据库的连接关系,重置数据库连接。如果是该事物之前还存在之前挂起的事务,则会恢复挂起的资源。

提交事务

	protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
		if (txInfo != null && txInfo.getTransactionStatus() != null) {
			if (logger.isTraceEnabled()) {
				logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
			}
			txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
		}
	}
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
		try {
			boolean beforeCompletionInvoked = false;

			try {
				boolean unexpectedRollback = false;
				prepareForCommit(status);
				triggerBeforeCommit(status);
				triggerBeforeCompletion(status);
				beforeCompletionInvoked = true;

				if (status.hasSavepoint()) {
					if (status.isDebug()) {
						logger.debug("Releasing transaction savepoint");
					}
					unexpectedRollback = status.isGlobalRollbackOnly();
					status.releaseHeldSavepoint();
				}
				else if (status.isNewTransaction()) {
					if (status.isDebug()) {
						logger.debug("Initiating transaction commit");
					}
					unexpectedRollback = status.isGlobalRollbackOnly();
					doCommit(status);
				}
				else if (isFailEarlyOnGlobalRollbackOnly()) {
					unexpectedRollback = status.isGlobalRollbackOnly();
				}

				// Throw UnexpectedRollbackException if we have a global rollback-only
				// marker but still didn't get a corresponding exception from commit.
				if (unexpectedRollback) {
					throw new UnexpectedRollbackException(
							"Transaction silently rolled back because it has been marked as rollback-only");
				}
			}
			catch (UnexpectedRollbackException ex) {
				// can only be caused by doCommit
				triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
				throw ex;
			}
			catch (TransactionException ex) {
				// can only be caused by doCommit
				if (isRollbackOnCommitFailure()) {
					doRollbackOnCommitException(status, ex);
				}
				else {
					triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
				}
				throw ex;
			}
			catch (RuntimeException | Error ex) {
				if (!beforeCompletionInvoked) {
					triggerBeforeCompletion(status);
				}
				doRollbackOnCommitException(status, ex);
				throw ex;
			}

			// Trigger afterCommit callbacks, with an exception thrown there
			// propagated to callers but the transaction still considered as committed.
			try {
				triggerAfterCommit(status);
			}
			finally {
				triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
			}

		}
		finally {
			cleanupAfterCompletion(status);
		}
	}

同样在提交事务之处也是在在processCommit方法中。同样会有TransactionalManager中的自定义回调函数triggerAfterCommit,等等事务提交前提交后的扩展。独立的新事物,直接调用物理事务进行提交jdbcConnection.commit,针对保存点的事务处理,则回去清除保存点信息,你是否有写迷惑呢,那我嵌套事务何时提交啊。你仔细一想就会发现如果此时提交了嵌套事务,当外部事务回滚,这个嵌套的在语义下必然是需要回滚的。你倘若这里提交了,不就完犊子了吗。所以嵌套事务会跟随外部事务的提交而提交。