Example code (Java)
ScoreWritable.java — the custom value type that carries the five subject scores:

```java
package com.vip09;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class ScoreWritable implements WritableComparable<ScoreWritable> {

    // In a custom data type it is recommended to use Java primitive fields
    private float chinese;
    private float math;
    private float english;
    private float physics;
    private float chemistry;

    // A custom data type must have a no-argument constructor;
    // Hadoop instantiates it reflectively before calling readFields()
    public ScoreWritable() {}

    public ScoreWritable(float chinese, float math, float english, float physics, float chemistry) {
        this.chinese = chinese;
        this.math = math;
        this.english = english;
        this.physics = physics;
        this.chemistry = chemistry;
    }

    public void set(float chinese, float math, float english, float physics, float chemistry) {
        this.chinese = chinese;
        this.math = math;
        this.english = english;
        this.physics = physics;
        this.chemistry = chemistry;
    }

    public float getChinese() { return chinese; }

    public float getMath() { return math; }

    public float getEnglish() { return english; }

    public float getPhysics() { return physics; }

    public float getChemistry() { return chemistry; }

    // Called when the object is written out; serializes the fields
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeFloat(chinese);
        out.writeFloat(math);
        out.writeFloat(english);
        out.writeFloat(physics);
        out.writeFloat(chemistry);
    }

    // Called when the object is read back; deserializes the fields
    // in exactly the order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        chinese = in.readFloat();
        math = in.readFloat();
        english = in.readFloat();
        physics = in.readFloat();
        chemistry = in.readFloat();
    }

    // This type is only used as a map output value in this job,
    // so the ordering is never exercised
    @Override
    public int compareTo(ScoreWritable o) {
        return 0;
    }
}
```
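To convince yourself that write() and readFields() mirror each other, a round trip through plain java.io streams is enough. This is a minimal sketch added for illustration, not part of the original job; the class name ScoreWritableRoundTrip and the score values are made up.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class ScoreWritableRoundTrip {
    public static void main(String[] args) throws Exception {
        // Hypothetical scores
        ScoreWritable original = new ScoreWritable(90f, 85f, 78f, 92f, 88f);

        // Serialize the same way the framework would: via write(DataOutput)
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Deserialize into a fresh instance via readFields(DataInput)
        ScoreWritable copy = new ScoreWritable();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        // The fields come back in the order they were written
        System.out.println(copy.getChinese() + " " + copy.getMath() + " " + copy.getEnglish()
                + " " + copy.getPhysics() + " " + copy.getChemistry());
    }
}
```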
ScoreCount.java — the driver, with the Mapper and Reducer as inner classes:

```java
package com.vip09;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ScoreCount extends Configured implements Tool {

    // Mapper: the custom input format already delivers Text keys and
    // ScoreWritable values, so the map just passes them through
    public static class ScoreMapper extends Mapper<Text, ScoreWritable, Text, ScoreWritable> {
        @Override
        protected void map(Text key, ScoreWritable value, Context context)
                throws IOException, InterruptedException {
            context.write(key, value);
        }
    }

    // Reducer: compute the total and average score per student
    public static class ScoreReducer extends Reducer<Text, ScoreWritable, Text, Text> {
        private Text text = new Text();

        @Override
        protected void reduce(Text key, Iterable<ScoreWritable> value, Context context)
                throws IOException, InterruptedException {
            float totalScore = 0.0f;
            float avgScore = 0.0f;
            // Each student normally has exactly one record, so the loop just reads it
            for (ScoreWritable sw : value) {
                totalScore = sw.getChinese() + sw.getEnglish() + sw.getMath()
                        + sw.getPhysics() + sw.getChemistry();
                avgScore = totalScore / 5;
            }
            text.set(totalScore + "\t" + avgScore);
            context.write(key, text);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Delete the output directory if it already exists
        Path mypath = new Path(args[1]);
        FileSystem hdfs = mypath.getFileSystem(conf);
        if (hdfs.isDirectory(mypath)) {
            hdfs.delete(mypath, true);
        }

        Job job = Job.getInstance(conf, "scorecount");
        job.setJarByClass(ScoreCount.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(ScoreMapper.class);
        job.setReducerClass(ScoreReducer.class);

        // Custom map output types must be set explicitly
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(ScoreWritable.class);

        // Set the custom input format
        job.setInputFormatClass(ScoreInputFormat.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        String[] args0 = {"hdfs://192.168.153.111:9000/input5",
                "hdfs://192.168.153.111:9000/output15"};
        int res = ToolRunner.run(new ScoreCount(), args0);
        System.exit(res);
    }
}
```
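The main method above hardcodes the two HDFS paths. When the job is packaged and launched with hadoop jar, a common variant (an assumption here, not what the original does) is to take the paths from the command line and let ToolRunner hand a Configuration to the tool:

```java
public static void main(String[] args) throws Exception {
    // args[0] = input path, args[1] = output path, passed on the command line;
    // ToolRunner also parses generic options such as -D key=value
    int res = ToolRunner.run(new Configuration(), new ScoreCount(), args);
    System.exit(res);
}
```

Inside run(), getConf() would then return that injected configuration instead of creating a fresh one.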
ScoreInputFormat.java — the custom input format:

```java
package com.vip09;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class ScoreInputFormat extends FileInputFormat<Text, ScoreWritable> {

    /*
     * Note: every input format needs a matching RecordReader.
     * Overriding createRecordReader() really just means deciding which
     * reader object to return; here it is the custom ScoreRecordReader,
     * which extends RecordReader and does the actual reading.
     */
    @Override
    public RecordReader<Text, ScoreWritable> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        return new ScoreRecordReader();
    }
}
```
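Worth noting (an addition, not in the original code): the ScoreRecordReader shown next opens the file from the beginning and ignores the split's offset and length, so if a large file were split across several mappers, each mapper would read the same lines again. A common safeguard is to make the format non-splittable so every file goes to exactly one mapper:

```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;

// Inside ScoreInputFormat: one mapper per file, so the reader that
// always starts at the top of the file never sees a partial split
@Override
protected boolean isSplitable(JobContext context, Path filename) {
    return false;
}
```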
ScoreRecordReader.java — the record reader that parses each line into a Text key and a ScoreWritable value:

```java
package com.vip09;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

public class ScoreRecordReader extends RecordReader<Text, ScoreWritable> {

    public LineReader in;           // line reader
    public Text lineKey;            // custom key
    public ScoreWritable linevalue; // custom value
    public Text line;               // current line of input

    // Initialization; runs once per split
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        FileSplit fsplit = (FileSplit) split;
        Configuration conf = context.getConfiguration();
        Path file = fsplit.getPath();
        FileSystem fs = file.getFileSystem(conf);
        FSDataInputStream filein = fs.open(file);
        in = new LineReader(filein, conf);
        line = new Text();
        lineKey = new Text();
        linevalue = new ScoreWritable();
    }

    // Called once per record. This is the method that carries the custom
    // parsing logic; the remaining methods are largely boilerplate.
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        int linesize = in.readLine(line);
        if (linesize == 0) {
            return false;
        }
        String[] pieces = line.toString().split("\\s+");
        if (pieces.length != 7) {
            throw new IOException("invalid record");
        }
        // Parse the five subject scores as float
        float a = 0, b = 0, c = 0, d = 0, e = 0;
        try {
            a = Float.parseFloat(pieces[2].trim());
            b = Float.parseFloat(pieces[3].trim());
            c = Float.parseFloat(pieces[4].trim());
            d = Float.parseFloat(pieces[5].trim());
            e = Float.parseFloat(pieces[6].trim());
        } catch (NumberFormatException nfe) {
            nfe.printStackTrace();
        }
        lineKey.set(pieces[0] + "\t" + pieces[1]); // build the custom key from the first two columns
        linevalue.set(a, b, c, d, e);              // wrap the five scores in the custom value
        return true;
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return lineKey;
    }

    @Override
    public ScoreWritable getCurrentValue() throws IOException, InterruptedException {
        return linevalue;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        // progress reporting is not implemented
        return 0;
    }

    @Override
    public void close() throws IOException {
        if (in != null) {
            in.close();
        }
    }
}
```
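From the parsing logic above, each input line must contain seven whitespace-separated columns: two identifying columns (combined into the key) followed by the five scores. The snippet below only illustrates that layout with made-up values; the column meanings are an assumption based on the field order in ScoreWritable.

```java
public class LineFormatCheck {
    public static void main(String[] args) {
        // Hypothetical record: name, class, then Chinese/math/English/physics/chemistry
        String line = "zhangsan class01 90 85 78 92 88";
        String[] pieces = line.split("\\s+");

        // Mirrors the check in nextKeyValue(): exactly seven columns are required
        System.out.println(pieces.length == 7);            // true
        System.out.println(pieces[0] + "\t" + pieces[1]);  // the composite key
    }
}
```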
MapReduceCaseEmail.java — a second example: count email addresses and, with MultipleOutputs, write each mail provider to its own output file:

```java
package com.vip09;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MapReduceCaseEmail extends Configured implements Tool {

    public static class EmailMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line is an email address; emit it with a count of 1
            context.write(value, one);
        }
    }

    public static class EmailReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        // MultipleOutputs lets the reducer write to several files or directories
        private MultipleOutputs<Text, IntWritable> mout;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            mout = new MultipleOutputs<Text, IntWritable>(context);
        }

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int begin = key.toString().indexOf("@");
            int end = key.toString().indexOf(".");
            if (begin >= end) {
                return;
            }
            // Extract the mail provider, e.g. qq, 163
            String name = key.toString().substring(begin + 1, end);

            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            result.set(sum);

            // Output files are named <baseOutputPath>-r-nnnnn
            mout.write(key, result, name);
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            mout.close();
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Delete the output directory if it already exists
        Path mypath = new Path(args[1]);
        FileSystem hdfs = mypath.getFileSystem(conf);
        if (hdfs.isDirectory(mypath)) {
            hdfs.delete(mypath, true);
        }

        Job job = Job.getInstance(conf, "emailcount");
        job.setJarByClass(MapReduceCaseEmail.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(EmailMapper.class);
        job.setReducerClass(EmailReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        String[] args0 = {"hdfs://192.168.153.111:9000/input6",
                "hdfs://192.168.153.111:9000/output16"};
        int res = ToolRunner.run(new MapReduceCaseEmail(), args0);
        System.exit(res);
    }
}
```
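Because the reducer writes only through mout.write(key, result, name), the records land in files such as qq-r-00000 or 163-r-00000 under the output directory, while the default part-r-nnnnn files are created empty. If those empty files are unwanted, a common tweak (an assumption here, not part of the original driver) is to register the output format lazily in run():

```java
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

// In run(), after setting the reducer: create output files only when
// something is actually written to them, so no empty part-r-nnnnn files appear
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
```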