spark
-------------
基于hadoop的mr,扩展MR模型高效使用MR模型,内存型集群计算,提升app处理速度。java
spark特色
-------------
速度:在内存中存储中间结果。
支持多种语言。Scala、Java、Python
内置了80+的算子.
高级分析:MR,SQL/ Streamming /mllib / graphpython
RDD:
----------------
是spark的基本数据结构,是不可变数据集。RDD中的数据集进行逻辑分区,每一个分区能够单独在集群节点
进行计算。能够包含任何java,scala,python和自定义类型。mysql
RDD是只读的记录分区集合。RDD具备容错机制。sql
建立RDD方式,1、并行化一个现有集合。数组
hadoop 花费90%时间用户rw。、
内存处理计算。在job间进行数据共享。内存的IO速率高于网络和disk的10 ~ 100之间。网络
内部包含5个主要属性
-----------------------
1.分区列表
2.针对每一个split的计算函数。
3.对其余rdd的依赖列表
4.可选,若是是KeyValueRDD的话,能够带分区类。
5.可选,首选块位置列表(hdfs block location);数据结构
RDD变换app
rdd的变换方法都是lazy执行的
------------------
返回指向新rdd的指针,在rdd之间建立依赖关系。每一个rdd都有计算函数和指向父RDD的指针。函数
map() //对每一个元素进行变换,应用变换函数
//(T)=>Voop
mapPartitions() //对每一个分区进行应用变换,输入的Iterator,返回新的迭代器,能够对分区进行函数处理。
//针对每一个数据分区进行操做,入参是分区数据的Iterator,map() 针对分区中的每一个元素进行操做。
mapPartitions() //Iterator<T> => Iterator<U>
注:最好设置每一个分区都对应有一个线程。
filter() //过滤器,(T)=>Boolean
flatMap() //压扁,T => TraversableOnce[U]
//同mapPartitions方法同样都是针对分区处理,只不过这个方法能够获取到分区索引
mapPartitionsWithIndex(func) //(Int, Iterator<T>) => Iterator<U>
//采样返回采样的RDD子集。
//withReplacement 元素是否能够屡次采样.
//fraction : 指望采样数量.[0,1]
//表示一个种子,根据这个seed随机抽取,通常都只用到前两个参数
sample(withReplacement, fraction, seed)
做用:在数据倾斜的时候,咱们那么多数据若是想知道那个key倾斜了,就须要咱们采样获取这些key,出现次数陊的key就是致使数据倾斜的key。若是这些key数据不是很重要的话,能够过滤掉,这样就解决了数据倾斜。
union() //相似于mysql union操做。
intersection //交集,提取两个rdd中都含有的元素。
distinct([numTasks])) //去重,去除重复的元素。
groupByKey() //(K,V) => (K,Iterable<V>) 使用前须要构造出对偶的RDD
reduceByKey(*) //按key聚合。注意他是一个RDD变换方法,不是action
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])//按照key进行聚合,这个函数逻辑较为复杂请看aggregateByKey函数的专题
sortByKey //根据映射的Key进行排序,可是只能根据Key排序
sortBy //比sortByKey更加灵活强大的排序,可根据元组中任意字段排序
join(otherDataset, [numTasks]) //横向链接,有两种数据(K,V)和(K,W),连接后返回(K,(V,W)),两个元组一一对应的
cogroup //协分组,(K,V)和(K,W)分组后返回(K,(V,W)),注意协分组不是一一对应的分组后须要(此处注意与join的区别)
cartesian(otherDataset) //笛卡尔积,RR[(A,B)] RDD[(1,2)] => RDD[(A,1),(A,2),(B,1),(B,2)]
pipe //将rdd的元素传递给脚本或者命令,执行结果返回造成新的RDD
coalesce(numPartitions) //减小分区
repartition //再分区
repartitionAndSortWithinPartitions(partitioner)//再分区并在分区内进行排序
RDD Action
Spack的中的方法都是懒的,,只有遇到了action类型的方法才会真正的执行
------------------
collect() //收集rdd元素造成数组.
count() //统计rdd元素的个数
reduce() //聚合,返回一个值。
first //取出第一个元素take(1)
take //
takeSample (withReplacement,num, [seed])
takeOrdered(n, [ordering])
saveAsTextFile(path) //保存到文件
saveAsSequenceFile(path) //保存成序列文件 sc.sequenceFile读取序列文件
saveAsObjectFile(path) (Java and Scala)
countByKey() //按照key统计有几个value
数据倾斜
------------------------------
因为大量相同的Key,在reduce合并计算的过程当中,大量相同的Key被分配到了同一个集群节点,致使集群中这个节点计算压力很是大。
本例采用的解决方案是,在map截断将Key先接上一个随机数打散,而后在reduce计算后,再次map还原key,而后进行最终reduce。
Spark WebUI 上面代码运行的DAG 有效无环图,咱们能够清楚地看到每一次的reduce聚合都会从新划分阶段