java,scala,python,spark,hadoop,local模式测试代码,idea,jdk1.8,scla2.11,python2.7,spark-2.4.3-bin-hadoop2.7

/*scala test*/
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object WordCount {
  def main(args: Array[String]): Unit = {
    println("start...")
    /**
      * 第一步:建立Spark的配置对象SparkConf,设置Spark程序运行时的配置信息,
      * 例如说经过设置setMaster来设置程序要连接的Spark集群的Master的URL,
      * 若是设置为local,则表明Spark程序在本地运行。
      */
    val conf = new SparkConf //建立SparkConf对象
    conf.setAppName("wordCount") //设置应用程序的名称,在程序运行的监控界面能够看到名称
    conf.setMaster("local") //此时,程序在本地运行,不须要安装Spark集群

    /**
      * 第二步:建立SparkContext对象
      * SparkContext是Spark程序全部功能的惟一入口,不管是采用scala、java、Python,R等都
      * 必须有一个SparkContext。SparkContext核心做用:初始化Spark应用程序运行所须要的核心组件,包括
      * DAGScheduler,TaskScheduler、SchedulerBackend同时还会负责Spark程序往Master注册程序等。
      * SparkContext是这个Spark程序中最为相当重要的一个对象。
      */
    val sc = new SparkContext(conf)

    /**
      * 第三步:根据具体的数据源(HDFS、HBase、Local FS、DB、S3等)经过SparkContext建立RDD。
      * RDD的建立方式有三种:根据外部的数据源(HDFS)、根据Scala集合、其余的RDD操做。数据会被RDD划分红一系列的
      * Partitions,分配到每一个Partition的数据属于一个Task的处理范畴
      */
    val lines = sc.textFile("D://data//2.txt", 1)//

    /**
      * 第四步:对初始化的RDD进行Transformation级别处理,例如map、filter等高阶函数等的编程,来进行具体的数据计算。
      */
    /**
      * 4.一、对每一行的字符串拆分红单个的单词
      */
    val words = lines.flatMap { line => line.split(" ") } //对每一行的字符串进行单词拆分并把全部行的拆分结果经过flat合并成为一

    /**
      * 4.二、在单词拆分的基础上对每一个单词实例计数为1,也就是word => (word,1)
      */
    val pairs = words.map { word => (word, 1) }

    /**
      * 4.三、在每一个单词实例计数为1基础之上统计每一个单词在文件中出现的总次数
      */
    val wordCounts = pairs.reduceByKey(_+_) //对相同的key,进行value的累计

    wordCounts.foreach(map => println(map._1 +":"+ map._2))

    sc.stop()

    println("end...")
  }
}
/*python test*/
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf = conf)

lines = sc.parallelize(["pandas", "cat", "i like pandas"])
word = lines.filter(lambda s: "pandas" in s)

print(word.count())

 

/*java test*/
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

/**
 * 用java语言开发spark程序
 * 第一个学习程序 wordcount
 * @author 18521
 *
 */
public class wordCountLocal {

    public static void main(String[] args) {
        // TODO Auto-generated method stub
        // 1 建立一个sparkconf 对象并配置
        // 使用setMaster 能够设置spark集群能够连接集群的URL,若是设置local 表明在本地运行而不是在集群运行
        SparkConf conf = new SparkConf()
                .setAppName("wordCountLocal")
                .setMaster("local");

        // 2 建立javasparkContext对象
        // sparkcontext 是一个入口,主要做用就是初始化spark应用程序所需的一些核心组件,例如调度器,task,
        // 还会注册spark,sparkMaster结点上注册。反正就是spake应用中最重要的对象
        JavaSparkContext sc = new JavaSparkContext(conf);
        // 3 对输入源建立一个出事RDD
        // 元素就是输入源文件中的一行
        JavaRDD<String> lines = sc.textFile("D://data/2.txt");
        // 4 把输入源拆分红一个一个的单词
        // 引用一个RDD 都会建立一个function 类(比较简单的话就是一个匿名内部类)
        // FlatMapFunction 有连个参数输入和输出
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

            private static final long serialVersionUID = 1L;


            public Iterable<String> call(String arg0) throws Exception {
                // TODO Auto-generated method stub
                return Arrays.asList(arg0.split(" "));
            }
        });
        // 5 须要将每个单词映射为(单词,1) 后面才能够更具单词key 对后面value 1 进行累加从而达到计数的功能
        JavaPairRDD<String, Integer> parirs = words.mapToPair(new PairFunction<String, String, Integer>() {

            /**
             * 每个单词都映射成(单词,1)
             */
            private static final long serialVersionUID = 1L;


            public Tuple2<String, Integer> call(String arg0) throws Exception {
                // TODO Auto-generated method stub
                return new Tuple2<String, Integer>(arg0, 1);
            }
        });
        // 6 以单词作为key 统计单词出现的次数,用reducebykey 算子,对每个key对于的value进行操做
        JavaPairRDD<String, Integer> wordcount = parirs.reduceByKey(new Function2<Integer, Integer, Integer>() {


            public Integer call(Integer arg0, Integer arg1) throws Exception {
                // TODO Auto-generated method stub
                return arg0 + arg1;
            }
        });

        // 7 已经经过spark 的几个算子 flatMap,mapToPair,reduceByKey 已经统计出每个结点中的单词出现的次数
        // 这中操做叫作transformation,可是在一开始的RDD是把文件拆分打散到不一样的结点中的,因此后面还须要操做action 进行集合
        // 9 action 操做经过foreach 来遍历全部最后一个RDD生成的元素
        wordcount.foreach(new VoidFunction<Tuple2<String, Integer>>() {

            @Override
            public void call(Tuple2<String, Integer> arg0) throws Exception {
                // TODO Auto-generated method stub
                System.out.println(arg0._1 + " 出现了:" + arg0._2 + "次");
            }
        });
        sc.close();
    }
}
相关文章
相关标签/搜索