Spark 执行操作时,可以使用驱动器程序(Driver)中定义的变量,但有时这种默认的使用方式并不理想。
默认方式存在两个问题:一是每个任务拿到的只是变量的副本,在任务中对副本的修改不会传回驱动器程序;二是较大的只读数据会随每个任务重复发送,效率低下。为了解决这两个问题,Spark 提供了两种类型的共享变量:广播变量(broadcast variable)和累加器(accumulator)。
查询每一个国家的呼号个数(使用广播变量)
# Register the call-sign prefix table (country codes) as a broadcast variable.
signPrefixes = sc.broadcast(loadCallSignTable())


def processSignCount(sign_count, signPrefixes):
    """Map a (call sign, count) pair to a (country, count) pair.

    sign_count   -- pair whose first element is the call sign and whose
                    second element is the count
    signPrefixes -- broadcast variable; its .value is handed to lookupCountry
    """
    country = lookupCountry(sign_count[0], signPrefixes.value)
    count = sign_count[1]
    return (country, count)


# map() calls its function with a single argument (the record), so the
# broadcast handle is bound through a closure rather than passing the
# two-parameter processSignCount to map() directly.
countryContactCounts = (contactCounts
                        .map(lambda sign_count: processSignCount(sign_count, signPrefixes))
                        .reduceByKey(lambda x, y: x + y))
countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")
// Register the call-sign prefix table (country codes) as a broadcast variable.
val signPrefixes = sc.broadcast(loadCallSignTable())

// Replace each call sign with its country, then sum the counts per country.
val countryContactCounts = contactCounts
  .map { case (sign, count) =>
    val country = lookupInArray(sign, signPrefixes.value)
    (country, count)
  }
  .reduceByKey(_ + _)

countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")
// 将呼号前缀(国家代码)做为广播变量 final Broadcast<String[]> signPrefixes = sc.broadcast(loadCallSignTable()); JavaPairRDD<String, Integer> countryContactCounts = contactCounts.mapToPair(new PairFunction<Tuple2<String, Integer>, String, Integer>() { public Tuple2<String, Integer> call(Tuple2<String, Integer> callSignCount) { String sign = callSignCount._1(); String country = lookupCountry(sign, signPrefixes.value()); return new Tuple2(country, callSignCount._2()); } }).reduceByKey(new SumInts()); countryContactCounts.saveAsTextFile(outputDir + "/countries.txt");
累加空行(使用累加器统计输入中的空行数)
file = sc.textFile(inputFile)
# Create an Accumulator[Int] initialised to 0.
blankLines = sc.accumulator(0)


def extractCallSigns(line):
    """Split a line on single spaces, adding 1 to blankLines when the line is empty."""
    global blankLines  # the accumulator is a module-level variable
    if line == "":
        blankLines += 1
    return line.split(" ")


callSigns = file.flatMap(extractCallSigns)
# flatMap is lazy: the accumulator is only populated once an action such as
# saveAsTextFile has run, so the value is read after it.
callSigns.saveAsTextFile(outputDir + "/callsigns")
# The parenthesised form prints the same text under both Python 2 and Python 3.
print("Blank lines: %d" % blankLines.value)
val file = sc.textFile("file.txt")
// Accumulator[Int] starting at 0.
val blankLines = sc.accumulator(0)

val callSigns = file.flatMap { line =>
  // Count every empty line, then split the line into space-separated tokens.
  if (line.isEmpty) blankLines += 1
  line.split(" ")
}

callSigns.saveAsTextFile("output.txt")
println("Blank lines:" + blankLines.value)
JavaRDD<String> rdd = sc.textFile(args[1]);
// Accumulator starting at 0; it receives add(1) for every empty input line.
final Accumulator<Integer> blankLines = sc.accumulator(0);

// Splits each line on single spaces, counting empty lines along the way.
FlatMapFunction<String, String> splitOnSpaces = new FlatMapFunction<String, String>() {
  public Iterable<String> call(String line) {
    if (line.isEmpty()) {
      blankLines.add(1);
    }
    String[] tokens = line.split(" ");
    return Arrays.asList(tokens);
  }
};

JavaRDD<String> callSigns = rdd.flatMap(splitOnSpaces);
callSigns.saveAsTextFile("output.text");
System.out.println("Blank lines:" + blankLines.value());
忠于技术,热爱分享。欢迎关注公众号:java大数据编程,了解更多技术内容。