airflow已经成为了任务编排系统的事实标准,使用和terraform同样的代码及配置的任务开发方式。html
airflow使用python做为开发语言,很是简单易学、容易上手。python
完整案例代码已上传github:github.com/neatlife/my…git
可使用docker一键启动github
git clone https://github.com/puckel/docker-airflow
cd docker-airflow
docker-compose -f docker-compose-LocalExecutor.yml up -d
复制代码
访问ip:8080查看效果 web
能够看到airflow已经可用了docker
这个dag文件就是用来定义任务和任务之间的前后、依赖关系的。shell
参考:airflow.apache.org/tutorial.ht…apache
其中几个比较重要的参数以下:vim
参数名 | 做用 |
---|---|
start_date | 任务开始时间 |
end_date | 任务结束时间,不填表明永远 |
retries | 任务执行失败重试次数 |
retry_delay | 重试的间隔时间 |
代码实现以下 vim mydag.py
api
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
"start_date": datetime(2019, 7, 10),
"retries": 1,
"retry_delay": timedelta(minutes=5),
}
dag = DAG("mydag", default_args=default_args, schedule_interval=timedelta(1))
复制代码
上面的"schedule_interval=timedelta(1)"表明一天触发一次这个任务,也可使用crontab的语法,参考:airflow.apache.org/scheduler.h…
这个新的dag文件须要使用python执行这个脚本导入,可使用下面的命令进行导入
docker-compose -f docker-compose-LocalExecutor.yml exec webserver bash
python dags/mydag.py
复制代码
刷新这个web界面,就能够看到这个新加的mydag任务了
这里定义三个任务,分别echo出1, 2, 3,关系以下
代码实现以下
t1 = BashOperator(task_id="echo1", bash_command="echo 1", dag=dag)
t2 = BashOperator(task_id="echo2", bash_command="echo 2", dag=dag)
t3 = BashOperator(task_id="echo3", bash_command="echo 3", dag=dag)
复制代码
上面使任务使用airflow执行的bash,airflow还能够执行不少输入,完整列表参考:airflow.apache.org/_api/airflo…
t2.set_upstream(t1)
t3.set_upstream(t1)
复制代码
首次修改这个dag文件,airflow回自动加载,点击"Refresh"按钮能够手动加载这个配置文件
操做效果以下
能够看到这个echo1, echo2, echo3的依赖关系定义成功了
先启用这个任务,而后点击执行按钮,操做效果以下
也能够只启用这个任务,而后等这个airflow按照设定的时间,自动触发,手动触发能够快速验证
这个任务的执行状态用不一样的颜色进行表示
单击 "Browser" > "Task Instance"能够查看全部被执行的任务列表
单击具体的task id就能够进入到task执行状况详情页面了,单击详情页的"Log"就能够看到任务的日志输出了,操做效果以下:
若是使用了这个airflow的模板功能,能够在任务执行详情页面,查看这个被执行任务的模板渲染结果,操做效果以下:
须要先删除dag定义文件,好比 rm dags/mydag.py,而后在web界面上删除
这个airflow依赖很是的多,由于没到1.0版本,存在一些bug,因此建议使用docker启动,主要的dag任务编排功能使用是ok的,其它功能能够先了解下
airflow可使用模板功能生成脚本,参考:airflow.apache.org/tutorial.ht…
jinjia模板语法参考:jinja.pocoo.org/docs/dev/
airflow默认以utc时区运行,若是须要计算正确的时间,须要把时间进行时区转换,核心代码以下
#将本地时间转换为utc时间,再设置为start_date
tz = pytz.timezone('Asia/Shanghai')
dt = datetime(2018, 7, 26, 12, 20, tzinfo=tz)
utc_dt = dt.astimezone(pytz.utc).replace(tzinfo=None)
复制代码