Airflow 是 Airbnb 开发的用于工做流管理的开源项目,自带 web UI 和调度。如今 Apache 下作孵化,地址是 github.com/apache/incu…python
Airflow 主要解决的问题能够参考 Airbnb 官方的博客: airflow-a-workflow-management-platform,简单来讲就是管理和调度各类离线定时 Job ,能够替代 crontab。mysql
当 cron job 规模达到数百上千时,其对人的要求将会很是高的,若是你的团队经历过这样的事情,应该能体会其中痛苦,因此使用相似 airflow 这样的工具代替 cron 来作定时任务将会极大提升工做效率。git
Airflow 在 pip 上已经改名为 apache-airflow
,下载最新版请使用后者 pip install apache-airflow
。github
Airflow 1.8 版本依赖的是 MySQL 5.6 以上,5.7 如下报 1071, u'Specified key was too long; max key length is 767 bytes
,若是你使用 MySQL 做为你的 airflow backend 请升级你的 MySQL 到最新版。web
MySQL 5.6 升级到 5.7 在使用 airflow 时会报 1146, u"Table 'performance_schema.session_variables' doesn't exist"
,执行 mysql_upgrade -u root -p --force
解决。sql
Airflow 的 mysql driver 使用的是 mysqlclient mysql://root:@127.0.0.1/sqlalchemy_lab?charset=utf8
,若是使用其余 driver 将报 syntax error。shell
Airflow 中最基本的两个概念是:DAG 和 task。DAG 的全称是 Directed Acyclic Graph 是全部你想执行的任务的集合,在这个集合中你定义了他们的依赖关系,一个 DAG 是指一个 DAG object,一个 DAG object 能够在 Python 脚本中配置完成。数据库
好比一个简单的的 DAG 包含三个 task:A、B、C,A 执行成功以后 B 才能执行,C 不依赖 A 和 B 便可执行。在这个简单的 DAG 中 A B C 能够是任何你想要执行的任务。apache
DAG 的定义使用 Python 完成的,其实就是一个 Python 文件,存放在 DAG 目录,Airflow 会动态的从这个目录构建 DAG object,每一个 DAG object 表明了一个 workflow,每一个 workflow 均可以包含任意个 task。bash
Airflow 是基于 Python 构建的,能够很容易用 pip 安装使用,pip install apache-airflow
,默认状况下 airflow 会在 ~/airflow
目录存放相关配置。
Airflow 提供了一些列命令来完成 airflow 的初始化工做来和它的正确使用。
# 在 airflow 目录初始化数据库和 airflow 配置 airflow initdb # 启动 airflow web airflow webserver # 开始调度 airflow scheduler复制代码
更详细的信息请参考文档 airflow.incubator.apache.org/
DAG 的配置用 Python 完成像这样:
""" Code that goes along with the Airflow tutorial located at: https://github.com/airbnb/airflow/blob/master/airflow/example_dags/tutorial.py """ from airflow import DAG from airflow.operators.bash_operator import BashOperator from datetime import datetime, timedelta default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2015, 6, 1), 'email': ['airflow@airflow.com'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), # 'queue': 'bash_queue', # 'pool': 'backfill', # 'priority_weight': 10, # 'end_date': datetime(2016, 1, 1), } dag = DAG('tutorial', default_args=default_args, schedule_interval=timedelta(1)) # t1, t2 and t3 are examples of tasks created by instantiating operators t1 = BashOperator( task_id='print_date', bash_command='date', dag=dag) t2 = BashOperator( task_id='sleep', bash_command='sleep 5', retries=3, dag=dag) templated_command = """ {% for i in range(5) %} echo "{{ ds }}" echo "{{ macros.ds_add(ds, 7)}}" echo "{{ params.my_param }}" {% endfor %} """ t3 = BashOperator( task_id='templated', bash_command=templated_command, params={'my_param': 'Parameter I passed in'}, dag=dag) t2.set_upstream(t1) # t2 依赖 t1 t3.set_upstream(t1)复制代码
DAG 脚本的目的只是定义 DAG 的配置,并不包含任何的数据处理,在这里 operator 就是 task。
一个 DAG 脚本是由 DAG object 的实例化和对应的 operator 组成的,除此以外咱们还能够定义默认的参数提供给每一个任务。
DAG 对象实例化能够根据咱们的须要提供对应的初始化参数,实例化 DAG 对象须要提供惟一的 dag_id:
dag = DAG( 'tutorial', default_args=default_args, schedule_interval=timedelta(1))复制代码
t1 = BashOperator( task_id='print_date', bash_command='date', dag=dag) t2 = BashOperator( task_id='sleep', bash_command='sleep 5', retries=3, dag=dag)复制代码
task 对象的定义的就是 operator 的实例化,operator 有 task_id,用来区分任务,能够按照须要定制 bash_command,也能够传递参数等。
Task 之间是能相互创建依赖的,形如:
t2.set_upstream(t1) # This means that t2 will depend on t1 # running successfully to run # It is equivalent to # t1.set_downstream(t2) t3.set_upstream(t1) # all of this is equivalent to # dag.set_dependency('print_date', 'sleep') # dag.set_dependency('print_date', 'templated')复制代码
Airflow 会自动检测环形依赖以防止 task 没法工做的状况出现,更复杂的状况请参考文档。
和 airflow.cfg 同级目录下创建 dag 目录,用来存放第一个 DAG 脚本,而后执行 python tutorial.py
,若是没有报错说明 tutorial 创建成功了。
Airflow 提供了一些列的命令行用来查看 DAG 和 task
# print the list of active DAGs airflow list_dags # prints the list of tasks the "tutorial" dag_id airflow list_tasks tutorial # prints the hierarchy of tasks in the tutorial DAG airflow list_tasks tutorial --tree复制代码
执行任务很简单,指定 DAG 并去指定 task 和执行的日期
# command layout: command subcommand dag_id task_id date # testing print_date airflow test tutorial print_date 2015-06-01 # testing sleep airflow test tutorial sleep 2015-06-01复制代码
test 命令会执行任务而且输出到控制台,不会把任务的执行状态进行持久化
执行任务在 Airflow 中称之为 backfill,以 backfill 执行会真正开始追踪任务的执行状态和依赖,而且会记录日志
# optional, start a web server in debug mode in the background # airflow webserver --debug & # start your backfill on a date range airflow backfill tutorial -s 2015-06-01 -e 2015-06-07复制代码
Airflow 会默认加载任意它能导入到饿 DAG object,这就意味着只要是全局的 DAG object 均可以被导入,可是有时候为了让 DAG 不被导入,好比 SubDagOperator 就可使用 local 的做用域。
dag_1 = DAG('this_dag_will_be_discovered')
def my_function()
dag_2 = DAG('but_this_dag_will_not')
my_function()复制代码
DAG 的默认参数会应用到全部的 operator 中。
default_args=dict( start_date=datetime(2016, 1, 1), owner='Airflow') dag = DAG('my_dag', default_args=default_args) op = DummyOperator(task_id='dummy', dag=dag) print(op.owner) # Airflow复制代码
Airflow operator 很容易扩展,这也是 airflow 几乎支持任何形式 task 重要缘由。虽然 Airflow 支持不一样的 task 能够传输数据,可是若是你的两个 task 之间确实须要共享数据,最好的办法是把他们写在一块儿。
更多原创文章,关注 wecatch 公众号