Kubernetes部署弹性Airflow集群

在本文中,我将演示如何构建一个弹性Airflow集群,当负载低于阈值时,该集群能够在高负载下安全地横向扩展。git

经过水平Pod自动缩放器支持Kubernetes中的自动缩放。使用HPA,能够直接进行横向扩展,HPA能够为部署增长副本,并能够建立其余worker来共享工做负载。可是,伸缩是问题所在,伸缩过程会根据Pod在节点上的位置对它们进行排序,从而选择要终止的Pod。所以,若是有一个Pod仍在进行某些处理,则没法保证它不会被终止。github

在弹性Airflow 集群中,为了扩大规模,咱们须要确保进行某些处理的worker不会被终止。只有闲置的worker才应考虑终止。redis

为了实现这一点,我建立了两个CRD和两个控制器 - ElasticWorkerElasticWorkerAutoscaler,在本文后面将对它们进行介绍。数据库

对于此问题,还有其余解决方案,例如,能够建立一个Kubernetes做业,该做业能够完成一组任务。随着负载的增长,将建立更多的做业。可是,这种方法不是通用解决方案,它很是适合具备相似自动缩放要求的其余用例。此处描述的方法是一种通用实现,能够用做全面生产部署的起点。api

弹性 Airflow Cluster 架构

el01.png

组件说明以下,安全

  • Airflow scheduler解析DAG并将必要的任务添加到RabbitMQ队列。
  • PostgresDB保存有关任务,DAG,变量,链接等状态的信息。
  • RabbitMQ 将要执行的命令存储在队列中。
  • Airflow Worker 从RabbitMQ检索命令并执行它们。
  • Flower 是用于监视和管理Celery worker的基于Web的工具。在咱们的设置中,flower还包含其余脚本,以获取每一个airflow worker的指标并将其放入redis db中。
  • Redis DB 存储每一个airflow worker Pod的负载度量以及汇总的集群总负载。它还存储了咱们的自定义指标APIServer适配器的全部已注册指标。
  • Custom Metric APIServer适配器 是一个基本的自定义Metric API服务器适配器,它分别为给定的pod和airflow集群资源服务的load和total_cluster_load度量请求。它从Redis数据库检索这些指标。
  • ElasticWorker Controller 监视类型为ElasticWorker(CRD)的对象,并将集群状态与相应的Elasticworker对象中的规范协调。在较高级别,如下是该控制者的职责。bash

    • 建立等于minReplica的worker Pod
    • 若是变量scale> 0,则建立其余工做容器。可是要确保总的Pod数量不超过maxReplicas
    • scale <0时,删除worker Pod。Pod的删除由定义的“按比例缩放”策略控制。当前有三个策略-ScaleInImmediatelyScaleInBySelectorScaleInDisabled,咱们将在此处使用ScaleInBySelector策略。这样能够确保控制器仅删除已定义标签集的Pod。它还能够确保不管是否设置标签,Pod数量都不会低于minReplicas

al02.jpg

  • ElasticWorkerAutoscalerController ElasticWorkerAutoscaler控制器监视类型为ElasticWorkerAutoscaler(CRD)的对象。下面是该控制器的职责,服务器

    • 检索名称为引用ElasticWorker对象的资源的指标total_cluster_load
    • 若是total_cluster_load> targetValue,则向外扩展。计算新的工做单元数,以减小目标值的负担。计算方法与HPA相同。设置引用的ElasticWorker对象的scale属性。
    • 若是total_cluster_load <0.70 * targetValue,则缩容。若是负载低于阈值,则不会当即开始Scale-In,可是scaleInBackOff周期开始计时。默认状况下,它设置为30秒,若是仅在此期间完成,则执行缩容。若是平均时间total_cluster_load增长,则ScaleInBackOff周期无效。周期结束后,控制器将选择那些具备metricload = 0的worker Pod。而后,它使用请求中的这些pod调用shutdownHttpHook。hook是对此实现定制的,但能够推广。接下来,控制器用终止标签标记容器,最后用适当的值更新比例,以使ElasticWorker控制器更改集群状态。
    • load = 0,ShutdownHttpHook和TerminationLabelit确保仅终止那些不作任何事情的airflow worker。 HttpShutdown hook很重要,由于它能够确保标记为终止的airflow worker在ElasticWorker控制器终止它时不会从RabbitMQ接任任何任务。

al01.jpg

安装

ElasticWorker和ElasticWorkerAutoscaler控制器代码位于-elastic-worker-autoscaler架构

自定义指标APIServer适配器代码位于-elastic-worker-custommetrics-adapter并发

请遵循此处的安装说明-elastic-airflow-cluster-k8s-setup-manifests

al03.png

此外,咱们可使用如下命令启动Kubernetes仪表板并进行验证,

minikube dashboard

咱们在minikube上的设置以下:

两个命名空间

  • elastic-worker-autoscaler-system
  • elasticworker-custommetrics

命名空间:elasticworker-custommetrics包含与自定义指标相关的pod。
命名空间:elastic-worker-autoscaler-system包含用于ElasticWorker和ElasticWorkerAutoscaler的控制器容器
其他组件在默认名称空间中建立。

咱们可使用如下命令检索ElasticWorker对象,
kubectl get elasticworkers

咱们可使用如下命令检索ElasticWorkerAutoscaler对象,
kubectl get elasticworkerautoscalers

测试

安装时,咱们已经使用dag_1测试了airflow cluster。若是没有,请转到此处使用此DAG进行测试。咱们将在此处使用相同的DAG进行测试。

为了进行测试,我在ElasticWorkerAutoscaler对象中将targetValue设置为60。这意味着,一旦集群总负载超过60,则将开始向外扩展,若是负载低于〜30,则将开始向内扩展。

咱们将经过登陆调度程序Pod来触发DAG。

咱们将从测试横向扩展方案开始。

在咱们的安装中,每一个airflow worker的并发设置为2,这意味着咱们总共有2个(并发)* 2(工做人员数量)= 4个可用插槽。所以,触发4个DAG将使群集负载达到100%。

在此测试案例中,咱们将同时触发10个以上的DAG(即,咱们须要> 10个插槽)。这将致使airflow worker群集扩展到maxReplica(即5个副本)。即便负载保持在100%,ElasticWorker控制器也将确保工人数不会超过maxReplica。

屏幕截图下方是开始测试以前的airflow worker集群。目前,咱们有2个worker,全部插槽都空着。

al04.png

al05.png

让咱们经过登陆调度程序Pod来触发DAG。若是还没有完成,请记住取消暂停DAG。

#Login to Scheduler POD
kubectl exec -it airflow-scheduler-76d5df7b9b-mjp25 bash
cd dags
#If you have not unpaused dag_1 already
airflow unpause dag_1
export COUNT=0
while [[ $COUNT -lt 12 ]];
do
airflow trigger_dag dag_1
COUNT=`expr $COUNT + 1`
done;

随着负载的增长,咱们看到建立了额外的airflow worker来处理负载。

al06.png

al07.png

当咱们触发12个DAG时,建立的其余airflow worker应该不止3个,可是因为将maxReplicas设置为5,ElasticWorker控制器不会建立5个以上的airflow worker。

咱们的横向扩展方案可行!!

接下来咱们测试一下缩容。

基本的扩展方案已经过先前的测试验证。若是咱们等待一两分钟,而后检查集群状态,咱们能够看到工做线程数已缩减至minReplicas。这是由于负载降至30如下。

al08.png

al09.png

咱们还想验证它是不是安全的横向扩展,即触发横向扩展时,控制器将终止负载为0的worker,而不是仍在进行某些工做的worker。

为了测试这种状况,咱们将使用dag_2,它的任务将休眠30秒,而后将消息HI记录到文件/home/airflow/logs/count_hi.txt中。咱们将触发DAG 12次,每触发4次,咱们将等待40+秒,而后再次触发。

咱们在二者之间等待以触发缩容。 Scale-In的默认退避时间为30秒,这是为了不抖动。

为了最终验证全部任务是否运行正常,而且实际进行处理的全部worker均未终止,咱们仅在输出文件中计算消息HI。若是它等于咱们触发的DAG数量(12),则咱们的测试用例将经过。

dag_2以下:

# Lets copy the dag_2.py file into minikube VM  
**minikube ssh  
cd dags/  
cat>dag_2.py  
....PASTE CONTENT FROM SAMPLE DAG....  
ctrl-d  
logout

从调度程序Pod触发DAG。

#Login to Scheduler POD
kubectl exec -it airflow-scheduler-76d5df7b9b-mjp25 bash
cd dags
#If you have not unpaused dag_2 already
airflow unpause dag_2
export COUNT=0
while [[ $COUNT -lt 4 ]];
do
airflow trigger_dag dag_2
COUNT=`expr $COUNT + 1`
done;
sleep 40
export COUNT=0
while [[ $COUNT -lt 4 ]];
do
airflow trigger_dag dag_2
COUNT=`expr $COUNT + 1`
done;
sleep 45
export COUNT=0
while [[ $COUNT -lt 4 ]];
do
airflow trigger_dag dag_2
COUNT=`expr $COUNT + 1`
done;

处理完全部任务后,咱们将对消息HI计数。咱们将使用调度程序Pod检查输出文件。

al10.png

从上面的屏幕截图能够看出,消息HI被打印12次,与任务数相同。

结论

在本文中,咱们看到了如何构建一个弹性airflow集群,该集群能够在负载增长到特定阈值以上时横向扩展,并在负载低于特定阈值时安全地横向扩展。

咱们使用了两个新的CRD-ElasticWorker和ElasticWorkerAutoscaler以及它们各自的控制器来实现此目的。 ElasticWorker Controller管理airflow工做程序副本,并确保它在minReplica和maxReplica之间。 ElasticWorkerAutoscaler控制器轮询度量的集群总负载,并计算将集群负载达到指定targetValue所需的副本。而后,它将引用的ElasticWorker对象更新为按比例放大或按比例缩小。

相关文章
相关标签/搜索