1、AQS简介
- AQS,即AbstractQueuedSynchronizer, 队列同步器,它是Java并发用来构建锁和其他同步组件的基础框架。来看下同步组件对AQS的使用:

- AQS是一个抽象类,主是是以继承的方式使用。AQS本身是没有实现任何同步接口的,它仅仅只是定义了同步状态的获取和释放的方法来供自定义的同步组件的使用。从图中可以看出,在java的同步组件中,AQS的子类(Sync等)一般是同步组件的静态内部类,即通过组合的方式使用。
2、AQS原理简介
-
AQS的实现依赖内部的同步队列(FIFO双向队列),如果当前线程获取同步状态失败,AQS会将该线程以及等待状态等信息构造成一个Node,将其加入同步队列的尾部,同时阻塞当前线程,当同步状态释放时,唤醒队列的头节点。
-
上面说的有点抽象,来具体看下,首先来看AQS最主要的三个成员变量:
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;
- 上面提到的同步状态就是这个int型的变量state. head和tail分别是同步队列的头结点和尾结点。假设state=0表示同步状态可用(如果用于锁,则表示锁可用),state=1表示同步状态已被占用(锁被占用)。

2、AQS同步组件
- CountDownLatch
- Semaphore
- CyclicBarrier
- ReentrantLock
- Condition
- FutureTask
3、CountDownLatch
- CountDownLatch是通过一个计数器来实现的,计数器的初始值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。
-
与CountDownLatch的第一次交互是主线程等待其他线程。主线程必须在启动其他线程后立即调用CountDownLatch.await()方法。这样主线程的操作就会在这个方法上阻塞,直到其他线程完成各自的任务。
-
其他N 个线程必须引用闭锁对象,因为他们需要通知CountDownLatch对象,他们已经完成了各自的任务。这种通知机制是通过CountDownLatch.countDown()方法来完成的;每调用一次这个方法,在构造函数中初始化的count值就减1。所以当N个线程都调 用了这个方法,count的值等于0,然后主线程就能通过await()方法,恢复执行自己的任务。
例子一:
private final static int threadCount = 200;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
test(threadNum);
} catch (Exception e) {
log.error("exception", e);
} finally {
countDownLatch.countDown();
}
});
}
countDownLatch.await();
log.info("finish");
exec.shutdown();
}
private static void test(int threadNum) throws Exception {
Thread.sleep(100);
log.info("{}", threadNum);
Thread.sleep(100);
}
4、Semaphore
-
Semaphore也叫信号量,在JDK1.5被引入,可以用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用资源。
-
Semaphore内部维护了一组虚拟的许可,许可的数量可以通过构造函数的参数指定。
-
访问特定资源前,必须使用acquire方法获得许可,如果许可数量为0,该线程则一直阻塞,直到有可用许可。
-
访问资源后,使用release释放许可。
-
Semaphore和ReentrantLock类似,获取许可有公平策略和非公平许可策略,默认情况下使用非公平策略。
-
获取和释放一个许可
private final static int threadCount = 20;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
semaphore.acquire(); // 获取一个许可
test(threadNum);
semaphore.release(); // 释放一个许可
} catch (Exception e) {
log.error("exception", e);
}
});
}
exec.shutdown();
}
private static void test(int threadNum) throws Exception {
log.info("{}", threadNum);
Thread.sleep(1000);
}
- 一次获取和释放三个许可
private final static int threadCount = 20;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
semaphore.acquire(3); // 获取多个许可
test(threadNum);
semaphore.release(3); // 释放多个许可
} catch (Exception e) {
log.error("exception", e);
}
});
}
exec.shutdown();
}
private static void test(int threadNum) throws Exception {
log.info("{}", threadNum);
Thread.sleep(1000);
}
- 尝试获取一个许可
private final static int threadCount = 20;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
if (semaphore.tryAcquire()) { // 尝试获取一个许可
test(threadNum);
semaphore.release(); // 释放一个许可
}
} catch (Exception e) {
log.error("exception", e);
}
});
}
exec.shutdown();
}
private static void test(int threadNum) throws Exception {
log.info("{}", threadNum);
Thread.sleep(1000);
}
- 带有超时时间的许可
private final static int threadCount = 20;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
if (semaphore.tryAcquire(5000, TimeUnit.MILLISECONDS)) { // 尝试获取一个许可
test(threadNum);
semaphore.release(); // 释放一个许可
}
} catch (Exception e) {
log.error("exception", e);
}
});
}
exec.shutdown();
}
private static void test(int threadNum) throws Exception {
log.info("{}", threadNum);
Thread.sleep(1000);
}
5、CyclicBarrier
-
CyclicBarrier也是一个同步辅助类 , 它允许一组线程相互等待 , 直到到达某个公共的屏障点 , 通过它可以完成多个线程之间相互等待 ,只有当每个线程都准备好之后, 才能各自继续往下执行后续的操作, 和 CountDownLatch相似的地方就是, 它也是通过计数器来实现的. 当某个线程调用了 await()方法之后, 该线程就进入了等待状态 . 而且计数器就进行 +1 操作 , 当计数器的值达到了我们设置的初始值的时候 , 之前调用了await() 方法而进入等待状态的线程会被唤醒继续执行后续的操作. 因为 CyclicBarrier释放线程之后可以重用, 所以又称之为循环屏障 . CyclicBarrier 使用场景和 CountDownLatch 很相似 , 可以用于多线程计算数据, 最后合并计算结果的应用场景 .
-
CyclicBarrier 与 CountDownLatch 区别
-
CountDownLatch的计数器只能使用一次 , 而 CyclicBarrier 的计数器可以使用 reset重置 循环使用
-
CountDownLatch 主要事项 1 个 或者 n 个线程需要等待其它线程完成某项操作之后才能继续往下执行 , 其描述的是 1 个 或者 n 个线程与其它线程的关系 ; CyclicBarrier 主要是实现了 1 个或者多个线程之间相互等待,直到所有的线程都满足条件之后, 才执行后续的操作 , 其描述的是内部各个线程相互等待的关系 .
-
CyclicBarrier 假如有 5 个线程都调用了 await() 方法 , 那这个 5 个线程就等着 , 当这 5 个线程都准备好之后, 它们有各自往下继续执行 , 如果这 5 个线程在后续有一个计算发生错误了 , 这里可以重置计数器 , 并让这 5 个线程再执行一遍 .
-
例子一:
private static CyclicBarrier barrier = new CyclicBarrier(5);
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}
private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
barrier.await();
log.info("{} continue", threadNum);
}
例子二: 带有等待时间
private static CyclicBarrier barrier = new CyclicBarrier(5);
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}
private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
try {
barrier.await(2000, TimeUnit.MILLISECONDS);
} catch (Exception e) {
log.warn("BarrierException", e);
}
log.info("{} continue", threadNum);
}
例子三: 到达屏障,优先执行回调方法
private static CyclicBarrier barrier = new CyclicBarrier(5, () -> {
log.info("callback is running");
});
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}
private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
barrier.await();
log.info("{} continue", threadNum);
}
6、ReentrantLock与锁
- 首先要知道 Java 中的锁主要分两类锁 , 一种是 synchronize锁 , 另外一种就是 J.U.C中 提供的锁 , J.U.C里核心的锁是 ReentrantLock。
- ReentrantLock (可重入锁)与 synchronize 的区别
- 可重入性
- ReentrantLock 字面意思就是 再进入 锁 , 所以称之为可重入锁 , synchronize 使用的锁也是可重入的. 它俩都是同一个线程进入一次锁的计数器就自增 1,所以要等到锁的计数器下降为 0 时才释放锁 .
- 锁的实现
- synchronize 的锁是基于 JVM 来实现的 , ReentrantLock 是jdk 实现的. 通俗的来讲就是 操作系统来控制实现和用户编码实现的区别 .
- 性能区别
- 在 synchronize 关键字优化之前, 其性能比 ReentrantLock 差 , 但是优化过后 , 在两者都可以使用的情况下, 建议使用 synchronize, 主要是其写法比较容易
- 功能
- synchronize 写起来更简洁 , 它是由编译器来实现锁的加锁和释放 , 而ReentrantLock 需要我们手工申明加锁和释放锁 , 为了避免手工忘记释放锁而造成死锁 , 所以建议在final里申明和释放锁.
- ReentrantLock 独有的功能
- ReentrantLock可指定是公平锁还是非公平锁 , 公平锁就是先等待的线程先获得锁.有先来后到之说. 而synchronize是非公平锁。
- 提供了一个 Condition 类 , 可以分组唤醒需要唤醒的线程 , 不像 synchronize要么随机唤醒一个线程 , 要么唤醒全部线程。
- 提供能够中断等待锁的线程的机制。
- 可重入性
/**
* 实例说明 模拟并发 发送5000 个请求 , 每次最多200 个请求 ,
* 每次计数自增1 , 最后结果应该是5000
*/
// 请求总数
public static int clientTotal = 5000;
// 同时并发执行的线程数
public static int threadTotal = 200;
public static int count = 0;
private final static Lock lock = new ReentrantLock();
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal ; i++) {
executorService.execute(() -> {
try {
semaphore.acquire();
add();
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("count:{}", count);
}
private static void add() {
lock.lock();
try {
count++;
} finally {
lock.unlock();
}
}
7、ReentrantReadWriteLock
-
ReentrantReadWriteLock是Lock的另一种实现方式,我们已经知道了ReentrantLock是一个排他锁,同一时间只允许一个线程访问,而ReentrantReadWriteLock允许多个读线程同时访问,但不允许写线程和读线程、写线程和写线程同时访问。相对于排他锁,提高了并发性。在实际应用中,大部分情况下对共享数据(如缓存)的访问都是读操作远多于写操作,这时ReentrantReadWriteLock能够提供比排他锁更好的并发性和吞吐量。
-
读写锁内部维护了两个锁,一个用于读操作,一个用于写操作。所有 ReadWriteLock实现都必须保证 writeLock操作的内存同步效果也要保持与相关 readLock的联系。也就是说,成功获取读锁的线程会看到写入锁之前版本所做的所有更新。
-
ReentrantReadWriteLock支持以下功能:
-
1、支持公平和非公平的获取锁的方式;
-
2、支持可重入。读线程在获取了读锁后还可以获取读锁;写线程在获取了写锁之后既可以再次获取写锁又可以获取读锁;
-
3、还允许从写入锁降级为读取锁,其实现方式是:先获取写入锁,然后获取读取锁,最后释放写入锁。但是,从读取锁升级到写入锁是不允许的;
-
4、读取锁和写入锁都支持锁获取期间的中断;
-
5、Condition支持。仅写入锁提供了一个 Conditon 实现;读取锁不支持 Conditon ,readLock().newCondition() 会抛出 UnsupportedOperationException。
-
private final Map<String, Data> map = new TreeMap<>();
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock readLock = lock.readLock();
private final Lock writeLock = lock.writeLock();
public Data get(String key) {
readLock.lock();
try {
return map.get(key);
} finally {
readLock.unlock();
}
}
public Set<String> getAllKeys() {
readLock.lock();
try {
return map.keySet();
} finally {
readLock.unlock();
}
}
public Data put(String key, Data value) {
writeLock.lock();
try {
return map.put(key, value);
} finally {
readLock.unlock();
}
}
class Data {
}

微信扫码关注java技术栈,每日更新面试题目和答案,并获取Java面试题和架构师相关题目和视频。
网友评论