极客时间已完结课程限时免费阅读

19 | CountDownLatch和CyclicBarrier:如何让多线程步调一致?

19 | CountDownLatch和CyclicBarrier:如何让多线程步调一致?-极客时间

19 | CountDownLatch和CyclicBarrier:如何让多线程步调一致?

讲述:王宝令

时长10:02大小9.17M

前几天老板突然匆匆忙忙过来,说对账系统最近越来越慢了,能不能快速优化一下。我了解了对账系统的业务后,发现还是挺简单的,用户通过在线商城下单,会生成电子订单,保存在订单库;之后物流会生成派送单给用户发货,派送单保存在派送单库。为了防止漏派送或者重复派送,对账系统每天还会校验是否存在异常订单。
对账系统的处理逻辑很简单,你可以参考下面的对账系统流程图。目前对账系统的处理逻辑是首先查询订单,然后查询派送单,之后对比订单和派送单,将差异写入差异库。
对账系统流程图
对账系统的代码抽象之后,也很简单,核心代码如下,就是在一个单线程里面循环查询订单、派送单,然后执行对账,最后将写入差异库。
while(存在未对账订单){
// 查询未对账订单
pos = getPOrders();
// 查询派送单
dos = getDOrders();
// 执行对账操作
diff = check(pos, dos);
// 差异写入差异库
save(diff);
}

利用并行优化对账系统

老板要我优化性能,那我就首先要找到这个对账系统的瓶颈所在。
目前的对账系统,由于订单量和派送单量巨大,所以查询未对账订单 getPOrders() 和查询派送单 getDOrders() 相对较慢,那有没有办法快速优化一下呢?目前对账系统是单线程执行的,图形化后是下图这个样子。对于串行化的系统,优化性能首先想到的是能否利用多线程并行处理
对账系统单线程执行示意图
所以,这里你应该能够看出来这个对账系统里的瓶颈:查询未对账订单 getPOrders() 和查询派送单 getDOrders() 是否可以并行处理呢?显然是可以的,因为这两个操作并没有先后顺序的依赖。这两个最耗时的操作并行之后,执行过程如下图所示。对比一下单线程的执行示意图,你会发现同等时间里,并行执行的吞吐量近乎单线程的 2 倍,优化效果还是相对明显的。
对账系统并行执行示意图
思路有了,下面我们再来看看如何用代码实现。在下面的代码中,我们创建了两个线程 T1 和 T2,并行执行查询未对账订单 getPOrders() 和查询派送单 getDOrders() 这两个操作。在主线程中执行对账操作 check() 和差异写入 save() 两个操作。不过需要注意的是:主线程需要等待线程 T1 和 T2 执行完才能执行 check() 和 save() 这两个操作,为此我们通过调用 T1.join() 和 T2.join() 来实现等待,当 T1 和 T2 线程退出时,调用 T1.join() 和 T2.join() 的主线程就会从阻塞态被唤醒,从而执行之后的 check() 和 save()。
while(存在未对账订单){
// 查询未对账订单
Thread T1 = new Thread(()->{
pos = getPOrders();
});
T1.start();
// 查询派送单
Thread T2 = new Thread(()->{
dos = getDOrders();
});
T2.start();
// 等待T1、T2结束
T1.join();
T2.join();
// 执行对账操作
diff = check(pos, dos);
// 差异写入差异库
save(diff);
}

用 CountDownLatch 实现线程等待

经过上面的优化之后,基本上可以跟老板汇报收工了,但还是有点美中不足,相信你也发现了,while 循环里面每次都会创建新的线程,而创建线程可是个耗时的操作。所以最好是创建出来的线程能够循环利用,估计这时你已经想到线程池了,是的,线程池就能解决这个问题。
而下面的代码就是用线程池优化后的:我们首先创建了一个固定大小为 2 的线程池,之后在 while 循环里重复利用。一切看上去都很顺利,但是有个问题好像无解了,那就是主线程如何知道 getPOrders() 和 getDOrders() 这两个操作什么时候执行完。前面主线程通过调用线程 T1 和 T2 的 join() 方法来等待线程 T1 和 T2 退出,但是在线程池的方案里,线程根本就不会退出,所以 join() 方法已经失效了。
// 创建2个线程的线程池
Executor executor =
Executors.newFixedThreadPool(2);
while(存在未对账订单){
// 查询未对账订单
executor.execute(()-> {
pos = getPOrders();
});
// 查询派送单
executor.execute(()-> {
dos = getDOrders();
});
/* ??如何实现等待??*/
// 执行对账操作
diff = check(pos, dos);
// 差异写入差异库
save(diff);
}
那如何解决这个问题呢?你可以开动脑筋想出很多办法,最直接的办法是弄一个计数器,初始值设置成 2,当执行完pos = getPOrders();这个操作之后将计数器减 1,执行完dos = getDOrders();之后也将计数器减 1,在主线程里,等待计数器等于 0;当计数器等于 0 时,说明这两个查询操作执行完了。等待计数器等于 0 其实就是一个条件变量,用管程实现起来也很简单。
不过我并不建议你在实际项目中去实现上面的方案,因为 Java 并发包里已经提供了实现类似功能的工具类:CountDownLatch,我们直接使用就可以了。下面的代码示例中,在 while 循环里面,我们首先创建了一个 CountDownLatch,计数器的初始值等于 2,之后在pos = getPOrders();dos = getDOrders();两条语句的后面对计数器执行减 1 操作,这个对计数器减 1 的操作是通过调用 latch.countDown(); 来实现的。在主线程中,我们通过调用 latch.await() 来实现对计数器等于 0 的等待。
// 创建2个线程的线程池
Executor executor =
Executors.newFixedThreadPool(2);
while(存在未对账订单){
// 计数器初始化为2
CountDownLatch latch =
new CountDownLatch(2);
// 查询未对账订单
executor.execute(()-> {
pos = getPOrders();
latch.countDown();
});
// 查询派送单
executor.execute(()-> {
dos = getDOrders();
latch.countDown();
});
// 等待两个查询操作结束
latch.await();
// 执行对账操作
diff = check(pos, dos);
// 差异写入差异库
save(diff);
}

进一步优化性能

经过上面的重重优化之后,长出一口气,终于可以交付了。不过在交付之前还需要再次审视一番,看看还有没有优化的余地,仔细看还是有的。
前面我们将 getPOrders() 和 getDOrders() 这两个查询操作并行了,但这两个查询操作和对账操作 check()、save() 之间还是串行的。很显然,这两个查询操作和对账操作也是可以并行的,也就是说,在执行对账操作的时候,可以同时去执行下一轮的查询操作,这个过程可以形象化地表述为下面这幅示意图。
完全并行执行示意图
那接下来我们再来思考一下如何实现这步优化,两次查询操作能够和对账操作并行,对账操作还依赖查询操作的结果,这明显有点生产者 - 消费者的意思,两次查询操作是生产者,对账操作是消费者。既然是生产者 - 消费者模型,那就需要有个队列,来保存生产者生产的数据,而消费者则从这个队列消费数据。
不过针对对账这个项目,我设计了两个队列,并且两个队列的元素之间还有对应关系。具体如下图所示,订单查询操作将订单查询结果插入订单队列,派送单查询操作将派送单插入派送单队列,这两个队列的元素之间是有一一对应的关系的。两个队列的好处是,对账操作可以每次从订单队列出一个元素,从派送单队列出一个元素,然后对这两个元素执行对账操作,这样数据一定不会乱掉。
双队列示意图
下面再来看如何用双队列来实现完全的并行。一个最直接的想法是:一个线程 T1 执行订单的查询工作,一个线程 T2 执行派送单的查询工作,当线程 T1 和 T2 都各自生产完 1 条数据的时候,通知线程 T3 执行对账操作。这个想法虽看上去简单,但其实还隐藏着一个条件,那就是线程 T1 和线程 T2 的工作要步调一致,不能一个跑得太快,一个跑得太慢,只有这样才能做到各自生产完 1 条数据的时候,通知线程 T3。
下面这幅图形象地描述了上面的意图:线程 T1 和线程 T2 只有都生产完 1 条数据的时候,才能一起向下执行,也就是说,线程 T1 和线程 T2 要互相等待,步调要一致;同时当线程 T1 和 T2 都生产完一条数据的时候,还要能够通知线程 T3 执行对账操作。
同步执行示意图

用 CyclicBarrier 实现线程同步

下面我们就来实现上面提到的方案。这个方案的难点有两个:一个是线程 T1 和 T2 要做到步调一致,另一个是要能够通知到线程 T3。
你依然可以利用一个计数器来解决这两个难点,计数器初始化为 2,线程 T1 和 T2 生产完一条数据都将计数器减 1,如果计数器大于 0 则线程 T1 或者 T2 等待。如果计数器等于 0,则通知线程 T3,并唤醒等待的线程 T1 或者 T2,与此同时,将计数器重置为 2,这样线程 T1 和线程 T2 生产下一条数据的时候就可以继续使用这个计数器了。
同样,还是建议你不要在实际项目中这么做,因为 Java 并发包里也已经提供了相关的工具类:CyclicBarrier。在下面的代码中,我们首先创建了一个计数器初始值为 2 的 CyclicBarrier,你需要注意的是创建 CyclicBarrier 的时候,我们还传入了一个回调函数,当计数器减到 0 的时候,会调用这个回调函数。
线程 T1 负责查询订单,当查出一条时,调用 barrier.await() 来将计数器减 1,同时等待计数器变成 0;线程 T2 负责查询派送单,当查出一条时,也调用 barrier.await() 来将计数器减 1,同时等待计数器变成 0;当 T1 和 T2 都调用 barrier.await() 的时候,计数器会减到 0,此时 T1 和 T2 就可以执行下一条语句了,同时会调用 barrier 的回调函数来执行对账操作。
非常值得一提的是,CyclicBarrier 的计数器有自动重置的功能,当减到 0 的时候,会自动重置你设置的初始值。这个功能用起来实在是太方便了。
// 订单队列
Vector<P> pos;
// 派送单队列
Vector<D> dos;
// 执行回调的线程池
Executor executor =
Executors.newFixedThreadPool(1);
final CyclicBarrier barrier =
new CyclicBarrier(2, ()->{
executor.execute(()->check());
});
void check(){
P p = pos.remove(0);
D d = dos.remove(0);
// 执行对账操作
diff = check(p, d);
// 差异写入差异库
save(diff);
}
void checkAll(){
// 循环查询订单库
Thread T1 = new Thread(()->{
while(存在未对账订单){
// 查询订单库
pos.add(getPOrders());
// 等待
barrier.await();
}
});
T1.start();
// 循环查询运单库
Thread T2 = new Thread(()->{
while(存在未对账订单){
// 查询运单库
dos.add(getDOrders());
// 等待
barrier.await();
}
});
T2.start();
}

总结

CountDownLatch 和 CyclicBarrier 是 Java 并发包提供的两个非常易用的线程同步工具类,这两个工具类用法的区别在这里还是有必要再强调一下:CountDownLatch 主要用来解决一个线程等待多个线程的场景,可以类比旅游团团长要等待所有的游客到齐才能去下一个景点;而 CyclicBarrier 是一组线程之间互相等待,更像是几个驴友之间不离不弃。除此之外 CountDownLatch 的计数器是不能循环利用的,也就是说一旦计数器减到 0,再有线程调用 await(),该线程会直接通过。但 CyclicBarrier 的计数器是可以循环利用的,而且具备自动重置的功能,一旦计数器减到 0 会自动重置到你设置的初始值。除此之外,CyclicBarrier 还可以设置回调函数,可以说是功能丰富。
本章的示例代码中有两处用到了线程池,你现在只需要大概了解即可,因为线程池相关的知识咱们专栏后面还会有详细介绍。另外,线程池提供了 Future 特性,我们也可以利用 Future 特性来实现线程之间的等待,这个后面我们也会详细介绍。

课后思考

本章最后的示例代码中,CyclicBarrier 的回调函数我们使用了一个固定大小的线程池,你觉得是否有必要呢?
欢迎在留言区与我分享你的想法,也欢迎你在留言区记录你的思考过程。感谢阅读,如果你觉得这篇文章对你有帮助的话,也欢迎把它分享给更多的朋友。
分享给需要的人,Ta购买本课程,你将得18
生成海报并分享

赞 62

提建议

上一篇
18 | StampedLock:有没有比读写锁更快的锁?
下一篇
20 | 并发容器:都有哪些“坑”需要我们填?
unpreview
 写留言

精选留言(153)

  • 探索无止境
    2019-04-11
    今天的文章很精彩,有案例有递进,一气呵成!设置线程池为单个线程可以保证对账的操作按顺序执行
    共 1 条评论
    247
  • 张申傲
    2019-05-23
    我觉得老师的问题其实是两个: 1.为啥要用线程池,而不是在回调函数中直接调用? 2.线程池为啥使用单线程的? 我的考虑: 1.使用线程池是为了异步操作,否则回掉函数是同步调用的,也就是本次对账操作执行完才能进行下一轮的检查。 2.线程数量固定为1,防止了多线程并发导致的数据不一致,因为订单和派送单是两个队列,只有单线程去两个队列中取消息才不会出现消息不匹配的问题。
    展开

    作者回复: 👍

    共 13 条评论
    240
  • J.M.Liu
    2019-04-12
    老师,CyclicBarrier的回调函数在哪个线程执行啊?主线程吗?比如这里的最后一段代码中,循环会在回调的时候阻塞吗? 如果是这样的话,那check函数岂不是可以直接作为回调函数了呀,并不需要线程池了啊

    作者回复: 好问题,CyclicBarrier的回调函数执行在一个回合里最后执行await()的线程上,而且同步调用回调函数check(),调用完check之后,才会开始第二回合。所以check如果不另开一线程异步执行,就起不到性能优化的作用了。

    共 8 条评论
    176
  • undifined
    2019-04-11
    线程池大小为1是必要的,如果设置为多个,有可能会两个线程 A 和 B 同时查询,A 的订单先返回,B 的派送单先返回,造成队列中的数据不匹配;所以1个线程实现生产数据串行执行,保证数据安全 如果用Future 的话可以更方便一些: CompletableFuture<List> pOrderFuture = CompletableFuture.supplyAsync(this::getPOrders); CompletableFuture<List> dOrderFuture = CompletableFuture.supplyAsync(this::getDOrders); pOrderFuture.thenCombine(dOrderFuture, this::check) .thenAccept(this::save); 老师这样理解对吗,谢谢老师
    展开

    作者回复: 对,👍👍👍

    共 4 条评论
    78
  • 曾轼麟
    2019-04-13
    老师推荐您使用ThreadPoolExecutor去实现线程池,并且实现里面的RejectedExecutionHandler和ThreadFactory,这样可以方便当调用订单查询和派送单查询的时候出现full gc的时候 dump文件 可以快速定位出现问题的线程是哪个业务线程,如果是CountDownLatch,建议设置超时时间,避免由于业务死锁没有调用countDown()导致现线程睡死的情况

    作者回复: 好建议,所有的阻塞操作,都需要设置超时时间,这是个很好的习惯。

    共 4 条评论
    65
  • 空知
    2019-04-11
    老师,关于CyclicBarrier回调函数,请教下 自己写了个 CyclicBarrier的例子,回调函数总是在计数器归0时候执行,但是线程T1 T2要等回调函数执行结束之后才会再次执行...看了下CyclicBarrier 的源码,当内部计数器 index == 0时候, final Runnable command = barrierCommand; if (command != null) command.run(); 没有开启子线程吧.也就是说 对账还是同步执行的,结束之后才是下一次的查询
    展开

    作者回复: 所以才需要线程池来异步执行回调函数,你一不小心把答案找到了😂

    共 7 条评论
    63
  • 西西弗与卡夫卡
    2019-04-11
    回调中的线程池用单线程是为了确保从两个队列取数时可以一对一获取,避免错乱。比如说,如果有两个线程,则可能出现线程1获取PO1,线程获取PO2和DO1,线程获取DO2的乱序。 其实线程池改成多线程也可以,要把两个remove(0)放到一个同步块中
    共 1 条评论
    31
  • 波波
    2019-04-11
    思考题中,如果生产者比较快,消费者比较慢,生产者通知的时候,消费者还在对账,这个时候会怎么处理?会不会导致消费者错失通知,导致队列满了,但是消费者却没有收到通知。

    作者回复: 有这种可能,还能oom

    共 7 条评论
    25
  • nanquanmama
    2019-04-11
    最后的那个例子,业务逻辑的部分已经变得很不直观,并发控制的逻辑掩盖住了业务逻辑。请问一下老师,实际项目开发中,并发控制逻辑如何做,才能和业务逻辑分离出来?

    作者回复: 放到不同的类里,这方面传统的面向对象可以解决,lambda也能解决,这个模块的最后几章能解决你说的这个问题,但是更复杂的场景还得自己设计

    18
  • ... ...
    2019-04-12
    追问:如果线程池是单线程的话。那假如生产者速度快运check函数执行时间。那是不是就会出现堵塞情况了。久而久之,是不是会出现队列内存溢出

    作者回复: 会

    共 3 条评论
    17
  • Adam
    2019-06-14
    如果生产者比较快,消费者check还没对账完 会不会照成 队列越来越多 最后内存溢出了 ,有没有什么好的方案解决呢?

    作者回复: 方案上基本都是限流

    14
  • xuery
    2019-05-01
    有,如果为线程池有多个线程,则由于check()函数里面的两个remove并不是原子操作,可能导致消费错乱。假设订单队列中有P1,P2;派送队列中有D1,D2;两个线程T1,T2同时执行check,可能出现T1消费到P1,D2,T2消费到P2,D1,就是T1先执行pos.remove(0), 而后T2执行pos.remove(0);dos.remov(0);然后T1才执行dos.remove(0)的场景

    作者回复: 多个线程有这个可能,所以线程池用的是单线程的

    共 2 条评论
    14
  • 木偶人King
    2019-04-11
    老师,最后checkAll() 这里为什么new 了两个Thread 而不是使用线程池

    作者回复: 反正也不会反复创建,用不用都没关系

    14
  • iron_man
    2019-04-11
    王老师,cyclicbarrier,具体是在什么时候清零计数器呢?是在所有线程await返回后还是在回调函数调用后?await和回掉函数的调用顺序是怎样的

    作者回复: 回调函数执行完之后才会唤醒等待的线程。

    共 3 条评论
    13
  • 王盛武
    2019-04-11
    undefind同学的意思差不多对。 只有一个线程的线程池,是因为,订单队列和派单队列读取数据存在竞态条件。 如果要开多个线程,则需要一个lock进行同步那两个remove方法。 个人推荐的思路是,如果生产者速度比消费者快的情况下,放入一个双向的阻塞队列尾部,每次从双向队列头部取两个对象,根据对象属性来区别订单类型,也能开多个线程进行check操作。 但本文业务里check速度很快,所以这个场景只需要开1个线程的线程池是合理的。
    展开

    作者回复: 一次多取几个然后批量执行,这个办法非常实用!

    共 2 条评论
    12
  • 王莹
    2019-09-25
    1.回调处理交给新开辟的线程执行,让当前处理继续进行,无需等待 2.使用线程池解决新开辟线程创建和销毁的开销问题 3.单线程使得两个队列的出队无需同步

    作者回复: 👍

    11
  • aguan(^・ェ・^)
    2019-04-18
    老师,问一个业务逻辑的问题,在从两个队列中分别取订单和派送单的做比较的时候,怎么保证这订单和派送单是一一对应的关系呢?如果派送单有漏单,那如何对账比较取结果时的数据是一一对应关系?

    作者回复: 一一是一组和一组等价,check的时候也是批量操作。没有就就放一个空对象做占位就可以了

    共 2 条评论
    11
  • 月马穿关
    2019-04-11
    感谢老师,一直不太明白什么时候用CyclicBarrier,今天看到案例了,刚看到join那段我想到了CompletableFuture

    作者回复: 👍

    10
  • null
    2019-05-21
    老师,您好!我有几个疑问: 1. 文章里提到:获取订单 getPOrders() 和获取派送订 getDOrders() 是相互独立、互不依赖的。 我们的订单系统通过 MQ 与派送系统进行数据交互,并且一个订单有可能生成多个派送单(仓库不同,拆单),想了好久,也没想到比较好的方式实现订单和派送单的查询操作可以并行处理。 如果每次只筛选过去一个小时未对账的订单,和过去一小时的派送单,当存在漏生成派送单时,系统发现不了(不知道是漏生成派送单,还是 MQ 没消费)。 2. 文章第二种方案,线程池多生成一个线程,专门用来处理 ( check & save ),也能够实现查询和对账并行处理。因此不太能理解“一个线程等待多个线程”和“一组线程之间的相互等待”的区别。感觉 CountDownLatch 和 CyclicBarrier 都是 (check & save) 线程在等待 getPOrders 和 getDOrders 线程。 3.文章最后一种方案,每次只查询一条改成一次查多条,这样可以减少查询的次数。check 的时候,也批量处理,吞吐量是不是会好一点吖。 谢谢老师!!
    展开

    作者回复: 查询数据库一定是批量查询的,先把订单的id查出来,然后用这些id就能并行查了,cyclicbarrier能循环利用,用起来更简单,你可以用countdownlatch实现一下,就知道他们的区别了

    7
  • crazypokerk
    2019-04-11
    请教一下老师,上面说的将CyclicBarrier计数器初始值设为2,假如当T1先执行完,然后执行await时减1,此时计数器为1大于0,等待,然后T2执行await时再减1,此时计数器为0,则唤醒T3执行,与此同时,将计数器重置为2,T1、T2继续开始执行,以此循环往复,可以这样理解吗?

    作者回复: 是的

    7