一,必备知识apache
1.1 经典14问安全
1.2 问题前提网络
二,序列化问题函数
2.1 Spark序列化出现状况工具
2.2 Spark序列化问题解决spa
三,线程安全问题线程
3.1 Spark线程安全出现状况code
3.2 Spark线程安全问题解决orm
1.SparkContext哪一端生成的? Driver端 2.DAG是在哪一端被构建的? Driver端 3.RDD是在哪一端生成的? Driver端 4.广播变量是在哪一端调用的方法进行广播的? Driver端 5.要广播的数据应该在哪一端先建立好再广播呢? Driver端 6.调用RDD的算子(Transformation和Action)是在哪一端调用的 Driver端 7.RDD在调用Transformation和Action时须要传入一个函数,函数是在哪一端声明和传入的? Driver端 8.RDD在调用Transformation和Action时须要传入函数,请问传入的函数是在哪一端执行了函数的业务逻辑? Executor中的Task执行的 9.自定义的分区器这个类是在哪一端实例化的? Driver端 10.分区器中的getParitition方法在哪一端调用的呢? Executor中的Task中调用的 11.Task是在哪一端生成的呢? Driver端 12.DAG是在哪一端构建好的并被切分红一到多个State的 Driver端 13.DAG是哪一个类完成的切分Stage的功能? DAGScheduler 14.DAGScheduler将切分好的Stage以什么样的形式给TaskScheduler TaskSet
在上面的12问的7-8问中,函数的申明和调用分别在Driver和Execute中进行,这其中就会牵扯到序列化问题和线程安全问题。接下来会对其进行解释。blog
工具类:
package cn.edu360.spark05 // 随意定义一工具类 class MyUtil { def get(msg: String): String ={ msg+"aaa" } }
Spark实现类:
package cn.edu360.spark05 import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object SequenceTest { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("ScalaWordCount").setMaster("local[3]") var sc = new SparkContext(conf) val lines: RDD[String] = sc.textFile("hdfs://hd1:9000/wordcount/input/") val words = lines.flatMap(_.split(" ")) // 对类进行实例化 val util = new MyUtil // 调用实例的方法 val value: RDD[String] = words.map(word => util.get(word)) value.collect() sc.stop() } }
报错信息以下:
上述报错信息就说明是MyUtil实例的序列化问题。该实例是在Driver端建立,经过网络发送到Worker的Executer端。可是这个实例并为序列化,因此会报这些错误。
解决方案一:实现序列化接口
package cn.edu360.spark05 // 继承Serializable class MyUtil extends Serializable { def get(msg: String): String ={ msg+"aaa" } }
弊端:须要本身实现序列化接口,相对麻烦
解决方案二:不实现序列化接口,在Executer进行MyUtil内进行实例化
package cn.edu360.spark05 import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object SequenceTest { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("ScalaWordCount").setMaster("local[3]") var sc = new SparkContext(conf) val lines: RDD[String] = sc.textFile("hdfs://hd1:9000/wordcount/input/") val words = lines.flatMap(_.split(" ")) val value: RDD[String] = words.map(word => { // 在这里进行实例化,这里的操做是在Executer中 val util = new MyUtil util.get(word) }) val result: Array[String] = value.collect() print(result.toBuffer) sc.stop() } }
弊端:每一次调用都须要建立一个新的实例,浪费资源,浪费内存。
解决方案三:采用单例模式
MyUtil类:
package cn.edu360.spark05 // 将class 改成 object的单例模式 object MyUtil { def get(msg: String): String ={ msg+"aaa" } }
Spark实现类:
package cn.edu360.spark05 import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object SequenceTest { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("ScalaWordCount").setMaster("local[3]") var sc = new SparkContext(conf) val lines: RDD[String] = sc.textFile("hdfs://hd1:9000/wordcount/input/") val words = lines.flatMap(_.split(" ")) val value: RDD[String] = words.map(word => { // 调用方法 MyUtil.get(word) }) val result: Array[String] = value.collect() print(result.toBuffer) sc.stop() } }
有共享成员变量:
1. 工具类使用object,说明工具类是单例的,有线程安全问题。在函数内部使用,是在Executer中被初始化,一个Executer中有一个实例,因此 就出现了线程安全问题。
2. 工具类使用Class,说明是多例的,没有线程安全问题。每一个task都会持有一份工具类的实例。
没有共享成员变量:
1. 工具类Object,没有线程安全问题
2. 工具类使用class,实现序列化便可
工具类优先使用object,但尽量不使用成员变量,若实在有这方面的需求,能够定义类的类型,或者把成员变量变成线程安全的成员变量,例如加锁等。