Spark Tutorial (7) -- Spark's intersection, subtract, union and distinct

Spark's intersection

As the name suggests, intersection returns the elements that two RDDs have in common. RDD1.intersection(RDD2) and RDD2.intersection(RDD1) therefore produce the same result.

For example, given List1 = {1,2,3,4,5} and List2 = {3,4,5,6,7}, intersecting two RDDs containing these lists yields result = {3,4,5}.
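Since the semantics are easy to mimic, here is a minimal plain-Python sketch (illustration only, not PySpark). Note that Spark's intersection also deduplicates its output:

```python
# Plain-Python sketch of RDD.intersection semantics (illustration only, not PySpark).
list1 = [1, 2, 3, 4, 5]
list2 = [3, 4, 5, 6, 7]

def intersection(a, b):
    # Spark's intersection returns the distinct common elements;
    # operand order does not matter.
    return sorted(set(a) & set(b))

print(intersection(list1, list2))  # [3, 4, 5]
print(intersection(list2, list1))  # [3, 4, 5]
```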

Spark's subtract

Unlike intersection, subtract returns the elements of one RDD that are not present in the other.

For example, for List1 = {1,2,3,4,5} and List2 = {3,4,5,6,7}, the result of subtract depends on the order of the operands.

list1.subtract(list2) 

The result should be:

1 2

whereas for

list2.subtract(list1) 

the result should be:

6 7
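The order dependence can be sketched in plain Python (illustration only, not PySpark):

```python
# Plain-Python sketch of RDD.subtract semantics (illustration only, not PySpark).
list1 = [1, 2, 3, 4, 5]
list2 = [3, 4, 5, 6, 7]

def subtract(a, b):
    # Keep the elements of a that do not appear in b;
    # swapping the operands changes the result.
    exclude = set(b)
    return [x for x in a if x not in exclude]

print(subtract(list1, list2))  # [1, 2]
print(subtract(list2, list1))  # [6, 7]
```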

Spark's union

union is the easiest to understand: it merges two RDDs without removing duplicates. For example, for List1 = {1,2,3,4,5} and List2 = {3,4,5,6,7}, the result of union does not depend on operand order. The result should be:

result = {1,2,3,4,5,3,4,5,6,7}
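A plain-Python sketch of the same behavior (illustration only, not PySpark):

```python
# Plain-Python sketch of RDD.union semantics (illustration only, not PySpark).
list1 = [1, 2, 3, 4, 5]
list2 = [3, 4, 5, 6, 7]

def union(a, b):
    # union simply concatenates the two datasets; duplicates are kept.
    return list(a) + list(b)

print(union(list1, list2))  # [1, 2, 3, 4, 5, 3, 4, 5, 6, 7]
```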

Spark's distinct

distinct removes duplicates from an RDD. Note that it does not drop every duplicated element; it keeps exactly one copy of each. This is easy to see: applying distinct to result = {1,2,3,4,5,3,4,5,6,7} yields {1,2,3,4,5,6,7}.
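The keep-one-copy behavior can be sketched in plain Python (illustration only, not PySpark):

```python
# Plain-Python sketch of RDD.distinct semantics (illustration only, not PySpark).
result = [1, 2, 3, 4, 5, 3, 4, 5, 6, 7]

def distinct(a):
    # Keep one copy of each element (here, in order of first appearance;
    # Spark makes no ordering guarantee).
    seen = []
    for x in a:
        if x not in seen:
            seen.append(x)
    return seen

print(distinct(result))  # [1, 2, 3, 4, 5, 6, 7]
```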

A comprehensive example

Since intersection, subtract, union and distinct are commonly used, and one case study can showcase all of them, this time the dataset consists of two courses, lesson1 and lesson2. lesson1 contains ten students, each with many ability scores; each score is an Int. The same holds for lesson2. The two datasets are stored in the files lesson1 and lesson2 respectively. Both the datasets and the code below can be found and downloaded on GitHub.

Dataset analysis

lesson1 contains many students, each with many ability scores. Computing each student's average score was already covered in Spark Tutorial (6) -- Spark's combineByKey and sortByKey, so we skip that here.

Using these two datasets, we solve the following problems:

  • 0. Compute each student's total ability score in lesson1 and in lesson2
  • 1. Find all the students in lesson1 (no duplicates)
  • 2. Find all the students in lesson2 (no duplicates)
  • 3. Find the students who took both courses
  • 4. Find the students who are in lesson1 but not in lesson2
  • 5. Find the students who are in lesson2 but not in lesson1

Partial view of the data

Problem 0 is numbered 0 because it does not use this section's operations. To compute each student's total ability score per course, first split each line on spaces. The split data form a collection of (name, score) tuples; the scores are then summed by name.
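The per-name accumulation can be sketched in plain Python, with a dict standing in for reduceByKey (the sample lines here are made up, not the real dataset):

```python
# Plain-Python sketch of the parsing and per-name summation (illustration only;
# the real code uses textFile + map + reduceByKey). Sample lines are hypothetical.
lines = ["Bob 90", "Amy 80", "Bob 70"]

totals = {}
for line in lines:
    # split on spaces into (name, score), as in the RDD map step
    name, score = line.split(" ")[0], int(line.split(" ")[1])
    totals[name] = totals.get(name, 0) + score  # reduceByKey(_+_) analogue

print(totals)  # {'Bob': 160, 'Amy': 80}
```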

  • For problem 1, once each student's total score in lesson1 is known, dropping the scores leaves all the students in lesson1.

  • Problem 2 works the same way.

  • For problem 3, the students who took both courses are exactly the intersection of the two student sets. (A union followed by distinct, by contrast, yields every student across the two courses with duplicates removed.)

  • For problem 4, the students in lesson1 but not in lesson2 are obtained with a single subtract of the lesson2 students from the lesson1 students.

  • Problem 5 works the same way.
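The whole plan maps cleanly onto set operations; a plain-Python sketch with made-up names (the real code below uses RDDs):

```python
# Plain-Python sketch of the set operations behind problems 3-5
# (illustration only; the names are hypothetical, not the real dataset).
lesson1 = {"Amy", "Bob", "Lucy"}
lesson2 = {"Amy", "Bob", "Frank"}

both      = lesson1 & lesson2   # problem 3: intersection
only_in_1 = lesson1 - lesson2   # problem 4: subtract
only_in_2 = lesson2 - lesson1   # problem 5: subtract, operands reversed
everyone  = lesson1 | lesson2   # union, then dedup (distinct)

print(sorted(both))       # ['Amy', 'Bob']
print(sorted(only_in_1))  # ['Lucy']
print(sorted(only_in_2))  # ['Frank']
print(sorted(everyone))   # ['Amy', 'Bob', 'Frank', 'Lucy']
```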

Scala implementation

import org.apache.spark.{SparkConf, SparkContext}

object SparkIntersectionAndSubtract {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local").setAppName("SparkIntersectionAndSubtract")

    val sc = new SparkContext(conf)

    //data for lesson1
    val lesson1Data = sc.textFile("./lesson1").map(line => (line.split(" ")(0),line.split(" ")(1).toInt))

    //sum each student's scores in lesson1
    val lesson1Grade = lesson1Data.reduceByKey(_+_)

    val lesson1Student = lesson1Grade.map(x=>x._1)

    //data for lesson2
    val lesson2Data = sc.textFile("./lesson2").map(line => (line.split(" ")(0),line.split(" ")(1).toInt))

    //sum each student's scores in lesson2
    val lesson2Grade = lesson2Data.reduceByKey((x,y)=>x+y)

    val lesson2Student = lesson2Grade.map(x=>x._1)

    //students in both lesson1 and lesson2
    println("Students On Lesson1 And On Lesson2")
    lesson1Student.intersection(lesson2Student).foreach(println)

    //students in both lesson2 and lesson1 (same result as above)
    println("Students On Lesson1 And On Lesson2")
    lesson2Student.intersection(lesson1Student).foreach(println)

    //students in lesson1 but not in lesson2
    println("Students Only In Lesson1")
    val onlyInLesson1 = lesson1Student.subtract(lesson2Student)
    onlyInLesson1.foreach(println)

    //students in lesson2 but not in lesson1
    println("Students Only In Lesson2")
    val onlyInLesson2 = lesson2Student.subtract(lesson1Student)
    onlyInLesson2.foreach(println)


    //students who took only one of the two courses
    println("Students Only Choose One Lesson")
    onlyInLesson1.union(onlyInLesson2).foreach(println)

    //all students across both courses (no duplicates)
    println("All the students")
    lesson1Student.union(lesson2Student).distinct().foreach(println)


  }

}

Java implementation

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

public class SparkIntersectionAndSubtractJava {

    public static void main(String[] args){

        SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkIntersectionAndSubtractJava");

        JavaSparkContext sc = new JavaSparkContext(conf);

        //Java 7 style, using anonymous inner classes
        intersectionAndSubtractJava(sc);

        //Java 8 style, using lambdas
        intersectionAndSubtractJava8(sc);
    }


    public static void intersectionAndSubtractJava(JavaSparkContext sc){

        JavaRDD<String> lesson1Data = sc.textFile("./lesson1");

        JavaRDD<String> lesson2Data = sc.textFile("./lesson2");

        JavaPairRDD<String,Integer> lesson1InfoData = lesson1Data.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<>(s.split(" ")[0],Integer.parseInt(s.split(" ")[1]));
            }
        });

        JavaPairRDD<String,Integer> lesson2InfoData = lesson2Data.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<>(s.split(" ")[0],Integer.parseInt(s.split(" ")[1]));
            }
        });

        JavaPairRDD<String,Integer> lesson1Grades = lesson1InfoData.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer+integer2;
            }
        });

        JavaPairRDD<String,Integer> lesson2Grades = lesson2InfoData.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer+integer2;
            }
        });

        JavaRDD<String> lesson1Students = lesson1Grades.map(new Function<Tuple2<String, Integer>, String>() {
            @Override
            public String call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                return stringIntegerTuple2._1;
            }
        });

        JavaRDD<String> lesson2Students = lesson2Grades.map(new Function<Tuple2<String, Integer>, String>() {
            @Override
            public String call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                return stringIntegerTuple2._1;
            }
        });

        //students in both lesson1 and lesson2
        System.out.println("Students On Lesson1 And On Lesson2");
        lesson1Students.intersection(lesson2Students).foreach(new VoidFunction<String>() {
            @Override
            public void call(String s) throws Exception {
                System.out.println(s);
            }
        });

        //students in both lesson2 and lesson1 (same result as above)
        System.out.println("Students On Lesson1 And On Lesson2");
        lesson2Students.intersection(lesson1Students).foreach(new VoidFunction<String>() {
            @Override
            public void call(String s) throws Exception {
                System.out.println(s);
            }
        });

        //students in lesson1 but not in lesson2
        JavaRDD<String> studensOnlyInLesson1 = lesson1Students.subtract(lesson2Students);
        System.out.println("Students Only In Lesson1");
        studensOnlyInLesson1.foreach(new VoidFunction<String>() {
            @Override
            public void call(String s) throws Exception {
                System.out.println(s);
            }
        });

        //students in lesson2 but not in lesson1
        JavaRDD<String> studensOnlyInLesson2 = lesson2Students.subtract(lesson1Students);
        System.out.println("Students Only In Lesson2");
        studensOnlyInLesson2.foreach(new VoidFunction<String>() {
            @Override
            public void call(String s) throws Exception {
                System.out.println(s);
            }
        });

        //students who took only one of the two courses
        JavaRDD<String> onlyOneLesson = studensOnlyInLesson1.union(studensOnlyInLesson2);
        System.out.println("Students Only Choose One Lesson");
        onlyOneLesson.foreach(new VoidFunction<String>() {
            @Override
            public void call(String s) throws Exception {
                System.out.println(s);
            }
        });

        System.out.println("All the students");
        lesson1Students.union(lesson2Students).distinct().foreach(new VoidFunction<String>() {
            @Override
            public void call(String s) throws Exception {
                System.out.println(s);
            }
        });

    }

    public static void intersectionAndSubtractJava8(JavaSparkContext sc){

        JavaRDD<String> lesson1Data = sc.textFile("./lesson1");

        JavaRDD<String> lesson2Data = sc.textFile("./lesson2");


        JavaPairRDD<String,Integer> lesson1InfoData =
        lesson1Data.mapToPair(line -> new Tuple2<>(line.split(" ")[0],Integer.parseInt(line.split(" ")[1])));


        JavaPairRDD<String,Integer> lesson2InfoData =
        lesson2Data.mapToPair(line -> new Tuple2<>(line.split(" ")[0],Integer.parseInt(line.split(" ")[1])));


        JavaPairRDD<String,Integer> lesson1Grades = lesson1InfoData.reduceByKey((x,y) -> x+y);

        JavaPairRDD<String,Integer> lesson2Grades = lesson2InfoData.reduceByKey((x,y) -> x+y);


        JavaRDD<String> studentsInLesson1 = lesson1Grades.map(x->x._1);

        JavaRDD<String> studentsInLesson2 = lesson2Grades.map(x->x._1);

        //students in both lesson1 and lesson2
        studentsInLesson1.intersection(studentsInLesson2).foreach(name -> System.out.println(name));

        //students in both lesson2 and lesson1 (same result as above)
        studentsInLesson2.intersection(studentsInLesson1).foreach(name -> System.out.println(name));

        //students only in lesson1
        JavaRDD<String> studentsOnlyInLesson1 = studentsInLesson1.subtract(studentsInLesson2);
        studentsOnlyInLesson1.foreach(name -> System.out.println(name));

        //students only in lesson2
        JavaRDD<String> studentsOnlyInLesson2 = studentsInLesson2.subtract(studentsInLesson1);
        studentsOnlyInLesson2.foreach(name -> System.out.println(name));


        //students who took only one of the two courses
        JavaRDD<String> studentsOnlyOneLesson = studentsOnlyInLesson1.union(studentsOnlyInLesson2);
        studentsOnlyOneLesson.foreach(name -> System.out.println(name));


        //all students across both courses (no duplicates)
        studentsInLesson1.union(studentsInLesson2).distinct().foreach(name -> System.out.println(name));


    }

}

Python implementation

from pyspark import SparkConf,SparkContext

conf = SparkConf().setMaster("local").setAppName("SparkIntersectionAndSubtract")

sc = SparkContext(conf=conf)

# lesson1 data
lesson1Data = sc.textFile("./lesson1").map(lambda x:(x.split(" ")[0],int(x.split(" ")[1])))

# lesson2 data
lesson2Data = sc.textFile("./lesson2").map(lambda x:(x.split(" ")[0],int(x.split(" ")[1])))

# each student's total score in lesson1
lesson1InfoData = lesson1Data.reduceByKey(lambda x,y:x+y)

# each student's total score in lesson2
lesson2InfoData = lesson2Data.reduceByKey(lambda x,y:x+y)

# students in lesson1
studentsInLesson1 = lesson1InfoData.map(lambda x:x[0])

# students in lesson2
studentsInLesson2 = lesson2InfoData.map(lambda x:x[0])

# students in both lesson1 and lesson2
print("Students On Lesson1 And On Lesson2")
studentsInLesson1.intersection(studentsInLesson2).foreach(print)

# students in both lesson2 and lesson1 (same result as above)
print("Students On Lesson1 And On Lesson2")
studentsInLesson2.intersection(studentsInLesson1).foreach(print)

# students only in lesson1
print("Students Only In Lesson1")
studentsOnlyInLesson1 = studentsInLesson1.subtract(studentsInLesson2)
studentsOnlyInLesson1.foreach(print)


# students only in lesson2
print("Students Only In Lesson2")
studentsOnlyInLesson2 = studentsInLesson2.subtract(studentsInLesson1)
studentsOnlyInLesson2.foreach(print)


# students who took only one of the two courses
print("Students Only Choose One Lesson")
studentsOnlyInLesson1.union(studentsOnlyInLesson2).foreach(print)

# all students across both courses (no duplicates)
print("All the students")
studentsInLesson1.union(studentsInLesson2).distinct().foreach(print)



Running the code produces the following results:

Students On Lesson1 And On Lesson2
Vicky
Amy
Lili
Bob
Coco

Students On Lesson1 And On Lesson2
Vicky
Amy
Lili
Coco
Bob

Students Only In Lesson1
Bill
David
Mike
Nancy
Lucy

Students Only In Lesson2
White
Jimmy
Jason
John
Frank

Students Only Choose One Lesson
Bill
David
Mike
Nancy
Lucy
White
Jimmy
Jason
John
Frank

All the students
Vicky
Bill
Amy
White
Jimmy
Jason
Lili
David
Bob
Mike
Coco
Nancy
Lucy
John
Frank

The example above is a very concrete application of intersection, subtract, union and distinct to real problems. Used well, these methods make relational operations between datasets quick to express. In fact, calling them directly usually beats hand-rolling equivalents, because Spark optimizes these methods internally.

Both the datasets and the code can be found and downloaded on GitHub.
