26 | Pipeline:Beam如何抽象多步骤的数据流水线?
26 | Pipeline:Beam如何抽象多步骤的数据流水线?
讲述:巴莫
时长09:24大小8.60M
数据流水线
Beam 数据流水线的应用
Beam 数据流水线的处理模型
Beam 数据流水线的错误处理
单个 Transform 上的错误处理
多步骤 Transform 上的错误处理
小结
思考题
赞 5
提建议
精选留言(19)
- espzest2019-06-21bundle怎么聚合成pcollection? 一个bundle处理失败,为什么需要重做前面的bundle?
作者回复: 谢谢你的提问!其实我们在第24讲中知道,PCollection是具有无序性的,所以最简单的做法bundle在处理完成之后可以直接append到结果PCollection中。 至于为什么需要重做前面的bundle,这其实也是错误处理机制的一个trade-off了。Beam希望尽可能减少persistence cost,也就是不希望将中间结果保持在某一个worker上。可以这么想,如果我们想要不重新处理前面的bundle,我们必须要将很多中间结果转换成硬盘数据,这样一方面增加很大的时间开销,另一方面因为数据持久化了在具体一台机器上,我们也没有办法再重新动态分配bundle到不同的机器上去了。
16 - cricket19812019-06-21bundle随机分配会不会产生数据倾斜?完美并行背后的机制是?beam应该也有类似spark的persist方法缓存转换中间结果,防止出错恢复链太长吧?
作者回复: 谢谢你的提问!其实文章中所讲到的随机分配并不是说像分配随机数那样将bundle随机分配出去给workers,只是说根据runner的不同,bundle的分配方式也会不一样了,但最终还是还是希望能最大化并行度。 至于完美并行的背后机制,Beam会在真正处理数据前先计算优化出执行的一个有向无环图,希望保持并行处理数据的同时,能够减少每个worker之间的联系。 Beam据我所知是有的,BEAM-7131 issue就有反应这个问题。
共 2 条评论9 - YZJ2019-06-21老师请教个问题:PCollectionA transform PCollectionB, 假如PCollectionB 要比PCollectionA大很多倍,比如transform 是把PCollectionA 中每个字符串重复1000次,那PCollectionB 就要大1000倍,worker会不会有内存溢出问题? spark中可以配置executor 的core和memery来控制每个task内存用量,beam有类似机制吗?不然怎样让资源利用最优化呢?
作者回复: 谢谢你的提问!这个问题很好啊,运行大型的Beam pipeline遇到OOM也是有可能的。要配置底层资源的话要看Runner支不支持,像如果将Google Cloud Dataflow作为Runner的话,我们可以通过配置PipelineOption来达到目的。底层使用Spark的话我个人还没有使用过,不过应该是可以用SparkContextOptions来配置的。
8 - aof2019-06-26Beam的错误处理和RDD的真的很像,因为transformation都是lazy的,只有action才会触发计算,中间的转换过程都是被记录在DAG中的,这就导致中间某个transformation失败之后,需要往上追溯之前的转换,可以理解为是寻找父transformation,然后父transformation还要往上寻找父父transformation,直到没有父transformation为止,就像是类加载机制一样。但是如果能把中间结果保存在内存中,在失败重新计算时,就能提高计算的效率。展开
作者回复: 的确如此
6 - 沈洪彬2019-06-23在 Beam 的数据流水线中,当处理的元素发生错误时流水线的的错误处理机制分两种情况 1.单个Transform上的错误处理 如果某个Bundle里元素处理失败,则整个Bundle里元素都必须重新处理 2.多步骤Transform上的错误处理 如果某个Bundle里元素处理失败,则整个Bundle里元素及与之关联的所有Bundle都必须重新处理展开
作者回复: 谢谢留言!总结得不错!
4 - onepieceJT20182019-06-21老师 想到一个问题啊 如果有个计算是 需要worker1 和 worker2 都算完的结果再计算 发生worker1 一直错误没通过 这时候worker2会一直傻傻等待嘛
作者回复: 谢谢你的提问!这个依赖worker1和worker2计算结果的Transform会一直等待。但是与此同时,worker2可以做其它的计算,甚至有可能worker1的计算如果一直出错,Beam会将这个bundle重新分配给worker2来计算。
4 - 常超2019-06-21<在多步骤的 Transform 上,如果处理的一个 Bundle 元素发生错误了,则这个元素所在的整个 Bundle 以及与这个 Bundle 有关联的所有 Bundle 都必须重新处理。 如果upstream transform里状态有更新操作,重新处理已经成功的bundle会出现数据重复,可能导致状态更新不正确吧?
作者回复: 谢谢你的提问!这个问题非常好啊,如果你所说的是stateful processing的话,那它的错误处理机制和stateful-less会不太一样。Stateful processing里的ParDo在处理每一个元素的时候会将它的state持久化,也就是保存到外部的storage中,下次需要用到这个元素的时候会再读取这个元素的state。如果是发生了错误的话,会有机制reclaim这些states或者是invalidate它们。
3 - jimyth2019-07-26老师你好,既然PCollection 是无序的,请问一下怎么处理数据流中的先后依赖的问题,本节例子的 bound中的数据都是有序的分配的,实际计算过程中是不是会出现 1,3,5出现在一个 bound ;2,4,6 出现在一个 bound 您在 23 讲的例子中,ParDo 是针对单个元素的处理,怎么实现计算2 个元素的累加的呢? 例如下面是一组速度数据 时间 速度 2019-07-26 00:00:00 10 2019-07-26 00:00:01 15 2019-07-26 00:00:02 20 2019-07-26 00:00:03 40 2019-07-26 00:00:04 70 我需要大概怎么计算加速度,展开1
- Alpha2019-06-21上一期讲到,PCollection 是有向图中的边,而 Transform 是有向图里的节点。这一期的图咋又变了呢
作者回复: 谢谢留言!哈哈,需要表达的内容不一样了。
1 - dancer2019-06-21想问老师,一个bundle的数据必须要全部处理完之后才能进行第二个transform吗?如果部分数据经过transform1后就可以继续执行transform2,这样数据并行度会更高吧,为什么没有采用这种机制呢?
作者回复: 谢谢你的提问!这个问题还是要看情况吧,如果多步骤的Transform都是ParDo的话,那确实可以按照你说的做法去做。不过当Transform涉及到Combine或者Flatten这种Transform的话, 那就必须等到这一阶段所有的Transform完成了之后才能够进行下一步的Transform了。
1 - Mr.Tree2022-12-30 来自四川错误处理机制有点像MySQL的事务,要么全部成功,要么回滚1
- 哇哈哈2022-03-13流式数据也会分bundle吗?那不是变成spark streaming的微型批处理形式了?
- weiming2022-01-121. PCollection会被划分成多个Bundle(分配多少个是随机的),Bundle会被分配到Worker中处理(分配也是随机的),最终机制保障最大程度的完美并行。 2. 错误处理中有关联Bundle的概念(因为是同一个Worker处理),如果关联Bundle中的一个Bundle失败了,所有关联的Bundle全部重做,主要是考虑到数据持久化的成本。(通过重做消除持久化) 3. 注意有PCollection不可变性,引申到Bundle中的不可变性。展开
- 旭东(Frank)2021-01-14分而治之
- 冯杰2020-04-11从您的描述中可以看出,数据的实际计算和容错都是以分区来进行的,原因在于ParDo模式下同一个Pc下不同的数据记录之间不存在依赖关系即可以完成计算。 在实际计算时,我们处理玩Trasform1得到Pc1,然后在接着计算transform2,那为什么不能以单条数据来并行呢? 即分区内的每一条数据独立完成所有的计算链,而不是要等同一个Pc下的数据都就绪后在执行下一个计算。 关于容错不以单条数据来设计,我倒是能理解,因为要这样做的话,我们必然需要为每条数据都记录他的计算关系,追溯它具体是从上游的哪一条数据来的,这会增加存储的压力。 而以分区来实现容错,我们只需要记录血缘即可,血缘关系太长,可以像Spark那样做一些持久化的操作。展开
- chief2019-07-04老师您好,bundle经过Transform会产生新的bundle,那么是同时保留前后bundle数据还是在新生成的bundle中保留血缘关系?
作者回复: 谢谢你的留言!前后bundle保不保留这个还要看你的执行DAG需不需要还用到这个bundle。至于保留前后关系的话主要是用于在发生错误的情况下重新trace back到最开始的那个transform,这个信息从DAG中就可以找到了。
- fy2019-06-23老师,有编程语言基础。我也去Beam看了看教程,请问这个可以直接学吧。还需要其他基础么,比如操作系统,计算机组成原理等
作者回复: 谢谢你的留言!确实是可以直接学习,不过我也建议如果平时有时间的话,可以看看这些计算机的基础。像操作系统的话里面的一些调度算法或许可以给你平时的实际应用有一些启发。
- TJ2019-06-21能否说一下Beam和底层执行系统的边界在哪里?那些功能由Beam提供,那些由底层如Spark提供? 如果底层是spark,是否PCollection就是RDD?
作者回复: 谢谢你的留言!如果我没有理解错你的问题的话,我想你说的“边界”指的就是在第23讲中讲到的Beam的统一模型层了。 其实Beam提供的是在Dataflow Model论文里面的一种批流统一处理的思想,数据处理引擎如果能够按照这个思想提供出相应的APIs的话那这个数据处理引擎就可以成为Beam的底层Runner。 最后一问的话是的,PCollection抽象就是Spark中的RDD。
- JohnT3e2019-06-21由于beam优化器,是不是实际产生的bundle要少于逻辑上的个数?
作者回复: 谢谢你的留言!你这样的理解其实也没有错,如果Beam优化器能优化合并掉一些步骤的话,那确实实际产生出来的bundle会比理论上可以产生出来的bundle要少。