Spark WordCount in IDEA

  • First, check in IDEA whether the Scala tooling is installed; if not, download it from the official site http://www.scala-lang.org/. Below we write a WordCount demo in Scala.
  • In IDEA, create a Scala project named HelloWord, then create a package com.admin and a HelloWord.scala file in it; this file holds an object. The project must reference the spark-assembly-1.6.1-hadoop2.6.0.jar on its classpath.
  • The Scala code is as follows:
    package com.admin
    
    import org.apache.spark.{SparkContext, SparkConf}
    
    /**
     * Created by test on 2016/5/24.
     */
    object HelloWord {
      def main(args: Array[String]) {
        // Require exactly the input path argument; exit with a usage hint otherwise
        if (args.length < 1) {
          System.err.println("Usage: HelloWord <file>")
          System.exit(1)
        }
    
        val conf = new SparkConf()
        val sc = new SparkContext(conf)
        val line = sc.textFile(args(0))
        line.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).collect().foreach(println)
        sc.stop()
      }
    
    }
  • The Scala code is much simpler than Java's; Java really takes a lot more code. The Java code is as follows:
    package com.admin;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import scala.Tuple2;
    
    import java.util.Arrays;
    import java.util.List;
    import java.util.regex.Pattern;
    
    public final class JavaWordCount {
        private static final Pattern SPACE = Pattern.compile(" ");
    
        public static void main(String[] args) throws Exception {
    
            if (args.length < 1) {
                System.err.println("Usage: JavaWordCount <file>");
                System.exit(1);
            }
    
            SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
            JavaSparkContext ctx = new JavaSparkContext(sparkConf);
            JavaRDD<String> lines = ctx.textFile(args[0], 1);
    
            JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterable<String> call(String s) {
                    return Arrays.asList(SPACE.split(s));
                }
            });
    
            JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String s) {
                    return new Tuple2<String, Integer>(s, 1);
                }
            });
    
            JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer i1, Integer i2) {
                    return i1 + i2;
                }
            });
    
            List<Tuple2<String, Integer>> output = counts.collect();
            for (Tuple2<?, ?> tuple : output) {
                System.out.println(tuple._1() + ": " + tuple._2());
            }
            ctx.stop();
        }
    }
  • I am also new to IDEA; previously I developed with Eclipse. With the program written, the next step is packaging it into a jar file.
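    In IDEA this packaging is done through the Artifacts GUI, but the equivalent can be sketched on the command line. The source path, output names, and jar layout below are assumptions; adjust them to your project:

    ```shell
    # Hypothetical command-line packaging sketch (paths and names are assumptions):
    # compile the Java version against the Spark assembly jar, then bundle a runnable jar.
    javac -cp spark-assembly-1.6.1-hadoop2.6.0.jar -d classes src/com/admin/JavaWordCount.java
    jar cf wordcount.jar -C classes .
    ```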
  • You can find the jar file in the build output directory; then upload it to the cluster server.
  • Now run it on Hadoop YARN: --name is the name of the running job, --class is the main class inside the jar, and the final argument /user/test.txt is a file on HDFS. The file's content: import org apache spark api java JavaPairRDD import org apache spark api java JavaRDD import org apache spark api java JavaSparkContext import org apache spark api java function FlatMapFunction import org apache spark api java function Function import org apache spark api java function Function2 import org apache spark api java function PairFunction import scala Tuple2
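    The submit command itself is not shown above; a plausible form matching the flags described (the jar name wordcount.jar is an assumption, and --deploy-mode cluster is one common choice for YARN) would be:

    ```shell
    # Hedged spark-submit sketch; jar name and deploy mode are assumptions
    spark-submit \
      --master yarn \
      --deploy-mode cluster \
      --name JavaWordCount \
      --class com.admin.JavaWordCount \
      wordcount.jar \
      /user/test.txt
    ```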
  • When the run finishes, check the results.
  • The output shows the number of times each word appears.
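  • The job's split-then-count shape can be mimicked on a tiny in-line input with plain coreutils, just to illustrate what the result looks like (a count next to each word); this toy pipeline is mine, not part of the Spark job:

    ```shell
    # Same split/count idea as the Spark job, on two short lines of text:
    # split on spaces, sort so duplicates are adjacent, count with uniq -c
    printf 'spark hadoop spark\nyarn spark\n' | tr ' ' '\n' | sort | uniq -c | sort -rn
    # the first output line is "3 spark"; hadoop and yarn each appear once
    ```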