极客时间已完结课程限时免费阅读

13 | 同样的本质,为何Spark可以更高效?

13 | 同样的本质,为何Spark可以更高效?-极客时间

13 | 同样的本质,为何Spark可以更高效?

讲述:李智慧

时长12:06大小5.54M

上一期我们讨论了 Spark 的编程模型,这期我们聊聊Spark 的架构原理。和 MapReduce 一样,Spark 也遵循移动计算比移动数据更划算这一大数据计算基本原则。但是和 MapReduce 僵化的 Map 与 Reduce 分阶段计算相比,Spark 的计算框架更加富有弹性和灵活性,进而有更好的运行性能。

Spark 的计算阶段

我们可以对比来看。首先和 MapReduce 一个应用一次只运行一个 map 和一个 reduce 不同,Spark 可以根据应用的复杂程度,分割成更多的计算阶段(stage),这些计算阶段组成一个有向无环图 DAG,Spark 任务调度器可以根据 DAG 的依赖关系执行计算阶段。
还记得在上一期,我举了一个比较逻辑回归机器学习性能的例子,发现 Spark 比 MapReduce 快 100 多倍。因为某些机器学习算法可能需要进行大量的迭代计算,产生数万个计算阶段,这些计算阶段在一个应用中处理完成,而不是像 MapReduce 那样需要启动数万个应用,因此极大地提高了运行效率。
所谓 DAG 也就是有向无环图,就是说不同阶段的依赖关系是有向的,计算过程只能沿着依赖关系方向执行,被依赖的阶段执行完成之前,依赖的阶段不能开始执行,同时,这个依赖关系不能有环形依赖,否则就成为死循环了。下面这张图描述了一个典型的 Spark 运行 DAG 的不同阶段。
从图上看,整个应用被切分成 3 个阶段,阶段 3 需要依赖阶段 1 和阶段 2,阶段 1 和阶段 2 互不依赖。Spark 在执行调度的时候,先执行阶段 1 和阶段 2,完成以后,再执行阶段 3。如果有更多的阶段,Spark 的策略也是一样的。只要根据程序初始化好 DAG,就建立了依赖关系,然后根据依赖关系顺序执行各个计算阶段,Spark 大数据应用的计算就完成了。
上图这个 DAG 对应的 Spark 程序伪代码如下。
rddB = rddA.groupBy(key)
rddD = rddC.map(func)
rddF = rddD.union(rddE)
rddG = rddB.join(rddF)
所以,你可以看到 Spark 作业调度执行的核心是 DAG,有了 DAG,整个应用就被切分成哪些阶段,每个阶段的依赖关系也就清楚了。之后再根据每个阶段要处理的数据量生成相应的任务集合(TaskSet),每个任务都分配一个任务进程去处理,Spark 就实现了大数据的分布式计算。
具体来看的话,负责 Spark 应用 DAG 生成和管理的组件是 DAGScheduler,DAGScheduler 根据程序代码生成 DAG,然后将程序分发到分布式计算集群,按计算阶段的先后关系调度执行。
那么 Spark 划分计算阶段的依据是什么呢?显然并不是 RDD 上的每个转换函数都会生成一个计算阶段,比如上面的例子有 4 个转换函数,但是只有 3 个阶段。
你可以再观察一下上面的 DAG 图,关于计算阶段的划分从图上就能看出规律,当 RDD 之间的转换连接线呈现多对多交叉连接的时候,就会产生新的阶段。一个 RDD 代表一个数据集,图中每个 RDD 里面都包含多个小块,每个小块代表 RDD 的一个分片。
一个数据集中的多个数据分片需要进行分区传输,写入到另一个数据集的不同分片中,这种数据分区交叉传输的操作,我们在 MapReduce 的运行过程中也看到过。
是的,这就是 shuffle 过程,Spark 也需要通过 shuffle 将数据进行重新组合,相同 Key 的数据放在一起,进行聚合、关联等操作,因而每次 shuffle 都产生新的计算阶段。这也是为什么计算阶段会有依赖关系,它需要的数据来源于前面一个或多个计算阶段产生的数据,必须等待前面的阶段执行完毕才能进行 shuffle,并得到数据。
这里需要你特别注意的是,计算阶段划分的依据是 shuffle,不是转换函数的类型,有的函数有时候有 shuffle,有时候没有。比如上图例子中 RDD B 和 RDD F 进行 join,得到 RDD G,这里的 RDD F 需要进行 shuffle,RDD B 就不需要。
因为 RDD B 在前面一个阶段,阶段 1 的 shuffle 过程中,已经进行了数据分区。分区数目和分区 Key 不变,就不需要再进行 shuffle。
这种不需要进行 shuffle 的依赖,在 Spark 里被称作窄依赖;相反的,需要进行 shuffle 的依赖,被称作宽依赖。跟 MapReduce 一样,shuffle 也是 Spark 最重要的一个环节,只有通过 shuffle,相关数据才能互相计算,构建起复杂的应用逻辑。
在你熟悉 Spark 里的 shuffle 机制后我们回到今天文章的标题,同样都要经过 shuffle,为什么 Spark 可以更高效呢?
其实从本质上看,Spark 可以算作是一种 MapReduce 计算模型的不同实现。Hadoop MapReduce 简单粗暴地根据 shuffle 将大数据计算分成 Map 和 Reduce 两个阶段,然后就算完事了。而 Spark 更细腻一点,将前一个的 Reduce 和后一个的 Map 连接起来,当作一个阶段持续计算,形成一个更加优雅、高效的计算模型,虽然其本质依然是 Map 和 Reduce。但是这种多个计算阶段依赖执行的方案可以有效减少对 HDFS 的访问,减少作业的调度执行次数,因此执行速度也更快。
并且和 Hadoop MapReduce 主要使用磁盘存储 shuffle 过程中的数据不同,Spark 优先使用内存进行数据存储,包括 RDD 数据。除非是内存不够用了,否则是尽可能使用内存, 这也是 Spark 性能比 Hadoop 高的另一个原因。

Spark 的作业管理

我在专栏上一期提到,Spark 里面的 RDD 函数有两种,一种是转换函数,调用以后得到的还是一个 RDD,RDD 的计算逻辑主要通过转换函数完成。
另一种是 action 函数,调用以后不再返回 RDD。比如 count() 函数,返回 RDD 中数据的元素个数;saveAsTextFile(path),将 RDD 数据存储到 path 路径下。Spark 的 DAGScheduler 在遇到 shuffle 的时候,会生成一个计算阶段,在遇到 action 函数的时候,会生成一个作业(job)。
RDD 里面的每个数据分片,Spark 都会创建一个计算任务去处理,所以一个计算阶段会包含很多个计算任务(task)。
关于作业、计算阶段、任务的依赖和时间先后关系你可以通过下图看到。
图中横轴方向是时间,纵轴方向是任务。两条粗黑线之间是一个作业,两条细线之间是一个计算阶段。一个作业至少包含一个计算阶段。水平方向红色的线是任务,每个阶段由很多个任务组成,这些任务组成一个任务集合。
DAGScheduler 根据代码生成 DAG 图以后,Spark 的任务调度就以任务为单位进行分配,将任务分配到分布式集群的不同机器上执行。

Spark 的执行过程

Spark 支持 Standalone、Yarn、Mesos、Kubernetes 等多种部署方案,几种部署方案原理也都一样,只是不同组件角色命名不同,但是核心功能和运行流程都差不多。
上面这张图是 Spark 的运行流程,我们一步一步来看。
首先,Spark 应用程序启动在自己的 JVM 进程里,即 Driver 进程,启动后调用 SparkContext 初始化执行配置和输入数据。SparkContext 启动 DAGScheduler 构造执行的 DAG 图,切分成最小的执行单位也就是计算任务。
然后 Driver 向 Cluster Manager 请求计算资源,用于 DAG 的分布式计算。Cluster Manager 收到请求以后,将 Driver 的主机地址等信息通知给集群的所有计算节点 Worker。
Worker 收到信息以后,根据 Driver 的主机地址,跟 Driver 通信并注册,然后根据自己的空闲资源向 Driver 通报自己可以领用的任务数。Driver 根据 DAG 图开始向注册的 Worker 分配任务。
Worker 收到任务后,启动 Executor 进程开始执行任务。Executor 先检查自己是否有 Driver 的执行代码,如果没有,从 Driver 下载执行代码,通过 Java 反射加载后开始执行。

小结

总结来说,Spark 有三个主要特性:RDD 的编程模型更简单,DAG 切分的多阶段计算过程更快速,使用内存存储中间计算结果更高效。这三个特性使得 Spark 相对 Hadoop MapReduce 可以有更快的执行速度,以及更简单的编程实现。
Spark 的出现和流行其实也有某种必然性,是天时、地利、人和的共同作用。首先,Spark 在 2012 年左右开始流行,那时内存的容量提升和成本降低已经比 MapReduce 出现的十年前强了一个数量级,Spark 优先使用内存的条件已经成熟;其次,使用大数据进行机器学习的需求越来越强烈,不再是早先年那种数据分析的简单计算需求。而机器学习的算法大多需要很多轮迭代,Spark 的 stage 划分相比 Map 和 Reduce 的简单划分,有更加友好的编程体验和更高效的执行效率。于是 Spark 成为大数据计算新的王者也就不足为奇了。

思考题

Spark 的流行离不开它成功的开源运作,开源并不是把源代码丢到 GitHub 上公开就万事大吉了,一个成功的开源项目需要吸引大量高质量开发者参与其中,还需要很多用户使用才能形成影响力。
Spark 开发团队为 Spark 开源运作进行了大量的商业和非商业活动,你了解这些活动有哪些吗?假如你所在的公司想要开源自己的软件,用于提升自己公司的技术竞争力和影响力,如果是你负责人,你应该如何运作?
欢迎你写下自己的思考或疑问,与我和其他同学一起讨论。
分享给需要的人,Ta购买本课程,你将得20
生成海报并分享

赞 27

提建议

上一篇
12 | 我们并没有觉得MapReduce速度慢,直到Spark出现
下一篇
14 | BigTable的开源实现:HBase
unpreview
 写留言

精选留言(64)

  • vivi
    2018-12-25
    懂了原理,实战其实很简单,不用急着学部署啊,操作,原理懂了,才能用好,我觉得讲得很好

    作者回复: 👍🏻

    68
  • 张小喵
    2019-12-09
    把12|13反复读了4、5遍,以下是我的感悟 Spark再次理解: 1:Spark计算框架编程和运行速度比MapReduce更加的简单和快 编程更简单:Spark编程关注的数据是RDD,RDD是个抽象的,比较不好理解,我的理解RDD是一次计算阶段中 要操作的所有的数据的抽象,虽然它们是分片的,并且分布在HDFS的任意节点上,但是概念上,我们是针对这个抽象的RDD编程的。使用Scala编程,wordcount只需要三行代码,很简单是吧,但是背后的整体的计算过程是相当的复杂的。这样看我们在MapReduce中的编程要是关注所有的数据的,默认为map的数据输入的数据是整个要处理的数据,并不是说,其他的应用就不知道了,毕竟我们也可以在代码中感知到在文件中的偏移量这种东西。 速度更快:MapReduce暴力的把计算阶段分为两个阶段,Map和Reduce阶段, 如果一个应用的计算实现只有两个阶段,那么MapReduce计算框架的速度不会比Spark慢多少,慢的地方只是在于Spark是不经过落盘的操作的,直接在内存中存储,但是如果一个应用的计算阶段变得很多的话,比如机器学习中的迭代计算,那么使用MapReduce的就非常慢了,如果这个应用的计算分为10000个计算阶段,如果用MapReduce实现,就需要启动5000次相关的应用,速度很慢,并且编码会很麻烦,如果 是Spark,那么在一次应用就可以解决该问题。 Spark的多个计算阶段的理解: 相比于MapReduce只有两个计算阶段, Spark理论上可以有无限个计算阶段, 这也是Spark的速度的优势 Spark的计算阶段的表示中,DAG(有向无环图)是Spark的关键,DAG可以很好的表示每个计算阶段的关系,或者说依赖书序 那么DAG是谁生成的呢,是根据什么生成的呢? DAG是有Spark计算框架根据用于所写的代码生成的,那怎么依据代码的什么生成的呢? 类比MapReduce的两个计算阶段,两个阶段之间的过度是什么?是shuffle,Spark也是根据Spark中的代码中的转换函数是否是有shuffle操作进行划分阶段的! 我的理解,Spark的每个计算阶段可以类比MapReduce中的Map阶段或者Reduce阶段。不同的是,Spark计算阶段关注的是RDD,但是又有相同的点,RDD中的数据组成也是一片一片的,Spark中的最小的任务也就是对于片的计算,原理和MapReduce一样,Spark中的片和MapReduce中的片是通一个东西,每个片都是分布于HDFS上的,对于每个片的计算大概率也是在片所在的计算节点的。所以Spark的计算也是分布式,并且原理和MapReduce是一样的。 Spark中RDD上的操作函数:RDD上的操作函数分为两种类型,一种是转换函数,另一种是action函数 转换函数:Spark编程中对于RDD的操作基本是用转换函数来完成的,转换函数是计算RDD,转换函数又分为两种 只是改变RDD内容的转换函数:类似map,filter函数,RDD本身物理上没有变化,所有的操作都是针对于当前的分片 会新生成RDD转换函数:类似于reduceByKey这种函数,会组合key生成新的RDD(所有的会生成新的RDD的函数都可以作为 计算阶段的分割函数吗?) action函数:aciton函数对于RDD的操作没有返回值,或者说不会改变RDD的内容,比如rdd.saveToPath等等 Spark中一些关于应用的生命周期中的过程的名词、概念: Spark中的RDD函数主要有两种,一是转换函数,调用转换函数可以返回一个RDD(产生新的RDD?/会shuffle的函数分割计算阶段),RDD的计算逻辑主要是由转换函数完成 另一种是action函数,调用这种函数不返回RDD,DAGScheduler在遇到shuffle的时候生成一个新的计算阶段,在遇到action函数的时候,产生一个作业。(我理解可以类似于,一次MapReduce程序对应Spark中的一个作业) 在每个计算阶段都是针对RDD(包含很多片)的计算,每个分片Spark都会创建一个计算任务去处理,所以每个计算阶段会包含很多个计算任务
    展开
    共 1 条评论
    40
  • 落叶飞逝的恋
    2018-11-27
    总结:Spark的优点就是能够动态根据计算逻辑的复杂度进行不断的拆分子任务,而实现在一个应用中处理所有的逻辑,而不像MapReduce需要启动多个应用进行计算。
    36
  • My dream
    2018-11-28
    老师,你讲的理论看的我头晕脑胀的,能不能讲点实战操作,搭建spark环境,通过案例来讲的话,对于我们这些初学学生来说是最直观,最容易弄明白它为什么会那么快,像你这样一味的讲理论,不讲实战,我们实在是吸收不了,理解不了你讲的这些知识点
    共 7 条评论
    22
  • scorpiozj
    2018-11-28
    移动计算比移动数据划算 总结的真好 很多设计仔细想一想都是围绕这个中心 关于开源 1 准备好详细的使用 api文档并提供示例 2 撰写设计思路 和竞品比较的优势 以及创新点 3 提前联系若干团队使用产品 并请他们提供真实的提高效率的数据报告 4 联系公关团队在知名技术论坛推广 5 成立团队 及时响应开发者疑问 需求和pr等
    展开
    22
  • 纯洁的憎恶
    2018-11-27
    这两天的内容对我来说有些复杂,很多知识点没有理解透。针对“而 Spark 更细腻一点,将前一个的 Reduce 和后一个的 Map 连接起来,当作一个阶段持续计算,形成一个更加优雅、高效地计算模型”。这句话中“将前一个的 Reduce 和后一个的 Map 连接起来”在细节上该如何理解,这也是明显的串行过程,感觉不会比传统的MapReduce快?是因为不同阶段之间有可能并行么?

    作者回复: 引用楼下的评论回复 落叶飞逝的恋 总结:Spark的优点就是能够动态根据计算逻辑的复杂度进行不断的拆分子任务,而实现在一个应用中处理所有的逻辑,而不像MapReduce需要启动多个应用进行计算。

    共 4 条评论
    11
  • ming
    2018-12-08
    老师,有一句话我不太理解,请老师指导。“DAGScheduler 根据代码和数据分布生成 DAG 图”。根据代码生产DAG图我理解,但是为什么生成DAG图还要根据数据分布生成,数据分布不同,生成的DAG图也会不同吗?

    作者回复: 数据分布删掉,谢谢指正。

    7
  • 周洋舟
    2018-11-28
    每一篇文章都认真的读了,有些东西还没真正的去在实际工作中体会到,但这种思维的启发还是受益匪浅。
    7
  • 张飞
    2019-03-01
    1.“而 Spark 更细腻一点,将前一个的 Reduce 和后一个的 Map 连接起来,当作一个阶段持续计算,形成一个更加优雅、高效地计算模型”,stage之间是串行的,即便前一个的reduce和后一个的map连接起来,也是要从前一个stage的计算节点的磁盘上拉取数据的,这跟mapreduce的计算是一样的,老师所说的高效在这里是怎么提现的呢? 2. spark的内存计算主要体现在shuffle过程,下一个stage拉取上一个stage的数据的时候更多的使用内存,在这里区分出与mapreduce计算的不同,别的还有什么阶段比mapreduce更依赖内存的吗? 3.我是不是可以这样理解,如果只有map阶段的话,即便计算量很大,mapreduce与spark的计算速度上也没有太大的区别? 可能问题问的不够清晰,希望老师解答一下。
    展开

    作者回复: 1 Spark的map和reduce的划分要更优雅一点,比如宽依赖和窄依赖,编程上看不出明显的map和reduce,这种优雅还有很多,多写一些spark和MapReduce程序就能感受到。 2 如果内存够用,Spark几乎总是使用内存。 3 可以这么理解。

    6
  • 2019-03-07
    对于hbase和高速发展的es,不知道您怎么看,他们的优缺点是什么?
    5
  • 追梦小乐
    2018-11-27
    老师,我想请教几个问题: 1、“根据每个阶段要处理的数据量生成相应的任务集合(TaskSet),每个任务都分配一个任务进程去处理”,是一个任务集合TaskSet启动一个进程,taskSet里面的任务是用线程计算吗?还是每个TaskSet里面的任务启动一个进程? 2、task-time 图中红色密集的表示什么? 3、Spark 的执行过程 的图中 Executor 中的 Cache 是表示内存吗?task和Executor不是在内存中的吗?
    展开

    作者回复: 每个任务一个进程 很多红色线条,每条线代表一个任务 cache理解成存储rdd的内存

    共 2 条评论
    5
  • 落叶飞逝的恋
    2019-03-04
    现在回过头看,Spark的编程模型其实类似Java8的Steam编程模型
    共 1 条评论
    4
  • 白鸽
    2018-11-30
    Executor 从 Diver 下载执行代码,是整个程序 jar包?还是仅 Executor 计算任务对应的一段计算程序(经SparkSession初始化后的)?

    作者回复: 整个jar

    4
  • 多襄丸
    2018-11-27
    啊、老师现在的提问都好大,我现在是老虎吃天无从下爪啊 ^_^

    作者回复: 有些问题不一定要得到答案或者回答出来,只是关注到了思考一下,就会有收获~

    4
  • weiruan85
    2019-07-31
    1.完备的技术说明文档是必须的,比如使用场景,常见问题,环境搭建,核心技术的原理等。 2.输出真实等使用案例,以及给解决实际问题带来等好处,比如如果没有我们的开源方案是怎么实现的,有了这个方案是怎么实现的,差异是什么 3.商业推广,找业界有名的公司站台,或者有名的技术大牛做宣传(头羊效应) 4.归根结底,还是得有开创性的技术,能解决现实中的某一类问题。
    展开

    作者回复: 赞

    3
  • Yezhiwei
    2019-01-14
    这里是学习过程中做的一些总结 https://mp.weixin.qq.com/s/OyPRXAu9hR1KWIbvc20y1g

    作者回复: 👍👍👍

    3
  • weiruan85
    2019-07-31
    客户:我想给数据库中的一张关键表添加几个索引,对生产有没有影响 fey:为什么要加索引呢,是张什么表 客户:系统中的流水表,查询比较慢 fey:数据量大概多大 客户:有5000万 fey:存储了多久的数据呢 客户:存了将近2年的数据了 fey:按照业务应该保存多久的数据呢 客户:3个月 fey:那我们是不是应该先把历史数据进行归档,然后在添加索引呢。 客户:对,可以先做数据的归档。
    展开
    2
  • 冰ྂ镇ྂ可ྂ乐ྂ
    2019-04-30
    之前讲mr时候也有提到生成dag,spark这里也是dag,二者的差异是mr中,map reduce为一组操作(可能没有reduce)的一个job job之间是依赖关系,而spark并非简单依照m r划分而是针对数据的处理情况,如果r后到下一个m是窄依赖,则属于同一个stage,属于一个流程,这样理解对吗?

    作者回复: 可以这样理解。 MR模型本身不包含DAG,需要外部工具基于MR构建DAG实现复杂的计算,比如Hive。 Spark的计算模型本身就是包含DAG。

    2
  • 尼糯米
    2019-01-08
    1、DAGScheduler根据应用构造执行的DAG:是不是一个应用便构造一个DAG 2、DAG划分出计算阶段 3、每个计算阶段,根据要处理的数据量,为每个数据分片生成一个对应的任务,这些任务组成一个任务集合 4、每个任务分配一个进程 5、分布式计算:某个计算阶段,其某个任务的进程在集群某处执行计算 6、执行action函数便生成作业:依据DAG计算,是不是便可以生成多个作业 我这么理解可以吗?
    展开

    作者回复: √

    2
  • Richard
    2018-12-14
    “DAGScheduler 根据程序代码生成 DAG,然后将程序分发到分布式计算集群,按计算阶段的先后关系调度执行。Worker 收到任务后,启动 Executor 进程开始执行任务。Executor 先检查自己是否有 Driver 的执行代码,如果没有,从 Driver 下载执行代码,通过 Java 反射加载后开始执行。” 针对这一段话,我想多请教老师一些,我理解的DAGScheduler 根据程序代码生成 DAG,类似于关系型数据库优化器根据SQL生成执行计划,然后spark计算引擎根据这些计划去做计算,我的疑惑的是:DAG已经是根据代码生成的了,那Worker 还要从 Driver 下载执行代码去执行,我无法想象worker是如何执行代码的,能否帮忙解疑一下?
    展开

    作者回复: SQL的执行计划和spark的dag,都是执行描述,可以用文本查看的,不包括执行代码。 worker下载应用程序的jar包,反射加载执行。第三模块spark源码优化有详细描述。

    2