问题描述:
一个trade表(trade table):
product1"trade1
product2"trade2
product3"trade3
一个pay表(pay table):
product1"pay1
product2"pay2
product2"pay3
product1"pay4
product3"pay5
product3"pay6
要求在这两个表之间建立连接(join),两表是一对多的关系,
连接结果如下(每行为 trade 与 pay,以制表符分隔):
trade1	pay1
trade1	pay4
trade2	pay2
...
思路:
两个表的第一列(product)相同,且第一个表与第二个表是一对多的关系,因此可以按product把两个表的记录整合到一起。
这里依然采用分组以及组内排序(即二次排序):只要保证"一"方(trade表)的记录最先到达reduce端,就可以在reduce中依次迭代处理"多"方(pay表)的记录。
为了保证第一个表的记录先到达reduce端,可以定义一个组合键,它包含两个值:第一个值为product,第二个值为0或1,分别代表第一个表和第二个表;只要在组内按第二个值升序排列即可。
具体代码:
自定义组合键策略
package whut.onetomany; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; public class TextIntPair implements WritableComparable{ //product1 0/1 private String firstKey;//product1 private int secondKey;//0,1;0表明是trade表,1表明是pay表 //只须要保证trade表在pay表前面就行,则只须要对组顺序排列 public String getFirstKey() { return firstKey; } public void setFirstKey(String firstKey) { this.firstKey = firstKey; } public int getSecondKey() { return secondKey; } public void setSecondKey(int secondKey) { this.secondKey = secondKey; } @Override public void write(DataOutput out) throws IOException { out.writeUTF(firstKey); out.writeInt(secondKey); } @Override public void readFields(DataInput in) throws IOException { // TODO Auto-generated method stub firstKey=in.readUTF(); secondKey=in.readInt(); } @Override public int compareTo(Object o) { // TODO Auto-generated method stub TextIntPair tip=(TextIntPair)o; return this.getFirstKey().compareTo(tip.getFirstKey()); } }
分组策略
package whut.onetomany; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; public class TextComparator extends WritableComparator{ protected TextComparator() { super(TextIntPair.class,true);//注册比较器 } @Override public int compare(WritableComparable a, WritableComparable b) { // TODO Auto-generated method stub TextIntPair tip1=(TextIntPair)a; TextIntPair tip2=(TextIntPair)b; return tip1.getFirstKey().compareTo(tip2.getFirstKey()); } }
组内排序策略:目的是保证同一product下,第一个表(trade)的记录比第二个表(pay)的记录先到达reduce端
package whut.onetomany; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; //分组内部进行排序,按照第二个字段进行排序 public class TextIntComparator extends WritableComparator { public TextIntComparator() { super(TextIntPair.class,true); } //这里能够进行排序的方式管理 //必须保证是同一个分组的 //a与b进行比较 //若是a在前b在后,则会产生升序 //若是a在后b在前,则会产生降序 @Override public int compare(WritableComparable a, WritableComparable b) { // TODO Auto-generated method stub TextIntPair ti1=(TextIntPair)a; TextIntPair ti2=(TextIntPair)b; //首先要保证是同一个组内,同一个组的标识就是第一个字段相同 if(!ti1.getFirstKey().equals(ti2.getFirstKey())) return ti1.getFirstKey().compareTo(ti2.getFirstKey()); else return ti1.getSecondKey()-ti2.getSecondKey();//0,-1,1 } }
分区策略:
package whut.onetomany; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; public class PartitionByText extends Partitioner<TextIntPair, Text> { @Override public int getPartition(TextIntPair key, Text value, int numPartitions) { // TODO Auto-generated method stub return (key.getFirstKey().hashCode()&Integer.MAX_VALUE)%numPartitions; } }
MapReduce
package whut.onetomany; import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Mapper.Context; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class JoinMain extends Configured implements Tool { public static class JoinMapper extends Mapper<LongWritable, Text, TextIntPair, Text> { private TextIntPair tp=new TextIntPair(); private Text val=new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub //获取要处理的文件的名称 FileSplit file=(FileSplit)context.getInputSplit(); String fileName=file.getPath().toString(); //获取输入行分隔 String line=value.toString(); String[] lineKeyValue=line.split("\""); String lineKey=lineKeyValue[0]; String lineValue=lineKeyValue[1]; tp.setFirstKey(lineKey); //判断是不是trade文件 if(fileName.indexOf("trade")>=0) { tp.setSecondKey(0); val.set(lineValue); } //判断是不是pay文件 else if(fileName.indexOf("pay")>=0) { tp.setSecondKey(1); val.set(lineValue); } context.write(tp, val); } } public static class JoinReducer extends Reducer<TextIntPair, Text, Text, Text> { @Override protected void reduce(TextIntPair key, Iterable<Text> values, Context context)throws IOException, InterruptedException { Iterator<Text> valList=values.iterator(); //注意这里必定要写成string不可变,写成Text有问题 //Text 
trade=valList.next(); String tradeName=valList.next().toString(); while(valList.hasNext()) { Text pay=valList.next(); context.write(new Text(tradeName), pay); } } } @Override public int run(String[] args) throws Exception { Configuration conf=getConf(); Job job=new Job(conf,"JoinJob"); job.setJarByClass(JoinMain.class); //ToolRunner已经利用GenericOptionsParser解析了命令行中的参数 //而且将其存放在数组中,传递给该run()方法了 FileInputFormat.addInputPath(job, new Path(args[0])); FileInputFormat.addInputPath(job, new Path(args[1])); //输入文件必须以,隔开 //FileInputFormat.addInputPaths(job, args[0]); FileOutputFormat.setOutputPath(job, new Path(args[2])); job.setMapperClass(JoinMapper.class); job.setReducerClass(JoinReducer.class); //设置分区方法 job.setPartitionerClass(PartitionByText.class); //设置分组排序 job.setGroupingComparatorClass(TextComparator.class); job.setSortComparatorClass(TextIntComparator.class); job.setMapOutputKeyClass(TextIntPair.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.waitForCompletion(true); int exitCode=job.isSuccessful()?0:1; return exitCode; } public static void main(String[] args)throws Exception { // TODO Auto-generated method stub int code=ToolRunner.run(new JoinMain(), args); System.exit(code); } }
注意:
一般有些资料中没有定义组内排序策略,但是经过多次测试,发现无法保证第一个表的记录在第二个表的记录之前到达reduce端,因此这里自定义了组内排序策略。原因是:不指定排序比较器时,排序只依赖组合键自身的compareTo;如果compareTo只比较product而不比较第二个字段,同一组内记录的先后顺序就无法保证。所用Hadoop版本为1.1.2。