记录自己的学习过程. 共勉.
本次分析的AQS源码基于jdk8的.
一、类继承关系图
老样子,分析AQS之前我们先看看AQS的类继承关系图。Serializable接口就不扯了,我们主要关注AbstractOwnableSynchronizer这个父类。

1. AbstractOwnableSynchronizer分析
/**
* A synchronizer that may be exclusively owned by a thread. This
* class provides a basis for creating locks and related synchronizers
* that may entail a notion of ownership. The
* {@code AbstractOwnableSynchronizer} class itself does not manage or
* use this information. However, subclasses and tools may use
* appropriately maintained values to help control and monitor access
* and provide diagnostics.
*
* @since 1.6
* @author Doug Lea
*/
public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {
/** Use serial ID even though all fields transient. */
private static final long serialVersionUID = 3737899427754241961L;
/**
* Empty constructor for use by subclasses.
*/
protected AbstractOwnableSynchronizer() { }
/**
* The current owner of exclusive mode synchronization.
* 独占模式下,当前持有锁的线程。
*/
private transient Thread exclusiveOwnerThread;
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
}
summary: AbstractOwnerSynchronizer为独占模式的同步器提供了一个抽象类。
二、AQS是什么
AQS全称是AbstractQueuedSynchronizer(抽象队列同步器),它是一个用来构建锁或者其他同步组件基础框架,底层数据结构依赖于一个变种的CLH队列(先进先出的等待队列)和一个保证原子操作的state成员变量(int)。
关于state举一个简单的例子,ReentrantLock的公平锁FairSync,当占用锁时把state设置成1,表示当前的锁已被占用,而释放锁时,又会把state设置成0,表示锁可用;
关于FIFO Queue我们也举一个case,当有N个线程同时竞争一个X锁时,只有一个线程能够竞争到,其他的线程都需要等待,AQS会管理这一些等到线程,将每一个等待线程保存在一个Node中,使用FIFO Queue管理Node,当锁被释放之后,按照FIFO顺序依次唤醒线程,我们接下来就先来分析一下这个Node。
三、AQS之Node
先看源码。
static final class Node {
/** 标识NODE是共享模式 */
static final Node SHARED = new Node();
/** 标识NODE是独占模式*/
static final Node EXCLUSIVE = null;
/** Node节点状态枚举 */
/** 当前节点被取消 */
static final int CANCELLED = 1;
/** 当前节点已经完成,需要唤醒下一个节点*/
static final int SIGNAL = -1;
/** 这个状态在Condition模式下使用,这里暂时不谈 */
static final int CONDITION = -2;
/** 这个状态在共享模式下使用,这里暂时不谈 */
static final int PROPAGATE = -3;
/**
* Status field, taking on only the values:
* Node Status有5种状态值,SIGNAL,CANCELLED,
* CONDITION,PROPAGATE,0[默认值]
*/
volatile int waitStatus;
/**
* Link to predecessor node that current node/thread relies on
* for checking waitStatus.
*/
volatile Node prev;
/**
* Link to the successor node that the current node/thread
* unparks upon release.
*/
volatile Node next;
/**
* The thread that enqueued this node. Initialized on
* construction and nulled out after use.
*/
volatile Thread thread;
/**
* Link to next node waiting on condition, or the special
* value SHARED.
*/
Node nextWaiter;
/**
* Returns previous node
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
// ... 省略部分方法
}
下面是等待线程Node组成的队列,有一个Head头节点和一个Tail尾节点,新的节点从尾节点添加,并成为一个新的尾节点,头节点默认是一个空节点,Head->next是当前获得锁的线程。当Head->next 释放锁时,这个节点的WaitStatus会变成SIGNAL,并将成为新的头节点,然后唤醒next节点争夺锁。

四、AQS之独占锁争夺分析
1. 请求锁资源
/*Acquires in exclusive mode*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
这个方法的逻辑比较简单,首先AQS尝试争夺一次锁,如果请求成功了表示加锁成功,如果争夺锁失败则加入等待队列中,按顺序获取锁资源。其中的tryAcquire(arg)方式就是AQS暴露给用户自己实现的一个争夺锁的抽象方法,可以参考java.util.concurrent.locks.ReentrantLock中的 公平锁&非公平锁。
2. 加入等待队列
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
我们接下来分析这段加入等待队列的代码,首先我们来看看 addWaiter(Node.EXCLUSIVE)这个方法做什么。
private Node addWaiter(Node mode) {
// 生成Node,以Node.EXCLUSIVE独占模式,不讨论其他case
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 利用cas原子性,尝试加入队列
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
addWaiter()首先 利用当前线程及当前线程争夺锁的模式生成一个等待节点,然后利用cas加入队列。
3. 等待锁资源
当当前线程加入等待队列以后,我们来看看这个线程是怎么获取到锁资源的。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 看看前置节点是否已经释放锁,若是,尝试争夺锁(与此时尝试获取锁的线程,而非在等待队列中的Node)
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
// 如果当前节点争夺锁成功,将当前设置为HEAD节点,并GC上一个HEAD节点资源。
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 前置节点未完成,尝试阻塞当前线程,等待唤醒
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
这个方法主要是看前置节点是否已经释放锁,当前节点被唤醒,尝试获取锁。若获取失败或前置节点未释放锁,则重新阻塞自己。我们关注一下shouldParkAfterFailedAcquire(p, node)这个方法。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*如果前置节点状态为SIGNAL,表示其未释放锁资源,当前线程可以阻塞,等待被唤醒。*/
return true;
if (ws > 0) {
/*前置节点已经被取消了,往前遍历直到一个非取消状态的节点。HEAD节点不会为取消状态。*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*如果前置节点状态为0或PROPAGATE,尝试将其设置为SINGAL状态*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
逻辑大概是这样,首先找到一个非CANCELLED状态的前置节点,如果该节点状态为SIGNAL,则返回true并调用parkAndCheckInterrupt()阻塞当前线程,如果不是的话,尝试将节点状态设置为SIGNAL直接返回上一层循环,尝试重新获取锁。
4. 锁释放
/*独占模式释放锁*/
public final boolean release(int arg) {
// 首先尝试释放锁,还原state,
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// Node节点默认为0,如果节点状态不是0,说明有后置节点存在,需要处理。
unparkSuccessor(h);
return true;
}
return false;
}
释放锁资源时,不仅仅是归还锁资源,而且要唤醒下一个等待的线程,来看看unparkSuccessor(h)这个方法。
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/* 从队列按head->tail的方向找第一个有效的节点,并唤醒它。*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
网友评论