ConditionObject源码分析
ConditionObject是AbstractQueuedSynchronizer中的一部分,它是用来实现通知/等待机制的,和Object的wait/notify一样,Codition分为两个部分,分别为await和signal,前者应用为等待,后者用于唤醒。
首先看看await方法的实现:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
其中有个addConditionWaiter方法,来看看它的实现:
1 | private Node addConditionWaiter() { |
这段代码主要的意思创建一个单向链表,每次调用都创建一个新的Node到尾部。而firstWaiter为第一个头部的Node。
fullyRelease方法是用来释放锁的,因为await方法之后需要将所有的Node的锁释放掉让其它的线程去竞争锁。
1 | final int fullyRelease(Node node) { |
其本质还是调用了release方法释放AQS队列中的阻塞队列。
有一个isOnSyncQueue方法的调用:
1 | final boolean isOnSyncQueue(Node node) { |
该方法是判断当前线程是否在AQS队列中,如果不在,则将当前线程阻塞。然后调用acquireQueued方法不断尝试去获取锁。
Condition的signal()实现原理
那么线程是如何被通知往下执行的?另外一个线程调用singal方法时:
1 | public final void signal() { |
首先判断当前线程是不是持有锁,没有持有锁则抛出IllegalMonitorStateException异常。接着调用doSignal方法。
1 | private void doSignal(Node first) { |
这里主要有一个transferForSignal方法:
1 | final boolean transferForSignal(Node node) { |
这段代码直接加入到AQS队列中,如果该节点的状态为cancel或者CAS操作失败则直接唤醒该线程。
实例
假如有2个线程先用调用来演示唤醒机制,其流程可能是这样子的。
- 线程1调用ReentrantLock的lock方法时,优先获取到锁,不需要进入AQS队列。
- 线程2调用ReentrantLock的lock方法时,由于线程1已经获取到锁时,线程2只能进入AQS队列
- 线程1调用condition.await方法时,线程1释放锁并且无限轮询准备下一次获取锁且将线程1进入condition队列。
- 线程1释放锁时,会触发AQS队列拿出线程2,使的线程2运行condition.signal方法,此时会将condition队列中的1个元素取出来放入AQS队列的末尾。
- 此时线程2调用ReentrantLock的lock方法时,触发线程1去获取锁(由于AQS中只有一个Node),就直接出队列了。
总结
- Condition创建的头结点就是真正等待的节点,而AQS创建的头结点是没有Thread的空节点。
- Condition创建的头结点使用nextWaiter来标识下一个节点,而AQS创建的节点有next和pred相互标识该节点的前一个节点和后一个节点。