在本文中,我将演示如何构建一个弹性Airflow集群,当负载低于阈值时,该集群能够在高负载下安全地横向扩展。git
经过水平Pod自动缩放器支持Kubernetes中的自动缩放。使用HPA,能够直接进行横向扩展,HPA能够为部署增长副本,并能够建立其余worker来共享工做负载。可是,伸缩是问题所在,伸缩过程会根据Pod在节点上的位置对它们进行排序,从而选择要终止的Pod。所以,若是有一个Pod仍在进行某些处理,则没法保证它不会被终止。github
在弹性Airflow 集群中,为了扩大规模,咱们须要确保进行某些处理的worker不会被终止。只有闲置的worker才应考虑终止。redis
为了实现这一点,我建立了两个CRD和两个控制器 - ElasticWorker
和ElasticWorkerAutoscaler
,在本文后面将对它们进行介绍。数据库
对于此问题,还有其余解决方案,例如,能够建立一个Kubernetes做业,该做业能够完成一组任务。随着负载的增长,将建立更多的做业。可是,这种方法不是通用解决方案,它很是适合具备相似自动缩放要求的其余用例。此处描述的方法是一种通用实现,能够用做全面生产部署的起点。api
组件说明以下,安全
ElasticWorker Controller 监视类型为ElasticWorker(CRD)
的对象,并将集群状态与相应的Elasticworker
对象中的规范协调。在较高级别,如下是该控制者的职责。bash
minReplica
的worker Podscale> 0
,则建立其余工做容器。可是要确保总的Pod数量不超过maxReplicas
scale <0
时,删除worker Pod。Pod的删除由定义的“按比例缩放”策略控制。当前有三个策略-ScaleInImmediately,ScaleInBySelector,ScaleInDisabled,咱们将在此处使用ScaleInBySelector策略。这样能够确保控制器仅删除已定义标签集的Pod。它还能够确保不管是否设置标签,Pod数量都不会低于minReplicas
。ElasticWorkerAutoscalerController ElasticWorkerAutoscaler控制器监视类型为ElasticWorkerAutoscaler(CRD)的对象。下面是该控制器的职责,服务器
total_cluster_load
。total_cluster_load> targetValue
,则向外扩展。计算新的工做单元数,以减小目标值的负担。计算方法与HPA相同。设置引用的ElasticWorker对象的scale
属性。total_cluster_load <0.70 * targetValue
,则缩容。若是负载低于阈值,则不会当即开始Scale-In,可是scaleInBackOff
周期开始计时。默认状况下,它设置为30秒,若是仅在此期间完成,则执行缩容。若是平均时间total_cluster_load
增长,则ScaleInBackOff
周期无效。周期结束后,控制器将选择那些具备metricload = 0
的worker Pod。而后,它使用请求中的这些pod调用shutdownHttpHook
。hook是对此实现定制的,但能够推广。接下来,控制器用终止标签标记容器,最后用适当的值更新比例,以使ElasticWorker控制器更改集群状态。load = 0
,ShutdownHttpHook和TerminationLabelit确保仅终止那些不作任何事情的airflow worker。 HttpShutdown hook很重要,由于它能够确保标记为终止的airflow worker在ElasticWorker控制器终止它时不会从RabbitMQ接任任何任务。ElasticWorker和ElasticWorkerAutoscaler控制器代码位于-elastic-worker-autoscaler。 架构
自定义指标APIServer适配器代码位于-elastic-worker-custommetrics-adapter。 并发
请遵循此处的安装说明-elastic-airflow-cluster-k8s-setup-manifests。
此外,咱们可使用如下命令启动Kubernetes仪表板并进行验证,
minikube dashboard
咱们在minikube上的设置以下:
两个命名空间
命名空间:elasticworker-custommetrics包含与自定义指标相关的pod。
命名空间:elastic-worker-autoscaler-system包含用于ElasticWorker和ElasticWorkerAutoscaler的控制器容器
其他组件在默认名称空间中建立。
咱们可使用如下命令检索ElasticWorker对象,kubectl get elasticworkers
咱们可使用如下命令检索ElasticWorkerAutoscaler对象,kubectl get elasticworkerautoscalers
安装时,咱们已经使用dag_1测试了airflow cluster。若是没有,请转到此处使用此DAG进行测试。咱们将在此处使用相同的DAG进行测试。
为了进行测试,我在ElasticWorkerAutoscaler对象中将targetValue设置为60。这意味着,一旦集群总负载超过60,则将开始向外扩展,若是负载低于〜30,则将开始向内扩展。
咱们将经过登陆调度程序Pod来触发DAG。
咱们将从测试横向扩展方案开始。
在咱们的安装中,每一个airflow worker的并发设置为2,这意味着咱们总共有2个(并发)* 2(工做人员数量)= 4个可用插槽。所以,触发4个DAG将使群集负载达到100%。
在此测试案例中,咱们将同时触发10个以上的DAG(即,咱们须要> 10个插槽)。这将致使airflow worker群集扩展到maxReplica(即5个副本)。即便负载保持在100%,ElasticWorker控制器也将确保工人数不会超过maxReplica。
屏幕截图下方是开始测试以前的airflow worker集群。目前,咱们有2个worker,全部插槽都空着。
让咱们经过登陆调度程序Pod来触发DAG。若是还没有完成,请记住取消暂停DAG。
#Login to Scheduler POD kubectl exec -it airflow-scheduler-76d5df7b9b-mjp25 bash cd dags #If you have not unpaused dag_1 already airflow unpause dag_1 export COUNT=0 while [[ $COUNT -lt 12 ]]; do airflow trigger_dag dag_1 COUNT=`expr $COUNT + 1` done;
随着负载的增长,咱们看到建立了额外的airflow worker来处理负载。
当咱们触发12个DAG时,建立的其余airflow worker应该不止3个,可是因为将maxReplicas设置为5,ElasticWorker控制器不会建立5个以上的airflow worker。
咱们的横向扩展方案可行!!
接下来咱们测试一下缩容。
基本的扩展方案已经过先前的测试验证。若是咱们等待一两分钟,而后检查集群状态,咱们能够看到工做线程数已缩减至minReplicas。这是由于负载降至30如下。
咱们还想验证它是不是安全的横向扩展,即触发横向扩展时,控制器将终止负载为0的worker,而不是仍在进行某些工做的worker。
为了测试这种状况,咱们将使用dag_2,它的任务将休眠30秒,而后将消息HI记录到文件/home/airflow/logs/count_hi.txt中。咱们将触发DAG 12次,每触发4次,咱们将等待40+秒,而后再次触发。
咱们在二者之间等待以触发缩容。 Scale-In的默认退避时间为30秒,这是为了不抖动。
为了最终验证全部任务是否运行正常,而且实际进行处理的全部worker均未终止,咱们仅在输出文件中计算消息HI。若是它等于咱们触发的DAG数量(12),则咱们的测试用例将经过。
dag_2以下:
# Lets copy the dag_2.py file into minikube VM **minikube ssh cd dags/ cat>dag_2.py ....PASTE CONTENT FROM SAMPLE DAG.... ctrl-d logout
从调度程序Pod触发DAG。
#Login to Scheduler POD kubectl exec -it airflow-scheduler-76d5df7b9b-mjp25 bash cd dags #If you have not unpaused dag_2 already airflow unpause dag_2 export COUNT=0 while [[ $COUNT -lt 4 ]]; do airflow trigger_dag dag_2 COUNT=`expr $COUNT + 1` done; sleep 40 export COUNT=0 while [[ $COUNT -lt 4 ]]; do airflow trigger_dag dag_2 COUNT=`expr $COUNT + 1` done; sleep 45 export COUNT=0 while [[ $COUNT -lt 4 ]]; do airflow trigger_dag dag_2 COUNT=`expr $COUNT + 1` done;
处理完全部任务后,咱们将对消息HI计数。咱们将使用调度程序Pod检查输出文件。
从上面的屏幕截图能够看出,消息HI被打印12次,与任务数相同。
在本文中,咱们看到了如何构建一个弹性airflow集群,该集群能够在负载增长到特定阈值以上时横向扩展,并在负载低于特定阈值时安全地横向扩展。
咱们使用了两个新的CRD-ElasticWorker和ElasticWorkerAutoscaler以及它们各自的控制器来实现此目的。 ElasticWorker Controller管理airflow工做程序副本,并确保它在minReplica和maxReplica之间。 ElasticWorkerAutoscaler控制器轮询度量的集群总负载,并计算将集群负载达到指定targetValue所需的副本。而后,它将引用的ElasticWorker对象更新为按比例放大或按比例缩小。