26 | Fork/Join:单机版的MapReduce
26 | Fork/Join:单机版的MapReduce
讲述:王宝令
时长09:41大小8.84M
分治任务模型
Fork/Join 的使用
ForkJoinPool 工作原理
模拟 MapReduce 统计单词数量
总结
课后思考
赞 31
提建议
精选留言(55)
- 爱吃回锅肉的瘦子2019-04-28https://www.liaoxuefeng.com/article/001493522711597674607c7f4f346628a76145477e2ff82000,老师,您好,我在廖雪峰网站中也看到forkjoin使用方式。讲解了,为啥不使用两次fork,分享出来给大家看看。
作者回复: 用两次fork()在join的时候,需要用这样的顺序:a.fork(); b.fork(); b.join(); a.join();这个要求在JDK官方文档里有说明。 如果是一不小心写成a.fork(); b.fork(); a.join(); b.join();就会有大神廖雪峰说的问题。 建议还是用fork()+compute(),这种方式的执行过程普通人还是能理解的,fork()+fork()内部做了很多优化,我这个普通人看的实在是头痛。 感谢分享啊。我觉得讲的挺好的。用这篇文章的例子理解fork()+compute()很到位。
共 9 条评论112 - 锦2019-04-27CPU同一时间只能处理一个线程,所以理论上,纯cpu密集型计算任务单线程就够了。多线程的话,线程上下文切换带来的线程现场保存和恢复也会带来额外开销。但实际上可能要经过测试才知道。
作者回复: 👍
共 4 条评论69 - 尹圣2019-04-29看到分治任务立马就想到归并排序,用Fork/Join又重新实现了一遍, /** * Ryzen 1700 8核16线程 3.0 GHz */ @Test public void mergeSort() { long[] arrs = new long[100000000]; for (int i = 0; i < 100000000; i++) { arrs[i] = (long) (Math.random() * 100000000); } long startTime = System.currentTimeMillis(); ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors()); MergeSort mergeSort = new MergeSort(arrs); arrs = forkJoinPool.invoke(mergeSort); //传统递归 //arrs = mergeSort(arrs); long endTime = System.currentTimeMillis(); System.out.println("耗时:" + (endTime - startTime)); } /** * fork/join * 耗时:13903ms */ class MergeSort extends RecursiveTask<long[]> { long[] arrs; public MergeSort(long[] arrs) { this.arrs = arrs; } @Override protected long[] compute() { if (arrs.length < 2) return arrs; int mid = arrs.length / 2; MergeSort left = new MergeSort(Arrays.copyOfRange(arrs, 0, mid)); left.fork(); MergeSort right = new MergeSort(Arrays.copyOfRange(arrs, mid, arrs.length)); return merge(right.compute(), left.join()); } } /** * 传统递归 * 耗时:30508ms */ public static long[] mergeSort(long[] arrs) { if (arrs.length < 2) return arrs; int mid = arrs.length / 2; long[] left = Arrays.copyOfRange(arrs, 0, mid); long[] right = Arrays.copyOfRange(arrs, mid, arrs.length); return merge(mergeSort(left), mergeSort(right)); } public static long[] merge(long[] left, long[] right) { long[] result = new long[left.length + right.length]; for (int i = 0, m = 0, j = 0; m < result.length; m++) { if (i >= left.length) { result[m] = right[j++]; } else if (j >= right.length) { result[m] = left[i++]; } else if (left[i] > right[j]) { result[m] = right[j++]; } else result[m] = left[i++]; } return result; }展开
作者回复: 👍👍👍举一反三了😄
共 6 条评论40 - QQ怪2019-05-24学习了老师的分享,现在就已经在工作用到了,的确是在同事面前好好装了一次逼
作者回复: 👍说明你很有悟性😄
33 - linqw2019-04-27以前在面蚂蚁金服时,也做过类似的题目,从一个目录中,找出所有文件里面单词出现的top100,那时也是使用服务提供者,从目录中找出一个或者多个文件(防止所有文件一次性加载内存溢出,也为了防止文件内容过小,所以每次都确保读出的行数10万行左右),然后使用fork/join进行单词的统计处理,设置处理的阈值为20000。 课后习题:单核的话,使用单线程会比多线程快,线程的切换,恢复等都会耗时,并且要是机器不允许,单线程可以保证安全,可见性(cpu缓存,单个CPU数据可见),线程切换(单线程不会出现原子性)展开
作者回复: 👍
30 - 右耳听海2019-04-27请教老师一个问题,merge函数里的mr2.compute先执行还是mr1.join先执行,这两个参数是否可交换位置
作者回复: 我觉得不可以,如果join在前面会先首先让当前线程阻塞在join()上。当join()执行完才会执行mr2.compute(),这样并行度就下来了。
共 6 条评论25 - Geek_ebda962019-05-13如果所有的并行流计算都是 CPU 密集型计算的话,完全没有问题,但是如果存在 I/O 密集型的并行流计算,那么很可能会因为一个很慢的 I/O 计算而拖慢整个系统的性能。 老师这里的意思是不是,如果有耗时的i/o计算,需要用单独的forkjoin pool 来处理这个计算,在程序设计的时候就要跟其他cpu密集计算的任务分开处理?
作者回复: 是的
13 - 王伟2019-04-28老师,我现在碰到一个生产问题:用户通过微信小程序进入我们平台,我们只能需要使用用户的手机号去我们商家库中查取该用户的注册信息。在只知道用户手机号的情况下我们需要切换到所有的商家库去查询。这样非常耗时。ps:我们商家库做了分库处理而且数量很多。想请教一下您,这种查询该如何做?
作者回复: 可以加redis缓存看看,也可以加本地缓存。不要让流量直接打到数据库上
共 2 条评论9 - êwěn2019-04-27老师,fork是fork调用者的子任务还是表示下面new出来的任务是子任务?
作者回复: fork是fork调用者这个子任务加入到任务队列里
7 - 君哥聊技术2019-04-28老师,请问为什么不能merge mr1.compute和mr2..compute或者mr1.join和mr2的join呢?
作者回复: compute+compute相当于没用forkjoin,都在一个线程里跑的。如果用join+join也可以,不过jdk官方有个建议,顺序要用:a.fork(); b.fork(); b.join(); a.join();否则性能有问题。所以还是用fork+compute更简单。
共 2 条评论5 - Nick2019-06-05简易的MapReduce的程序跑下来不会栈溢出吗?
作者回复: 递归程序,如果语言层面没有办法优化,都会的
5 - 王彬-Antonio2020-01-06老师,您在文中提到io密集型和计算密集型最好区分开不同线程池。假设两个线程池如果都在运行,它们之间怎么竞争CPU线程?
作者回复: 操作系统负责调度,在操作系统眼里,都一样,都是线程
3 - null2019-05-27@王伟童鞋的问题,我们也有这场景:通过手机号查询商家信息。 我们是在 redis 里维护(手机号,商家号)关联关系,在 redis 里通过手机号查询商家号,就知道该去哪个库表查询商家具体信息了。 内存开销: 手机号,11 个字符,占用 11B; 商家号,4 个字符,占用 4B; 一条记录占用 15B,100 万条记录,就 15*100万B,大概是: 15*1,000,000B/1000/1000=15M展开共 1 条评论3
- 张三2019-04-29ForkJoinTask这个抽象类的 fork() 和 join()底层是怎么实现的呢?3
- Black Jack2021-11-19return f2.compute() + f1.join(); 对这行代码的理解是 当前线程把 f1的工作分配出去,并等待其完成,然后自己进行f2的工作。如果使用f2.fork,f2.join.相当于当前线程空闲了,存在资源浪费了3
- null2020-09-02老师,您好,有个问题想请教您一下 问题:使用 fork/join 框架,ForkJoinTask 不是只提交给 ForkJoinPool 线程池里的线程执行么?为什么 main 主线程也参与了运算? 验证如下: public static void main(String[] args) { IntStream.range(1, 3).parallel().forEach(i -> System.out.println(Thread.currentThread().getName() + "##" + i)); } 输出: ForkJoinPool.commonPool-worker-1##1 main##2 除了 ForkJoinPool.commonPool-worker,还有 main 的输出。展开
作者回复: 我没有详细看jdk相关的源码,推断当前线程(main)是会参与执行的,否则当前线程只能阻塞,就浪费了,这样的设计对性能影响挺大的。应该不会这样设计。
2 - 狂风骤雨2019-04-29好希望工作当中能有老师这样一位大牛,能为我答疑解惑
作者回复: 我知道的就这些,都写出来了😂,显然我不是大牛😄
共 2 条评论2 - 右耳听海2019-04-28这里用的递归调用,数据量大的时候会不会粘溢出,虽然这里用的二分,时间复杂度为logn
作者回复: 我觉得会
3 - 罗洲2019-04-27单核cpu上多线程会导致线程的上下文切换,还不如单核单线程处理的效率高。共 1 条评论2
- 夏目2020-04-11请教一下:return merge(mr2.compute(), mr1.join()); 这样写mr2又会被分割为mr3、mr4,mr3又会被分割为mr5、mr6.....,无线分割下去吗?共 1 条评论1