Spark菜鸟记录

一、RDD[(k,v)] join()优化,join以前会对两个RDD的key作hash,经过网络把相同hash值的数据传到同一个节点,所以对屡次join的RDD 作预分区与持久化可提升效率。html

map()操做会失去父RDD的信息,由于key值有可能发生改变,但 mapValues()、flatMapValues()不会。多父RDD已分区,默认采起第一个父RDD的分区方式缓存

cogroup() 、groupWith() 、 join() 、 leftOuterJoin() 、 rightOuterJoin() 、 groupByKey() 、 reduceByKey() 、combineByKey() 以及 lookup() 等发生跨节点数据混洗的操做均可以进行优化。网络

RDD.partitionBy( new HashPartitioner(3)).persist(StorageLevel.MEMORY_AND_DISK_SER)//构造3个分区
RDD.partitioner//获取分区信息

 2.累加器,行动操做中每一个任务只会对累加器修改一次,转换操做也许会因为缓存移出又从新使用等操做致使屡次修改。(spark1.2)数据结构

只有驱动器能够读,对执行器是只写变量。app

 val accu = sc.accumulator(initialValue)

3.广播变量,只被发到各节点一次,相似BitTorrent通讯机制,只读,修改不会影响其余节点的值,less

val broad = sc.broadcast(T)

4.task、stage、jobide

一个RDD有多少partition 就会生成多少task;一个或多个RDD生成一个stage(通常以shuffle操做为分隔);一个action 生成一个job性能

spark读取文件的并行度与HDFS block有关,HBASE region数有关,hive文件不可分割则与文件数有关,可分割文件与分割数有关。未压缩文件和BZip2Codec压缩类型可分割。优化

  /**
   This input format overrides computeSplitSize() to make sure that each split only contains full records. Each InputSplit passed to FixedLengthBinaryRecordReader will start at the first byte of a record, and the last byte will the last byte of a record.
   */
  override def computeSplitSize(blockSize: Long, minSize: Long, maxSize: Long): Long = {
    val defaultSize = Math.max(minSize, Math.min(maxSize, blockSize))
    // If the default size is less than the length of a record, make it equal to it Otherwise, make sure the split size is as close to possible as the default size,but still contains a complete set of records, with the first record starting at the first byte in the split and the last record ending with the last byte
    if (defaultSize < recordLength) {
      recordLength.toLong
    } else {
      (Math.floor(defaultSize / recordLength) * recordLength).toLong
    }
  }

5.性能调优ui

a.调整并行度

val rdd2 = rdd1.map((_,1)).reduceByKey(_+_,10)//shuffle时调整并行度 shuffle原理:https://www.cnblogs.com/diaozhaojian/p/9635829.html
//Mapreduce和spark shuffle区别(https://mp.weixin.qq.com/s/FT2V9IwNoMl_JU_UDulJ-w) rdd2.repartition(10)//对RDD作重分区,会打乱数据作重分区 rdd2.coalesce(10)//减小分区,调用Repartition(numPartitions, shuffle = false, logicalPlan)

6.RDD.CheckPoint与RDD.persist(https://www.cnblogs.com/jcchoiling/p/6513569.html)(https://blog.csdn.net/rlnLo2pNEfx9c/article/details/81417061)

  CheckPoint 把RDD持久化到HDFS上,加强容错性。job运行时若检测到RDD.CheckPoint会启动一个新job作checkpoint操做,同时删除父RDD,截断依赖链,改变lineage(谱系图)。

设置检查点目录sc.setCheckpointDir("hdfs://IP:9000/checkpoint/")。由于checkpoint要从新计算,所以checkpoint以前建议先persist。

persist 根据存储级别,把数据缓存到不一样介质上,只是保存数据,不改变DAG。

Persist 由executor的blockManager管理,所以driver结束以后persist到 磁盘的数据也会清除,而checkpoint 持久化到HDFS,利用HDFS高可用,不调用remove不会清除。

7.Parquet与ORC(https://blog.csdn.net/yu616568/article/details/51868447)(ORC:https://www.cnblogs.com/ITtangtang/p/7677912.html)

  parquet支持嵌套数据结构,经过repeated和group实现Map、Array等复杂数据结构。(每一个字段有重复次数(required(出现一次)repeated(0或屡次)optional(0或1次))、字段类型(group和primitive)和字段名三个属性)。

多行记录构成一个行组(row group),行组中每一个列做为一个列块(column chunk),不一样列块可采起不一样压缩方式,列块划分为多个页。为了更好地存储嵌套格式,页的成员值由value、Repetition level和Definition level三部分组成,对于repeated类型列,repetition lever标记了所处哪条记录已经在该记录位置。

每一个行组的统计信息包括schema、列块的最大最小值空值数等信息。每一个页的元数据包括value数目,数据页、索引页的offset等信息。

  ORC格式经过把struct类型生成一个schema树,struct类型做为根节点,中序遍历子节点,获得全部叶子节点的数据,交由父节点封装成嵌套数据结构。以此来支持LIST、STRUCT、MAP等复杂结构。

相关文章
相关标签/搜索