让代码分布式运行是全部分布式计算框架须要解决的最基本的问题。java
Spark是大数据领域中至关火热的计算框架,在大数据分析领域有一统江湖的趋势,网上对于Spark源码分析的文章有不少,可是介绍Spark如何处理代码分布式执行问题的资料少之又少,这也是我撰写文本的目的。node
Spark运行在JVM之上,任务的执行依赖序列化及类加载机制,所以本文会重点围绕这两个主题介绍Spark对代码分布式执行的处理。本文假设读者对Spark、Java、Scala有必定的了解,代码示例基于Scala,Spark源码基于2.1.0版本。阅读本文你能够了解到:git
根据以上内容,读者能够基于JVM相关的语言构建一个本身的分布式计算服务框架。github
序列化(Serialization)是将对象的状态信息转换为能够存储或传输的形式的过程。所谓的状态信息指的是对象在内存中的数据,Java中通常指对象的字段数据。咱们开发Java应用的时候或多或少都处理过对象序列化,对象常见的序列化形式有JSON、XML等。shell
JDK中内置一个ObjectOutputStream
类能够将对象序列化为二进制数据,使用ObjectOutputStream
序列化对象时,要求对象所属的类必须实现java.io.Serializable
接口,不然会报java.io.NotSerializableException
的异常。apache
基本的概念先介绍到这。接下来咱们一块儿探讨一个问题:Java的方法可否被序列化?服务器
假设咱们有以下的SimpleTask
类(Java类):网络
import java.io.Serializable; public abstract class Task implements Serializable { public void run() { System.out.println("run task!"); } } public class SimpleTask extends Task { @Override public void run() { System.out.println("run simple task!"); } }
还有一个用于将对象序列化到文件的工具类FileSerializer
:闭包
import java.io.{FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream} object FileSerializer { def writeObjectToFile(obj: Object, file: String) = { val fileStream = new FileOutputStream(file) val oos = new ObjectOutputStream(fileStream) oos.writeObject(obj) oos.close() } def readObjectFromFile(file: String): Object = { val fileStream = new FileInputStream(file) val ois = new ObjectInputStream(fileStream) val obj = ois.readObject() ois.close() obj } }
简单起见,咱们采用将对象序列化到文件,而后经过反序列化执行的方式来模拟代码的分布式执行。SimpleTask就是咱们须要模拟分布式执行的代码。咱们先将SimpleTask
序列化到文件中:app
val task = new SimpleTask() FileSerializer.writeObjectToFile(task, "task.ser")
而后将SimpleTask
类从咱们的代码中删除,此时只有task.ser
文件中含有task对象的序列化数据。接下来咱们执行下面的代码:
val task = FileSerializer.readObjectFromFile("task.ser").asInstanceOf[Task] task.run()
请各位读者思考,上面的代码执行后会出现什么样的结果?
run simple task!
?run task!
?实际执行会出现形以下面的异常:
Exception in thread "main" java.lang.ClassNotFoundException: site.stanzhai.serialization.SimpleTask at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:348) at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:628) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373) at site.stanzhai.serialization.FileSerializer$.readObjectFromFile(FileSerializer.scala:20)
从异常信息来看,反序列过程当中找不到SimpleTask
类。由此能够推断序列化后的数据是不包含类的定义信息的。那么,ObjectOutputStream
到底序列化了哪些信息呢?
对ObjectOutputStream
实现机制感兴趣的同窗能够去看下JDK中这个类的实现,ObjectOutputStream
序列化对象时,从父类的数据开始序列化到子类,若是override了writeObject方法,会反射调用writeObject来序列化数据。序列化的数据会按照如下的顺序以二进制的形式输出到OutputStream中:
回到咱们的问题上:Java的方法可否被序列化?经过咱们代码示例及分析,想必你们对这个问题应该清楚了。经过ObjectOutputStream
序列化对象,仅包含类的描述(而非定义),对象的状态数据,因为缺乏类的定义,也就是缺乏SimpleTask
的字节码,反序列化过程当中就会出现ClassNotFound的异常。
如何让咱们反序列化的对象能正常使用呢?咱们还须要了解类加载器。
ClassLoader在Java中是一个抽象类,ClassLoader的做用是加载类,给定一个类名,ClassLoader会尝试查找或生成类的定义,一种典型的加载策略是将类名对应到文件名上,而后从文件系统中加载class file。
在咱们的示例中,反序列化SimpleTask
失败,是由于JVM找不到类的定义,所以要确保正常反序列化,咱们必须将SimpleTask
的class文件保存下来,反序列化的时候可以让ClassLoader加载到SimpleTask
的class。
接下来,咱们对代码作一些改造,添加一个ClassManipulator
类,用于将对象的class文件导出到当前目录的文件中,默认的文件名就是对象的类名(不含包名):
object ClassManipulator { def saveClassFile(obj: AnyRef): Unit = { val classLoader = obj.getClass.getClassLoader val className = obj.getClass.getName val classFile = className.replace('.', '/') + ".class" val stream = classLoader.getResourceAsStream(classFile) // just use the class simple name as the file name val outputFile = className.split('.').last + ".class" val fileStream = new FileOutputStream(outputFile) var data = stream.read() while (data != -1) { fileStream.write(data) data = stream.read() } fileStream.flush() fileStream.close() } }
按照JVM的规范,假设对package.Simple
这样的一个类编译,编译后的class文件为package/Simple.class
,所以咱们能够根据路径规则,从当前JVM进程的Resource中获得指定类的class数据。
在删除SimpleTask
前,咱们除了将task序列化到文件外,还须要将task的class文件保存起来,执行完下面的代码,SimpleTask
类就能够从代码中剔除了:
val task = new SimpleTask() FileSerializer.writeObjectToFile(task, "task.ser") ClassManipulator.saveClassFile(task)
因为咱们保存class文件的方式比较特殊,既不在jar包中,也不是按package/ClassName.class这种标准的保存方式,所以还须要实现一个自定义的FileClassLoader
按照咱们保存class文件的方式来加载所需的类:
class FileClassLoader() extends ClassLoader { override def findClass(fullClassName: String): Class[_] = { val file = fullClassName.split('.').last + ".class" val in = new FileInputStream(file) val bos = new ByteArrayOutputStream val bytes = new Array[Byte](4096) var done = false while (!done) { val num = in.read(bytes) if (num >= 0) { bos.write(bytes, 0, num) } else { done = true } } val data = bos.toByteArray defineClass(fullClassName, data, 0, data.length) } }
ObjectInputStream
类用于对象的反序列化,在反序列化过程当中,它根据序列化数据中类的descriptor信息,调用resolveClass
方法加载对应的类,可是经过Class.forName
加载class使用的并非咱们自定义的FileClassLoader
,因此若是直接使用ObjectInputStream
进行反序列,依然会由于找不到类而报错,下面是resolveClass
的源码:
protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException { String name = desc.getName(); try { return Class.forName(name, false, latestUserDefinedLoader()); } catch (ClassNotFoundException ex) { Class<?> cl = primClasses.get(name); if (cl != null) { return cl; } else { throw ex; } } }
为了能让ObjectInputStream
在序列化的过程当中使用咱们自定义的ClassLoader,咱们还须要对FileSerializer
中的readObjectFromFile
方法作些改造,修改的代码以下:
def readObjectFromFile(file: String, classLoader: ClassLoader): Object = { val fileStream = new FileInputStream(file) val ois = new ObjectInputStream(fileStream) { override def resolveClass(desc: ObjectStreamClass): Class[_] = Class.forName(desc.getName, false, classLoader) } val obj = ois.readObject() ois.close() obj }
最后,咱们将反序列化的代码调整为:
val fileClassLoader = new FileClassLoader() val task = FileSerializer.readObjectFromFile("task.ser", fileClassLoader).asInstanceOf[Task] task.run()
反序列化的过程当中可以经过fileClassLoader加载到所需的类,这样咱们在执行就不会出错了,最终的执行结果为:run simple task!
。到此为止,咱们已经完整地模拟了代码分布式执行的过程。完整的示例代码,请参阅:https://github.com/stanzhai/jvm-exercise/tree/master/src/main/scala/site/stanzhai/exercise/serialization
咱们依然经过一个示例,快速了解下Scala对闭包的处理,下面是从Scala的REPL中执行的代码:
scala> val n = 2 n: Int = 2 scala> val f = (x: Int) => x * n f: Int => Int = <function1> scala> Seq.range(0, 5).map(f) res0: Seq[Int] = List(0, 2, 4, 6, 8)
f
是采用Scala的=>
语法糖定义的一个闭包,为了弄清楚Scala是如何处理闭包的,咱们继续执行下面的代码:
scala> f.getClass res0: Class[_ <: Int => Int] = class $anonfun$1 scala> f.isInstanceOf[Function1[Int, Int]] res1: Boolean = true scala> f.isInstanceOf[Serializable] res2: Boolean = true
能够看出f
对应的类为$anonfun$1
是Function1[Int, Int]
的子类,并且实现了Serializable
接口,这说明f
是能够被序列化的。
Spark对于数据的处理基本都是基于闭包,下面是一个简单的Spark分布式处理数据的代码片断:
val spark = SparkSession.builder().appName("demo").master("local").getOrCreate() val sc = spark.sparkContext val data = Array(1, 2, 3, 4, 5) val distData = sc.parallelize(data) val sum = distData.map(x => x * 2).sum() println(sum) // 30.0
对于distData.map(x => x * 2)
,map中传的一个匿名函数,也是一个很是简单的闭包,对distData
中的每一个元素*2,咱们知道对于这种形式的闭包,Scala编译后是能够序列化的,因此咱们的代码能正常执行也合情合理。将入咱们将处理函数的闭包定义到一个类中,而后将代码改造为以下形式:
class Operation { val n = 2 def multiply = (x: Int) => x * n } ... val sum = distData.map(new Operation().multiply).sum() ...
咱们在去执行,会出现什么样的结果呢?实际执行会出现这样的异常:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298) ... Caused by: java.io.NotSerializableException: Operation
Scala在构造闭包的时候会肯定他所依赖的外部变量,并将它们的引用存到闭包对象中,这样能保证在不一样的做用域中调用闭包不出现问题。
出现Task not serializable
的异常,是因为咱们的multiply
函数依赖Operation
类的变量n
,虽然multiply是支持序列化的,可是Operation
不支持序列化,这致使multiply
函数在序列化的过程当中出现了NotSerializable
的异常,最终致使咱们的Task序列化失败。为了确保multiply
能被正常序列化,咱们须要想办法去除对Operation
的依赖,咱们将代码作以下修改,在去执行就能够了:
class Operation { def multiply = (x: Int) => x * 2 } ... val sum = distData.map(new Operation().multiply).sum() ...
Spark对闭包序列化前,会经过工具类org.apache.spark.util.ClosureCleaner
尝试clean掉闭包中无关的外部对象引用,ClosureCleaner
对闭包的处理是在运行期间,相比Scala编译器,能更精准的去除闭包中无关的引用。这样作,一方面能够尽量保证闭包可被序列化,另外一方面能够减小闭包序列化后的大小,便于网络传输。
咱们在开发Spark应用的时候,若是遇到Task not serializable
的异常,就须要考虑下,闭包中是否或引用了没法序列化的对象,有的话,尝试去除依赖就能够了。
Spark中实现的序列化工具备多个:
从SparkEnv
类的实现来看,用于闭包序列化的是JavaSerializer
:
JavaSerializer
内部使用的是ObjectOutputStream
将闭包序列化:
private[spark] class JavaSerializationStream( out: OutputStream, counterReset: Int, extraDebugInfo: Boolean) extends SerializationStream { private val objOut = new ObjectOutputStream(out) ... }
将闭包反序列化的核心代码为:
private[spark] class JavaDeserializationStream(in: InputStream, loader: ClassLoader) extends DeserializationStream { private val objIn = new ObjectInputStream(in) { override def resolveClass(desc: ObjectStreamClass): Class[_] = try { Class.forName(desc.getName, false, loader) } catch { case e: ClassNotFoundException => JavaDeserializationStream.primitiveMappings.getOrElse(desc.getName, throw e) } } ... }
关于ObjectInputStream
咱们前面已有介绍,JavaDeserializationStream
有个关键的成员变量loader
,它是个ClassLoader,可让Spark使用非默认的ClassLoader按照自定义的加载策略去加载class,这样才能保证反序列化过程在其余节点正常进行。
经过前面的介绍,想要代码在另外一端执行,只有序列化还不行,还须要保证执行端可以加载到闭包对应的类。接下来咱们探讨Spark加载class的机制。
一般状况下咱们会将开发的Spark Application打包为jar包,而后经过spark-submit
命令提交到集群运行,下面是一个官网的示例:
./bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ ... \ --jars /path/to/dep-libs.jar \ /path/to/examples.jar \
此时,咱们编写的代码中所包含的闭包,对应的类已经被编译到jar包中了,因此Executor端只要能加载到这个jar包,从jar包中定位闭包的class文件,就能够将闭包反序列化了。事实上Spark也是这么作的。
Spark Application的Driver端在运行的时候会基于netty创建一个文件服务,咱们运行的jar包,及--jars
中指定的依赖jar包,会被添加到文件服务器中。这个过程在SparkContext
的addJar方法中完成:
/** * Adds a JAR dependency for all tasks to be executed on this SparkContext in the future. * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported * filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node. */ def addJar(path: String) { if (path == null) { logWarning("null specified as parameter to addJar") } else { var key = "" if (path.contains("\\")) { // For local paths with backslashes on Windows, URI throws an exception key = env.rpcEnv.fileServer.addJar(new File(path)) } else { val uri = new URI(path) // SPARK-17650: Make sure this is a valid URL before adding it to the list of dependencies Utils.validateURL(uri) key = uri.getScheme match { // A JAR file which exists only on the driver node case null | "file" => try { env.rpcEnv.fileServer.addJar(new File(uri.getPath)) } catch { case exc: FileNotFoundException => logError(s"Jar not found at $path") null } // A JAR file which exists locally on every worker node case "local" => "file:" + uri.getPath case _ => path } } if (key != null) { val timestamp = System.currentTimeMillis if (addedJars.putIfAbsent(key, timestamp).isEmpty) { logInfo(s"Added JAR $path at $key with timestamp $timestamp") postEnvironmentUpdate() } } } }
Executor端在执行任务的时候,会从任务信息中获得依赖的jar包,而后updateDependencies
从Driver端的文件服务器下载缺失的jar包,并将jar包添加到URLClassLoader中,最后再将task反序列化,反序列化前所需的jar都已准备好,所以可以将task中的闭包正常反序列化,核心代码以下:
override def run(): Unit = { ... try { val (taskFiles, taskJars, taskProps, taskBytes) = Task.deserializeWithDependencies(serializedTask) // Must be set before updateDependencies() is called, in case fetching dependencies // requires access to properties contained within (e.g. for access control). Executor.taskDeserializationProps.set(taskProps) updateDependencies(taskFiles, taskJars) task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader) ... } finally { runningTasks.remove(taskId) } }
这么来看,整个Spark Application分布式加载class的机制就比较清晰了。Executor端可以正常加载class,反序列化闭包,分布式执行代码天然就不存在什么问题了。
spark-shell
是Spark为咱们提供的一个REPL的工具,可让咱们很是方便的写一些简单的数据处理脚本。下面是一个运行在spark-shell
的代码:
scala> val f = (x: Int) => x + 1 f: Int => Int = <function1> scala> val data = Array(1, 2, 3, 4, 5) data: Array[Int] = Array(1, 2, 3, 4, 5) scala> val distData = sc.parallelize(data) distData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26 scala> distData.map(f).sum() res0: Double = 20.0
咱们已知,闭包f
会被Scala编译为匿名类,若是要将f
序列化到Executor端执行,必需要加载f
对应的匿名类的class数据,才能正常反序列化。
Spark是如何获得f
的class数据的?Executor又是如何加载到的?
源码面前,了无秘密。咱们看一下Spark的repl项目的代码入口,核心代码以下:
object Main extends Logging { ... val rootDir = conf.getOption("spark.repl.classdir").getOrElse(Utils.getLocalDir(conf)) val outputDir = Utils.createTempDir(root = rootDir, namePrefix = "repl") def main(args: Array[String]) { doMain(args, new SparkILoop) } // Visible for testing private[repl] def doMain(args: Array[String], _interp: SparkILoop): Unit = { interp = _interp val jars = Utils.getUserJars(conf, isShell = true).mkString(File.pathSeparator) val interpArguments = List( "-Yrepl-class-based", "-Yrepl-outdir", s"${outputDir.getAbsolutePath}", "-classpath", jars ) ++ args.toList val settings = new GenericRunnerSettings(scalaOptionError) settings.processArguments(interpArguments, true) if (!hasErrors) { interp.process(settings) // Repl starts and goes in loop of R.E.P.L Option(sparkContext).map(_.stop) } } ... }
Spark2.1.0的REPL基于Scala-2.11的scala.tools.nsc
编译工具实现,代码已经至关简洁,Spark给interp
设置了2个关键的配置-Yrepl-class-based
和-Yrepl-outdir
,经过这两个配置,咱们在shell中输入的代码会被编译为class文件输出到执行的文件夹中。若是指定了spark.repl.classdir
配置,会用这个配置的路径做为class文件的输出路径,不然使用SPARK_LOCAL_DIRS
对应的路径。下面是我测试过程当中输出到文件夹中的class文件:
咱们已经清楚Spark如何将shell中的代码编译为class了,那么Executor端,如何加载到这些class文件呢?在org/apache/spark/executor/Executor.scala
中有段和REPL相关的代码:
private val urlClassLoader = createClassLoader() private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader) /** * If the REPL is in use, add another ClassLoader that will read * new classes defined by the REPL as the user types code */ private def addReplClassLoaderIfNeeded(parent: ClassLoader): ClassLoader = { val classUri = conf.get("spark.repl.class.uri", null) if (classUri != null) { logInfo("Using REPL class URI: " + classUri) try { val _userClassPathFirst: java.lang.Boolean = userClassPathFirst val klass = Utils.classForName("org.apache.spark.repl.ExecutorClassLoader") .asInstanceOf[Class[_ <: ClassLoader]] val constructor = klass.getConstructor(classOf[SparkConf], classOf[SparkEnv], classOf[String], classOf[ClassLoader], classOf[Boolean]) constructor.newInstance(conf, env, classUri, parent, _userClassPathFirst) } catch { case _: ClassNotFoundException => logError("Could not find org.apache.spark.repl.ExecutorClassLoader on classpath!") System.exit(1) null } } else { parent } } override def run(): Unit = { ... Thread.currentThread.setContextClassLoader(replClassLoader) val ser = env.closureSerializer.newInstance() ... }
Executor启动时会判断是否为REPL模式,若是是的话会使用ExecutorClassLoader
作为反序列闭包时所使用的ClassLoader,ExecutorClassLoader
会经过网络从Driver端(也就是执行spark-shell
的节点)加载所需的class文件。这样咱们在spark-shell
中写的代码就能够分布式执行了。
Spark实现代码的分布式执行有2个关键点:
知足以上2个条件,咱们的代码就能够分布式运行了。
固然,构建一个完整的分布式计算框架,还须要有网络通讯框架、RPC、文件传输服务等做为支撑,在了解Spark代码分布式执行原理的基础上,相信读者已有思路基于JVM相关的语言构建分布式计算服务。
类比其余非JVM相关的语言,实现一个分布式计算框架,依然是须要解决序列化,动态加载执行代码的问题。