spark SQL Parquet 文件的读取与加载
sql
是由许多其余数据处理系统支持的柱状格式。Spark SQL支持阅读和编写自动保留原始数据模式的Parquet文件。在编写Parquet文件时,出于兼容性缘由,全部列都会自动转换为空。编程
1, 以编程方式加载数据json
这里使用上一节的例子中的数据:常规数据加载缓存
private def runBasicParquetExample(spark: SparkSession): Unit = { import spark.implicits._ // val peopleDF = spark.read.json("examples/src/main/resources/people.json") //DataFrames能够保存为Parquet文件,维护模式信息 peopleDF.write.parquet("people.parquet") //在上面建立的parquet文件中读取 // Parquet文件是自描述的,因此模式被保存 //加载Parquet文件的结果也是一个DataFrame val parquetFileDF = spark.read.parquet("people.parquet") // Parquet文件也能够用来建立临时视图,而后在SQL语句 parquetFileDF.createOrReplaceTempView("parquetFile") val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19") namesDF.map(attributes => "Name: " + attributes(0)).show() // +------------+ // | value| // +------------+ // |Name: Justin| // +------------+ }2,分区操做
表分区是像Hive这样的系统中经常使用的优化方法。在分区表中,数据一般存储在不一样的目录中,分区列值在每一个分区目录的路径中编码。如今,Parquet数据源可以自动发现和推断分区信息。例如,咱们可使用如下目录结构,两个额外的列gender和country分区列将全部之前使用的人口数据存储到分区表中:
架构
path └── to └── table ├── gender=male │ ├── ... │ │ │ ├── country=US │ │ └── data.parquet │ ├── country=CN │ │ └── data.parquet │ └── ... └── gender=female ├── ... │ ├── country=US │ └── data.parquet ├── country=CN │ └── data.parquet └── ...经过传递path/to/table给SparkSession.read.parquet或者SparkSession.read.load,Spark SQL将自动从路径中提取分区信息。如今,返回的DataFrame的模式变成:
root |-- name: string (nullable = true) |-- age: long (nullable = true) |-- gender: string (nullable = true) |-- country: string (nullable = true)请注意,分区列的数据类型是自动推断的。目前支持数字数据类型和字符串类型。有时用户可能不但愿自动推断分区列的数据类型。对于这些用例,可使用spark.sql.sources.partitionColumnTypeInference.enabled默认 的自动类型推断来配置true。当禁用类型推断时,字符串类型将用于分区列。
像ProtocolBuffer,Avro和Thrift同样,Parquet也支持模式演变。用户能够从简单的模式开始,并根据须要逐渐向模式添加更多的列。经过这种方式,用户可能会以不一样的可是 相互兼容的模式结束多个Parquet文件。Parquet数据源如今能够自动检测这种状况并合并全部这些文件的模式。
因为模式合并是一个相对昂贵的操做,而且在大多数状况下不是必需的,因此咱们从1.5.0开始默认关闭它。你能够经过app
1) 将数据源选项设置mergeSchema为true读取Parquet文件(以下面的示例所示)工具
2)设置全局SQL选项spark.sql.parquet.mergeSchema来true。
性能
例子以下:优化
private def runParquetSchemaMergingExample(spark: SparkSession): Unit = { import spark.implicits._ // 建立一个简单的DataFrame,存储到一个分区目录 val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square") squaresDF.write.parquet("data/test_table/key=1") //在新的分区目录中建立另外一个DataFrame, //添加一个新的列并删除一个现存的列 val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube") cubesDF.write.parquet("data/test_table/key=2") //读取分区表 val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table") mergedDF.printSchema() //最终的模式由Parquet文件中的全部3列组成 //分区列出如今分区目录路径中 // root // |-- value: int (nullable = true) // |-- square: int (nullable = true) // |-- cube: int (nullable = true) // |-- key: int (nullable = true) // $example off:schema_merging$ }4, Hive metastore Parquet
Hive / Parquet Schema调解
Hive和Parquet从表模式处理的角度来看,有两个关键的区别。
1)hive 是不区分大小写的,而Parquet不是 编码
2) Hive认为全部列都是能够空的,而Parquet的可空性是显着的
因为这个缘由,在将Hive metastore Parquet表转换为Spark SQL Parquet表时,咱们必须将Hive Metastore模式与Parquet模式协调一致。协调规则是:
在两个模式中具备相同名称的字段必须具备相同的数据类型,而不论是否为空。协调字段应该具备Parquet方面的数据类型,以保证可空性。
协调的模式刚好包含在Hive Metastore模式中定义的那些字段。
1)仅出如今Parquet模式中的任何字段将被放置在协调的模式中。
2) 仅在Hive Metastore模式中出现的任何字段才会做为可协调字段添加到协调模式中。
元数据刷新
Spark SQL缓存Parquet元数据以得到更好的性能。当Hive Metastore Parquet表转换启用时,这些转换表的元数据也被缓存。若是这些表由Hive或其余外部工具更新,则须要手动刷新以确保一致的元数据。
spark.catalog.refreshTable("my_table")5,Configuration配置
Parquet的结构能够用作setConf方法上SparkSession或经过运行 SET key=value使用SQL命令
Property Name |
Default | Meaning |
spark.sql.parquet.binaryAsString | false | 一些其余派奎斯生产系统,特别是Impala,Hive和旧版本的Spark SQL, 在写出Parquet架构时不会区分二进制数据和字符串。该标志告诉Spark SQL 将二进制数据解释为字符串以提供与这些系统的兼容性。 |
spark.sql.parquet.int96AsTimestamp | true | 一些Parquet生产系统,特别是Impala和Hive,将时间戳存储到INT96中。 该标志告诉Spark SQL将INT96数据解释为一个时间戳,以提供与这些系统的兼容性。 |
spark.sql.parquet.cacheMetadata | true | 打开Parquet模式元数据的缓存。能够加快查询静态数据。 |
spark.sql.parquet.compression.codec | snappy | 设置写入Parquet文件时使用的压缩编解码器。可接受的值包括:未压缩,快速, gzip,lzo。 |
spark.sql.parquet.filterPushdown | true | 设置为true时启用Parquet过滤器下推优化。 |
spark.sql.hive.convertMetastoreParquet | true | 当设置为false时,Spark SQL将使用Hive SerDe来替代内置支持的Parquet表。 |
spark.sql.parquet.mergeSchema | false | 若是为true,则Parquet数据源合并从全部数据文件收集的模式,不然若是 没有摘要文件可用,则从摘要文件或随机数据文件中选取模式。 |
spark.sql.optimizer.metadataOnly | true | 若是为true,则启用使用表元数据的仅限元数据查询优化来生成分区列,而 不是表扫描。当扫描的全部列都是分区列时,该查询将适用,而且查询具备 知足不一样语义的聚合运算符。 |