很遗憾, spark 的设计架构并非为了高并发请求而设计的,咱们尝试在网络条件很差的集群下,进行 100 并发的查询,在压测 3 天后发现了内存泄露。node
a) 在进行大量小 SQL 的压测过程当中发现,有大量的 activejob 在 spark ui 上一直处于 pending 状态,且永远不结束,以下图所示sql
b) 而且发现 driver 内存爆满apache
c) 用内存分析分析工具分析了下网络
短期内 SPARK 提交大量的 SQL ,并且 SQL里面存在大量的 union与 join的情形,会建立大量的 event对象,使得这里的 event 数量超过 10000 个 event ,一旦超过 10000 个 event 就开始丢弃 event,而这个 event 是用来回收 资源的,丢弃了 资源就没法回session
收了。 针对 UI 页面的这个问题,咱们将这个队列长度的限制给取消了。多线程
抓包发现架构
这些 event 是经过 post 方法传递的,并写入到队列里并发
可是也是由一个单线程进行 postToAll 的jvm
可是在高并发状况下,单线程的 postToAll 的速度没有 post 的速度快,会致使队列堆积的高并发
event 愈来愈多,若是是持续性的高并发的 SQL 查询,这里就会致使内存泄露
接下来咱们在分析下 postToAll 的方法里面,那个路径是最慢的,致使事件处理最慢的逻辑是那个?
可能您都不敢相信,经过 jstack 抓取分析,程序大部分时间都阻塞在记录日志上
能够经过禁用这个地方的 log 来提高 event 的速度
log4j.logger.org.apache.spark.scheduler=ERROR
说道这里, Cleaner 的设计应该算是 spark 最糟糕的设计。 spark 的 ContextCleaner 是用于回收与清理已经完成了的 广播 boradcast,shuffle 数据的。可是高并发下,咱们发现这个地方积累的数据会愈来愈多,最终致使 driver 内存跑满而挂掉。
l 咱们先看下,是如何触发内存回收的
没错,就是经过 System.gc() 回收的内存,若是咱们在 jvm 里配置了禁止执行 System.gc,这个逻辑就等于废掉(并且有不少 jvm 的优化参数通常都推荐配置禁止 system.gc 参数)
l clean 过程
这是一个单线程的逻辑,并且每次清理都要协同不少机器一同清理,清理速度相对来讲比较慢,可是SQL 并发很大的时候,产生速度超过了清理速度,整个 driver 就会发生内存泄露。并且 brocadcast 若是占用内存太多,也会使用很是多的本地磁盘小文件,咱们在测试中发现,高持续性并发的状况下本地磁盘用于存储 blockmanager 的目录占据了咱们 60%的存储空间。
咱们再来分析下 clean 里面,那个逻辑最慢
真正的瓶颈在于 blockManagerMaster 里面的 removeBroadcast,由于这部分逻辑是须要跨越多台机器的。
针对这种问题,
咱们在 SQL 层加了一个 SQLWAITING 逻辑,判断了堆积长度,若是堆积长度超过了咱们的设定值,咱们这里将阻塞新的 SQL 的执行。堆积长度能够经过更改 conf 目录下的 ya100_env_default.sh 中的ydb.sql.waiting.queue.size 的值来设置。
l 建议集群的带宽要大一些,万兆网络确定会比千兆网络的清理速度快不少。
l 给集群休息的机会,不要一直持续性的高并发,让集群有间断的机会。
l 增大 spark 的线程池,能够调节 conf 下的 spark-defaults.conf 的以下值来改善。
发现 spark, hive, lucene 都很是钟爱使用 threadlocal 来管理临时的 session 对象,期待 SQL 执行完毕后这些对象可以自动释放,可是与此同时 spark 又使用了线程池,线程池里的线程一直不结束,这些资源一直就不释放,时间久了内存就堆积起来了。
针对这个问题,延云修改了 spark 关键线程池的实现,更改成每 1 个小时,强制更换线程池为新的线程池,旧的线程数可以自动释放。
您会发现,随着请求的 session 变多, spark 会在 hdfs 和本地磁盘建立海量的磁盘目录,最终会由于本地磁盘与 hdfs 上的目录过多,而致使文件系统和整个文件系统瘫痪。在 YDB 里面咱们针对这种状况也作了处理。
为何会有这些对象在里面,咱们看下源码
多达 10 万多个 JDOPersistenceManager
经过 debug 工具监控发现,spark 的 listerner 随着时间的积累,通知(post)速度运来越慢
发现全部代码都卡在了 onpostevent 上
jstack 的结果以下
研究下了调用逻辑以下,发现是循环调用 listerners,并且 listerner 都是空执行才会产生上面的 jstack 截图
经过内存发现有 30 多万个 linterner 在里面
发现都是大多数都是同一个listener,咱们核对下该处源码
最终定位问题
确系是这个地方的 BUG ,每次建立 JDBC 链接的时候 , spark 就会增长一个 listener, 时间久了, listener就会积累愈来愈多 针对这个问题 我简单的修改了一行代码,开始进入下一轮的压测
测试发现,即便只有 1 条记录,使用 spark 进行一次 SQL 查询也会耗时 1 秒,对不少即席查询来讲 1秒的等待,对用户体验很是不友好。针对这个问题,咱们在 spark 与 hive 的细节代码上进行了局部调优,调优后,响应时间由原先的 1 秒缩减到如今的 200~300 毫秒。
如下是咱们改动过的地方
另外使用 hadoop namenode HA 的同窗会注意到,若是第一个 namenode 是 standby 状态,这个地方会更慢,就不止一秒,因此除了改动源码外,若是使用 namenode ha 的同窗必定要注意,将 active 状态的 node 必定要放在前面。
频繁的 hiveConf 初始化,须要读取 core-default.xml, hdfs-default.xml, yarn-default.xml, mapreduce-default.xml, hive-default.xml 等多个xml 文件,而这些xml 文件都是内嵌在 jar 包内的。
第一,解压这些 jar 包须要耗费较多的时间,第二每次都对这些 xml 文件解析也耗费时间。
configuration 的序列化,采用了压缩的方式进行序列化,有全局锁的问题configuration 每次序列化,传递了太多了没用的配置项了, 1000 多个配置项,占用 60 多 Kb。咱们剔除了不是必须传输的配置项后,缩减到 44 个配置项, 2kb 的大小。
因为 SPARK-3015 的 BUG, spark 的 cleaner 目前为单线程回收模式。
你们留意 spark 源码注释
其中的单线程瓶颈点在于广播数据的 cleaner,因为要跨越不少台机器,须要经过 akka 进行网络交互。
若是回收并发特别大, SPARK-3015 的 bug 报告会出现网络拥堵,致使大量的 timeout 出现。
为何回收量特变大呢? 实际上是由于 cleaner 本质是经过 system.gc() ,按期执行的,默认积累 30 分钟或者进行了 gc 后才触发 cleaner,这样就会致使瞬间,大量的 akka 并发执行,集中释放,网络不瞬间瘫痪才不怪呢。
可是单线程回收意味着回收速度恒定,若是查询并发很大,回收速度跟不上 cleaner 的速度,会致使 cleaner 积累不少,会致使进程 OOM( YDB 作了修改,会限制前台查询的并发)。
不管是 OOM 仍是限制并发都不是咱们但愿看到的,因此针对高并发状况下,这种单线程的回收速度是知足不了高并发的需求的。
对于官方的这样的作法,咱们表示并非一个完美的 cleaner 方案。并发回收必定要支持,
只要解决 akka 的 timeout 问题便可。
因此这个问题要仔细分析一下, akka 为何会 timeout,是由于 cleaner 占据了太多的资源,那么咱们是否能够控制下 cleaner 的并发呢?好比说使用 4 个并发,而不是默认将所有的并发线程都给占满呢?这样及解决了 cleaner 的回收速度,也解决了 akka 的问题不是更好么?
针对这个问题,咱们最终仍是选择了修改 spark 的 ContextCleaner 对象,将广播数据的回收改为多线程的方式,但如今了线程的并发数量,从而解决了该问题。