spark定时提交job

本文内容:提交spark job到spark集群,并使用crontab指定计划任务,定时提交spark job。java


任务提交

进入${SPARK_HOME}/bin目录,有一个spark-submit的文件,就是经过脚本提交spark job的。python

vim ./spark-submit 看看里面都干了啥。linux

if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0

exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

首先找了一下环境变量,而后source了一下,这里并无提交什么东西,关键是最后一行,执行${SPARK_HOME}/bin目录下的spark-class,带了一个org.apache.spark.deploy.SparkSubmit参数,而且把参数传给了spark-class,基本能够判断spark-submit并非真正提交job的地方。web

vim ./spark-classshell

if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

. "${SPARK_HOME}"/bin/load-spark-env.sh

# Find the java binary
if [ -n "${JAVA_HOME}" ]; then
  RUNNER="${JAVA_HOME}/bin/java"
...

# Find Spark jars.
if [ -d "${SPARK_HOME}/jars" ]; then
  SPARK_JARS_DIR="${SPARK_HOME}/jars"
else
  SPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars"
fi
...

# Add the launcher build dir to the classpath if requested.
if [ -n "$SPARK_PREPEND_CLASSES" ]; then
  LAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
fi


# command array and checks the value to see if the launcher succeeded.
build_command() {
  "$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
  printf "%d\0" $?
}

# Turn off posix mode since it does not allow process substitution
set +o posix
CMD=()
while IFS= read -d '' -r ARG; do
  CMD+=("$ARG")
done < <(build_command "$@")

...

CMD=("${CMD[@]:0:$LAST}")
exec "${CMD[@]}"

这里只给出了部分源码,代码有点长,看注释基本就能明白这里作了什么事:apache

  1. 首先这里又source了一下环境变量,可见在linux下环境变量的重要性,无时无刻都要注意环境变量。
  2. 而后找到Java的环境变量。再而后去找spark中内置的jar包,这个目录在${SPARK_HOME}/jars下,里面包含了spark基本jar包,因此在项目打包的时候这里面的jar包就不须要打进去了,生命周期设置为provided。
  3. 找scala的环境变量,构建一个LAUNCH_CLASSPATH。
  4. 构建命令CMD,而且带上原始输入的参数。
  5. 最后执行${CMD}。

最终是调用了org.apache.spark.launcher.Main这个类,而后把输入的参数给了这个Main中的main(String[] argsArray)方法。绕了好大一圈仍是回到了Java,这里的Main是Java的一个类,具体源码自行去阅读(真的有点复杂,由于这里的java代码又调用了scala...)。vim

提交参数说明

上问分析源码的时候屡次看到了 $@ 这个字符,做用就是获取全部的输入参数,如今就来分析一下这些参数里都有什么。app

./bin/spark-submit \
  --class <main-class> \
  --master <master-url> \
  --deploy-mode <deploy-mode> \
  --conf <key>=<value> \
  ... # other options
  <application-jar> \
  [application-arguments]
  • --class 指出你须要运行的类,好比:org.apache.spark.examples.SparkPi
  • --master 集群主节点的URL,若是是提交到spark集群,spark提供的web界面中有这个URL。而且推荐使用RESTURL。好比:spark://23.195.26.187:6066。 spark job的部署模式有多种:local[N] (N表示开启的线程数) Spark Standalone、Mesos、Yarn、Kubernetes。
  • --deploy-medo 这个参数决定了spark job的运行模式,集群或者是本地(cluster or client)默认为client。
  • conf 自定义的一些配置,以键值对的形式添加。
  • <application-jar> 对应spark job所在的jar包的位置。建议使用绝对路径。
  • [application-arguments] main()函数中的参数。

如下是spark官网给出的示例:dom

# Run application locally on 8 cores
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master local[8] \
  /path/to/examples.jar \
  100

# Run on a Spark standalone cluster in client deploy mode
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master spark://207.184.161.138:7077 \
  --executor-memory 20G \
  --total-executor-cores 100 \
  /path/to/examples.jar \
  1000

# Run on a Spark standalone cluster in cluster deploy mode with supervise
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master spark://207.184.161.138:7077 \
  --deploy-mode cluster \
  --supervise \
  --executor-memory 20G \
  --total-executor-cores 100 \
  /path/to/examples.jar \
  1000

# Run on a YARN cluster
export HADOOP_CONF_DIR=XXX
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master yarn \
  --deploy-mode cluster \  # can be client for client mode
  --executor-memory 20G \
  --num-executors 50 \
  /path/to/examples.jar \
  1000

# Run a Python application on a Spark standalone cluster
./bin/spark-submit \
  --master spark://207.184.161.138:7077 \
  examples/src/main/python/pi.py \
  1000

# Run on a Mesos cluster in cluster deploy mode with supervise
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master mesos://207.184.161.138:7077 \
  --deploy-mode cluster \
  --supervise \
  --executor-memory 20G \
  --total-executor-cores 100 \
  http://path/to/examples.jar \
  1000

# Run on a Kubernetes cluster in cluster deploy mode
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master k8s://xx.yy.zz.ww:443 \
  --deploy-mode cluster \
  --executor-memory 20G \
  --num-executors 50 \
  http://path/to/examples.jar \
  1000

定时任务实现

http://www.javashuo.com/article/p-xrkmoeet-dg.htmlide

这篇文章给出了如何指定计划任务,这里须要用到计划任务。把上文中的提交job的shell写到一个脚本中去,而后 crontab -e 新建一个计划任务,而后设置时间定时提交job。

# 每两个小时的第0分钟执行一次
0 */2 * * * . /etc/profile;/bin/sh /usr/hdp/spark/tmp/submit-spark-job.sh

而后重启crond服务:

systemctl restart crond.service
crontab -l # 查看当前计划任务
相关文章
相关标签/搜索