前言
ETL是 Extract-Transform-Load的缩写,也就是抽取-转换-加载,在数据工做中是很是重要的部分。实际上,ETL就是一个对数据进行批处理的过程,一个ETL程序就是一个批处理脚本,执行时能将一堆数据转化成咱们须要的形式。 每一个接触过数据批处理的工程师,都走过ETL的流程,只是没有意识到而已。按照ETL过程的框架来从新认识数据批处理,有利于咱们更清晰地编写批处理脚本。 在单机范围内的数据量下,使用python的pandas包就能够很是方便地完成数据批处理工做。但当数据量达到1G以上时,pandas处理起来就有些力不从心了,到数据量达到1T以上,只能以分块的方式存储在分布式系统上时,pandas就无能为力了。在当前的技术背景下,典型的场景就是数据存储在Hive on HDFS上。要作ETL,就须要新的工具。Hadoop生态下,原生的工具是MapReduce计算模型,一般用Java编写,比较复杂,每次计算的中间结果也须要进行磁盘存取,很是费时。Spark是一个MPP架构的计算引擎,相比MapReduce,Spark 有DataFrame(又名 Schema RDD), 以表的形式来储存数据,不管是理解仍是操做,都更为简单,还支持Python,在许多须要使用函数做参数的场合,很是好用。html
本教程将介绍如何使用pyspark.sql模块,操做Spark DataFrame,从Hive中读取数据,通过一系列转换,最后存入Hive中。Spark的DataFrame和pandas的DataFrame的概念很类似,只是操做略有不一样,若是读者有pandas的使用经验,很容易就能快速上手。 教程只是为了方便读者快速入门,想要更好地开发Spark程序,仍然须要详细了解Spark的API接口,对python环境下,Hive的ETL来讲,研究pyspark.sql模块下的内容就足够了,能够参考官方文档。python
环境:Spark的API随版本不一样会有较大变化,目前比较流行的版本是1.6和2.2,本文使用Spark 1.6.0,语言为Python 2.7。默认数据都储存在Hive中,Hadoop集群带有yarn。sql
冒烟测试
学习一门语言或者软件的第一步,永远都是冒烟测试。最经典的冒烟测试就是输出Hello World。但对ETL来讲,一个打印"Hello World"的Spark程序是没什么用的。因此咱们这里讲讲如何打印一张表,这张表有一行数据,列名为t,值为"Hello World"。数据库
Spark的核心是SparkContext,它提供了Spark程序的运行环境。而SqlContext则是由SparkContext产生,提供了对数据库表的访问接口。由于这里数据库的环境是Hive,一般使用SqlContext的派生类HiveContext。在Spark提供的交互式环境中,会在启动时自动建立环境,生成SparkContext和HiveContext的实例。在pyspark的交互式环境中,SparkContext实例名为sc,HiveContext实例名为sqlContext。apache
交互式操做只在学习和调试时使用,实际工做中仍是要靠命令行执行脚本。在脚本中咱们就须要本身生成SparkContext和HiveContext了。基本操做代码以下:编程
# -*- coding: UTF-8 -*- from pyspark import SparkContext,HiveContext sc = SparkContext(appName="Hello World") # appName就是这个Spark程序的名字,在DEBUG时有用 hc = HiveContext(sc) df = hc.createDataFrame([["Hello World"]],['t']) # 建立一个DataFrame,第一个参数是数据,一个二维列表,第二个参数是表头,一个列表) first_cell = df.collect()[0][0] # 取第一个单元格的值 df.show() # 将表打印到屏幕上 print(first_cell)
将这段代码保存成文件hello.py,在终端中进入到该文件所在目录,输入命令spark-submit --master yarn hello.py
,而后就能够看到屏幕上输出以下,冒烟测试就算完成了。json
+-----------+ | t| +-----------+ |Hello World| +-----------+ Hello World
指令解释:spark-submit
就是spark的执行程序,master yarn
是spark-submit的参数,指定yarn做为计算调度的中心。最后hello.py就是咱们的ETL程序。api
Extract 抽取
ETL的第一步就是从数据源抽取数据,在Spark中就是从Hive里读取数据。缓存
Hive虽然实质上是个MapReduce接口的封装,但从上层抽象模型来看,有最基本的Schema、Table和Column,还有一套类SQL语法,能够说就是一个典型的关系数据库模型,所以在ETL过程当中,咱们彻底能够把Hive当成一个关系数据库来看待。网络
抽取的经常使用方法由两种,一种是直接读取Hive表,一种是经过Hive QL读取。 都须要以HiveContext的实例做为入口,结果返回一个Spark DataFrame,为了检查结果,可使用show方法查看DataFrame的数据。
假设咱们有一个名为test 的库,里面有一张表为t1,数据结构以下:
a | b | c |
---|---|---|
1 | 2 | 3 |
4 | 5 | 6 |
7 | 8 | 9 |
直接读取Hive表
HiveContext对读取操做提供统一的接口- DataFrameReader,HiveContext的实例的read属性就能够获取到这个接口。 固然,这个接口也能用来读取Hive的数据,read.table
就可获取到表的数据,代码以下
# -*- coding: UTF-8 -*- from pyspark import SparkContext, HiveContext sc = SparkContext(appName="extract") hc = HiveContext(sc) # 生成HiveContext实例 t =hc.read.table("test.t1") t.show()
Hive QL读取
实质是让HiveContext将HiveQL传给Hive,让Hive执行后,将查询结果封装成Spark DataFrame返回。在处理过程比较简单,或者须要大量设置别名时,比较有用(由于Spark批量设置别名不太方便),但不推荐写太过复杂的Hive QL,由于Hive 执行Hive QL的实质是把Hive QL转成MapReduce执行,在计算效率上是不如Spark的。
# -*- coding: UTF-8 -*- from pyspark import SparkContext, HiveContext sc = SparkContext(appName="extract") hc = HiveContext(sc) hc.sql("use test") t = hc.sql("select * from t1") t.show()
Load 加载
为何不先讲Trasform呢?由于Trasform的操做不少,先讲Load有助于快速上手完成一个初级的ETL程序。 相似于读取,HiveContext也提供了统一的写接口,名为DataFrameWriter.调用write属性便可获取。
写入的具体方式也不少,不过为了快速上手,只讲最关键的一些东西。
mode 写入方式
若是表已经存在,该如何操做。
- append 追加: 在尾部追加数据
- overwrite 覆写: 覆盖原有数据
- error 错误: 抛出异常
- ignore忽略 : 自动跳过
由于Hive on HDFS的关系,更新表最快的方式是全表覆写。对于须要更新原有的ETL,通常都是全表重写,只须要追加的,就能够用追加。
format 文件格式
在Hive on HDFS中,数据实质上是以文件的形式保存的。不一样的文件格式,在压缩容量、支持数据类型和查询速度上都有所不一样。textfile,avro,sequence,parquet,json等。目前我经常使用的格式是text和parquet,若是不设置文件格式,默认会使用Hive表的文件格式,若是Hive表不存在,则使用Hive表的默认格式textfile
加载新表
了解了上面的操做以后,咱们就能够开始写加载部分的代码了,只须要使用一个saveAsTable方法就好了,很是简单。
# -*- coding: UTF-8 -*- from pyspark import SparkContext, HiveContext sc = SparkContext(appName="load") hc = HiveContext(sc) hc.sql("use test") t1 = hc.sql("select a as a1,b as b1,c as c1 from t1") t1.write.saveAsTable("test.t2",format="parquet",mode="overwrite") # 将t1的三个列更名后存成t2表 t2.read.table("test.t2") t2.show()
转换
转换是ETL过程当中最复杂的部分,去掉抽取和加载,剩下的全都是转换,包含的内容是很是多的,常见的有筛选、聚合、多列合并或计算,列赋值,根据不一样的须要有不一样的处理方法。因为Spark的转换操做较为啰嗦,因此推荐把部分简单的操做经过Hive QL的方式,在抽取步骤中交由Hive完成,这样有助于精简代码,提升可读性,下降维度难度。 下面就讲一讲Spark DataFrame 转换部分的基本概念和操做。
向量化编程
对于平常用Java来作数据批处理的工程师来讲,可能更习惯用for循环来逐条处理数据。但这样作在操做上是很不方便的,也不太利于阅读理解。在科学计算的语境下,数据老是以DataFrame的形式储存,也就是一张表。数据处理操做一般是对这张表的某些行或者某些列来进行处理。好比,“令t1表的a列中数字大于2的值的,所有都等于2”,或者“给t1表新加一常数列d,值为99”,这样的操做在向量化编程的语境下,就是一个调用API接口的操做,比for循环容易被理解。 能够类比pandas。在pandas中,也主要是经过向量化编程的方式来处理数据,虽然提供了迭代器的接口,能够一行行地读取数据,但通常以表做为修改对象的操做,主要是以API接口来完成,不推荐使用迭代器来作行级修改。一来操做不方便,二来运算速度未必能比优化过的API接口快。 Spark是分布式执行的,数据分散在各个机器上,背后有一套调度系统来控制数据计算负载。若是用for循环来处理,就是把负载都加在了执行脚本的机器上,通常来讲执行脚本的机器都是不储存数据的master,实际上这一过程就会致使须要把数据从slave传到master上,无谓地增长了网络负担。因此,在Spark脚本里,严禁使用原生的python for循环来处理SparkData Frame,即便要用,也应该使用Spark提供的API接口。
基本操做对象
在Spark DataFrame语境下,操做对象主要有三个:DataFrame,Row,Column。
- DataFrame: DataFrame就是一张表,有表头和若干行数据。这张表是一个有序、可迭代的集合。
- Row:DataFrame 集合中的元素就是Row。每一个Row储存一行数据,有相同的属性,这些属性和表头同名。DataFrame没有API接口能够直接获取到某个Row,但能够经过Colect方法获取到Row对象的list,再从中获取指定的Row。
- Column:Column与数据的实际结构无关,是一个操做上的概念。在实际的转换操做中,绝大多数都是对若干列进行数学运算、拼接、映射等等。取DataFrame中的一列,获得的就是一个Column对象。
事实上,最经常使用的主要是DataFrame和Column,Row不多用到。其中,DataFrame是核心,一个ETl过程,实质就是从抽取一个DataFrame开始,通过一系列的DataFrame变换,获得一个与目标一致的DataFrame,而后写入到目标数据库中去。Column在其中扮演着中间点的角色,好比取DataFrame的多个列,拼接合成一个新列,而后把这个新列加到本来的DataFrame中去。
基本操做分类
上面提到了,DataFrame是核心操做对象。其实在Spark中,真正意义上的核心操做对象是RDD,一个有序的,分布式储存在内存中的操做对象。DataFrame就是一个特殊的RDD——Schema RDD。全部的DataFrame操做,均可以归类为两种基本操做:转化(Transformation)和行动(action)。转换操做是不会触发Spark的实际计算的,即便转换过程当中出现了错误,在执行到这一行代码时,也不会报错。直到执行了行动操做以后,才会真正让Spark执行计算,这时候才会抛出在转化过程当中出现的错误。这在DEBU时,尤为是交互式编程环境下,可能会致使问题代码定位错误,须要特别注意。
- Transform:典型的转换操做有读(read),筛选(filter)、拼接(union)等等,只要这个过程只改变DataFrame的形态,而不须要实际取出DataFrame的数据进行计算,都属于转换。理论上来讲,ETL过程当中的Transfrom过程,主干流程只会有转换操做,不会有Action操做。
- Action:典型的动做操做有计数(count),打印表(show),写(write)等,这些操做都须要真正地取出数据,就会触发Spark的计算。
筛选
filter(cond):筛选出知足条件cond的行。cond能够填字符串,格式和SQL中的where子句同样,也能够填Bool类型的Column对象,好比 df['a']>1。
# -*- coding: UTF-8 -*- from pyspark import SparkContext, HiveContext sc = SparkContext(appName="transform") hc = HiveContext(sc) df = hc.createDataFrame([[1,2,3],[4,5,6],[7,8,9]],['a','b','c']) t1 = df.filter("a > 1 and c < 9") t1.show() # 输出 4,5,6 这一行 t2 = df.filter( (df['b']<5) & (df['c']<8)) # 可使用&或|对两个bool列进行逻辑运算,但必需要用圆括号括起,限定运算顺序。 t2.show() # 输出 1,2,3 这一行
赋值,加列
withColumn(col_name,col):col_name是列名,col是列值,必须是一个Column对象。 赋值和加列操做是相同的,col_name存在,就是赋值,不然就是加列。
# -*- coding: UTF-8 -*- from pyspark import SparkContext, HiveContext sc = SparkContext(appName="transform") hc = HiveContext(sc) df = hc.createDataFrame([[1,2,3],[4,5,6],[7,8,9]],['a','b','c']) t1 = df.withColumn("c",df['c']+1) t1.show() # c的值全都增长了1 t2 = df.withColumn("d",df['a']+1) t2.show() # 增长了新一列d
删除列
drop(col_name):col_name为列名。该方法会返回一个删除col_name列的DataFrame
# -*- coding: UTF-8 -*- from pyspark import SparkContext, HiveContext sc = SparkContext(appName="transform") hc = HiveContext(sc) df = hc.createDataFrame([[1,2,3],[4,5,6],[7,8,9]],['a','b','c']) t = df.drop("c") t.show() # 只有 a,b两列
给列取名
alias(col_name):一般和select配合使用,请看下面的例子
选取列
select(*cols):cols为列名或列对象。 赋值和删除操做,每次只能改加减一列数据,若是想要批量地改变,尤为是调整列顺序的时候,就很是有用了。在ETL中,当须要计算的列不少时,一般就是逐个计算出不一样的列对象,最后用select把它们排好顺序。
# -*- coding: UTF-8 -*- from pyspark import SparkContext, HiveContext sc = SparkContext(appName="transform") hc = HiveContext(sc) df = hc.createDataFrame([[1,2,3],[4,5,6],[7,8,9]],['a','b','c']) a1 = (df['a']+1).alias("a1") # 新增一个列对象,取名为a1 t = df.select("a",a1,"b") # 若是用字符串,必须是df中存在的列名。 t.show() # 显示a, a_1,b 三列
###生成Column对象 在赋值的例子里,Column对象是由原DataFrame的Column通过简单的数学运算或逻辑运算获得的,但若是咱们想生成一些更特殊的Column呢?好比常数列或者本身定义复杂的规则。 Spark提供了pyspark.sql.functions,含有丰富的接口,其中就有咱们须要的东西。篇幅有限,只能介绍一些经常使用的,更多的仍是须要去看官方文档。
####常数列 lit(value):value数必须是必须为pyspark.sql.types
支持的类型,好比int,double,string,datetime等
# -*- coding: UTF-8 -*- from pyspark import SparkContext, HiveContext from pyspark.sql.functions import lit from datetime import datetime sc = SparkContext(appName="transform") hc = HiveContext(sc) df = hc.createDataFrame([[1,2,3],[4,5,6],[7,8,9]],['a','b','c']) t = df.withColumn("constant",lit(datetime(2018,1,1,2,3,4,999))) t.show(truncate=False)
取整
round、floor:和Python的标准函数用法一致,只是数字换成列名
条件分支
when(cond,value):符合cond就取value,value能够是常数也能够是一个列对象,连续能够接when构成多分支 otherwise(value):接在when后使用,全部不知足when的行都会取value,若不接这一项,则取Null。
# -*- coding: UTF-8 -*- from pyspark import SparkContext, HiveContext from pyspark.sql.functions import when sc = SparkContext(appName="transform") hc = HiveContext(sc) df = hc.createDataFrame([[1,2,3],[4,5,6],[7,8,9]],['a','b','c']) t = df.withColumn("when",when(df['a']==1,"a=1").when(df['b']==5,df['b']%5).otherwise("other")) t.show() # 生成when列,值分别为 a=1,0,other
日期和时间
current_date():当前日期,返回一个date列 current_timestamp():当前时刻,返回一个timestamp列 date_add(start, days):日期正向偏移,start为开始时间,必须是Column或字符串对象,指向一个date或timestamp列,days为偏移天数。 date_sub(start, days):相似date_add,可是负向偏移。 date_format(date, format): 日期格式化,date为要格式化的时间,必须是Column或字符串对象,指向一个date或timestamp列,days为偏移天数,format为格式化的字符串,具体参考Hive QL的date_format函数。 datediff(end, start):计算天数差
####自定义规则 udf(f, returnType=StringType): 自定义处理函数,f为自定义的处理函数,returnType为f的返回类型,必须为pyspark.sql.types
支持的类型,若是不填,会默认自动转化为String类型。udf会返回一个函数,能够当作列函数使用。 这在处理逻辑很是复杂时颇有用。好比对身份证号进行校验计算,而后取出有效的身份证号的第1,4,10位,这个复杂流程很难用Spark提供的API拼接起来,只能本身写。 做为教程,就不写太复杂的函数了。 自定义函数f的传入参数为列的值。
# -*- coding: UTF-8 -*- from pyspark import SparkContext, HiveContext from pyspark.sql.functions import udf from pyspark.sql.types import IntegerType sc = SparkContext(appName="transform") hc = HiveContext(sc) df = hc.createDataFrame([[1,2,3],[4,5,6],[7,8,9]],['a','b','c']) def f(a,b,c): r=0 if a==1: r=1 elif b==5: r=2 return r col_match = udf(f,IntegerType()) t = df.withColumn("col_match",col_match("a","b","c")) t.show() # 生成col_match列,值分别为 a=1,2,0
排序
Spark支持多字段,升降序排序。 可使用orderBy和sort,由于操做比较简单也符合直觉,这里略去例子,详情能够看文档。
聚合
Spark 支持直接聚合,也支持分组聚合。聚合的表达方式很是多,这里仅选取经常使用的。
直接聚合
# -*- coding: UTF-8 -*- from pyspark import SparkContext, HiveContext from pyspark.sql.functions import sum sc = SparkContext(appName="transform") hc = HiveContext(sc) df = hc.createDataFrame([[1,1,3],[4,1,2],[7,2,9]],['a','b','c']) t = df.agg(sum("a")) print(t.collect()[0][0]) # 打印 12
分组聚合
# -*- coding: UTF-8 -*- from pyspark import SparkContext, HiveContext from pyspark.sql.functions import sum,max sc = SparkContext(appName="transform") hc = HiveContext(sc) df = hc.createDataFrame([[1,1,3],[4,1,2],[7,2,9]],['a','b','c']) t = df.groupBy("b").agg(sum("a"),max("c")) t.show()
输出:
+---+------+------+ | b|sum(a)|max(c)| +---+------+------+ | 1| 5| 3| | 2| 7| 9| +---+------+------+
窗口函数
有一类分析需求,是须要分组计算,但保持数据的粒度不变的。好比经过成绩表,按班计算的学生的成绩排名,加一列到本来的成绩表中,整个表的每一行仍然表示一名学生。这种分析需求称为窗口分析,好比说每一个班,就是一个窗口,在这个窗口中,计算出班级成绩排名,再并到原表中。 这种分析,首先要建立一个窗口,而后再使用窗口函数来进行计算。Spark提供了丰富的窗口函数,能够知足各种分析需求。
建立窗口
使用pyspark.sql.Window
对象能够建立一个窗口,最简单的窗口能够什么都没有,但通常不推荐这样作。可使用partitionBy进行分组,使用orderBy进行排序,好比
from pyspark.sql import Window window = Window.partitionBy("a").orderBy("b")
窗口函数使用示例
rank():根据窗口中partitionBy进行分组,以orderBy排序
# -*- coding: UTF-8 -*- from pyspark import SparkContext, HiveContext from pyspark.sql.functions import rank,desc from pyspark.sql import Window sc = SparkContext(appName="transform") hc = HiveContext(sc) score = [ ['a','a_1',90], ['a','a_2',80], ['a','a_3',85], ['b','b_1',70], ['b','b_2',80], ['b','b_3',75], ['c','c_1',90] ] df = hc.createDataFrame(score,['class','student','score']) class_window = Window.partitionBy("class").orderBy(desc("score")) #降序排列 class_rank = rank().over(class_window) class_row_number = row_number().over(class_window) #窗口函数(xxx).over(window),就是通常的用法 t = df.withColumn("rank",class_rank) t.show()
按班级,分数从高到低,生成排名
+-----+-------+-----+----+ |class|student|score|rank| +-----+-------+-----+----+ | a| a_1| 90| 1| | a| a_3| 85| 2| | a| a_2| 80| 3| | b| b_2| 80| 1| | b| b_3| 75| 2| | b| b_1| 70| 3| | c| c_1| 90| 1| +-----+-------+-----+----+
缓存
在实际业务中,经常会碰到这种需求:须要把一个计算结果,稍加不一样的改动,分别存为不一样的表。好比,ETL中为了保证出错后能重试,就会要求除了保存转换计算结果以外,还要备份一份到备份表里。备份表一般是按天分区的,每一个区存当天的转换计算结果。而应用表则不分区,只存最新一天的计算结果。 在完成这一需求时,若是是先保存应用表,而后再添加分区列后添加到分区表,就会触发两次完整的计算流程,时间翻倍。而若是有缓存,就不同了。咱们能够在计算到最终结果时,缓存一下这张表,而后把它保存为应用表,再添加分区列保存为分区表。那么,实际计算中,到缓存操做为止的计算,只会触发一次,实际消耗时间为1次到最终结果的计算+1次加分区列,远小于2次计算的时间。当某些中间结果须要反复使用时,缓存能够给咱们带来极大的效率提高。固然,相应地,内存也会占用更多,仍是应该根据具体状况决定如何取舍。缓存的方法很简单,只要让DataFrame对象执行cache方法就好了:df.cache()