概念
AQS(AbstractQueuedSynchronizer)就是一个抽象的队列同步器,它是用来构建锁或者其他同步组件的基础框架,它维护了一个volatile int state来表示同步状态,通过内置的FIFO队列来完成线程等待排队。仅仅是定义了若干同步状态获取和释放,线程排队,等待与唤醒等底层操作的方法来供自定义同步组件使用或者重写。
AQS的设计是基于模板方法设计的,可供实现的模板方法基本上可以分为三类:独占式获取与释放同步状态、共享式获取与释放同步状态和查询同步队列中的等待线程情况。
子类通常被推荐定义为自定义同步组件的静态内部类,子类通过继承AQS并实现它的抽象模板方法来管理同步状态,而这些模板方法内部就是真正管理同步状态的地方(主要有tryAcquire、tryRelease、tryAcquireShared、tryReleaseShared等)。
ReentrantLock、ReentrantReadWriteLock、Semaphore等同步组件都是基于AQS构建
AQS中的同步状态
AQS中state状态的变更是基于CAS实现的,主要有三种方法:
private volatile int state;
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
AQS定义两种资源共享方式:Exclusive(独占,同一时刻只有一个线程能执行,如ReentrantLock)和Share(共享式,多个线程可同时进行访问,如Semaphore/CountDownLatch)。
AQS的这些可重写的方法提供给我们去实现自定义同步组件:首先,我们需要去继承AbstractQueuedSynchronizer这个类(推荐在自定义同步组件中 静态内部类继承AQS),然后我们根据我们的需求去重写相应的方法,比如要实现一个独占锁,那就去重写tryAcquire,tryRelease方法,要实现共享锁,就去重写tryAcquireShared,tryReleaseShared;最后,在我们的组件中调用AQS中的模板方法就可以了,而这些模板方法是会调用到我们之前重写的那些方法的.
对于使用者来讲,我们无需关心获取资源失败,线程排队,线程阻塞/唤醒等一系列复杂的实现,这些都在AQS中为我们处理好了。我们只需要负责好自己的那个环节就好,也就是获取/释放共享资源state.
CLH同步队列
CLH同步队列是一个FIFO双向队列,AQS依赖它来完成同步状态的管理,当前线程如果获取同步状态失败时,AQS则会将当前线程以及等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。
在CLH同步队列中,一个节点表示一个线程,它保存着线程的引用(thread)、状态(waitStatus)、前驱节点(prev)、后继节点(next)
CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS是将每条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node)来实现锁的分配。
源码实现
独占式acquire
lock方法一般会直接代理到acquire上
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- 首先,调用使用者重写的tryAcquire方法,若返回true,意味着获取同步状态成功,后面的逻辑不再执行;若返回false,也就是获取同步状态失败,进入第2步;
- 此时,获取同步状态失败,构造独占式同步结点,通过addWatiter将此结点添加到同步队列的尾部(此时可能会有多个线程结点试图加入同步队列尾部,需要以线程安全的方 式添加);
- 该结点以在队列中尝试获取同步状态,若获取不到,则阻塞结点线程,直到被前驱结点唤醒或者被中断。
释放同步状态–release()
public final boolean release(int arg) {
if (tryRelease(arg)) {//调用使用者重写的tryRelease方法,若成功,唤醒其后继结点,失败则返回false
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);//唤醒后继结点
return true;
}
return false;
}
共享式acquireShared
- 首先调用自定义方法tryAcquireShared尝试获取同步状态,至少调用一次tryAcquireShared方法,如果返回值>=0,则获取成功,return;否则执行步骤2),
- 当获取失败时,为当前线程以共享方式创建Node并插入同步队列,然后被挂起等待唤醒
- 当队列中的等待线程被唤醒以后就重新尝试获取锁资源,如果成功则唤醒后面还在等待的共享节点并把该唤醒事件传递下去,即会依次唤醒在该节点后面的所有共享节点,然后进入临界区,否则继续挂起等待
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)//返回值小于0,获取同步状态失败,排队去;获取同步状态成功,直接返回去干自己的事儿。
doAcquireShared(arg);
}
释放同步状态–releaseShared
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();//释放同步状态
return true;
}
return false;
}
与AQS的独占功能不同,当锁被头节点获取后,独占功能是只有头节点获取锁,其余节点的线程继续沉睡,等待锁被释放后,才会唤醒下一个节点的线程,而共享功能是只要头节点获取锁成功,就在唤醒自身节点对应的线程的同时,继续唤醒AQS队列中的下一个节点的线程,每个节点在唤醒自身的同时还会唤醒下一个节点对应的线程,以实现共享状态的“向后传播”,从而实现共享功能。
对于独占式同步组件来讲,同一时刻只有一个线程能获取到同步状态,其他线程都得去排队等待,其子类重写的尝试获取同步状态的方法tryAcquire返回值为boolean,这很容易理解;对于共享式同步组件来讲,同一时刻可以有多个线程同时获取到同步状态,这也是“共享”的意义所在。其待重写的尝试获取同步状态的方法tryAcquireShared返回值为int。
总结
AQS(它仅仅是一个工具类)使用标记位+队列的方式,记录获取锁、竞争锁、释放锁等一些类锁操作。但更准确的说,AQS并不关心什么是锁,对于AQS来说它只是实现了一系列的用于判断资源是否可以访问的API,并且封装了在访问资源受限时,将请求访问的线程加入队列、挂起、唤醒等操作。