原先在业务开发部门作后端开发;因为业务须要,从每一个部门抽一名开发人员去作数据报表的开发,接到任务,心里告诉本身好好作,数据平台用于公司高层及运营人员方便查看数据的平台,在新的项目组能学到新的python和任务调度工具airflow,对本身会有很大的提高。python
前期准备git
测试环境搭建airflow的UI界面github
airflow的最佳实战web
airflow的官方文档(全英文)apache
etl技术文档编程
etl-example最佳实战(代码)后端
airflow中文文档api
airflow是一个可编程、调度和监控的工做流平台。bash
Use Airflow to author workflows as Directed Acyclic Graphs (DAGs) of tasks. The Airflow scheduler executes your tasks on an array of workers while following the specified dependencies. Rich command line utilities make performing complex surgeries on DAGs a snap. The rich user interface makes it easy to visualize pipelines running in production, monitor progress, and troubleshoot issues when needed.服务器
基于有向无环图(DAG),airflow能够定义一组有依赖的任务,按照依赖依次执行。airflow提供了丰富的命令行工具用于系统管控,而其web管理界面一样也能够方便的管控调度任务,而且对任务运行状态进行实时监控,方便了系统的运维和管理。
Airflow 是 Airbnb 开源的一个用 Python 编写的工做流管理平台,自带 web UI 和调度,目前在Apache下作孵化。
在实际项目中,咱们常常遇到如下场景:
airflow经过DAG配置文件,能轻松定义各类任务及任务之间的依赖关系和调度执行,并一个可视化的操做web界面。
自带web管理界面,易上手;
业务代码和调度代码彻底解耦;
经过python代码定义子任务,并支持各类Operate操做器,灵活性大,能知足用户的各类需求;
python开源项目,支持扩展operate等插件,便于二次开发;
相似的工具备akzban,quart等;
概要:DAG(Directed Acyclic Graph)是有向无环图,也称为有向无循环图。在Airflow中,一个DAG定义了一个完整的做业。同一个DAG中的全部Task拥有相同的调度时间。
参数: dag_id: 惟一识别DAG,方便往后管理
default_args: 默认参数,若是当前DAG实例的做业没有配置相应参数,则采用DAG实例的default_args中的相应参数
schedule_interval: 配置DAG的执行周期,可采用crontab语法
概要:Task为DAG中具体的做业任务,依赖于DAG,也就是必须存在于某个DAG中。Task在DAG中能够配置依赖关系(固然也能够配置跨DAG依赖,可是并不推荐。跨DAG依赖会致使DAG图的直观性下降,并给依赖管理带来麻烦)。
参数:
dag: 传递一个DAG实例,以使当前做业属于相应DAG
task_id: 给任务一个标识符(名字),方便往后管理
owner: 任务的拥有者,方便往后管理
start_date: 任务的开始时间,即任务将在这个时间点以后开始调度
start_date:在配置中,它是做业开始调度时间。而在谈论执行情况时,它是调度开始时间。
schedule_interval:调度执行周期。
execution_date: 执行时间。在Airflow中称为执行时间,但其实它并非真实的执行时间。
因此,第一次调度时间:在做业中配置的start_date,且知足schedule_interval的时间点。记录的execution_date为做业中配置的start_date的第一个知足schedule_interval的时间。
[举个例子]
假设咱们配置了一个做业的start_date为2019年6月2日,配置的schedule_interval为* 00 12 * * *,那么第一次执行的时间将是2019年6月3日12点。所以execution_date并非如期字面说的表示执行时间,真正的执行时间是execution_date所显示的时间的下一个知足schedule_interval的时间点。
# coding: utf-8
# DAG 对象; 咱们将须要它来实例化一个 DAG
from airflow import DAG
import pendulum
# Operators; 咱们须要利用这个对象去执行流程!
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
# 定义默认参数
default_args = {
'owner': 'airflow', # 拥有者名称
'depends_on_past': False, # 是否依赖上一个本身的执行状态
'start_date': datetime(2019,7,24,16,45), # 第一次开始执行的时间,为格林威治时间,为了方便测试,通常设置为当前时间减去执行周期
'retries': 3, # 失败重试次数
'retry_delay': timedelta(seconds=5) # 失败重试间隔
}
# 定义DAG,实例化
dag = DAG(
dag_id='hello_world_dag', # dag_id
default_args=default_args, # 指定默认参数
# schedule_interval="00, *, *, *, *" # 执行周期,依次是分,时,天,月,年,此处表示每一个整点执行
schedule_interval=timedelta(minutes=1) # 执行周期,表示每分钟执行一次
)
# 定义要执行的Python函数1
def hello_world_1():
current_time = str(datetime.today())
with open('/root/tmp/hello_world_1.txt', 'a') as f:
f.write('%s\n' % current_time)
assert 1 == 1 # 能够在函数中使用assert断言来判断执行是否正常,也能够直接抛出异常
# 定义要执行的Python函数2
def hello_world_2():
current_time = str(datetime.today())
with open('/root/tmp/hello_world_2.txt', 'a') as f:
f.write('%s\n' % current_time)
————————————————
# 定义要执行的task 1
t1 = PythonOperator(
task_id='hello_world_1', # task_id
python_callable=hello_world_1, # 指定要执行的函数
dag=dag, # 指定归属的dag
retries=2, # 重写失败重试次数,若是不写,则默认使用dag类中指定的default_args中的设置
)
# 定义要执行的task 2
t2 = PythonOperator(
task_id='hello_world_2', # task_id
python_callable=hello_world_2, # 指定要执行的函数
dag=dag, # 指定归属的dag
)
t2.set_upstream(t1) # t2依赖于t1;等价于 t1.set_downstream(t2);同时等价于 dag.set_dependency('hello_world_1', 'hello_world_2')
# 表示t2这个任务只有在t1这个任务执行成功时才执行,
# 或者
#t1 >> t2
复制代码
咱们能够看到,整个 DAG 的配置就是一份完整的 Python 代码,在代码中实例化 DAG,实例化适合的 Operator,并经过 set_downstream 等方法配置上下游依赖关系。
The Airflow UI makes it easy to monitor and troubleshoot your data pipelines. Here’s a quick overview of some of the features and visualizations you can find in the Airflow UI.
DAGs View
List of the DAGs in your environment, and a set of shortcuts to useful pages. You can see exactly how many tasks succeeded, failed, or are currently running at a glance.
Tree View
A tree representation of the DAG that spans across time. If a pipeline is late, you can quickly see where the different steps are and identify the blocking ones.
Graph View
The graph view is perhaps the most comprehensive. Visualize your DAG’s dependencies and their current status for a specific run.
Variable View
The variable view allows you to list, create, edit or delete the key-value pair of a variable used during jobs. Value of a variable will be hidden if the key contains any words in (‘password’, ‘secret’, ‘passwd’, ‘authorization’, ‘api_key’, ‘apikey’, ‘access_token’) by default, but can be configured to show in clear-text.
Gantt Chart
The Gantt chart lets you analyse task duration and overlap. You can quickly identify bottlenecks and where the bulk of the time is spent for specific DAG runs.
Task Duration
The duration of your different tasks over the past N runs. This view lets you find outliers and quickly understand where the time is spent in your DAG over many runs.
Code View
Transparency is everything. While the code for your pipeline is in source control, this is a quick way to get to the code that generates the DAG and provide yet more context.
完成数据从原始库迁移到目标库。
007不写就出局的主旨:一个普通人经过持续写做实现成长的平台 平台营造:陪伴+监督互助氛围,“自律+他律践行环境”,让更多战友得到新生。
全国各地的朋友能够参加,每个月按期会组织线下活动,全国各地的职场人也都在《007不写就出局》社群中,欢迎各行各业的伙伴交流。
我为何写做
战友为何加入007写做社群
想加入《007不写就出局》,扫小企鹅二维码。
天天早上喜欢听的商业思惟课程,分享给你们
敏娘娘: 由中国创业天后级导师 敏娘娘 亲自带队的 《世界首富思惟学院》即将拉开帷幕。该系统为中国首家研究-全球世界首富思惟的大学。
敏娘娘老师是微世管理咨询创始人,是中国企业家教练,新商业生态架构师,为世界500强服务:中国银行,中国移动,比亚迪。每一年培训️万名企业家,一对一指导企业10年突破️️️家。
1.表达清晰、明确、有过程描述和结果描述。
2.不要下达模糊指令,下达指令以后要给出方法。
复制代码