大数据入门第二十二天——spark(三)自定义分区、排序与查找

1、自定义分区

  1.概述

    默认使用的是Hash分区策略,这点和Hadoop是相似的。具体的分区介绍,参见:https://blog.csdn.net/high2011/article/details/68491115

  2.实现

package cn.itcast.spark.day3

import java.net.URL

import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}

import scala.collection.mutable

/**
  * Counts how often each URL occurs in a tab-separated log, groups the counts
  * by the URL's host using a custom partitioner (one partition per host), and
  * writes at most the two most frequent URLs of every partition to disk.
  *
  * Created by root on 2016/5/18.
  */
object UrlCountPartition {

  /**
    * Job entry point.
    *
    * @param args command-line arguments (not used)
    */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("UrlCountPartition").setMaster("local[2]")
    val sc = new SparkContext(conf)

    try {
      // rdd1 splits every line on tabs and emits (URL, 1); the URL is the second field.
      val rdd1 = sc.textFile("c://itcast.log").map { line =>
        val f = line.split("\t")
        (f(1), 1)
      }

      // Sum the occurrences of each URL.
      val rdd2 = rdd1.reduceByKey(_ + _)

      // Re-key by host so that it can drive the partitioning: (host, (url, count)).
      val rdd3 = rdd2.map { t =>
        val url = t._1
        val host = new URL(url).getHost
        (host, (url, t._2))
      }

      // The distinct hosts are collected to the driver; each one gets its own partition.
      val ints = rdd3.map(_._1).distinct().collect()
      val hostParitioner = new HostParitioner(ints)

      // val rdd4 = rdd3.partitionBy(new HashPartitioner(ints.length))
      val rdd4 = rdd3.partitionBy(hostParitioner).mapPartitions { it =>
        // Sort by count ascending, reverse, and keep the two largest of this partition.
        it.toList.sortBy(_._2._2).reverse.take(2).iterator
      }

      rdd4.saveAsTextFile("c://out4")
      // println(rdd4.collect().toBuffer)
    } finally {
      // Stop the context even when the job fails.
      sc.stop()
    }
  }
}

/**
  * Decides which partition a record is sent to: every entry of `ins` is
  * assigned the partition whose number equals its position in the array.
  *
  * @param ins the keys (hosts) to create partitions for; its length is the
  *            number of partitions
  */
class HostParitioner(ins: Array[String]) extends Partitioner {

  /** Lookup table from key to partition index, filled once at construction. */
  val parMap = new mutable.HashMap[String, Int]()

  /** Running index used while filling `parMap`; equals `ins.length` afterwards. */
  var count = 0

  for (i <- ins) {
    parMap += (i -> count)
    count += 1
  }

  /** One partition per entry of `ins`. */
  override def numPartitions: Int = ins.length

  /**
    * Looks the key up by its string form.
    *
    * @param key the record key
    * @return the partition registered for the key, or 0 when the key is unknown
    */
  override def getPartition(key: Any): Int = parMap.getOrElse(key.toString, 0)
}

   // 与Hadoop相通,不再赘述

2、自定义排序

  基本上就是结合之前学过的隐式转换:(这里使用样例类,可以不用new就获得实例,另外也可以用于模式匹配)

package cn.itcast.spark.day3

import org.apache.spark.{SparkConf, SparkContext}

/** Holds the implicit ordering that `CustomSort` imports to sort by [[Girl]]. */
object OrderContext {

  /**
    * Orders by `faceValue` ascending and, for equal `faceValue`, by `age`
    * descending (the older one sorts first).
    *
    * Two girls with the same `faceValue` and `age` compare as equal (0), so
    * the comparison is a consistent total order.
    */
  implicit val girlOrdering: Ordering[Girl] = new Ordering[Girl] {
    override def compare(x: Girl, y: Girl): Int = {
      val byFaceValue = Integer.compare(x.faceValue, y.faceValue)
      // Arguments are swapped on purpose: a larger age must sort earlier.
      if (byFaceValue != 0) byFaceValue else Integer.compare(y.age, x.age)
    }
  }
}

/**
  * Created by root on 2016/5/18.
  */
// Sort rule: compare faceValue first, then age.
// Tuple layout: name, faceValue, age (followed by a fourth Int field).
object CustomSort {

  /**
    * Sorts a small in-memory data set with the custom ordering and prints it.
    *
    * @param args command-line arguments (not used)
    */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("CustomSort").setMaster("local[2]")
    val sc = new SparkContext(conf)

    try {
      val rdd1 = sc.parallelize(List(("yuihatano", 90, 28, 1), ("angelababy", 90, 27, 2), ("JuJingYi", 95, 22, 3)))

      import OrderContext._

      // The sort key is a Girl; its ordering is resolved implicitly from OrderContext.
      // Descending order: highest faceValue first, and for ties the lower age first.
      val rdd2 = rdd1.sortBy(x => Girl(x._2, x._3), ascending = false)
      println(rdd2.collect().toBuffer)
    } finally {
      // Stop the context even when the job fails.
      sc.stop()
    }
  }
}

// First approach (kept for reference, disabled): the key class itself extends Ordered.
//
// case class Girl(val faceValue: Int, val age: Int) extends Ordered[Girl] with Serializable {
//   override def compare(that: Girl): Int = {
//     if (this.faceValue == that.faceValue) {
//       that.age - this.age
//     } else {
//       this.faceValue - that.faceValue
//     }
//   }
// }

/**
  * Second approach: a plain sort key; the ordering is supplied implicitly.
  *
  * @param faceValue primary sort field
  * @param age       secondary sort field
  */
case class Girl(faceValue: Int, age: Int) extends Serializable

   // 复习隐式转换,基本也无新内容

3、IP查找小练习

  参考:https://www.cnblogs.com/wnbahmbb/p/6250099.html

相关文章
相关标签/搜索