前言
CountdownLatch 是 JDK 并发包中提供的并发工具类,其允许一个或多个线程等待其他线程完成操作。常用作将一个任务拆分成多个子任务同时执行,只有子任务都执行完毕主线程才往下执行。
例如应用程序的主线程希望在负责启动框架服务的线程已经启动所有的框架服务之后再执行。
当计数器总量大于0时,线程将被阻塞,不能够获取锁,只有当计数器总量为0时,所有被阻塞的线程同时被释放。
CountDownLatch是通过一个计数器来实现的,计数器的初始值一般为线程的数量。每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器的值为0时,表示所有的线程都已经完成了任务,然后在CountDownLatch上等待的线程就可以恢复执行任务。
CountDownLatch使用
public class CountdownLatchExample {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(10);
final int totalThread = 10;
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < totalThread; i++) {
int finalI = i;
executorService.execute(() -> {
System.out.println("执行" + finalI);
countDownLatch.countDown(); //-1
});
}
countDownLatch.await();
executorService.shutdown();
System.out.println("end");
}
}
执行1
执行0
执行4
执行2
执行5
执行6
执行8
执行7
执行3
执行9
end
从结果中可以看出 main 主线程会在 10 个子线程处理完毕之后才继续执行。CountDownLatch的await方法会阻塞当前线程,直到计数器变成0.
构造函数
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
CountDownLatch 与其他同步组件一样,内部类 Sync 继承了 AQS,构造的时候会指定子任务个数 count , 也即是同步状态初始值。
countDown()方法
内部计数器减一,如果计数达到零,唤醒所有等待的线程。
//Decrements the count of the latch 每次释放1个计数
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
//如果内部计数器状态值递减后等于零
if (tryReleaseShared(arg)) {
//唤醒同步队列节点中的线程
doReleaseShared();
return true;
}
return false;
}
//尝试释放共享锁,即将内部计数器值减一
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
//使用CAS修改state值
if (compareAndSetState(c, nextc))
return nextc == 0; //==0返回true
}
}
从实现可以看出,每次子任务在调用 countDown 时,会将同步状态值减一。
await()方法
当await()方法被调用时,当前线程会阻塞,直到内部计数器的值等于零或当前线程被中断
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//尝试获取共享锁,如果可以获取到锁直接返回;
//如果获取不到锁,执行doAcquireSharedInterruptibly
if (tryAcquireShared(arg) < 0)
//该方法使当前线程一直等待,直到当前线程获取到共享锁或被中断才返回
doAcquireSharedInterruptibly(arg);
}
//如果当前内部计数器等于零返回1,否则返回-1;
//内部计数器等于零表示可以获取共享锁,否则不可以;
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
从实现可以看出 await() 方法执行时,当子任务未处理完毕时(state != 0),调用线程会被添加到同步队列而阻塞等待。
总结
- 子任务在进行 countDown 操作时,最好是在 finally 块处理; 避免出现子任务处理异常,导致主线程一直阻塞的问题。
- 计数器必须大于等于0,只是等于0的时候计算器就是零,调用await方法时不会阻塞当前线程。
- await(long timeout, TimeUnit unit)方法可以等待指定时间后,就会不再阻塞当前线程。
- 当计数器达到0的时候,所有等待的线程都会释放,不为0的时候,所有等待的线程都会阻塞
- CountDownLatch实现了有限的AQS共享锁。