Several ways to read and write Parquet files

Reposted from: http://blog.csdn.net/woloqun/article/details/76068147

Abstract

This article covers several commonly used ways to read and write Parquet files:

1. Reading Hive's Parquet files with Spark's hadoopFile API

2. Reading and writing Hive's Parquet data with Spark SQL

3. Reading and writing Parquet files with the old and new MapReduce APIs

Reading Parquet files

First, create a Hive table whose data is tab-delimited:

create table test(name string, age int)
row format delimited
fields terminated by '\t';

Load the data:

load data local inpath '/home/work/test/ddd.txt' into table test;

Sample data:

hive> select * from test limit 5;
OK
leo 27
jim 38
leo 15
jack    22
jay 7
Time taken: 0.101 seconds, Fetched: 5 row(s)

Create a Parquet table:
create table test_parquet(name string, age int) stored as parquet;

Inspect the table definition:
hive> show create table test_parquet;
OK
CREATE TABLE `test_parquet`(
  `name` string,
  `age` int)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  'hdfs://localhost:9000/user/hive/warehouse/test_parquet'
TBLPROPERTIES (
  'transient_lastDdlTime'='1495038003')

We can see that the table's input format is MapredParquetInputFormat; later we will use this class to parse the data files.

Insert data into the Parquet table:

insert into table test_parquet select * from test;

a. Parsing Hive's Parquet files with Spark's hadoopFile API

If you read the file from spark-shell, you must add hive-exec-0.14.0.jar to the launch command (MapredParquetInputFormat lives in that jar), and you also need to specify the serializer class. Launch spark-shell as follows:

spark-shell --master spark://xiaobin:7077 --jars /home/xiaobin/soft/apache-hive-0.14.0-bin/lib/hive-exec-0.14.0.jar \
   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer

The actual read code is as follows:

scala> import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat

scala> import org.apache.hadoop.io.{ArrayWritable, NullWritable, Text}
import org.apache.hadoop.io.{ArrayWritable, NullWritable, Text}

scala> val file = sc.hadoopFile("hdfs://localhost:9000/user/hive/warehouse/test_parquet/000000_0",
     |   classOf[MapredParquetInputFormat], classOf[Void], classOf[ArrayWritable])
file: org.apache.spark.rdd.RDD[(Void, org.apache.hadoop.io.ArrayWritable)] =
      hdfs://localhost:9000/user/hive/warehouse/test_parquet/000000_0 HadoopRDD[0] at hadoopFile at <console>:29

scala> file.take(10).foreach { case (k, v) =>
     |   val writables = v.get()
     |   val name = writables(0)
     |   val age = writables(1)
     |   println(writables.length + "    " + name + "   " + age)
     | }

When MapredParquetInputFormat parses a Hive Parquet file, each row is turned into a key/value pair: the key is null and the value is an ArrayWritable whose length equals the number of columns in the table, with each element of the value corresponding to one field of the row in the Hive table.
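
If you need the values rather than just printed output, the same pairs can be mapped into an ordinary typed RDD. Below is a minimal sketch, assuming the file RDD from the session above and that each column's Writable renders its value via toString (the age is parsed back from its string form):

// convert each (Void, ArrayWritable) pair into a (name, age) tuple
val people = file.map { case (_, v) =>
  val cols = v.get()                        // one Writable per table column
  (cols(0).toString, cols(1).toString.toInt)
}
people.take(5).foreach(println)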

b. Parsing Parquet files with a Spark DataFrame

val conf = new SparkConf().setAppName("test").setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val parquet: DataFrame =
    sqlContext.read.parquet("hdfs://192.168.1.115:9000/user/hive/warehouse/test_parquet")
parquet.printSchema()
parquet.select(parquet("name"), parquet("age") + 1).show()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)

+----+---------+
|name|(age + 1)|
+----+---------+
| leo|       28|
| jim|       39|
| leo|       16|
|jack|       23|
| jay|        8|
| jim|       38|
|jack|       37|
| jay|       12|
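
If you want to run plain SQL over the same data without involving Hive at all, the DataFrame can also be registered as a temporary table first. A minimal sketch, reusing the same sqlContext and parquet DataFrame as above (the temporary table name is just illustrative):

// register the DataFrame and query it with SQL, no Hive metastore involved
parquet.registerTempTable("test_parquet_tmp")
sqlContext.sql("select name, age from test_parquet_tmp where age > 20").show()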

c. Reading the Hive table directly with Hive SQL

I could not get this to work in local mode, so the job was packaged and submitted with spark-submit. The code is as follows:

val conf = new SparkConf().setAppName("test")
val sc = new SparkContext(conf)
val hiveContext = new HiveContext(sc)
val sql: DataFrame = hiveContext.sql("select * from test_parquet limit 10")
sql.take(10).foreach(println)

[leo,27]
[jim,38]
[leo,15]
[jack,22]
[jay,7]
[jim,37]
[jack,36]
[jay,11]
[leo,35]
[leo,33]

Command line to submit the job:

spark-submit --class quickspark.QuickSpark02 --master spark://192.168.1.115:7077 sparkcore-1.0-SNAPSHOT.jar

Writing Parquet files

a. Writing Parquet files with Spark

val conf = new SparkConf().setAppName("test").setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// read the text file into an RDD
val file = sc.textFile("hdfs://192.168.1.115:9000/test/user.txt")

// define the parquet schema; the field names and types must match those of
// the hive table, otherwise hive will not be able to parse the data
val schema = (new StructType)
      .add("name", StringType, true)
      .add("age", IntegerType, false)

val rowRDD = file.map(_.split("\t")).map(p => Row(p(0), Integer.valueOf(p(1).trim)))
// convert the RDD into a DataFrame
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)
peopleDataFrame.registerTempTable("people")
peopleDataFrame.write.parquet("hdfs://192.168.1.115:9000/user/hive/warehouse/test_parquet/")
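
Before switching to Hive, it is easy to sanity-check what was just written with Spark itself. A quick sketch, reusing the same sqlContext and output path as above:

// read the freshly written directory back and eyeball the schema and rows
val check = sqlContext.read.parquet("hdfs://192.168.1.115:9000/user/hive/warehouse/test_parquet/")
check.printSchema()
check.show()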

Use Hive SQL to read the Parquet files generated by the Spark DataFrame:

hive> select * from test_parquet limit 10;
OK
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
leo 27
jim 38
leo 15
jack    22
jay 7
jim 37
jack    36
jay 11
leo 35
leo 33

b. Writing Parquet files with MapReduce

For reading and writing Parquet with MR, I originally planned to use org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat, the output format Hive specifies for the table, but its getRecordWriter method is not implemented and simply throws an exception:

@Override
public RecordWriter<Void, ArrayWritable> getRecordWriter(
    final FileSystem ignored,
    final JobConf job,
    final String name,
    final Progressable progress
    ) throws IOException {
  throw new RuntimeException("Should never be used");
}

So we use the Parquet support provided by the official parquet-mr project instead (GitHub: https://github.com/apache/parquet-mr/). Import the dependencies:

<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-common</artifactId>
    <version>1.8.1</version>
</dependency>
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-encoding</artifactId>
    <version>1.8.1</version>
</dependency>
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-column</artifactId>
    <version>1.8.1</version>
</dependency>
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-hadoop</artifactId>
    <version>1.8.1</version>
</dependency>

Parquet supports both the old and the new MapReduce APIs; below we implement Parquet reading and writing with each of them.

The old-API version:

package com.fan.hadoop.parquet;

import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.mapred.DeprecatedParquetOutputFormat;
import org.apache.parquet.schema.MessageTypeParser;

/**
 * Created by fanlegefan.com on 17-7-17.
 */
public class ParquetMR {

    public static class Map extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value,
                        OutputCollector<Text, IntWritable> output,
                        Reporter reporter) throws IOException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                output.collect(word, one);
            }
        }
    }

    public static class Reduce extends MapReduceBase implements
            Reducer<Text, IntWritable, Void, Group> {
        private SimpleGroupFactory factory;

        public void reduce(Text key, Iterator<IntWritable> values,
                           OutputCollector<Void, Group> output,
                           Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }

            Group group = factory.newGroup()
                    .append("name", key.toString())
                    .append("age", sum);
            output.collect(null, group);
        }

        @Override
        public void configure(JobConf job) {
            factory = new SimpleGroupFactory(GroupWriteSupport.getSchema(job));
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(ParquetMR.class);
        conf.setJobName("wordcount");

        String in = "hdfs://localhost:9000/test/wordcount.txt";
        String out = "hdfs://localhost:9000/test/wd";

        String writeSchema = "message example {\n" +
                "required binary name;\n" +
                "required int32 age;\n" +
                "}";

        conf.setMapOutputKeyClass(Text.class);
        conf.setMapOutputValueClass(IntWritable.class);

        conf.setOutputKeyClass(NullWritable.class);
        conf.setOutputValueClass(Group.class);

        conf.setMapperClass(Map.class);
        conf.setReducerClass(Reduce.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(DeprecatedParquetOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(in));
        DeprecatedParquetOutputFormat.setWriteSupportClass(conf, GroupWriteSupport.class);
        GroupWriteSupport.setSchema(MessageTypeParser.parseMessageType(writeSchema), conf);

        DeprecatedParquetOutputFormat.setOutputPath(conf, new Path(out));

        JobClient.runJob(conf);
    }
}

The generated files:

hadoop dfs -ls /test/wd
Found 2 items
-rw-r--r--   3 work supergroup          0 2017-07-18 17:41 /test/wd/_SUCCESS
-rw-r--r--   3 work supergroup        392 2017-07-18 17:41 /test/wd/part-00000-r-00000.parquet

Copy the generated file into the directory of the Hive table test_parquet:

hadoop dfs -cp /test/wd/part-00000-r-00000.parquet /user/work/warehouse/test_parquet/

Verify that the Hive table can read the Parquet file:

hive> select * from test_parquet limit 10;
OK
action  2
hadoop  2
hello   3
in  2
presto  1
spark   1
world   1
Time taken: 0.056 seconds, Fetched: 7 row(s)

The new-API version:

Reading and writing Parquet with the new MR API is only slightly different from the old one: the schema must be set in the Configuration; apart from that, the differences are small. (In the old-API example, GroupWriteSupport.setSchema did this for us by storing the schema under the same parquet.example.schema key.)

conf.set("parquet.example.schema", writeSchema);

Here is the complete code anyway:

package com.fan.hadoop.parquet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetOutputFormat;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import java.io.IOException;
import java.util.StringTokenizer;

/**
 * Created by fanglegefan.com on 17-7-18.
 */
public class ParquetNewMR {

    public static class WordCountMap extends
            Mapper<LongWritable, Text, Text, IntWritable> {

        private final IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            StringTokenizer token = new StringTokenizer(line);
            while (token.hasMoreTokens()) {
                word.set(token.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class WordCountReduce extends
            Reducer<Text, IntWritable, Void, Group> {
        private SimpleGroupFactory factory;

        public void reduce(Text key, Iterable<IntWritable> values,
                           Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            Group group = factory.newGroup()
                    .append("name", key.toString())
                    .append("age", sum);
            context.write(null, group);
        }

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            super.setup(context);
            factory = new SimpleGroupFactory(GroupWriteSupport.getSchema(context.getConfiguration()));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String writeSchema = "message example {\n" +
                "required binary name;\n" +
                "required int32 age;\n" +
                "}";
        conf.set("parquet.example.schema", writeSchema);

        Job job = new Job(conf);
        job.setJarByClass(ParquetNewMR.class);
        job.setJobName("parquet");

        String in = "hdfs://localhost:9000/test/wordcount.txt";
        String out = "hdfs://localhost:9000/test/wd1";

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputValueClass(Group.class);

        job.setMapperClass(WordCountMap.class);
        job.setReducerClass(WordCountReduce.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(ParquetOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(in));
        ParquetOutputFormat.setOutputPath(job, new Path(out));
        ParquetOutputFormat.setWriteSupportClass(job, GroupWriteSupport.class);

        job.waitForCompletion(true);
    }
}

Check the generated files:

hadoop dfs -ls /user/work/warehouse/test_parquet

Found 4 items
-rw-r--r--   1 work work          0 2017-07-18 18:27 /user/work/warehouse/test_parquet/_SUCCESS
-rw-r--r--   1 work work        129 2017-07-18 18:27 /user/work/warehouse/test_parquet/_common_metadata
-rw-r--r--   1 work work        275 2017-07-18 18:27 /user/work/warehouse/test_parquet/_metadata
-rw-r--r--   1 work work        392 2017-07-18 18:27 /user/work/warehouse/test_parquet/part-r-00000.parquet

Copy the generated file into the directory of the Hive table test_parquet:

hadoop dfs -cp /test/wd/part-00000-r-00000.parquet /user/work/warehouse/test_parquet/

Test with Hive:

hive> select name,age from test_parquet limit 10;
OK
action  2
hadoop  2
hello   3
in  2
presto  1
spark   1
world   1
Time taken: 0.036 seconds, Fetched: 7 row(s)

Reading Parquet files with MapReduce

package com.fan.hadoop.parquet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.parquet.hadoop.api.DelegatingReadSupport;
import org.apache.parquet.hadoop.api.InitContext;
import org.apache.parquet.hadoop.api.ReadSupport;
import org.apache.parquet.hadoop.example.GroupReadSupport;

import java.io.IOException;
import java.util.*;

/**
 * Created by fanglegefan.com on 17-7-18.
 */
public class ParquetNewMRReader {

    public static class WordCountMap1 extends
            Mapper<Void, Group, LongWritable, Text> {

        protected void map(Void key, Group value,
                           Mapper<Void, Group, LongWritable, Text>.Context context)
                throws IOException, InterruptedException {

            String name = value.getString("name", 0);
            int age = value.getInteger("age", 0);

            context.write(new LongWritable(age),
                    new Text(name));
        }
    }

    public static class WordCountReduce1 extends
            Reducer<LongWritable, Text, LongWritable, Text> {

        public void reduce(LongWritable key, Iterable<Text> values,
                           Context context) throws IOException, InterruptedException {
            Iterator<Text> iterator = values.iterator();
            while (iterator.hasNext()) {
                context.write(key, iterator.next());
            }
        }
    }

    public static final class MyReadSupport extends DelegatingReadSupport<Group> {
        public MyReadSupport() {
            super(new GroupReadSupport());
        }

        @Override
        public org.apache.parquet.hadoop.api.ReadSupport.ReadContext init(InitContext context) {
            return super.init(context);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String readSchema = "message example {\n" +
                "required binary name;\n" +
                "required int32 age;\n" +
                "}";
        conf.set(ReadSupport.PARQUET_READ_SCHEMA, readSchema);

        Job job = new Job(conf);
        job.setJarByClass(ParquetNewMRReader.class);
        job.setJobName("parquet");

        String in = "hdfs://localhost:9000/test/wd1";
        String out = "hdfs://localhost:9000/test/wd2";

        job.setMapperClass(WordCountMap1.class);
        job.setReducerClass(WordCountReduce1.class);

        job.setInputFormatClass(ParquetInputFormat.class);
        ParquetInputFormat.setReadSupportClass(job, MyReadSupport.class);
        ParquetInputFormat.addInputPath(job, new Path(in));

        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(out));

        job.waitForCompletion(true);
    }
}

Check the generated files:

hadoop dfs -cat /test/wd2/part-r-00000

1       world
1       spark
1       presto
2       in
2       hadoop
2       action
3       hello