一,简介算法
二,自定义分区规则apache
2.1 普通的分组TopN实现缓存
2.2 自定义分区规则TopN实现ide
三,RDD的缓存spa
3.1 RDD缓存简介scala
3.2 RDD缓存方式code
在以前的文章中,咱们知道RDD的有一个特征:就是一组分片(Partition),即数据集的基本组成单位。对于RDD来讲,每一个分片都会被一个计算任务处理,并决定并行计算的粒度。用户能够在建立RDD时指定RDD的分片个数,若是没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目。这个分配的规则咱们是能够本身定制的。同时咱们一直在讨论Spark快,快的方式有那些方面能够体现,RDD缓存就是其中的一个形式,这里将对这二者进行介绍。blog
分组求TopN的方式有多种,这里进行简单的几种。这里尊卑一些数据:点击下载排序
实现思路一:先对数据进行处理,而后聚合。最后进行分组排序。继承
package cn.edu360.sparkTwo import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object SubjectTopNone { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("SubjectTopNone").setMaster("local[4]") val sc: SparkContext = new SparkContext(conf) val lines: RDD[String] = sc.textFile("hdfs://hd1:9000/sparkLearn/", 2) // 对每一行数据进行整理 val sbToTeacherOne: RDD[((String, String), Int)] = lines.map(line => { val words: Array[String] = line.split("/") val teacher: String = words(3) val subject: String = words(2).split("[.]")(0) ((subject, teacher), 1) }) // 聚合,将学科和老师联合当作key val reduced: RDD[((String, String), Int)] = sbToTeacherOne.reduceByKey(_+_) //分组排序(按学科进行分组) //[学科,该学科对应的老师的数据] val grouped: RDD[(String, Iterable[((String, String), Int)])] = reduced.groupBy(_._1._1) // 这里取出的是每一组的数据 // 为何能够调用scala的sortby方法呢?由于一个学科的数据已经在一台机器上的一个scala集合里面了 // 弊端,调用scala的sortBy当数据量过大时,有内存溢出的缺陷 val result: RDD[(String, List[((String, String), Int)])] = grouped.mapValues(_.toList.sortBy(_._2).reverse.take(4)) println(result.collect.toBuffer) } }
实现思路二:先对数据进行处理,而后聚合,而后对数据进行单学科过滤,最后进行排序,提交。
package cn.edu360.sparkTwo import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object SubjectTopNtwo { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("SubjectTwo").setMaster("local[4]") val sc: SparkContext = new SparkContext(conf) val lines: RDD[String] = sc.textFile("hdfs://hd1:9000/sparkLearn") val sbToTeacherOne: RDD[((String, String), Int)] = lines.map(line => { val words: Array[String] = line.split("/") val teacher: String = words(3) val subject: String = words(2).split("[.]")(0) ((subject, teacher), 1) }) val reduced: RDD[((String, String), Int)] = sbToTeacherOne.reduceByKey(_+_) // 获取全部学科 val subjects: Array[String] = reduced.map(_._1._1).distinct().collect() // 对全部的reduce后的数据进行单学科过滤,在进行排序 for(sb <- subjects){ val filter: RDD[((String, String), Int)] = reduced.filter(_._1._1 == sb) // 这里进行了屡次提交 val result: Array[((String, String), Int)] = filter.sortBy(_._2, false).take(3) print(result.toBuffer) } sc.stop() } }
实现方式一:先对数据进行处理,而后聚合,然后对按照学科进行分区,而后对每个分区进行排序。
package cn.edu360.sparkTwo import org.apache.spark.{Partitioner, SparkConf, SparkContext} import org.apache.spark.rdd.RDD import scala.collection.mutable object SubjectTopNthree { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("SubjectTopNone").setMaster("local[4]") val sc: SparkContext = new SparkContext(conf) val lines: RDD[String] = sc.textFile("hdfs://hd1:9000/sparkLearn/") val sbToTeacherOne: RDD[((String, String), Int)] = lines.map(line => { val words: Array[String] = line.split("/") val teacher: String = words(3) val subject: String = words(2).split("[.]")(0) ((subject, teacher), 1) }) //聚合,将学科和老师联合当作key ---> 这里有一次shuffle val reduced: RDD[((String, String), Int)] = sbToTeacherOne.reduceByKey(_+_) //计算有多少学科 val subjects: Array[String] = reduced.map(_._1._1).distinct().collect() //partitionBy按照指定的分区规则进行分区 //调用partitionBy时RDD的Key是(String, String) --->这里也有一次shuffle val partioned: RDD[((String, String), Int)] = reduced.partitionBy(new SubPartitioner(subjects)) //若是一次拿出一个分区(能够操做一个分区中的数据了) val sorted: RDD[((String, String), Int)] = partioned.mapPartitions(it => { //将迭代器转换成list,而后排序,在转换成迭代器返回 it.toList.sortBy(_._2).reverse.take(3).iterator }) val result: Array[((String, String), Int)] = sorted.collect() print(result.toBuffer) } } // 自定义分区规则,须要继承Partitioner class SubPartitioner(subs: Array[String]) extends Partitioner{ //至关于主构造器(new的时候回执行一次) //用于存放规则的一个map private val rules = new mutable.HashMap[String, Int]() var i = 0 for(sub <- subs){ rules.put(sub, i) i += 1 } //返回分区的数量(下一个RDD有多少分区) override def numPartitions: Int = subs.length //根据传入的key计算分区标号 //key是一个元组(String, String) override def getPartition(key: Any): Int = { //获取学科名称 val s: String = key.asInstanceOf[(String, String)]._1 //根据规则计算分区编号 rules(s) } }
实现方式二:上面的过程能够将聚合和分区操做进行合并,减小shuffle的次数
package cn.edu360.sparkTwo import org.apache.spark.{Partitioner, SparkConf, SparkContext} import org.apache.spark.rdd.RDD import scala.collection.mutable object SubjectTopNfour { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("SubjectTopNone").setMaster("local[4]") val sc: SparkContext = new SparkContext(conf) val lines: RDD[String] = sc.textFile("hdfs://hd1:9000/sparkLearn/") val sbToTeacherOne: RDD[((String, String), Int)] = lines.map(line => { val words: Array[String] = line.split("/") val teacher: String = words(3) val subject: String = words(2).split("[.]")(0) ((subject, teacher), 1) }) val subjects: Array[String] = sbToTeacherOne.map(_._1._1).distinct().collect() // 在这里传入分区规则,即聚合时就分区 val reduced: RDD[((String, String), Int)] = sbToTeacherOne.reduceByKey(new SubPartinerTwo(subjects), _+_) // 对每一个分区进行排序 val result: RDD[((String, String), Int)] = reduced.mapPartitions(it => { it.toList.sortBy(_._2).reverse.take(3).iterator }) print(result.collect().toBuffer) } } class SubPartinerTwo(subs: Array[String]) extends Partitioner{ private val rules = new mutable.HashMap[String, Int]() var i = 0 for(sub <- subs){ rules.put(sub, i) i += 1 } override def numPartitions: Int = subs.length override def getPartition(key: Any): Int = { val subject: String = key.asInstanceOf[(String, String)]._1 rules(subject) } }
Spark速度很是快的缘由之一,就是在不一样操做中能够在内存中持久化或缓存数据集。当持久化某个RDD后,每个节点都将把计算的分片结果保存在内存中,并在对此RDD或衍生出的RDD进行的其余动做中重用。这使得后续的动做变得更加迅速。RDD相关的持久化和缓存,是Spark最重要的特征之一。能够说,缓存是Spark构建迭代式算法和快速交互式查询的关键。
RDD经过persist方法或cache方法能够将前面的计算结果缓存,可是并非这两个方法被调用时当即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。
经过查看源码发现cache最终也是调用了persist方法,默认的存储级别都是仅在内存存储一份,Spark的存储级别还有好多种,存储级别在object StorageLevel中定义的。
缓存有可能丢失,或者存储存储于内存的数据因为内存不足而被删除,RDD的缓存容错机制保证了即便缓存丢失也能保证计算的正确执行。经过基于RDD的一系列转换,丢失的数据会被重算,因为RDD的各个Partition是相对独立的,所以只须要计算丢失的部分便可,并不须要重算所有Partition。
实例:
package cn.edu360.sparkTwo import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object SubjectTopNtwo { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("SubjectTwo").setMaster("local[4]") val sc: SparkContext = new SparkContext(conf) val lines: RDD[String] = sc.textFile("hdfs://hd1:9000/sparkLearn") val sbToTeacherOne: RDD[((String, String), Int)] = lines.map(line => { val words: Array[String] = line.split("/") val teacher: String = words(3) val subject: String = words(2).split("[.]")(0) ((subject, teacher), 1) }) val reduced: RDD[((String, String), Int)] = sbToTeacherOne.reduceByKey(_+_) // 这里讲reduced的数据集到缓存中 val cached: RDD[((String, String), Int)] = cached.cache() // 获取全部学科 val subjects: Array[String] = cached.map(_._1._1).distinct().collect() // 对全部的reduce后的数据进行单学科过滤,在进行排序 for(sb <- subjects){ // 由于这里的屡次提交和过滤,因此添加到缓存就有必要了 val filter: RDD[((String, String), Int)] = cached.filter(_._1._1 == sb) // 这里进行了屡次提交 val result: Array[((String, String), Int)] = filter.sortBy(_._2, false).take(3) print(result.toBuffer) } sc.stop() } }