/*scala test*/ import org.apache.spark.SparkConf import org.apache.spark.SparkContext object WordCount { def main(args: Array[String]): Unit = { println("start...") /** * 第一步:建立Spark的配置对象SparkConf,设置Spark程序运行时的配置信息, * 例如说经过设置setMaster来设置程序要连接的Spark集群的Master的URL, * 若是设置为local,则表明Spark程序在本地运行。 */ val conf = new SparkConf //建立SparkConf对象 conf.setAppName("wordCount") //设置应用程序的名称,在程序运行的监控界面能够看到名称 conf.setMaster("local") //此时,程序在本地运行,不须要安装Spark集群 /** * 第二步:建立SparkContext对象 * SparkContext是Spark程序全部功能的惟一入口,不管是采用scala、java、Python,R等都 * 必须有一个SparkContext。SparkContext核心做用:初始化Spark应用程序运行所须要的核心组件,包括 * DAGScheduler,TaskScheduler、SchedulerBackend同时还会负责Spark程序往Master注册程序等。 * SparkContext是这个Spark程序中最为相当重要的一个对象。 */ val sc = new SparkContext(conf) /** * 第三步:根据具体的数据源(HDFS、HBase、Local FS、DB、S3等)经过SparkContext建立RDD。 * RDD的建立方式有三种:根据外部的数据源(HDFS)、根据Scala集合、其余的RDD操做。数据会被RDD划分红一系列的 * Partitions,分配到每一个Partition的数据属于一个Task的处理范畴 */ val lines = sc.textFile("D://data//2.txt", 1)// /** * 第四步:对初始化的RDD进行Transformation级别处理,例如map、filter等高阶函数等的编程,来进行具体的数据计算。 */ /** * 4.一、对每一行的字符串拆分红单个的单词 */ val words = lines.flatMap { line => line.split(" ") } //对每一行的字符串进行单词拆分并把全部行的拆分结果经过flat合并成为一 /** * 4.二、在单词拆分的基础上对每一个单词实例计数为1,也就是word => (word,1) */ val pairs = words.map { word => (word, 1) } /** * 4.三、在每一个单词实例计数为1基础之上统计每一个单词在文件中出现的总次数 */ val wordCounts = pairs.reduceByKey(_+_) //对相同的key,进行value的累计 wordCounts.foreach(map => println(map._1 +":"+ map._2)) sc.stop() println("end...") } }
/*python test*/ from pyspark import SparkConf, SparkContext conf = SparkConf().setMaster("local").setAppName("My App") sc = SparkContext(conf = conf) lines = sc.parallelize(["pandas", "cat", "i like pandas"]) word = lines.filter(lambda s: "pandas" in s) print(word.count())
/*java test*/ import java.util.Arrays; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction; import scala.Tuple2; /** * 用java语言开发spark程序 * 第一个学习程序 wordcount * @author 18521 * */ public class wordCountLocal { public static void main(String[] args) { // TODO Auto-generated method stub // 1 建立一个sparkconf 对象并配置 // 使用setMaster 能够设置spark集群能够连接集群的URL,若是设置local 表明在本地运行而不是在集群运行 SparkConf conf = new SparkConf() .setAppName("wordCountLocal") .setMaster("local"); // 2 建立javasparkContext对象 // sparkcontext 是一个入口,主要做用就是初始化spark应用程序所需的一些核心组件,例如调度器,task, // 还会注册spark,sparkMaster结点上注册。反正就是spake应用中最重要的对象 JavaSparkContext sc = new JavaSparkContext(conf); // 3 对输入源建立一个出事RDD // 元素就是输入源文件中的一行 JavaRDD<String> lines = sc.textFile("D://data/2.txt"); // 4 把输入源拆分红一个一个的单词 // 引用一个RDD 都会建立一个function 类(比较简单的话就是一个匿名内部类) // FlatMapFunction 有连个参数输入和输出 JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() { private static final long serialVersionUID = 1L; public Iterable<String> call(String arg0) throws Exception { // TODO Auto-generated method stub return Arrays.asList(arg0.split(" ")); } }); // 5 须要将每个单词映射为(单词,1) 后面才能够更具单词key 对后面value 1 进行累加从而达到计数的功能 JavaPairRDD<String, Integer> parirs = words.mapToPair(new PairFunction<String, String, Integer>() { /** * 每个单词都映射成(单词,1) */ private static final long serialVersionUID = 1L; public Tuple2<String, Integer> call(String arg0) throws Exception { // TODO Auto-generated method stub return new Tuple2<String, Integer>(arg0, 1); } }); // 6 以单词作为key 统计单词出现的次数,用reducebykey 算子,对每个key对于的value进行操做 JavaPairRDD<String, Integer> wordcount = parirs.reduceByKey(new Function2<Integer, Integer, Integer>() { public Integer call(Integer arg0, Integer arg1) throws Exception { // TODO Auto-generated method stub return arg0 + arg1; } }); // 7 已经经过spark 的几个算子 flatMap,mapToPair,reduceByKey 已经统计出每个结点中的单词出现的次数 // 这中操做叫作transformation,可是在一开始的RDD是把文件拆分打散到不一样的结点中的,因此后面还须要操做action 进行集合 // 9 action 操做经过foreach 来遍历全部最后一个RDD生成的元素 wordcount.foreach(new VoidFunction<Tuple2<String, Integer>>() { @Override public void call(Tuple2<String, Integer> arg0) throws Exception { // TODO Auto-generated method stub System.out.println(arg0._1 + " 出现了:" + arg0._2 + "次"); } }); sc.close(); } }