美文网首页Spring 功能使用及源码解析
Spring 框架事件收发功能使用(二)事务事件监听

Spring 框架事件收发功能使用(二)事务事件监听

作者: 云逸Dean | 来源:发表于2019-07-18 15:57 被阅读0次

Spring 框架事件收发功能使用(二)事务事件监听

1. 概述

系列博客(一)中,讲述了如何使用 Spring 框架来实现事件收发机制,包含了实现接口和注解两种实现方式,以及包括了同步和异步两种运行方式。
在接下来的内容里,我们会介绍如何将事件的监听与事务进行关联。

2. 基于事务的事件收发的使用

首先,我们还是依照惯例,
Spring 的官方文档来看看怎么使用它。

在 Spring 4.2 开始,事件的监听器可以被绑定到事务的阶段上。
典型的示例就是它可以处理当事务成功完成后的事件。
我们可以使事件更加灵活的使用在关心事务结果的监听器上。

我们可以使用 @EventListener 注解注册一个常规的事件监听器。
当你需要将它绑定到事务上,则需要使用 @TransactionalEventListener。
这个监听器默认会被绑定到事务的提交阶段。

接下的示例会展示这个概念。假设一个组件 publish 了一个 订单创建 事件,并且我们想定义一个监听器,它只处理事务提交成功后发出的事件。下面便是如何实现这个事件监听器的代码例子:

@Component
public class MyComponent {

    @TransactionalEventListener
    public void handleOrderCreatedEvent(CreationEvent<Order> creationEvent) {
        //do something
    }
}

@TransactionalEventListener 注解 暴露了一个事务阶段的属性,用来指定监控器绑定的事务阶段。可以使用的阶段有这4个 BEFORE_COMMIT, AFTER_COMMIT (default), AFTER_ROLLBACK, AFTER_COMPLETION ,包含了事务的完成和回滚。
如果没有事务在运行,那这个监听器不会被触发,直到我们提供了事务。不过我们也可以通过改变 fallbackExecution 的设置来使监听器在没有事务的情况下仍然能够被触发。

3. @TransactionEventListener 的实现原理

Talking is cheap, show the Spring framework source code.

                //EventListenerMethodProcessor.processBean(148);
                /*
                *  由于@TransactionalEventListener 继承了 @EventListener ,
                *  所以,通过 findMergedAnnotation 同样能够获取其所注解的方法
                */
                annotatedMethods = MethodIntrospector.selectMethods(targetType,
                        (MethodIntrospector.MetadataLookup<EventListener>) method ->
                                AnnotatedElementUtils.findMergedAnnotation(method, EventListener.class));
                //EventListenerMethodProcessor.processBean(176);
                /*
                * @TranscationalEventListener 由 ApplicationListenerMethodTransactionalAdapter 所负责实现功能
                * 通过解读他的 Adapter 便能够了解其功能是如何实现的
                */
                if (applicationListener instanceof ApplicationListenerMethodAdapter) {
                    ((ApplicationListenerMethodAdapter) applicationListener).init(context, this.evaluator);
                }

class ApplicationListenerMethodTransactionalAdapter extends ApplicationListenerMethodAdapter {
    
    private final TransactionalEventListener annotation;
    
    /*
    * 根据对应的 beanName Class 以及方法名,初始化 Adapter
    */
    public ApplicationListenerMethodTransactionalAdapter(String beanName, Class<?> targetClass, Method method) {
        super(beanName, targetClass, method);
        TransactionalEventListener ann = AnnotatedElementUtils.findMergedAnnotation(method, TransactionalEventListener.class);
        if (ann == null) {
            throw new IllegalStateException("No TransactionalEventListener annotation found on method: " + method);
        }
        this.annotation = ann;
    }

    @Override
    public void onApplicationEvent(ApplicationEvent event) {
        if (TransactionSynchronizationManager.isSynchronizationActive()) {//判断是否开启了事务
            TransactionSynchronization transactionSynchronization = createTransactionSynchronization(event);//创建一个新的事务同步器
            TransactionSynchronizationManager.registerSynchronization(transactionSynchronization);//将刚刚创建的事务同步器,注册到这个线程的事务同步器列表中
        }
        else if (this.annotation.fallbackExecution()) {//如果当前线程内没有开启事务,并且注解的 fallbackExecution 属性设置为 true
            if (this.annotation.phase() == TransactionPhase.AFTER_ROLLBACK && logger.isWarnEnabled()) {
                logger.warn("Processing " + event + " as a fallback execution on AFTER_ROLLBACK phase");//在符合 if 条件后输出警告
            }
            processEvent(event);//处理事件
        }
        else {
            // No transactional event execution at all
            if (logger.isDebugEnabled()) {
                logger.debug("No transaction is active - skipping " + event);//本线程没有开启事务,fallbackExecution 又设置为 false
            }
        }
    }

    private TransactionSynchronization createTransactionSynchronization(ApplicationEvent event) {
        return new TransactionSynchronizationEventAdapter(this, event, this.annotation.phase());
    }

    /*
    * 这个内部类实现了事件监听器能根据 4 个不同的事务阶段进行处理事件的功能
    */
    private static class TransactionSynchronizationEventAdapter extends TransactionSynchronizationAdapter {

        private final ApplicationListenerMethodAdapter listener;

        private final ApplicationEvent event;

        private final TransactionPhase phase;

        public TransactionSynchronizationEventAdapter(ApplicationListenerMethodAdapter listener,
                ApplicationEvent event, TransactionPhase phase) {

            this.listener = listener;
            this.event = event;
            this.phase = phase;
        }

        @Override
        public int getOrder() {
            return this.listener.getOrder();
        }
        // 如果是 beforeCommit 的这个阶段,需要单独判断
        @Override 
        public void beforeCommit(boolean readOnly) {
            if (this.phase == TransactionPhase.BEFORE_COMMIT) {
                processEvent();
            }
        }
        // 其余 3 个事务阶段在事务提交后进行判断和处理
        @Override
        public void afterCompletion(int status) {
            if (this.phase == TransactionPhase.AFTER_COMMIT && status == STATUS_COMMITTED) {
                processEvent();
            }
            else if (this.phase == TransactionPhase.AFTER_ROLLBACK && status == STATUS_ROLLED_BACK) {
                processEvent();
            }
            else if (this.phase == TransactionPhase.AFTER_COMPLETION) {
                processEvent();
            }
        }
    
        protected void processEvent() {
            this.listener.processEvent(this.event);
        }
    }

}

4. @TransactionEventListener的异步陷阱

目前为止,我们看到的都是强大的 Spring 框架所提供的事务事件监听器的美好,不过任何美好都需要我们小心的维护。
接下来,我们就看看 @TransactionalEventListener 的美好背后隐藏着什么需要注意的。

如果,看过第一部分内容的读者应该还记得,我们将 @EventListner 变为异步会有两种方式。
一种是使用 @Async 注解,并且 EnableAsync;另一种是通过给我们的 Adatper 配置需要的线程池。
如果使用第一种方式来让我们的 @TransactionalEventListener 不会有问题。
但是,使用第二种的方式会导致我们的监听器找不到事务而失败。
那我们依然还是遵循从功能中来到源码里去的思想,到源码中找找问题出在哪里。

     //我们再回到所有 ApplicationEvent 类型时间分发的地方--> SimpleApplicationEventMulticaster类。
     //我们会发现他是通过新建线程的方式来进行触发监听器消费事件。
     executor.execute(() -> invokeListener(listener, event));

     // 在 Adapter 中,我们通过这个方法的调用来判断是否开启了事务

     TransactionSynchronizationManager.isSynchronizationActive();

     // 接下来我们进入 Manager看看这个方法的注释。注释表明,这个方法会返回当前线程活跃的事务同步器。
     // Bingo,是不是觉得有什么不对了。我们刚刚在 invokeListener 的时候是通过新建线程的方式进行异步的。
     // 如果,我们再去获取当前线程的事务,结果非常明显,一定是 False。
     // 所以,如果我们通过给 Multicaster 配置线程池的方式来实现异步看样子是不行的。
     /**
     * Return if transaction synchronization is active for the current thread.
     * Can be called before register to avoid unnecessary instance creation.
     * @see #registerSynchronization
     */
    public static boolean isSynchronizationActive() {
        return (synchronizations.get() != null);
    }

5.小结

本小节中,主要围绕 @TransactionalEventListener 来讲述下面 3 个内容:

  1. 如何使用 Spring 框架实现事务事件监听
  2. @TransactionalEventListener 是怎么实现绑定到事务上的事件监听的
  3. 在异步场景下,@TransactionalEventListener 的陷阱。

本系列的第三部分给出如何能让 @TransactionalEventListener 异步执行,又不在每个地方都加上 @Sync 注解的办法。

相关文章

网友评论

    本文标题:Spring 框架事件收发功能使用(二)事务事件监听

    本文链接:https://www.haomeiwen.com/subject/hubllctx.html