SparkSQL性能调优与优化指南

spark 内存泄露

1.高并发状况下的内存泄露的具体表现

很遗憾,Spark的设计架构并非为了高并发请求而设计的,咱们尝试在网络条件很差的集群下,进行100并发的查询,在压测3天后发现了内存泄露。node

a)在进行大量小SQL的压测过程当中发现,有大量的activejob在spark ui上一直处于pending状态,且永远不结束,以下图所示sql

 

b)而且发现driver内存爆满apache

 

c)用内存分析分析工具分析了下网络

 

2.高并发下AsynchronousListenerBus引发的WEB UI的内存泄露

短期内 SPARK 提交大量的SQL ,并且SQL里面存在大量的 union与join的情形,会建立大量的event对象,使得这里的 event数量超过10000个event ,

一旦超过10000个event就开始丢弃 event,而这个event是用来回收 资源的,丢弃了 资源就没法回收了
。 针对UI页面的这个问题,咱们将这个队列长度的限制给取消了。session

 

 

 

 

 

 

3.AsynchronousListenerBus自己引发的内存泄露

抓包发现多线程


 

 

 

这些event是经过post方法传递的,并写入到队列里架构

 

 

可是也是由一个单线程进行postToAll的并发

 

 

可是在高并发状况下,单线程的postToAll的速度没有post的速度快,会致使队列堆积的event愈来愈多,若是是持续性的高并发的SQL查询,这里就会致使内存泄露jvm

 

接下来咱们在分析下postToAll的方法里面,那个路径是最慢的,致使事件处理最慢的逻辑是那个?高并发

 

 

 

 


可能您都不敢相信,经过jstack抓取分析,程序大部分时间都阻塞在记录日志上

 

能够经过禁用这个地方的log来提高event的速度

 

log4j.logger.org.apache.spark.scheduler=ERROR

 


 

 

 

4.高并发下的Cleaner的内存泄露

       说道这里,Cleaner的设计应该算是spark最糟糕的设计。spark的ContextCleaner是用于回收与清理已经完成了的 广播boradcast,shuffle数据的。可是高并发下,咱们发现这个地方积累的数据会愈来愈多,最终致使driver内存跑满而挂掉。

l咱们先看下,是如何触发内存回收的

 

      没错,就是经过System.gc() 回收的内存,若是咱们在jvm里配置了禁止执行System.gc,这个逻辑就等于废掉(并且有不少jvm的优化参数通常都推荐配置禁止system.gc 参数)

lclean过程

这是一个单线程的逻辑,并且每次清理都要协同不少机器一同清理,清理速度相对来讲比较慢,可是SQL并发很大的时候,产生速度超过了清理速度,整个driver就会发生内存泄露。并且brocadcast若是占用内存太多,也会使用很是多的本地磁盘小文件,咱们在测试中发现,高持续性并发的状况下本地磁盘用于存储blockmanager的目录占据了咱们60%的存储空间。

 

 

咱们再来分析下 clean里面,那个逻辑最慢

 

真正的瓶颈在于blockManagerMaster里面的removeBroadcast,由于这部分逻辑是须要跨越多台机器的。

 

针对这种问题,

l咱们在SQL层加了一个SQLWAITING逻辑,判断了堆积长度,若是堆积长度超过了咱们的设定值,咱们这里将阻塞新的SQL的执行。堆积长度能够经过更改conf目录下的ya100_env_default.sh中的ydb.sql.waiting.queue.size的值来设置。

 

l建议集群的带宽要大一些,万兆网络确定会比千兆网络的清理速度快不少。

l给集群休息的机会,不要一直持续性的高并发,让集群有间断的机会。

l增大spark的线程池,能够调节conf下的spark-defaults.conf的以下值来改善。

 

 

 

5.线程池与threadlocal引发的内存泄露

       发现spark,Hive,lucene都很是钟爱使用threadlocal来管理临时的session对象,期待SQL执行完毕后这些对象可以自动释放,可是与此同时spark又使用了线程池,线程池里的线程一直不结束,这些资源一直就不释放,时间久了内存就堆积起来了。

针对这个问题,延云修改了spark关键线程池的实现,更改成每1个小时,强制更换线程池为新的线程池,旧的线程数可以自动释放。

 

6.文件泄露

      您会发现,随着请求的session变多,spark会在hdfs和本地磁盘建立海量的磁盘目录,最终会由于本地磁盘与hdfs上的目录过多,而致使文件系统和整个文件系统瘫痪。在YDB里面咱们针对这种状况也作了处理。

 

7.deleteONExit内存泄露

 

 

 

 

 

为何会有这些对象在里面,咱们看下源码

 

 

 

 

 

 

 

 

8.JDO内存泄露

多达10万多个JDOPersistenceManager

 


 

 

 


 


 

 

 

 

 

 

 

 

9.listerner内存泄露

经过debug工具监控发现,spark的listerner随着时间的积累,通知(post)速度运来越慢

发现全部代码都卡在了onpostevent上

 

 

 

 

 

jstack的结果以下


 

 

研究下了调用逻辑以下,发现是循环调用listerners,并且listerner都是空执行才会产生上面的jstack截图

 

 

经过内存发现有30多万个linterner在里面

 

 

发现都是大多数都是同一个listener,咱们核对下该处源码

 

 

最终定位问题

确系是这个地方的BUG ,每次建立JDBC链接的时候 ,spark就会增长一个listener, 时间久了,listener就会积累愈来愈多  针对这个问题 我简单的修改了一行代码,开始进入下一轮的压测

 

 

 

 

二12、spark源码调优

      测试发现,即便只有1条记录,使用 spark进行一次SQL查询也会耗时1秒,对不少即席查询来讲1秒的等待,对用户体验很是不友好。针对这个问题,咱们在spark与hive的细节代码上进行了局部调优,调优后,响应时间由原先的1秒缩减到如今的200~300毫秒。

      

如下是咱们改动过的地方

1.SessionState 的建立目录 占用较多的时间

 

 

另外使用Hadoop namenode HA的同窗会注意到,若是第一个namenode是standby状态,这个地方会更慢,就不止一秒,因此除了改动源码外,若是使用namenode ha的同窗必定要注意,将active状态的node必定要放在前面。

2.HiveConf的初始化过程占用太多时间

频繁的hiveConf初始化,须要读取core-default.xml,hdfs-default.xml,yarn-default.xml

,mapreduce-default.xml,hive-default.xml等多个xml文件,而这些xml文件都是内嵌在jar包内的。

第一,解压这些jar包须要耗费较多的时间,第二每次都对这些xml文件解析也耗费时间。

 

 

 

 

 

 

 

 

 

 

 

 

3.广播broadcast传递的hadoop configuration序列化很耗时

lconfiguration的序列化,采用了压缩的方式进行序列化,有全局锁的问题

lconfiguration每次序列化,传递了太多了没用的配置项了,1000多个配置项,占用60多Kb。咱们剔除了不是必须传输的配置项后,缩减到44个配置项,2kb的大小。

 

 

 

 

 

 

4.对spark广播数据broadcast的Cleaner的改进

 

因为SPARK-3015 的BUG,spark的cleaner 目前为单线程回收模式。

你们留意spark源码注释

 

 

 

其中的单线程瓶颈点在于广播数据的cleaner,因为要跨越不少机器,须要经过akka进行网络交互。

若是回收并发特别大,SPARK-3015 的bug报告会出现网络拥堵,致使大量的 timeout出现。

为何回收量特变大呢? 实际上是由于cleaner 本质是经过system.gc(),按期执行的,默认积累30分钟或者进行了gc后才触发cleaner,这样就会致使瞬间,大量的akka并发执行,集中释放,网络不瞬间瘫痪才不怪呢。

可是单线程回收意味着回收速度
恒定,若是查询并发很大,回收速度跟不上cleaner的速度,会致使cleaner积累不少,会致使进程OOM(YDB作了修改,会限制前台查询的并发)。不管是OOM仍是限制并发都不是咱们但愿看到的,因此针对高并发状况下,这种单线程的回收速度是知足不了高并发的需求的。对于官方的这样的作法,咱们表示并非一个完美的cleaner方案。并发回收必定要支持,只要解决akka的timeout问题便可。因此这个问题要仔细分析一下,akka为何会timeout,是由于cleaner占据了太多的资源,那么咱们是否能够控制下cleaner的并发呢?好比说使用4个并发,而不是默认将所有的并发线程都给占满呢?这样及解决了cleaner的回收速度,也解决了akka的问题不是更好么?针对这个问题,咱们最终仍是选择了修改spark的ContextCleaner对象,将广播数据的回收 改为多线程的方式,但如今了线程的并发数量,从而解决了该问题。