http://www.cnblogs.com/zjfstudio/p/3887551.htmlhtml
Hadoop学习笔记(7) 编程
——高级编程 网络
从前面的学习中,咱们了解到了MapReduce整个过程须要通过如下几个步骤: app
1.输入(input):将输入数据分红一个个split,并将split进一步拆成<key, value>。 ide
2.映射(map):根据输入的<key, value>进生处理, 函数
3.合并(combiner):合并中间相两同的key值。 oop
4.分区(Partition):将<key, value>分红N分,分别送到下一环节。 post
5.化简(Reduce):将中间结果合并,获得最终结果 学习
6.输出(output):负责输入最终结果。 spa
其中第三、4步又成洗牌(shuffle)过程。
从前面HelloWorld示例中,咱们看到,咱们只去个性化了Map和Reduce函数,那其余函数呢,是否能够个性化?答案固然是确定的。下面咱们就对每一个环节的个性化进行介绍。
自定义输入格式
输 入格式(InputFormat)用于描述整个MapReduce做业的数据输入规范。先对输入的文件进行格式规范检查,如输入路径,后缀等检查;而后对 数据文件进行输入分块(split);再对数据块逐一读出;最后转换成Map所须要的<key, value>健值对。
系统中提供丰富的预置输入格式。最经常使用的如下两种:
TextInputFormat:系统默认的数据输入格式。将文件分块,并逐行读入,每一行记录行成一对<key, value>。其中,key值为当前行在整个文件中的偏移量,value值为这一行的文本内容。
KeyValueTextInputFormat:这是另外一个经常使用的数据输入格式,读入的文本文件内容要求是以<key, value>形式。读出的结果也就直接造成<key, value>送入map函数中。
若是选择输入格式呢?那就只要在job函数中调用
-
job.setInputFormatClass(TextInputFormat.class);
在Hello中咱们没有设定,系统默认选择了TextInputFormat。
通常状况够用了,但某些状况下,仍是没法知足用户的需求,因此仍是须要个性化。个性化则按下面的方式进行:
若是数据咱们是来源于文件,则能够继承FileInputFormat:
-
public class MyInputFormat extends FileInputFormat<Text,Text> {
-
@Override
-
public RecordReader<Text, Text> createRecordReader(InputSplit split,
-
TaskAttemptContext context) throws IOException, InterruptedException {
-
// TODO Auto-generated method stub
-
return null;
-
}
-
}
若是数据咱们是来源于非文件,如关系数据,则继承
-
public class MyInputFormat extends InputFormat<Text,Text> {
-
-
@Override
-
public RecordReader<Text, Text> createRecordReader(InputSplit arg0,
-
TaskAttemptContext arg1) throws IOException, InterruptedException {
-
// TODO Auto-generated method stub
-
return null;
-
}
-
-
@Override
-
public List<InputSplit> getSplits(JobContext arg0) throws IOException,
-
InterruptedException {
-
// TODO Auto-generated method stub
-
return null;
-
}
-
-
}
这里比较清晰了,下面个函数为拆分红split,上面个函数跟据split输出成Key,value。
自定义map处理
这个好理解,咱们的HelloWorld程序中就自定义了map处理函数。而后在job中指定了咱们的处理类:
-
job.setMapperClass(TokenizerMapper.class);
能不能没有map呢? 能够的,若是没有map,也就是这与上面的这个setMapperClass,则系统自动指定一个null,这时处理是将输入的<key,value>值,不做任何修改,直接送到下一环节中。
个性化代码以下:
-
public static class TokenizerMapper
-
extends Mapper<Object, Text, Text, IntWritable>{
-
-
public void map(Object key, Text value, Context context
-
) throws IOException, InterruptedException {
-
-
context.write(key, value);
-
}
-
}
自定义合并Combiner
自定义合并Combiner类,主要目的是减小Map阶段输出中间结果的数据量,下降数据的网络传输开销。
Combine 过程,实际跟Reduce过程类似,只是执行不一样,Reduce是在Reducer环节运行,而Combine是紧跟着Map以后,在同一台机器上预先将 结时进行一轮合并,以减小送到Reducer的数据量。因此在HelloWorld时,能够看到,Combiner和Reducer用的是同一个类:
-
job.setCombinerClass(IntSumReducer.class);
-
job.setReducerClass(IntSumReducer.class);
如何个性化呢,这个跟Reducer差很少了:
-
public static class MyCombiner
-
extends Reducer<Text,IntWritable,Text,IntWritable> {
-
-
public void reduce(Text key, Iterable<IntWritable> values,
-
Context context
-
) throws IOException, InterruptedException {
-
-
context.write(key, new IntWritable(1));
-
}
-
}
自定义分区Partitioner
在 MapReduce程序中,Partitioner决定着Map节点的输出将被分区到哪一个Reduce节点。而默认的Partitioner是 HashPartitioner,它根据每条数据记录的主健值进行Hash操做,得到一个非负整数的Hash码,而后用当前做业的Reduce节点数取模 运算,有N个结点的话,就会平均分配置到N个节点上,一个隔一个依次。大多状况下这个平均分配是够用了,但也会有一些特殊状况,好比某个文件的,不能被拆 开到两个结点中,这样就须要个性化了。
个性化方式以下:
-
public static class MyPartitioner
-
extends HashPartitioner<K,V> {
-
-
public void getPartition(K key, V value,int numReduceTasks) {
-
-
super.getPartition(key,value,numReduceTasks);
-
}
-
}
方式其实就是在执行以前能够改变一下key,来欺骗这个hash表。
自定义化简(Reducer)
这一块是将Map送来的结果进行化简处理,并造成最终的输出值。与前面map同样,在HelloWorld中咱们就见到过了。经过下面代码能够设置其值:
-
job.setReducerClass(IntSumReducer.class);
一样,也能够这样类能够不设置,若是不设置的话,就是把前面送来的值,直接送向输出格式器中。
若是要个性化,则以下:
-
public static class IntSumReducer
-
extends Reducer<Text,IntWritable,Text,IntWritable> {
-
-
public void reduce(Text key, Iterable<IntWritable> values,
-
Context context
-
) throws IOException, InterruptedException {
-
context.write(key, result);
-
}
-
}
自定义输出格式
数 据输出格式(OutPutFormat)用于描述MapReduce做业的数据输出规范。Hadoop提供了丰富的内置数据输出格式。最常的数据输出格式 是TextOutputFormat,也是系统默认的数据输出格式,将结果以"key+\t+value"的形式逐行输出到文本文件中。还有其它的, 如:DBOutputFormat,FileOutputFormat,FilterOutputFormat,IndexUpdataOutputFormat,LazyOutputFormat,MapFileOutputFormat, 等等。
若是要个性化,则按下面方式进行:
-
public class MyOutputFormat extends OutputFormat<Text,Text> {
-
-
@Override
-
public void checkOutputSpecs(JobContext arg0) throws IOException,
-
InterruptedException {
-
// TODO Auto-generated method stub
-
-
}
-
-
@Override
-
public OutputCommitter getOutputCommitter(TaskAttemptContext arg0)
-
throws IOException, InterruptedException {
-
// TODO Auto-generated method stub
-
return null;
-
}
-
-
@Override
-
public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext arg0)
-
throws IOException, InterruptedException {
-
// TODO Auto-generated method stub
-
return null;
-
}
-
-
}
复合健——用户自定义类型。
从前面的整个过程当中能够看到,都是采用key-value的方式进行传入传出,而这些类型大可能是单一的字符串,和整型。若是个人key中须要包含多个信息怎么办?用字符串直接拼接么? 太不方便了,最好可以本身定义一个类,做为这个key,这样就方便了。
若是定义一个类做为key 或value的类型? 有什么要求?就是这个类型必需要继承WritableComparable<T>这个类,因此若是要自定义一个类型则能够这么实现:
-
public class MyType implements WritableComparable<MyType> {
-
-
private float x,y;
-
public float GetX(){return x;}
-
public float GetY(){return y;}
-
-
@Override
-
public void readFields(DataInput in) throws IOException {
-
x = in.readFloat();
-
y = in.readFloat();
-
}
-
-
@Override
-
public void write(DataOutput out) throws IOException {
-
out.writeFloat(x);
-
out.writeFloat(y);
-
}
-
-
@Override
-
public int compareTo(MyType arg0) {
-
//输入:-1(小于) 0(等于) 1(大于)
-
return 0;
-
}
-
}
这个示例中,咱们添加了两个float变量:x,y 。 这个信息能过int 和out按次序进行输入输出。最后,再实现一个比较函数便可。
Job任务的建立
-
Job job = new Job(conf, "word count");
-
job.setJarByClass(WordCount.class);
-
job.setInputFormatClass(MyInputFormat.class);
-
job.setMapperClass(TokenizerMapper.class);
-
job.setCombinerClass(IntSumReducer.class);
-
job.setPartitionerClass(MyPartitioner.class);
-
job.setReducerClass(IntSumReducer.class);
-
job.setOutputFormatClass(TextOutputFormat.class);
-
job.setOutputKeyClass(Text.class);
-
job.setOutputValueClass(IntWritable.class);
-
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
-
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
任务建立比较容易,其实就是new一个实例,而后把上面描述的过程类设置好,而后加上第2行中,jar包的主类,第十、11行的输入输出路径。这样就完事了。
Job任务的执行
单个任务的执行,没有什么问题,能够用这个:
-
job.waitForCompletion(true);
但多个任务呢? 多个任务的话,就会造成其组织方式,有串行,有并行,有无关,有组合的,以下图:

图中,Job2和Job3将会等Job1执行完了再执行,且能够同时开始,而Job4必须等Job2和Job3同时结束后才结束。
这个组合,就能够采用这样的代码来实现:
-
Configuration conf = new Configuration();
-
Job job1 = new Job(conf, "job1");
-
//.. config Job1
-
Job job2 = new Job(conf, "job2");
-
//.. config Job2
-
Job job3 = new Job(conf, "job3");
-
//.. config Job3
-
Job job4 = new Job(conf, "job4");
-
//.. config Job4
-
-
//添加依赖关系
-
job2.addDependingJob(job1);
-
job3.addDependingJob(job1);
-
job4.addDependingJob(job2);
-
job4.addDependingJob(job3);
-
-
JobControl jc = new JobControl("jbo name");
-
jc.addJob(job1);
-
jc.addJob(job2);
-
jc.addJob(job3);
-
jc.addJob(job4);
-
jc.run();
总述
如今回头看看,其实整个hadoop编程,也就是这几块内容了,要实现某个功能,咱们就往上面这些步骤上套,而后联起来执行,达到咱们的目的。