Airbnb的数据工程师 Maxime Beauchemin 激动地表示道:Airflow 是一个咱们正在用的工做流调度器,如今的版本已经更新到1.6.1了,而且引入了一些列调度引擎的改革。咱们喜欢它是由于它写代码太容易了,也便于调试和维护。咱们也喜欢全都用他来写代码,而不是像xml那样的配置文件用来描述DAG。更不用说,咱们显然不用再学习太多东西。html
在一个分布式环境中,宕机是时有发生的。Airflow经过自动重启任务来适应这一变化。到目前为止一切安好。当咱们有一系列你想去重置状态的任务时,你就会发现这个功能简直是救世主。为了解决这个问题,咱们的策略是创建子DAG。这个子DAG任务将自动重试本身的那一部分,所以,若是你以子DAG设置任务为永不重试,那么凭借子DAG操做你就能够获得整个DAG成败的结果。若是这个重置是DAG的第一个任务设置子DAG的策略就会很是有效,对于有一个相对复杂的依赖关系结构设置子DAG是很是棒的作法。注意到子DAG操做任务不会正确地标记失败任务,除非你从GitHub用了最新版本的Airflow。解决这个问题的另一个策略是使用重试柄:python
def make_spooq_exporter(table, schema, task_id, dag): return SpooqExportOperator( jdbc_url=('jdbc:mysql://%s/%s?user=user&password=pasta' % (TARGET_DB_HOST,TARGET_DB_NAME)), target_table=table, hive_table='%s.%s' % (schema, table), dag=dag, on_retry_callback=truncate_db, task_id=task_id) def truncate_db(context): hook = MySqlHook('clean_db_export') hook.run( 'truncate `%s`'%context['task_instance'].task.target_table, autocommit=False, parameters=None)
这样你的重试柄就能够将任务隔离,每次执行某个特定的任务。mysql
这在执行一个特定的可重复的任务时很是管用。用代码来定义工做流是这个系统最强大之处是你能够以编码的方式产生DAG。这在在没有人工干预的状况下自动接入新的数据源的时候很是有用。git
咱们借助现有的日志目录将检查HDFS日志融入DAG,而且在每次融入这些数据的时候在每一个目录下产生一个任务。示例代码以下:github
lognames = list( hdfs.list_filenames(conf.get('incoming_log_path'), full_path=False)) for logname in lognames: # TODO 使用适当的正则表达式来过滤掉不良日志名,使得Airflow 能用符合特定的字符找出相应任务的名字 if logname not in excluded_logs and '%' not in logname and '@' not in logname: ingest = LogIngesterOperator( # 由于log_name以做为unicode返回值,因此须要用str()包装task_id task_id=str('ingest_%s' % logname), db=conf.get('hive_db'), logname=logname, on_success_callback=datadog_api.check_data_lag, dag=dp_dag ) ingest.set_upstream(transfer_from_incoming) ingest.set_downstream(transform_hive)
在天天结束的时候执行每日任务,而不是在当天工做开始的时候去执行这些任务。你不能将子DAG放在DAG文件夹下,换句话说除非你保管一类DAG,不然你不能够将子DAG放在本身的模块中。正则表达式
或者更具体地说就是,虽然你也能够将子DAG放在DAG文件夹下,可是接着子DAG将先主DAG同样运行本身的调度。这里是一个两个DAG的例子(假设他们同时在DAG文件夹下,也就是所谓的差DAG)这里的子DAG将在主DAG中经过调度器被单独调度。sql
from airflow.models import DAG from airflow.operators import PythonOperator, SubDagOperator from bad_dags.subdag import hive_dag from datetime import timedelta, datetime main_dag = DAG( dag_id='main_dag', schedule_interval=timedelta(hours=1), start_date=datetime(2015, 9, 18, 21) ) # 显然,这单独执行不起做用 transform_hive = SubDagOperator( subdag=hive_dag, task_id='hive_transform', dag=main_dag, trigger_rule=TriggerRule.ALL_DONE )
from airflow.models import DAG from airflow.operators import HiveOperator from datetime import timedelta, datetime # 这将经过子DAG操做符被做为像是本身的调度任务中那样运行。 hive_dag = DAG('main_dag.hive_transform', # 注意到这里的重复迭代 schedule_interval=timedelta(hours=1), start_date=datetime(2015, 9, 18, 21)) hive_transform = HiveOperator(task_id='flatten_tables', hql=send_charge_hql, dag=dag)
除非你真的想这个子DAG被主DAG调度。docker
咱们经过使用工厂函数解决这个问题。这是一个优点那就是 主DAG能够传递一些必要的参数到子DAG,所以他们在调度的时候其余参数也自动赋值了。当你的主DAG发生变化时,咱们不须要去跟踪参数。数据库
在下面的例子中,假设DAG是所谓的好DAG:apache
from airflow.models import DAG from airflow.operators import PythonOperator, SubDagOperator from good_dags.subdag import hive_dag from datetime import timedelta, datetime main_dag = DAG( dag_id='main_dag', schedule_interval=timedelta(hours=1), start_date=datetime(2015, 9, 18, 21) ) # 显然,这单独执行不起做用 transform_hive = SubDagOperator( subdag=hive_dag(main_dag.start_date, main_dag.schedule_interval), task_id='hive_transform', dag=main_dag, trigger_rule=TriggerRule.ALL_DONE )
from airflow.models import DAG from airflow.operators import HiveOperator # 对调度程序来讲,没有Dag的顶层模块就不起做用了 def hive_dag(start_date, schedule_interval): # you might like to make the name a parameter too dag = DAG('main_dag.hive_transform', # 注意这里的设置 schedule_interval=schedule_interval, start_date=start_date) hive_transform = HiveOperator(task_id='flatten_tables', hql=send_charge_hql, dag=dag) return dag
使用工厂类使得子DAG在保障调度器从开始运行时就可维护就更强。
另外一种模式是将主DAG和子DAG之间的共享设为默认参数,而后传递到工厂函数中去,(感谢 Maxime 的建议)。
即便子DAG做为其父DAG的一部分被触发子DAG也必须有一个调度,若是他们的调度是设成None,这个子DAG操做符将不会触发任何任务。
更糟糕的是,若是你对子DAG被禁用,接着你又去运行子DAG操做,并且还没运行完,那么之后你的子DAG就再也运行不起来了。
这将快速致使你的主DAG同时运行的任务数量一下就达到上限(默认一次写入是16个)而且这将致使调度器形同虚设。
这两个例子都是缘起子DAG操做符被当作了回填工做。这里能够看到这个
Airflow1.6的最大更新是引入了DagRun。如今,任务调度实例是由DagRun对象来建立的。
相应地,若是你想跑一个DAG而不是回填工做,你可能就须要用到DagRun。
你能够在代码里写一些airflow trigger_dag
命令,或者也能够经过DagRun页面来操做。
这个巨大的优点就是调度器的行为能够被很好的理解,就像它能够遍历DagRun同样,基于正在运行的DagRun来调度任务实例。
这个服务器如今能够向咱们显示每个DagRun的状态,而且将任务实例的状态与之关联。
新的模型也提供了一个控制调度器的方法。下一个DagRun会基于数据库里上一个DagRun的实例来调度。
除了服务峰值的例外以外,大多数实例是处于运行仍是结束状态都不会影响总体任务的运行。
这意味着若是你想返回一个在现有和历史上不连续集合的部分DagRun ,你能够简单删掉这个DagRun任务实例,而且设置DagRun的状态为正在运行。
按照咱们的经验,一个须要占用很长时间运行的调度器至少是个最终没有安排任务的CeleryExcecutor
。很不幸,咱们仍然不知道具体的缘由。不过庆幸的是,Airflow 内建了一个以num_runs形式做标记的权宜之计。它为调度器确认了许多迭代器来在它退出以前确保执行这个循环。咱们运行了10个迭代,Airbnb通常运行5个。注意到这里若是用LocalExecutor
将会引起一些问题。咱们如今使用chef
来重启executor;咱们正计划转移到supervisor
上来自动重启。
这个airflow.operators
包有一些魔法,它让咱们只能使用正确导入的操做符。这意味着若是你没有安装必要的依赖,你的操做符就会失效。
Airflow 是正在快速迭代中,并且不仅是Airbnb本身在作贡献。Airflow将会继续演化,而我也将写更多有关Airflow的技巧供你们学习使用。
若是你也对解决这些问题感兴趣,那就加入咱们吧!
Youtube: Airflow An open source platform to author and monitor data pipelines
Jonathan Dinu: Scalable Pipelines with Luigi or: I’ll have the Data Engineering, hold the Java!
Managing Containerized Data Pipeline Dependencies With Luigi
Petabyte-Scale Data Pipelines with Docker, Luigi and Elastic Spot Instances
原做者:Marcin Tustin 翻译:Harry Zhu
英文原文地址:Airflow: Tips, Tricks, and Pitfalls做为分享主义者(sharism),本人全部互联网发布的图文均听从CC版权,转载请保留做者信息并注明做者 Harry Zhu 的 FinanceR专栏:https://segmentfault.com/blog/harryprince,若是涉及源代码请注明GitHub地址:https://github.com/harryprince。微信号: harryzhustudio商业使用请联系做者。