手把手教你阅读AQS源码(一)
最近准备面试大厂,上天(高并发,分布式)入地(JVM)的技能是必须的,今天抽一下午时间把JUC包下非常基础非常核心非常重要的一个内容AQS做一个总结。
(一)什么是AQS
AQS是AbstractQueuedSynchronizer的缩写,是JUC包下的一个同步器框架,它对于原子性同步状态的管理,线程的挂起与唤醒,以及多线程竞争锁的排队等提供了一系列通用的机制。让我们自己可以通过AQS很容易的实现自己的一把定制化锁,包括JUC包下的同步工具CountdownLatch、ReentrantLock、Semaphore等都是通过AQS来进行管理内部的同步状态。具体的介绍可以参考AQS的作者Doug Lea的一篇论文---------(传送门)--------->The java.util.concurrent Synchronizer Framework
(二)AQS框架怎么用?
这篇文章笔者会以ReentrantLock为切入点加以代码块和描述的方式对AQS的源码做一个深入浅出的分析,在这之前我们先来看一下AQS源码中作者写的相关注释。
Subclasses should be defined as non-public internal helper classes that are used to implement the synchronization properties of their enclosing class.
To use this class as the basis of a synchronizer, redefine the following methods, as applicable, by inspecting and/or modifying following methods, as applicable, by inspecting and/or modifying #setState} and/or {@link #compareAndSetState}:
- {@link #tryRelease}
- {@link #tryAcquire}
- {@link #tryAcquireShared}
- {@link #tryReleaseShared}
- {@link #isHeldExclusively}
以笔者捉急的英语水平,大概意思是要想使用AQS进行同步状态的管理,需要在子类中定义一个内部类继承AQS,然后重写相关的方法,并不是说这5个方法都要重写,如果要自己实现一个独占锁,那么只需要重写tryAcquire,tryRelease,isHeldExclusively这三个即可,例如JUC包下的ReentrantLock,如果要实现一个既要独占又要共享,那么就要重写这5个方法,例如ReentrantReadWriteLock,本篇文章笔者会以ReentrantLock为例子,在源码级别分析ReentantLock的上锁流程。话不多说开搞!【长文警告】
①先搞一个ReentantLock的调试Demo
package com.zs.juc;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
public class ReentrantLockDemo {
public static void main(String[] args) {
final ReentrantLock lock = new ReentrantLock();
try {
lock.lock();
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
} finally {
lock.unlock();
}
}
}
非常简单的一个Demo,我们以睡眠的方式模拟业务逻辑,那么在分析ReentrantLock上锁流程之前,我们先对ReentrantLock的内部架构做一个了解。
②ReentrantLock的内部架构
当我们new这个ReentrantLock的时候,内部做了什么?
构造方法中new了一个NonfairSync并且赋值给了sync,拿脚一想sync定是ReentrantLock的一个成员变量,跟上去看下这个sync
果然sync是ReentrantLock的一个内部抽象类,并且继承自AQS。这不就是AQS作者推荐使用的方法么,哈哈对上了。但是这里new的是NonfairSync,我们再去看下这个
NonfairSync也是ReentrantLock的内部类,继承自Sync,那既然有Nonfair,肯定还有fair嘛
看到这里大概能总结出ReentrantLock的内部架构,大概张这个样子:
小结:ReentrantLock默认实现了两种独占锁,公平锁(FairSync)和非公平锁(NonfairSync),默认是非公平的,公平锁可以通过new ReentrantLock(true)来创建。
③AQS的内部架构
AQS中维护了几个非常重要的属性:
一个被volatile修饰的int类型的整形state,初始值是0
一个队列节点ADT是通过内部类Node实现
描述这个队列的头结点Node head
描述这个队列的尾节点Node tail
先来看下Node这个内部类:
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
从Node类中的prev和next可以看出AQS内部维护的等待队列其实是一个双向链表,当中有维护一个Thread,其实就可以姑且认为一个Node里面封装了一个线程,waitStatus表示了当前Node的等待状态,默认是0。当这个队列默认是懒加载的,当中有线程在等待的时候,队列大概张这个样子:
好了至此我么着手分析非公平锁的上锁过程:
④非公平多锁的上锁流程
场景:三个线程t1,t2,t3共同抢占这把锁。
当我们调用lock.lock()的时候,跟进源码进到了ReentrantLock的lock():
继续跟进走到了Sync的抽象方法lock():
这里走的是NonfairSync的lock()如下:
非公平锁上来二话不说直接CAS尝试第一次加锁,那么对于这三个线程t1,t2,t3注定只有一个成功另外两个失败,我们假设t1成功了,t2,t3失败。
那么t1除了在CAS的过程中把AQS的state从0变成1之外,还把自己的线程设置为持有锁的线程,这是if的逻辑。
那么对于t2和t3线程,第一次竞争锁失败后,会走else的逻辑再一次尝试获取锁:
这里的逻辑一共分为三个大的方法:
tryAcquire(),注意这个方法取了反
addWaiter(Node.EXCLUSIVE),
acquireQueued();
我们先看第一个方法tryAcquire(arg):
跟进去发现这里是AQS提供的模板方法,让子类去实现,我们跟进NonefairSync看看:
调用了父类Sync的nonfairTryAcquire(acquires):
t2和t3第一次和t1没有竞争到锁,并不甘心,这里还会在尝试获取一次锁,对于线程t2来讲,先获取到当前锁的状态state。
如果state == 0 说明之前拿到锁的线程t1已经释放了锁资源,那么我t2还是CAS把0变成1,在把我t2设置为持有锁的线程,返回true,那么在方法的调用处取了反,&&之后的代码(入队列)不会执行,t2的这一次尝试获取锁成功了。
当然如果state == 0 不成立,说明此时t1还没有释放锁,会继续判断当前线程是不是持有锁的线程?如果是t1来再次获取锁,那么会给state目前的值+1,再把state重新设置回去,返回true,说明这次是t1的重入。也获取锁成功了
当然如果此时确实是我t2来尝试获取锁,t1此时还没有释放锁,那么我这次尝试又失败了返回false,t3和t2在这里是同样的逻辑。那么返回false,方法调用处取反,&&之后的代码就要执行啦(t2和t3要去队列排队啦)。
再看第二个方法addWaiter(Node.EXCLUSIVE);
目前的场景是这样:t2,t3线程第二次尝试获取锁失败了,都要执行这个方法
看方法名字大概才出来这个方法是线程入队列的方法,具体看下逻辑:
方法的参数是Node类中的一个常量Node.EXCLUSIVE = null,如下:
Node node = new Node(Thread.currentThread(), mode);
t2这个时候先创建一个Node节点node,内部封装了自己的线程。
Node pred = tail;
Node pred表示队列的尾部
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
这里的逻辑是快速入队列:
如果pred(队列的尾部)不为null,说明队列已经被其他线程初始化了,但是我们想一下这个时候这个队列还没有被初始化,所以t2并不会走这里的逻辑,而是走enq()去初始化队列。
enq(node);
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
这个方法是自选入队和初始化队列的逻辑:
上来一个死循环for(;
先拿到当前AQS队列的尾节点tail赋值给Node t 此时队列还没有初始化,所以t == null
if(t == null) {
if(compairAndSetHead(new Node()))
tail = head
}
条件成立t2线程会以CAS的方式创建出一个空的节点赋值给AQS队列的head,再把tail节点也指向这个新创建的空节点。完成队列初始化,第一次循环结束。
那么第二次循环t == null 条件肯定不成立走的是else的逻辑
else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
t2先把自己的prev指针指向队列尾部,再以CSA的方式把AQS的tail指针指向自己,如果CAS成功,再把队列原尾节点的next指针指向自己,维护好链表。这里为什么会用CAS去设置尾指针呢?如果这个时候t3也要来排队,发现队列也已经初始化了,那么原队列到底是把t2的节点追加到尾部还是把t3的节点追加到尾部呢,这里就CAS谁成功了,谁追加。失败了没关系啊,死循环,你下一次在尝试呗。确保这个队列尾部不能链多个节点罢了。
总结一下这个方法,说白了就是第一个来排队的线程,需要初始化队列,也就是创建一个空的头结点,然后走enq()自旋入队,其他线程先尝试快速入队,失败后自旋入队。最终返回当前排队的节点。
那么现在这些线程已经在队列里面了,然后这些线程在队列要干嘛?当然是park了呀,接下里就是最后一个方法的作用了,我们看下:
acquireQueued(Node node, arg);
这里要提到Node类中的一个属性叫waitStatus,节点的等待状态。我的理解是这样的,ReentrantLock释放锁的时候,唤醒的是下一个节点,那么我当前节点我要保证在我park之前我的前驱节点是能够正常唤醒我的状态,你不能是被取消了或者其他乱七八糟的状态,要不然我岂不是脸一黑醒不来了。
来看方法逻辑:
场景: t2是初始化队列的线程,队列里面现在有头节点—>t2节点
还是上来一个死循环,先拿到t2的前驱节点,如果这个前驱节点数头结点的话,这个线程是有一个机会再去尝试获取锁,对于t2来讲,他的前驱节点确实是头节点,如果t2获取到锁了,t2就不需要park了呀,它会把队列的头结点设置为自己,之前的头结点舍弃。那如果这次t2仍然没有获取到锁,就会走shouldParkAfterFailedAcquire()看自己能不能安全的park。如果当前节点的前驱节点不是头结点,也就是说,队列中已经有人排队了,那你就被去试了,直接去走shouldParkAfterFailedAcquire()看自己能不能安全park完事了。
再看下最后这个方法shouldParkAfterFailedAcquire():
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
pred是当前节点的前驱节点,node是当前节点
这个方法唯一返回true的条件是前驱节点的waitStatus是-1,也就是说只有当前节点的前驱节点的等待状态是-1的时候我才能安全的park。
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
}
如果我的前驱节点的状态> 0 我会一直往前驱找,直到前驱节点的状态 <=0 我把我自己挂在他后面,返回false,等待下一次循环
else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
否则我会把前驱节点的状态设置为-1.,返回false,等待下一次循环,直到返回true,才能去park
最后看一下队列中的线程是如何park的parkAndCheckInterrup()
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
调用LockSupport.park(),挂起当前线程。
好了,这篇文章我们跟着源码看了一下ReentrantLock的非公平锁是如何上锁的,有了这个基础下次我会继续更新公平锁的上锁流程以及释放锁的流程。