美文网首页
多线程-AQS-ReentrantReadWriteLock

多线程-AQS-ReentrantReadWriteLock

作者: 麦大大吃不胖 | 来源:发表于2021-01-28 16:20 被阅读0次

AQS

by shihang.mai

AQS是实现ReentrantReadWriteLock的核心

ReentrantReadWriteLock

ReentrantReadWriteLock锁支持读读并发,但读写、写写都是互斥的

在我们研究ReentrantLock时,就知道AQS是用voliate int state去表示锁状态,int是4字节,32位的.在ReentrantReadWriteLock中需要用这个state表示读和写锁,就用高16位表示读锁,低16位表示写锁。因为无论读锁还是写锁都用了2字节表示,所以可重入次数为2^16-1

state

先看如下计算

0000000000000000   0000000000000000
----------------------------------------------
写入读锁,即在原来的基础上+65535
0000000000000001   0000000000000000
读取读锁数量,直接将1右边移16位,1>>>16
0000000000000000   0000000000000001
----------------------------------------------
写入写锁,即在原来基础上+1
0000000000000000   0000000000000001
读取写锁数量,原来的值与(高16位都为0,低16位都为1的值)做&运算
原值:0000000000000000    0000000000000001
&
辅值:0000000000000000    1111111111111111
结果:0000000000000000    0000000000000001

先来看一下构造方法

private final ReentrantReadWriteLock.ReadLock readerLock;
private final ReentrantReadWriteLock.WriteLock writerLock;
public ReentrantReadWriteLock() {
        this(false);
    }

    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }

默认是非公平锁,并且new出了ReadLock和WriteLock,所以我们使用时如下就能获取到读锁和写锁

ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock();
ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock();

写锁lock

调用writeLock.lock()时

public static class WriteLock implements Lock, java.io.Serializable {
   
        public void lock() {
            sync.acquire(1);
        }
}

继续调用,走到了AQS,这个很熟悉,在ReentranLock中也看到过

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

这里先执行tryAcquire()==false,那么才会继续走acquireQueued(),tryAcquire是ReentrantReadWriteLock 内部类Sync 中实现方法

abstract static class Sync extends AbstractQueuedSynchronizer {
protected final boolean tryAcquire(int acquires) {
            Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c);
            if (c != 0) {
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                setState(c + acquires);
                return true;
            }
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        }
}
  1. 先获取当前线程
  2. 获取volatile int state的值,然后函数exclusiveCount()根据state值获取到写锁的数量
  3. 注意此时调用的是写锁的lock,当有锁,即进入条件if (c != 0)
  • 当w==0,即有线程获取了读锁,那么直接返回false,无论当前线程.说明了ReentrantReadWriteLock不支持读升级写冲入
  • 当w!=0,即有线程获取了写锁,当前线程不是持有锁的线程,那么返回false,即写写互斥
  1. 当然加写锁前需要判断是否超过65535重入,如果第3的条件都不成立,即获取锁成功,设置state+1
  2. 当state==0,那么writerShouldBlock()==false(非公平锁),再CAS设置state+1,并设置占有锁线程为当前线程,然后true

当tryAcquire()返回false时,就将执行acquireQueued()这里不多说,在ReentranLock中已详细说明,下面为概括图

ypVnUI.png

读锁lock

调用readLock.lock()时

public static class ReadLock implements Lock, java.io.Serializable {
public void lock() {
            sync.acquireShared(1);
        }
}

继续调用,走到了AQS的acquireShared(),意为获取共享锁

public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

这里先调用tryAcquireShared(),如果返回值<0,那么执行doAcquireShared()
先来看看tryAcquireShared(),tryAcquire是ReentrantReadWriteLock 内部类Sync 中实现方法

abstract static class Sync extends AbstractQueuedSynchronizer {
protected final int tryAcquireShared(int unused) {
            Thread current = Thread.currentThread();
            int c = getState();
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            int r = sharedCount(c);
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            return fullTryAcquireShared(current);
        }
}
  1. 获取当前线程
  2. 获取state值
  3. 如果有线程持有写锁,并且当前读线程不是持有写锁的线程,那么返回-1(读写互斥,并支持写降级读重入)
  4. 获取读锁的数量
  5. readerShouldBlock()
  • 如果队列为空或者前面没有写锁在排队时,则返回false
  • 如果头结点不为空,下一个结点也不为空并且是写锁时,返回true,表示要规规矩矩的排队
  1. 当readerShouldBlock()返回false时,继续判断读锁数量是否少于最大重入数65535,再CAS设置读锁+1,设置成功后
  • 当还没设置读锁+1时,读锁为0,那么久不熬时当前线程就是第一个上读锁的线程
  • 当当前线程==持有读锁线程,那么就是锁重入
  • 当持有读锁线程!=当前读线程,这里涉及一个不同线程重入,用HoldCounter类,这个类只有线程id和重入次数两个字段,当线程重入的时候就会初始化这个类并保存在ThreadLocalHoldCounter类中,这个类就是继承ThreadLocl的,用来初始化HoldCounter对象并保存
  1. 读锁如果成功获取,那么返回1
  2. 当readerShouldBlock()返回true时,直接调用fullTryAcquireShared()
final int fullTryAcquireShared(Thread current) {
            /*
             * This code is in part redundant with that in
             * tryAcquireShared but is simpler overall by not
             * complicating tryAcquireShared with interactions between
             * retries and lazily reading hold counts.
             */
            HoldCounter rh = null;
            for (;;) {
                int c = getState();
                if (exclusiveCount(c) != 0) {
                    if (getExclusiveOwnerThread() != current)
                        return -1;
                    // else we hold the exclusive lock; blocking here
                    // would cause deadlock.
                } else if (readerShouldBlock()) {
                    // Make sure we're not acquiring read lock reentrantly
                    if (firstReader == current) {
                        // assert firstReaderHoldCount > 0;
                    } else {
                        if (rh == null) {
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current)) {
                                rh = readHolds.get();
                                if (rh.count == 0)
                                    readHolds.remove();
                            }
                        }
                        if (rh.count == 0)
                            return -1;
                    }
                }
                if (sharedCount(c) == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    if (sharedCount(c) == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                        cachedHoldCounter = rh; // cache for release
                    }
                    return 1;
                }
            }
        }

这个方法中代码和tryAcquireShared基本上一致,只是采用了自旋的方式,处理初次加锁中的漏网之鱼

如果tryAcquireShared返回-1,即加锁失败,那么走AQS中的doAcquireShared()

private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
  1. 将当前线程封装成share Node放入CLH队列
  2. for无条件循环,判断当前节点的前继节点是否是头节点,如果是再一次尝试获取锁,获取锁成功,那么继续唤醒后面的所有share Node,到最后一个share Node节点(后面并没其他节点),将waitState=-3
  3. 前继节点不是头节点或者获取锁失败,那么就将前继节点waitState=-1,并挂起当前线程

读锁.unLock()

public static class ReadLock implements Lock, java.io.Serializable {
        public void unlock() {
            sync.releaseShared(1);
        }
}

调到AQS

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
protected final boolean tryReleaseShared(int unused) {
            Thread current = Thread.currentThread();
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
                if (firstReaderHoldCount == 1)
                    firstReader = null;
                else
                    firstReaderHoldCount--;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                int count = rh.count;
                if (count <= 1) {
                    readHolds.remove();
                    if (count <= 0)
                        throw unmatchedUnlockException();
                }
                --rh.count;
            }
            for (;;) {
                int c = getState();
                int nextc = c - SHARED_UNIT;
                if (compareAndSetState(c, nextc))
                    // Releasing the read lock has no effect on readers,
                    // but it may allow waiting writers to proceed if
                    // both read and write locks are now free.
                    return nextc == 0;
            }
        }

没什么好说的,就一直将读锁释放,全部释放后,唤醒写锁线程

写锁.unLock()

public static class WriteLock implements Lock, java.io.Serializable {
  public void unlock() {
            sync.release(1);
        }
}

调到AQS

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
}
abstract static class Sync extends AbstractQueuedSynchronizer {
protected final boolean tryRelease(int releases) {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            int nextc = getState() - releases;
            boolean free = exclusiveCount(nextc) == 0;
            if (free)
                setExclusiveOwnerThread(null);
            setState(nextc);
            return free;
        }
}

也没什么好说的,解锁将state变为0,重新唤醒CLH队列中的Node

相关文章

网友评论

      本文标题:多线程-AQS-ReentrantReadWriteLock

      本文链接:https://www.haomeiwen.com/subject/nybttltx.html