[译] 理解 Apache Airflow 的关键概念

四部分系列的第三系列

Quizlet寻找最优工做流管理系统第一部分第二部分中,咱们促进了现代商业实践中对工做流管理系统(WMS)的需求,并提供了一份但愿得到的特性以及功能列表,这使得咱们最后选择了 Apache Airflow 做为咱们的 WMS 选择。这篇文章旨在给好奇的读者提供提供关于 Airflow 的组件和操做的详细概述。咱们会经过实现本系列第一部分中介绍的示例工做流(查阅 图 3.1)来介绍 Airflow 的关键概念。前端

图 3.1:数据处理工做流的示例。android

Airflow 是一种 WMS,即:它将任务以及它们的依赖看做代码,按照那些计划规范任务执行,并在 worker 进程之间分发需执行的任务。Airflow 提供了一个用于显示当前活动任务和过去任务状态的优秀 UI,并容许用户手动管理任务的执行和状态。ios

工做流都是“有向无环图”

Airflow 中的工做流是具备方向性依赖的任务集合。具体说明则是 Airflow 使用有向有向无环图 —— 或简称的 DAG —— 来表现工做流。图中的每一个节点都是一个任务,图中的边表示的是任务之间的依赖(该图强制为无循环的,所以不会出现循环依赖,从而致使无限执行循环)。git

图 3.2 顶部演示了咱们的示例工做流是如何在 Airflow 中变现为 DAG 的。注意在图 1.1 中咱们的示例工做流任务的执行计划结构与图 3.2 中的 DAG 结构类似。github

图 3.2 来自 Airflow UI 的屏幕截图,表示示例工做流 DAG。面板顶部:1 月 25 号 DagRun 的图表视图。深绿色节点表示 TaskInstance 的“成功”状态。淡绿色描绘了 TaskInstance 的“运行”状态。底部子面板example_workflow DAG 的树图。Airflow 的主要组件在屏幕截图中高亮显示,包括 Sensor、Operator、任务、DagRunsTaskInstancesDagRuns 在图视中表示为列 —— DagRun 在 1 月 25 号用青色表示。图示中的每一个方框表示一个 TaskInstance —— 1 月 25 号 为 perform_currency_conversion 任务的 TaskInstance(“运行态”)用蓝色表示。数据库

在高级别中,能够将 DAG 看做是一个包含任务极其依赖,什么时候以及如何设置那些任务的上下文的容器。每一个 DAG 都有一组属性,最重要的是它的 dag_id,在全部 DAG 中的惟一标识符,它的 start_date 用于说明 DAG 任务被执行的时间,schedule_interval 用于说明任务被执行的频率。此外,dag_idstart_dateschedule_interval,每一个 DAG 均可以使用一组 default_arguments 进行初始化。这些默认参数由 DAG 中的全部任务继承。apache

在下列代码块中,咱们在 Airflow 中定义了一个用于实现咱们游戏公司示例工做流的 DAG。json

# 每一个工做流/DAG 都必需要有一个惟一的文本标识符
WORKFLOW_DAG_ID = 'example_workflow_dag'

# 开始/结束时间是 datetime 对象
# 这里咱们在 2017 年 1 月 1 号开始执行
WORKFLOW_START_DATE = datetime(2017, 1, 1)

# 调度器/重试间隔是 timedelta 对象
# 这里咱们天天都执行 DAG 任务
WORKFLOW_SCHEDULE_INTERVAL = timedelta(1)

# 默认参数默认应用于全部任务
# 在 DAG 中
WORKFLOW_DEFAULT_ARGS = {
    'owner': 'example',
    'depends_on_past': False,
    'start_date': WORKFLOW_START_DATE,
    'email': ['example@example_company.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 5,
    'retry_delay': timedelta(minutes=5)
}

# 初始化 DAG
dag = DAG(
    dag_id=WORKFLOW_DAG_ID,
    start_date=WORKFLOW_START_DATE,
    schedule_interval=WORKFLOW_SCHEDULE_INTERVAL,
    default_args=WORKFLOW_DEFAULT_ARGS,
)
复制代码

OperatorsSensors 和 Tasks

尽管 DAG 用于组织并设置执行上下文,但 DAG 不会执行任何实际计算。相反,任务其实是 Airflow 中咱们想要执行“所作工做”的元素。任务有两种特色:它们能够执行一些显示操做,在这种状况下,它们是 Operator,或者它们能够暂停执行依赖任务,直到知足某些条件,在这种状况下,它们是 Sensors。原则上来讲,Operator 能够执行在 Python 中被执行的任何函数。一样,Sensors 能够检查任何进程或者数据结构的状态。后端

下述代码块显示了如何定义一些(假设的)Operator 和 Sensor 类来实现咱们的工做流示例。api

##################################################
# 自定义 Sensors 示例/ Operators (NoOps) #
##################################################

class ConversionRatesSensor(BaseSensorOperator):
    """ An example of a custom Sensor. Custom Sensors generally overload the `poke` method inherited from `BaseSensorOperator` """
    def __init__(self, *args, **kwargs):
        super(ConversionRatesSensor, self).__init__(*args, **kwargs)

    def poke(self, context):
        print 'poking {}'.__str__()
        
        # poke functions should return a boolean
        return check_conversion_rates_api_for_valid_data(context)

class ExtractAppStoreRevenueOperator(BaseOperator):
    """ An example of a custom Operator that takes non-default BaseOperator arguments. Extracts data for a particular app store identified by `app_store_name`. """
    def __init__(self, app_store_name, *args, **kwargs):
        self.app_store_name = app_store_name
        super(ExtractAppStoreRevenueOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        print 'executing {}'.__str__()
        
        # pull data from specific app store
        json_revenue_data = extract_app_store_data(self.app_store_name, context)
        
        # upload app store json data to filestore, can use context variable for 
        # date-specific storage metadata
        upload_appstore_json_data(json_revenue_data, self.app_store_name, context)

class TransformAppStoreJSONDataOperator(BaseOperator):
    """ An example of a custom Operator that takes non-default BaseOperator arguments. Extracts, transforms, and loads data for an array of app stores identified by `app_store_names`. """
    def __init__(self, app_store_names, *args, **kwargs):
        self.app_store_names = app_store_names
        super(TransformJSONDataOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        print 'executing {}'.__str__()
        
        # load all app store data from filestores. context variable can be used to retrieve
        # particular date-specific data artifacts
        all_app_stores_extracted_data = []
        for app_store in self.app_store_names:
            all_app_stores_extracted_data.append(extract_app_store_data(app_store, context))
        
        # combine all app store data, transform to proper format, and upload to filestore 
        all_app_stores_json_data = combine_json_data(all_app_stores_extracted_data)
        app_stores_transformed_data = transform_json_data(all_app_stores_json_data)
        upload_data(app_stores_transformed_data, context)
复制代码

代码定义了 BaseSensorOperator 的子类,即 ConversionRatesSensor。这个类实现了全部 BaseSensorOperator 对象必需的 poke 方法。若是下游任务要继续执行,poke 方法必须返回 True,不然返回 False。在咱们的示例中,这个 sensor 将用于决定什么时候外部 API 的交换率什么时候可用。

ExtractAppStoreRevenueOperatorTransformAppStoreJSONDataOperator 这两个类都继承自 Airflow 的BaseOperator 类,并实现了 execute 方法。在咱们的示例中,这两个类的 execute 方法都从应用程序存储 API 中获取数据,并将它们转换为公司首选的存储格式。注意 ExtractAppStoreRevenueOperator 也接受一个自定义参数 app_store_name,它告诉类应用程序存储应该从哪里获取请求数据。

注意,Operator 和 Sensor 一般在单独文件中定义,并导入到咱们定义 DAG 的同名命名空间中。但咱们也能够将这些类定义添加到同一个 DAG 定义的文件中。

形式上,Airflow 定义任务为 Sensor 或 Operator 类实例化。实例化任务须要提供一个惟一的 task_id 和 DAG 容器来添加任务(注意:在高于 1.8 的版本中,再也不须要 DAG 对象)。下面的代码块显示了如何实例化执行示例工做流所需的全部任务。(注意:咱们假设示例中引用的全部 Operator 都是在命名空间中定义或导入的)。

########################
# 实例化任务 #
########################

# 实例化任务来提取广告网络收入
extract_ad_revenue = ExtractAdRevenueOperator(
    task_id='extract_ad_revenue',
    dag=dag)

# 动态实例化任务来提取应用程序存储数据
APP_STORES = ['app_store_a', 'app_store_b', 'app_store_c']
app_store_tasks = []
for app_store in APP_STORES:
    task = ExtractAppStoreRevenueOperator(
        task_id='extract_{}_revenue'.format(app_store),
        dag=dag,
        app_store_name=app_store,
        )
    app_store_tasks.append(task)

# 实例化任务来等待转换率、数据均衡
wait_for_conversion_rates = ConversionRatesSensor(
    task_id='wait_for_conversion_rates',
    dag=dag)

# 实例化任务,从 API 中提取转化率
extract_conversion_rates = ExtractConversionRatesOperator(
    task_id='get_conversion_rates',
    dag=dag)

# 实例化任务来转换电子表格数据
transform_spreadsheet_data = TransformAdsSpreadsheetDataOperator(
    task_id='transform_spreadsheet_data',
    dag=dag) 

# 从全部应用程序存储中实例化任务转换 JSON 数据
transform_json_data = TransformAppStoreJSONDataOperator(
    task_id='transform_json_data',
    dag=dag,
    app_store_names=APP_STORES)

# 实例化任务来应用
perform_currency_conversions = CurrencyConversionsOperator(
    task_id='perform_currency_conversions',
    dag=dag)

# 实例化任务来组合全部数据源
combine_revenue_data = CombineDataRevenueDataOperator(
    task_id='combine_revenue_data',
    dag=dag)  

# 实例化任务来检查历史数据是否存在
check_historical_data = CheckHistoricalDataOperator(
    task_id='check_historical_data',
    dag=dag)

# 实例化任务来根据历史数据进行预测
predict_revenue = RevenuePredictionOperator(
    task_id='predict_revenue',
    dag=dag)  
复制代码

此任务实例化代码在与 DAG 定义相同的文件/命名空间中执行。咱们能够看到添加任务的代码很是简洁,并且容许经过注解进行内联文档。第 10–19 行展现了在代码中定义工做流的优点之一。咱们可以动态地定义三个不一样的任务,用于使用 for 循环从每一个应用程序存储中提取数据。这种方法可能在这个小示例中不会给咱们带来太大的好处,但随着应用程序商店数量的增长,好处会日益显著。

定义任务依赖关系

Airflow 的关键优点是定义任务之间依赖关系的简洁性和直观约定。下述代码代表了咱们如何为示例工做流定义任务依赖关系图:

###############################
# 定义任务依赖关系 #
###############################

# 依赖设置使用 `.set_upstream` 和/或 
# `.set_downstream` 方法
# (in version >=1.8.1,也可使用
# `extract_ad_revenue << transform_spreadsheet_data` 语法)

transform_spreadsheet_data.set_upstream(extract_ad_revenue)

# 动态定义应用程序存储依赖项
for task in app_store_tasks:
    transform_json_data.set_upstream(task)

extract_conversion_rates.set_upstream(wait_for_conversion_rates)

perform_currency_conversions.set_upstream(transform_json_data)
perform_currency_conversions.set_upstream(extract_conversion_rates)

combine_revenue_data.set_upstream(transform_spreadsheet_data)
combine_revenue_data.set_upstream(perform_currency_conversions)

check_historical_data.set_upstream(combine_revenue_data)

predict_revenue.set_upstream(check_historical_data) 
复制代码

同时,此代码在与 DAG 定义相同的文件/命名空间中运行。任务依赖使用 set_upstreamset_downstream operators 来设置(但在高于 1.8 的版本中,使用移位运算符 <<>> 来更简洁地执行类似操做是可行的)。一个任务还能够同时具备多个依赖(例如,combine_revenue_data),或一个也没有(例如,全部的 extract_* 任务)。

图 3.2 的顶部子面板显示了由上述代码所建立的 Airflow DAG,渲染为 Airflow 的 UI(稍后咱们会详细介绍 UI)。 DAG 的依赖结构与在图 1.1 显示的咱们为咱们的示例工做流所提出的执行计划很是类似。当 DAG 被执行时,Airflow 会使用这种依赖结构来自动肯定哪些任务能够在任什么时候间点同时运行(例如,全部的 extract_* 任务)。

DagRuns 和 TaskInstances

一旦咱们定义了 DAG —— 即,咱们已经实例化了任务并定义了它们的依赖项 —— 咱们就能够基于 DAG 的参数来执行任务。Airflow 中的一个关键概念是 execution_time。当 Airflow 调度器正在运行时,它会定义一个用于执行 DAG 相关任务的按期间断的日期计划。执行时间从 DAG start_date 开始,并重复每个 schedule_interval。在咱们的示例中,调度时间是 (‘2017–01–01 00:00:00’, ‘2017–01–02 00:00:00’, ...)。对于每个 execution_time,都会建立 DagRun 并在执行时间上下文中进行操做。所以,DagRun 只是具备必定执行时间的 DAG(参见 图 3.2 的底部子面板)。

全部与 DagRun 关联的任务都称为 TaskInstance。换句话说,TaskInstance 是一个已经实例化并且拥有 execution_date 上下文的任务(参见 图 3.2 的底部子面板)。DagRuns 和 TaskInstance 是 Airflow 的核心概念。每一个DagRun and TaskInstance 都与记录其状态的 Airflow 元数据库中的一个条目相关联(例如 “queued”、“running”、“failed”、“skipped”、“up for retry”)。读取和更新这些状态是 Airflow 调度和执行过程的关键。

Airflow 的架构

在其核心中,Airflow 是创建在元数据库上的队列系统。数据库存储队列任务的状态,调度器使用这些状态来肯定如何将其它任务添加到队列的优先级。此功能由四个主要组件编排。(请参阅图 3.2 的左子面板):

  1. 元数据库:这个数据库存储有关任务状态的信息。数据库使用在 SQLAlchemy 中实现的抽象层执行更新。该抽象层将 Airflow 剩余组件功能从数据库中干净地分离了出来。
  2. 调度器:调度器是一种使用 DAG 定义结合元数据中的任务状态来决定哪些任务须要被执行以及任务执行优先级的过程。调度器一般做为服务运行。
  3. 执行器:Excutor 是一个消息队列进程,它被绑定到调度器中,用于肯定实际执行每一个任务计划的工做进程。有不一样类型的执行器,每一个执行器都使用一个指定工做进程的类来执行任务。例如,LocalExecutor 使用与调度器进程在同一台机器上运行的并行进程执行任务。其余像 CeleryExecutor 的执行器使用存在于独立的工做机器集群中的工做进程执行任务。
  4. Workers:这些是实际执行任务逻辑的进程,由正在使用的执行器肯定。

图 3.2:Airflow 的通常架构。Airflow 的操做创建于存储任务状态和工做流的元数据库之上(即 DAG)。调度器和执行器将任务发送至队列,让 Worker 进程执行。WebServer 运行(常常与调度器在同一台机器上运行)并与数据库通讯,在 Web UI 中呈现任务状态和任务执行日志。每一个有色框代表每一个组件均可以独立于其余组件存在,这取决于部署配置的类型。

调度器操做

首先,Airflow 调度器操做看起来更像是黑魔法而不是逻辑程序。也就是说,若是你发现本身正在调试它的执行,那么了解调度器的工做原理久能够节省大量的时间,为了让读者免于深陷 Airflow 的源代码(尽管咱们很是推荐它!)咱们用伪代码概述了调度器的基本操做:

步骤 0. 从磁盘中加载可用的 DAG 定义(填充 DagBag)

当调度器运行时:
	步骤 1. 调度器使用 DAG 定义来标识而且/或者初始化在元数据的 db 中的任何 DagRuns。
	
	步骤 2. 调度器检查与活动 DagRun 关联的 TaskInstance 的状态,解析 TaskInstance 之间的任何依赖,标识须要被执行的 TaskInstance,而后将它们添加至 worker 队列,将新排列的 TaskInstance 状态更新为数据库中的“排队”状态。
	
	步骤 3. 每一个可用的 worker 从队列中取一个 TaskInstance,而后开始执行它,将此 TaskInstance 的数据库记录从“排队”更新为“运行”。
	
	步骤 4. 一旦一个 TaskInstance 完成运行,关联的 worker 就会报告到队列并更新数据库中的 TaskInstance 的状态(例如“完成”、“失败”等)。
	
	步骤 5. 调度器根据全部已完成的相关 TaskInstance 的状态更新全部活动 DagRuns 的状态(“运行”、“失败”、“完成”)。
	
	步骤 6. 重复步骤 1-5
复制代码

Web UI

除了主要的调度和执行组件外,Airflow 还支持包括全功能的 Web UI 组件(参阅图 3.2 的一些 UI 示例),包括:

  1. Webserver:此过程运行一个简单的 Flask 应用程序,它从元数据库中读取全部任务状态,并让 Web UI 呈现这些状态。
  2. Web UI:此组件容许客户端用户查看和编辑元数据库中的任务状态。因为调度器和数据库之间的耦合,Web UI 容许用户操做调度器的行为。
  3. 执行日志:这些日志由 worker 进程编写,存储在磁盘或远程文件存储区(例如 GCSS3)中。Webserver 访问日志并将其提供给 Web UI。

尽管对于 Airflow 的基本操做来讲,这些附加组件都不是必要的,但从功能性角度来讲,它们确实使 Airflow 有别于当前的其余工做流管理。 具体来讲,UI 和集成执行日志容许用户检查和诊断任务执行,以及查看和操做任务状态。

命令行接口

除了调度程序和 Web UI,Airflow 还经过命令行接口(CLI)提供了健壮性的特性。尤为是,当咱们开发 Airflow 时,发现如下的这些命令很是有用:

  • airflow test DAG_ID TASK_ID EXECUTION_DATE。容许用户在不影响元数据库或关注任务依赖的状况下独立运行任务。这个命令很适合独立测试自定义 Operator 类的基本行为。
  • airflow backfill DAG_ID TASK_ID -s START_DATE -e END_DATE。在 START_DATEEND_DATE 之间执行历史数据的回填,而不须要运行调度器。当你须要更改现有工做流的一些业务逻辑并须要更新历史数据时,这是很好的。(请注意,回填不须要在数据库中建立 DagRun 条目,由于它们不是由 [SchedulerJob](https://github.com/apache/incubator-airflow/blob/master/airflow/jobs.py#L471) 类运行的)。
  • airflow clear DAG_ID。移除 DAG_ID 元数据库中的 TaskInstance 记录。当你迭代工做流/DAG 功能时,这会颇有用。
  • airflow resetdb:虽然你一般不想常常运行这个命令,但若是你须要建立一个“干净的历史记录”,这是很是有帮助的,这种状况载最初设置 Airflow 时可能会出现(注意:这个命令只影响数据库,不删除日志)。

综上所述,咱们提供了一些更加抽象的概念,做为 Airflow 的基础。在此系列的最后部分 installment 中,咱们将讨论在生产中部署 Airflow 时的一些更实际的注意事项。

感谢 Laura Oppenheimer

若是发现译文存在错误或其余须要改进的地方,欢迎到 掘金翻译计划 对译文进行修改并 PR,也可得到相应奖励积分。文章开头的 本文永久连接 即为本文在 GitHub 上的 MarkDown 连接。


掘金翻译计划 是一个翻译优质互联网技术文章的社区,文章来源为 掘金 上的英文分享文章。内容覆盖 AndroidiOS前端后端区块链产品设计人工智能等领域,想要查看更多优质译文请持续关注 掘金翻译计划官方微博知乎专栏

相关文章
相关标签/搜索