60TB 数据量的做业从 Hive 迁移到 Spark 在 Facebook 的实践

Facebook 常用分析来进行数据驱动的决策。在过去的几年里,用户和产品都获得了增加,使得咱们分析引擎中单个查询的数据量达到了数十TB。咱们的一些批处理分析都是基于 Hive 平台(Apache Hive 是 Facebook 在2009年贡献给社区的)和 Corona( Facebook 内部的 MapReduce 实现)进行的。Facebook 还针对包括 Hive 在内的多个内部数据存储,继续增长了其 Presto 的 ANSI-SQL 查询的覆盖范围。Facebook 内部还支持其余类型的分析,如图计算、机器学习(Apache Giraph)和流处理(如 Puma、Swift 和 Stylus)。数组

尽管 Facebook 提供的服务涵盖了分析领域的普遍领域,但咱们仍在不断地与开源社区互动,以分享咱们的经验,并向他人学习。Apache Spark 于2009年由加州大学伯克利分校(UC-Berkeley)的 Matei Zaharia 创办,并于2013年贡献给 Apache。它是目前增加最快的数据处理平台之一,由于它可以支持流处理、批处理、命令式(RDD)、声明式(SQL)、图计算和机器学习用例,全部这些都在相同的 API 和底层计算引擎中。Spark 能够有效地利用大量内存,跨整个管道(pipelines)优化代码,并跨任务(tasks)重用 jvm 以得到更好的性能。Facebook 认为 Spark 已经成熟到能够在许多批处理用例中与 Hive 进行比较的地步。在本文的后面部分,将介绍 Facebook 使用 Spark 替代 Hive 的经验和教训。缓存

用例:为实体排序(entity ranking)作特性准备

实时实体排名在 Facebook 有着多种使用场景。对于一些在线服务平台,原始的特性值是使用 Hive 离线生成的,并将生成的数据加载到这些实时关联查询系统中。这些 Hive 做业是数年前开发的,占用了大量的计算资源,而且难以维护,由于这些做业被拆分红数百个 Hive 小做业。为了使得业务可以使用到新的特征数据,而且让系统变得可维护,咱们开始着手将这些做业迁移到 Spark 中。性能优化

之前的 Hive 做业实现

基于 Hive 的做业由三个逻辑阶段组成,每一个阶段对应数百个由 entity_id 分割的较小 Hive 做业,由于为每一个阶段运行较大的 Hive 做业不太可靠,而且受到每一个做业的最大任务数限制。具体以下:app

以上三个逻辑阶段能够归纳以下:框架

  • 过滤掉非生产须要的特性和噪音;
  • 对每一个(entity_id、target_id)对进行聚合;
  • 将表分为 N 个分片,并对每一个切分经过自定义 UDF 生成一个用于在线查询的自定义索引文件。

基于 Hive 构建索引的做业大约须要运行三天。管理起来也颇有挑战性,由于这条管道包含数百个分片做业,所以很难进行监控。没有简单的方法来衡量做业的总体进度或计算 ETA。考虑到现有 Hive 做业的上述局限性,咱们决定尝试使用 Spark 来构建一个更快、更易于管理的做业。机器学习

Spark 实现

若是使用 Spark 所有替换上面的做业可能会很慢,而且颇有挑战性,须要大量的资源。因此咱们首先将焦点投入在 Hive 做业中资源最密集的部分:第二阶段。咱们从50GB的压缩输入样本开始,而后逐步扩展到 300 GB、1 TB 和20 TB。在每次增长大小时,咱们都解决了性能和稳定性问题,可是尝试 20 TB 时咱们发现了最大改进的地方。jvm

在运行 20 TB 的输入时,咱们发现因为任务太多,生成了太多的输出文件(每一个文件的大小大约为100 MB)。在做业运行的10个小时中,有3个小时用于将文件从 staging 目录移动到 HDFS 中的最终目录。最初,咱们考虑了两个方案:要么改进 HDFS 中的批量重命名以支持咱们的用例;要么配置 Spark 以生成更少的输出文件(这一阶段有大量的任务——70,000个)。通过认真思考,咱们获得了第三种方案。因为咱们在做业的第二步中生成的 tmp_table2 表是临时的,而且只用于存储做业的中间输出。最后,咱们把上面 Hive 实现的三个阶段的做业用一个 Spark 做业表示,该做业读取 60 TB 的压缩数据并执行 90 TB的 shuffle 和排序,最后的 Spark job 以下:工具

咱们如何扩展 Spark 来完成这项工做?

固然,在如此大的数据量上运行单个 Spark 做业在第一次尝试甚至第十次尝试时都不会起做用。据咱们所知,这是生产环境中 shuffle 数据量最大的 Spark 做业(Databricks 的 PB 级排序是在合成数据上进行的)。咱们对 Spark 内核和应用程序进行了大量的改进和优化,才使这项工做得以运行。这项工做的好处在于,其中许多改进都适用于 Spark 的其余大型工做负载,而且咱们可以将全部工做从新贡献给开源 Apache Spark 项目 - 有关更多详细信息,请参见下面相关的 JIRA。下面咱们将重点介绍将一个实体排名做业部署到生产环境的主要改进。性能

可靠性修复(Reliability fixes)

处理节点频繁重启

为了可靠地执行长时间运行的做业,咱们但愿系统可以容错并从故障中恢复(主要是因为正常维护或软件错误致使的机器从新启动)。虽然 Spark 最初的设计能够容忍机器重动,但咱们仍是发现了各类各样的 bug/问题,咱们须要在系统正式投入生产以前解决这些问题。学习

  • 使得 PipedRDD 容忍节点重启(SPARK-13793):PipedRDD 以前在处理节点重启设计不够健壮,当它获取数据失败时,这个做业就会失败。咱们从新设计了 PipedRDD,使得它可以友好的处理这种异常,而且从这种类型的异常中恢复。
  • 最大的获取失败次数可配置( SPARK-13369 ):对于长期运行的做业而言,因为计算机重动而致使获取失败的可能性大大增长。 在 Spark 中每一个阶段容许的最大获取失败次数是写死的,所以,当达到最大失败次数时,做业一般会失败。咱们作了一个更改,使其变得可配置,并将这个参数的值从 4 增长到 20,使得做业对于 fetch 失败更加健壮。
  • Less disruptive cluster restart:长时间运行的做业应该可以在集群重启后继续运行,这样咱们就不会浪费到目前为止完成的全部处理。Spark 的可重启 shuffle service 让咱们在节点重启后保留 shuffle 文件。最重要的是,咱们在 Spark driver 中实现了可以暂停任务调度的功能,这样做业就不会由于集群重启而致使任务失败。

其余可靠性修复

  • Unresponsive driver(SPARK-13279):Spark driver 添加任务会进行一项时间复杂度为 O(N2) 的操做,这可能会致使其被卡住,最终致使做业被 killed。咱们删除这个没必要要的 O(N2) 操做来解决这个问题。
  • Excessive driver speculation:咱们发现,Spark driver 在管理大量任务时,会花费了大量时间进行推测(speculation)。在短时间内,在运行这个做业时咱们禁止了 speculation。咱们目前正在对 Spark Driver 进行修改,以减小 speculation 的时间。
  • TimSort issue due to integer overflow for large buffer( SPARK-13850 ):咱们发现 Spark 的 unsafe 内存操做有一个 bug,这会致使 TimSort 中的内存出现问题。不过 Databricks 的工做人员已经修复了这个问题,使咱们可以在大型内存缓冲区上进行操做。
  • Tune the shuffle service to handle large number of connections:在 shuffle 阶段,咱们看到许多 executors 在试图链接 shuffle service 时超时。经过增长 Netty 服务线程(spark.shuffle.io.serverThreads)和 backlog (spark.shuffle.io.backLog)的数量解决了这个问题。
  • Fix Spark executor OOM( SPARK-13958 ):一开始在每一个节点上运行四个以上的 reduce 任务是颇有挑战性的。Spark executors 的内存不足,由于 sorter 中存在一个 bug,该 bug 会致使指针数组无限增加。咱们经过在指针数组没有更多可用内存时强制将数据溢写到磁盘来修复这个问题。所以,如今咱们能够在一个节点上运行 24个任务而不会致使内存不足。

性能提高

在实现了上述可靠性改进以后,咱们可以可靠地运行 Spark 做业。此时,咱们将工做重心转移到与性能相关的问题上,以最大限度地利用 Spark。咱们使用Spark 的指标和 profilers 来发现一些性能瓶颈。

咱们用来发现性能瓶颈的工具

  • Spark UI Metrics:Spark UI 能够很好地洞察特定阶段的时间花在哪里。每一个任务的执行时间被划分为子阶段,以便更容易地找到做业中的瓶颈。
  • Jstack:Spark UI 中还提供 executor 进程的 jstack 功能,这个能够帮助咱们找到代码中的热点问题。
  • Spark Linux Perf/Flame Graph support:尽管上面的两个工具很是方便,但它们并无提供同时运行在数百台机器上做业的 CPU 概要的聚合视图。在每一个做业的基础上,咱们增长了对性能分析的支持,而且能够定制采样的持续时间/频率。

性能优化

  • Fix memory leak in the sorter(SPARK-14363)性能提高 30%:咱们发现当任务释放全部内存页,但指针数组没有被释放。结果,大量内存未被使用,致使频繁溢出和 executor OOMs。如今,咱们修复了这个问题,这个功能使得 CPU 性能提升了30%;
  • Snappy optimization ( SPARK-14277 )性能提高 10%:对于每一行的读/写,都会调用 JNI 方法(Snappy.ArrayCopy)。咱们发现了这个问题,而且将这个调用修改为非 JNI 的System.ArrayCopy调用,修改完以后 CPU 性能提升了10%;
  • Reduce shuffle write latency(SPARK-5581)性能提高近 50%:在 map 端,当将 shuffle 数据写入磁盘时,map 任务的每一个分区打开和关闭相同的文件。咱们修复了这个问题,以免没必要要的打开/关闭,修改完以后 CPU 性能提升近 50%;
  • Fix duplicate task run issue due to fetch failure (SPARK-14649):当获取失败(fetch failure)发生时,Spark driver 会从新提交已经运行的任务,这会致使性能低下。咱们经过避免从新运行正在运行的任务修复了这个问题,而且咱们发现当发生获取操做失败时,做业也更加稳定。
  • Configurable buffer size for PipedRDD(SPARK-14542)性能提高近 10%:在使用 PipedRDD 时,咱们发现用于将数据从排序器(sorter)传输到管道处理的默认缓冲区大小过小,咱们的做业花费了超过 10% 的时间来复制数据。咱们使这个缓冲区大小变得可配置,以免这个瓶颈。
  • Cache index files for shuffle fetch speed-up(SPARK-15074):咱们发现,shuffle service 常常成为瓶颈,reduce 端花费 10% 到 15% 的时间来等待获取 map 端的数据。经过更深刻的研究这个问题,咱们发现 shuffle service 为每次 shuffle fetch 都须要打开/关闭 shuffle index 文件。咱们经过缓存索引信息,这样咱们就能够避免重复打开/关闭文件,这一变化减小了50%的 shuffle fetch 时间;
  • Reduce update frequency of shuffle bytes written metrics(SPARK-15569)性能提高近 20%:使用 Spark Linux Perf 集成,咱们发现大约 20% 的 CPU 时间花在探测和更新随机字节写的指标上。
  • Configurable initial buffer size for Sorter( SPARK-15958 )性能提高近 5%:Sorter 的默认初始缓冲区大小过小(4 KB),对于大的工做负载来讲这个值过小了,所以咱们浪费了大量的时间来复制内容。咱们将这个缓冲区大小变得可配置(备注:spark.shuffle.sort.initialBufferSize), 当将这个参数设置为 64 MB 时,能够避免大量的数据复制,使得性能提高近 5%;
  • Configuring number of tasks:因为咱们输入的数据大小为 60 T,每一个 HDFS 块大小为 256 M,所以咱们要生成超过250,000个任务。尽管咱们可以运行具备如此多任务的 Spark 做业,但咱们发现,当任务数量太高时,性能会显著降低。咱们引入了一个配置参数,使 map 输入大小可配置,咱们经过将输入的 split 大小设置为 2 GB ,使得 task 的数据减小了八倍。

在全部这些可靠性和性能改进以后,咱们的实体排名系统变成了一个更快、更易于管理的管道,而且咱们提供了在 Spark 中运行其余相似做业的能力。

使用 Spark 和 Hive 运行上面实体排名程序性能比较

咱们使用如下性能指标来比较 Spark 和 Hive 运行性能。

CPU time:这是从操做系统的角度来看 CPU 使用状况。例如,若是您的做业在32核机器上仅运行一个进程,使用全部 CPU 的50%持续10秒,那么您的 CPU 时间将是 32 0.5 10 = 160 CPU 秒。

CPU reservation time:从资源管理框架的角度来看,这是 CPU 预留(CPU reservation)。例如,若是咱们将32核机器预留10秒来运行这个做业,那么 CPU 预留时间是 32 * 10 = 320 CPU秒。CPU 时间与 CPU 预留时间的比率反映了咱们集群预留 CPU 资源的状况。准确地说,当运行相同的工做负载时,与 CPU 时间相比,预留时间能够更好地比较执行引擎。例如,若是一个进程须要1个 CPU 秒来运行,可是必须保留100个 CPU 秒,那么根据这个指标,它的效率低于须要10个 CPU 秒但只预留10个 CPU 秒来作相同数量的工做的进程。咱们还计算了内存预留时间,但这里没有列出来,由于这些数字与 CPU 预留时间相似,并且使用 Spark 和 Hive 运行这个程序时都没有在内存中缓存数据。Spark 有能力在内存中缓存数据,但因为集群内存的限制,咱们并无使用这个功能。

Latency:做业从开始到结束运行时间。

结论和将来工做

Facebook 使用高性能和可扩展的分析引擎来帮助产品开发。Apache Spark 提供了将各类分析用例统一到单个 API ,而且提供了高效的计算引擎。咱们将分解成数百个 Hive 做业管道替换为一个 Spark 做业,经过一系列的性能和可靠性改进,咱们可以使用 Spark 来处理生产中的实体数据排序的用例。在这个特殊的用例中,咱们展现了 Spark 能够可靠地 shuffle 并排序 90 TB 以上的中间数据,并在一个做业中运行 250,000个 tasks。与旧的基于 Hive 计算引擎管道相比,基于 Spark 的管道产生了显著的性能改进(4.5-6倍 CPU性能提高、节省了 3-4 倍资源的使用,并下降了大约5倍的延迟),而且已经在生产环境中运行了几个月。


本文做者:过往记忆大数据

阅读原文

本文为阿里云内容,未经容许不得转载。

相关文章
相关标签/搜索