AnthonyZero's Bolg

JUC-CyclicBarrier

前言

CyclicBarrier 是可循环使用的屏障,它要做的事情是让一组线程到达一个屏障(也可叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会打开;所有被屏障拦截的线程才会继续执行。

用来控制多个线程互相等待,只有当多个线程都到达时,这些线程才会继续执行。

和 CountdownLatch 相似,都是通过维护计数器来实现的。使用同步屏障的线程执行 await() 方法之后计数器会减 1,并进行等待,直到计数器为 0,所有调用 await() 方法而在等待的线程才能继续执行。

CyclicBarrier 和 CountdownLatch 的一个区别是,CyclicBarrier 的计数器通过调用 reset() 方法可以循环使用,所以它才叫做循环屏障。

JDK设计思路:设置一个计数器,线程每调用一次计数器,就减一,并使用 Condition 阻塞线程。当计数器是0的时候,就唤醒所有线程,并尝试执行构造函数中的任务。由于 CyclicBarrier 是可重复执行的,所以,就需要重置计数器。

代码示例

public class CyclicBarrierExample {
    private static int totalThread = 3;
    private static CyclicBarrier cyclicBarrier = new CyclicBarrier(totalThread, new A());
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i =0; i < totalThread - 1; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println("子线程开始执行");
                    try {
                        cyclicBarrier.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                    System.out.println("子线程执行完毕");
                }
            });
        }
        executorService.shutdown();
        try {
            System.out.println("等待2个子线程执行完毕...");
            cyclicBarrier.await();
            System.out.println("2个子线程已经执行完毕");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
    static class A implements Runnable{
        @Override
        public void run() {
            System.out.println("都到达屏障后--》优先执行");
        }
    }
}

等待2个子线程执行完毕...
子线程开始执行
子线程开始执行
都到达屏障后--》优先执行
子线程执行完毕
2个子线程已经执行完毕
子线程执行完毕

主线程同时启动两个子线程,当主线程和子线程3个线程都到达屏障之后,优先执行了A的方法,当执行完毕后主线程和2个子线程继续执行。

构造函数

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}

其中 parties 指示计数器的初始值,barrierAction 在所有线程都到达屏障的时候会优先执行。

await()方法

public int await(long timeout, TimeUnit unit)
    throws InterruptedException,
           BrokenBarrierException,
           TimeoutException {
    return dowait(true, unit.toNanos(timeout));
}

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    // 获取排他锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        final Generation g = generation;
        // 屏障被破坏则抛异常
        if (g.broken)
            throw new BrokenBarrierException();
        // 线程中断 则退出屏障
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }
        // 到达屏障的线程进行 计数减一
        int index = --count;
        if (index == 0) {  
            // index == 0, 说明指定 count 的线程均到达屏障 可以打开屏障
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)
                    // 若构造函数指定了 barrierCommand 则执行
                    command.run();
                ranAction = true;
                // 唤醒阻塞在屏障的所有线程 并重置计数器
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        // loop until tripped, broken, interrupted, or timed out
        for (;;) {
            try {
                if (!timed)
                    // 若未指定阻塞在屏障处的等待时间,则一直等待;直至最后一个线程到达屏障处的时候被唤醒
                    trip.await();
                else if (nanos > 0L)
                    // 若指定了阻塞在屏障处的等待时间,则在指定时间到达时会返回
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    // 若等待过程中,线程发生了中断,则退出屏障
                    breakBarrier();
                    throw ie;
                } else {                      
                    Thread.currentThread().interrupt();
                }
            }
            // 屏障被破坏 则抛出异常
            if (g.broken)
                throw new BrokenBarrierException();

            if (g != generation)
                return index;
            // 说明指定时间内,还有线程未到达屏障处,也就是等待超时 退出屏障
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

private void nextGeneration() {
    // 唤醒阻塞在等待队列的线程
    trip.signalAll();
    // 重置 count
    count = parties;
    // 重置 generation
    generation = new Generation();
}

总结

  • CyclicBarrier 和 CountDownLatch 功能类似,不同之处在于 CyclicBarrier 支持重复利用,而 CountDownLatch 计数只能使用一次。
  • 每次线程调用一次 await 方法,表示这个线程到了栅栏这里了,那么就将计数器减一。如果计数器到 0 了,表示这是这一代最后一个线程到达栅栏,就尝试优先执行我们构造方法中输入的任务。最后,将代更新,计数器重置,并唤醒所有之前等待在栅栏上的线程。如果不是最后一个线程到达栅栏了,就使用 Condition 的 await 方法阻塞当前线程
  • CountDownLatch 是一个类似于集结点的概念,很多个线程做完事情之后等待其他线程完成,全部线程完成之后再恢复运行。不同的是CountDownLatch 类需要你自己调用 countDown() 方法减少一个计数,然后调用 await() 方法即可。而 CyclicBarrier 则直接调用 await() 方法即可。
  • CountDownLatch 更倾向于多个线程合作的情况,等你所有东西都准备好了,我这边就自动执行了。而 CyclicBarrier 则是我们都在一个地方等你,大家到齐了,大家再一起执行