airflow是Airbnb公司于2014年开始开发的一个工做流调度器.不一样于其它调度器使用XML或者text文件方式定义工做流,airflow经过python文件做流,用户能够经过代码彻底自定义本身的工做流。airflow的主要功能:工做流定义、任务调度、任务依赖、变量、池、分布式执行任务等。html
web server是airflow的显示与管理工具,在页面中能看到任务及执行状况,还能配置变量、池等
node
调度器用来监控任务执行时间并提交任务给worker执行。在airflow中scheduler作为独立的服务来启动。python
工做进程,负责任务的的执行。worker进程会建立SequentialExecutor、LocalExecutor、CeleryExecutor之一来执行任务。在airflow中做为独立服务启动。mysql
celery flower用来监控celery executor的信息。
url:http://host:5555
web
主dag
即有向无图,至关于azkban中的project。dag中定义的了任务类型、任务依赖、调度周期等.dag由task组中,task定义了任务的类型、任务脚本等,dag定义task之间的依赖。airflow中的任务表现为一个个的dag.此外还有subdag,在dag中嵌套一个dag(具体做用需进一步研究)。
sql
subdag
至关于azkban中project 中的flow.将dag中的某些task合并到一个子dag中,将这个子dag作为一个执行单元。
apache
使用subdag时要注意:
1)by convention, a SubDAG’s dag_id should be prefixed by its parent and a dot. As in 'parent.child' 。
引用子dag时要加上父dag前缀,parent.child编程
2)share arguments between the main DAG and the SubDAG by passing arguments to the SubDAG operator (as demonstrated above)
经过向子dag的operator传入参数来实如今父dag和子dag信息共享。json
3)SubDAGs must have a schedule and be enabled. If the SubDAG’s schedule is set to None or @once, the SubDAG will succeed without having done anything
子dag必需要设置scheduler,若是没有设置或者设置为@once,则子dag直接返回执行成功,可是不会执行任务操做api
4)clearing a SubDagOperator also clears the state of the tasks within
清除子dag(的状态?)也会清除其中的task状态
5)marking success on a SubDagOperator does not affect the state of the tasks within
将子dag的状态标记为success不会影响所包含的task的状态
6)refrain from using depends_on_past=True! in tasks within the SubDAG as this can be confusing
不要在dag中使用depends_on_past=True!
7)it is possible to specify an executor for the SubDAG. It is common to use the SequentialExecutor if you want to run the SubDAG in-process and effectively limit its parallelism to one. Using LocalExecutor can be problematic as it may over-subscribe your worker, running multiple tasks in a single slot
使用SequentialExecutor来运行子dag,其它的executor执行子dag会出问题
task定义任务的类型、任务内容、任务所依赖的dag等。dag中每一个task都要有不一样的task_id.
dag = DAG('testFile', default_args=default_args) # t1, t2 and t3 are examples of tasks created by instantiating operators t1 = BashOperator( #任务类型是bash task_id='echoDate', #任务id bash_command='echo date > /home/datefile', #任务命令 dag=dag) t2 = BashOperator( task_id='sleep', bash_command='sleep 5', retries=3,[]() dag=dag) t2.set_upstream(t1) #定义任务信赖,任务2依赖于任务1
任务之间经过task.set_upstream\task.set_downstream来设置依赖,也能够用位运算:
t1>>t2<<t3 表示t2依赖于t1和t3.不建议用该种方式。
操做器,定义任务该以哪一种方式执行。airflow有多种operator,如BashOperator、DummyOperator、MySqlOperator、HiveOperator以及社区贡献的operator等,其中BaseOperator是全部operator的基础operator。
BaseOperator | 基础operator,设置baseoperator会影响全部的operator |
BashOperator | executes a bash command |
DummyOperator | 空操做 |
PythonOperator | calls an arbitrary Python function |
EmailOperator | sends an email |
HTTPOperator | sends an HTTP request |
SqlOperator | executes a SQL command |
Sensor | waits for a certain time, file, database row, S3 key, etc… |
t1 = BashOperator( #任务类型是bash task_id='echoDate', #任务id bash_command='echo date > /home/datefile', #任务命令 dag=dag)
scheduler监控dag的状态,启动知足条件的dag,并将任务提交给具体的executor执行。dag经过scheduler来设置执行周期。
1.什么时候执行
注意:当使用schedule_interval
来调度一个dag,假设执行周期为1天,startdate=2016-01-01,则会在2016-01-01T23:59后执行这个任务。 airflow只会在执行周期的结尾执行任务。
2.设置dag执行周期
在dag中设置schedule_interval
来定义调度周期。该参数能够接收cron 表达式
和datetime.timedelta
对象,另外airflow还预置了一些调度周期。
preset | Run once a year at midnight of January 1 | cron |
---|---|---|
None |
Don’t schedule, use for exclusively “externally triggered” DAGs | |
@once |
Schedule once and only once | |
@hourly |
Run once an hour at the beginning of the hour | 0 * * * * |
@daily |
Run once a day at midnight | 0 0 * * * |
@weekly |
Run once a week at midnight on Sunday morning | 0 0 * * 0 |
@monthly |
Run once a month at midnight of the first day of the month | 0 0 1 * * |
@yearly |
Run once a year at midnight of January 1 | 0 0 1 1 * |
3.backfill和catchup
backfill:填充任务,手动重跑过去失败的任务(指定日期)。
catchup:若是历史任务出错,调度器尝试按调度顺序重跑历史任务(而不是按照当前时间执行当前任务)。能够在dag中设置dag.catchup = False
或者参数文件中设置catchup_by_default = False
来禁用这个功能。
4.External Triggers
我还没整明白(等我翻下书再告诉你啊~)
worker指工做节点,相似于yarn中的nodemanager。work负责启动机器上的executor来执行任务。使用celeryExecutor后能够在多个机器上部署worker服务。
执行任务的进程,dag中的task由executor来执行。有三个executor:SequentialExecutor(顺序执行)、LocalExecutor(本地执行)、CeleryExecutor(远程执行)。
dag中被实例化的任务。
池用来控制同个pool的task并行度。
aggregate_db_message_job = BashOperator( task_id='aggregate_db_message_job', execution_timeout=timedelta(hours=3), pool='ep_data_pipeline_db_msg_agg', bash_command=aggregate_db_message_job_cmd, dag=dag) aggregate_db_message_job.set_upstream(wait_for_empty_queue)
上例中,aggregate_db_message_job设置了pool,若是pool的最大并行度为1,当其它任务也设置该池时,若是aggregate_db_message_job在运行,则其它任务必须等待。
定义对airflow以外的链接,如对mysql hive hdfs等工具的链接。airflow中预置了一些链接类型,如mysql hive hdfs postgrey等。
Hooks 是对外的connection接口,经过自定义hooks实现connection中不支持的链接。
airflow中的队列严格来讲不叫Queues,叫"lebal"更为合适。在operator中,能够设置queue参数如queue=spark,而后在启动worker时:airflow worker -q spark,那么该worker只会执行spark任务。至关于节点标签。、
默认状况下,dag与dag之间 、task与task之间信息是没法共享的。若是想在dag、task之间实现信息共享,要使用XComs,经过设置在一个dag(task)中设置XComs参数在另外一个中读取来实现信息共享。
在airflow中能够设置一些变量,在dag和task中能够引用这些变量:
from airflow.models import Variable foo = Variable.get("foo") bar = Variable.get("bar", deserialize_json=True)
设置变量:
此外,airflow预置了一些变量:
具体参考:http://airflow.incubator.apache.org/code.html#macros
dag中的任务能够选择分支! BranchPythonOperator容许用户经过函数返回下一步要执行的task的id,从而根据条件选择执行的分支。azkaban没有该功能。注意,BranchPythonOperator下级task是被"selected"或者"skipped"的分支。
SLAs指在一段时间内应该彻底的操做,好比在一个小时内dag应该执行成功,若是达不目标能够执行其它任务好比发邮件发短信等。
Trigger Rules定义了某个task在何种状况下执行。默认状况下,某个task是否执行,依赖于其父task(直接上游任务)所有执行成功。airflow容许建立更复杂的依赖。经过设置operator中的trigger_rule参数来控制:
all_success
: (default) all parents have succeeded 父task全failed
all_failed
: all parents are in a failed
or upstream_failed
state 父task全failed
或者upstream_failed
状态all_done
: all parents are done with their execution 父task全执行过,无论success or failedone_failed
: fires as soon as at least one parent has failed, it does not wait for all parents to be done 当父task中有一个是failed
状态时执行,没必要等到全部的父task都执行one_success
: fires as soon as at least one parent succeeds, it does not wait for all parents to be done 当父task中有一个是success
状态时执行,没必要等到全部的父task都执行dummy
: dependencies are just for show, trigger at will 无条件执行该参数能够和depends_on_past
结合使用,当设置为true时,若是上一次没有执行成功,这一次不管如何都不会执行。
airflow中内置了一些宏,能够在代码中引用。
通用宏:
airflow特定的宏:
airflow.macros.ds_add(ds, days) |
airflow.macros.ds_format(ds, input_format, output_format) |
airflow.macros.random() → x in the interval [0, 1) |
airflow.macros.hive.closest_ds_partition(table, ds, before=True, schema='default', metastore_conn_id='metastore_default') |
airflow.macros.hive.max_partition(table, schema='default', field=None, filter=None, metastore_conn_id='metastore_default') |
详细说明:
http://airflow.incubator.apache.org/code.html#macros
airflow支持jinja2语法。Jinja2是基于python的模板引擎,功能比较相似于于PHP的smarty,J2ee的Freemarker和velocity。关于jinja2:
http://10.32.1.149:7180/cmf/login
这个太复杂,待近一步研究
airflow命令的语法结构:
airflow 子命令 [参数1][参数2]….
如 airflow test example_dag print_date 2017-05-06
子命令
子命令包括:
resetdb | Burn down and rebuild the metadata database |
render | Render a task instance’s template(s) |
variables | CRUD operations on variables |
connections | List/Add/Delete connections |
pause | Pause a DAG |
task_failed_deps | Returns the unmet dependencies for a task instance from the perspective of the scheduler |
version | Show the version |
trigger_dag | Trigger a DAG run |
initdb | Initialize the metadata database |
test | Test a task instance. This will run a task without checking for dependencies or recording it’s state in the database. |
unpause | Resume a paused DAG |
dag_state | Get the status of a dag run |
run | Run a single task instance |
list_tasks | List the tasks within a DAG |
backfill | Run subsections of a DAG for a specified date range |
list_dags | List all the DAGs |
kerberos | Start a kerberos ticket renewer |
worker | Start a Celery worker node |
webserver | Start a Airflow webserver instance |
flower | Start a Celery Flower |
scheduler | Start a scheduler instance |
task_state | Get the status of a task instance |
pool | CRUD operations on pools |
serve_logs | Serve logs generate by worker |
clear | Clear a set of task instance, as if they never ran |
upgradedb | Upgrade the metadata database to latest version |
使用:
[bqadm@sitbqbm1~]$ airflow webserver -p 8080
详细命令参考:
http://airflow.incubator.apache.org/cli.html#
airflow的api分为Operator、Macros、Modles、Hooks、Executors几个部分,主要关注Operator、Modles这两部分
详细API文档:
http://airflow.incubator.apache.org/code.html
1.建立一个pthon文件testBashOperator.py:
from airflow import DAG from airflow.operators.bash_operator import BashOperator from datetime import datetime, timedelta default_args = { 'owner': 'yangxw', 'depends_on_past': False, 'start_date': datetime(2017, 5, 9), 'email': ['xiaowen.yang@bqjr.cn'], 'email_on_failure': True, 'email_on_retry': True, 'retries': 1, 'retry_delay': timedelta(minutes=5), # 'queue': 'bash_queue', # 'pool': 'backfill', # 'priority_weight': 10, # 'end_date': datetime(2016, 1, 1), } dag = DAG('printDate', default_args=default_args,schedule_interval='*/1 * * * *') # t1, t2 and t3 are examples of tasks created by instantiating operators t1 = BashOperator( task_id='datefile', bash_command='date > /home/bqadm/datefile', dag=dag) t2 = BashOperator( task_id='sleep', bash_command='sleep 5', retries=3, dag=dag) t2.set_upstream(t1)
2.编译该文件
把文件放到$AIRFLOW_HIME/dags下,而后执行:
[bqadm@bqdpsit1 dags]$ python testFile.py [2017-05-18 10:04:17,422] {__init__.py:57} INFO - Using executor CeleryExecutor
这样dag就被建立了
3.启动dag
在web上,点击最左边按钮,将off切换为on
这样dag就启动了。dag启后,会根据自生的调度状况执行。上列中的dag每分钟执行一次,将时间写入/home/bqadm/datafile里。
若是执行出错还会发邮件通知:
airflow内置了16个示例dag,经过学习这些dag的源码可掌握operator、调度、任务依赖的知识,能快速入门。
airflow是功能强大而且极其灵活的pipeline工具,经过python脚本能控制ETL中各个环节,其缺点是使用比较复杂,须要必定的编程水平。此外,当一个dag中有数十个task时,python文件将变的很是长致使维护不便
。airflow在国内并未普遍使用,面临必定的技术风险
。