Hive实战:加载mapreduce生成的snappy压缩文件到Hive表

咱们知道hadoop支持不少压缩格式,有时候为了节省存储空间咱们常常讲mapreduce任务生成的结果数据保存成压缩文件,snappy 压缩是比较经常使用的存储加压缩格式,这里讲解如何经过mapreduce任务生成snappy压缩结果集,将snappy结果数据集,经过load方式加载到Hive数据表中。
java


    举例说明一下,咱们这一份订单流水文件,里面有三列:消费者,商品,购买数量,咱们经过mapreduce任务生成一份统计结果,而后load到hive数据表中,统计结果格式:consumer_product,购买数量
apache


    1.数据准备,mapreduce输入数据input1.txt,三列消费者id,产品id,购买数量:
app

[root@salver158 ~]# hadoop fs -text /tmp/input/input1.txtconsumer1:product1:1consumer2:product2:2consumer2:product3:3consumer1:product1:3consumer1:product3:2consumer1:product1:1consumer1:product2:1consumer2:product2:2consumer1:product1:3consumer2:product3:3consumer1:product1:2consumer1:product2:1

2.自定义map类:ide

protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
           String[] lines = value.toString().split(":");
           /*购买数量id*/            int  count=Integer.valueOf(lines[2]);                        System.out.println("消费者: "+lines[0]);            System.out.println("商品: "+lines[1]);            System.out.println("购买数量: "+count);                        String mapKey=lines[0]+","+lines[1];            //拼接key:消费者id_商品id   value为 购买数量:count,            context.write(new Text(mapKey), new IntWritable(count));        }    }


3.自定义reduce类,这里直接用的wordcount里面的reduce没作改动:
函数

 //参数同Map同样,依次表示是输入键类型,输入值类型,输出键类型,输出值类型。    // 这里的输入是来源于map,因此类型要与map的输出类型对应 。    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {        private IntWritable result = new IntWritable();
       @Override        protected void reduce(Text key, Iterable<IntWritable> values, Context context)                throws IOException, InterruptedException {            int sum = 0;
           //for循环遍历,将获得的values值累加,统计购买次数            for (IntWritable value : values) {                sum += value.get();            }            result.set(sum);            context.write(key, result);//将结果保存到context中,最终输出形式为"key" + "result"        }    }

4.这里整个主函数以下:oop

package com.hadoop.ljs;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.GenericOptionsParser;import java.io.IOException;/** * @author: Created By lujisen * @company ChinaUnicom Software JiNan * @date: 2020-03-10 10:54 * @version: v1.0 * @description: com.hadoop.ljs */public class TradeCount {    public static class MyMapper extends Mapper<Object, Text, Text, IntWritable> {        //这里的IntWritable至关于Int类型        //Text至关于String类型        // map参数<keyIn key,valueIn value,Context context>,将处理后的数据写入context并传给reduce        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] lines = value.toString().split(":");            /*购买数量id*/            int  count=Integer.valueOf(lines[2]);
           System.out.println("消费者: "+lines[0]);            System.out.println("商品: "+lines[1]);            System.out.println("购买数量: "+count);
           String mapKey=lines[0]+","+lines[1];            //拼接key:消费者id_商品id   value为 购买数量:count,            context.write(new Text(mapKey), new IntWritable(count));        }    }    //参数同Map同样,依次表示是输入键类型,输入值类型,输出键类型,输出值类型。    // 这里的输入是来源于map,因此类型要与map的输出类型对应 。    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {        private IntWritable result = new IntWritable();        @Override        protected void reduce(Text key, Iterable<IntWritable> values, Context context)                throws IOException, InterruptedException {            int sum = 0;
           //for循环遍历,将获得的values值累加,统计购买次数            for (IntWritable value : values) {                sum += value.get();            }            result.set(sum);            context.write(key, result);//将结果保存到context中,最终输出形式为"key" + "result"        }    }    public static void main(String[] args) throws IOExceptionClassNotFoundExceptionInterruptedException {        Configuration configuration = new Configuration();    /*    *//*设置map端使用snappy压缩*//*        configuration.set("mapreduce.map.output.compress","true");        configuration.set("mapreduce.map.output.compress.codec","org.apache.hadoop.io.compress.SnappyCodec");        *//*设置reduce端使用snappy压缩*//*        configuration.set("mapreduce.output.fileoutputformat.compress","true");        configuration.set("mapreduce.output.fileoutputformat.compress.type","RECORD");        configuration.set("mapreduce.output.fileoutputformat.compress.codec","org.apache.hadoop.io.compress.SnappyCodec");*/      /*  *//*设置Reduce端分隔符*//*        configuration.set("mapreduce.output.textoutputformat.separator","@@@");*/        /*或者经过下面这个属性设置*/        /*configuration.set("mapred.textoutputformat.separator",",");*/                        String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();        if (otherArgs.length < 2) {            System.err.println("Usage: wordcount <in> [<in>...] <out>");            System.exit(2);        }        Job job = Job.getInstance(configuration, "TradeCount");        job.setJarByClass(TradeCount.class);        job.setMapperClass(MyMapper.class);        job.setCombinerClass(MyReducer.class);        job.setReducerClass(MyReducer.class);        job.setOutputKeyClass(Text.class);        job.setOutputValueClass(IntWritable.class);        for (int i = 0; i < otherArgs.length - 1; ++i) {            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));        }        FileOutputFormat.setOutputPath(job,                new Path(otherArgs[otherArgs.length - 1]));        System.exit(job.waitForCompletion(true) ? 0 : 1);    }}

6.mvn打成jar包,经过hadoop jar提交,执行以下命令:
spa

hadoop jar /root/hadoop273-1.0-SNAPSHOT-jar-with-dependencies.jar     \-Dmapreduce.map.output.compress=true   \  -Dmapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec  \-Dmapreduce.output.fileoutputformat.compress=true  \-Dmapreduce.output.fileoutputformat.compress.type=RECORD \-Dmapreduce.output.textoutputformat.separator=","   \-Dmapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec  \/tmp/input   /tmp/output2

    我输入的参数比较多,map端的压缩能够不设置,只设置reduce便可,你能够直接在代码中进行设置,这里只是让你们熟悉下各个参数:
code

    1).map端设置snappy压缩:
orm

configuration.set("mapreduce.map.output.compress","true");configuration.set("mapreduce.map.output.compress.codec","org.apache.hadoop.io.compress.SnappyCodec");

    2).reduce端设置snappy压缩:ip

configuration.set("mapreduce.output.fileoutputformat.compress","true");configuration.set("mapreduce.output.fileoutputformat.compress.type","RECORD");configuration.set("mapreduce.output.fileoutputformat.compress.codec","org.apache.hadoop.io.compress.SnappyCodec");

    3).指定reduce端分隔符,默认为制表符:

 /*设置Reduce端分隔符*/configuration.set("mapreduce.output.textoutputformat.separator",",");  /*或者经过下面这个属性设置*//*configuration.set("mapred.textoutputformat.separator",",");*/


7.生成结果以下,因为我上面的命令,加了-D后面的几个参数,对结果集作了压缩,因此生成的结果集是.snappy文件,以下:

[root@salver158 ~]# hadoop fs -ls /tmp/output2Found 2 items-rw-r--r--   3 hbase hdfs          0 2020-03-10 12:32 /tmp/output2/_SUCCESS-rw-r--r--   3 hbase hdfs         70 2020-03-10 12:32 /tmp/output2/part-r-00000.snappy[root@salver158 ~]# hadoop fs -text /tmp/output2/part-r-00000.snappy20/03/10 14:06:11 INFO compress.CodecPool: Got brand-new decompressor [.snappy]consumer1,product1,10consumer1,product2,2consumer1,product3,2consumer2,product2,4consumer2,product3,6

8.新建hive表,执行命令:

hive> create table table8(name string,product string,count int )ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' ;

9.加载数据到数据表中,执行命令:

hive> load data inpath '/tmp/output2/part-r-00000.snappy'  into table table8;Loading data to table lujs.table8Table lujs.table8 stats: [numFiles=1, numRows=0, totalSize=70, rawDataSize=0]OKTime taken: 1.018 seconds
hive> select * from table8;OKconsumer1  product1  10consumer1  product2  2consumer1  product3  2consumer2  product2  4consumer2  product3  6Time taken: 0.295 seconds, Fetched: 5 row(s)

 

    至此,Hive处理mapreduce任务生成的snappy结果集讲解完毕,我这里只是简单的作了一个wordcount,你的业务逻辑确定比这个要复杂不少,只须要在map或者reduce端自定义本身的业务逻辑便可,思路都是一致的。

相关文章
相关标签/搜索