spark 广播变量

Spark广播变量java

使用广播变量来优化,广播变量的原理是:apache

在每个Executor中保存一份全局变量,task在执行的时候须要使用和这一份变量就能够,极大的减小了Executor的内存开销。api

Executor中task在执行的时候若是使用到了广播变量,会找Executor里面的BlockManager来获取广播变量。ide

若是BlockManager中没有这个关闭变量,会从driver端拉取关闭变量。优化

在Driver端也有一个blockManagerMaster,其余的task执行的时候直接使用blockmanager中的广播变量就能够。spa

package SparkStreaming; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.broadcast.Broadcast; import java.util.Arrays; import java.util.List; public class BroadCast { public static void main(String[] args) { SparkConf conf = new SparkConf() .setMaster("local") .setAppName("BroadCast"); JavaSparkContext sc = new JavaSparkContext(conf); /* * 使用广播变量,广播变量的定义必须在driver端,由于sc没有被序列化不能被发送到Executor端 * */ Broadcast<String> blackname = sc.broadcast("dwj3"); List<String> name = Arrays.asList( "dwj1", "dwj2", "dwj3"); //String blackName = "dwj3"; JavaRDD<String> nameRDD = sc.parallelize(name); JavaRDD<String> namefilter = nameRDD.filter(new Function<String, Boolean>() { @Override public Boolean call(String s) throws Exception { String blacknames = blackname.getValue(); return !blacknames.equals(s); } }); List<String> lastname = namefilter.collect(); for(String str:lastname){ System.out.println(str); } } }

注意:在声明广播变量的时候,必须在driver端,由于sc没有被序列化,是不能被发送到Executor端的。code

相关文章
相关标签/搜索