初学spark,用Java写了几个本地运行的spark实例代码,来记录一下已学的spark经常使用的算子的使用和处理逻辑,不涉及分布式集群。相关内容仅为本身的我的理解,若有错误还请指出。html
统计一个文本文件中的每一个单词的出现次数,数据格式:
java
首先经过textFile()函数将文件读入JavaRDD,而后经过flatMap算子将每一行的数据进行分割,获得多个String,一行数据分割获得的多个String以Iterator
public class SparkWordCount { public static void main(String[] args){ SparkConf conf = new SparkConf(); //添加这一行则在本地运行,不添加这一行则默认在集群执行 conf.setMaster("local"); conf.setAppName("WordCount"); //基本的初始化 JavaSparkContext sc=new JavaSparkContext(conf); //建立String类型的RDD,并从本地文件中读取数据 JavaRDD<String> fileRDD = sc.textFile("src/main/files/words.txt");//经过文件读入建立RDD //flatMap()算子用来分割操做,将原RDD中的数据分红一个个片断 //new FlatMapFunction<String, String>中的两个String分别表示输入和输出类型 JavaRDD<String> wordRDD = fileRDD.flatMap(new FlatMapFunction<String, String>() { @Override public Iterator<String> call(String line) throws Exception { //经过Iterator迭代器能够将分割后的多个数据元素所有返回输出 return Arrays.asList(line.split("\\s+")).iterator(); } }); //mapToPair()算子是用来对分割后的一个个片断结果添加计数标志的,如出现次数1,该函数用来建立并返回pair类型的RDD. new PairFunction<String, String, Integer>中分别是输入类型String和输出类型<String, Integer>. JavaPairRDD<String, Integer> wordOneRDD = wordRDD.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String word) throws Exception { return new Tuple2<>(word, 1); //Tuple2是spark的二元数组类型,Java中没有 } }); //reduceByKey()算子是根据key来聚合,reduce阶段.new Function2<Integer, Integer, Integer>中分别是用来聚合的两个输入类型Integer,Integer和聚合后的输出类型Integer. JavaPairRDD<String, Integer> wordCountRDD = wordOneRDD.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer i1, Integer i2) throws Exception { return i1 + i2; } }); wordCountRDD.saveAsTextFile("E:\\result7"); } }
java实现spark统计100万人口的平均年龄以及每一个年龄的出现次数,数据格式为“序号 年龄”
数据生成代码:apache
//生成年龄数据,格式“序号 年龄” private static void makeAgeData() throws IOException { File newFile = new File("src/main/files/peopleAges.txt"); if (newFile.exists()){ System.out.println("文件已存在!"); return; } newFile.createNewFile(); FileWriter fw = new FileWriter(newFile,true); Random rand = new Random(); for (int i=1;i<=1000000;i++){ fw.append(i+" "+(rand.nextInt(100)+1)+"\n"); fw.flush(); } fw.close(); }
首先经过textFile()函数将文件数据读入RDD中。而后使用mapToPair()算子将每一行数据中的年龄做为key并对每个年龄添加计数标记1做为value,接着使用reduceByKey算子对相同年龄值的数据进行聚合。
求平均年龄时首先要求出年龄和,这也是reduce聚合操做。可是要注意reduce算子只能接收单个数据元素组成的RDD做为输入,不能接收pair类型的RDD,因此对源文件读出的RDD先经过map算子输出只有年龄值数据的RDD,而后进行reduce()。数组
public class AvgAge { public static void main(String[] args){ SparkConf conf = new SparkConf(); conf.setMaster("local"); conf.setAppName("AvgAge"); JavaSparkContext sc = new JavaSparkContext(conf); //刚从文件读出来的RDD已是一行一行的字符串,因此能够直接进行mapToPair JavaRDD<String> fileRDD = sc.textFile("src/main/files/peopleAges.txt"); JavaPairRDD<Integer, Integer> ageOneRdd = fileRDD.mapToPair(new PairFunction<String, Integer, Integer>() { @Override public Tuple2<Integer, Integer> call(String s) throws Exception { return new Tuple2<>(Integer.parseInt(s.split("\\s+")[1]),1); } }); //使用reduceByKey算子对具备相同年龄值的数据进行聚合,获取每一个年龄值的人数 JavaPairRDD<Integer, Integer> ageCountRDD = ageOneRdd.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer integer, Integer integer2) throws Exception { return integer+integer2; } }); //求平均年龄 //先经过map算子取出每一个年龄值做为一个RDD。 //reduce()函数的输入RDD不能是pair,只能是单个数据组成的RDD Integer ageSum = fileRDD.map(new Function<String, Integer>() { @Override public Integer call(String s) throws Exception { return Integer.parseInt(s.split("\\s+")[1]); } }).reduce(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer s, Integer s2) throws Exception { return s+s2; } }); System.out.println("平均年龄:"+ageSum/fileRDD.count()); ageCountRDD.saveAsTextFile("src/main/files/ageAnalysis"); }
统计男女人数,并分别计算出男性和女性的最高和最低身高,数据格式“序号 M/F 身高”
数据生成代码app
//生成性别身高数据,格式“序号 性别(M/F) 身高” private static void makeHeightData() throws IOException { File newFile = new File("src/main/files/heightData.txt"); if (newFile.exists()){ System.out.println("文件已存在!"); return; } newFile.createNewFile(); FileWriter fw = new FileWriter(newFile,true); Random rand = new Random(); for (int i=1;i<=50000;i++){ fw.append(i+" M "+(rand.nextInt(100)+100)+"\n"); fw.append(i+" F "+(rand.nextInt(80)+100)+"\n"); fw.flush(); } fw.close(); }
首先经过textFile()函数将文件数据读入RDD。而后使用filter算子分别过滤出男性和女性数据,接着用map算子分割出身高值并将其转化成Integer类型,这样才能用于数字排序,而后使用sortBy算子排序。sortBy算子能够直接对RDD中的数据排序,不用区分key仍是value。dom
public class HeightMaxMin { public static void main(String[] args){ SparkConf conf = new SparkConf(); conf.setMaster("local"); conf.setAppName("HeightAnalysis"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<String> fileRDD = sc.textFile("src/main/files/heightData.txt"); //使用filter算子分别过滤出男性和女性数据 JavaRDD<String> maleRDD = fileRDD.filter(new Function<String, Boolean>() { @Override //若是这行数据符合过滤条件则返回true public Boolean call(String s) throws Exception { return s.contains("M"); } }); JavaRDD<String> femaleRDD = fileRDD.filter(new Function<String, Boolean>() { @Override //若是这行数据符合过滤条件则返回true public Boolean call(String s) throws Exception { return s.contains("F"); } }); //使用map算子分割出身高并转化为整数类型,这样才能用排序 JavaRDD<Integer> maleHeightRDD = maleRDD.map(new Function<String, Integer>() { @Override public Integer call(String s) throws Exception { return Integer.parseInt(s.split("\\s+")[2]); } }); JavaRDD<Integer> femaleHeightRDD = femaleRDD.map(new Function<String, Integer>() { @Override public Integer call(String s) throws Exception { return Integer.parseInt(s.split("\\s+")[2]); } }); //使用sortBy算子排序 JavaRDD<Integer> sortmaleHeightRDD = (JavaRDD<Integer>) maleHeightRDD.sortBy(new Function<Integer, Object>() { @Override //返回的是排序的内容,即对输入RDD中的哪一部分进行排序就输出哪一部分 public Object call(Integer integer) throws Exception { return integer; } }, false,10);//false降序true升序,10是partition分区数,由于没有用集群因此也不太明白这个分区数具体指什么 JavaRDD<Integer> sortfemaleHeightRDD = (JavaRDD<Integer>) femaleHeightRDD.sortBy(new Function<Integer, Object>() { @Override public Object call(Integer integer) throws Exception { return integer; } }, false,10);//第二个参数true/false是正序逆序,最后一个参数10是分区数 //first()函数返回排名第一的数据 System.out.println("男性: "+sortmaleHeightRDD.count()+" "+sortmaleHeightRDD.first()); System.out.println("女性: "+sortfemaleHeightRDD.count()+" "+sortfemaleHeightRDD.first()); } }
统计一段文本里出现频率最高的前k个词,注意单词不分大小写。maven
首先从文件读入数据到RDD,而后使用flatMap算子对每一行的数据按照空格进行分割,并将全部的字母都转为小写,接着使用mapToPair算子对每个单词添加计数标记1,而后使用reduceByKey算子对单词进行reduce聚合,为了根据key来排序,聚合后再使用mapToPair算子将获得的pair里面的key和value调换一下位置。而后使用sortByKey算子根据key来进行排序,最后使用take算子取出排名前5的数据。分布式
public class wordTopK { public static void main(String[] args){ SparkConf conf = new SparkConf(); conf.setMaster("local"); conf.setAppName("wordTopK"); JavaSparkContext sc=new JavaSparkContext(conf); JavaRDD<String> fileRDD = sc.textFile("D:\\summer_study\\ppt\\hive.txt"); //使用flatmap算子对每一行数据按空格分隔,并将全部的字母都转为小写 //注意这里不用map是由于map只能输出一个数据元素,而flatMap能够在输入元素后添加任意多元素来输出, // 好比分割后的多个元素组成Iterator来输出,可是Iterator里的每个元素依然是独立的RDD。 JavaRDD<String> wordRDD = fileRDD.flatMap(new FlatMapFunction<String, String>() { @Override public Iterator<String> call(String line) throws Exception { return Arrays.asList(line.toLowerCase().split("\\s+")).iterator(); } }); //使用maoToPair算子给每个word加上计数标记1 JavaPairRDD<String, Integer> wordOneRDD = wordRDD.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) throws Exception { return new Tuple2<>(s,1); } }); //使用reduceByKey算子对word进行reduce聚合,为了根据key来排序,聚合后再将获得的pair里面的key和value调换一下 JavaPairRDD<Integer, String> wordCountRDD = wordOneRDD.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer integer, Integer integer2) throws Exception { return integer+integer2; } }).mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() { @Override public Tuple2<Integer, String> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception { return new Tuple2<>(stringIntegerTuple2._2,stringIntegerTuple2._1); } }); //使用sortByKey算子排序,false降序,true升序 JavaPairRDD<Integer,String> sortWordCountRDD = wordCountRDD.sortByKey(false); //使用take算子取出前5 List<Tuple2<Integer,String>> result = sortWordCountRDD.take(5); System.out.println("结果:"+result.toString()); } }
ide:Idea
jdk:1.8.0_121
spark:2.1.1
附maven配置:ide
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>ffxn</groupId> <artifactId>ffxn</artifactId> <version>1.0-SNAPSHOT</version> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.7.3</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.7.3</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-core</artifactId> <version>2.7.3</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-jobclient</artifactId> <version>2.7.3</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.1.1</version> <!--<scope>provided</scope>--> </dependency> </dependencies> </project>
注:以上仅为我的学习记录,spark初学菜鸟,若有错误,敬请提出。
参考:
Spark 入门实战之最好的实例
Spark学习之路