Hadoop supports a number of compression formats, and to save storage space the results of MapReduce jobs are often written out as compressed files; Snappy is one of the most commonly used codecs for this. This article walks through generating a Snappy-compressed result set from a MapReduce job and then loading that result set into a Hive table with LOAD DATA.
As an example, suppose we have an order log file with three columns: consumer, product, and purchase quantity. A MapReduce job aggregates it into a result set whose key is consumer,product and whose value is the total purchase quantity, and that result is then loaded into a Hive table.
1. Data preparation. The MapReduce input file input1.txt has three colon-separated columns: consumer id, product id, and purchase quantity:
[root@salver158 ~]# hadoop fs -text /tmp/input/input1.txt
consumer1:product1:1
consumer2:product2:2
consumer2:product3:3
consumer1:product1:3
consumer1:product3:2
consumer1:product1:1
consumer1:product2:1
consumer2:product2:2
consumer1:product1:3
consumer2:product3:3
consumer1:product1:2
consumer1:product2:1
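For reference, a minimal sketch of how the input file could be uploaded to HDFS (the local file name and working directory here are assumptions, not something shown in the original run):

# assumes input1.txt sits in the current local directory
hadoop fs -mkdir -p /tmp/input
hadoop fs -put input1.txt /tmp/input/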
2. The custom map class:
public static class MyMapper extends Mapper<Object, Text, Text, IntWritable> {
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String[] lines = value.toString().split(":");
        /* purchase quantity */
        int count = Integer.valueOf(lines[2]);
        System.out.println("consumer: " + lines[0]);
        System.out.println("product: " + lines[1]);
        System.out.println("quantity: " + count);
        /* key: consumerId,productId; value: purchase quantity */
        String mapKey = lines[0] + "," + lines[1];
        context.write(new Text(mapKey), new IntWritable(count));
    }
}
3. The custom reduce class. This is the reducer from the standard wordcount example, used here without modification:
// The type parameters, as in the Mapper, are the input key type, input value type,
// output key type and output value type. The input here comes from the map output,
// so the types must match the mapper's output types.
public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        // accumulate the values to get the total purchase quantity for this key
        for (IntWritable value : values) {
            sum += value.get();
        }
        result.set(sum);
        // write the result to the context; the final output form is "key" + "result"
        context.write(key, result);
    }
}
4. The complete driver class is as follows:
package com.hadoop.ljs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;

/**
 * @author: Created By lujisen
 * @company ChinaUnicom Software JiNan
 * @date: 2020-03-10 10:54
 * @version: v1.0
 * @description: com.hadoop.ljs
 */
public class TradeCount {

    // IntWritable corresponds to int, Text corresponds to String.
    // map(keyIn, valueIn, context): the processed record is written to the context and passed on to the reducer.
    public static class MyMapper extends Mapper<Object, Text, Text, IntWritable> {
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] lines = value.toString().split(":");
            /* purchase quantity */
            int count = Integer.valueOf(lines[2]);
            System.out.println("consumer: " + lines[0]);
            System.out.println("product: " + lines[1]);
            System.out.println("quantity: " + count);
            /* key: consumerId,productId; value: purchase quantity */
            String mapKey = lines[0] + "," + lines[1];
            context.write(new Text(mapKey), new IntWritable(count));
        }
    }

    // The type parameters, as in the Mapper, are the input key type, input value type,
    // output key type and output value type. The input comes from the map output,
    // so the types must match the mapper's output types.
    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            // accumulate the values to get the total purchase quantity for this key
            for (IntWritable value : values) {
                sum += value.get();
            }
            result.set(sum);
            // write the result to the context; the final output form is "key" + "result"
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();

        /* enable snappy compression for the map output:
        configuration.set("mapreduce.map.output.compress", "true");
        configuration.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec");
        */
        /* enable snappy compression for the reduce (final) output:
        configuration.set("mapreduce.output.fileoutputformat.compress", "true");
        configuration.set("mapreduce.output.fileoutputformat.compress.type", "RECORD");
        configuration.set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec");
        */
        /* set the reduce-side output field separator:
        configuration.set("mapreduce.output.textoutputformat.separator", "@@@");
        */
        /* or via the older property name:
        configuration.set("mapred.textoutputformat.separator", ",");
        */

        String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: wordcount <in> [<in>...] <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(configuration, "TradeCount");
        job.setJarByClass(TradeCount.class);
        job.setMapperClass(MyMapper.class);
        job.setCombinerClass(MyReducer.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
5. Package the job into a jar with mvn and submit it with hadoop jar:
hadoop jar /root/hadoop273-1.0-SNAPSHOT-jar-with-dependencies.jar \
  -Dmapreduce.map.output.compress=true \
  -Dmapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec \
  -Dmapreduce.output.fileoutputformat.compress=true \
  -Dmapreduce.output.fileoutputformat.compress.type=RECORD \
  -Dmapreduce.output.textoutputformat.separator="," \
  -Dmapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec \
  /tmp/input /tmp/output2
This command passes quite a few parameters. The map-side compression settings can be left out; configuring only the reduce side is enough. You can also set these properties directly in the code (the -D options on the command line take effect because GenericOptionsParser parses them in main). They are listed individually here so you can get familiar with each one, and a Job-API equivalent is sketched after this list:
1). Enable snappy compression for the map output:
configuration.set("mapreduce.map.output.compress", "true");
configuration.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec");
2). Enable snappy compression for the reduce (final) output:
configuration.set("mapreduce.output.fileoutputformat.compress", "true");
configuration.set("mapreduce.output.fileoutputformat.compress.type", "RECORD");
configuration.set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec");
3). Set the reduce-side output field separator (the default is a tab):
/* set the reduce-side output field separator */
configuration.set("mapreduce.output.textoutputformat.separator", ",");
/* or via the older property name */
/*configuration.set("mapred.textoutputformat.separator", ",");*/
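If you prefer not to hard-code the output-compression property names as strings, the same settings can also be applied through the Job / FileOutputFormat API. A minimal sketch (the class name SnappyJobConfig and its helper method are mine, not part of the original code):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SnappyJobConfig {
    /* Applies the snappy settings shown above via the Job API instead of raw configuration.set(...) calls. */
    public static void enableSnappy(Job job) {
        Configuration conf = job.getConfiguration();
        /* map-side (intermediate) compression */
        conf.setBoolean("mapreduce.map.output.compress", true);
        conf.setClass("mapreduce.map.output.compress.codec", SnappyCodec.class, CompressionCodec.class);
        /* reduce-side (final) output compression */
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, SnappyCodec.class);
        /* TextOutputFormat field separator (default is a tab) */
        conf.set("mapreduce.output.textoutputformat.separator", ",");
    }
}

Call enableSnappy(job) in the driver right after Job.getInstance(...), before submitting the job.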
6. The generated results are shown below. Because the command above included the -D compression options, the result set is compressed and the output file has a .snappy extension:
[root@salver158 ~]# hadoop fs -ls /tmp/output2
Found 2 items
-rw-r--r--   3 hbase hdfs          0 2020-03-10 12:32 /tmp/output2/_SUCCESS
-rw-r--r--   3 hbase hdfs         70 2020-03-10 12:32 /tmp/output2/part-r-00000.snappy
[root@salver158 ~]# hadoop fs -text /tmp/output2/part-r-00000.snappy
20/03/10 14:06:11 INFO compress.CodecPool: Got brand-new decompressor [.snappy]
consumer1,product1,10
consumer1,product2,2
consumer1,product3,2
consumer2,product2,4
consumer2,product3,6
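Note that hadoop fs -text can decode the .snappy file here because Snappy support is available in the native Hadoop libraries on this node; without it, the job itself would already fail when writing snappy output. If you are unsure whether your cluster has it, the following command lists the native codec status (output varies by environment):

hadoop checknative -a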
7. Create the Hive table:
hive> create table table8(name string, product string, count int) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
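As an aside, a snappy-compressed text result can also be queried in place through an external table, without moving the file at all. A hypothetical sketch (the table name table8_ext is mine; the schema and the /tmp/output2 path come from the steps above):

hive> CREATE EXTERNAL TABLE table8_ext(name string, product string, count int)
    > ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
    > LOCATION '/tmp/output2';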
8. Load the data into the table:
hive> load data inpath '/tmp/output2/part-r-00000.snappy' into table table8;
Loading data to table lujs.table8
Table lujs.table8 stats: [numFiles=1, numRows=0, totalSize=70, rawDataSize=0]
OK
Time taken: 1.018 seconds
hive> select * from table8;
OK
consumer1 product1 10
consumer1 product2 2
consumer1 product3 2
consumer2 product2 4
consumer2 product3 6
Time taken: 0.295 seconds, Fetched: 5 row(s)
That wraps up loading a Snappy-compressed MapReduce result set into Hive. The example here is little more than a wordcount; your business logic will certainly be more complex, but the approach is the same: just put your own logic into the map and/or reduce side.