Spark使用总结与分享

背景 java

   

使用spark开发已有几个月。相比于python/hive,scala/spark学习门槛较高。尤为记得刚开时,举步维艰,进展十分缓慢。不过谢天谢地,这段苦涩(bi)的日子过去了。忆苦思甜,为了不项目组的其余同窗走弯路,决定总结和梳理spark的使用经验。 python

   

Spark基础 算法

   

基石RDD sql

   

spark的核心是RDD(弹性分布式数据集),一种通用的数据抽象,封装了基础的数据操做,如map,filter,reduce等。RDD提供数据共享的抽象,相比其余大数据处理框架,如MapReduce,Pegel,DryadLINQ和HIVE等均缺少此特性,因此RDD更为通用。 apache

   

简要地归纳RDD:RDD是一个不可修改的,分布的对象集合。每一个RDD由多个分区组成,每一个分区能够同时在集群中的不一样节点上计算。RDD能够包含Python,Java和Scala中的任意对象。 编程

   

Spark生态圈中应用都是基于RDD构建(下图),这一点充分说明RDD的抽象足够通用,能够描述大多数应用场景。 缓存

   

RDD操做类型—转换和动做 网络

   

RDD的操做主要分两类:转换(transformation)和动做(action)。两类函数的主要区别是,转换接受RDD并返回RDD,而动做接受RDD可是返回非RDD。转换采用惰性调用机制,每一个RDD记录父RDD转换的方法,这种调用链表称之为血缘(lineage);而动做调用会直接计算。 闭包

采用惰性调用,经过血缘链接的RDD操做能够管道化(pipeline),管道化的操做能够直接在单节点完成,避免屡次转换操做之间数据同步的等待框架

使用血缘串联的操做能够保持每次计算相对简单,而不用担忧有过多的中间数据,由于这些血缘操做都管道化了,这样也保证了逻辑的单一性,而不用像MapReduce那样,为了竟可能的减小map reduce过程,在单个map reduce中写入过多复杂的逻辑。

   

   

RDD使用模式

   

RDD使用具备通常的模式,能够抽象为下面的几步

  1. 加载外部数据,建立RDD对象
  2. 使用转换(如filter),建立新的RDD对象
  3. 缓存须要重用的RDD
  4. 使用动做(如count),启动并行计算

   

RDD高效的策略

   

Spark官方提供的数据是RDD在某些场景下,计算效率是Hadoop的20X。这个数据是否有水分,咱们先不追究,可是RDD效率高的由必定机制保证的:

  1. RDD数据只读,不可修改。若是须要修改数据,必须从父RDD转换(transformation)到子RDD。因此,在容错策略中,RDD没有数据冗余,而是经过RDD父子依赖(血缘)关系进行重算实现容错。
  2. RDD数据在内存中,多个RDD操做之间,数据不用落地到磁盘上,避免没必要要的I/O操做。
  3. RDD存放的数据能够是java对象,因此避免的没必要要的对象序列化和反序列化。

总而言之,RDD高效的主要因素是尽可能避免没必要要的操做和牺牲数据的操做精度,用来提升计算效率。

   

   

Spark使用技巧

   

RDD基本函数扩展

   

RDD虽然提供了不少函数,可是毕竟仍是有限的,有时候须要扩展,自定义新的RDD的函数。在spark中,能够经过隐式转换,轻松实现对RDD扩展。画像开发过程当中,平凡的会使用rollup操做(相似HIVE中的rollup),计算多个级别的聚合数据。下面是具体实,

/**

* 扩展spark rdd,为rdd提供rollup方法

*/

implicit class RollupRDD[T: ClassTag](rdd: RDD[(Array[String], T)]) extends Serializable {

 

/**

* 相似Sql中的rollup操做

*

* @param aggregate 聚合函数

* @param keyPlaceHold key占位符,默认采用FaceConf.STAT_SUMMARY

* @param isCache,确认是否缓存数据

* @return 返回聚合后的数据

*/

def rollup[U: ClassTag](

aggregate: Iterable[T] => U,

keyPlaceHold: String = FaceConf.STAT_SUMMARY,

isCache: Boolean = true): RDD[(Array[String], U)] = {

 

if (rdd.take(1).isEmpty) {

return rdd.map(x => (Array[String](), aggregate(Array[T](x._2))))

}

 

if (isCache) {

rdd.cache // 提升计算效率

}

val totalKeyCount = rdd.first._1.size

val result = { 1 to totalKeyCount }.par.map(untilKeyIndex => { // 并行计算

rdd.map(row => {

val combineKey = row._1.slice(0, untilKeyIndex).mkString(FaceConf.KEY_SEP) // 组合key

(combineKey, row._2)

}).groupByKey.map(row => { // 聚合计算

val oldKeyList = row._1.split(FaceConf.KEY_SEP)

val newKeyList = oldKeyList ++ Array.fill(totalKeyCount - oldKeyList.size) { keyPlaceHold }

(newKeyList, aggregate(row._2))

})

}).reduce(_ ++ _) // 聚合结果

 

result

}

 

}

上面代码声明了一个隐式类,具备一个成员变量rdd,类型是RDD[(Array[String], T)],那么若是应用代码中出现了任何这样的rdd对象,而且import当前的隐式转换,那么编译器就会将这个rdd当作上面的隐式类的对象,也就可使用rollup函数,和通常的map,filter方法同样。

   

   

RDD操做闭包外部变量原则

   

RDD相关操做都须要传入自定义闭包函数(closure),若是这个函数须要访问外部变量,那么须要遵循必定的规则,不然会抛出运行时异常。闭包函数传入到节点时,须要通过下面的步骤:

  1. 驱动程序,经过反射,运行时找到闭包访问的全部变量,并封成一个对象,而后序列化该对象
  2. 将序列化后的对象经过网络传输到worker节点
  3. worker节点反序列化闭包对象
  4. worker节点执行闭包函数,

注意:外部变量在闭包内的修改不会被反馈到驱动程序。

简而言之,就是经过网络,传递函数,而后执行。因此,被传递的变量必须能够序列化,不然传递失败。本地执行时,仍然会执行上面四步。

   

广播机制也能够作到这一点,可是频繁的使用广播会使代码不够简洁,并且广播设计的初衷是将较大数据缓存到节点上,避免屡次数据传输,提升计算效率,而不是用于进行外部变量访问。

   

   

RDD数据同步

   

RDD目前提供两个数据同步的方法:广播和累计器。

   

广播 broadcast

前面提到过,广播能够将变量发送到闭包中,被闭包使用。可是,广播还有一个做用是同步较大数据。好比你有一个IP库,可能有几G,在map操做中,依赖这个ip库。那么,能够经过广播将这个ip库传到闭包中,被并行的任务应用。广播经过两个方面提升数据共享效率:1,集群中每一个节点(物理机器)只有一个副本,默认的闭包是每一个任务一个副本;2,广播传输是经过BT下载模式实现的,也就是P2P下载,在集群多的状况下,能够极大的提升数据传输速率。广播变量修改后,不会反馈到其余节点。

   

累加器 Accumulator

累加器是一个write-only的变量,用于累加各个任务中的状态,只有在驱动程序中,才能访问累加器。并且,截止到1.2版本,累加器有一个已知的缺陷,在action操做中,n个元素的RDD能够确保累加器只累加n次,可是在transformation时,spark不确保,也就是累加器可能出现n+1次累加。

   

目前RDD提供的同步机制粒度太粗,尤为是转换操做中变量状态不能同步,因此RDD没法作复杂的具备状态的事务操做。不过,RDD的使命是提供一个通用的并行计算框架,估计永远也不会提供细粒度的数据同步机制,由于这与其设计的初衷是违背的。

   

RDD优化技巧

   

RDD缓存

须要使用屡次的数据须要cache,不然会进行没必要要的重复操做。举个例子

val data = … // read from tdw

println(data.filter(_.contains("error")).count)

println(data.filter(_.contains("warning")).count)

上面三段代码中,data变量会加载两次,高效的作法是在data加载完后,马上持久化到内存中,以下

val data = … // read from tdw

data.cache

println(data.filter(_.contains("error")).count)

println(data.filter(_.contains("warning")).count)

这样,data在第一加载后,就被缓存到内存中,后面两次操做均直接使用内存中的数据。

   

转换并行化

RDD的转换操做时并行化计算的,可是多个RDD的转换一样是能够并行的,参考以下

val dataList:Array[RDD[Int]] = …

val sumList = data.list.map(_.map(_.sum))

上面的例子中,第一个map是便利Array变量,串行的计算每一个RDD中的每行的sum。因为每一个RDD之间计算是没有逻辑联系的,因此理论上是能够将RDD的计算并行化的,在scala中能够轻松试下,以下

val dataList:Array[RDD[Int]] = …

val sumList = data.list.par.map(_.map(_.sum))

注意红色代码。

   

减小shuffle网络传输

通常而言,网络I/O开销是很大的,减小网络开销,能够显著加快计算效率。任意两个RDD的shuffle操做(join等)的大体过程以下,

用户数据userData和事件events数据经过用户id链接,那么会在网络中传到另一个节点,这个过程当中,有两个网络传输过程。Spark的默认是完成这两个过程。可是,若是你多告诉spark一些信息,spark能够优化,只执行一个网络传输。能够经过使用、HashPartition,在userData"本地"先分区,而后要求events直接shuffle到userData的节点上,那么就减小了一部分网络传输,减小后的效果以下,

虚线部分都是在本地完成的,没有网络传输。在数据加载时,就按照key进行partition,这样能够经一部的减小本地的HashPartition的过程,示例代码以下

val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://…")

.partitionBy(new HashPartitioner(100)) // Create 100 partitions

.persist()

注意,上面必定要persist,不然会重复计算屡次。100用来指定并行数量。

   

Spark其余

   

Spark开发模式

   

因为spark应用程序是须要在部署到集群上运行的,致使本地调试比较麻烦,因此通过这段时间的经验累积,总结了一套开发流程,目的是为了尽量的提升开发调试效率,同时保证开发质量。固然,这套流程可能也不是最优的,后面须要持续改进。

整个流程比较清楚,这里主要谈谈为何须要单元测试。公司内的大多数项目,通常不提倡单元测试,并且因为项目进度压力,开发人员会很是抵触单元测试,由于会花费"额外"的精力。Bug这东西不会由于项目赶进度而消失,并且刚好相反,可能由于赶进度,而高于平均水平。因此,若是不花时间进行单元测试,那么会花一样多,甚至更多的时间调试。不少时候,每每一些很小的bug,却致使你花了很长时间去调试,而这些bug,刚好是很容易在单元测试中发现的。并且,单元测试还能够带来两个额外的好处:1)API使用范例;2)回归测试。因此,仍是单元测试吧,这是一笔投资,并且ROI还挺高!不过凡事须要掌握分寸,单元测试应该根据项目紧迫程度调整粒度,作到有所为,有所不为。

 

Spark其余功能

   

前面提到了spark生态圈,spark除了核心的RDD,还提供了之上的几个很使用的应用:

  1. Spark SQL: 相似hive,使用rdd实现sql查询
  2. Spark Streaming: 流式计算,提供实时计算功能,相似storm
  3. MLLib:机器学习库,提供经常使用分类,聚类,回归,交叉检验等机器学习算法并行实现。
  4. GraphX:图计算框架,实现了基本的图计算功能,经常使用图算法和pregel图编程框架。

   

后面须要继续学习和使用上面的功能,尤为是与数据挖掘强相关的MLLib。

   

参考资料

相关文章
相关标签/搜索