不少初学者其实对Spark的编程模式仍是RDD这个概念理解不到位,就会产生一些误解。编程
好比,不少时候咱们经常觉得一个文件是会被完整读入到内存,而后作各类变换,这极可能是受两个概念的误导:分布式
若是你没有主动对RDDCache/Persist,它不过是一个概念上存在的虚拟数据集,你其实是看不到这个RDD的数据的全集的(他不会真的都放到内存里)。函数
一个RDD 本质上是一个函数,而RDD的变换不过是函数的嵌套。RDD我认为有两类:oop
咱们如下面的代码为例作分析:spa
sc.textFile("abc.log").map().saveAsTextFile("")
因此RDD不过是对一个函数的封装,当一个函数对数据处理完成后,咱们就获得一个RDD的数据集(是一个虚拟的,后续会解释)。code
NewHadoopRDD是数据来源,每一个parition负责获取数据,得到过程是经过iterator.next 得到一条一条记录的。假设某个时刻拿到了一条数据A,这个A会马上被map里的函数处理获得B(完成了转换),而后开始写入到HDFS上。其余数据重复如此。因此整个过程:orm
因此整个过程实际上是流式的过程,一条数据被各个RDD所包裹的函数处理。内存
刚才我反复提到了嵌套函数,怎么知道它是嵌套的呢?it
若是你写了这样一个代码:io
sc.textFile("abc.log").map().map().........map().saveAsTextFile("")
有成千上万个map,极可能就堆栈溢出了。为啥?其实是函数嵌套太深了。
按上面的逻辑,内存使用实际上是很是小的,10G内存跑100T数据也不是难事。可是为何Spark经常由于内存问题挂掉呢? 咱们接着往下看。
这就是为何要分Stage了。每一个Stage其实就是我上面说的那样,一套数据被N个嵌套的函数处理(也就是你的transform动做)。遇到了Shuffle,就被切开来,所谓的Shuffle,本质上是把数据按规则临时都落到磁盘上,至关于完成了一个saveAsTextFile的动做,不过是存本地磁盘。而后被切开的下一个Stage则以本地磁盘的这些数据做为数据源,从新走上面描述的流程。
咱们再作一次描述:
所谓Shuffle不过是把处理流程切分,给切分的上一段(咱们称为Stage M)加个存储到磁盘的Action动做,把切分的下一段(Stage M+1)数据源变成Stage M存储的磁盘文件。每一个Stage均可以走我上面的描述,让每条数据均可以被N个嵌套的函数处理,最后经过用户指定的动做进行存储。
前面咱们提到,Shuffle不过是偷偷的帮你加上了个相似saveAsLocalDiskFile
的动做。然而,写磁盘是一个高昂的动做。因此咱们尽量的把数据先放到内存,再批量写到文件里,还有读磁盘文件也是给费内存的动做。把数据放内存,就遇到个问题,好比10000条数据,到底会占用多少内存?这个其实很难预估的。因此一不当心,就容易致使内存溢出了。这其实也是一个很无奈的事情。
其实就是给某个Stage加上了一个saveAsMemoryBlockFile
的动做,而后下次再要数据的时候,就不用算了。这些存在内存的数据就表示了某个RDD处理后的结果。这个才是说为啥Spark是内存计算引擎的地方。在MR里,你是要放到HDFS里的,但Spark容许你把中间结果放内存里。
咱们从一个较新的角度解释了RDD 和Shuffle 都是一个什么样的东西。