今天咱们来介绍一下使用Airflow来调度 Data Lake Analytics(后面简称DLA)的任务执行。DLA做为一个数据湖的解决方案,
客户有天天周期性的调度一些任务从DLA查询数据回流到业务系统的需求。由于DLA兼容
MySQL的协议,所以全部支持MySQL的协议的调度框架都自然支持DLA,今天就来介绍一下使用业界著名的
Apache Airflow 来调度DLA的做业。python
大体步骤以下:mysql
购买ECS的详细流程这里就不一一罗列了,很是的简单,按照官方的购买流程能够分分钟完成,须要注意的几点这里说一下:web
同时记录下这个ECS的外网地址:sql
Airflow是一个Python写的软件,所以咱们是经过Python的Package Manager:pip来安装的,由于咱们要使用MySQL(而不是默认的SQLite) 来做为Airflow的元数据库, 所以咱们还要安装MySQL相关的包:数据库
# 安装Airflow自己 sudo pip install apache-airflow[mysql] # 安装MySQL相关的依赖 sudo apt-get install mysql-sever sudo apt-get install libmysqlclient-dev sudo pip install mysql-python
默认安装的MySQL有一个配置须要调整:apache
# /etc/mysql/mysql.conf.d/mysqld.cnf [mysqld] explicit_defaults_for_timestamp = 1
修改完成以后重启MySQL:浏览器
root@hello:~/airflow/dags# /etc/init.d/mysql restart [ ok ] Restarting mysql (via systemctl): mysql.service.
Airflow 安装完成以后会在你的本地用户目录下产生 ~/airflow
目录, 它里面的内容大体以下:安全
root@hello:~/airflow# ll total 4168 drwxr-xr-x 4 root root 4096 Oct 19 10:40 ./ drwx------ 10 root root 4096 Oct 19 10:40 ../ -rw-r--r-- 1 root root 11765 Oct 19 10:40 airflow.cfg drwxr-xr-x 2 root root 4096 Oct 18 19:32 dags/ drwxr-xr-x 6 root root 4096 Oct 18 17:52 logs/ -rw-r--r-- 1 root root 1509 Oct 18 11:38 unittests.cfg
其中 airflow.cfg
是 Airflow集群的配置文件,各类配置都是在这里改的,dags
目录保存咱们写的任务,后面咱们要写的任务都是放在这个文件夹里面。bash
前面咱们已经安装了 MySQL 数据库,如今咱们来建立一个数据库给Airflow来保存元数据:框架
$ mysql \ -uroot \ -proot \ -e "CREATE DATABASE airflow DEFAULT CHARACTER SET utf8 DEFAULT COLLATE utf8_general_ci; GRANT ALL PRIVILEGES ON airflow.* TO 'airflow'@'localhost' IDENTIFIED BY 'airflow'; FLUSH PRIVILEGES;" $ airflow initdb
到之类为止,元数据库就初始化好了。
Airflow自己是一个调度工具,任务的具体执行是交给一个叫作Executor的概念来作的,默认配置的executor是 SequentialExecutor
, 不适合生产环境使用,分布式的Executor有 Celery
和 Dask
, 可是笔者尝试过 Celery
以后发现坑有点多,这里推荐使用 Dask:
安装Dask:
pip install dask
运行 dask scheduler:
# default settings for a local cluster DASK_HOST=127.0.0.1 DASK_PORT=8786 dask-scheduler --host $DASK_HOST --port $DASK_PORT
运行 dask worker:
dask-worker $DASK_HOST:$DASK_PORT
由于使用的不是默认的配置:咱们选择了使用MySQL来做为元数据库,使用Dask来执行任务,所以须要对配置文件: ~/airflow/airflow.cfg
进行修改:
[core] # 使用Dask来运行任务 executor = DaskExecutor # 元数据库的链接方式 sql_alchemy_conn = mysql://airflow:airflow@localhost:3306/airflow [dask] # Dask的调度地址 cluster_address = 127.0.0.1:8786
到这里位置全部准备工做作完了,咱们能够启动Airflow了,咱们须要启动 Airflow 的三个模块:
webserver: 用来承载Airflow的管理控制页面:
airflow webserver -p 80 -D
scheduler: 任务调度器, 它会监控 ~/airflow/dags
下面咱们定义的任务文件的变化,这样咱们才能经过管理控制台及时看到咱们新开发的任务:
airflow scheduler -D
worker: 跟Dask进行交互真正执行任务的:
airflow worker -D
若是一切顺利的话,一个Airflow的集群就已经Ready了,能够在上面执行任务了。默认安装里面已经一些示例的任务, 浏览器里面输入 http://<你ECS的外网IP>
就能够看到Airflow的控制页面了:
咱们的目的是要用Airflow来调度DLA的任务,首先咱们要添加一个链接串, Airflow里面经过Connection来保存链接串的具体信息, 打开页面: http://<你ECS的外网IP>/admin/connection/
你会看到以下的页面:
咱们添加一下DLA的链接信息:
这里比较重要的两个点:
from airflow.operators.python_operator import PythonOperator from airflow.operators.bash_operator import BashOperator from datetime import datetime, timedelta from airflow.hooks.mysql_hook import MySqlHook default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2015, 6, 1), 'email': ['airflow@example.com'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5) } dag = DAG( 'dlademo', default_args=default_args, schedule_interval=timedelta(1)) t1 = BashOperator( task_id='print_date', bash_command='echo hello-airflow', dag=dag) def step2(ds, **kargs): mysql_hook = MySqlHook(mysql_conn_id = 'dla_bj_slot3') for items in mysql_hook.get_records("select * from tpch_1x.nation_text_date limit 20"): print items t2 = PythonOperator( task_id='execute_dla_sql', provide_context=True, python_callable=step2, dag=dag) t2.set_upstream(t1)
这个任务里面定义了一个DAG, 一个DAG表示一个任务流程,一个流程里面会执行有依赖关系的多个任务,DAG的第一个参数是DAG的名字, 这里咱们叫 dlademo
,它的第三个参数是调度的周期,这里是天天调度一次: timedelta(1)
。
第一个任务是执行一个bash命令: echo hello-airflow
, 第二个任务则是咱们的SQL任务,这里写的比较简单,经过SQL把DLA数据库里面的一张表查询并打印出来,最后 t2.set_upstream(t1)
设置两个任务之间的依赖关系。
如今咱们打开 http://<你的ECS公网IP>/admin/airflow/tree?dag_id=dlademo
就能够看到这个任务的详情了:
在这个图中咱们能够看到咱们定义的两个任务,以及它们之间的依赖关系。Airflow的功能很是的丰富,更多的功能就留给你们本身去体验了。
Airflow是Apache的顶级项目,从项目的成熟度和功能的丰富度来讲都很不错,入门也很简单,很容易就能够搭建本身的集群,而且它有本身的Connection机制,使得咱们不须要把数据库的用户名密码暴露在任务脚本里面,使用DLA的同窗们能够试试Airflow来调度本身的任务。
本文为云栖社区原创内容,未经容许不得转载。