属性
//头结点
/** First node of condition queue. */
private transient Node firstWaiter;
//尾结点
/** Last node of condition queue. */
private transient Node lastWaiter;
常量
//在退出等待时重新中断
private static final int REINTERRUPT = 1;
//在退出等待时抛出 InterruptedException
private static final int THROW_IE = -1;
方法
addConditionWaiter
//将节点加入条件队列
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
// 这个方法会遍历整个条件队列,然后会将已取消的所有节点清除出队列
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
//如果条件队列不存在,就创建一个,所以头结点就是node
//如果条件队列存在,就让node成为尾结点
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
unlinkCancelledWaiters
// 这个方法会遍历整个条件队列,然后会将已取消的所有节点清除出队列
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
//条件队列中的node的state都应该是condition即等待状态的
//如果不是都要清除出这个队列
if (t.waitStatus != Node.CONDITION) {
//当前节点出队列
t.nextWaiter = null;
//第一次循环必然进来这个判断
if (trail == null)
//原来的结点出去了,所以它的下一个节点成为新节点
firstWaiter = next;
else
//上一个node的state的值是condition的,就会将上一个和当前node的下一个链接起来
trail.nextWaiter = next;
//这种情况是队列中只有一个头节点,所以尾结点为null
if (next == null)
lastWaiter = trail;
}
else
//当前node的state是condition
trail = t;
t = next;
}
}
signal
public final void signal() {
//isHeldExclusively 如果同步只针对当前(调用)线程进行,则返回true
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
signalAll
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
dosignal
//传递进来的参数就是等待队列的第一个节点firstWaiter
private void doSignal(Node first) {
do {
//如果等待队列的第一个节点的下一个节点是null,说明这个队列中只有一个节点
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
doSignalAll
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
transferForSignal
//将节点从条件队列转移到同步队列。如果成功则返回true
final boolean transferForSignal(Node node) {
//尝试获取锁,获取失败就返回false
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
//唤醒在阻塞队列中的元素
LockSupport.unpark(node.thread);
return true;
}
awaitUninterruptibly
//不可被中断的方法
public final void awaitUninterruptibly() {
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean interrupted = false;
//如果判断节点有后继节点,或者从后往前在同步队列查找可以找到,就返回true
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if (Thread.interrupted())
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
fullyRelease
//必须是持有锁的node才可以进行完全释放
//完全释放还有一个意思就是因为reeentlock是个可重入队列,所以需要将大于0的都重置为0
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
//这里使用了当前的 state 作为 release 的参数,也就是完全释放掉锁,将 state 置为 0
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
await
//当前线程 1 包装成 Node 后加入到条件队列中,然后阻塞在这里,不继续往下执行,条件队列是一个单向链表
public final void await() throws InterruptedException {
//首先就是判断线程是否中断,ture为已经中断
if (Thread.interrupted())
throw new InterruptedException();
// 添加到 condition 的条件队列中
Node node = addConditionWaiter();
//完全释放锁
int savedState = fullyRelease(node);
int interruptMode = 0;
// 这里退出循环有两种情况,之后再仔细分析
// 1. isOnSyncQueue(node) 返回 true,即当前 node 已经转移到阻塞队列了
// 2. checkInterruptWhileWaiting(node) != 0 会到 break,然后退出循环,代表的是线程中断
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
//checkInterruptWhileWaiting检查中断,如果在发出信号之前中断,则返回 THROW_IE,如果发出信号则返回 REINTERRUPT,如果未中断则返回 0。
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
过程
参考链接
- 一行一行源码分析清楚 AbstractQueuedSynchronizer (二)AQS的Condition源码分析