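Note: the code below was written against Hadoop 2.7.x and uses the new org.apache.hadoop.mapreduce API. The one exception is MultipleTextOutputFormat, which is part of the old org.apache.hadoop.mapred API.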
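Example: compute each student's average score and output the student name and the average score. Requirement: write each record to a different file depending on its score range (0-59, 60-69, 70-79, 80-89, 90-100), using the file name prefixes student_score_059, student_score_6070, student_score_7080, student_score_8090 and student_score_90100.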
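The range rule can be pinned down with a small, framework-free helper before wiring it into Hadoop. This is a sketch, not code from the original: the class ScoreBuckets and its methods are hypothetical, and it assumes half-open ranges ([0,60), [60,70), ..., [90,100]) matching the `<` comparisons used in the Hadoop code, plus a truncating integer average.

```java
import java.util.List;

// Hypothetical helper illustrating the score-range rule; not part of any Hadoop API.
public class ScoreBuckets {
    private static final String PREFIX = "student_score_";

    // Maps an average score to its file-name suffix using half-open ranges:
    // [0,60) -> 059, [60,70) -> 6070, [70,80) -> 7080, [80,90) -> 8090, [90,100] -> 90100.
    public static String suffixFor(int score) {
        if (score < 60) return "059";
        if (score < 70) return "6070";
        if (score < 80) return "7080";
        if (score < 90) return "8090";
        return "90100";
    }

    // Truncating integer average; 0 for an empty list.
    public static int average(List<Integer> scores) {
        if (scores.isEmpty()) return 0;
        int total = 0;
        for (int s : scores) total += s;
        return total / scores.size();
    }

    // Full file-name prefix for one student's list of scores.
    public static String prefixFor(List<Integer> scores) {
        return PREFIX + suffixFor(average(scores));
    }

    public static void main(String[] args) {
        System.out.println(prefixFor(List.of(88, 95, 90))); // average 91 -> student_score_90100
        System.out.println(prefixFor(List.of(70, 69)));     // average 69 -> student_score_6070
    }
}
```

Because the average is truncated, a student averaging 69.5 lands in the 60-69 file; switch to rounding first if that boundary matters.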
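One way is to override generateFileNameForKeyValue() in MultipleTextOutputFormat. Because this class belongs to the old org.apache.hadoop.mapred API, it is registered on a JobConf rather than on a new-API Job. The returned string becomes the output file name; appending the default leaf name (e.g. part-00000) keeps files from different reduce tasks from overwriting each other when the job commits.

```java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;

// Implemented by overriding MultipleTextOutputFormat.generateFileNameForKeyValue().
public class PartitionScoreOutputFormat extends MultipleTextOutputFormat<Text, IntWritable> {

    private static final String PREFIX = "student_score_";

    @Override
    protected String generateFileNameForKeyValue(Text key, IntWritable value, String name) {
        // `name` is this task's default leaf file name (e.g. part-00000).
        return PREFIX + rangeSuffix(value.get()) + "-" + name;
    }

    // Half-open ranges: [0,60), [60,70), [70,80), [80,90), [90,100].
    private static String rangeSuffix(int score) {
        if (score < 60) {
            return "059";
        }
        if (score < 70) {
            return "6070";
        }
        if (score < 80) {
            return "7080";
        }
        if (score < 90) {
            return "8090";
        }
        return "90100";
    }
}

// Usage (old mapred API, where jobConf is the job's org.apache.hadoop.mapred.JobConf):
// jobConf.setOutputFormat(PartitionScoreOutputFormat.class);
```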
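Alternatively, with the new API, use MultipleOutputs in the reducer:

```java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class StudentScoreReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    // Writes records to several named outputs instead of only the default output.
    private MultipleOutputs<Text, IntWritable> mos;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
        mos = new MultipleOutputs<Text, IntWritable>(context);
    }

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int totalScore = 0;
        int count = 0;
        for (IntWritable score : values) {
            totalScore += score.get();
            count++;
        }
        int avgScore = count > 0 ? totalScore / count : 0;
        // Write to a named output; every name used here must be registered in the
        // driver with MultipleOutputs.addNamedOutput(...).
        mos.write(getNamed(avgScore), key, new IntWritable(avgScore));
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // ...
        // Close MultipleOutputs so all named-output writers are flushed and closed.
        mos.close();
    }

    private static String getNamed(int score) {
        if (score < 60) {
            return "score059";
        }
        if (score < 70) {
            return "score6070";
        }
        if (score < 80) {
            return "score7080";
        }
        if (score < 90) {
            return "score8090";
        }
        return "score90100";
    }
}
```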
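To see which records the reducer routes to which named output without a cluster, here is a plain-Java simulation of its logic. It is a hypothetical sketch, not Hadoop code: the class NamedOutputSimulation and its methods are made up, the input map stands in for the shuffled (student, scores) groups, and each output line uses a tab between key and value, which is TextOutputFormat's default separator. In a real job each named output ends up in files such as score059-r-00000.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical local simulation of StudentScoreReducer; no Hadoop classes involved.
public class NamedOutputSimulation {

    // Same mapping as getNamed(int) in the reducer.
    public static String namedOutputFor(int score) {
        if (score < 60) return "score059";
        if (score < 70) return "score6070";
        if (score < 80) return "score7080";
        if (score < 90) return "score8090";
        return "score90100";
    }

    // For each student (the reduce key), average the grouped scores and append
    // "name<TAB>avg" to the list belonging to the matching named output.
    public static Map<String, List<String>> route(Map<String, List<Integer>> scoresByStudent) {
        Map<String, List<String>> outputs = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : scoresByStudent.entrySet()) {
            int total = 0;
            for (int s : e.getValue()) total += s;
            int avg = e.getValue().isEmpty() ? 0 : total / e.getValue().size();
            outputs.computeIfAbsent(namedOutputFor(avg), k -> new ArrayList<>())
                   .add(e.getKey() + "\t" + avg);
        }
        return outputs;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> input = new TreeMap<>();
        input.put("alice", List.of(92, 88)); // average 90
        input.put("bob", List.of(55, 61));   // average 58
        input.put("carol", List.of(75));     // average 75
        System.out.println(route(input));
    }
}
```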
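```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class StudentScoreTask {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setJobName("...");
        // ...

        // Important: register the named outputs. The second argument (score059, ...)
        // must match the names passed to MultipleOutputs.write(namedOutput, ...) in the reducer.
        // Each named output produces files such as score059-r-00000.
        MultipleOutputs.addNamedOutput(job, "score059", TextOutputFormat.class, Text.class, IntWritable.class);
        MultipleOutputs.addNamedOutput(job, "score6070", TextOutputFormat.class, Text.class, IntWritable.class);
        MultipleOutputs.addNamedOutput(job, "score7080", TextOutputFormat.class, Text.class, IntWritable.class);
        MultipleOutputs.addNamedOutput(job, "score8090", TextOutputFormat.class, Text.class, IntWritable.class);
        MultipleOutputs.addNamedOutput(job, "score90100", TextOutputFormat.class, Text.class, IntWritable.class);
        // Optional: since the reducer never writes to the default output,
        // LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class)
        // avoids leaving empty part-r-xxxxx files behind.

        // ...
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```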
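MultipleOutputs only accepts named-output names made of ASCII letters and digits and rejects the reserved name part, which is presumably why the driver registers score059 rather than student_score_059. The sketch below restates that rule in plain Java so names can be checked up front; the class and method are hypothetical, and the rule is paraphrased from MultipleOutputs rather than called from it. If the exact student_score_ prefixes are required, MultipleOutputs also offers a write(key, value, baseOutputPath) overload that takes a free-form base path and needs no addNamedOutput call.

```java
// Hypothetical pre-flight check restating MultipleOutputs' named-output naming rule.
public class NamedOutputNames {

    // True if `name` is non-empty, uses only ASCII letters/digits,
    // and is not the reserved name "part".
    public static boolean isValidNamedOutput(String name) {
        if (name == null || name.isEmpty() || name.equals("part")) {
            return false;
        }
        for (char ch : name.toCharArray()) {
            boolean letterOrDigit = (ch >= 'a' && ch <= 'z')
                    || (ch >= 'A' && ch <= 'Z')
                    || (ch >= '0' && ch <= '9');
            if (!letterOrDigit) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        String[] names = {"score059", "score6070", "score7080", "score8090", "score90100", "student_score_059"};
        for (String n : names) {
            System.out.println(n + " -> " + isValidNamedOutput(n));
        }
    }
}
```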
Example: compute each student's average score and write the student name and score to files named student_score_${yyyyMMdd}_${taskId}.txt (e.g. student_score_20160930_0.txt).
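```java
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Calendar;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class StudentScoreOutputFormat<K, V> extends TextOutputFormat<K, V> {

    // TextOutputFormat asks this method for the file to write; returning a custom
    // name inside the committer's work path replaces the default part-r-xxxxx name.
    @Override
    public Path getDefaultWorkFile(TaskAttemptContext context, String extension) throws IOException {
        FileOutputCommitter committer = (FileOutputCommitter) super.getOutputCommitter(context);
        return new Path(committer.getWorkPath(), generateFileName(context));
    }

    // Builds student_score_${yyyyMMdd}_${taskId}.txt from the task number and the
    // current date on the machine running the task.
    public synchronized static String generateFileName(TaskAttemptContext context) {
        TaskID taskId = context.getTaskAttemptID().getTaskID();
        int partition = taskId.getId();
        String currentDate = new SimpleDateFormat("yyyyMMdd").format(Calendar.getInstance().getTime());
        return String.format("student_score_%s_%d.txt", currentDate, partition);
    }
}
```

Usage:

```java
Job job = Job.getInstance(...);
// ...
// Set the custom StudentScoreOutputFormat as the job's output format:
job.setOutputFormatClass(StudentScoreOutputFormat.class);
// To avoid creating empty output files for reduce tasks that write no records,
// use LazyOutputFormat instead of the line above:
// LazyOutputFormat.setOutputFormatClass(job, StudentScoreOutputFormat.class);
```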
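The naming logic inside generateFileName() can be exercised without a cluster by passing the date and task number in explicitly. The sketch below is a hypothetical, Hadoop-free restatement (the class StudentScoreFileName is made up) that swaps SimpleDateFormat for java.time's DateTimeFormatter, which is immutable and thread-safe, so a single shared instance suffices and no synchronization is needed.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

// Hypothetical, Hadoop-free restatement of StudentScoreOutputFormat.generateFileName().
public class StudentScoreFileName {

    // DateTimeFormatter is immutable and thread-safe, so one shared instance is enough.
    private static final DateTimeFormatter YYYYMMDD = DateTimeFormatter.ofPattern("yyyyMMdd");

    // Builds student_score_${yyyyMMdd}_${taskId}.txt for the given date and task number.
    public static String fileName(LocalDate date, int taskId) {
        return "student_score_" + date.format(YYYYMMDD) + "_" + taskId + ".txt";
    }

    public static void main(String[] args) {
        System.out.println(fileName(LocalDate.of(2016, 9, 30), 0)); // student_score_20160930_0.txt
        System.out.println(fileName(LocalDate.now(), 3));
    }
}
```

Injecting the date also makes it easy to pin every task to one date (for example, one read from the job configuration) instead of each task reading its own clock, which could otherwise yield different dates for tasks running around midnight.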
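================== To be continued: I will keep adding other special output-file requirements as I run into them, and contributions from everyone are very welcome!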