美文网首页
读源码DelayQueue-Leader-Follower模式

读源码DelayQueue-Leader-Follower模式

作者: whslowly | 来源:发表于2020-10-17 19:37 被阅读0次

java.util.concurrent.DelayQueue采用了Leader-Follower模式,结合源码理解下这种模式的编码实现。

先总结一下Leader-Follower模式

举个例子来理解这些情况,拿饭店员工来对照理解。

  1. 单Reactor多线程模式
    饭店的员工一般都是分角色的,比如接待员、服务员、厨师等等。
    1. 假如有一个叫做A的人,固定他作为饭店接待员,来客人了就分给客人一个座位号,然后交给其他服务员,比如B进行后续处理。
    2. B会根据座位号为客人引路,为客人点菜等等。
    3. 如果把A、B比作两个线程,客人比作任务,任务由A处理,到交接给B处理,有一次线程上下文切换。
  2. Leader-Follower模式
    这次饭店不分角色了,每个人都是接待员和服务员,统称为员工。
    1. 每次只能有一个员工在门口等待,比如A先在门口等待,其他员工在屋里歇着。
    2. 来客人了的话,A会叫一个其他员工,比如B来门口接替自己。
    3. 然后A开始为客人服务,比如分配座位号,引路,点菜等全流程服务。
    4. 拿线程来说的话,就是接受任务,处理任务都是由线程A负责,没有线程上下文切换。
  3. DelayQueue的Leader-Follower模式
    这次饭店也不分角色,都是员工,但是改变了经营策略,每个客人必须预约吃饭时间,预约采用APP预约。因为加入了延时,逻辑变得复杂了一些。
    1. 每次还是只能有一个员工在门口等待,比如A先在门口等待,A看了眼预约登记表,发现离预约最早到店的时间还有30分钟,A就什么都不干了,先休息30分钟。
    2. 其他员工还是先在屋里歇着,但是因为采用APP预约,客人约几点都有可能,如果此时有客人约的是10分钟后到店,因为A要30分钟后才能醒来干活,所以如果这位客人来了,门口就没有人接待了。
    3. 对于这个问题,饭店的软件系统在监听到最早到店时间变了的话,会再叫一个员工来门口等待,此员工可能是新员工B,也可能是叫醒了之前在门口休息的员工A。我们叫这位新员工X。
      1. 如果新员工X发现最早到店时间是现在,或者客人已经来了,就会叫一个员工C来门口接替自己,并立即开始为客人提供全流程服务。
      2. 如果新员工X发现最早到店时间是10分钟后,新员工X就像A之前一样,什么都不干了,先休息10分钟。
    4. 如果最早到店时间没有变化,还是30分钟后,软件系统不会叫人,其他员工看到A在门口等待,自己可以安心的在屋里歇着,等着A叫人替换他。
    5. 员工A在30分钟后醒来,客人也到了,A会叫一个同事比如B接替自己,而A为客人提供全流程服务。

DelayQueue相关源码

  1. 属性定义

    //锁
    private final transient ReentrantLock lock = new ReentrantLock();
    //优先级队列,最小堆
    private final PriorityQueue<E> q = new PriorityQueue<E>();
    //leader线程
    private Thread leader = null;
    //监视器
    private final Condition available = lock.newCondition();
    
  2. offer
    相当于饭店的软件系统,提供预约服务,并监听最早到店时间的变化。

```java
public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //当前线程执行q.offer,向最小堆中添加延迟任务
        q.offer(e);
        //如果这个延迟任务e,刚好因为set的延迟时间最短,
        //成为了堆顶元素,也就是即将最先出队的元素
        if (q.peek() == e) {
            //清空leader标识,唤醒一个follower,使其能够成为leader。
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}
```
  1. take
    相当于饭店的员工,为客人提供全流程服务。

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    //从这里开始才是Leader-Follower模式
                    if (leader != null)
                        //有leader了,所以follwer无限等待
                        available.await();
                    else {
                        //还没有leader
                        Thread thisThread = Thread.currentThread();
                        //当前线程成为leader
                        leader = thisThread;
                        try {
                            //leader只等待一个过期周期就醒来
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                //释放leader身份
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                //没有leader 并且 队列还有元素的话 唤醒一个做leader
                available.signal();
            lock.unlock();
        }
    }
    

相关文章

网友评论

      本文标题:读源码DelayQueue-Leader-Follower模式

      本文链接:https://www.haomeiwen.com/subject/dqufmktx.html