Spark学习记录(三)核心API模块介绍

spark
-------------
基于hadoop的mr,扩展MR模型高效使用MR模型,内存型集群计算,提升app处理速度。java

spark特色
-------------
速度:在内存中存储中间结果。
支持多种语言。Scala、Java、Python
内置了80+的算子.
高级分析:MR,SQL/ Streamming /mllib / graphpython

RDD:
----------------
是spark的基本数据结构,是不可变数据集。RDD中的数据集进行逻辑分区,每一个分区能够单独在集群节点
进行计算。能够包含任何java,scala,python和自定义类型。mysql

RDD是只读的记录分区集合。RDD具备容错机制。sql

建立RDD方式,1、并行化一个现有集合。数组

hadoop 花费90%时间用户rw。、

内存处理计算。在job间进行数据共享。内存的IO速率高于网络和disk的10 ~ 100之间。网络

内部包含5个主要属性
-----------------------
1.分区列表
2.针对每一个split的计算函数。
3.对其余rdd的依赖列表
4.可选,若是是KeyValueRDD的话,能够带分区类。
5.可选,首选块位置列表(hdfs block location);数据结构

 

RDD变换app

rdd的变换方法都是lazy执行的
------------------
返回指向新rdd的指针,在rdd之间建立依赖关系。每一个rdd都有计算函数和指向父RDD的指针。函数


map() //对每一个元素进行变换,应用变换函数
//(T)=>Voop

mapPartitions() //对每一个分区进行应用变换,输入的Iterator,返回新的迭代器,能够对分区进行函数处理。

//针对每一个数据分区进行操做,入参是分区数据的Iterator,map() 针对分区中的每一个元素进行操做。

mapPartitions()  //Iterator<T> => Iterator<U>

注:最好设置每一个分区都对应有一个线程。

filter() //过滤器,(T)=>Boolean
flatMap() //压扁,T => TraversableOnce[U]

 

//同mapPartitions方法同样都是针对分区处理,只不过这个方法能够获取到分区索引

mapPartitionsWithIndex(func)  //(Int, Iterator<T>) => Iterator<U>

 

//采样返回采样的RDD子集。
//withReplacement 元素是否能够屡次采样.
//fraction : 指望采样数量.[0,1]
//表示一个种子,根据这个seed随机抽取,通常都只用到前两个参数
sample(withReplacement, fraction, seed)

做用:在数据倾斜的时候,咱们那么多数据若是想知道那个key倾斜了,就须要咱们采样获取这些key,出现次数陊的key就是致使数据倾斜的key。若是这些key数据不是很重要的话,能够过滤掉,这样就解决了数据倾斜。

 

 

union() //相似于mysql union操做。

 

intersection //交集,提取两个rdd中都含有的元素。


distinct([numTasks])) //去重,去除重复的元素。

 

groupByKey() //(K,V) => (K,Iterable<V>)  使用前须要构造出对偶的RDD

reduceByKey(*) //按key聚合。注意他是一个RDD变换方法,不是action

 

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])//按照key进行聚合,这个函数逻辑较为复杂请看aggregateByKey函数的专题

 

sortByKey //根据映射的Key进行排序,可是只能根据Key排序

 

sortBy //比sortByKey更加灵活强大的排序,可根据元组中任意字段排序

 

join(otherDataset, [numTasks]) //横向链接,有两种数据(K,V)和(K,W),连接后返回(K,(V,W)),两个元组一一对应的

 

cogroup //协分组,(K,V)和(K,W)分组后返回(K,(V,W)),注意协分组不是一一对应的分组后须要(此处注意与join的区别)



cartesian(otherDataset) //笛卡尔积,RR[(A,B)] RDD[(1,2)] => RDD[(A,1),(A,2),(B,1),(B,2)]

 

pipe //将rdd的元素传递给脚本或者命令,执行结果返回造成新的RDD


coalesce(numPartitions) //减小分区


repartition //再分区


repartitionAndSortWithinPartitions(partitioner)//再分区并在分区内进行排序

 

RDD Action

Spack的中的方法都是懒的,,只有遇到了action类型的方法才会真正的执行
------------------
collect() //收集rdd元素造成数组.
count() //统计rdd元素的个数
reduce() //聚合,返回一个值。
first //取出第一个元素take(1)
take //
takeSample (withReplacement,num, [seed])
takeOrdered(n, [ordering])

saveAsTextFile(path) //保存到文件
saveAsSequenceFile(path) //保存成序列文件  sc.sequenceFile读取序列文件

saveAsObjectFile(path) (Java and Scala)

countByKey()                       //按照key统计有几个value 

 

数据倾斜

------------------------------

因为大量相同的Key,在reduce合并计算的过程当中,大量相同的Key被分配到了同一个集群节点,致使集群中这个节点计算压力很是大。

本例采用的解决方案是,在map截断将Key先接上一个随机数打散,而后在reduce计算后,再次map还原key,而后进行最终reduce。


Spark WebUI 上面代码运行的DAG 有效无环图,咱们能够清楚地看到每一次的reduce聚合都会从新划分阶段

相关文章
相关标签/搜索