近期优化了一个spark流量统计的程序,此程序跑5分钟小数据量日志不到5分钟,但相同的程序跑一天大数据量日志各类失败。经优化,使用160 vcores + 480G memory,一天的日志可在2.5小时内跑完,下面对一些优化的思路方法进行梳理。 java
三个目标优先级依次递减,首要解决的是程序可以跑通大数据量,资源性能尽可能进行优化。 node
这部分主要对程序进行优化,主要考虑stage、cache、partition等方面。 web
在进行shuffle操做时,如reduceByKey、groupByKey,会划分新的stage。同一个stage内部使用pipe line进行执行,效率较高;stage之间进行shuffle,效率较低。故大数据量下,应进行代码结构优化,尽可能减小shuffle操做。 网络
本例中,首先计算出一个baseRDD,而后对其进行cache,后续启动三个子任务基于cache进行后续计算。 并发
对于5分钟小数据量,采用StorageLevel.MEMORY_ONLY,而对于大数据下咱们直接采用了StorageLevel.DISK_ONLY。DISK_ONLY_2相较DISK_ONLY具备2备份,cache的稳定性更高,但同时开销更大,cache除了在executor本地进行存储外,还需走网络传输至其余节点。后续咱们的优化,会保证executor的稳定性,故没有必要采用DISK_ONLY_2。实时上,若是优化的很差,咱们发现executor也会大面积挂掉,这时候即使DISK_ONLY_2,也是然并卵,因此保证executor的稳定性才是保证cache稳定性的关键。 app
cache是lazy执行的,这点很容易犯错,例如: jvm
val raw = sc.textFile(file) val baseRDD = raw.map(...).filter(...) baseRDD.cache() val threadList = new Array( new Thread(new SubTaskThead1(baseRDD)), new Thread(new SubTaskThead2(baseRDD)), new Thread(new SubTaskThead3(baseRDD)) ) threadList.map(_.start()) threadList.map(_.join())
这个例子在三个子线程开始并行执行的时候,baseRDD因为lazy执行,还没被cache,这时候三个线程会同时进行baseRDD的计算,cache的功能形同虚设。能够在baseRDD.cache()后增长baseRDD.count(),显式的触发cache,固然count()是一个action,自己会触发一个job。 post
再举一个错误的例子: 性能
val raw = sc.textFile(file) val pvLog = raw.filter(isPV(_)) val clLog = raw.filter(isCL(_)) val baseRDD = pvLog.union(clLog) val baseRDD.count()
因为textFile()也是lazy执行的,故本例会进行两次相同的hdfs文件的读取,效率较差。解决办法,是对pvLog和clLog共同的父RDD进行cache。 测试
一个stage由若干partition并行执行,partition数是一个很重要的优化点。
本例中,一天的日志由6000个小文件组成,加上后续复杂的统计操做,某个stage的parition数达到了100w。parition过多会有不少问题,好比全部task返回给driver的MapStatus都已经很大了,超过spark.driver.maxResultSize(默认1G),致使driver挂掉。虽然spark启动task的速度很快,可是每一个task执行的计算量太少,有一半多的时间都在进行task序列化,形成了浪费,另外shuffle过程的网络消耗也会增长。
对于reduceByKey(),若是不加参数,生成的rdd与父rdd的parition数相同,不然与参数相同。还可使用coalesce()和repartition()下降parition数。例如,本例中因为有6000个小文件,致使baseRDD有6000个parition,可使用coalesce()下降parition数,这样parition数会减小,每一个task会读取多个小文件。
val raw = sc.textFile(file).coalesce(300) val baseRDD = raw.map(...).filter(...) baseRDD.cache()
那么对于每一个stage设置多大的partition数合适那?固然不一样的程度的复杂度不一样,这个数值须要不断进行调试,本例中经测试保证每一个parition的输入数据量在1G之内便可,若是parition数过少,每一个parition读入的数据量变大,会增长内存的压力。例如,咱们的某一个stage的ShuffleRead达到了3T,我设置parition数为6000,平均每一个parition读取500M数据。
val bigRDD = ... bigRDD.coalesce(6000).reduceBy(...)
最后,通常咱们的原始日志很大,可是计算结果很小,在saveAsTextFile前,能够减小结果rdd的parition数目,这样会计算hdfs上的结果文件数,下降小文件数会下降hdfs namenode的压力,也会减小最后咱们收集结果文件的时间。
val resultRDD = ... resultRDD.repartition(1).saveAsTextFile(output)
这里使用repartition()不使用coalesce(),是为了避免下降resultRDD计算的并发量,经过再作一次shuffle将结果进行汇总。
在搜狗咱们的spark程序跑在yarn集群上,咱们应保证咱们的程序有一个稳定高效的集群环境。
一些经常使用的参数设置以下:
--queue:集群队列 --num-executors:executor数量,默认2 --executor-memory:executor内存,默认512M --executor-cores:每一个executor的并发数,默认1
executor的数量能够根据任务的并发量进行估算,例如我有1000个任务,每一个任务耗时1分钟,若10个并发则耗时100分钟,100个并发耗时10分钟,根据本身对并发需求进行调整便可。默认每一个executor内有一个并发执行任务,通常够用,也可适当增长,固然内存的使用也会有所增长。
对于yarn-client模式,整个application所申请的资源为:
total vores = executor-cores * num-executors + spark.yarn.am.cores total memory= (executor-memory + spark.yarn.executor.memoryOverhead) * num-executors + (spark.yarn.am.memory + spark.yarn.am.memoryOverhead)
当申请的资源超出所指定的队列的min cores和min memory时,executor就有被yarn kill掉的风险。而spark的每一个stage是有状态的,若是被kill掉,对性能影响比较大。例如,本例中的baseRDD被cache,若是某个executor被kill掉,会致使其上的cache的parition失效,须要从新计算,对性能影响极大。
这里还有一点须要注意,executor-memory设置的是executor jvm启动的最大堆内存,java内存除了堆内存外,还有栈内存、堆外内存等,因此spark使用spark.yarn.executor.memoryOverhead对非堆内存进行限制,也就是说executor-memory + spark.yarn.executor.memoryOverhead是所能使用的内存的上线,若是超过此上线,就会被yarn kill掉。本次优化,堆外内存的优化起到了相当重要的做用,咱们后续会看到。
spark.yarn.executor.memoryOverhead默认是executor-memory * 0.1,最小是384M。好比,咱们的executor-memory设置为1G,spark.yarn.executor.memoryOverhead是默认的384M,则咱们向yarn申请使用的最大内存为1408M,但因为yarn的限制为倍数(不知道是否是只是咱们的集群是这样),实际上yarn运行咱们运行的最大内存为2G。这样感受浪费申请的内存,申请的堆内存为1G,实际上却给咱们分配了2G,若是对spark.yarn.executor.memoryOverhead要求不高的话,能够对executor-memory再精细化,好比申请executor-memory为640M,加上最小384M的spark.yarn.executor.memoryOverhead,正好一共是1G。
除了启动executor外,spark还会启动一个am,可使用spark.yarn.am.memory设置am的内存大小,默认是512M,spark.yarn.am.memoryOverhead默认也是最小384M。有时am会出现OOM的状况,能够适当调大spark.yarn.am.memory。
executor默认的永久代内存是64K,能够看到永久代使用率长时间为99%,经过设置spark.executor.extraJavaOptions适当增大永久代内存,例如:–conf spark.executor.extraJavaOptions=”-XX:MaxPermSize=64m”
driver端在yarn-client模式下运行在本地,也能够对相关参数进行配置,如–driver-memory等。
executor的stdout、stderr日志在集群本地,当出问题时,能够到相应的节点查询,固然从web ui上也能够直接看到。
executor除了stdout、stderr日志,咱们能够把gc日志打印出来,便于咱们对jvm的内存和gc进行调试。
--conf spark.executor.extraJavaOptions="-XX:+PrintGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintHeapAtGC -XX:+PrintGCApplicationConcurrentTime -Xloggc:gc.log"
除了executor的日志,nodemanager的日志也会给咱们一些帮助,好比由于超出内存上限被kill、资源抢占被kill等缘由都能看到。
除此以外,spark am的日志也会给咱们一些帮助,从yarn的application页面能够直接看到am所在节点和log连接。
咱们的yarn集群节点上上跑着mapreduce、hive、pig、tez、spark等各种任务,除了内存有所限制外,CPU、带宽、磁盘IO等都没有限制(固然,这么作也是为了提升集群的硬件利用率),加上集群总体业务较多负载较高,使得spark的执行环境十分恶劣。常见的一些因为集群环境,致使spark程序失败或者性能降低的状况有:
通过上述优化,咱们的程序的稳定性有所提高,可是让咱们彻底跑通的最后一根救命稻草是内存、GC相关的优化。
咱们使用的spark版本是1.5.2(更准确的说是1.5.3-shapshot),shuffle过程当中block的传输使用netty(spark.shuffle.blockTransferService)。基于netty的shuffle,使用direct memory存进行buffer(spark.shuffle.io.preferDirectBufs),因此在大数据量shuffle时,堆外内存使用较多。固然,也可使用传统的nio方式处理shuffle,可是此方式在spark 1.5版本设置为deprecated,并将会在1.6版本完全移除,因此我最终仍是采用了netty的shuffle。
jvm关于堆外内存的配置相对较少,经过-XX:MaxDirectMemorySize能够指定最大的direct memory。默认若是不设置,则与最大堆内存相同。
Direct Memory是受GC控制的,例如ByteBuffer bb = ByteBuffer.allocateDirect(1024),这段代码的执行会在堆外占用1k的内存,Java堆内只会占用一个对象的指针引用的大小,堆外的这1k的空间只有当bb对象被回收时,才会被回收,这里会发现一个明显的不对称现象,就是堆外可能占用了不少,而堆内没占用多少,致使还没触发GC。加上-XX:MaxDirectMemorySize这个大小限制后,那么只要Direct Memory使用到达了这个大小,就会强制触发GC,这个大小若是设置的不够用,那么在日志中会看到java.lang.OutOfMemoryError: Direct buffer memory。
例如,在咱们的例子中,发现堆外内存飙升的比较快,很容易被yarn kill掉,因此应适当调小-XX:MaxDirectMemorySize(也不能太小,不然会报Direct buffer memory异常)。固然你也能够调大spark.yarn.executor.memoryOverhead,加大yarn对咱们使用内存的宽容度,可是这样比较浪费资源了。
GC优化前,最好是把gc日志打出来,便于咱们进行调试。
--conf spark.executor.extraJavaOptions="-XX:+PrintGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintHeapAtGC -XX:+PrintGCApplicationConcurrentTime -Xloggc:gc.log"
经过看gc日志,咱们发现一个case,特定时间段内,堆内存其实很闲,堆内存使用率也就5%左右,长时间不进行父gc,致使Direct Memory一直不进行回收,一直在飙升。因此,咱们的目标是让父gc更频繁些,多触发一些Direct Memory回收。
第一,能够减小整个堆内存的大小,固然也不能过小,不然堆内存也会报OOM。这里,我配置了1G的最大堆内存。
第二,可让年轻代的对象尽快进入年老代,增长年老代的内存。这里我使用了-Xmn100m,将年轻代大小设置为100M。另外,年轻代的对象默认会在young gc 15次后进入年老代,这会形成年轻代使用率比较大,young gc比较多,可是年老代使用率低,父gc比较少,经过配置-XX:MaxTenuringThreshold=1,年轻代的对象通过一次young gc后就进入年老代,加快年老代父gc的频率。
第三,可让年老代更频繁的进行父gc。通常年老代gc策略咱们主要有-XX:+UseParallelOldGC和-XX:+UseConcMarkSweepGC这两种,ParallelOldGC吞吐率较大,ConcMarkSweepGC延迟较低。咱们但愿父gc频繁些,对吞吐率要求较低,并且ConcMarkSweepGC能够设置-XX:CMSInitiatingOccupancyFraction,即年老代内存使用率达到什么比例时触发CMS。咱们决定使用CMS,并设置-XX:CMSInitiatingOccupancyFraction=10,即年老代使用率10%时触发父gc。
经过对GC策略的配置,咱们发现父gc进行的频率加快了,带来好处就是Direct Memory可以尽快进行回收,固然也有坏处,就是gc时间增长了,cpu使用率也有所增长。
最终咱们对executor的配置以下:
--executor-memory 1G --num-executors 160 --executor-cores 1 --conf spark.yarn.executor.memoryOverhead=2048 --conf spark.executor.extraJavaOptions="-XX:MaxPermSize=64m -XX:+CMSClassUnloadingEnabled -XX:MaxDirectMemorySize=1536m -Xmn100m -XX:MaxTenuringThreshold=1 -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+UseCMSCompactAtFullCollection -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=10 -XX:+UseCompressedOops -XX:+PrintGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintHeapAtGC -XX:+PrintGCApplicationConcurrentTime -Xloggc:gc.log -XX:+HeapDumpOnOutOfMemoryError"
经过对Stage/Cache/Partition、资源、内存/GC的优化,咱们的spark程序最终可以在160 vcores + 480G memory资源下,使用2.5小时跑通一天的日志。
对于程序优化,我认为应本着以下几点进行: