前言
CyclicBarrier 是可循环使用的屏障,它要做的事情是让一组线程到达一个屏障(也可叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会打开;所有被屏障拦截的线程才会继续执行。
用来控制多个线程互相等待,只有当多个线程都到达时,这些线程才会继续执行。
和 CountdownLatch 相似,都是通过维护计数器来实现的。使用同步屏障的线程执行 await() 方法之后计数器会减 1,并进行等待,直到计数器为 0,所有调用 await() 方法而在等待的线程才能继续执行。
CyclicBarrier 和 CountdownLatch 的一个区别是,CyclicBarrier 的计数器通过调用 reset() 方法可以循环使用,所以它才叫做循环屏障。
JDK设计思路:设置一个计数器,线程每调用一次计数器,就减一,并使用 Condition 阻塞线程。当计数器是0的时候,就唤醒所有线程,并尝试执行构造函数中的任务。由于 CyclicBarrier 是可重复执行的,所以,就需要重置计数器。
代码示例
public class CyclicBarrierExample {
private static int totalThread = 3;
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(totalThread, new A());
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i =0; i < totalThread - 1; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println("子线程开始执行");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("子线程执行完毕");
}
});
}
executorService.shutdown();
try {
System.out.println("等待2个子线程执行完毕...");
cyclicBarrier.await();
System.out.println("2个子线程已经执行完毕");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
static class A implements Runnable{
@Override
public void run() {
System.out.println("都到达屏障后--》优先执行");
}
}
}
等待2个子线程执行完毕...
子线程开始执行
子线程开始执行
都到达屏障后--》优先执行
子线程执行完毕
2个子线程已经执行完毕
子线程执行完毕
主线程同时启动两个子线程,当主线程和子线程3个线程都到达屏障之后,优先执行了A的方法,当执行完毕后主线程和2个子线程继续执行。
构造函数
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
其中 parties 指示计数器的初始值,barrierAction 在所有线程都到达屏障的时候会优先执行。
await()方法
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
// 获取排他锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
// 屏障被破坏则抛异常
if (g.broken)
throw new BrokenBarrierException();
// 线程中断 则退出屏障
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// 到达屏障的线程进行 计数减一
int index = --count;
if (index == 0) {
// index == 0, 说明指定 count 的线程均到达屏障 可以打开屏障
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
// 若构造函数指定了 barrierCommand 则执行
command.run();
ranAction = true;
// 唤醒阻塞在屏障的所有线程 并重置计数器
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
// 若未指定阻塞在屏障处的等待时间,则一直等待;直至最后一个线程到达屏障处的时候被唤醒
trip.await();
else if (nanos > 0L)
// 若指定了阻塞在屏障处的等待时间,则在指定时间到达时会返回
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
// 若等待过程中,线程发生了中断,则退出屏障
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
// 屏障被破坏 则抛出异常
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
// 说明指定时间内,还有线程未到达屏障处,也就是等待超时 退出屏障
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
private void nextGeneration() {
// 唤醒阻塞在等待队列的线程
trip.signalAll();
// 重置 count
count = parties;
// 重置 generation
generation = new Generation();
}
总结
- CyclicBarrier 和 CountDownLatch 功能类似,不同之处在于 CyclicBarrier 支持重复利用,而 CountDownLatch 计数只能使用一次。
- 每次线程调用一次 await 方法,表示这个线程到了栅栏这里了,那么就将计数器减一。如果计数器到 0 了,表示这是这一代最后一个线程到达栅栏,就尝试优先执行我们构造方法中输入的任务。最后,将代更新,计数器重置,并唤醒所有之前等待在栅栏上的线程。如果不是最后一个线程到达栅栏了,就使用 Condition 的 await 方法阻塞当前线程
- CountDownLatch 是一个类似于集结点的概念,很多个线程做完事情之后等待其他线程完成,全部线程完成之后再恢复运行。不同的是CountDownLatch 类需要你自己调用 countDown() 方法减少一个计数,然后调用 await() 方法即可。而 CyclicBarrier 则直接调用 await() 方法即可。
- CountDownLatch 更倾向于多个线程合作的情况,等你所有东西都准备好了,我这边就自动执行了。而 CyclicBarrier 则是我们都在一个地方等你,大家到齐了,大家再一起执行