AQS 是AbstractQueuedSynchronizer
类的简写,这个是锁的一个设计模式,在 Java 中很多锁都会用到 AQS,如常用的显示锁ReentrantLock
、ReentrantReadWriteLock
等内部的锁都是继承 AQS。AQS 的基本的设计模式是模板方法模式,具体锁的获取和释放实现逻辑由类自身来实现,这些方法的组合,以及线程队列获取锁的机制是由 AQS 自身来完成。
1. AQS 介绍
1.1 AQS 设计模式
AQS 的基本设计模式是模板方法模式,听起来有点抽象,这里就不直接看源码,先根据这个模式写个 Demo,让我们理解一下。然后根据对模式的理解再去看源码,一切都很清晰。
抽象父类
public abstract class AbstractSendMsgActor {
abstract void setReceiver();
abstract void setSender();
abstract void setContent();
public void sendSms() {
System.out.println("==>发送信息!");
}
// 完成发送信息整个执行链的调用
protected void sendMsg() {
setReceiver();
setContent();
setSender();
sendSms();
}
}
继承子类
public class SendSmsActor extends AbstractSendMsgActor {
@Override
void setReceiver() {
System.out.println("==>设置接受人!");
}
@Override
void setSender() {
System.out.println("==>设置发送人!");
}
@Override
void setContent() {
System.out.println("==>设置发送内容!");
}
public static void main(String[] args) {
AbstractSendMsgActor msgActor = new SendSmsActor();//创建对应的子类对象
msgActor.sendMsg();//调用模板方法
}
}
/*
执行结果:
==>设置接受人!
==>设置发送内容!
==>设置发送人!
==>发送信息!
*/
这里就很明显了,在抽象父类中定义需要子类实现的接口,然后提供一个模板方法,模板方法将执行的过程需要调用的方法串起来。具体模板方法内执行链方法在子类中如何实现,父类模板方法是不关心的。到这里就可以理解 AQS 模板方法设计模式。但是具体 AQS 内是怎么设计的呢。后续的篇幅可以看源码来理解。
2. 独占锁和共享锁
独占锁:当前如果有一个线程在竞争中获取到了锁,那么共用这把锁的其他所有线程会进入等待队列,必须等锁释放后并获取到锁才能执行。
共享锁:多个线程共享一把锁,如常用的读锁,就是一个共享锁,一个线程获取锁后,共用这把共享锁的线程可以获取并执行逻辑。
在 AQS 下,共享锁和独占锁都要重写那些方法。看下面表格:
锁类型 | 重写方法 |
---|---|
独占锁 | acquire 、tryAcquire :锁的获取tryAcquireNanos :锁的获取,可以设置超时时间release 、tryRelease :锁的释放isHeldExclusively :判断同步器是否处于独占模式 |
共享锁 | acquireShared 、tryAcquireShared :锁的获取tryAcquireSharedNanos :锁的获取,可以设置超时时间releaseShared 、tryReleaseShared :锁的释放 |
通过上面 AQS 模板方法设计模式的理解,然后再看这个需要重写的方法就很简单了。
2.1 独占锁源码
独占锁这里从 acquire
开始看,这个是模板方法的入口。
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(AbstractQueuedSynchronizer.Node.EXCLUSIVE), arg))
selfInterrupt();
}
在 if 判断中有三个方法tryAcquire()
、addWaiter()
、acquireQueued()
。首先tryAcquire()
尝试获取锁,如果获取到了,下面的逻辑都不会走了,因为这里 if
里面的连接符是&&
,如果尝试连接没有成功会往下走。进入addWaiter()
方法。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) { //原子方式尝试加入队列尾部
pred.next = node;
return node;
}
}
enq(node);
return node;
}
将当前线程封装到一个 Node 节点对象中,然后根据不同的情况将 Node 节点加入到等待队列中,加入后会返回当前线程对应的 Node 节点对象。这里面的代码其实并不难。主要是要知道 AQS 里面有一个队列,这个队列就是排队等待获取锁的队列。这个方法里面还涉及到一个enq()
方法,源码如下:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
//原子方式尝试将节点加入到头部
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
// 原子方式尝试将节点加入到队列的尾部
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
这段源码和上一段源码都加上了中文的注释,这个原子方式其实就是线程安全才做,这里为什么这样,可以看一下原子操作相关的解释。上面的逻辑执行结束后,节点加入成功,需要执行acquireQueued()
。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) { // 自旋(死循环)
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { //代码块一
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
上面的“代码块一”中可以看出来,p 是当前节点的前一个节点,判断中会看 p 节点是否为头结点,同时也会尝试去获取锁,如果判断不成立,就会一直自旋,直到成立为止,跳出自旋。成立就代表着当前节点变成了头节点,并且获取了锁,执行自己的逻辑。
整体的看下来总结一下如图:
在独占锁中,线程过来先尝试获取锁,如果获取到就直接执行业务逻辑,如果没有获取到,就会进入添加节点逻辑,将当前线程打包成 Node 对象,放在线程尾部,然后自旋,直到成为头节点并获取锁后跳出自旋,开始执行自身逻辑。
2.2 共享锁密码
进入acquireShared()
方法,一起探索。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
尝试获取锁,返回为负数,就执行doAcquireShared()
逻辑,这里相当于约定的值,负值表示没有获取到锁,从这里还是可以看出来,即使是共享锁,也是有执行队列。进入doAcquireShared()
方法。
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED); //将当前节点添加到队列中
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) { //判断当前节点的前一个节点是不是头节点
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
这里和独享锁的实现很相似,但是在判断当前节点的前一个节点是不是头节点的时候,少了一个获取锁的必要条件。也就意味着只要当前节点的前一个节点是头节点,就能执行。
到这里 AQS 实现、独占锁和共享锁都看完了,直接看这些专有名词可能感觉很难,但是要是深入进去其实并不难。