Spark on Yarn is the deployment mode that uses YARN as the ClusterManager: Spark hands the management and coordination of resources (containers) over to YARN.
Spark on Yarn comes in two flavors, client mode and cluster mode:
In client mode, the Driver/SparkContext of the Spark application runs on the machine that submits the job. That machine can sit inside or outside the YARN cluster, as long as it can reach the ResourceManager and has HADOOP_CONF_DIR or YARN_CONF_DIR correctly pointing at the cluster's configuration. In production the submit machine is usually not a node of the YARN cluster itself, so if you control its configuration you can install whatever software, environments and files the Spark application needs.
In cluster mode, the Driver/SparkContext lives inside the ApplicationMaster (AM). The AM is itself a container and may be launched on any NodeManager in the cluster, so by default we would have to prepare the full runtime environment that the Spark application needs on every single node.
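A minimal sketch of the difference at submission time (the application file here is just a placeholder):

# client mode: the driver runs on the submit machine
spark-submit --master yarn --deploy-mode client my_app.py
# cluster mode: the driver runs inside the ApplicationMaster container on some NodeManager
spark-submit --master yarn --deploy-mode cluster my_app.py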
Python offers a very rich set of numerical and machine-learning libraries, such as numpy, pandas and scipy. More and more colleagues want to build algorithms on top of these efficient libraries and run them as PySpark applications on our Spark cluster.
For Spark applications written in Scala/Java, we can package the dependency jars together with the main program (the one containing the main function) into a single fat jar; after submission via spark-submit, these dependencies are distributed to all nodes through YARN's distributed cache.
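A minimal sketch of that workflow, assuming an sbt-assembly (or Maven shade) build and a made-up main class and jar name:

# build a fat jar that already bundles all third-party dependencies
sbt assembly
# submit it; YARN's distributed cache ships the jar to every container
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --class com.example.Main \
  target/scala-2.11/my-app-assembly-1.0.jar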
For Spark applications written in Python, external dependencies are a lot more awkward: Python itself is effectively two languages (Python 2 and Python 3), and installing every required dependency on all NodeManager hosts is a very painful task for the operations team.
From the official documentation:
For Python, you can use the --py-files argument of spark-submit to add .py, .zip or .egg files to be distributed with your application. If you depend on multiple Python files we recommend packaging them into a .zip or .egg.
--py-files solves part of the dependency problem, but in some scenarios it is inconvenient or outright impossible, for example for libraries with compiled native extensions such as numpy or scipy.
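For reference, a minimal sketch of the plain --py-files route that does work for pure-Python modules (the file names are made up):

# bundle pure-Python helper modules
zip -r deps.zip mypkg/ utils.py
# ship them with the application; they are added to the executors' PYTHONPATH
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --py-files deps.zip \
  main_job.py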
These problems have also been discussed in the community; for details see this ticket: https://issues.apache.org/jira/browse/SPARK-13587
There is not much material on PySpark internals; the wiki is worth a look:
https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals
Have a look at the diagram in the link above: the left and right halves are the driver and the executors, and the white and green boxes are the Python and Java processes. Whether PySpark runs on YARN in client or cluster mode, Python processes are running on both the driver side and the executor side, which is exactly why every node in the cluster needs the corresponding Python environment and dependencies.
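One way to see this for yourself on a NodeManager host while a PySpark job is running (just an illustration; process names can differ slightly between Spark versions):

# the executor JVM starts Python workers through pyspark.daemon;
# they show up as extra python processes next to the executor process
ps aux | grep -E 'pyspark[./]daemon|pyspark[./]worker'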
From a flexibility point of view, and summarizing the discussions of those who tackled this before, here is a way to provide the Python runtime and its dependencies at run time.
1. Download Anaconda
https://www.anaconda.com/download/#linux
2. Install Anaconda
sh Anaconda2-5.0.1-Linux-x86_64.sh
3. Create the environment with the required dependencies using conda create
/home/username/anaconda3/bin/conda create --prefix=/home/username/name1/tools/anaconda2/envs/projname_py36 python==3.6

# check that the environment was created successfully
conda env list | grep projname_py36
Depending on your network, downloading these dependencies can take quite a while the first time; subsequent runs are much faster.
du -sh projname_py36
847M    projname_py36
As you can see, the whole dependency bundle is fairly large, which is not great for latency-sensitive use cases; dependencies you do not need can simply be left out when creating the environment. We also zip the environment, which saves some network overhead, and if we put the environment onto HDFS ahead of time, the upload cost disappears entirely.
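A sketch of building a slimmer environment that only carries what the job actually imports (the package list here is just an assumption):

# install only the packages the job really needs instead of the full Anaconda stack
/home/username/anaconda3/bin/conda create \
  --prefix=/home/username/name1/tools/anaconda2/envs/projname_py36_slim \
  python=3.6 numpy pandas scipy
du -sh /home/username/name1/tools/anaconda2/envs/projname_py36_slim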
# Appendix: if you are going to use this conda projname_py36 environment every time,
# you can make loading the conda configuration and activating it automatic:

#!/usr/bin/env bash
# set_conda_env.sh
export CONDA_HOME=/home/username/anaconda3
export JAVA_HOME=/opt/soft/jdk/jdk1.7.0_40
export JRE_HOME=/opt/soft/jdk/jdk1.7.0_40/jre
export HADOOP_HOME=/usr/lib/software/hadoop
export SPARK_HOME=/usr/lib/software/spark/spark-2.1
export PATH=$SPARK_HOME/bin:$SPARK_HOME/python:$PATH:$CONDA_HOME/bin/conda
export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH
export PATH=/home/username/anaconda3/bin:$PATH

source activate base
conda deactivate
conda activate /home/username/ooxx/tools/anaconda2/envs/projname_py36
export PYTHONUTF8=1

Then run:
. set_conda_env.sh
4. Package the whole environment with its dependencies and upload it to HDFS
zip -r projname_py36_env.zip ./projname_py36/

# verify the archive: unzip -t projname_py36_env.zip | grep bin/python
#     testing: projname_py36/bin/python3            OK
#     testing: projname_py36/bin/python3.6m         OK
#     testing: projname_py36/bin/python3.6          OK
#     testing: projname_py36/bin/python             OK
#     testing: projname_py36/bin/python3-config     OK
#     testing: projname_py36/bin/python3.6m-config  OK
#     testing: projname_py36/bin/python3.6-config   OK

hadoop fs -put projname_py36_env.zip /tmp/hadoop-username/projname/
hadoop fs -ls /tmp/hadoop-username/projname/
rm -rf projname_py36_env.zip
Now we can pass --archives hdfs://hdp-66-cluster/tmp/hadoop-username/projname/projname_py36_env.zip#PyEnv to have the Python runtime and its dependencies distributed into the working directory of every Spark process; the fragment after the # is the link name under which YARN exposes the extracted archive, so the interpreter ends up at the relative path PyEnv/projname_py36/bin/python.
To save time, let's copy one of the Spark example programs for testing and submit it in cluster mode:
# correlations_example1.py
from __future__ import print_function

import numpy as np

from pyspark import SparkContext
# $example on$
from pyspark.mllib.stat import Statistics
# $example off$

if __name__ == "__main__":
    sc = SparkContext(appName="CorrelationsExample")  # SparkContext

    # $example on$
    seriesX = sc.parallelize([1.0, 2.0, 3.0, 3.0, 5.0])  # a series
    # seriesY must have the same number of partitions and cardinality as seriesX
    seriesY = sc.parallelize([11.0, 22.0, 33.0, 33.0, 555.0])

    # Compute the correlation using Pearson's method. Enter "spearman" for Spearman's method.
    # If a method is not specified, Pearson's method will be used by default.
    print("Correlation is: " + str(Statistics.corr(seriesX, seriesY, method="pearson")))

    data = sc.parallelize(
        [np.array([1.0, 10.0, 100.0]), np.array([2.0, 20.0, 200.0]), np.array([5.0, 33.0, 366.0])]
    )  # an RDD of Vectors

    # calculate the correlation matrix using Pearson's method. Use "spearman" for Spearman's method.
    # If a method is not specified, Pearson's method will be used by default.
    print(Statistics.corr(data, method="pearson"))

    rdd = sc.parallelize([str(Statistics.corr(data, method="pearson"))])
    rdd.saveAsTextFile("hdfs://hdp-66-cluster/tmp/username/name2/correlations_example.txt")
    # $example off$

    sc.stop()
# Full cluster-mode submission workflow

(1) Copy the example:

cp /usr/lib/software/spark/spark-2.1/examples/src/main/python/mllib/correlations_example.py ~/projname/correlations_example1.py

(2) Submit:

/usr/lib/software/spark/spark-2.3.2-bin-U1.1/bin/spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --queue root.spark.username.spark \
  --num-executors 8 \
  --executor-cores 1 \
  --archives hdfs://hdp-66-cluster/tmp/hadoop-username/projname/projname_py36_env.zip#PyEnv \
  --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=PyEnv/projname_py36/bin/python \
  --conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON=PyEnv/projname_py36/bin/python \
  --conf spark.executorEnv.PYSPARK_PYTHON=PyEnv/projname_py36/bin/python \
  --conf spark.executorEnv.PYSPARK_DRIVER_PYTHON=PyEnv/projname_py36/bin/python \
  ~/projname/correlations_example1.py

# To also ship extra pure-Python files, append: --py-files a.py,b.py,c.py
# Note: --archives expects a local path by default; only with an hdfs://host:port/path prefix does it pick up a file that is already on HDFS.

(3) Check the output (the path the example writes to):

hadoop fs -cat hdfs://hdp-66-cluster/tmp/username/name2/correlations_example.txt/*
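If the same job is submitted in client mode instead, the archive is not extracted on the submit machine, so the driver must use a local interpreter while the executors use the one shipped via --archives. A hedged sketch, reusing the paths from the steps above:

# client mode: local python for the driver, archived python for the executors
export PYSPARK_DRIVER_PYTHON=/home/username/name1/tools/anaconda2/envs/projname_py36/bin/python
export PYSPARK_PYTHON=./PyEnv/projname_py36/bin/python

/usr/lib/software/spark/spark-2.3.2-bin-U1.1/bin/spark-submit \
  --master yarn \
  --deploy-mode client \
  --archives hdfs://hdp-66-cluster/tmp/hadoop-username/projname/projname_py36_env.zip#PyEnv \
  ~/projname/correlations_example1.py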