美文网首页
Java中的AQS

Java中的AQS

作者: 星流星 | 来源:发表于2018-09-22 17:25 被阅读0次

AQS

Java并发包的Lock接口的实现基本上都是通过聚合一个同步容器的子类来完成线程控制访问的。

队列同步容器 : AbstractQueuedSynchronizer

队列同步容器(AbstractQueuedSynchronizer),使用来构建其他同步组件的基本框架,它使用了一个int型的成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。

private volatile int state;

同步容器的主要的工作方式是继承,子类通过继承同步容器并实现它的抽象方法来同步管理状态,在抽象方法实现的过程中避免不了对同步状态的更改,这时就需要使用同步器提供的三个方法(getState()、setState(int newState)和compareAndSetState(int exect, int update))来进行操作,因为它们是能够保证状态的改变是安全的。子类被推荐定义为自定义同步组件的静态内部类,同步器本身没有实现任何的同步接口,它仅仅定义了若干同步状态的获取和释放方法来供自定义同步组件使用,同步器可以独占的获取同步状态,也可以共享的获取同步状态,这样就方便实现不同类型的同步组件(ReentrantLock、ReentrantReadWriteLock和CountDownLatch等)。
AQS定义资源共享的方式:

  1. Exclusive 独占,只有一个线程能执行。
  2. Share 共享,多个线程可以同事执行,如Semaphore和CountDownLatch

同步器是实现锁和其他任意同步组件的关键,在锁的实现中聚合同步器,利用同步器实现锁的语义。

对列同步器的接口与示例

同步器的设计是基于模板方法的,也就是说,使用着需要继承同步器并重写指定的方法,随后将同步器组合在自定义同步组件的实现中,并调用同步器提供的模板方法,而这些模板方法将会调用使用着重写的方法。

重写同步器指定的方法时,需要使用同步器提供的如下3个方法来访问或修改同步状态。

  • getState(): 获取当前同步状态。
  • getState(int newState):设置当前同步状态。
  • compareAndSetState(int expect, int update):使用CAS设置当前状态,该方法能够保证设置状态的原子性。

同步器可以重写的方法如下:

方法名称 描述
protected boolean tryAcquire(int arg) 独占式获取同步状态,实现该方法需要查询当前状态并判断同步状态是否符合预期。然后在进行CAS设置同步状态
protected boolean tryRelease(int arg) 独占方式释放同步状态,等待获取同步的线程将有机会获取同步状态
protected int tryAcquireShared(int arg) 共享获取同步状态,返回值大于等于0的值,表示获取成功,反之获取失败
protected boolean tryReleaseShared(int arg) 共享释放同步状态
protected boolean isHeldExeclusively() 当前线程是否在独占模式下被线程占用,一般该方法表示是否被当前线程所占用

实现自定义同步组件时,会调用同步器提供的模板方法,这些模板方法如下:

方法名称 描述
void acquire 独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步对列等待,该方法将会调用重写的tryAcquire(int arg)方法
void acquireInterruptibly(int arg) 与acquire(int arg)方法相同,但是个该方法可以响应中断,当前线程为获取同步状态而进入同步对列中,如果当前线程被中断,该方法会抛出InterruptedException并返回
boolean tryAcquireNanos(int arg, long nanos) 在acquireInterruptibly(int arg)基础上增加了超时限制当前线程在规定的时间内没有获取同步状态,那么将返回false,如果获取到了返回true
void acquireShared(int arg) 共享的方式获取同步状态,如果当前线程未获取到同步状态,将会进入同步对列等待,与独占的方式获取的主要区别是在同一时刻可以有多个线程获取到同步状态
void acquireSharedInterruptibly(int arg, long nanos) 与acquireShared()方法相同,该方法响应中断
boolean tryAcquireSharedNanos(int arg, int nanos) 在acquireSharedInterruptibly(int arg)基础上增加了超时限制
boolean release(int arg) 独占的方式释放同步状态,该方法会在释放同步状态之后,将同步状态的第一个节点包含的线程唤醒
boolean releaseShared(int arg) 共享的方式释放同步状态
Collection<Thread> getQueuedThreads() 获取等待在同步队列上的线程集合

同步器上提供的模板方法基本上有三类:

  • 独占的方式获取与释放同步状态;
  • 共享的方式获取与方式同步状态;
  • 查询同步队列中的线程等待情况。

以ReentrantLock为例,state初始化为0,表示为锁定状态,A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其它线程再tryAcquire()是就会失败,直到A线程unLock(),state=0(即释放锁),其它线程才有机会获取该锁,当然,释放锁之前,A线程自己是可以重复获取此锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这也就是可重入。

队列同步器的实现分析

同步队列

同步器依赖与一个队列(一个FIFO双向队列)来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态的信息构造成一个节点(Node)并将其加入同步队列,同时会吧当前线程阻塞,当同步状态释放时,会把首节点中的线程唤醒,使其再次获取同步状态。
[图片上传失败...(image-5eaa12-1537608329671)]

static final class Node {
    static final Node SHARED = new Node();
    static final Node EXCLUSIVE = null;
    /** waitStatus value to indicate thread has cancelled */
    static final int CANCELLED =  1;
    /** waitStatus value to indicate successor's thread needs unparking */
    static final int SIGNAL    = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    static final int CONDITION = -2;
    /**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate
     */
    static final int PROPAGATE = -3;
    
    volatile int waitStatus;
    volatile Node prev;
    volatile Node next;
    volatile Thread thread;
    Node nextWaiter;
}

下面的是节点的属性与名称及描述

  • int waitStatus等待的状态,包含如下:
    1. CANCELLED,值为1,由于同步队列中的等待的线程等待超时或者被中断,需要中同步队列中取消等待,节点进入该状态将不会变化。
    2. SIGNAL,值为-1,后继节点的线程处于等待状态,当前节点如果释放了同步状态或者被取消,将会通知该节点,使得后继节点得以运行。
    3. CONDITION,值为-2,节点在等待队列中,节点线程等待在Condition上,当其它线程对Condition调用了signal()方法后,该节点将会从等待队列中转移到同步队列中,加入到对同步状态的获取中。
    4. PROPAGATE,值为-3,表示下一次共享同步状态 将会无条件地被传播下去。
    5. INITIAL,值为0,初始状态。
  • Node prev: 前驱节点,当节点加入同步队列时被设置(尾部添加)
  • Node next: 后继节点。
  • Node nextWaiter:等待队列中的后继节点。如果当前节点是共享的,那么这个字段将是一个SHARED常量,也就是说节点类型(独占和共享)和等待队列中的后继节点公用一个字段。
  • Thread thread:获取同步状态的线程。

Node是过程同步队列的基础,同步器拥有head和tail,获取同步状态失败的线程将会成为节点加入该队列的尾部。这里加入队列的过程必须要保证线程安全,因此同步器是一个基于CAS的设置节点的方法:compareAndSendTail(Node expect, Node update),它需要传递当前线程“认为”的尾节点,只有设置成功,当前节点才正式与尾节点建立关联。

同步队列遵循FIFO,首节点是获取同步状态成功的节点,首节点在释放同步状态时,会唤醒后继节点,而后继节点将会在获取同步状态成功时将自己设置为首节点。

private final boolean compareAndSetTail(Node expect, Node update) {
    return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}

设置首节点是通过获取同步状态成功的线程来完成的,由于只有一个线程能够成功的获取到同步状态,因此设置头结点的方法并不需要使用CAS来保证,它只需要将首节点设置成原首节点的后继节点并断开首节点与next引用即可。

2. 独占式同步状态获取和释放

通过AQS的acquire(int arg)方法可以获取同步状态,该方法不响应中断,也就是说,由于线程获取同步状态失败后进入同步队列中,后续对线程进行中断操作时,线程不会从同步队列中移出。

这个方法是线程来获取共享资源的入口,如果获取到资源,方法直接返回,来来来否则进入等待队列,直到获取到资源为止。

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

上面的代码主要完成了同步状态获取、节点构造、加入同步队列以及同步队列中自旋等待的相关工作,其流程如下:

  1. tryAcquire() 首先调用自定义AQS实现的tryAcquire()方法,该方法保证线程安全的获取同步状态,如果获取同步状态成功,则该方法直接返回,否则执行后面的acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
  2. addWaiter() 这是如果没有获取同步状态成功时执行的方法。如果没有通过tryAcquire()方法获取成功,那么就需要构造一个节点并将其保存在同步队列中,addWaiter()方法的作用就是将当前节点构造成一个节点,然后添加在队列的尾部,其中的参数是节点的类型,其中:
    • Node.EXCLUSIVE 表示独占是获取同步状态。
    • Node.SHARED 表示共享式获取同步状态
  3. acquireQueued() 调用这个方法使的该节点以“死循环”的方式获取同步状态,如果获取不到则阻塞节点中的线程,而被阻塞的线程的唤醒主要依靠前去节点出队或阻塞线程被中断来实现。
1. tryAcquire(int arg)方法

这个方法的源码如下:

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

正如前面提到的这是一个需要子类实现的方法。这个方法的方法体内只是抛出了一个异常,一般当不支持请求操作时,抛出该异常。

2. addWaiter(Node mode)方法

此方法的作用是将当前的线程构造成一个节点,然后添加进队列中,并返回当前线程所在节点。

private Node addWaiter(Node mode) {
    /**
     * Node有两种模式
     * - EXCLUSIVE 独占
     * - SHARED 共享
     */
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    // 快速尝试
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 如果失败则通过enq入队
    enq(node);
    return node;
}

private Node enq(final Node node) {
    // CAS"自旋",直到成功入队
    for (;;) {
        Node t = tail;
        // 队列为空,创建一个空的标志节点作为head节点,并将tail也指向它
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 正常入队
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

这是一个CAS自旋的用法。如果compareAndSetHeadcompareAndSetTail()返回false,那么就说明有其他线程修改了head或tail,然后进入下一个循环继续。

3. acquireQueued(final Node node, int arg)方法

获取资源失败后,获取资源的线程被添加进入队列后,下一步的做法就是进入等待状态,直到其它线程彻底释放资源后唤醒自己。

final boolean acquireQueued(final Node node, int arg) {
    // 这个变量使用来标记是否成功获取到资源,也就是是否成功获取到锁
    boolean failed = true;
    try {
        // 标记等待过程中是否可以被中断
        boolean interrupted = false;
        // 下面的是一个CAS自旋
        for (;;) {
            // 获取前驱节点
            final Node p = node.predecessor();
            // 如果p是队列的首节点并且能够获取到资源
            // 如果当前节点的前驱拥有锁,那么当前节点就有机会获取锁了
            if (p == head && tryAcquire(arg)) {
                // 拿到资源后,将当前节点设置为首节点
                setHead(node);
                // ‘然后将前驱节点的next设置为空,有助于垃圾回收器回收
                p.next = null; // help GC
                // 成功获取
                failed = false;
                // 等待的过程被中断了
                return interrupted;
            }
            // 检查是否可以等待
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // 如果等待过程中被中断过,那么就将interrupted标记为true
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
/**
 * Node类中的一个方法,获取当前节点的前驱节点
 */
final Node predecessor() throws NullPointerException {
    Node p = prev;
    if (p == null)
        throw new NullPointerException();
    else
        return p;
}
/**
 * 设置首节点为node
 */
private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

/**
 * 这个方法主要是检查状态,查看是否可以等待
 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 拿到前驱节点的状态
    int ws = pred.waitStatus;
    // 如果前驱节点的状态为SIGNAL,那么就可以进行等待,因为当前驱节点的waitStatus是Node.SIGNAL,那么释放同步状态的时候,会唤醒后继节点
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;

        if (ws > 0) {
        /*
         * 如果前驱节点放弃了对锁的持有(放弃获取同步状态了),那么就一直往前找,直到找到一个没有放弃获取同步状态的节点,然后插在它的后面
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            // pred = pred.prev
            // node.prev = pred
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * 如果前驱节点正常,那么就把前驱节点的状态设置成SIGNAL,然后进可以进行等待了
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}


private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

下面的是acquire(int arg)方法的大致流程:
[图片上传失败...(image-9af990-1537608329671)]

独占式释放资源

在获取同步状态并执行响应的逻辑后,就需要释放同步状态,使得后续的节点能够继续获取同步状态。通过调用同步器的release(int arg)方法可以释放同步状态,该方法在释放同步状态后,会唤醒其后继节点,进而使后继节点获取同步状态。源码如下:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
/**
 * 和tryAcquire()方法一样都是需要子类实现的
 */
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

private void unparkSuccessor(Node node) {
    // node一般为当前线程所在的节点
    int ws = node.waitStatus;
    // 将当前的线程状态设置为0
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    // 找到下一个节点
    Node s = node.next;
    // 如果为空或者已经取消
    if (s == null || s.waitStatus > 0) {
        s = null;
        从尾部遍历到当前节点,找到最后一个waitStatus <= 0的
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 然后唤醒
    if (s != null)
        LockSupport.unpark(s.thread);
}

该方法执行时候时,会唤醒后继节点,unparkSuccessor(Node node)方法使用LockSupport来唤醒处于等待状态的线程。中间的过程是寻找一个waitStatus <= 0的节点,当waitStatus <= 0时也就意味着这个节点没有放弃获取同步状态。

3. 共享式获取同步状态和释放

共享式获取资源

共享式和独占式获取最重要的区别是同一个时刻能否有多个线程同时获取到同步状态。例如读写锁,可以有多个线程获取读的状态,但是只能有一个线程获取写的状态。读写锁就用到了这里的“共享式”。

通过调用同步器的acquireShared(int arg)方法可以共享的获取同步状态,源码如下:

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

前面说过tryAcquireShared()这个方法需要子类取实现。如果这个方法返回值小于0代表获取失败,0代表获取成功,但没有剩余资源;正数表示获取成功,还有剩余资源,其他线程还可以获取。如果获取成功直接返回,如果获取失败则进入doAcquireShared(int arg),将获取同步状态失败的线程放在同步队列中。
下面是doAcquireShared(int arg)的源码:

private void doAcquireShared(int arg) {
    // 将当前线程通过共享式构造出一个节点(Node)并加入队列尾部
    final Node node = addWaiter(Node.SHARED);
    // 这是一个是否失败的标志
    boolean failed = true;
    try {
        // 线程是否被中断
        boolean interrupted = false;
        for (;;) {
            // 获取前驱节点
            final Node p = node.predecessor();
            // 获取前驱节点是head,这里很有可能是head用完资源后然后将node唤醒
            if (p == head) {
                // 再次尝试获取资源
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // 将head指向自己。还有剩余资源时唤醒其他线程
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    // 如果等待过程中被中断,此时将node中的线程中断
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            // 判断自己能否进入等待状态
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

这个和独占式获取很像,不过是将selfInterrupt()放在了上面的这个方法里。这里值得注意的是资源的数量,不独占式获取资源不同的是,独占式永远只有一个线程获取到同步状态,而共享式可以有多个线程获取同步状态,而且有的线程可能在有多个资源时才能获取到。所以这里就有了一个setHeadAndPropagate(Node, int)方法。

/**
 * node指的是当前线程构成的节点
 * propagate 是指剩余的资源数
 */
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    // 将node设为head
    setHead(node);
    // 如果还有剩余资源的话,唤醒下一个节点。
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

在个方法比前面的setHead()方法多了一点就是如果有资源剩余那么就去唤醒下一个的共享线程。其中的doReleaseShared()是释放共享资源的方法,这个会在下面的释放共享资源中说到。

共享式释放资源

共享式释放资源使用的是releaseShared(int arg)方法:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        // 这个方法中会唤醒后继的节点
        doReleaseShared();
        return true;
    }
    return false;
}

和独占式释放资源一样,其中的tryReleaseShared(arg)需要子类去实现,如果释放成功会返回true,下面会执行doReleaseShared()方法,这个方法中会尝试唤醒后继节点中的线程,它的源码如下:

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        // h == null说明阻塞队列为空
        // h == tail 说明队列中只有一个节点
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // 
            if (ws == Node.SIGNAL) {
                // CAS 比较失败说明线程已经被唤醒了
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                // 如果CAS比较成功,则使用LockSupport唤醒线程
                unparkSuccessor(h);
            }
            // 如果后继节点暂时不需要唤醒,则把当前节点的状态设置为PROPAGATE,确保可以传递下去
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // head被改变
            break;
    }
}

相关文章

  • AQS:Java 中悲观锁的底层实现机制

    介绍 AQS AQS(AbstractQueuedSynchronizer)是 Java 并发包中,实现各种同步组...

  • J.U.C源码阅读之AQS

    概念理解 1.AQS-- 指AbstractQueuedSynchronizer类。 AQS是java中管理“锁”...

  • 知识梳理目录

    Java基础 Java线程池 AQS之独占锁 AQS之Condition AQS之Condition AQS之同步...

  • ReentrantLock及AQS浅谈

    一、AQS简介 AQS全称AbstractQueuedSynchronizer,是java并发包中的一个类,该类更...

  • java并发包中aqs浅谈

    aqs原理 aqs即AbstractQueuedSynchronizer,是java并发包中的一个抽象类,Reen...

  • 关注的博客

    JDK源码阅读顺序 java并发 java详谈 AQS_1 AQS_2 AQS_3 ConditionOb...

  • AbstractQueuedSynchronizer(AQS)—

    AQS做什么的 AbstractQueuedSynchronizer(AQS)是Java中许多同步类的基类,是一个...

  • Java中的AQS

    AQS Java并发包的Lock接口的实现基本上都是通过聚合一个同步容器的子类来完成线程控制访问的。 队列同步容器...

  • Java中的AQS

    AQS是什么? AQS全称是AbstractQueuedSynchronizer。 AQS是实现显示锁,Count...

  • Java中的AQS

    1、学习AQS的必要性 队列同步器AbstractQueuedSynchronizer(以下简称同步器或AQS),...

网友评论

      本文标题:Java中的AQS

      本文链接:https://www.haomeiwen.com/subject/jtsxoftx.html