Airflow教程-使用Airflow实现ETL调度

1、Airflow是什么

airflow 是一个编排、调度和监控workflow的平台,由Airbnb开源,如今在Apache Software Foundation 孵化。airflow 将workflow编排为由tasks组成的DAGs(有向无环图),调度器在一组workers上按照指定的依赖关系执行tasks。同时,airflow 提供了丰富的命令行工具和简单易用的用户界面以便用户查看和操做,而且airflow提供了监控和报警系统。python

2、Airflow的核心概念

  1. DAGs:即有向无环图(Directed Acyclic Graph),将全部须要运行的tasks按照依赖关系组织起来,描述的是全部tasks执行的顺序。
  2. Operators:airflow内置了不少operators,如BashOperator 执行一个bash 命令,PythonOperator 调用任意的Python 函数,EmailOperator 用于发送邮件,HTTPOperator 用于发送HTTP请求, SqlOperator 用于执行SQL命令...同时,用户能够自定义Operator,这给用户提供了极大的便利性。能够理解为用户须要的一个操做,是Airflow提供的类
  3. Tasks:Task 是 Operator的一个实例
  4. Task Instance:因为Task会被重复调度,每次task的运行就是不一样的task instance了。Task instance 有本身的状态,包括"running", "success", "failed", "skipped", "up for retry"等。
  5. Task Relationships:DAGs中的不一样Tasks之间能够有依赖关系

3、使用AirFlow完整天级的任务调度

说了这么多抽象的概念,估计看官仍是云里雾里,下面就直接举个例子来讲明吧。mysql

##1. 安装airflow Airflow能够约等于只支持linux和mac,Windows上极其难装,笔者放弃了. 安装也很简单,如下代码来自官方文档,使用了Python的pip管理:linux

# airflow needs a home, ~/airflow is the default,
# but you can lay foundation somewhere else if you prefer
# (optional)
export AIRFLOW_HOME=~/airflow

# install from pypi using pip
pip install apache-airflow

# initialize the database
airflow initdb

# start the web server, default port is 8080
airflow webserver -p 8080

# start the scheduler
airflow scheduler

# visit localhost:8080 in the browser and enable the example dag in the home page

安装好了之后访问localhost:8080便可访问ui界面web

2. 基本配置

  1. 须要建立~/airflow/dags目录,这个目录是默认的存放DAG的地方,想修改的话能够修改~/airflow/airflow.cfg文件
  2. 修改airflow的数据库 airflow会使用sqlite做为默认的数据库,此状况下airflow进行调度的任务都只能单个的执行.在调度任务量不大的状况下,可使用sqlite做为backend.若是想scale out的话,须要修改配置文件,官方推荐使用mysql或者postgresql做为backend数据库.

3. 使用PostgresOperator执行SQL完成ETL任务

经过搜集信息,了解到PostgresOperator能执行SQL,而且还支持传参数.能解决大多数ETL任务中的传参问题.传参使用的是Python的Jinjia模块.sql

  1. 建立DAG 首先建立一个test_param_sql.py文件.内容以下:
from datetime import datetime, timedelta
import airflow
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.models import Variable

args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 7, 26), #start_date会决定这个DAG从哪天开始生效
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}
# Variable是Airflow提供的用户自定义变量的功能,在UI界面的Admin -> Variable下能够进行增删改查,此处笔者定义了sql_path做为存放sql文件的地方
tmpl_search_path = Variable.get("sql_path")  

dag = airflow.DAG(
    'test_param_sql',
    schedule_interval=timedelta(days=1), # schedule_interval是调度的频率
    template_searchpath=tmpl_search_path, 
    default_args=args,
    max_active_runs=1)

test_param_sql = PostgresOperator(
    task_id='test_param_sql',
    postgres_conn_id='postgres_default',
    sql='param_sql.sql',
    dag=dag,
    params={'period': '201905'},
    pool='pricing_pool')

match_finish = DummyOperator(
    task_id='match_finish',
    dag=dag
)

test_param_sql >> match_finish
  1. 准备要执行的Sql文件 建立test_sql.sql文件. SQL文件会被Jinjia解析,可使用一些宏来实现时间的替换 例

{{ ds }} 会被转换为当天的 YYYY-MM-DD 格式的日期数据库

{{ ds_nodash }} 会被转换为当天的 YYYYMMDD的格式的日期apache

在本例里则是经过{{params.period}} 取到了 DAG上传入的参数,bash

insert into test.param_sql_test
select * from test.dm_input_loan_info_d
where period = {{params.period}};
  1. 总体的目录结构以下 dags/ test_param_sql.py sql/ test_sql.sql函数

  2. 测试dag是否正确 可使用 airflow test dag_id task_id date 进行测试,测试会执行Operator,Operator指定的行为会进行调度. 可是不会将执行的行为记录到Airflow的数据库里工具

  3. 发布 把文件放到~/airflow/dags目录下,sql文件不要放在dags目录下,能够找其余地方(好比同级目录),配置好上文说到的Variable,能找到便可.笔者的理解是,airflow会扫描dags目录下的内容,并尝试解析成dag,若是有不能成功解析的内容,ui界面上会有错误提示,致使dag显示不出来等问题.

其余有用的信息

  1. 如何在dag.py里引入其余的本地python模块 须要把本地的python模块放到一个zip文件里,例如: my_dag1.py my_dag2.py package1/init.py package1/functions.py 而后把这个zip文件放到dags目录下,才能被正确解析

  2. pooling能够控制任务的并行度,若是给DAG指定了一个不存在的pooling,任务会一直处于scheduled的状态,不继续进行

相关文章
相关标签/搜索