想想一下这样一个场景,有多个人需要过河,河上有一条船,船要等待满10个人才过河,过完河后每个人又各自行动。
这里的人相当于线程,注意这里,每个线程运行到一半的时候,它就要等待一个条件,即船满过河的条件,之后每个线程才能继续执行。使用CyclicBarrier就可以实现这个需求
一. CyclicBarrier介绍
CyclicBarrier表示循环屏障,它内部有个屏障数count,当调用await方法就会减少一个屏障,并让当前线程等待。当屏障数count减到0的时候,表示条件已满足,所有等待的线程应该被唤醒,并且重置屏障数count,这样就可以继续使用了。
要让线程条件等待,就想到了Condition对象,它能够实现当不满足条件时,调用await方法让当前线程等待。满足条件时调用signal或signalAll方法,唤醒等待的线程。
二. 重要成员属性
/*** Generation一代的意思。* CyclicBarrier是可以循环使用的,用它来标志本代和下一代。* broken:表示本代是不是损坏了。标志有线程发生了中断,或者异常,就是任务没有完成。*/private static class Generation { boolean broken = false; } /** 用它来实现独占锁 */ private final ReentrantLock lock = new ReentrantLock(); /** 用它来实现多个线程之间相互等待通知,就是满足某些条件之后,线程才能执行,否则就等待 */ private final Condition trip = lock.newCondition(); /** 初始化时屏障数量 */ private final int parties; /* 当条件满足(即屏障数量为0)之后,会回调这个Runnable */ private final Runnable barrierCommand; /** 当前代 */ private Generation generation = new Generation(); // 剩余的屏障数量count。当count==0时,表示条件都满足了 private int count;
重要属性:
- count: 当前剩余的屏障数量。用它来判断条件是否满足
- generation: 当前代。
- lock: 独占锁,用它来保证修改成员变量时,多线程并发安全问题。
- trip: Condition对象,用它来实现不满足条件时,线程等待,满足条件时,唤醒等待线程。
三. await方法
/*** 减少屏障数count,* 如果屏障数count等于0了,条件满足,当前返回,并唤醒所有等待线程。* 如果屏障数不为0,那么当前线程等待。*/public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } /** * 与await()方法作用一样,只不过添加了超时设置。 * */ public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); }
这个方法很重要,我们就是调用这个方法,让当前线程等待着某个条件的满足的。它内部是通过调用dowait方法实现的。
四. dowait方法
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; // 使用lock锁,保证同一时间只有一个线程修改这些共享变量(这里就是这些成员属性) lock.lock(); try { final Generation g = generation; // 表示本代已经被损坏,抛出BrokenBarrierException异常 if (g.broken) throw new BrokenBarrierException(); // 如果当前线程中断标志位true,发生了中断,表示本代CyclicBarrier也损坏了, // 所以调用breakBarrier方法,并抛出InterruptedException异常 if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } // 每次调用dowait方法,表示满足了一次条件,所以count自减 int index = --count; if (index == 0) { // 表示条件都满足了,开始放行 boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; // 开始下一代 nextGeneration(); return 0; } finally { // 如果ranAction为false,说明command.run()方法产生异常,也表示本代损坏 if (!ranAction) breakBarrier(); } } // 如果index不等于0,表示条件没有满足,所以就要让当前线程等待。 for (;;) { try { // timed表示是否允许设置超时时间。 if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { Thread.currentThread().interrupt(); } } /** * 执行到这里说明,当前线程被唤醒了,并获取了锁。 * 有三种方式,都可以让线程执行到这里: * 1. 有个线程被中断或者发生异常,调用breakBarrier方法,会让等待的线程执行到这里 * 2. 所有条件都满足,调用nextGeneration方法之后,会让等待的线程执行到这里 * 3. 如果等待的线程设置了超时时间,如果被超时唤醒,那么也会执行到这里 */ if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
这个方法是CyclicBarrier中最重要的方法。它的作用就是将当前屏障数count减一,然后判断条件是否满足(即count == 0),满足就唤醒所有等待的线程,方法返回,如果不满足,就让当前线程等待。
正常情况下,流程就是如此,但是如果运行的线程发生异常,或者等待的线程被中断唤醒了,这个就会出现问题,直接调用breakBarrier方法,表示CyclicBarrier功能失败。
/*** 这个方法只在本代条件都满足(count==0)调用。* 1.唤醒所有等待的线程* 2.重置count值。* 3.创建新的Generation变量,表示下一代*/private void nextGeneration() { // signal completion of last generation trip.signalAll(); // set up next generation count = parties; generation = new Generation(); } /** * 当有一个线程发生中断或者异常的时候调用。 * 1.将本代的broken设置为true * 2.重置count值。 * 3.唤醒所有等待的线程 */ private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); }
nextGeneration和breakBarrier都有唤醒线程和重置屏障数count的功能,不同点就是nextGeneration设置新的generation,而breakBarrier方法会将当前generation的broken设置为true,表示被破坏掉了。
五.重要示例
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;public class CyclicBarrierTest { public static void newThread(String name, CyclicBarrier barrier) { new Thread(new Runnable() { @Override public void run() { try { System.out.println("线程"+Thread.currentThread().getName()+"等待船满过河"); barrier.await(); System.out.println("线程"+Thread.currentThread().getName()+"过完河, 各自行动"); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }, name).start(); } public static void main(String[] args) { CyclicBarrier barrier = new CyclicBarrier(10, new Runnable() { @Override public void run() { System.out.println("\n在线程"+Thread.currentThread().getName()+"中 船过河了\n"); } }); for (int i = 1; i <= 10; i++) { newThread("t"+i, barrier); } } }
运行结果:
线程t1等待船满过河
线程t2等待船满过河
线程t3等待船满过河
线程t4等待船满过河
线程t5等待船满过河
线程t6等待船满过河
线程t7等待船满过河
线程t8等待船满过河
线程t9等待船满过河
线程t10等待船满过河在线程t10中 船过河了线程t10过完河, 各自行动
线程t1过完河, 各自行动
线程t2过完河, 各自行动
线程t3过完河, 各自行动
线程t4过完河, 各自行动
线程t5过完河, 各自行动
线程t6过完河, 各自行动
线程t7过完河, 各自行动
线程t8过完河, 各自行动
线程t9过完河, 各自行动