问题描述:
java
输入文件格式以下:apache
name1 2app
name3 4ide
name1 6函数
name1 1oop
name3 3this
name1 0spa
要求输出的文件格式以下:orm
name1 0,1,2,6对象
name3 3,4
要求是按照第一列分组,name1与name3也是按照顺序排列的,组内升序排序。
思路:
常规的输出,没法排序key所对应的多个值的顺序。为了排序组内中的值,须要将key与value放在同一个组。Job中有两个方法setGroupingComparatorClass和setSortComparatorClass,能够利用这两个方法来实现组内排序。可是这些排序都是基于key的,则就要将key和value定义成组合键。
可是必需要保证第一列相同的所有都放在同一个分区中,则就须要自定义分区,分区的时候只考虑第一列的值。因为partitioner仅仅能保证每个reducer接受同一个name的全部记录,可是reducer仍然是经过键进行分组的分区,也就说该分区中仍是按照键来分红不一样的组,还须要分组只参考name值
先按照name分组,再在name中内部进行排序。
解决方法:
运用自定义组合键的策略,将name和1定义为一个组合键。在分区的时候只参考name的值,即继承partitioner。
因为要按照name分组,则就须要定义分组策略,而后设置setGroupingComparatorClass。
setGroupingComparatorClass主要定义哪些key能够放置在一组,分组的时候会对组合键进行比较,因为这里只须要考虑组合键中的一个值,则定义实现一个WritableComparator,设置比较策略。
对于组内的排序,能够利用setSortComparatorClass来实现,
这个方法主要用于定义key如何进行排序在它们传递给reducer以前,
这里就能够来进行组内排序。
具体代码:
Hadoop版本号:hadoop1.1.2
自定义组合键
package whut; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; //自定义组合键策略 //java基本类型数据 public class TextInt implements WritableComparable{ //直接利用java的基本数据类型 private String firstKey; private int secondKey; //必需要有一个默认的构造函数 public String getFirstKey() { return firstKey; } public void setFirstKey(String firstKey) { this.firstKey = firstKey; } public int getSecondKey() { return secondKey; } public void setSecondKey(int secondKey) { this.secondKey = secondKey; } @Override public void write(DataOutput out) throws IOException { // TODO Auto-generated method stub out.writeUTF(firstKey); out.writeInt(secondKey); } @Override public void readFields(DataInput in) throws IOException { // TODO Auto-generated method stub firstKey=in.readUTF(); secondKey=in.readInt(); } //map的键的比较就是根据这个方法来进行的 @Override public int compareTo(Object o) { // TODO Auto-generated method stub TextInt ti=(TextInt)o; //利用这个来控制升序或降序 //this本对象写在前面表明是升序 //this本对象写在后面表明是降序 return this.getFirstKey().compareTo(ti.getFirstKey()); } }
分组策略
package whut; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; //主要就是对于分组进行排序,分组只按照组建键中的一个值进行分组 public class TextComparator extends WritableComparator { //必需要调用父类的构造器 protected TextComparator() { super(TextInt.class,true);//注册comparator } @Override public int compare(WritableComparable a, WritableComparable b) { // TODO Auto-generated method stub TextInt ti1=(TextInt)a; TextInt ti2=(TextInt)b; return ti1.getFirstKey().compareTo(ti2.getFirstKey()); } }
组内排序策略
package whut; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; //分组内部进行排序,按照第二个字段进行排序 public class TextIntComparator extends WritableComparator { public TextIntComparator() { super(TextInt.class,true); } //这里能够进行排序的方式管理 //必须保证是同一个分组的 //a与b进行比较 //若是a在前b在后,则会产生升序 //若是a在后b在前,则会产生降序 @Override public int compare(WritableComparable a, WritableComparable b) { // TODO Auto-generated method stub TextInt ti1=(TextInt)a; TextInt ti2=(TextInt)b; //首先要保证是同一个组内,同一个组的标识就是第一个字段相同 if(!ti1.getFirstKey().equals(ti2.getFirstKey())) return ti1.getFirstKey().compareTo(ti2.getFirstKey()); else return ti2.getSecondKey()-ti1.getSecondKey();//0,-1,1 } }
分区策略
package whut; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.Partitioner; //参数为map的输出类型 public class KeyPartitioner extends Partitioner<TextInt, IntWritable> { @Override public int getPartition(TextInt key, IntWritable value, int numPartitions) { // TODO Auto-generated method stub return (key.getFirstKey().hashCode()&Integer.MAX_VALUE)%numPartitions; } }
MapReduce策略
package whut; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.Mapper.Context; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; //须要对数据进行分组以及组内排序的时候 public class SortMain extends Configured implements Tool{ //这里设置输入文格式为KeyValueTextInputFormat //name1 5 //默认输入格式都是Text,Text public static class GroupMapper extends Mapper<Text, Text, TextInt, IntWritable> { public IntWritable second=new IntWritable(); public TextInt tx=new TextInt(); @Override protected void map(Text key, Text value, Context context) throws IOException, InterruptedException { String lineKey=key.toString(); String lineValue=value.toString(); int lineInt=Integer.parseInt(lineValue); tx.setFirstKey(lineKey); tx.setSecondKey(lineInt); second.set(lineInt); context.write(tx, second); } } //设置reduce public static class GroupReduce extends Reducer<TextInt, IntWritable, Text, Text> { @Override protected void reduce(TextInt key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { StringBuffer sb=new StringBuffer(); for(IntWritable val:values) { sb.append(val+","); } if(sb.length()>0) { sb.deleteCharAt(sb.length()-1); } context.write(new Text(key.getFirstKey()), new Text(sb.toString())); } } @Override public int run(String[] args) throws Exception { // TODO Auto-generated method stub Configuration conf=getConf(); Job job=new Job(conf,"SecondarySort"); job.setJarByClass(SortMain.class); // 设置输入文件的路径,已经上传在HDFS FileInputFormat.addInputPath(job, new Path(args[0])); // 设置输出文件的路径,输出文件也存在HDFS中,可是输出目录不能已经存在 FileOutputFormat.setOutputPath(job, new Path(args[1])); job.setMapperClass(GroupMapper.class); job.setReducerClass(GroupReduce.class); //设置分区方法 job.setPartitionerClass(KeyPartitioner.class); //下面这两个都是针对map端的 //设置分组的策略,哪些key能够放置到一组中 job.setGroupingComparatorClass(TextComparator.class); //设置key如何进行排序在传递给reducer以前. //这里就能够设置对组内如何排序的方法 /*************关键点**********/ job.setSortComparatorClass(TextIntComparator.class); //设置输入文件格式 job.setInputFormatClass(KeyValueTextInputFormat.class); //使用默认的输出格式即TextInputFormat //设置map的输出key和value类型 job.setMapOutputKeyClass(TextInt.class); job.setMapOutputValueClass(IntWritable.class); //设置reduce的输出key和value类型 //job.setOutputFormatClass(TextOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.waitForCompletion(true); int exitCode=job.isSuccessful()?0:1; return exitCode; } public static void main(String[] args) throws Exception { int exitCode=ToolRunner.run(new SortMain(), args); System.exit(exitCode); } }
注意事项
1,设置分组排序按照升序仍是降序是在自定义WritableComparable中的compareTo()方法实现的,具体升序或者降序的设置在代码中已经注释说明
2,设置组内值进行升序仍是降序的排序是在组内排序策略中的compare()方法注释说明的。
3,这里同时最重要的一点是,将第二列即放在组合键中,又做为value,这样对于组合键排序也就至关于对于value进行排序了。
4,在自定义组合键的时候,对于组合键中的数据的基本类型能够采用Java的基本类型也能够采用Hadoop的基本数据类型,对于Hadoop的基本数据类型必定要记得初始化new一个基本数据类型对象。对于组合键类,必需要有默认的构造方法。