🌑

Shawn Fux

从源码角度分析Spring事务实现

使用 Spring 的事务管理很简单,只需要在被管理的方法或者类上标注 @Transactional 注解即可,但是往往越是简单的东西其底层实现越复杂,而 Spring 的事务传播机制更是重中之重,了解其底层实现,能让我们使用起来更加得心应手。

基于注解的事务开启

@EnableTransactionManagement 注解

通常我们在使用 Spring 提供的事务的时候,都是在配置类上标注一个 @EnableTransactionManagement 注解,意味着我们就开启了事务,当然还需要配置数据源,因为 Spring 的事务管理底层还是依赖于数据库提供的事务管理。所以先来分析这个 @EnableTransactionManagement 注解到底有什么秘密,它干了些什么?

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(TransactionManagementConfigurationSelector.class)
public @interface EnableTransactionManagement {

   /**
    * 通过设置 proxyTargetClass 是决定使用
    * JDK 动态代理还是 CGLIB 动态代理
    */
   boolean proxyTargetClass() default false;

   /**
    * 这里会有两种 mode 值
    * 一个是默认的 AdviceMode.PROXY 使用 AOP 动态代理实现
    * 另一个是 AdviceMode.ASPECTJ 
    * 使用 Aspectj 在类加载或者编译时静态织入源码
    * 从这里看出 Spring 的事务不仅仅限与 AOP 动态代理实现的
    */
   AdviceMode mode() default AdviceMode.PROXY;

   /**
    * 由于 Spring 的事务基于 AOP 实现的
    * 所以它这里会通过 order 进行排序来达到定义切面的执行顺序
    */
   int order() default Ordered.LOWEST_PRECEDENCE;

}

分析了 @EnableTransactionManagement 注解有哪些属性后,现在来分析 @Import(TransactionManagementConfigurationSelector.class) 这一行代码看做了些什么?

Import 导入了什么?

public class TransactionManagementConfigurationSelector extends AdviceModeImportSelector<EnableTransactionManagement> {

   /**
    * Returns {@link ProxyTransactionManagementConfiguration} or
    * {@code AspectJ(Jta)TransactionManagementConfiguration} for {@code PROXY}
    * and {@code ASPECTJ} values of {@link EnableTransactionManagement#mode()},
    * respectively.
    */
   @Override
   protected String[] selectImports(AdviceMode adviceMode) {
      switch (adviceMode) {
         case PROXY:
            return new String[] {AutoProxyRegistrar.class.getName(),
                  ProxyTransactionManagementConfiguration.class.getName()};
         case ASPECTJ:
            return new String[] {determineTransactionAspectClass()};
         default:
            return null;
      }
   }

   private String determineTransactionAspectClass() {
      return (ClassUtils.isPresent("javax.transaction.Transactional", getClass().getClassLoader()) ?
            TransactionManagementConfigUtils.JTA_TRANSACTION_ASPECT_CONFIGURATION_CLASS_NAME :
            TransactionManagementConfigUtils.TRANSACTION_ASPECT_CONFIGURATION_CLASS_NAME);
   }

}

selectImports 方法会去判断你 @EnableTransactionManagement 注解设置的 mode 属性是什么,而 Spring 在解析 @Import 注解的时候,会调用 selectImports 方法获取返回值,会把返回值的类当做配置类进一步解析,所以可以看到如果你的 mode 属性是 PROXY 的话会注入 AutoProxyRegistrar.classProxyTransactionManagementConfiguration.class ,而如果 mode 属性是 ASPECTJ 的话,就会调用 determineTransactionAspectClass 注入一些关于 Aspectj 相关的配置类,一般来说我们在使用 Spring 事务的时候都是默认的 PROXY,但是如果你想使用 ASPECTJ 的话也是可以的,但是会需要自己做一些额外工作,这里就不细展开说了,因为本次主要是分析基于 AOP 动态代理的事务,也是我们比较常用的,如果你对 AspectJ 来实现事务感兴趣的话,可以阅读该文章

AutoProxyRegistrar

通过上面这一步的分析,我们发现如果你的 mode 属性为 PROXY 的话,那么会导入 AutoProxyRegistrar.classProxyTransactionManagementConfiguration.class 这两个配置类,我们先分析 AutoProxyRegistrar.class 看做了些什么?

public class AutoProxyRegistrar implements ImportBeanDefinitionRegistrar {

   @Override
   public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
      boolean candidateFound = false;
      // 获取 @EnableTransactionManagement 注解所在的配置类上的所有注解名字
      Set<String> annTypes = importingClassMetadata.getAnnotationTypes();
      for (String annType : annTypes) {
         // 变量配置类上的所有注解
         AnnotationAttributes candidate = AnnotationConfigUtils.attributesFor(importingClassMetadata, annType);
         if (candidate == null) {
            continue;
         }
         Object mode = candidate.get("mode");
         Object proxyTargetClass = candidate.get("proxyTargetClass");
         // 判断注解是否有 mode proxyTargetClass 属性
         if (mode != null && proxyTargetClass != null && AdviceMode.class == mode.getClass() &&
               Boolean.class == proxyTargetClass.getClass()) {
            candidateFound = true;
            if (mode == AdviceMode.PROXY) {
               // 注册 InfrastructureAdvisorAutoProxyCreator.class 到 beanDefinitionMap
               AopConfigUtils.registerAutoProxyCreatorIfNecessary(registry);
               // proxyTargetClass 为 true
               // 则把 proxyTargetClass 这个属性值赋值
               // InfrastructureAdvisorAutoProxyCreator.class 的proxyTargetClass属性
               if ((Boolean) proxyTargetClass) {
                  AopConfigUtils.forceAutoProxyCreatorToUseClassProxying(registry);
                  return;
               }
            }
         }
      }

}

其实上面的代码很简单就是找到 @EnableTransactionManagement 注解的 mode proxyTargetClass 属性值,然后判断如果你的 mode 属性是 PROXY 的话,就注册一个 InfrastructureAdvisorAutoProxyCreator.class 类到 beanDefinitionMap,最后判断一下 proxyTargetClass 是不是为 ture,如果是的话把这个值设置到 InfrastructureAdvisorAutoProxyCreator.class beanDefinition里面的 proxyTargetClass 属性上。

Configuration 注册了那些Bean?

@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyTransactionManagementConfiguration extends AbstractTransactionManagementConfiguration {

   @Bean(name = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME)
   @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
   public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor(
         TransactionAttributeSource transactionAttributeSource, TransactionInterceptor transactionInterceptor) {
      // 创建了一个 Advisor
      BeanFactoryTransactionAttributeSourceAdvisor advisor = new BeanFactoryTransactionAttributeSourceAdvisor();
      // 相当于 Pointcut
      advisor.setTransactionAttributeSource(transactionAttributeSource);
      // Advice
      advisor.setAdvice(transactionInterceptor);
      if (this.enableTx != null) {
         // 这个属性来自父类,而这个order 属性其实就是我们的 @EnableTransactionManagement 注解的 order 属性
         advisor.setOrder(this.enableTx.<Integer>getNumber("order"));
      }
      return advisor;
   }

   @Bean
   @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
   public TransactionAttributeSource transactionAttributeSource() {
      // 相当于 Pointcut
      return new AnnotationTransactionAttributeSource();
   }

   @Bean
   @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
   public TransactionInterceptor transactionInterceptor(TransactionAttributeSource transactionAttributeSource) {
      // 相当于 Advice
      TransactionInterceptor interceptor = new TransactionInterceptor();
      interceptor.setTransactionAttributeSource(transactionAttributeSource);
      if (this.txManager != null) {
         interceptor.setTransactionManager(this.txManager);
      }
      return interceptor;
   }

}

在上面的配置类中注册了三个 Bean:

  • BeanFactoryTransactionAttributeSourceAdvisor (Advisor)
  • TransactionAttributeSource(Pointcut)
  • TransactionInterceptor(Advice)

通过这个三个类就可以形成一个完整的切面,如果不了解 Advisor Pointcut Advice 是什么的话,可以看一下我写的关于 AOP 文章

接下来我们再看一下父类 AbstractTransactionManagementConfiguration 做了一些什么事:

@Configuration
public abstract class AbstractTransactionManagementConfiguration implements ImportAware {

   @Nullable
   protected AnnotationAttributes enableTx;

   /**
    * Default transaction manager, as configured through a {@link TransactionManagementConfigurer}.
    */
   @Nullable
   protected TransactionManager txManager;


   @Override
   // 实现了 ImportAware 接口,会回调该方法
   public void setImportMetadata(AnnotationMetadata importMetadata) {
      // 获取 EnableTransactionManagement 的属性值保存到 enableTx 这个成员变量上
      this.enableTx = AnnotationAttributes.fromMap(
            importMetadata.getAnnotationAttributes(EnableTransactionManagement.class.getName(), false));
      if (this.enableTx == null) {
         throw new IllegalArgumentException(
               "@EnableTransactionManagement is not present on importing class " + importMetadata.getClassName());
      }
   }

   @Autowired(required = false)
   void setConfigurers(Collection<TransactionManagementConfigurer> configurers) {
		  // 支持多个事务管理器注入
      if (CollectionUtils.isEmpty(configurers)) {
         return;
      }
      if (configurers.size() > 1) {
         throw new IllegalStateException("Only one TransactionManagementConfigurer may exist");
      }
      TransactionManagementConfigurer configurer = configurers.iterator().next();
      this.txManager = configurer.annotationDrivenTransactionManager();
   }


   @Bean(name = TransactionManagementConfigUtils.TRANSACTIONAL_EVENT_LISTENER_FACTORY_BEAN_NAME)
   @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
   public static TransactionalEventListenerFactory transactionalEventListenerFactory() {
     // Spring事务监听器
      return new TransactionalEventListenerFactory();
   }

}

在父类中,实现了 ImportAware 接口,Spring 会在生命周期中回调该方法,将 EnableTransactionManagement 注解的元数据信息,传给 setImportMetadata 方法,然后将注解的属性值保存到成员变量 enableTx 上面,其中子类就使用到了它的 order 属性,然后 setConfigurers 方法可以注入多个事务管理器的依赖,不过这是可选的并非强制依赖,最后注册了一个 TransactionalEventListenerFactory 事务监听器工厂类。

小结

如果你的 @EnableTransactionManagement 注解 mode 属性为 PROXY,那么经过上面的一系列操作以后,容器应该会有以下五个真正干活的 Bean:

  • InfrastructureAdvisorAutoProxyCreator

  • BeanFactoryTransactionAttributeSourceAdvisor

  • TransactionAttributeSource => AnnotationTransactionAttributeSource

  • TransactionInterceptor

  • TransactionalEventListenerFactory

接下来,就分析这五个类是怎么协同运作,来实现我们的事务管理的。

创建事务的代理对象

AOP处理器优先级

我们都知道事务是基于 AOP 动态代理来实现的,那么它肯定需要创建代理对象,而 InfrastructureAdvisorAutoProxyCreator 就是用来创建代理对象的。

InfrastructureAdvisorAutoProxyCreator

可以看到 InfrastructureAdvisorAutoProxyCreator 简介继承了 AbstractAutoProxyCreator 类,所以它也是一个 Bean 后置处理器,我们都知道创建代理对象是在 BeanPostProcessor 接口的 postProcessAfterInitialization 方法回调产生的,所以我们直接从这里开始入手,看是怎么创建我们的事务代理对象的。如果你对 Bean 的生命周期不熟悉的话,可以看一下我写的 Bean 的创建过程

另外笔者发现如果同时使用了 @EnableAspectJAutoProxy@EnableTransactionManagement 这两个注解的话,因为这两个注解都会注入一个创建代理对象的后处理器,其实它们两个之间只会注入一个其中一个后处理器,那么它们的优先级会是怎么样的了谁覆盖谁了?

其实无论是 @EnableAspectJAutoProxy@EnableTransactionManagement 注册创建 ProxyCreator 后处理器,都是调用 AopConfigUtils 类下的 registerOrEscalateApcAsRequired 进行注册,逻辑大概如下:

private static BeanDefinition registerOrEscalateApcAsRequired(
      Class<?> cls, BeanDefinitionRegistry registry, @Nullable Object source) {

   Assert.notNull(registry, "BeanDefinitionRegistry must not be null");
   // 先判断容器内是否有该 org.springframework.aop.config.internalAutoProxyCreator beanDefinition
   if (registry.containsBeanDefinition(AUTO_PROXY_CREATOR_BEAN_NAME)) {
      // 如果有的话,就获取已经存在的
      BeanDefinition apcDefinition = registry.getBeanDefinition(AUTO_PROXY_CREATOR_BEAN_NAME);
      // 判断待注册的和已经存在的 class 类型是否一样
      if (!cls.getName().equals(apcDefinition.getBeanClassName())) {
         // 如果不一样,获取已经存在的 beanDefinition 优先级
         int currentPriority = findPriorityForClass(apcDefinition.getBeanClassName());
         // 获取待注册的 beanDefinition 优先级
         int requiredPriority = findPriorityForClass(cls);
         // 如果待注册的优先级数值大于已存在的优先级数值
         if (currentPriority < requiredPriority) {
            // 那么待注册的覆盖已经注册的
            apcDefinition.setBeanClassName(cls.getName());
         }
      }
      return null;
   }
   // 容器不存在,直接注册
   RootBeanDefinition beanDefinition = new RootBeanDefinition(cls);
   beanDefinition.setSource(source);
   beanDefinition.getPropertyValues().add("order", Ordered.HIGHEST_PRECEDENCE);
   beanDefinition.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
   registry.registerBeanDefinition(AUTO_PROXY_CREATOR_BEAN_NAME, beanDefinition);
   return beanDefinition;
}

findPriorityForClass 就是去获取这个 ProxyCreator 后处理器的优先级:

private static int findPriorityForClass(@Nullable String className) {
   for (int i = 0; i < APC_PRIORITY_LIST.size(); i++) {
      Class<?> clazz = APC_PRIORITY_LIST.get(i);
      if (clazz.getName().equals(className)) {
         return i;
      }
   }
}

而这个获取优先级的代码也比较简单,就是根据你传的 className 去和 APC_PRIORITY_LIST 这个数字进行匹配,匹配到了就返回数组的下标作为优先级返回,那么再来看看这个数组有什么?

static {
   // Set up the escalation list...
   APC_PRIORITY_LIST.add(InfrastructureAdvisorAutoProxyCreator.class);
   APC_PRIORITY_LIST.add(AspectJAwareAdvisorAutoProxyCreator.class);
   APC_PRIORITY_LIST.add(AnnotationAwareAspectJAutoProxyCreator.class);
}

这是定义在 AopConfigUtils 类的一个静态代码块,其实从这里就能看出 AnnotationAwareAspectJAutoProxyCreator 是优先级最高的,因为它的数组下标是2,是高于 InfrastructureAdvisorAutoProxyCreator 类的,所以可以得出结论如果这两个类都注册的话,只有 AnnotationAwareAspectJAutoProxyCreator 会被保存下来。通过分析这里源码的也解开了笔者的一些疑惑,笔者在初学 Spring AOP 和 事务这一块一直以为会产生两个代理对象,其实如果你这个目标类即有事务又有如 @Aspect 的切面逻辑的话,最终都会转换成 Advisor 形成一条调用链。

事务注解不止@Transactional

在大多时候我们使用 @Transactional 注解标注一个类或者方法,Spring 就会认为需要进行事务管理,而 AnnotationTransactionAttributeSource 就是用来定义什么的类是需要事务管理的,先看一下它的静态代码块:

private static final boolean jta12Present;

private static final boolean ejb3Present;

static {
   ClassLoader classLoader = AnnotationTransactionAttributeSource.class.getClassLoader();
   jta12Present = ClassUtils.isPresent("javax.transaction.Transactional", classLoader);
   ejb3Present = ClassUtils.isPresent("javax.ejb.TransactionAttribute", classLoader);
}

从这里看出,Spring 会去判断类路径中是否有 @Transactional@TransactionAttribute 这两个注解,如果有的话 jta12Presentejb3Present 属性值为 true,那么判断出这个注解是否存在有什么用了?接着往下看:

// 无参构造方法
public AnnotationTransactionAttributeSource() {
		this(true);
}

public AnnotationTransactionAttributeSource(boolean publicMethodsOnly) {
   this.publicMethodsOnly = publicMethodsOnly;
   if (jta12Present || ejb3Present) {
      this.annotationParsers = new LinkedHashSet<>(4);
      // 先添加一个Spring自带 @Transactional 注解的解析器 
      this.annotationParsers.add(new SpringTransactionAnnotationParser());
      if (jta12Present) {
         // 添加一个支持 javax @Transactional 注解的解析器
         this.annotationParsers.add(new JtaTransactionAnnotationParser());
      }
      if (ejb3Present) {
         // 添加一个支持 ejb @TransactionAttribute 注解的解析器
         this.annotationParsers.add(new Ejb3TransactionAnnotationParser());
      }
   }
   else {
      // 以上条件不满足,还是会添加一个默认Spring自带 @Transactional 注解的解析器 
      this.annotationParsers = Collections.singleton(new SpringTransactionAnnotationParser());
   }
}

通过这两块代码分析,Spring 不仅支持自带的 @Transactional 注解作为事务的定义,还是支持 javax 的 @Transactional 和 ejb 的 @TransactionAttribute 注解。

isCandidateClass

isCandidateClass 会遍历所有的解析器,也就是我们在构造方法里添加的解析器,然后去调用解析器的 isCandidateClass 方法解析目标类,如果能匹配上就返回 ture。

SpringTransactionAnnotationParser 的 isCandidateClass 方法很简单,就是看你类是否有 @Transactional 注解

@Override
public boolean isCandidateClass(Class<?> targetClass) {
   return AnnotationUtils.isCandidateClass(targetClass, Transactional.class);
}

另外两个就不去看了,其实逻辑都一样,就是判断你有没有对应的注解,上面的 isCandidateClass 是判断类上是否有事务注解,那么必然应该还会有判断方法上是否有事务注解的方法,因为我们的事务注解是支持标注在方法上的,下面这个方法就是判断方法上是否有事务注解:

protected TransactionAttribute determineTransactionAttribute(AnnotatedElement element) {
   for (TransactionAnnotationParser parser : this.annotationParsers) {
      TransactionAttribute attr = parser.parseTransactionAnnotation(element);
      if (attr != null) {
         return attr;
      }
   }
   return null;
}

isCandidateClass 大同小异,也是交给了 AnnotationParser 解析器去处理,如果有找到事务注解的话,会将注解上的属性解析封装到 TransactionAttribute 对象返回,那么我们看一下 SpringTransactionAnnotationParserparseTransactionAnnotation 方法是怎么解析 @Transactional 注解的:

public TransactionAttribute parseTransactionAnnotation(AnnotatedElement element) {
   // 先重试获取方法上的 @Transactional 注解
   AnnotationAttributes attributes = AnnotatedElementUtils.findMergedAnnotationAttributes(
         element, Transactional.class, false, false);
   if (attributes != null) {
      // 如果找到了话,进一步解析其属性值
      return parseTransactionAnnotation(attributes);
   }
   else {
      return null;
   }
}

// 这个方法解析 @Transactional 的属性值然后封装到 TransactionAttribute 对象返回
protected TransactionAttribute parseTransactionAnnotation(AnnotationAttributes attributes) {
		RuleBasedTransactionAttribute rbta = new RuleBasedTransactionAttribute();

		Propagation propagation = attributes.getEnum("propagation");
		rbta.setPropagationBehavior(propagation.value());
		Isolation isolation = attributes.getEnum("isolation");
		rbta.setIsolationLevel(isolation.value());

		rbta.setTimeout(attributes.getNumber("timeout").intValue());
		String timeoutString = attributes.getString("timeoutString");
		Assert.isTrue(!StringUtils.hasText(timeoutString) || rbta.getTimeout() < 0,
				"Specify 'timeout' or 'timeoutString', not both");
		rbta.setTimeoutString(timeoutString);

		rbta.setReadOnly(attributes.getBoolean("readOnly"));
		rbta.setQualifier(attributes.getString("value"));
		rbta.setLabels(Arrays.asList(attributes.getStringArray("label")));

		List<RollbackRuleAttribute> rollbackRules = new ArrayList<>();
		for (Class<?> rbRule : attributes.getClassArray("rollbackFor")) {
			rollbackRules.add(new RollbackRuleAttribute(rbRule));
		}
		for (String rbRule : attributes.getStringArray("rollbackForClassName")) {
			rollbackRules.add(new RollbackRuleAttribute(rbRule));
		}
		for (Class<?> rbRule : attributes.getClassArray("noRollbackFor")) {
			rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
		}
		for (String rbRule : attributes.getStringArray("noRollbackForClassName")) {
			rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
		}
		rbta.setRollbackRules(rollbackRules);

		return rbta;
	}

做一下小结吧,AnnotationTransactionAttributeSource 就是用来判断一个类或者方法是否需要被事务管理,然后会去解析事务注解的属性值,但是其实真正干活的还是 AnnotationParser 这些解析器,AnnotationTransactionAttributeSource 算是做了一个整合,提供了一个统一访问的入口。

寻找事务注解

接下来就分析 BeanFactoryTransactionAttributeSourceAdvisor 这个类,因为我们在创建代理对象的时候,最先找到的还是 Advisor 去做匹配。

public class BeanFactoryTransactionAttributeSourceAdvisor extends AbstractBeanFactoryPointcutAdvisor {

   @Nullable
   private TransactionAttributeSource transactionAttributeSource;
	
   // 默认写死的 Pointcut
   private final TransactionAttributeSourcePointcut pointcut = new TransactionAttributeSourcePointcut() {
      @Override
      @Nullable
      protected TransactionAttributeSource getTransactionAttributeSource() {
        // 将 AnnotationTransactionAttributeSource 传递给了 Pointcut
        // 因为 Pointcut 将来去做判断能不能匹配需要事务管理
        // 其实最终还是会调用 AnnotationTransactionAttributeSource 的方法去干活
         return transactionAttributeSource;
      }
   };
} 

// 这里注入的其实就是 AnnotationTransactionAttributeSource 这个类
public void setTransactionAttributeSource(TransactionAttributeSource transactionAttributeSource) {
		this.transactionAttributeSource = transactionAttributeSource;
	}

@Override
	public Pointcut getPointcut() {
		return this.pointcut;
	}

这里我就给出入口,是在哪里调用的 BeanFactoryTransactionAttributeSourceAdvisor 类的 getPointcut 去做类和方法匹配的,在 AopUtils.javacanApply 方法:

canApply

Pointcut 是用来定义类和方法的匹配,类匹配使用 ClassFilter,而方法匹配使用 MethodMatcher,接下来就看一下 TransactionAttributeSourcePointcut 这个 Pointcut 的 ClassFilter 和 MethodMatcher 是什么样的逻辑:

TransactionAttributeSourceClassFilter

TransactionAttributeSourceClassFilter 是作为 TransactionAttributeSourcePointcut 内部类存在的,其实通过上面图,我想你应该指定了,tas 这个变量其实就是 AnnotationTransactionAttributeSource ,那么调用的 isCandidateClass 方法就是上面分析那一个。至此,我们分析了 Pointcut 中的 ClassFilter,接下来还有一个 MethodMatcher,是用来匹配方法的。

分析完了类的匹配,加下来分析方法的匹配,首先调用 matches 方法时会跳转到 TransactionAttributeSourcePointcut 类的方法:

matches

可以看到最后还是交给了AnnotationTransactionAttributeSource 去处理,那么我们继续跟踪源码下去,getTransactionAttribute 是由其父类 AbstractFallbackTransactionAttributeSource 实现的:

public TransactionAttribute getTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
   // 如果方法所在的类为 Object 直接返回 null
   if (method.getDeclaringClass() == Object.class) {
      return null;
   }

   // 先检查缓存是否存在
   Object cacheKey = getCacheKey(method, targetClass);
   TransactionAttribute cached = this.attributeCache.get(cacheKey);
   if (cached != null) {
      // 这里类似一个于空缓存,代表这个方法已经解析过了,就是不存在期望的值
      if (cached == NULL_TRANSACTION_ATTRIBUTE) {
         return null;
      }
      else {
         return cached;
      }
   }
   else {
      // 缓存不存在。进一步解析
      TransactionAttribute txAttr = computeTransactionAttribute(method, targetClass);
      if (txAttr == null) {
         // 如果解析过后为 null,缓存一个空对象
         this.attributeCache.put(cacheKey, NULL_TRANSACTION_ATTRIBUTE);
      }
      else {
         // 生成全限定类名加方法名
         String methodIdentification = ClassUtils.getQualifiedMethodName(method, targetClass);
         if (txAttr instanceof DefaultTransactionAttribute) {
            DefaultTransactionAttribute dta = (DefaultTransactionAttribute) txAttr;
            // 设置事务属性的描述信息
            dta.setDescriptor(methodIdentification);
            dta.resolveAttributeStrings(this.embeddedValueResolver);
         }
         if (logger.isTraceEnabled()) {
            logger.trace("Adding transactional method '" + methodIdentification + "' with attribute: " + txAttr);
         }
         this.attributeCache.put(cacheKey, txAttr);
      }
      return txAttr;
   }
}

getTransactionAttribute 方法里会去缓存找这个方法是否匹配解析过,没找到就会调用 computeTransactionAttribute 方法去进一步解析匹配,接下来去看一下 computeTransactionAttribute 方法做了些什么:

protected TransactionAttribute computeTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
   // Don't allow no-public methods as required.
   if (allowPublicMethodsOnly() && !Modifier.isPublic(method.getModifiers())) {
      return null;
   }

   // 如果该方法来自接口,寻找接口实现类的方法
   Method specificMethod = AopUtils.getMostSpecificMethod(method, targetClass);

   // 抽象方法留给子类扩展,先寻找方法是否存在事务注解
   TransactionAttribute txAttr = findTransactionAttribute(specificMethod);
   if (txAttr != null) {
      return txAttr;
   }

   // 如果方法上未找到事务注解,那么就尝试去方法所在的类上寻找注解
   txAttr = findTransactionAttribute(specificMethod.getDeclaringClass());
   if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
      return txAttr;
   }

   // 如果具体的方法(可能是接口实现类的方法)不等于原方法
   if (specificMethod != method) {
      // 再去原方法寻找事务注解
      txAttr = findTransactionAttribute(method);
      if (txAttr != null) {
         return txAttr;
      }
      // 如果原方法没找到,去原方法所在类找事务注解
      txAttr = findTransactionAttribute(method.getDeclaringClass());
      if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
         return txAttr;
      }
   }

   return null;
}

从这里可以分析看出,其实我们的事务注解标注在接口也是可以的也会生效的,不过大多时候我们不会这么干,那接下来再继续跟踪 findTransactionAttribute 这个方法,而这个方法才会去寻找注解信息,这个方法最终会调用到 AnnotationTransactionAttributeSource determineTransactionAttribute 方法,会去寻找是否有对应的事务注解,如果找到了的话将注解的属性解析封装成对象返回。

小结

事务五个类

TransactionInterceptor

首先我们要搞清切面入口是从哪里开始的,当我们对一个事务方法进行调用的时候,如果你的代理对象是 JDK 动态代理产生的话,那么会先进入到 JdkDynamicAopProxy 类的 invoke 方法,如果是代理对象是 CGLIB 动态代理产生的话,那么会进入到 CglibAopProxy 类的 intercept 方法,虽然两个类的方法名不一样,但是逻辑都大致相同,会先判断 exposeProxy 属性是否为 ture,这是属性就是我们注解设置的,如果为 ture,则会将当前代理对象放入 ThreadLocal,你可以通过 AopContext 获取到当前执行的代理对象,接下来就拿到这个方法所匹配的所有 Advice 形成一条调用链,并封装成一个 ReflectiveMethodInvocation. 对象对调用链进行调用,即执行我们的切面逻辑,大致流程就是这样其中很多细节没去细讲,可以看我的 AOP这篇文章有更加详细的分析。因为我们的 TransactionInterceptor 也是调用链中的一环,而在执行调用链的时候都会去调用 invoke 方法,所以我们先从 TransactionInterceptorinvoke 方法入手。

invoke

public Object invoke(MethodInvocation invocation) throws Throwable {
   // 获取被代理对象的 class 类型
   Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

   // 调用父类的方法
   return invokeWithinTransaction(invocation.getMethod(), targetClass, new CoroutinesInvocationCallback() {
      @Override
      @Nullable
      public Object proceedWithInvocation() throws Throwable {
         // 执行下一个切面
         return invocation.proceed();
      }
      @Override
      public Object getTarget() {
         // 返回当前被代理对象
         return invocation.getThis();
      }
      @Override
      public Object[] getArguments() {
         // 返回当前被代理方法参数
         return invocation.getArguments();
      }
   });
}

可以看到在 TransactionInterceptorinvoke 并没有关于事务管理的逻辑,那么看来主要逻辑还是在父类 TransactionAspectSupportinvokeWithinTransaction 方法。

invokeWithinTransaction

invokeWithTransaction

invokeWithinTransaction 定义好了一个大体框架,这里我画出了我们比较关心的三个重点,接下来就主要围绕,开启事务,回滚事务,提交事务来分析。

开启事务

调用 createTransactionIfNecessary 方法会把当前的事务管理器 ptm,事务注解属性对象 txAttr,以及事务的名称 joinpointIdentification 传递过去。

protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
      @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {

   // 如果 txAttr 不等null,但是它的 name 值为null
   // 创建一个 DelegatingTransactionAttribute 保证代理 txAttr
   // 并代理它的 getName 方法
   if (txAttr != null && txAttr.getName() == null) {
      txAttr = new DelegatingTransactionAttribute(txAttr) {
         @Override
         public String getName() {
            return joinpointIdentification;
         }
      };
   }

   TransactionStatus status = null;
   if (txAttr != null) {
      if (tm != null) {
         // 开启事务 AbstractPlatformTransactionManager
         // 获取一个事务描述状态信息对象
         status = tm.getTransaction(txAttr);
      }
      else {
         if (logger.isDebugEnabled()) {
            logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
                  "] because no transaction manager has been configured");
         }
      }
   }
   // 将当前参数封装成一个 TransactionInfo 对象
   // 并将这个对象保存到 ThreadLocal 然后返回给调用者
   return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}

getTransaction

在这个方法里会调用我们传进来的事务管理器的 getTransaction 方法获取数据库连接并且开启事务

public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
      throws TransactionException {

   // 事务注解属性对象是否存在,如果不存在使用一个默认的
   TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
   // 获取数据库连接对象
   Object transaction = doGetTransaction();
   boolean debugEnabled = logger.isDebugEnabled();

   // 判断当前线程是否存在事务
   if (isExistingTransaction(transaction)) {
      // 当前线程已经存在事务了,而此时就需要检查事务的传播机制而进行相应的处理了
      return handleExistingTransaction(def, transaction, debugEnabled);
   }

   // 检查新事务设置的超时时间是否小于 -1
   if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
      throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
   }

   // 如果我们设置的事务传播机制是 Propagation.MANDATORY
   // 因为这个传播机制必须在有事务的前提下运行,而代码走到这里说明没有事务
   if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
      throw new IllegalTransactionStateException(
            "No existing transaction found for transaction marked with propagation 'mandatory'");
   }
   else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
         def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
         def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
      // 挂起事务
      SuspendedResourcesHolder suspendedResources = suspend(null);
      if (debugEnabled) {
         logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
      }
      try {
         // 开启一个新的事务,设置 AutoCommit 为 false
         return startTransaction(def, transaction, debugEnabled, suspendedResources);
      }
      catch (RuntimeException | Error ex) {
         resume(null, suspendedResources);
         throw ex;
      }
   }
   else {
      // 以非事务的方式运行:
      // Propagation.NEVER
      // Propagation.NOT_SUPPORTED
      // Propagation.SUPPORTS
      if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
         logger.warn("Custom isolation level specified but no actual transaction initiated; " +
               "isolation level will effectively be ignored: " + def);
      }
      // 即使以非事务的方式运行,还是会去判断是否需要将当前的一些事务信息保存到 ThreadLocal
      boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
      return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
   }
}

获取数据库连接对象

首先会去调用 doGetTransaction 尝试获取一个数据库连接对象:

protected Object doGetTransaction() {
   DataSourceTransactionObject txObject = new DataSourceTransactionObject();
   // 设置是否允许使用 Savepoint
   txObject.setSavepointAllowed(isNestedTransactionAllowed());
   // obtainDataSource() 获取数据源
   // 先从线程缓存中尝试获取数据库连接对象,如果是新开的事务获取到结果为 null
   ConnectionHolder conHolder =
         (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
   txObject.setConnectionHolder(conHolder, false);
   return txObject;
}

其实它这里并没有从 DataSource 数据源去获取连接对象,而是先去 TransactionSynchronizationManager 所管理的 resources 获取连接对象,如果是新开启的事务的话,这里其实获取到的连接对象会是 null。

public abstract class TransactionSynchronizationManager {

   /**
    * key => DataSource 数据源
    * value => Connect 数据库连接对象
    */
   private static final ThreadLocal<Map<Object, Object>> resources =
         new NamedThreadLocal<>("Transactional resources");

   /**
    * 记录当前事务的同步器回调
    */
   private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
         new NamedThreadLocal<>("Transaction synchronizations");

   /**
    * 记录当前事务的名称
    */
   private static final ThreadLocal<String> currentTransactionName =
         new NamedThreadLocal<>("Current transaction name");

   /**
    * 记录当前事务是否为只读
    */
   private static final ThreadLocal<Boolean> currentTransactionReadOnly =
         new NamedThreadLocal<>("Current transaction read-only status");

   /**
    * 记录当前线程的事务隔离级别
    */
   private static final ThreadLocal<Integer> currentTransactionIsolationLevel =
         new NamedThreadLocal<>("Current transaction isolation level");

   /**
    * 记录当前线程是否有事务活动
    */
   private static final ThreadLocal<Boolean> actualTransactionActive =
         new NamedThreadLocal<>("Actual transaction active");
}

TransactionSynchronizationManager 是一个事务的工具类会将一些当前事务的状态属性保存到 ThreadLocal 里面。

事务的传播机制实现

接着就会调用 isExistingTransaction 方法将上一步获取到的事务状态对象传进去,判断有没有数据库连接对象,如果有的话说明有事务存在,这是就会进入到 handleExistingTransaction 方法去进一步判断你当前的这个事务传播机制是什么,处理逻辑如下:

handleExistingTransaction

handleExistingTransaction 这个方法是在 AbstractPlatformTransactionManager 类定义实现的,限于版面我就不展开每一个去看了,感兴趣的话可以看我的源码笔记里面写的很详细关于每一种事务的传播机制。眼尖的你肯定发现好像只处理了六种传播机制,但是事务的传播机制不是有七种吗?还有一个不在这里处理,而是在 getTransaction 方法里:

// 如果我们设置的事务传播机制是 Propagation.MANDATORY
   // 因为这个传播机制必须在有事务的前提下运行,而代码走到这里说明没有事务
   if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
      throw new IllegalTransactionStateException(
            "No existing transaction found for transaction marked with propagation 'mandatory'");
   }

挂起当前事务

其实事务的挂起有点类似于我们的线程上下文切换的感觉,就是把当前的事务一些状态属性保存起来,然后清空 ThreadLocal 腾出地方给新的事务用,而恢复事务也很简单了,就是把旧的挂起的事务数据重写设置到 ThreadLocal 里面就行了。

SuspendedResourcesHolder

创建一个新的事务

startTransaction 方法就会去从 DataSource 获取一个数据库连接对象,并开启一个事务

private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
      boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {

   boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
   DefaultTransactionStatus status = newTransactionStatus(
         definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
   // 开启事务,获取一个新的数据库连接对象
   doBegin(transaction, definition);
   // 将当前事务的一些属性值存放到 ThreadLocal
   // 可以通过 TransactionSynchronizationManager 这个工具类获取到当前线程事务的一些属性值
   prepareSynchronization(status, definition);
   return status;

以非事务的方式运行

最后类似 Propagation.NEVER 这种以非事务方式运行都会调用 prepareTransactionStatus 方法,而这个方法其实和我们的 startTransaction 方法非常像,我贴出来对比一下你就知道了:

protected final DefaultTransactionStatus prepareTransactionStatus(
      TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction,
      boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {

   DefaultTransactionStatus status = newTransactionStatus(
         definition, transaction, newTransaction, newSynchronization, debug, suspendedResources);
   prepareSynchronization(status, definition);
   return status;
}

可以看到都会去创建 DefaultTransactionStatus 对象也会调用 prepareSynchronization 将当前事务的一些状态属性设置到 ThreadLocal,其实就是少了一个 doBegin ,非事务的方式就不会去获取数据库连接管理。

prepareTransactionInfo

当执行完 getTransaction 方法后会获取一个到事务状态对象里面就包括了我们的数据库连接对象,然后会将当前返回的 TransactionStatus 对象以及其它一些关于当前事务的对象包装成一个 TransactionInfo对象并放置到 ThreadLocal 中:

protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm,
      @Nullable TransactionAttribute txAttr, String joinpointIdentification,
      @Nullable TransactionStatus status) {

   TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
   if (txAttr != null) {
      txInfo.newTransactionStatus(status);
   }
   // 将旧的 TransactionInfo 从 ThreadLocal 获取出来保存
   // 将当前新的 TransactionInfo 存储到 ThreadLocal
   txInfo.bindToThread();
   return txInfo;
}

这个 ThreadLocal 是定义在 TransactionAspectSupport 类的,另外还提供了一些方法,可以让我们在程序中动态获取当前线程执行的事务对象一些信息。

public abstract class TransactionAspectSupport {
	private static final ThreadLocal<TransactionInfo> transactionInfoHolder =
			new NamedThreadLocal<>("Current aspect-driven transaction");
  	
  	// 返回当前线程的事务信息对象
  	protected static TransactionInfo currentTransactionInfo() throws NoTransactionException {
		  return transactionInfoHolder.get();
	  }
  
  	// 返回当前线程的事务状态对象
  	public static TransactionStatus currentTransactionStatus() throws NoTransactionException {
		TransactionInfo info = currentTransactionInfo();
      if (info == null || info.transactionStatus == null) {
        throw new NoTransactionException("No transaction aspect-managed TransactionStatus in scope");
      }
		  return info.transactionStatus;
	}
}

提交事务

当前执行完被代理的目标方法以后,如果这中间又没有发生异常,此时就会来执行 commitTransactionAfterReturning 方法准备提交事务了。

public final void commit(TransactionStatus status) throws TransactionException {
   if (status.isCompleted()) {
      throw new IllegalTransactionStateException(
            "Transaction is already completed - do not call commit or rollback more than once per transaction");
   }

   DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
   // 判断是否有手动设置回滚
   // 比方说在我们的代码里
   // TransactionAspectSupport.currentTransactionStatus().setRollbackOnly()
   if (defStatus.isLocalRollbackOnly()) {
      if (defStatus.isDebug()) {
         logger.debug("Transactional code has requested rollback");
      }
      processRollback(defStatus, false);
      return;
   }
   // 在提交代码前还需要在判断,是不是有的别的事务块出现异常,并设置 RollbackOnly 为 ture
   // 比方说 a 方法 调用 b 方法,它们属于同一个事务,但是 b 方法出现异常,a 方法把 b 方法的异常 try catch 了
   if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
      if (defStatus.isDebug()) {
         logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
      }
      processRollback(defStatus, true);
      return;
   }
   // 提交事务
   processCommit(defStatus);
}

其实在真正进行提交事务之前,还会做两个检查,先判断事务的状态是不是被手动设置为需要回滚了,如果是的话就不会提交事务转而进行回滚。另外还有一种计算别的事务块出现异常了然后因为它是没有回滚事务的权限,所以只能将全局回滚异常设置为 ture,提醒真正有权限回滚的人去做事务回滚。

回滚事务

如果目标方法在执行代码的时候抛出了异常,就会跳转到 completeTransactionAfterThrowing 来执行回滚操作了:

protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
      // txInfo.transactionAttribute.rollbackOn(ex) 判断当前抛出的异常是否能被支持回滚
      // 这里的 txInfo.transactionAttribute => RuleBasedTransactionAttribute
      if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
         try {
            // 回滚事务
            txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
         }
         catch (TransactionSystemException ex2) {
            logger.error("Application exception overridden by rollback exception", ex);
            ex2.initApplicationException(ex);
            throw ex2;
         }
         catch (RuntimeException | Error ex2) {
            logger.error("Application exception overridden by rollback exception", ex);
            throw ex2;
         }
      }
}

首先会去判断你当前抛出的异常类型是否能被支持回滚,然后调用 `

private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
   try {
      boolean unexpectedRollback = unexpected;
      try {
         // 判断是否有 Savepoint
         if (status.hasSavepoint()) {
            if (status.isDebug()) {
               logger.debug("Rolling back transaction to savepoint");
            }
            // 如果有的话回滚到保存的 Savepoint
            status.rollbackToHeldSavepoint();
         }
         else if (status.isNewTransaction()) {
            // 判断是否是新的事务
            if (status.isDebug()) {
               logger.debug("Initiating transaction rollback");
            }
            // 回滚事务,这里会真正调用数据库进行回滚
            doRollback(status);
         }
         else {
            // 当前事务不是最顶级的事务
            if (status.hasTransaction()) {
               if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
                  if (status.isDebug()) {
                     logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
                  }
                  // 这里只是将当前的 ConnectionHolder 的 RollbackOnly 设置为ture
                  // 没有真正的去回滚,因为他不是一个顶级事务,可以理解为这个事务不它建的,没有回滚的权利
                  doSetRollbackOnly(status);
               } 
         finally {
            // 释放数据库连接
            cleanupAfterCompletion(status);
         }
}

先会判断当前事务是否有 Savepoint,比方说如果你的事务传播机制为 Propagation.NESTED ,那么这时候就会有 Savepoint。接着判断当前的事务是否为一个新事务,其实个人理解为这个事务是不是自己创建的,而不是附属在它事务下的,比方说你的事务传播机制为 Propagation.REQUIRES_NEW 那么每个方法其实都是一个独立的事务。最后再判断是不是这个事务是不是附属在其它事务下的,如果是的话,是没有回滚权利的,只是将 RollbackOnly 设置为 ture,通知正在有提交权利的事务去回滚。

最后无论是提交事务还是回滚事务最终都会调用 cleanupAfterCompletion 方法进行一些资源的释放。

— Sep 16, 2022