咱们传递给Spark的函数,如map(),或者filter()的判断条件函数,可以利用定义在函数以外的变量,可是集群中的每个task都会获得变量的一个副本,而且task在对变量进行的更新不会被返回给driver。而Spark的两种共享变量:累加器(accumulator)和广播变量(broadcast variable),在广播和结果聚合这两种常见类型的通讯模式上放宽了这种限制。 使用累加器能够很简便地对各个worker返回给driver的值进行聚合。java
因为对于worker节点来讲,累加器的值是不可访问的,全部对于worker上的task,累加器是write-only的。这使得累加器能够被更高效的实现,而不须要在每次更新时都进行通讯。数据库
Spark保证:在终止操做中对累加器的操做只执行一次,而转化操做中则可能屡次执行。apache
累加器的操做能够多种,好比算术加法 MAX,只要这些操做符合交换律和结合律。api
累加器会屡次传递给Executor节点(而广播只一次)。数组
import java.util.ArrayList; import java.util.List; import org.apache.spark.Accumulator; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.broadcast.Broadcast; import scala.Tuple2; public class AccumulatorDemo { public static void main(String[] xx){ SparkConf conf = new SparkConf(); conf.setMaster("local[4]"); conf.setAppName("WordCounter"); conf.set("spark.default.parallelism", "4"); conf.set("spark.testing.memory", "2147480000"); JavaSparkContext ctx = new JavaSparkContext(conf); Person[] persons = new Person[10000]; Broadcast<Person []> persons_br = ctx.broadcast(persons); Accumulator<Integer> count = ctx.accumulator(0); List<String> data1 = new ArrayList<String>(); data1.add("Cake"); data1.add("Bread"); data1.add(""); data1.add("Cheese"); data1.add("Milk"); data1.add("Toast"); data1.add("Bread"); data1.add(""); data1.add("Egg"); data1.add(""); JavaRDD<String> rdd1 = ctx.parallelize(data1, 2); System.out.println(rdd1.glom().collect()); rdd1.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) throws Exception { long id = Thread.currentThread().getId(); System.out.println("s:" + s + " in thread:" + id); if(s.equals("")){ count.add(1); } return new Tuple2<String, Integer>(s, 1); } }).collect(); System.out.println(count.value()); rdd1.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) throws Exception { long id = Thread.currentThread().getId(); System.out.println("s:" + s + " in thread:" + id); if(s.equals("")){ count.add(1); // System.out.println("c:"+count.value()); } System.out.println(persons_br.value().length); return new Tuple2<String, Integer>(s, 1); } }).collect(); System.out.println(count.value()); ctx.stop(); } } class Person{}
import java.util.Arrays; import java.util.List; import org.apache.spark.Accumulator; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.broadcast.Broadcast; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.Time; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import scala.Tuple2; /** * 实例:利用广播进行黑名单过滤! 检查新的数据 根据是否在广播变量-黑名单内,从而实现过滤数据。 */ public class BroadCastDemo { /** * 建立一个List的广播变量 * */ private static volatile Broadcast<List<String>> broadcastList = null; /** * 计数器! */ private static volatile Accumulator<Integer> accumulator = null; public static void main(String[] args) { SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("WordCountOnlineBroadcast"); conf.set("spark.testing.memory", "2147480000"); JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5)); /** * 注意:分发广播须要一个action操做触发。 注意:广播的是Arrays的asList * 而非对象的引用。广播Array数组的对象引用会出错。 使用broadcast广播黑名单到每一个Executor中! */ broadcastList = jsc.sparkContext().broadcast(Arrays.asList("Hadoop", "Mahout", "Hive")); /** * 累加器做为全局计数器!用于统计在线过滤了多少个黑名单! 在这里实例化。 */ accumulator = jsc.sparkContext().accumulator(0, "OnlineBlackListCounter"); JavaReceiverInputDStream<String> lines = jsc.socketTextStream("Master", 9999); /** * 这里省去flatmap由于名单是一个个的! */ JavaPairDStream<String, Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String word) { return new Tuple2<String, Integer>(word, 1); } }); JavaPairDStream<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer v1, Integer v2) { return v1 + v2; } }); /** * Funtion里面 前几个参数是 入参。 后面的出参。 体如今call方法里面! * */ // wordsCount.foreach(new Function2<JavaPairRDD<String, Integer>,Time,boolean>() { // @Override // public boolean call(JavaPairRDD<String, Integer> rdd, Time time) throws Exception { // rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() { // @Override // public Boolean call(Tuple2<String, Integer> wordPair) throws Exception { // if (broadcastList.value().contains(wordPair._1)) { // /** // * accumulator不单单用来计数。 能够同时写进数据库或者缓存中。 // */ // accumulator.add(wordPair._2); // return false; // } else { // return true; // } // }; // /** // * 广播和计数器的执行,须要进行一个action操做! // */ // }).collect(); // System.out.println("广播器里面的值" + broadcastList.value()); // System.out.println("计时器里面的值" + accumulator.value()); // return null; // } // }); jsc.start(); try { jsc.awaitTermination(); } catch (InterruptedException e) { // TODO 自动生成的 catch 块 e.printStackTrace(); } jsc.close(); } }