Yarn上常驻Spark-Streaming程序调优

对于长时间运行的Spark Streaming做业,一旦提交到YARN群集便须要永久运行,直到有意中止。任何中断都会引发严重的处理延迟,并可能致使数据丢失或重复。YARN和Apache Spark都不是为了执行长时间运行的服务而设计的。可是,它们已经成功地知足了近实时数据处理做业的常驻需求。成功并不必定意味着没有技术挑战。shell

这篇博客总结了在安全的YARN集群上,运行一个关键任务且长时间的Spark Streaming做业的经验。您将学习如何将Spark Streaming应用程序提交到YARN群集,以免在值班时候的不眠之夜。apache

 

Fault tolerance

 在YARN集群模式下,Spark驱动程序与Application Master(应用程序分配的第一个YARN容器)在同一容器中运行。此过程负责从YARN 驱动应用程序和请求资源(Spark执行程序)。重要的是,Application Master消除了在应用程序生命周期中运行的任何其余进程的须要。即便一个提交Spark Streaming做业的边缘Hadoop节点失败,应用程序也不会受到影响。编程

要以集群模式运行Spark Streaming应用程序,请确保为spark-submit命令提供如下参数:缓存

spark-submit --master yarn --deploy-mode cluster

因为Spark驱动程序和Application Master共享一个JVM,Spark驱动程序中的任何错误都会阻止咱们长期运行的工做。幸运的是,能够配置从新运行应用程序的最大尝试次数。设置比默认值2更高的值是合理的(从YARN集群属性yarn.resourcemanager.am.max尝试中导出)。对我来讲,4工做至关好,即便失败的缘由是永久性的,较高的值也可能致使没必要要的从新启动。安全

spark-submit --master yarn --deploy-mode cluster \
    --conf spark.yarn.maxAppAttempts=4

若是应用程序运行数天或数周,而不从新启动或从新部署在高度使用的群集上,则可能在几个小时内耗尽4次尝试。为了不这种状况,尝试计数器应该在每一个小时都重置。app

spark-submit --master yarn --deploy-mode cluster \
    --conf spark.yarn.maxAppAttempts=4 \
    --conf spark.yarn.am.attemptFailuresValidityInterval=1h

 

另外一个重要的设置是在应用程序发生故障以前executor失败的最大数量。默认状况下是max(2 * num executors,3),很是适合批处理做业,但不适用于长时间运行的做业。该属性具备相应的有效期间,也应设置。dom

spark-submit --master yarn --deploy-mode cluster \
    --conf spark.yarn.maxAppAttempts=4 \
    --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
    --conf spark.yarn.max.executor.failures={8 * num_executors} \
    --conf spark.yarn.executor.failuresValidityInterval=1h

对于长时间运行的做业,您也能够考虑在放弃做业以前提升任务失败的最大数量。默认状况下,任务将重试4次,而后做业失败。jvm

复制代码
spark-submit --master yarn --deploy-mode cluster \
    --conf spark.yarn.maxAppAttempts=4 \
    --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
    --conf spark.yarn.max.executor.failures={8 * num_executors} \
    --conf spark.yarn.executor.failuresValidityInterval=1h \
    --conf spark.task.maxFailures=8
复制代码

 

Performance

当Spark Streaming应用程序提交到集群时,必须定义运行做业的YARN队列。我强烈建议使用YARN Capacity Scheduler并将长时间运行的做业提交到单独的队列。没有一个单独的YARN队列,您的长时间运行的工做早晚将被的大量Hive查询抢占。函数

复制代码
spark-submit --master yarn --deploy-mode cluster \
    --conf spark.yarn.maxAppAttempts=4 \
    --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
    --conf spark.yarn.max.executor.failures={8 * num_executors} \
    --conf spark.yarn.executor.failuresValidityInterval=1h \
    --conf spark.task.maxFailures=8 \
    --queue realtime_queue
复制代码

 

Spark Streaming工做的另外一个重要问题是保持处理时间的稳定性和高度可预测性。处理时间应保持在批次持续时间如下以免延误。我发现Spark的推测执行有不少帮助,特别是在繁忙的群集中。当启用推测性执行时,批处理时间更加稳定。只有当Spark操做是幂等时,才能启用推测模式。工具

复制代码
spark-submit --master yarn --deploy-mode cluster \
    --conf spark.yarn.maxAppAttempts=4 \
    --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
    --conf spark.yarn.max.executor.failures={8 * num_executors} \
    --conf spark.yarn.executor.failuresValidityInterval=1h \
    --conf spark.task.maxFailures=8 \
    --queue realtime_queue \
    --conf spark.speculation=true
复制代码

 

Security

在安全的HDFS群集上,长时间运行的Spark Streaming做业因为Kerberos票据到期而失败。没有其余设置,当Spark Streaming做业提交到集群时,会发布Kerberos票证。当票证到期时Spark Streaming做业不能再从HDFS写入或读取数据。

在理论上(基于文档),应该将Kerberos主体和keytab做为spark-submit命令传递:

复制代码
spark-submit --master yarn --deploy-mode cluster \
     --conf spark.yarn.maxAppAttempts=4 \
     --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
     --conf spark.yarn.max.executor.failures={8 * num_executors} \
     --conf spark.yarn.executor.failuresValidityInterval=1h \
     --conf spark.task.maxFailures=8 \
     --queue realtime_queue \
     --conf spark.speculation=true \
     --principal user/hostname@domain \
     --keytab /path/to/foo.keytab
复制代码

 

实际上,因为几个错误(HDFS-9276SPARK-11182)必须禁用HDFS缓存。若是没有,Spark将没法从HDFS上的文件读取更新的令牌。

复制代码
spark-submit --master yarn --deploy-mode cluster \
     --conf spark.yarn.maxAppAttempts=4 \
     --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
     --conf spark.yarn.max.executor.failures={8 * num_executors} \
     --conf spark.yarn.executor.failuresValidityInterval=1h \
     --conf spark.task.maxFailures=8 \
     --queue realtime_queue \
     --conf spark.speculation=true \
     --principal user/hostname@domain \
     --keytab /path/to/foo.keytab \
     --conf spark.hadoop.fs.hdfs.impl.disable.cache=true
复制代码

Mark Grover指出,这些错误只影响在HA模式下配置了NameNodes的HDFS集群。谢谢,马克

 

 

Logging

访问Spark应用程序日志的最简单方法是配置Log4j控制台追加程序,等待应用程序终止并使用yarn logs -applicationId [applicationId]命令。不幸的是终止长时间运行的Spark Streaming做业来访问日志是不可行的。

我建议安装和配置Elastic,Logstash和Kibana(ELK套装)。ELK的安装和配置是超出了这篇博客的范围,但请记住记录如下上下文字段:

  • YARN application id
  • YARN container hostname
  • Executor id (Spark driver is always 000001, Spark executors start from 000002)
  • YARN attempt (to check how many times Spark driver has been restarted)

Log4j配置使用Logstash特定的appender和布局定义应该传递给spark-submit命令:

复制代码
spark-submit --master yarn --deploy-mode cluster \
     --conf spark.yarn.maxAppAttempts=4 \
     --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
     --conf spark.yarn.max.executor.failures={8 * num_executors} \
     --conf spark.yarn.executor.failuresValidityInterval=1h \
     --conf spark.task.maxFailures=8 \
     --queue realtime_queue \
     --conf spark.speculation=true \
     --principal user/hostname@domain \
     --keytab /path/to/foo.keytab \
     --conf spark.hadoop.fs.hdfs.impl.disable.cache=true \
     --conf spark.driver.extraJavaOptions=-Dlog4j.configuration=file:log4j.properties \
     --conf spark.executor.extraJavaOptions=-Dlog4j.configuration=file:log4j.properties \
     --files /path/to/log4j.properties
复制代码

最后,Spark Job的Kibana仪表板可能以下所示:

 

 

Monitoring

长时间运行的工做全天候运行,因此了解历史指标很重要。Spark UI仅在有限数量的批次中保留统计信息,而且在从新启动后,全部度量标准都消失了。再次,须要外部工具。我建议安装Graphite用于收集指标和Grafana来创建仪表板。

首先,Spark须要配置为将指标报告给Graphite,准备metrics.properties文件:

 

复制代码
*.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink
*.sink.graphite.host=[hostname]
*.sink.graphite.port=[port]
*.sink.graphite.prefix=some_meaningful_name

driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource
executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource
复制代码

 

Graceful stop

最后一个难题是如何以优雅的方式中止部署在YARN上的Spark Streaming应用程序。中止(甚至杀死)YARN应用程序的标准方法是使用命令yarn application -kill [applicationId]。这个命令会中止Spark Streaming应用程序,但这可能发生在批处理中。所以,若是该做业是从Kafka读取数据而后在HDFS上保存处理结果,并最终提交Kafka偏移量,看成业在提交偏移以前中止工做时,您应该预见到HDFS会有重复的数据。

解决优雅关机问题的第一个尝试是在关闭程序时回调Spark Streaming Context的中止方法。

sys.addShutdownHook {
    streamingContext.stop(stopSparkContext = true, stopGracefully = true)
}

使人失望的是,因为Spark应用程序几乎当即被杀死,一个退出回调函数来不及完成已启动的批处理任务。此外,不能保证JVM会调用shutdown hook。

在撰写本博客文章时,惟一确认的YARN Spark Streaming应用程序的确切方法是通知应用程序关于计划关闭,而后以编程方式中止流式传输(但不是关闭挂钩)。命令yarn application -kill 若是通知应用程序在定义的超时后没有中止,则应该仅用做最后手段。

可使用HDFS上的标记文件(最简单的方法)或使用驱动程序上公开的简单Socket / HTTP端点(复杂方式)通知应用程序。

由于我喜欢KISS原理,下面你能够找到shell脚本伪代码,用于启动/中止Spark Streaming应用程序使用标记文件:

复制代码
start() {
    hdfs dfs -touchz /path/to/marker/my_job_unique_name
    spark-submit ...
}

stop() {
    hdfs dfs -rm /path/to/marker/my_job_unique_name
    force_kill=true
    application_id=$(yarn application -list | grep -oe "application_[0-9]*_[0-9]*"`)
    for i in `seq 1 10`; do
        application_status=$(yarn application -status ${application_id} | grep "State : \(RUNNING\|ACCEPTED\)")
        if [ -n "$application_status" ]; then
            sleep 60s
        else
            force_kill=false
            break
        fi
    done
    $force_kill && yarn application -kill ${application_id}
}
复制代码

在Spark Streaming应用程序中,后台线程应该监视标记文件,当文件消失时中止上下文调用

streamingContext.stop(stopSparkContext = true, stopGracefully = true).

 

Summary

 能够看到,部署在YARN上的关键任务Spark Streaming应用程序的配置至关复杂。以上提出的技术,由一些很是聪明的开发人员通过漫长而冗长乏味的迭代学习。最终,部署在高可用的YARN集群上的长期运行的Spark Streaming应用很是稳定。

相关文章
相关标签/搜索