PySpark 的背后原理--在Driver端,经过Py4j实如今Python中调用Java的方法.pyspark.executor 端一个Executor上同时运行多少个Task,就会有多少个...

PySpark 的背后原理

Spark主要是由Scala语言开发,为了方便和其余系统集成而不引入scala相关依赖,部分实现使用Java语言开发,例如External Shuffle Service等。整体来讲,Spark是由JVM语言实现,会运行在JVM中。然而,Spark除了提供Scala/Java开发接口外,还提供了Python、R等语言的开发接口,为了保证Spark核心实现的独立性,Spark仅在外围作包装,实现对不一样语言的开发支持,本文主要介绍Python Spark的实现原理,剖析pyspark应用程序是如何运行起来的。html

Spark运行时架构

首先咱们先回顾下Spark的基本运行时架构,以下图所示,其中橙色部分表示为JVM,Spark应用程序运行时主要分为Driver和Executor,Driver负载整体调度及UI展现,Executor负责Task运行,Spark能够部署在多种资源管理系统中,例如Yarn、Mesos等,同时Spark自身也实现了一种简单的Standalone(独立部署)资源管理系统,能够不用借助其余资源管理系统便可运行。更多细节请参考Spark Scheduler内部原理剖析python

spark-structure

用户的Spark应用程序运行在Driver上(某种程度上说,用户的程序就是Spark Driver程序),通过Spark调度封装成一个个Task,再将这些Task信息发给Executor执行,Task信息包括代码逻辑以及数据信息,Executor不直接运行用户的代码。apache

PySpark运行时架构

为了避免破坏Spark已有的运行时架构,Spark在外围包装一层Python API,借助Py4j实现Python和Java的交互,进而实现经过Python编写Spark应用程序,其运行时架构以下图所示。服务器

pyspark-structure

 

 

Application Properties----python 2.4开始支持限制pyspark executor内存

Property Name Default Meaning
spark.app.name (none) The name of your application. This will appear in the UI and in log data.
spark.driver.cores 1 Number of cores to use for the driver process, only in cluster mode.
spark.driver.maxResultSize 1g Limit of total size of serialized results of all partitions for each Spark action (e.g. collect) in bytes. Should be at least 1M, or 0 for unlimited. Jobs will be aborted if the total size is above this limit. Having a high limit may cause out-of-memory errors in driver (depends on spark.driver.memory and memory overhead of objects in JVM). Setting a proper limit can protect the driver from out-of-memory errors.
spark.driver.memory 1g Amount of memory to use for the driver process, i.e. where SparkContext is initialized, in the same format as JVM memory strings with a size unit suffix ("k", "m", "g" or "t") (e.g. 512m2g).
Note: In client mode, this config must not be set through the SparkConf directly in your application, because the driver JVM has already started at that point. Instead, please set this through the --driver-memory command line option or in your default properties file.
spark.driver.memoryOverhead driverMemory * 0.10, with minimum of 384 The amount of off-heap memory to be allocated per driver in cluster mode, in MiB unless otherwise specified. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the container size (typically 6-10%). This option is currently supported on YARN and Kubernetes.
spark.executor.memory 1g Amount of memory to use per executor process, in the same format as JVM memory strings with a size unit suffix ("k", "m", "g" or "t") (e.g. 512m2g).
spark.executor.pyspark.memory Not set The amount of memory to be allocated to PySpark in each executor, in MiB unless otherwise specified. If set, PySpark memory for an executor will be limited to this amount. If not set, Spark will not limit Python's memory use and it is up to the application to avoid exceeding the overhead memory space shared with other non-JVM processes. When PySpark is run in YARN or Kubernetes, this memory is added to executor resource requests. NOTE: Python memory usage may not be limited on platforms that do not support resource limiting, such as Windows.

 

其中白色部分是新增的Python进程,在Driver端,经过Py4j实如今Python中调用Java的方法,即将用户写的PySpark程序”映射”到JVM中,例如,用户在PySpark中实例化一个Python的SparkContext对象,最终会在JVM中实例化Scala的SparkContext对象;在Executor端,则不须要借助Py4j,由于Executor端运行的Task逻辑是由Driver发过来的,那是序列化后的字节码,虽然里面可能包含有用户定义的Python函数或Lambda表达式,Py4j并不能实如今Java里调用Python的方法,为了能在Executor端运行用户定义的Python函数或Lambda表达式,则须要为每一个Task单独启一个Python进程,经过socket通讯方式将Python函数或Lambda表达式发给Python进程执行。语言层面的交互整体流程以下图所示,实线表示方法调用,虚线表示结果返回。架构

pyspark-call

下面分别详细剖析PySpark的Driver是如何运行起来的以及Executor是如何运行Task的。app

Driver端运行原理

当咱们经过spark-submmit提交pyspark程序,首先会上传python脚本及依赖,并申请Driver资源,当申请到Driver资源后,会经过PythonRunner(其中有main方法)拉起JVM,以下图所示。less

pyspark-driver-runtime

PythonRunner入口main函数里主要作两件事:机器学习

  • 开启Py4j GatewayServer
  • 经过Java Process方式运行用户上传的Python脚本

用户Python脚本起来后,首先会实例化Python版的SparkContext对象,在实例化过程当中会作两件事:socket

  • 实例化Py4j GatewayClient,链接JVM中的Py4j GatewayServer,后续在Python中调用Java的方法都是借助这个Py4j Gateway
  • 经过Py4j Gateway在JVM中实例化SparkContext对象

通过上面两步后,SparkContext对象初始化完毕,Driver已经起来了,开始申请Executor资源,同时开始调度任务。用户Python脚本中定义的一系列处理逻辑最终遇到action方法后会触发Job的提交,提交Job时是直接经过Py4j调用Java的PythonRDD.runJob方法完成,映射到JVM中,会转给sparkContext.runJob方法,Job运行完成后,JVM中会开启一个本地Socket等待Python进程拉取,对应地,Python进程在调用PythonRDD.runJob后就会经过Socket去拉取结果。函数

把前面运行时架构图中Driver部分单独拉出来,以下图所示,经过PythonRunner入口main函数拉起JVM和Python进程,JVM进程对应下图橙色部分,Python进程对应下图白色部分。Python进程经过Py4j调用Java方法提交Job,Job运行结果经过本地Socket被拉取到Python进程。还有一点是,对于大数据量,例如广播变量等,Python进程和JVM进程是经过本地文件系统来交互,以减小进程间的数据传输。

pyspark-driver

Executor端运行原理

为了方便阐述,以Spark On Yarn为例,当Driver申请到Executor资源时,会经过CoarseGrainedExecutorBackend(其中有main方法)拉起JVM,启动一些必要的服务后等待Driver的Task下发,在尚未Task下发过来时,Executor端是没有Python进程的。当收到Driver下发过来的Task后,Executor的内部运行过程以下图所示。

pyspark-executor-runtime

Executor端收到Task后,会经过launchTask运行Task,最后会调用到PythonRDD的compute方法,来处理一个分区的数据,PythonRDD的compute方法的计算流程大体分三步走:

  • 若是不存在pyspark.deamon后台Python进程,那么经过Java Process的方式启动pyspark.deamon后台进程,注意每一个Executor上只会有一个pyspark.deamon后台进程,不然,直接经过Socket链接pyspark.deamon,请求开启一个pyspark.worker进程运行用户定义的Python函数或Lambda表达式。pyspark.deamon是一个典型的多进程服务器,来一个Socket请求,fork一个pyspark.worker进程处理,一个Executor上同时运行多少个Task,就会有多少个对应的pyspark.worker进程。
  • 紧接着会单独开一个线程,给pyspark.worker进程喂数据,pyspark.worker则会调用用户定义的Python函数或Lambda表达式处理计算。
  • 在一边喂数据的过程当中,另外一边则经过Socket去拉取pyspark.worker的计算结果。

把前面运行时架构图中Executor部分单独拉出来,以下图所示,橙色部分为JVM进程,白色部分为Python进程,每一个Executor上有一个公共的pyspark.deamon进程,负责接收Task请求,并fork pyspark.worker进程单独处理每一个Task,实际数据处理过程当中,pyspark.worker进程和JVM Task会较频繁地进行本地Socket数据通讯。

pyspark-executor.png

总结

整体上来讲,PySpark是借助Py4j实现Python调用Java,来驱动Spark应用程序,本质上主要仍是JVM runtime,Java到Python的结果返回是经过本地Socket完成。虽然这种架构保证了Spark核心代码的独立性,可是在大数据场景下,JVM和Python进程间频繁的数据通讯致使其性能损耗较多,恶劣时还可能会直接卡死,因此建议对于大规模机器学习或者Streaming应用场景仍是慎用PySpark,尽可能使用原生的Scala/Java编写应用程序,对于中小规模数据量下的简单离线任务,可使用PySpark快速部署提交。

转载请注明出处,本文永久连接:http://sharkdtu.com/posts/pyspark-internal.html

相关文章
相关标签/搜索