Airflow使用入门指南

Airflow能作什么

关注公众号, 查看更多 http://mp.weixin.qq.com/s/xPjXMc_6ssHt16J07BC7jAphp

Airflow是一个工做流分配管理系统,经过有向非循环图的方式管理任务流程,设置任务依赖关系和时间调度。css

Airflow独立于咱们要运行的任务,只须要把任务的名字和运行方式提供给Airflow做为一个task就能够。html

安装和使用

最简单安装

在Linux终端运行以下命令 (须要已安装好python2.xpip):python

pip install airflow pip install "airflow[crypto, password]"
  • 1
  • 2

安装成功以后,执行下面三步,就可使用了。默认是使用的SequentialExecutor, 只能顺次执行任务。mysql

  • 初始化数据库 airflow initdb [必须的步骤]
  • 启动web服务器 airflow webserver -p 8080 [方即可视化管理dag]
  • 启动任务 airflow scheduler [scheduler启动后,DAG目录下的dags就会根据设定的时间定时启动]
  • 此外咱们还能够直接测试单个DAG,如测试文章末尾的DAG airflow test ct1 print_date 2016-05-14

最新版本的Airflow可从https://github.com/apache/incubator-airflow下载得到,解压缩按照安装python包的方式安装。linux

配置 mysql以启用LocalExecutorCeleryExecutor

  • 安装mysql数据库支持git

    yum install mysql mysql-server pip install airflow[mysql]
    • 1
    • 2
  • 设置mysql根用户的密码github

    ct@server:~/airflow: mysql -uroot #以root身份登陆mysql,默认无密码
    mysql> SET PASSWORD=PASSWORD("passwd"); mysql> FLUSH PRIVILEGES; # 注意sql语句末尾的分号 
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
  • 新建用户和数据库web

    # 新建名字为<airflow>的数据库
    
    mysql> CREATE DATABASE airflow; # 新建用户`ct`,密码为`152108`, 该用户对数据库`airflow`有彻底操做权限 mysql> GRANT all privileges on airflow.* TO 'ct'@'localhost' IDENTIFIED BY '152108'; mysql> FLUSH PRIVILEGES; 
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
  • 修改airflow配置文件支持mysqlredis

    • airflow.cfg 文件一般在~/airflow目录下
    • 更改数据库连接

      sql_alchemy_conn = mysql://ct:152108@localhost/airflow 对应字段解释以下: dialect+driver://username:password@host:port/database
      • 1
      • 2
    • 初始化数据库 airflow initdb

    • 初始化数据库成功后,可进入mysql查看新生成的数据表。

      ct@server:~/airflow: mysql -uct -p152108
      mysql> USE airflow;
      mysql> SHOW TABLES; +-------------------+ | Tables_in_airflow | +-------------------+ | alembic_version | | chart | | connection | | dag | | dag_pickle | | dag_run | | import_error | | job | | known_event | | known_event_type | | log | | sla_miss | | slot_pool | | task_instance | | users | | variable | | xcom | +-------------------+ 17 rows in set (0.00 sec)
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
  • centos7中使用mariadb取代了mysql, 但全部命令的执行相同

    yum install mariadb mariadb-server
    systemctl start mariadb ==> 启动mariadb
    systemctl enable mariadb ==> 开机自启动
    mysql_secure_installation ==> 设置 root密码等相关
    mysql -uroot -p123456 ==> 测试登陆!
    • 1
    • 2
    • 3
    • 4
    • 5

配置LocalExecutor

注:做为测试使用,此步能够跳过, 最后的生产环境用的是CeleryExecutor; 若CeleryExecutor配置不方便,也可以使用LocalExecutor。

前面数据库已经配置好了,因此若是想使用LocalExecutor就只须要修改airflow配置文件就能够了。airflow.cfg 文件一般在~/airflow目录下,打开更改executor为 executor = LocalExecutor即完成了配置。

把文后TASK部分的dag文件拷贝几个到~/airflow/dags目录下,顺次执行下面的命令,而后打开网址http://127.0.0.1:8080就能够实时侦测任务动态了:

ct@server:~/airflow: airflow initdb` (若前面执行过,就跳过) ct@server:~/airflow: airflow webserver --debug & ct@server:~/airflow: airflow scheduler
  • 1
  • 2
  • 3

配置CeleryExecutor (rabbitmq支持)

  • 安装airflow的celery和rabbitmq组件

    pip install airflow[celery] pip install airflow[rabbitmq]
    • 1
    • 2
  • 安装erlang和rabbitmq

    • 若是能直接使用yumapt-get安装则万事大吉。
    • 我使用的CentOS6则不能,须要以下一番折腾,
    # (Centos6,[REF](http://www.rabbitmq.com/install-rpm.html)) wget https://packages.erlang-solutions.com/erlang/esl-erlang/FLAVOUR_1_general/esl-erlang_18.3-1~centos~6_amd64.rpm yum install esl-erlang_18.3-1~centos~6_amd64.rpm wget https://github.com/jasonmcintosh/esl-erlang-compat/releases/download/1.1.1/esl-erlang-compat-18.1-1.noarch.rpm yum install esl-erlang-compat-18.1-1.noarch.rpm wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.1/rabbitmq-server-3.6.1-1.noarch.rpm yum install rabbitmq-server-3.6.1-1.noarch.rpm
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
  • 配置rabbitmq

    • 启动rabbitmq: rabbitmq-server -detached
    • 开机启动rabbitmq: chkconfig rabbitmq-server on
    • 配置rabbitmq (REF)

      rabbitmqctl add_user ct 152108 rabbitmqctl add_vhost ct_airflow rabbitmqctl set_user_tags ct airflow rabbitmqctl set_permissions -p ct_airflow ct ".*" ".*" ".*" rabbitmq-plugins enable rabbitmq_management # no usage
      • 1
      • 2
      • 3
      • 4
      • 5
  • 修改airflow配置文件支持Celery

    • airflow.cfg 文件一般在~/airflow目录下

    • 更改executor为 executor = CeleryExecutor

    • 更改broker_url

      broker_url = amqp://ct:152108@localhost:5672/ct_airflow Format explanation: transport://userid:password@hostname:port/virtual_host
      • 1
      • 2
    • 更改celery_result_backend,

      # 能够与broker_url相同 celery_result_backend = amqp://ct:152108@localhost:5672/ct_airflow Format explanation: transport://userid:password@hostname:port/virtual_host
      • 1
      • 2
      • 3
      • 4
      • 5
  • 测试

    • 启动服务器:airflow webserver --debug
    • 启动celery worker (不能用根用户):airflow worker
    • 启动scheduler: airflow scheduler
    • 提示: 
      • 测试过程当中注意观察运行上面3个命令的3个窗口输出的日志
      • 当遇到不符合常理的状况时考虑清空 airflow backend的数据库, 可以使用airflow resetdb清空。
      • 删除dag文件后,webserver中可能还会存在相应信息,这时须要重启webserver并刷新网页。
      • 关闭webserver:ps -ef|grep -Ei '(airflow-webserver)'| grep master | awk '{print $2}'|xargs -i kill {}

一个脚本控制airflow系统的启动和重启

#!/bin/bash #set -x #set -e set -u usage() { cat <<EOF ${txtcyn} Usage: $0 options${txtrst} ${bldblu}Function${txtrst}: This script is used to start or restart webserver service. ${txtbld}OPTIONS${txtrst}: -S Start airflow system [${bldred}Default FALSE${txtrst}] -s Restart airflow server only [${bldred}Default FALSE${txtrst}] -a Restart all airflow programs including webserver, worker and scheduler. [${bldred}Default FALSE${txtrst}] EOF } start_all= server_only= all= while getopts "hs:S:a:" OPTION do case $OPTION in h) usage exit 1 ;; S) start_all=$OPTARG ;; s) server_only=$OPTARG ;; a) all=$OPTARG ;; ?) usage exit 1 ;; esac done if [ -z "$server_only" ] && [ -z "$all" ] && [ -z "${start_all}" ]; then usage exit 1 fi if [ "$server_only" == "TRUE" ]; then ps -ef | grep -Ei '(airflow-webserver)' | grep master | \ awk '{print $2}' | xargs -i kill {} cd ~/airflow/ nohup airflow webserver >webserver.log 2>&1 & fi if [ "$all" == "TRUE" ]; then ps -ef | grep -Ei 'airflow' | grep -v 'grep' | awk '{print $2}' | xargs -i kill {} cd ~/airflow/ nohup airflow webserver >>webserver.log 2>&1 & nohup airflow worker >>worker.log 2>&1 & nohup airflow scheduler >>scheduler.log 2>&1 & fi if [ "${start_all}" == "TRUE" ]; then cd ~/airflow/ nohup airflow webserver >>webserver.log 2>&1 & nohup airflow worker >>worker.log 2>&1 & nohup airflow scheduler >>scheduler.log 2>&1 & fi 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81

airflow.cfg 其它配置

  • dags_folder

    dags_folder目录支持子目录和软链接,所以不一样的dag能够分门别类的存储起来。

  • 设置邮件发送服务

    smtp_host = smtp.163.com smtp_starttls = True smtp_ssl = False smtp_user = username@163.com smtp_port = 25 smtp_password = userpasswd smtp_mail_from = username@163.com
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
  • 多用户登陆设置 (彷佛只有CeleryExecutor支持)

    • 修改airflow.cfg中的下面3行配置
    authenticate = True auth_backend = airflow.contrib.auth.backends.password_auth filter_by_owner = True
    • 1
    • 2
    • 3
    • 增长一个用户(在airflow所在服务器的python下运行)
    import airflow from airflow import models, settings from airflow.contrib.auth.backends.password_auth import PasswordUser user = PasswordUser(models.User()) user.username = 'ehbio' user.email = 'mail@ehbio.com' user.password = 'ehbio' session = settings.Session() session.add(user) session.commit() session.close() exit()
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

TASK

  • 参数解释

    • depends_on_past

      Airflow assumes idempotent tasks that operate on immutable data 
      chunks. It also assumes that all task instance (each task for each 
      schedule) needs to run.

      If your tasks need to be executed sequentially, you need to 
      tell Airflow: use the depends_on_past=True flag on the tasks 
      that require sequential execution.)

      若是在TASK本该运行却没有运行时,或者设置的interval@once时,推荐使用depends_on_past=False。我在运行dag时,有时会出现,明明上游任务已经运行结束,下游任务却没有启动,整个dag就卡住了。这时设置depends_on_past=False能够解决这类问题。

    • timestamp in format like 2016-01-01T00:03:00

    • Task中调用的命令出错后须要在网站Graph view中点击run手动重启。 
      为了方便任务修改后的顺利运行,有个折衷的方法是:

      • 设置 email_on_retry: True
      • 设置较长的retry_delay,方便在收到邮件后,能有时间作出处理
      • 而后再修改成较短的retry_delay,方便快速启动
  • 写完task DAG后,必定记得先检测下有无语法错误 python dag.py

  • 测试文件1:ct1.py

    from airflow import DAG from airflow.operators import BashOperator, MySqlOperator from datetime import datetime, timedelta one_min_ago = datetime.combine(datetime.today() - timedelta(minutes=1), datetime.min.time()) default_args = { 'owner': 'airflow', #为了测试方便,起始时间通常为当前时间减去schedule_interval 'start_date': datatime(2016, 5, 29, 8, 30), 'email': ['chentong_biology@163.com'], 'email_on_failure': False, 'email_on_retry': False, 'depends_on_past': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), #'queue': 'bash_queue', #'pool': 'backfill', #'priority_weight': 10, #'end_date': datetime(2016, 5, 29, 11, 30), } # DAG id 'ct1'必须在airflow中是unique的, 通常与文件名相同 # 多个用户时可加用户名作标记 dag = DAG('ct1', default_args=default_args, schedule_interval="@once") t1 = BashOperator( task_id='print_date', bash_command='date', dag=dag) #cmd = "/home/test/test.bash " 注意末尾的空格 t2 = BashOperator( task_id='echo', bash_command='echo "test" ', retries=3, dag=dag) templated_command = """ {% for i in range(2) %} echo "{{ ds }}" echo "{{ macros.ds_add(ds, 7) }}" echo "{{ params.my_param }}" {% endfor %} """ t3 = BashOperator( task_id='templated', bash_command=templated_command, params={'my_param': "Parameter I passed in"}, dag=dag) # This means that t2 will depend on t1 running successfully to run # It is equivalent to t1.set_downstream(t2) t2.set_upstream(t1) t3.set_upstream(t1) # all of this is equivalent to # dag.set_dependency('print_date', 'sleep') # dag.set_dependency('print_date', 'templated') 
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
  • 测试文件2: ct2.py

    from airflow import DAG from airflow.operators import BashOperator from datetime import datetime, timedelta one_min_ago = datetime.combine(datetime.today() - timedelta(minutes=1), datetime.min.time()) default_args = { 'owner': 'airflow', 'depends_on_past': True, 'start_date': one_min_ago, 'email': ['chentong_biology@163.com'], 'email_on_failure': True, 'email_on_retry': True, 'retries': 5, 'retry_delay': timedelta(hours=30), #'queue': 'bash_queue', #'pool': 'backfill', #'priority_weight': 10, #'end_date': datetime(2016, 5, 29, 11, 30), } dag = DAG('ct2', default_args=default_args, schedule_interval="@once") t1 = BashOperator( task_id='run1', bash_command='(cd /home/ct/test; bash run1.sh -f ct_t1) ', dag=dag) t2 = BashOperator( task_id='run2', bash_command='(cd /home/ct/test; bash run2.sh -f ct_t1) ', dag=dag) t2.set_upstream(t1) 
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
  • run1.sh

    #!/bin/bash #set -x set -e set -u usage() { cat <<EOF ${txtcyn} Usage: $0 options${txtrst} ${bldblu}Function${txtrst}: This script is used to do ********************. ${txtbld}OPTIONS${txtrst}: -f Data file ${bldred}[NECESSARY]${txtrst} -z Is there a header[${bldred}Default TRUE${txtrst}] EOF } file= header='TRUE' while getopts "hf:z:" OPTION do case $OPTION in h) usage exit 1 ;; f) file=$OPTARG ;; z) header=$OPTARG ;; ?) usage exit 1 ;; esac done if [ -z $file ]; then usage exit 1 fi cat <<END >$file A B C D E F G END sleep 20s
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
  • run2.sh

    #!/bin/bash #set -x set -e set -u usage() { cat <<EOF ${txtcyn} Usage: $0 options${txtrst} ${bldblu}Function${txtrst}: This script is used to do ********************. ${txtbld}OPTIONS${txtrst}: -f Data file ${bldred}[NECESSARY]${txtrst} EOF } file= header='TRUE' while getopts "hf:z:" OPTION do case $OPTION in h) usage exit 1 ;; f) file=$OPTARG ;; ?) usage exit 1 ;; esac done if [ -z $file ]; then usage exit 1 fi awk 'BEGIN{OFS=FS="\t"}{print $0, "53"}' $file >${file}.out

其它问题

  • The DagRun object has room for a conf parameter that gets exposed 
    in the “context” (templates, operators, …). That is the place 
    where you would associate parameters to a specific run. For now this 
    is only possible in the context of an externally triggered DAG run. 
    The way the TriggerDagRunOperator works, you can fill in the conf 
    param during the execution of the callable that you pass to the 
    operator.

    If you are looking to change the shape of your DAG through parameters, 
    we recommend doing that using “singleton” DAGs (using a “@once” 
    schedule_interval), meaning that you would write a 
    Python program that generates multiple dag_ids, one of each run, 
    probably based on metadata stored in a config file or elsewhere.

    The idea is that if you use parameters to alter the shape of your 
    DAG, you break some of the assumptions around continuity of the 
    schedule. Things like visualizing the tree view or how to perform a 
    backfill becomes unclear and mushy. So if the shape of your DAG 
    changes radically based on parameters, we consider those to be 
    different DAGs, and you generate each one in your pipeline file.

  • 彻底删掉某个DAG的信息

    set @dag_id = 'BAD_DAG'; delete from airflow.xcom where dag_id = @dag_id; delete from airflow.task_instance where dag_id = @dag_id; delete from airflow.sla_miss where dag_id = @dag_id; delete from airflow.log where dag_id = @dag_id; delete from airflow.job where dag_id = @dag_id; delete from airflow.dag_run where dag_id = @dag_id; delete from airflow.dag where dag_id = @dag_id;
  • supervisord自动管理进程
  • [program:airflow_webserver]
    command=/usr/local/bin/python2.7 /usr/local/bin/airflow webserver user=airflow environment=AIRFLOW_HOME="/home/airflow/airflow", PATH="/usr/local/bin:%(ENV_PATH)s" stderr_logfile=/var/log/airflow-webserver.err.log stdout_logfile=/var/log/airflow-webserver.out.log [program:airflow_worker] command=/usr/local/bin/python2.7 /usr/local/bin/airflow worker user=airflow environment=AIRFLOW_HOME="/home/airflow/airflow", PATH="/usr/local/bin:%(ENV_PATH)s" stderr_logfile=/var/log/airflow-worker.err.log stdout_logfile=/var/log/airflow-worker.out.log [program:airflow_scheduler] command=/usr/local/bin/python2.7 /usr/local/bin/airflow scheduler user=airflow environment=AIRFLOW_HOME="/home/airflow/airflow", PATH="/usr/local/bin:%(ENV_PATH)s" stderr_logfile=/var/log/airflow-scheduler.err.log stdout_logfile=/var/log/airflow-scheduler.out.log
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
  • 在特定状况下,修改DAG后,为了不当前日期以前任务的运行,可使用backfill填补特定时间段的任务

    • airflow backfill -s START -e END --mark_success DAG_ID

端口转发

  • 以前的配置都是在内网服务器进行的,但内网服务器只开放了SSH端口22,所以 
    我尝试在另一台电脑上使用相同的配置,而后设置端口转发,把外网服务器 
    的rabbitmq的5672端口映射到内网服务器的对应端口,而后启动airflow链接 

    • ssh -v -4 -NF -R 5672:127.0.0.1:5672 aliyun
    • 上一条命令表示的格式为

      ssh -R <local port>:<remote host>:<remote port> <SSH hostname>

      local port表示hostname的port

      Remote connections from LOCALHOST:5672 forwarded to local address 127.0.0.1:5672

    • -v: 在测试时打开

    • -4: 出现错误”bind: Cannot assign requested address”时,force the 
      ssh client to use ipv4
    • 若出现”Warning: remote port forwarding failed for listen port 52698” 
      ,关掉其它的ssh tunnel。

不一样机器使用airflow

  • 在外网服务器(用作任务分发服务器)配置与内网服务器相同的airflow模块
  • 使用前述的端口转发以便外网服务器绕过内网服务器的防火墙访问rabbitmq 5672端口。
  • 在外网服务器启动 airflow webserver scheduler, 在内网服务器启动 
    airflow worker 发现任务执行状态丢失。继续学习Celery,以解决此问题。

安装redis (最后没用到)

任务未按预期运行可能的缘由

  • 检查 start_date 和end_date是否在合适的时间范围内
  • 检查 airflow workerairflow scheduler和 
    airflow webserver --debug的输出,有没有某个任务运行异常
  • 检查airflow配置路径中logs文件夹下的日志输出
  • 若以上都没有问题,则考虑数据冲突,解决方式包括清空数据库或着给当前 
    dag一个新的dag_id

References

  1. https://pythonhosted.org/airflow/
  2. http://kintoki.farbox.com/post/ji-chu-zhi-shi/airflow
  3. http://www.jianshu.com/p/59d69981658a
  4. http://bytepawn.com/luigi-airflow-pinball.html
  5. https://github.com/airbnb/airflow
  6. https://media.readthedocs.org/pdf/airflow/latest/airflow.pdf
  7. http://www.csdn.net/article/1970-01-01/2825690
  8. http://www.cnblogs.com/harrychinese/p/airflow.html
  9. http://www.javashuo.com/article/p-fpqshddf-z.html

声明

文章原写于http://blog.genesino.com/2016/05/airflow/。转载请注明出处。

相关文章
相关标签/搜索