package com.demo.admin;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Test extends Configured implements Tool {

    // Mapper: parses each $-delimited record and emits (name, TestWritable(phone, shiqu)).
    public static class TestMap extends Mapper<LongWritable, Text, Text, TestWritable> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split the record on "$" (escaped because split() takes a regex).
            final String[] splited = value.toString().split("\\$");
            // Field 0 is the name, used as the output key.
            final Text k2 = new Text(splited[0]);
            // Field 4 is the phone number, field 8 is the district (shiqu).
            final TestWritable v2 = new TestWritable(splited[4], splited[8]);
            context.write(k2, v2);
        }
    }

    // Reducer: passes each (phone, shiqu) pair through unchanged, keyed by name.
    public static class TestReduce extends Reducer<Text, TestWritable, Text, TestWritable> {
        @Override
        public void reduce(Text k2, Iterable<TestWritable> v2s, Context context)
                throws IOException, InterruptedException {
            for (TestWritable testWritable : v2s) {
                // Hadoop reuses the object handed out by the iterator,
                // so copy the fields into a fresh instance before writing.
                TestWritable v3 = new TestWritable(testWritable.phone, testWritable.shiqu);
                context.write(k2, v3);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Test(), args));
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] argArray = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (argArray.length != 2) {
            System.err.println("Usage: Test <input path> <output path>");
            System.exit(1);
        }

        Job job = Job.getInstance(conf, "Test");

        // Delete the output directory if it already exists, so the job can be rerun.
        FileSystem fs = FileSystem.get(new URI(argArray[1]), conf);
        Path outPath = new Path(argArray[1]);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }

        job.setJarByClass(Test.class);
        job.setMapperClass(TestMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TestWritable.class);
        job.setReducerClass(TestReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(TestWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path(argArray[0]));
        FileOutputFormat.setOutputPath(job, outPath);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    static class TestWritable implements Writable {
        String phone;
        String shiqu;

        public TestWritable(String phone, String shiqu) {
            this.phone = phone;
            this.shiqu = shiqu;
        }

        // A no-arg constructor is required for any custom Writable: Hadoop
        // instantiates it via reflection and then populates it with readFields().
        public TestWritable() {}

        @Override
        public void readFields(DataInput in) throws IOException {
            this.phone = in.readUTF();
            this.shiqu = in.readUTF();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(phone);
            out.writeUTF(shiqu);
        }

        @Override
        public String toString() {
            return phone + "\t" + shiqu;
        }
    }
}

Sample input record: 张三$25$男$未婚$15997444444$409930360$中国$湖北$广水
Expected output: 张三	15997444444	广水
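The no-arg constructor is worth a quick test on its own: Hadoop deserializes a custom Writable by calling that constructor reflectively and then readFields(), so write() and readFields() must be exactly symmetric. Below is a minimal standalone sketch of such a check (the class name TestWritableRoundTrip is hypothetical; it assumes the Test class above is compiled into the same package):

package com.demo.admin;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class TestWritableRoundTrip {
    public static void main(String[] args) throws IOException {
        Test.TestWritable original = new Test.TestWritable("15997444444", "广水");

        // Serialize with write(), as the map output path would.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Deserialize the way Hadoop does: no-arg constructor, then readFields().
        Test.TestWritable copy = new Test.TestWritable();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(copy); // expect: 15997444444	广水
    }
}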
Shell commands:

/usr/local/hadoop/bin/hadoop fs -put /home/XX/test.txt /test_log/
/usr/local/hadoop/bin/hadoop jar /home/XX/test.jar /test_log/test.txt /test_cleaned/ 1>/dev/null
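Note that 1>/dev/null discards only stdout; with Hadoop's default log4j configuration the job-progress output goes to stderr and still shows. To inspect the result, read the reducer's part file back from HDFS (the part-r-00000 name assumes the default single reducer):

/usr/local/hadoop/bin/hadoop fs -cat /test_cleaned/part-r-00000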