最近在Prettyyes一直想创建起很是专业的data pipeline系统,而后没有不少时间,这几个礼拜正好app上线,有时间开始创建本身的 data pipeline,可以很好的作天天的数据导入,数据收集,以及数据分析。前端
ETL 是经常使用的数据处理,在之前的公司里,ETL 差很少是数据处理的基础,要求很是稳定,容错率高,并且可以很好的监控。ETL的全称是 Extract,Transform,Load, 通常状况下是将乱七八糟的数据进行预处理,而后放到储存空间上。能够是SQL的也能够是NOSQL的,还能够直接存成file的模式。python
一开始个人设计思路是,用几个cron job和celery来handle全部的处理,而后将咱们的log文件存在hdfs,还有一些数据存在mysql,大概天天跑一次。核心是可以scale,稳定,容错,roll back。咱们的data warehouse就放在云上,就简单处理了。mysql
有了本身的ETL系统我以为就很安心了,之后可以作数据处理和机器学习方面就相对方便一些。git
一开始我设计的思路和Uber一开始的ETL很像,由于我以为很方便。可是我发觉一个很严重的问题,我一我的忙不过来。首先,要至少写个前端UI来监控cron job,可是市面上的都不好。其次,容错的autorestart写起来很费劲,多是我本身没有找到一个好的处理方法。最后部署的时候至关麻烦,若是要写好这些东西,我一我的的话要至少一个月的时间,可能还不是特别robust。在尝试写了2两天的一些碎片处理的脚本以后我发觉时间拖了实在过久了。github
airbnb是我很喜欢的公司,他们有不少开源的工具,airflow我以为是最实用的表明。airflow 是能进行数据pipeline的管理,甚至是能够当作更高级的cron job 来使用。如今通常的大厂都说本身的数据处理是ETL,美其名曰 data pipeline,可能跟google倡导的有关。airbnb的airflow是用python写的,它能进行工做流的调度,提供更可靠的流程,并且它还有自带的UI(多是跟airbnb设计主导有关)。话很少说,先放两张截图:web
airflow里最重要的一个概念是DAG。sql
DAG是directed asyclic graph,在不少机器学习里有应用,也就是所谓的有向非循环。可是在airflow里你能够看作是一个小的工程,小的流程,由于每一个小的工程里能够有不少“有向”的task,最终达到某种目的。在官网中的介绍里说dag的特色:bash
Scheduled: each job should run at a certain scheduled intervalapp
Mission critical: if some of the jobs aren’t running, we are in trouble机器学习
Evolving: as the company and the data team matures, so does the data processing
Heterogenous: the stack for modern analytics is changing quickly, and most companies run multiple systems that need to be glued together
YEAH! It's awesome, right? After reading all of these, I found it's perfectly fit Prettyyes.
安装airflow超级简单,使用pip就能够,如今airflow的版本是1.6.1,可是有个小的bug,这个以后会告诉你们如何修改。
pip install airflow
这里有个坑,由于airflow涉及到很到数据处理的包,因此会安装pandas和numpy(这个Data Scientist应该都很熟悉)可是国内pip install 安装很是慢,用douban的源也有一些小的问题。个人解决方案是,直接先用豆瓣的源安装numpy 和 pandas,而后再安装airflow,自动化部署的时候能够在requirements.txt 里调整顺序就好了
摘自官方网站
# airflow needs a home, ~/airflow is the default, # but you can lay foundation somewhere else if you prefer # (optional) export AIRFLOW_HOME=~/airflow # install from pypi using pip pip install airflow # initialize the database airflow initdb # start the web server, default port is 8080 airflow webserver -p 8080
而后你就能够上web ui查看全部的dags,来监控你的进程。
通常第一次运行以后,airflow会在默认文件夹下生成airflow文件夹,而后你只要在里面新建一个文件dag就能够了。我这边部署在阿里云上的文件tree大概是这个样子的。
如下是我本身写的咱们公司prettyyes里须要天天处理log的其中一个小的dag:
from airflow import DAG from airflow.operators import BashOperator from datetime import datetime, timedelta import ConfigParser config = ConfigParser.ConfigParser() config.read('/etc/conf.ini') WORK_DIR = config.get('dir_conf', 'work_dir') OUTPUT_DIR = config.get('dir_conf', 'log_output') PYTHON_ENV = config.get('dir_conf', 'python_env') default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime.today() - timedelta(days=1), 'retries': 2, 'retry_delay': timedelta(minutes=15), } dag = DAG('daily_process', default_args=default_args, schedule_interval=timedelta(days=1)) templated_command = "echo 'single' | {python_env}/python {work_dir}/mr/LogMR.py"\ .format(python_env=PYTHON_ENV, work_dir=WORK_DIR) + " --start_date {{ ds }}" task = BashOperator( task_id='process_log', bash_command=templated_command, dag=dag )
写好以后,只要将这个dag放入以前创建好的dag文件夹,而后运行:
python <dag_file>
来确保没有语法错误。在测试里你能够看到个人
schedule_interval=timedelta(days=1)
这样咱们的数据处理的任务就至关于天天跑一次。更重要的是,airflow还提供处理bash处理的接口外还有hadoop的不少接口。能够为之后链接hadoop系统提供便利。不少具体的功能能够看官方文档。
airflow 1.6.1有一个网站的小的bug,安装成功后,点击dag里的log会出现如下页面:
这个只要将
airflow/www/utils.py
文件替换成最新的airflow github上的utils.py文件就行,具体的问题在这个:
fixes datetime issue when persisting logs
airflow自己没有deamon模式,因此直接用supervisord就ok了,咱们只要写4行代码。
[program:airflow_web] command=/home/kimi/env/athena/bin/airflow webserver -p 8080 [program:airflow_scheduler] command=/home/kimi/env/athena/bin/airflow scheduler
我以为airflow特别适合小的团队,他的功能强大,并且真的部署方便。和hadoop,mrjob又能够无缝链接,对咱们的业务有很大的提高。