spark-1.6.0 [原文地址]html
Spark SQL是Spark中处理结构化数据的模块。与基础的Spark RDD API不一样,Spark SQL的接口提供了更多关于数据的结构信息和计算任务的运行时信息。在Spark内部,Spark SQL会可以用于作优化的信息比RDD API更多一些。Spark SQL现在有了三种不一样的API:SQL语句、DataFrame API和最新的Dataset API。不过真正运行计算的时候,不管你使用哪一种API或语言,Spark SQL使用的执行引擎都是同一个。这种底层的统一,使开发者能够在不一样的API之间来回切换,你能够选择一种最天然的方式,来表达你的需求。java
本文中全部的示例都使用Spark发布版本中自带的示例数据,而且能够在spark-shell、pyspark shell以及sparkR shell中运行。python
Spark SQL的一种用法是直接执行SQL查询语句,你可以使用最基本的SQL语法,也能够选择HiveQL语法。Spark SQL能够从已有的Hive中读取数据。更详细的请参考Hive Tables 这一节。若是用其余编程语言运行SQL,Spark SQL将以DataFrame返回结果。你还能够经过命令行command-line 或者 JDBC/ODBC 使用Spark SQL。mysql
DataFrame是一种分布式数据集合,每一条数据都由几个命名字段组成。概念上来讲,她和关系型数据库的表 或者 R和Python中的data frame等价,只不过在底层,DataFrame采用了更多优化。DataFrame能够从不少数据源(sources)加载数据并构造获得,如:结构化数据文件,Hive中的表,外部数据库,或者已有的RDD。sql
DataFrame API支持Scala, Java, Python, and R。shell
Dataset是Spark-1.6新增的一种API,目前仍是实验性的。Dataset想要把RDD的优点(强类型,可使用lambda表达式函数)和Spark SQL的优化执行引擎的优点结合到一块儿。Dataset能够由JVM对象构建(constructed )获得,然后Dataset上可使用各类transformation算子(map,flatMap,filter 等)。数据库
Dataset API 对 Scala 和 Java的支持接口是一致的,但目前还不支持Python,不过Python自身就有语言动态特性优点(例如,你可使用字段名来访问数据,row.columnName)。对Python的完整支持在将来的版本会增长进来。apache
Spark SQL全部的功能入口都是SQLContext
类,及其子类。不过要建立一个SQLContext对象,首先须要有一个SparkContext对象。编程
val sc: SparkContext // 假设已经有一个 SparkContext 对象 val sqlContext = new org.apache.spark.sql.SQLContext(sc) // 用于包含RDD到DataFrame隐式转换操做 import sqlContext.implicits._
除了SQLContext以外,你也能够建立HiveContext,HiveContext是SQLContext 的超集。json
除了SQLContext的功能以外,HiveContext还提供了完整的HiveQL语法,UDF使用,以及对Hive表中数据的访问。要使用HiveContext,你并不须要安装Hive,并且SQLContext能用的数据源,HiveContext也同样能用。HiveContext是单独打包的,从而避免了在默认的Spark发布版本中包含全部的Hive依赖。若是这些依赖对你来讲不是问题(不会形成依赖冲突等),建议你在Spark-1.3以前使用HiveContext。然后续的Spark版本,将会逐渐把SQLContext升级到和HiveContext功能差很少的状态。
spark.sql.dialect选项能够指定不一样的SQL变种(或者叫SQL方言)。这个参数能够在SparkContext.setConf里指定,也能够经过 SQL语句的SET key=value命令指定。对于SQLContext,该配置目前惟一的可选值就是”sql”,这个变种使用一个Spark SQL自带的简易SQL解析器。而对于HiveContext,spark.sql.dialect 默认值为”hiveql”,固然你也能够将其值设回”sql”。仅就目前而言,HiveSQL解析器支持更加完整的SQL语法,因此大部分状况下,推荐使用HiveContext。
Spark应用能够用SparkContext建立DataFrame,所需的数据来源能够是已有的RDD(existing RDD
),或者Hive表,或者其余数据源(data sources.)
如下是一个从JSON文件建立DataFrame的小栗子:
val sc: SparkContext // 已有的 SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc) val df = sqlContext.read.json("examples/src/main/resources/people.json") // 将DataFrame内容打印到stdout df.show()
DataFrame提供告终构化数据的领域专用语言支持,包括Scala, Java, Python and R.
这里咱们给出一个结构化数据处理的基本示例:
val sc: SparkContext // 已有的 SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc) // 建立一个 DataFrame val df = sqlContext.read.json("examples/src/main/resources/people.json") // 展现 DataFrame 的内容 df.show() // age name // null Michael // 30 Andy // 19 Justin // 打印数据树形结构 df.printSchema() // root // |-- age: long (nullable = true) // |-- name: string (nullable = true) // select "name" 字段 df.select("name").show() // name // Michael // Andy // Justin // 展现全部人,但全部人的 age 都加1 df.select(df("name"), df("age") + 1).show() // name (age + 1) // Michael null // Andy 31 // Justin 20 // 筛选出年龄大于21的人 df.filter(df("age") > 21).show() // age name // 30 Andy // 计算各个年龄的人数 df.groupBy("age").count().show() // age count // null 1 // 19 1 // 30 1
DataFrame的完整API列表请参考这里:API Documentation
除了简单的字段引用和表达式支持以外,DataFrame还提供了丰富的工具函数库,包括字符串组装,日期处理,常见的数学函数等。完整列表见这里:DataFrame Function Reference.
SQLContext.sql能够执行一个SQL查询,并返回DataFrame结果。
val sqlContext = ... // 已有一个 SQLContext 对象 val df = sqlContext.sql("SELECT * FROM table")
Dataset API和RDD相似,不过Dataset不使用Java序列化或者Kryo,而是使用专用的编码器(Encoder )来序列化对象和跨网络传输通讯。若是这个编码器和标准序列化都能把对象转字节,那么编码器就能够根据代码动态生成,并使用一种特殊数据格式,这种格式下的对象不须要反序列化回来,就能容许Spark进行操做,如过滤、排序、哈希等。
// 对普通类型数据的Encoder是由 importing sqlContext.implicits._ 自动提供的 val ds = Seq(1, 2, 3).toDS() ds.map(_ + 1).collect() // 返回: Array(2, 3, 4) // 如下这行不只定义了case class,同时也自动为其建立了Encoder case class Person(name: String, age: Long) val ds = Seq(Person("Andy", 32)).toDS() // DataFrame 只需提供一个和数据schema对应的class便可转换为 Dataset。Spark会根据字段名进行映射。 val path = "examples/src/main/resources/people.json" val people = sqlContext.read.json(path).as[Person]
Spark SQL有两种方法将RDD转为DataFrame。
1. 使用反射机制,推导包含指定类型对象RDD的schema。这种基于反射机制的方法使代码更简洁,并且若是你事先知道数据schema,推荐使用这种方式;
2. 编程方式构建一个schema,而后应用到指定RDD上。这种方式更啰嗦,但若是你事先不知道数据有哪些字段,或者数据schema是运行时读取进来的,那么你极可能须要用这种方式。
Spark SQL的Scala接口支持自动将包含case class对象的RDD转为DataFrame。对应的case class定义了表的schema。case class的参数名经过反射,映射为表的字段名。case class还能够嵌套一些复杂类型,如Seq和Array。RDD隐式转换成DataFrame后,能够进一步注册成表。随后,你就能够对表中数据使用SQL语句查询了。
// sc 是已有的 SparkContext 对象 val sqlContext = new org.apache.spark.sql.SQLContext(sc) // 为了支持RDD到DataFrame的隐式转换 import sqlContext.implicits._ // 定义一个case class. // 注意:Scala 2.10的case class最多支持22个字段,要绕过这一限制, // 你可使用自定义class,并实现Product接口。固然,你也能够改用编程方式定义schema case class Person(name: String, age: Int) // 建立一个包含Person对象的RDD,并将其注册成table val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF() people.registerTempTable("people") // sqlContext.sql方法能够直接执行SQL语句 val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19") // SQL查询的返回结果是一个DataFrame,且可以支持全部常见的RDD算子 // 查询结果中每行的字段能够按字段索引访问: teenagers.map(t => "Name: " + t(0)).collect().foreach(println) // 或者按字段名访问: teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println) // row.getValuesMap[T] 会一次性返回多列,并以Map[String, T]为返回结果类型 teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println) // 返回结果: Map("name" -> "Justin", "age" -> 19)
若是不能事先经过case class定义schema(例如,记录的字段结构是保存在一个字符串,或者其余文本数据集中,须要先解析,又或者字段对不一样用户有所不一样),那么你可能须要按如下三个步骤,以编程方式的建立一个DataFrame:
For example:
例如:
// sc 是已有的SparkContext对象 val sqlContext = new org.apache.spark.sql.SQLContext(sc) // 建立一个RDD val people = sc.textFile("examples/src/main/resources/people.txt") // 数据的schema被编码与一个字符串中 val schemaString = "name age" // Import Row. import org.apache.spark.sql.Row; // Import Spark SQL 各个数据类型 import org.apache.spark.sql.types.{StructType,StructField,StringType}; // 基于前面的字符串生成schema val schema = StructType( schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true))) // 将RDD[people]的各个记录转换为Rows,即:获得一个包含Row对象的RDD val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim)) // 将schema应用到包含Row对象的RDD上,获得一个DataFrame val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema) // 将DataFrame注册为table peopleDataFrame.registerTempTable("people") // 执行SQL语句 val results = sqlContext.sql("SELECT name FROM people") // SQL查询的结果是DataFrame,且可以支持全部常见的RDD算子 // 而且其字段能够以索引访问,也能够用字段名访问 results.map(t => "Name: " + t(0)).collect().foreach(println)
Spark SQL支持基于DataFrame操做一系列不一样的数据源。DataFrame既能够当成一个普通RDD来操做,也能够将其注册成一个临时表来查询。把DataFrame注册为table以后,你就能够基于这个table执行SQL语句了。本节将描述加载和保存数据的一些通用方法,包含了不一样的Spark数据源,而后深刻介绍一下内建数据源可用选项。
在最简单的状况下,全部操做都会以默认类型数据源来加载数据(默认是Parquet,除非修改了spark.sql.sources.default 配置)。
val df = sqlContext.read.load("examples/src/main/resources/users.parquet") df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
你也能够手动指定数据源,并设置一些额外的选项参数。数据源可由其全名指定(如,org.apache.spark.sql.parquet),而对于内建支持的数据源,可使用简写名(json, parquet, jdbc)。任意类型数据源建立的DataFrame均可以用下面这种语法转成其余类型数据格式。
val df = sqlContext.read.format("json").load("examples/src/main/resources/people.json") df.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
Spark SQL还支持直接对文件使用SQL查询,不须要用read方法把文件加载进来。
val df = sqlContext.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
Save操做有一个可选参数SaveMode,用这个参数能够指定如何处理数据已经存在的状况。很重要的一点是,这些保存模式都没有加锁,因此其操做也不是原子性的。另外,若是使用Overwrite模式,实际操做是,先删除数据,再写新数据。
仅Scala/Java | 全部支持的语言 | 含义 |
---|---|---|
SaveMode.ErrorIfExists (default) |
"error" (default) |
(默认模式)从DataFrame向数据源保存数据时,若是数据已经存在,则抛异常。 |
SaveMode.Append |
"append" |
若是数据或表已经存在,则将DataFrame的数据追加到已有数据的尾部。 |
SaveMode.Overwrite |
"overwrite" |
若是数据或表已经存在,则用DataFrame数据覆盖之。 |
SaveMode.Ignore |
"ignore" |
若是数据已经存在,那就放弃保存DataFrame数据。这和SQL里CREATE TABLE IF NOT EXISTS有点相似。 |
在使用HiveContext的时候,DataFrame能够用saveAsTable方法,将数据保存成持久化的表。与registerTempTable不一样,saveAsTable会将DataFrame的实际数据内容保存下来,而且在HiveMetastore中建立一个游标指针。持久化的表会一直保留,即便Spark程序重启也没有影响,只要你链接到同一个metastore就能够读取其数据。读取持久化表时,只须要用用表名做为参数,调用SQLContext.table方法便可获得对应DataFrame。
默认状况下,saveAsTable会建立一个”managed table“,也就是说这个表数据的位置是由metastore控制的。一样,若是删除表,其数据也会同步删除。
Parquet 是一种流行的列式存储格式。Spark SQL提供对Parquet文件的读写支持,并且Parquet文件可以自动保存原始数据的schema。写Parquet文件的时候,全部的字段都会自动转成nullable,以便向后兼容。
仍然使用上面例子中的数据:
// 咱们继续沿用以前例子中的sqlContext对象 // 为了支持RDD隐式转成DataFrame import sqlContext.implicits._ val people: RDD[Person] = ... // 和上面例子中相同,一个包含case class对象的RDD // 该RDD将隐式转成DataFrame,而后保存为parquet文件 people.write.parquet("people.parquet") // 读取上面保存的Parquet文件(多个文件 - Parquet保存完实际上是不少个文件)。Parquet文件是自描述的,文件中保存了schema信息 // 加载Parquet文件,并返回DataFrame结果 val parquetFile = sqlContext.read.parquet("people.parquet") // Parquet文件(多个)能够注册为临时表,而后在SQL语句中直接查询 parquetFile.registerTempTable("parquetFile") val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19") teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
像Hive这样的系统,一个很经常使用的优化手段就是表分区。在一个支持分区的表中,数据是保存在不一样的目录中的,而且将分区键以编码方式保存在各个分区目录路径中。Parquet数据源如今也支持自动发现和推导分区信息。例如,咱们能够把以前用的人口数据存到一个分区表中,其目录结构以下所示,其中有2个额外的字段,gender和country,做为分区键:
path └── to └── table ├── gender=male │ ├── ... │ │ │ ├── country=US │ │ └── data.parquet │ ├── country=CN │ │ └── data.parquet │ └── ... └── gender=female ├── ... │ ├── country=US │ └── data.parquet ├── country=CN │ └── data.parquet └── ...
在这个例子中,若是须要读取Parquet文件数据,咱们只须要把 path/to/table 做为参数传给 SQLContext.read.parquet 或者 SQLContext.read.load。Spark SQL可以自动的从路径中提取出分区信息,随后返回的DataFrame的schema以下:
root |-- name: string (nullable = true) |-- age: long (nullable = true) |-- gender: string (nullable = true) |-- country: string (nullable = true)
注意,分区键的数据类型将是自动推导出来的。目前,只支持数值类型和字符串类型数据做为分区键。
有的用户可能不想要自动推导出来的分区键数据类型。这种状况下,你能够经过 spark.sql.sources.partitionColumnTypeInference.enabled (默认是true)来禁用分区键类型推导。禁用以后,分区键老是被当成字符串类型。
从Spark-1.6.0开始,分区发现默认只在指定目录的子目录中进行。以上面的例子来讲,若是用户把 path/to/table/gender=male 做为参数传给 SQLContext.read.parquet 或者 SQLContext.read.load,那么gender就不会被做为分区键。若是用户想要指定分区发现的基础目录,能够经过basePath选项指定。例如,若是把 path/to/table/gender=male做为数据目录,而且将basePath设为 path/to/table,那么gender仍然会最为分区键。
像ProtoBuffer、Avro和Thrift同样,Parquet也支持schema演变。用户从一个简单的schema开始,逐渐增长所需的新字段。这样的话,用户最终会获得多个schema不一样但互相兼容的Parquet文件。目前,Parquet数据源已经支持自动检测这种状况,并合并全部文件的schema。
由于schema合并相对代价比较大,而且在多数状况下不是必要的,因此从Spark-1.5.0以后,默认是被禁用的。你能够这样启用这一功能:
// 继续沿用以前的sqlContext对象 // 为了支持RDD隐式转换为DataFrame import sqlContext.implicits._ // 建立一个简单的DataFrame,存到一个分区目录中 val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double") df1.write.parquet("data/test_table/key=1") // 建立另外一个DataFrame放到新的分区目录中, // 并增长一个新字段,丢弃一个老字段 val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple") df2.write.parquet("data/test_table/key=2") // 读取分区表 val df3 = sqlContext.read.option("mergeSchema", "true").parquet("data/test_table") df3.printSchema() // 最终的schema将由3个字段组成(single,double,triple) // 而且分区键出如今目录路径中 // root // |-- single: int (nullable = true) // |-- double: int (nullable = true) // |-- triple: int (nullable = true) // |-- key : int (nullable = true)
在读写Hive metastore Parquet 表时,Spark SQL用的是内部的Parquet支持库,而不是Hive SerDe,由于这样性能更好。这一行为是由spark.sql.hive.convertMetastoreParquet 配置项来控制的,并且默认是启用的。
Hive和Parquet在表结构处理上主要有2个不一样点:
因为以上缘由,咱们必须在Hive metastore Parquet table转Spark SQL Parquet table的时候,对Hive metastore schema作调整,调整规则以下:
Spark SQL会缓存Parquet元数据以提升性能。若是Hive metastore Parquet table转换被启用的话,那么转换过来的schema也会被缓存。这时候,若是这些表由Hive或其余外部工具更新了,你必须手动刷新元数据。
// 注意,这里sqlContext是一个HiveContext sqlContext.refreshTable("my_table")
Parquet配置能够经过 SQLContext.setConf 或者 SQL语句中 SET key=value来指定。
属性名 | 默认值 | 含义 |
---|---|---|
spark.sql.parquet.binaryAsString |
false | 有些老系统,如:特定版本的Impala,Hive,或者老版本的Spark SQL,不区分二进制数据和字符串类型数据。这个标志的意思是,让Spark SQL把二进制数据当字符串处理,以兼容老系统。 |
spark.sql.parquet.int96AsTimestamp |
true | 有些老系统,如:特定版本的Impala,Hive,把时间戳存成INT96。这个配置的做用是,让Spark SQL把这些INT96解释为timestamp,以兼容老系统。 |
spark.sql.parquet.cacheMetadata |
true | 缓存Parquet schema元数据。能够提高查询静态数据的速度。 |
spark.sql.parquet.compression.codec |
gzip | 设置Parquet文件的压缩编码格式。可接受的值有:uncompressed, snappy, gzip(默认), lzo |
spark.sql.parquet.filterPushdown |
true | 启用过滤器下推优化,能够讲过滤条件尽可能推导最下层,已取得性能提高 |
spark.sql.hive.convertMetastoreParquet |
true | 若是禁用,Spark SQL将使用Hive SerDe,而不是内建的对Parquet tables的支持 |
spark.sql.parquet.output.committer.class |
org.apache.parquet.hadoop. |
Parquet使用的数据输出类。这个类必须是 org.apache.hadoop.mapreduce.OutputCommitter的子类。通常来讲,它也应该是 org.apache.parquet.hadoop.ParquetOutputCommitter的子类。注意:1. 若是启用spark.speculation, 这个选项将被自动忽略
2. 这个选项必须用hadoop configuration设置,而不是Spark SQLConf 3. 这个选项会覆盖 spark.sql.sources.outputCommitterClass Spark SQL有一个内建的org.apache.spark.sql.parquet.DirectParquetOutputCommitter, 这个类的在输出到S3的时候比默认的ParquetOutputCommitter类效率高。 |
spark.sql.parquet.mergeSchema |
false |
若是设为true,那么Parquet数据源将会merge 全部数据文件的schema,不然,schema是从summary file获取的(若是summary file没有设置,则随机选一个) |
Spark SQL在加载JSON数据的时候,能够自动推导其schema并返回DataFrame。用SQLContext.read.json读取一个包含String的RDD或者JSON文件,便可实现这一转换。
注意,一般所说的json文件只是包含一些json数据的文件,而不是咱们所须要的JSON格式文件。JSON格式文件必须每一行是一个独立、完整的的JSON对象。所以,一个常规的多行json文件常常会加载失败。
// sc是已有的SparkContext对象 val sqlContext = new org.apache.spark.sql.SQLContext(sc) // 数据集是由路径指定的 // 路径既能够是单个文件,也能够仍是存储文本文件的目录 val path = "examples/src/main/resources/people.json" val people = sqlContext.read.json(path) // 推导出来的schema,可由printSchema打印出来 people.printSchema() // root // |-- age: integer (nullable = true) // |-- name: string (nullable = true) // 将DataFrame注册为table people.registerTempTable("people") // 跑SQL语句吧! val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") // 另外一种方法是,用一个包含JSON字符串的RDD来建立DataFrame val anotherPeopleRDD = sc.parallelize( """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil) val anotherPeople = sqlContext.read.json(anotherPeopleRDD)
Spark SQL支持从Apache Hive读写数据。然而,Hive依赖项太多,因此没有把Hive包含在默认的Spark发布包里。要支持Hive,须要在编译spark的时候增长-Phive和-Phive-thriftserver标志。这样编译打包的时候将会把Hive也包含进来。注意,hive的jar包也必须出如今全部的worker节点上,访问Hive数据时候会用到(如:使用hive的序列化和反序列化SerDes时)。
Hive配置在conf/目录下hive-site.xml,core-site.xml(安全配置),hdfs-site.xml(HDFS配置)文件中。请注意,若是在YARN cluster(yarn-cluster mode)模式下执行一个查询的话,lib_mananged/jar/下面的datanucleus 的jar包,和conf/下的hive-site.xml必须在驱动器(driver)和全部执行器(executor)均可用。一种简便的方法是,经过spark-submit命令的–jars和–file选项来提交这些文件。
若是使用Hive,则必须构建一个HiveContext,HiveContext是派生于SQLContext的,添加了在Hive Metastore里查询表的支持,以及对HiveQL的支持。用户没有现有的Hive部署,也能够建立一个HiveContext。若是没有在hive-site.xml里配置,那么HiveContext将会自动在当前目录下建立一个metastore_db目录,再根据HiveConf设置建立一个warehouse目录(默认/user/hive/warehourse)。因此请注意,你必须把/user/hive/warehouse的写权限赋予启动spark应用程序的用户。
// sc是一个已有的SparkContext对象 val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") // 这里用的是HiveQL sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)
Spark SQL对Hive最重要的支持之一就是和Hive metastore进行交互,这使得Spark SQL能够访问Hive表的元数据。从Spark-1.4.0开始,Spark SQL有专门单独的二进制build版本,能够用来访问不一样版本的Hive metastore,其配置表以下。注意,无论所访问的hive是什么版本,Spark SQL内部都是以Hive 1.2.1编译的,并且内部使用的Hive类也是基于这个版本(serdes,UDFs,UDAFs等)
如下选项可用来配置Hive版本以便访问其元数据:
属性名 | 默认值 | 含义 |
---|---|---|
spark.sql.hive.metastore.version |
1.2.1 |
Hive metastore版本,可选的值为0.12.0 到 1.2.1 |
spark.sql.hive.metastore.jars |
builtin |
初始化HiveMetastoreClient的jar包。这个属性能够是如下三者之一:
目前内建为使用Hive-1.2.1,编译的时候启用-Phive,则会和spark一块儿打包。若是没有-Phive,那么spark.sql.hive.metastore.version要么是1.2.1,要就是未定义
使用maven仓库下载的jar包版本。这个选项建议不要再生产环境中使用
|
spark.sql.hive.metastore.sharedPrefixes |
com.mysql.jdbc, |
一个逗号分隔的类名前缀列表,这些类使用classloader加载,且能够在Spark SQL和特定版本的Hive间共享。例如,用来访问hive metastore 的JDBC的driver就须要这种共享。其余须要共享的类,是与某些已经共享的类有交互的类。例如,自定义的log4j appender |
spark.sql.hive.metastore.barrierPrefixes |
(empty) |
一个逗号分隔的类名前缀列表,这些类在每一个Spark SQL所访问的Hive版本中都会被显式的reload。例如,某些在共享前缀列表(spark.sql.hive.metastore.sharedPrefixes)中声明为共享的Hive UD函数 |
Spark SQL也能够用JDBC访问其余数据库。这一功能应该优先于使用JdbcRDD。由于它返回一个DataFrame,而DataFrame在Spark SQL中操做更简单,且更容易和来自其余数据源的数据进行交互关联。JDBC数据源在java和python中用起来也很简单,不须要用户提供额外的ClassTag。(注意,这与Spark SQL JDBC server不一样,Spark SQL JDBC server容许其余应用执行Spark SQL查询)
首先,你须要在spark classpath中包含对应数据库的JDBC driver,下面这行包括了用于访问postgres的数据库driver
SPARK_CLASSPATH=postgresql-9.3-1102-jdbc41.jar bin/spark-shell
远程数据库的表能够经过Data Sources API,用DataFrame或者SparkSQL 临时表来装载。如下是选项列表:
属性名 | 含义 |
---|---|
url |
须要链接的JDBC URL |
dbtable |
须要读取的JDBC表。注意,任何能够填在SQL的where子句中的东西,均可以填在这里。(既能够填完整的表名,也可填括号括起来的子查询语句) |
driver |
JDBC driver的类名。这个类必须在master和worker节点上均可用,这样各个节点才能将driver注册到JDBC的子系统中。 |
partitionColumn, lowerBound, upperBound, numPartitions |
这几个选项,若是指定其中一个,则必须所有指定。他们描述了多个worker如何并行的读入数据,并将表分区。partitionColumn必须是所查询的表中的一个数值字段。注意,lowerBound和upperBound只是用于决定分区跨度的,而不是过滤表中的行。所以,表中全部的行都会被分区而后返回。 |
fetchSize |
JDBC fetch size,决定每次获取多少行数据。在JDBC驱动上设成较小的值有利于性能优化(如,Oracle上设为10) |
val jdbcDF = sqlContext.read.format("jdbc").options( Map("url" -> "jdbc:postgresql:dbserver", "dbtable" -> "schema.tablename")).load()
对于有必定计算量的Spark做业来讲,可能的性能改进的方式,不是把数据缓存在内存里,就是调整一些开销较大的选项参数。
Spark SQL能够经过调用SQLContext.cacheTable(“tableName”)或者DataFrame.cache()把tables以列存储格式缓存到内存中。随后,Spark SQL将会扫描必要的列,并自动调整压缩比例,以减小内存占用和GC压力。你也能够用SQLContext.uncacheTable(“tableName”)来删除内存中的table。
你还可使用SQLContext.setConf 或在SQL语句中运行SET key=value命令,来配置内存中的缓存。
属性名 | 默认值 | 含义 |
---|---|---|
spark.sql.inMemoryColumnarStorage.compressed |
true | 若是设置为true,Spark SQL将会根据数据统计信息,自动为每一列选择单独的压缩编码方式。 |
spark.sql.inMemoryColumnarStorage.batchSize |
10000 | 控制列式缓存批量的大小。增大批量大小能够提升内存利用率和压缩率,但同时也会带来OOM(Out Of Memory)的风险。 |
如下选项一样也能够用来给查询任务调性能。不过这些选项在将来可能被放弃,由于spark将支持愈来愈多的自动优化。
属性名 | 默认值 | 含义 |
---|---|---|
spark.sql.autoBroadcastJoinThreshold |
10485760 (10 MB) | 配置join操做时,可以做为广播变量的最大table的大小。设置为-1,表示禁用广播。注意,目前的元数据统计仅支持Hive metastore中的表,而且须要运行这个命令:ANALYSE TABLE <tableName> COMPUTE STATISTICS noscan |
spark.sql.tungsten.enabled |
true | 设为true,则启用优化的Tungsten物理执行后端。Tungsten会显式的管理内存,并动态生成表达式求值的字节码 |
spark.sql.shuffle.partitions |
200 | 配置数据混洗(shuffle)时(join或者聚合操做),使用的分区数。 |
Spark SQL能够做为JDBC/ODBC或者命令行工具的分布式查询引擎。在这种模式下,终端用户或应用程序,无需写任何代码,就能够直接在Spark SQL中运行SQL查询。
这里实现的Thrift JDBC/ODBC server和Hive-1.2.1中的HiveServer2
是相同的。你可使用beeline脚原本测试Spark或者Hive-1.2.1的JDBC server。
在Spark目录下运行下面这个命令,启动一个JDBC/ODBC server
./sbin/start-thriftserver.sh
这个脚本能接受全部 bin/spark-submit 命令支持的选项参数,外加一个 –hiveconf 选项,来指定Hive属性。运行./sbin/start-thriftserver.sh –help能够查看完整的选项列表。默认状况下,启动的server将会在localhost:10000端口上监听。要改变监听主机名或端口,能够用如下环境变量:
export HIVE_SERVER2_THRIFT_PORT=<listening-port> export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host> ./sbin/start-thriftserver.sh \ --master <master-uri> \ ...
或者Hive系统属性 来指定
./sbin/start-thriftserver.sh \ --hiveconf hive.server2.thrift.port=<listening-port> \ --hiveconf hive.server2.thrift.bind.host=<listening-host> \ --master <master-uri> ...
接下来,你就能够开始在beeline中测试这个Thrift JDBC/ODBC server:
./bin/beeline
下面的指令,能够链接到一个JDBC/ODBC server
beeline> !connect jdbc:hive2://localhost:10000
可能须要输入用户名和密码。在非安全模式下,只要输入你本机的用户名和一个空密码便可。对于安全模式,请参考beeline documentation.
Hive的配置是在conf/目录下的hive-site.xml,core-site.xml,hdfs-site.xml中指定的。
你也能够在beeline的脚本中指定。
Thrift JDBC server也支持经过HTTP传输Thrift RPC消息。如下配置(在conf/hive-site.xml中)将启用HTTP模式:
hive.server2.transport.mode - Set this to value: http hive.server2.thrift.http.port - HTTP port number fo listen on; default is 10001 hive.server2.http.endpoint - HTTP endpoint; default is cliservice
一样,在beeline中也能够用HTTP模式链接JDBC/ODBC server:
beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint>
Spark SQL CLI是一个很方便的工具,它能够用local mode运行hive metastore service,而且在命令行中执行输入的查询。注意Spark SQL CLI目前还不支持和Thrift JDBC server通讯。
用以下命令,在spark目录下启动一个Spark SQL CLI
./bin/spark-sql
Hive配置在conf目录下hive-site.xml,core-site.xml,hdfs-site.xml中设置。你能够用这个命令查看完整的选项列表:./bin/spark-sql –help
./sbin/start-thriftserver.sh \ --conf spark.sql.hive.thriftServer.singleSession=true \ ...
根据用户的反馈,咱们提供了一个新的,更加流畅的API,用于数据读(SQLContext.read)写(DataFrame.write),同时老的API(如:SQLCOntext.parquetFile, SQLContext.jsonFile)将被废弃。
有关SQLContext.read和DataFrame.write的更详细信息,请参考API文档。
根据用户的反馈,咱们改变了DataFrame.groupBy().agg()的默认行为,在返回的DataFrame结果中保留了分组字段。若是你想保持1.3中的行为,设置spark.sql.retainGroupColumns为false便可。
// 在1.3.x中,若是要保留分组字段"department", 你必须显式的在agg聚合时包含这个字段 df.groupBy("department").agg($"department", max("age"), sum("expense")) // 而在1.4+,分组字段"department"默认就会包含在返回的DataFrame中 df.groupBy("department").agg(max("age"), sum("expense")) // 要回滚到1.3的行为(不包含分组字段),按以下设置便可: sqlContext.setConf("spark.sql.retainGroupColumns", "false")
在Spark 1.3中,咱们去掉了Spark SQL的”Alpha“标签,并清理了可用的API。从Spark 1.3起,Spark SQL将对1.x系列二进制兼容。这个兼容性保证不包括显式的标注为”unstable(如:DeveloperAPI或Experimental)“的API。
对于用户来讲,Spark SQL 1.3最大的改动就是SchemaRDD更名为DataFrame。主要缘由是,DataFrame再也不直接由RDD派生,而是经过本身的实现提供RDD的功能。DataFrame只须要调用其rdd方法就能转成RDD。
在Scala中仍然有SchemaRDD,只不过这是DataFrame的一个别名,以便兼容一些现有代码。但仍然建议用户改用DataFrame。Java和Python用户就没这个福利了,他们必须改代码。
在Spark 1.3以前,有单独的java兼容类(JavaSQLContext和JavaSchemaRDD)及其在Scala API中的镜像。Spark 1.3中将Java API和Scala API统一。两种语言的用户都应该使用SQLContext和DataFrame。通常这些类中都会使用两种语言中都有的类型(如:Array取代各语言独有的集合)。有些状况下,没有通用的类型(例如:闭包或者maps),将会使用函数重载来解决这个问题。
另外,java特有的类型API被删除了。Scala和java用户都应该用org.apache.spark.sql.types来编程描述一个schema。
Spark 1.3以前的不少示例代码,都在开头用 import sqlContext._,这行将会致使全部的sqlContext的函数都被引入进来。所以,在Spark 1.3咱们把RDDs到DataFrames的隐式转换隔离出来,单独放到SQLContext.implicits对象中。用户如今应该这样写:import sqlContext.implicits._
另外,隐式转换也支持由Product(如:case classes或tuples)组成的RDD,但须要调用一个toDF方法,而不是自动转换。
若是须要使用DSL(被DataFrame取代的API)中的方法,用户以前须要导入DSL(import org.apache.spark.sql.catalyst.dsl), 而如今应该要导入 DataFrame API(import org.apache.spark.sql.functions._)
Spark 1.3删除了sql包中的DataType类型别名。如今,用户应该使用 org.apache.spark.sql.types中的类。
注册UDF的函数,无论是DataFrame,DSL或者SQL中用到的,都被挪到SQLContext.udf中。
sqlContext.udf.register("strLen", (s: String) => s.length())
Python UDF注册保持不变。
在python中使用DataTypes,你须要先构造一个对象(如:StringType()),而不是引用一个单例。
用户能够经过以下命令,为JDBC客户端session设定一个Fair Scheduler pool。
SET spark.sql.thriftserver.scheduler.pool=accounting;
在Shark中,默认的reducer个数是1,而且由mapred.reduce.tasks设定。Spark SQL废弃了这个属性,改成 spark.sql.shuffle.partitions, 而且默认200,用户可经过以下SET命令来自定义:
SET spark.sql.shuffle.partitions=10; SELECT page, count(*) c FROM logs_last_month_cached GROUP BY page ORDER BY c DESC LIMIT 10;
你也能够把这个属性放到hive-site.xml中来覆盖默认值。
目前,mapred.reduce.tasks属性仍然能被识别,而且自动转成spark.sql.shuffle.partitions
shark.cache表属性已经不存在了,而且以”_cached”结尾命名的表也再也不会自动缓存。取而代之的是,CACHE TABLE和UNCACHE TABLE语句,用以显式的控制表的缓存:
CACHE TABLE logs_last_month; UNCACHE TABLE logs_last_month;
注意:CACHE TABLE tbl 如今默认是饥饿模式,而非懒惰模式。不再须要手动调用其余action来触发cache了!
从Spark-1.2.0开始,Spark SQL新提供了一个语句,让用户本身控制表缓存是不是懒惰模式
CACHE [LAZY] TABLE [AS SELECT] ...
如下几个缓存相关的特性再也不支持:
Spark SQL设计时考虑了和Hive metastore,SerDes以及UDF的兼容性。目前这些兼容性斗是基于Hive-1.2.1版本,而且Spark SQL能够连到不一样版本的Hive metastore(从0.12.0到1.2.1,参考:http://spark.apache.org/docs/latest/sql-programming-guide.html#interacting-with-different-versions-of-hive-metastore)
Spark SQL Thrift JDBC server采用了”out of the box”(开箱即用)的设计,使用很方便,并兼容已有的Hive安装版本。你不须要修改已有的Hive metastore或者改变数据的位置,或者表分区。
Spark SQL 支持绝大部分Hive功能,如:
SELECT
GROUP BY
ORDER BY
CLUSTER BY
SORT BY
=
, ⇔
, ==
, <>
, <
, >
, >=
, <=
, etc)+
, -
, *
, /
, %
, etc)AND
, &&
, OR
, ||
, etc)sign
, ln
, cos
, etc)instr
, length
, printf
, etc)JOIN
{LEFT|RIGHT|FULL} OUTER JOIN
LEFT SEMI JOIN
CROSS JOIN
SELECT col FROM ( SELECT a + b AS col from t1) t2
CREATE TABLE
CREATE TABLE AS SELECT
ALTER TABLE
TINYINT
SMALLINT
INT
BIGINT
BOOLEAN
FLOAT
DOUBLE
STRING
BINARY
TIMESTAMP
DATE
ARRAY<>
MAP<>
STRUCT<>
如下是目前不支持的Hive特性的列表。多数是不经常使用的。
不支持的Hive常见功能
不支持的Hive高级功能
Hive输入、输出格式
Hive优化
一些比较棘手的Hive优化目前尚未在Spark中提供。有一些(如索引)对应Spark SQL这种内存计算模型来讲并不重要。另一些,在Spark SQL将来的版本中会支持。
STREAMTABLE
join提示:Spark SQL里没有这玩艺儿Spark SQL和DataFrames支持以下数据类型:
ByteType
: 1字节长的有符号整型,范围:-128
到 127
.ShortType
: 2字节长有符号整型,范围:-32768
到 32767
.IntegerType
: 4字节有符号整型,范围:-2147483648
到 2147483647
.LongType
: 8字节有符号整型,范围: -9223372036854775808
to 9223372036854775807
.FloatType
: 4字节单精度浮点数。DoubleType
: 8字节双精度浮点数DecimalType
: 任意精度有符号带小数的数值。内部使用java.math.BigDecimal, BigDecimal包含任意精度的不缩放整型,和一个32位的缩放整型StringType
: 字符串BinaryType
: 字节序列BooleanType
: 布尔类型TimestampType
: 表示包含年月日、时分秒等字段的日期DateType
: 表示包含年月日字段的日期ArrayType(elementType, containsNull)
:数组类型,表达一系列的elementType类型的元素组成的序列,containsNull表示数组可否包含null值MapType(keyType, valueType, valueContainsNull)
:映射集合类型,表示一个键值对的集合。键的类型是keyType,值的类型则由valueType指定。对应MapType来讲,键是不能为null的,而值可否为null则取决于valueContainsNull。StructType(fields):
表示包含StructField序列的结构体。
全部Spark SQL支持的数据类型都在这个包里:org.apache.spark.sql.types,你能够这样导入之:
import org.apache.spark.sql.types._
Data type | Value type in Scala | API to access or create a data type |
---|---|---|
ByteType | Byte | ByteType |
ShortType | Short | ShortType |
IntegerType | Int | IntegerType |
LongType | Long | LongType |
FloatType | Float | FloatType |
DoubleType | Double | DoubleType |
DecimalType | java.math.BigDecimal | DecimalType |
StringType | String | StringType |
BinaryType | Array[Byte] | BinaryType |
BooleanType | Boolean | BooleanType |
TimestampType | java.sql.Timestamp | TimestampType |
DateType | java.sql.Date | DateType |
ArrayType | scala.collection.Seq | ArrayType(elementType, [containsNull])注意:默认containsNull为true |
MapType | scala.collection.Map | MapType(keyType, valueType, [valueContainsNull])注意:默认valueContainsNull为true |
StructType | org.apache.spark.sql.Row | StructType(fields)注意:fields是一个StructFields的序列,而且同名的字段是不容许的。 |
StructField | 定义字段的数据对应的Scala类型(例如,若是StructField的dataType为IntegerType,则其数据对应的scala类型为Int) | StructField(name, dataType, nullable) |
这是Not-a-Number的缩写,某些float或double类型不符合标准浮点数语义,须要对其特殊处理:
原创文章,转载请注明: 转载自并发编程网 – ifeve.com本文连接地址: 《Spark 官方文档》Spark SQL, DataFrames 以及 Datasets 编程指南