Spark SQL编程指南(Python)

前言
 
Spark SQL容许咱们在Spark环境中使用SQL或者Hive SQL执行关系型查询。它的核心是一个特殊类型的Spark RDD:SchemaRDD。
 
SchemaRDD相似于传统关系型数据库的一张表,由两部分组成:
 
Rows:数据行对象
Schema:数据行模式:列名、列数据类型、列能否为空等
 
Schema能够经过四种方式被建立:
 
(1)Existing RDD
(2)Parquet File
(3)JSON Dataset
(4)By running Hive SQL
 
考虑到Parquet File还没有在平台开始使用,所以暂时仅讨论其它三项。
 
注意: Spark SQL is currently an alpha component.
 
SQLContext(HiveContext)
 
Spark SQL的入口点为SQLContext,SQLContext的初始化依赖于SparkContext,代码示例以下:
 
 
SQLContext目前仅仅使用一个简单的SQL解析器,功能有限,并且目前不少的数据仓库是创建在Hive之上的,所以Spark为咱们提供了另外一个选择:HiveContext。
 
HiveContext使用相对比较完善的HiveQL解析器,可使用HiveUDF,能够访问现有Hive数据仓库中的数据,且适配SQLContext的全部数据源,推荐使用。
 
HiveContext初始化过程类似,以下:
 
 
数据源
 
Spark SQL(SchemaRDD)的数据源能够简单理解为就是普通的Spark RDD,全部能够应用于Spark RDD的操做都可以应用于SchemaRDD;此外,SchemaRDD还能够“注册”为一张临时表,而后经过SQL(Hive SQL)分析其中的数据(实际就是Spark RDD关联的数据)。
 
SchemaRDD
 
SchemaRDD的数据源实际就是Spark RDD,可是Spark RDD与SchemaRDD仍是有区别的,Spark RDD相对于SchemaRDD而言缺失“Schema”,所以Spark提供两种方式完成Spark RDD到SchemaRDD的转换,实际就是为Spark RDD应用“Schema”。
 
(1)使用反射推断Schema
 
若是一个Spark RDD的数据类型为Row,则Spark能够经过反射推断出该Spark RDD的Schema,并将其转换为一个SchemaRDD。
 
Spark使用反射推断某个Spark RDD的Schema时,仅仅使用这个Spark RDD的第一条数据(Row),所以必须保证这条数据的完整性。
 
Row的构建过程须要一个键值对列表,
 
Row(id = 1, name = "a", age = 28)
 
这个键值对列表已经明肯定义出数据行的列名、列值,推断仅做用于列类型。
 
代码示例
 
 
处理逻辑能够分为如下几步:
 
a. 建立一个字符串列表datas,用于模拟数据源;
b. 对datas执行“parallelize”操做,将其转换为Spark RDD source,数据类型为字符串;
c. 将Spark RDD source中的每一条数据进行切片(split)后转换为Spark RDD rows,数据类型为Row;
 
至此Spark RDD rows已经具有转换为SchemaRDD的条件:它的数据类型为Row。
 
d. 使用HiveContext推断rows的Schema,将其转换为SchemaRDD people;
 
经过people.printSchema(),咱们能够查看推断Schema的结果:
 
 
e. 将SchemaRDD people注册为一张临时表“people”;
 
f. 执行SQL查询语句:select * from people where age > 28 and age < 30,并将查询结果保存至Spark RDD results,经过results.printSchema()的输出结果:
 
 
能够看出Spark RDD results实际也是SchemaRDD,所以咱们能够继续将其注册为一张临时表;
 
g. 将SchemaRDD results注册为一张临时表“people”,并执行SQL查询语句:select name from people2,并将查询结果保存至Spark RDD results2,经过f咱们能够知道results2实际也是SchemaRDD,results2.printSchema()的输出结果:
 
 
SchemaRDD results2的数据类型为Row,受到查询语句(select name)的影响,其仅包含一列数据,列名为name。
 
h. SchemaRDD也能够执行全部Spark RDD的操做,这里咱们经过map将results2中的name值转换为大写形式,最终的输出结果:
 
 
上述示例说明如下三点:
 
a. 咱们能够将一个数据类型为Row的Spark RDD转换为一个SchemaRDD;
 
b. SchemaRDD能够注册为一张临时表执行SQL查询语句,其查询结果也是一个SchemaRDD;
 
c. SchemaRDD能够执行全部Spark RDD的操做。
 
(2)经过编码指定Schema
 
使用反射推断Schema的方式要求咱们必须可以构建一个数据类型为Row的Spark RDD,而后再将其转换为SchemaRDD;某些状况下咱们可能须要更为灵活的方式控制SchemaRDD构建过程,这正是经过编码指定Schema的意义所在。
 
经过编码指定Schema分为三步:
 
a. 构建一个数据类型为tuple或list的Spark RDD;
b. 构建Schema,须要匹配a中的tuple或list;
c.将b中的Schema应用于a中的Spark RDD。
 
代码示例
 
 
代码处理逻辑正好对应着上述三步,最终的输出结果:
 
 
其中须要注意id、age的数据类型被声明为IntegerType,所以数据源(字符串)中的数据须要作强制类型转换处理。
 
JSON Datasets
 
Spark可以自动推断出Json数据集的“数据模式”(Schema),并将它加载为一个SchemaRDD实例。这种“自动”的行为是经过下述两种方法实现的:
 
jsonFile:从一个文件目录中加载数据,这个目录中的文件的每一行均为一个JSON字符串(若是JSON字符串“跨行”,则可能致使解析错误);
 
jsonRDD:从一个已经存在的RDD中加载数据,这个RDD中的每个元素均为一个JSON字符串;
 
代码示例
 
 
能够得出如下两点:
 
a. 若是数据输入是JSON字符串的文本文件,咱们能够直接使用jsonFile构建Spark RDD,实际就是SchemaRDD;
 
b. 若是某个Spark RDD的数据类型是字符串,且字符串均是JSON格式的字符串形式,则可使用jsonRDD将其转换为一个SchemaRDD。
 
Hive Tables
 
Hive Tables已是“表”,所以咱们无需建立或转换,直接使用SQL查询便可。
 
官方代码示例
 
 
Hive UDF(Register Function)
 
Spark SQL使用HiveContext时能够支持Hive UDF,这里的UFD包含Hive自己内建的UDF,也包括咱们本身扩展的UDF(实测Spark-1.2.0-cdh5.3.2版本下没法正常使用本身扩展的UDF(Permanent Function),已经过扩展源码修复)。
 
这里重点介绍Spark SQL的Register Function,也就是说能够动态建立函数用于SQL查询,其实际做用相似于Hive UDF。
 
代码示例
 
 
代码的处理逻辑与前大致相似,即首先经过编码建立SchemaRDD people,而后将其注册为一张表(注意这里使用了另外一种方式:HiveContext registerRDDAsTable),最后执行查询语句并打印结果。
 
特别的是查询语句中使用到了一个名为“myfunc”的自定义SQL函数,而这个函数并非预先存在的(如Hive UDF),它是在咱们应用的运行期间被动态建立并注册的,注册过程使用到了HiveContext registerFunction。
 
对于Python而言,自定义函数的建立过程实际可分为两步:
 
(1)定义Python Function;
(2)将(1)中定义好的Python Function注册为SQL函数,注册时的命名可与Function的名称不一样。
 
也可使用Lambda表达式将定义Function与注册过程同时完成,如上述示例。
 
咱们自定义的SQL函数能够与Hive UDF共同使用,以下示例:
 
 
 
其中func.iptolocationbysina是Hive UDF(Permanent Function),mychange是自定义SQL函数。
 
从上面的两个示例能够看出,自定义SQL函数远比Hive UDF灵活。Hive UDF的建立过程比较复杂,须要使用Java语言完成编码并部署为jar,且在使用函数以前须要以temporaty function或permanent function的形式存在,每一次Hive UDF的更新都须要从新编码并更新jar;而自定义SQL函数是运行期间动态建立的,而使用Python编码时Function的建立及更新很是简便,推荐使用。
 
总结
 
Spark SQL为咱们提供了强大的数据分析能力,主要体如今如下三个方面:
 
(1)Spark RDD能够经过反射推断Schema或编码指定Schema的方式转换为SchemaRDD,将SchemaRDD建立为“数据表”以后,容许咱们以SQL语句的形式分析数据,节约大量编码工做量;
(2)Spark SQL容许咱们在应用运行期间根据需求动态建立自定义SQL函数,扩充SQL的数据处理能力;
(3)SchemaRDD能够执行全部Spark RDD的操做,若是SQL没法表述咱们的计算逻辑时,咱们能够经过Spark RDD丰富的API完成。
相关文章
相关标签/搜索