Author: Lin Wukang (alias Zhixia), technical expert in Alibaba's Computing Platform Division and an Apache HUE contributor. He has worked on several open-source projects, has extensive experience designing and applying distributed systems, and currently focuses on the development of EMR data-development products.
This article introduces the design and implementation of the Spark Operator.
After several years of rapid development, the architectures of distributed computing frameworks have converged. Resource management, the most generic part, has gradually been decoupled from individual frameworks into standalone components, and most distributed computing frameworks can now plug into several different resource managers. A resource manager handles cluster resource management and scheduling: it allocates resource containers to computing tasks and enforces resource isolation. Apache Spark, as a general-purpose distributed computing platform, currently supports multiple resource managers, including Standalone, Apache Hadoop YARN, Apache Mesos, and Kubernetes.
Spark's runtime architecture is shown in the figure below; the interaction flow with the various resource schedulers is largely the same.
Figure 1: Spark runtime architecture (Client mode)
The Driver schedules the job's logic and monitors its tasks, while the resource manager allocates and monitors resources. Where the Driver is launched and runs depends on the deploy mode: in Client mode the Driver runs inside the spark-submit process, whereas in Cluster mode the Driver is launched much like an Executor, inside a resource container allocated by the resource scheduler.
Kubernetes has been a supported resource manager since Spark 2.3, although the related discussion started as early as 2016 (https://issues.apache.org/jira/browse/SPARK-18278). Spark's K8s support has deepened with each release, and the upcoming Spark 3.0 brings better Kerberos support and dynamic resource allocation to Spark on K8s.
Kubernetes is an application-oriented container deployment and cluster management system open-sourced by Google. It has grown rapidly in recent years and its ecosystem is now fairly mature. Before Spark officially supported K8s, the community usually ran Spark jobs on a K8s cluster by deploying a Spark Standalone cluster on top of it. That approach looks like this:
Figure 2: Spark Standalone on K8s
This setup is simple and easy to use, but it has considerable drawbacks.
For this reason the Spark community held deep and extensive discussions and shipped official K8s support in version 2.3. The benefits of running Spark natively on K8s are obvious.
The architecture of Spark on K8s is shown below; the design details are described in SPARK-18278.
Figure 3: Spark on K8s (Native)
In this approach, Spark talks to the Kubernetes API server directly: spark-submit asks K8s to create the Driver Pod, and the Driver then requests Executor Pods from the API server itself.
The native approach fixes some of the shortcomings of Spark Standalone on K8s. However, the lifecycle management and scheduling of a Spark application still differ considerably from K8s built-in workloads such as Deployments, DaemonSets, and StatefulSets, so running jobs on K8s still leaves quite a few problems.
Of course, Spark on K8s is still under active development; more features will be released soon, and the integration with K8s can be expected to become tighter and more native over time.
Before analyzing the implementation of the Spark Operator, let's briefly review the concept of a Kubernetes Operator. The Operator pattern, introduced by CoreOS, extends Kubernetes by defining a set of CRDs (custom resources) and implementing controllers for them, so that domain-specific operational knowledge (deployment, monitoring, failure recovery, and so on) is captured in code. The Spark Operator is a tool built by Google on top of the Operator pattern (https://github.com/GoogleCloudPlatform/spark-on-k8s-operator) for submitting Spark jobs to a K8s cluster declaratively. Managing Spark applications with the Spark Operator makes better use of K8s-native capabilities to control the application lifecycle, including status monitoring, log retrieval, and run control, and narrows the gap between Spark on K8s and other workload types in terms of K8s integration.
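To make the declarative workflow concrete, here is a minimal sketch of submitting a job by creating a SparkApplication custom resource from Go with the dynamic client. It assumes the sparkoperator.k8s.io/v1beta2 CRD installed by the operator and a pre-0.18 client-go (whose Create call takes no context); the image, jar path, and service account are placeholders borrowed from the operator's public spark-pi example.

```go
package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Load the local kubeconfig (~/.kube/config) and build a dynamic client.
	cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	client, err := dynamic.NewForConfig(cfg)
	if err != nil {
		panic(err)
	}

	// Describe the job declaratively; the operator's controller reacts to the
	// new object and runs spark-submit on our behalf.
	app := &unstructured.Unstructured{Object: map[string]interface{}{
		"apiVersion": "sparkoperator.k8s.io/v1beta2",
		"kind":       "SparkApplication",
		"metadata":   map[string]interface{}{"name": "spark-pi", "namespace": "default"},
		"spec": map[string]interface{}{
			"type":                "Scala",
			"mode":                "cluster",
			"image":               "gcr.io/spark-operator/spark:v2.4.5", // placeholder image
			"mainClass":           "org.apache.spark.examples.SparkPi",
			"mainApplicationFile": "local:///opt/spark/examples/jars/spark-examples.jar", // placeholder path
			"sparkVersion":        "2.4.5",
			"driver":              map[string]interface{}{"cores": int64(1), "memory": "512m", "serviceAccount": "spark"},
			"executor":            map[string]interface{}{"cores": int64(1), "instances": int64(2), "memory": "512m"},
		},
	}}

	gvr := schema.GroupVersionResource{Group: "sparkoperator.k8s.io", Version: "v1beta2", Resource: "sparkapplications"}
	created, err := client.Resource(gvr).Namespace("default").Create(app, metav1.CreateOptions{})
	if err != nil {
		panic(err)
	}
	fmt.Println("created SparkApplication:", created.GetName())
}
```

The same resource is usually written as a YAML manifest and applied with kubectl; either way, the operator notices the new object and drives the submission and status updates described below.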
The following sections briefly analyze the implementation details of the Spark Operator.
Figure 4: Spark Operator architecture
As the figure shows, the Spark Operator architecture is somewhat more complex than plain Spark on K8s; in fact, the Spark Operator builds on top of the Spark on K8s scheme and adds more complete management and control features. With the Spark Operator, users can control the lifecycle of a Spark application in a way that is much closer to the K8s philosophy. The Spark Operator consists of several cooperating components.
Beyond basic job submission, the Spark Operator also supports a number of additional features.
The Spark Operator project follows the standard K8s Operator layout; the most important parts are:
manifest: defines the Spark-related CRDs, including SparkApplication and ScheduledSparkApplication.
pkg: the concrete implementation of the Operator logic.
controller: the implementation of the custom controllers, including the SparkApplication and ScheduledSparkApplication controllers.
The rest of this article focuses on how the Spark Operator manages Spark jobs.
The controller code is mainly located in "pkg/controller/sparkapplication/controller.go".
The main flow for submitting a job is in the submitSparkApplication method.
```go
// controller.go
// submitSparkApplication creates a new submission for the given SparkApplication and submits it using spark-submit.
func (c *Controller) submitSparkApplication(app *v1beta2.SparkApplication) *v1beta2.SparkApplication {
	if app.PrometheusMonitoringEnabled() {
		...
		configPrometheusMonitoring(app, c.kubeClient)
	}

	// Use batch scheduler to perform scheduling task before submitting (before build command arguments).
	if needScheduling, scheduler := c.shouldDoBatchScheduling(app); needScheduling {
		newApp, err := scheduler.DoBatchSchedulingOnSubmission(app)
		...
		//Spark submit will use the updated app to submit tasks(Spec will not be updated into API server)
		app = newApp
	}

	driverPodName := getDriverPodName(app)
	submissionID := uuid.New().String()
	submissionCmdArgs, err := buildSubmissionCommandArgs(app, driverPodName, submissionID)
	...
	// Try submitting the application by running spark-submit.
	submitted, err := runSparkSubmit(newSubmission(submissionCmdArgs, app))
	...

	app.Status = v1beta2.SparkApplicationStatus{
		SubmissionID: submissionID,
		AppState: v1beta2.ApplicationState{
			State: v1beta2.SubmittedState,
		},
		DriverInfo: v1beta2.DriverInfo{
			PodName: driverPodName,
		},
		SubmissionAttempts:        app.Status.SubmissionAttempts + 1,
		ExecutionAttempts:         app.Status.ExecutionAttempts + 1,
		LastSubmissionAttemptTime: metav1.Now(),
	}
	c.recordSparkApplicationEvent(app)

	service, err := createSparkUIService(app, c.kubeClient)
	...
	ingress, err := createSparkUIIngress(app, *service, c.ingressURLFormat, c.kubeClient)
	return app
}
```
The core submission logic lives in the submission.go module:
```go
// submission.go
func runSparkSubmit(submission *submission) (bool, error) {
	sparkHome, present := os.LookupEnv(sparkHomeEnvVar)
	if !present {
		glog.Error("SPARK_HOME is not specified")
	}
	var command = filepath.Join(sparkHome, "/bin/spark-submit")

	cmd := execCommand(command, submission.args...)
	glog.V(2).Infof("spark-submit arguments: %v", cmd.Args)
	output, err := cmd.Output()
	glog.V(3).Infof("spark-submit output: %s", string(output))
	if err != nil {
		var errorMsg string
		if exitErr, ok := err.(*exec.ExitError); ok {
			errorMsg = string(exitErr.Stderr)
		}
		// The driver pod of the application already exists.
		if strings.Contains(errorMsg, podAlreadyExistsErrorCode) {
			glog.Warningf("trying to resubmit an already submitted SparkApplication %s/%s", submission.namespace, submission.name)
			return false, nil
		}
		if errorMsg != "" {
			return false, fmt.Errorf("failed to run spark-submit for SparkApplication %s/%s: %s", submission.namespace, submission.name, errorMsg)
		}
		return false, fmt.Errorf("failed to run spark-submit for SparkApplication %s/%s: %v", submission.namespace, submission.name, err)
	}

	return true, nil
}

func buildSubmissionCommandArgs(app *v1beta2.SparkApplication, driverPodName string, submissionID string) ([]string, error) {
	...
	options, err := addDriverConfOptions(app, submissionID)
	...
	options, err = addExecutorConfOptions(app, submissionID)
	...
}

func getMasterURL() (string, error) {
	kubernetesServiceHost := os.Getenv(kubernetesServiceHostEnvVar)
	...
	kubernetesServicePort := os.Getenv(kubernetesServicePortEnvVar)
	...
	return fmt.Sprintf("k8s://https://%s:%s", kubernetesServiceHost, kubernetesServicePort), nil
}
```
A few things are worth noting from this code: runSparkSubmit locates spark-submit via the SPARK_HOME environment variable and launches it as an external process, getMasterURL builds the k8s:// master URL from the in-cluster Kubernetes service environment variables, and buildSubmissionCommandArgs turns the driver and executor configuration of the SparkApplication into spark-submit options. After a successful submission, the controller also updates the application status, records the corresponding events, and creates the Spark UI Service and Ingress. In addition, if the official Spark on K8s documentation ever feels confusing, this code is a good working example of how the submission parameters fit together.
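To give a feel for what buildSubmissionCommandArgs produces, here is a hypothetical helper (not the operator's actual code) that assembles a representative cluster-mode argument list; the image, namespace, and jar path are placeholders, while the configuration keys are standard Spark on K8s properties.

```go
package main

import (
	"fmt"
	"strings"
)

// exampleSubmitArgs sketches the kind of spark-submit argument list the
// controller builds from a SparkApplication spec.
func exampleSubmitArgs(masterURL, namespace, image, driverPodName, mainClass, appResource string) []string {
	return []string{
		"--master", masterURL, // e.g. "k8s://https://<host>:<port>" from getMasterURL()
		"--deploy-mode", "cluster",
		"--class", mainClass,
		"--conf", "spark.kubernetes.namespace=" + namespace,
		"--conf", "spark.kubernetes.container.image=" + image,
		"--conf", "spark.kubernetes.driver.pod.name=" + driverPodName,
		"--conf", "spark.kubernetes.submission.waitAppCompletion=false",
		appResource, // the application jar or script comes last
	}
}

func main() {
	args := exampleSubmitArgs(
		"k8s://https://10.0.0.1:6443", // placeholder API server address
		"default",
		"gcr.io/spark-operator/spark:v2.4.5",
		"spark-pi-driver",
		"org.apache.spark.examples.SparkPi",
		"local:///opt/spark/examples/jars/spark-examples.jar",
	)
	fmt.Println("spark-submit " + strings.Join(args, " "))
}
```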
Inside the Spark Operator, the controller maintains the state of each Spark application with a state machine; the relationship between state transitions and actions is shown below:
Figure 5: State machine for SparkApplication
After a job is submitted, the Spark application's state is updated through the getAndUpdateAppState() method.
```go
// controller.go
func (c *Controller) getAndUpdateAppState(app *v1beta2.SparkApplication) error {
	if err := c.getAndUpdateDriverState(app); err != nil {
		return err
	}
	if err := c.getAndUpdateExecutorState(app); err != nil {
		return err
	}
	return nil
}

// getAndUpdateDriverState finds the driver pod of the application
// and updates the driver state based on the current phase of the pod.
func (c *Controller) getAndUpdateDriverState(app *v1beta2.SparkApplication) error {
	// Either the driver pod doesn't exist yet or its name has not been updated.
	...
	driverPod, err := c.getDriverPod(app)
	...
	if driverPod == nil {
		app.Status.AppState.ErrorMessage = "Driver Pod not found"
		app.Status.AppState.State = v1beta2.FailingState
		app.Status.TerminationTime = metav1.Now()
		return nil
	}

	app.Status.SparkApplicationID = getSparkApplicationID(driverPod)
	...
	newState := driverStateToApplicationState(driverPod.Status)
	// Only record a driver event if the application state (derived from the driver pod phase) has changed.
	if newState != app.Status.AppState.State {
		c.recordDriverEvent(app, driverPod.Status.Phase, driverPod.Name)
	}
	app.Status.AppState.State = newState

	return nil
}

// getAndUpdateExecutorState lists the executor pods of the application
// and updates the executor state based on the current phase of the pods.
func (c *Controller) getAndUpdateExecutorState(app *v1beta2.SparkApplication) error {
	pods, err := c.getExecutorPods(app)
	...
	executorStateMap := make(map[string]v1beta2.ExecutorState)
	var executorApplicationID string
	for _, pod := range pods {
		if util.IsExecutorPod(pod) {
			newState := podPhaseToExecutorState(pod.Status.Phase)
			oldState, exists := app.Status.ExecutorState[pod.Name]
			// Only record an executor event if the executor state is new or it has changed.
			if !exists || newState != oldState {
				c.recordExecutorEvent(app, newState, pod.Name)
			}
			executorStateMap[pod.Name] = newState

			if executorApplicationID == "" {
				executorApplicationID = getSparkApplicationID(pod)
			}
		}
	}

	// ApplicationID label can be different on driver/executors. Prefer executor ApplicationID if set.
	// Refer https://issues.apache.org/jira/projects/SPARK/issues/SPARK-25922 for details.
	...
	if app.Status.ExecutorState == nil {
		app.Status.ExecutorState = make(map[string]v1beta2.ExecutorState)
	}
	for name, execStatus := range executorStateMap {
		app.Status.ExecutorState[name] = execStatus
	}

	// Handle missing/deleted executors.
	for name, oldStatus := range app.Status.ExecutorState {
		_, exists := executorStateMap[name]
		if !isExecutorTerminated(oldStatus) && !exists {
			// If ApplicationState is SUCCEEDING, in other words, the driver pod has been completed
			// successfully. The executor pods terminate and are cleaned up, so we could not found
			// the executor pod, under this circumstances, we assume the executor pod are completed.
			if app.Status.AppState.State == v1beta2.SucceedingState {
				app.Status.ExecutorState[name] = v1beta2.ExecutorCompletedState
			} else {
				glog.Infof("Executor pod %s not found, assuming it was deleted.", name)
				app.Status.ExecutorState[name] = v1beta2.ExecutorFailedState
			}
		}
	}

	return nil
}
```
As this code shows, once a Spark application has been submitted, the controller derives the application's state by watching the states of the Driver Pod and Executor Pods, and this is what drives the state machine forward.
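To illustrate the kind of mapping used by driverStateToApplicationState and podPhaseToExecutorState, here is a simplified sketch that maps a driver pod phase to an application state; it is not the operator's exact implementation, and the state set is reduced for brevity.

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// ApplicationState mirrors the idea of the operator's application state type;
// the constants below are illustrative, not the operator's full state set.
type ApplicationState string

const (
	StateSubmitted  ApplicationState = "SUBMITTED"
	StateRunning    ApplicationState = "RUNNING"
	StateSucceeding ApplicationState = "SUCCEEDING"
	StateFailing    ApplicationState = "FAILING"
	StateUnknown    ApplicationState = "UNKNOWN"
)

// driverPhaseToAppState derives the application state from the driver pod
// phase, the same principle used by getAndUpdateDriverState above.
func driverPhaseToAppState(phase corev1.PodPhase) ApplicationState {
	switch phase {
	case corev1.PodPending:
		return StateSubmitted
	case corev1.PodRunning:
		return StateRunning
	case corev1.PodSucceeded:
		return StateSucceeding
	case corev1.PodFailed:
		return StateFailing
	default:
		return StateUnknown
	}
}

func main() {
	fmt.Println(driverPhaseToAppState(corev1.PodSucceeded)) // SUCCEEDING
}
```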
If a SparkApplication instance has the metrics-monitoring feature enabled, the Spark Operator adds a JavaAgent option such as "-javaagent:/prometheus/jmx_prometheus_javaagent-0.11.0.jar=8090:/etc/metrics/conf/prometheus.yaml" to the Driver and Executor JVM options in the spark-submit parameters, so that metrics are exposed through the JMX exporter and scraped by Prometheus.
Figure 6: Prometheus architecture
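As a small illustration, here is a hypothetical helper (not the operator's configPrometheusMonitoring code) that builds such a JavaAgent flag from the agent jar, port, and config file quoted above; in practice the flag ends up in the Driver and Executor JVM options.

```go
package main

import "fmt"

// javaAgentOption assembles the -javaagent flag that lets the JMX exporter
// serve metrics for Prometheus to scrape.
func javaAgentOption(agentJar string, port int, configFile string) string {
	return fmt.Sprintf("-javaagent:%s=%d:%s", agentJar, port, configFile)
}

func main() {
	fmt.Println(javaAgentOption(
		"/prometheus/jmx_prometheus_javaagent-0.11.0.jar",
		8090,
		"/etc/metrics/conf/prometheus.yaml",
	))
}
```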
In the plain Spark on K8s scheme, users have to set up a temporary tunnel with kubectl port-forward to reach the Driver's Web UI, which is very cumbersome when the Web UIs of many jobs need to be accessed frequently. With the Spark Operator, once a job is submitted the operator starts a Service for the Spark Web UI and configures an Ingress for it, which provides a much more natural and efficient way to access the UI.
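As a rough sketch of what createSparkUIService amounts to (not the operator's actual implementation), the following client-go snippet exposes the driver's UI port, 4040 by default, through a ClusterIP Service; the Service name and selector labels are placeholders, and the Create call assumes a pre-0.18 client-go without a context argument.

```go
package main

import (
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/intstr"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	client, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		panic(err)
	}

	svc := &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: "spark-pi-ui-svc", Namespace: "default"},
		Spec: corev1.ServiceSpec{
			Type: corev1.ServiceTypeClusterIP,
			// Placeholder selector: it must match the labels carried by the driver pod.
			Selector: map[string]string{"spark-app-name": "spark-pi", "spark-role": "driver"},
			Ports: []corev1.ServicePort{{
				Name:       "spark-driver-ui-port",
				Port:       4040,
				TargetPort: intstr.FromInt(4040),
			}},
		},
	}

	if _, err := client.CoreV1().Services("default").Create(svc); err != nil {
		panic(err)
	}
}
```

An Ingress pointing at such a Service (what createSparkUIIngress does) then gives each job's UI a stable URL instead of an ad-hoc port-forward tunnel.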
This article summarized Spark's basic runtime architecture, surveyed the options for running Spark on K8s, and focused on the design and implementation of the Spark Operator. The K8s Operator pattern follows the K8s design philosophy and greatly extends what K8s can manage. Built on that pattern, the Spark Operator provides more complete management and control features and is a strong complement to the official Spark on K8s support. As K8s matures further and the Spark community keeps investing, Spark on K8s can be expected to gradually absorb the Spark Operator's features and deliver an even better cloud-native experience.