属性
//这就是可以重复设置栅栏的原因
private static class Generation {
boolean broken = false;
}
private final ReentrantLock lock = new ReentrantLock();
//Condition 是“条件”的意思,CyclicBarrier 的等待线程通过 barrier 的“条件”是大家都到了栅栏上
private final Condition trip = lock.newCondition();
//参与的线程数
private final int parties;
//使用了这个就是 到达这个栅栏需要完成的操作
private final Runnable barrierCommand;
//当前所处的“代”
private Generation generation = new Generation();
// 还没有到栅栏的线程数,这个值初始为 parties,然后递减
// 还没有到栅栏的线程数 = parties - 已经到栅栏的数量
private int count;
方法
CyclicBarrier
//设置需要有多少个线程一起到栅栏,一起执行
public CyclicBarrier(int parties) {
this(parties, null);
}
CyclicBarrier重载
//就是比默认的多了设置到达栅栏之前必须完成的操作
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
nextGeneration
//所有线程到达了栅栏的时候,开始初始化下个栅栏
private void nextGeneration() {
// 唤醒上一代的所有
trip.signalAll();
//设置下一代
count = parties;
generation = new Generation();
}
breakBarrier
//将当前屏障生成设置为已破坏并唤醒所有人。仅在持有锁时调用。
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
await
//这个类中最重要的方法
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
await 重载
//带有超时机制,超时会抛出异常
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
dowait
//await传过来的参数是 false和 0L
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//当前代
final Generation g = generation;
// 检查栅栏是否被打破,如果被打破,抛出 BrokenBarrierException 异常
if (g.broken)
throw new BrokenBarrierException();
//如果线程已经中断
if (Thread.interrupted()) {
//将当前屏障生成设置为已破坏并唤醒所有人。仅在持有锁时调用。
breakBarrier();
throw new InterruptedException();
}
//count表示还没有到栅栏的数,那其实就是还没有到栅栏的数-1
int index = --count;
//所有线程都已经到了栅栏
if (index == 0) { // tripped
boolean ranAction = false;
try {
// 如果在初始化的时候,指定了通过栅栏前需要执行的操作,在这里会得到执行
final Runnable command = barrierCommand;
if (command != null)
command.run();
// 如果 ranAction 为 true,说明执行 command.run() 的时候,没有发生异常退出的情况
ranAction = true;
// 唤醒等待的线程,然后开启新的一代
nextGeneration();
return 0;
} finally {
if (!ranAction)
// 进到这里,说明执行指定操作的时候,发生了异常,那么需要打破栅栏
// 之前我们说了,打破栅栏意味着唤醒所有等待的线程,设置 broken 为 true,重置 count 为 parties
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
isBroken
//查看栅栏是否被破坏
public boolean isBroken() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return generation.broken;
} finally {
lock.unlock();
}
}
reset
//唤醒所有线程,开启新的一代
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}
getNumberWaiting
//获得已经到大栅栏的线程数量
public int getNumberWaiting() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return parties - count;
} finally {
lock.unlock();
}
}