This post is based on Spark 2.2. Before reading on and trying things out yourself, make sure you have an environment (a server with Spark on YARN configured) where jobs can be submitted normally with:

spark-submit --master yarn xxxx

The old way of submitting tasks was to start a local process that executed the spark-submit xxx script. One key problem with that approach is obtaining the Application-id of the submitted Spark job, because that id is what job status tracking is tied to. If your resource management framework is YARN, you know that every running application has an application_id, generated according to the rule:
application_<timestamp>_<sequence number> (e.g. application_1518263195995_37615, as seen in the logs later in this post)
Older versions of Spark let you specify the id manually by setting the SparkConf parameter spark.app.id. In newer versions the code reads it directly from the applicationId() method of the taskBackend, and what that method returns depends on the backend implementation class; on YARN it is provided by YarnClusterSchedulerBackend.

If you're curious, the logic that generates the application_id is defined in the ContainerId class of the hadoop-yarn project.

To sum it up in one sentence: if you want to customize the id, forget it!!!!
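The post doesn't show what that old-style override looked like; going by its description it amounted to roughly the sketch below (class name and id value are made up for illustration). On Spark 2.x with YARN, setting spark.app.id this way no longer changes the YARN application id:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class CustomAppIdDemo {
    public static void main(String[] args) {
        // Older Spark versions would pick this value up as the application id;
        // on Spark 2.x with YARN the id from YarnClusterSchedulerBackend wins.
        // The master is expected to come from spark-submit / the environment.
        SparkConf conf = new SparkConf()
                .setAppName("custom-app-id-demo")
                .set("spark.app.id", "my_custom_app_id");
        JavaSparkContext sc = new JavaSparkContext(conf);
        System.out.println("applicationId = " + sc.sc().applicationId());
        sc.stop();
    }
}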
So, not being the sharpest at the time, I figured I'd simply wait until the application was created and then write the id straight into a database. But how?
Once again thanks to the information explosion of the internet age, I spotted SparkLauncher mentioned in a group chat, and after looking into it found that it can submit Spark jobs automatically from Java code. SparkLauncher supports two modes: calling launch(), which forks a spark-submit child process and hands you back a plain java.lang.Process, or calling startApplication(...), which returns a SparkAppHandle for the running application.

Naturally I lean towards the second one, because it has a lot going for it: the application id and state are available directly from the handle inside your own Java process, listeners are notified whenever they change, and the application can even be stopped or killed through the handle. A sketch of the first mode follows for contrast.
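The sketch below is not from the original article; the paths simply reuse the ones shown later in this post. launch() just starts spark-submit as a child process, so the application id would still have to be scraped out of its output:

import org.apache.spark.launcher.SparkLauncher;

import java.io.BufferedReader;
import java.io.InputStreamReader;

public class LaunchAsChildProcess {
    public static void main(String[] args) throws Exception {
        // Mode 1: launch() forks a spark-submit child process and returns a java.lang.Process.
        Process spark = new SparkLauncher()
                .setSparkHome("/var/lib/hadoop-hdfs/app/spark")
                .setMaster("yarn")
                .setDeployMode("cluster")
                .setAppResource("/home/xinghailong/launcher/launcher_test.jar")
                .setMainClass("HelloWorld")
                .launch();

        // Drain the child's error stream (spark-submit logs there) so it doesn't block
        // on a full pipe buffer; the application id could be parsed out of these lines.
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(spark.getErrorStream()))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }
        System.out.println("spark-submit exited with code " + spark.waitFor());
    }
}

In other words, this mode is essentially a Java wrapper around running spark-submit from a shell, which is exactly what the handle-based mode improves on.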
First, create a very basic Spark program:
import org.apache.spark.sql.SparkSession;

import java.util.ArrayList;
import java.util.List;

public class HelloWorld {

    public static void main(String[] args) throws InterruptedException {
        // Master and app name are left for spark-submit / SparkLauncher to set.
        SparkSession spark = SparkSession
                .builder()
                //.master("yarn")
                //.appName("hello-world")
                //.config("spark.some.config.option", "some-value")
                .getOrCreate();

        List<Person> persons = new ArrayList<>();
        persons.add(new Person("zhangsan", 22, "male"));
        persons.add(new Person("lisi", 25, "male"));
        persons.add(new Person("wangwu", 23, "female"));

        // Build a DataFrame from the Person beans and print it without truncation.
        spark.createDataFrame(persons, Person.class).show(false);

        spark.close();
    }
}
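One thing to note: HelloWorld references a Person class that the listing above doesn't include. Below is a guess at a minimal JavaBean that would make it compile; the field names match the constructor calls above, and the getters are what createDataFrame(persons, Person.class) uses to infer the schema:

import java.io.Serializable;

// Hypothetical reconstruction of the Person bean used by HelloWorld.
public class Person implements Serializable {

    private String name;
    private int age;
    private String sex;

    public Person(String name, int age, String sex) {
        this.name = name;
        this.age = age;
        this.sex = sex;
    }

    // Getters are needed so Spark can reflectively infer the DataFrame columns.
    public String getName() { return name; }
    public int getAge() { return age; }
    public String getSex() { return sex; }
}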
Then create the Launcher class:
import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

import java.io.IOException;

public class Launcher {

    public static void main(String[] args) throws IOException {
        SparkAppHandle handler = new SparkLauncher()
                .setAppName("hello-world")
                .setSparkHome(args[0])
                .setMaster(args[1])
                .setConf("spark.driver.memory", "2g")
                .setConf("spark.executor.memory", "1g")
                .setConf("spark.executor.cores", "3")
                .setAppResource("/home/xinghailong/launcher/launcher_test.jar")
                // use the fully qualified name of the main class here
                .setMainClass("HelloWorld")
                .addAppArgs("I come from Launcher")
                .setDeployMode("cluster")
                .startApplication(new SparkAppHandle.Listener() {
                    @Override
                    public void stateChanged(SparkAppHandle handle) {
                        System.out.println("********** state changed **********");
                    }

                    @Override
                    public void infoChanged(SparkAppHandle handle) {
                        System.out.println("********** info changed **********");
                    }
                });

        // Poll until the application reaches a terminal state.
        while (!"FINISHED".equalsIgnoreCase(handler.getState().toString())
                && !"FAILED".equalsIgnoreCase(handler.getState().toString())) {
            System.out.println("id " + handler.getAppId());
            System.out.println("state " + handler.getState());
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
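A small aside on the polling loop (not part of the original post): instead of comparing state names as strings, which would spin forever if the job ends up KILLED or LOST, SparkAppHandle.State exposes an isFinal() method, so the condition could be written as in this fragment:

// Fragment: same polling loop, but exiting on any terminal state.
while (!handler.getState().isFinal()) {
    System.out.println("id " + handler.getAppId());
    System.out.println("state " + handler.getState());
    try {
        Thread.sleep(10000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}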
After packaging, upload the jar to the server where Spark is deployed. Because the Launcher class references SparkLauncher, the spark-launcher jar needs to be uploaded to the server as well.
[xiangcong@hnode10 launcher]$ ls
launcher_test.jar  spark-launcher_2.11-2.2.0.jar
[xiangcong@hnode10 launcher]$ pwd
/home/xiangcong/launcher
Since SparkLauncher needs SPARK_HOME to be specified, if your machine can already run spark-submit, just look inside the spark-submit script to see where SPARK_HOME points:
[xiangcong@hnode10 launcher]$ which spark2-submit
/var/lib/hadoop-hdfs/bin/spark2-submit
The last few lines show it:
export SPARK2_HOME=/var/lib/hadoop-hdfs/app/spark

# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0

exec "${SPARK2_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
To sum up, what we need is: the two jars above uploaded to a directory on the server, plus the SPARK_HOME path we just found, which is passed to Launcher as an argument along with the master.

Then run the command to start the test:
java -Djava.ext.dirs=/home/xinghailong/launcher -cp launcher_test.jar Launcher /var/lib/hadoop-hdfs/app/spark yarn
A note on the command: -Djava.ext.dirs sets the given directory as a Java class-loading (extension) directory, which is how the spark-launcher jar sitting next to launcher_test.jar gets picked up.

Watching the output, the application started and ran successfully:
id null
state UNKNOWN
Mar 10, 2018 12:00:52 PM org.apache.spark.launcher.OutputRedirector redirect
INFO: 18/03/10 12:00:52 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
********** state changed **********
... (a long run of jar-copying logs omitted)
********** info changed **********
********** state changed **********
Mar 10, 2018 12:00:55 PM org.apache.spark.launcher.OutputRedirector redirect
INFO: 18/03/10 12:00:55 INFO yarn.Client: Application report for application_1518263195995_37615 (state: ACCEPTED)
... (a pile of redirected logs omitted)
application_1518263195995_37615 (state: ACCEPTED)
id application_1518263195995_37615
state SUBMITTED
Mar 10, 2018 12:01:00 PM org.apache.spark.launcher.OutputRedirector redirect
INFO: 18/03/10 12:01:00 INFO yarn.Client: Application report for application_1518263195995_37615 (state: RUNNING)
********** state changed **********
... (a pile of redirected logs omitted)
INFO: user: hdfs
********** state changed **********
Mar 10, 2018 12:01:08 PM org.apache.spark.launcher.OutputRedirector redirect
INFO: 18/03/10 12:01:08 INFO util.ShutdownHookManager: Shutdown hook called
Mar 10, 2018 12:01:08 PM org.apache.spark.launcher.OutputRedirector redirect
INFO: 18/03/10 12:01:08 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-f07e0213-61fa-4710-90f5-2fd2030e0701
And with that, the requirement is met: submitting a Spark job from a Java application and obtaining its application_id and state so the job can be located and tracked.
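To close the loop on the "just write it into a database" idea from earlier, a listener along the lines of the sketch below could be passed to startApplication(...) in place of the anonymous one above. saveToDatabase is a hypothetical placeholder, and getAppId() stays null until YARN has actually accepted the application:

import org.apache.spark.launcher.SparkAppHandle;

// Hypothetical listener that records the YARN application id once it becomes available.
public class AppIdRecorder implements SparkAppHandle.Listener {

    private volatile boolean recorded = false;

    @Override
    public void stateChanged(SparkAppHandle handle) {
        maybeRecord(handle);
    }

    @Override
    public void infoChanged(SparkAppHandle handle) {
        maybeRecord(handle);
    }

    private void maybeRecord(SparkAppHandle handle) {
        // getAppId() is null until the application has been submitted to YARN.
        if (!recorded && handle.getAppId() != null) {
            recorded = true;
            saveToDatabase(handle.getAppId(), handle.getState().toString());
        }
    }

    // Placeholder: persist the id however your application tracks jobs.
    private void saveToDatabase(String appId, String state) {
        System.out.println("would persist " + appId + " in state " + state);
    }
}

With a listener like this, the polling loop is no longer needed just to discover the application id; it only has to wait for the job to finish.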