MapReduce processes data in two main phases: a Map phase and a Reduce phase. The Map phase runs first, followed by the Reduce phase. The processing logic of both Map and Reduce is implemented by the user, but it must conform to the contract of the MapReduce framework.
The input and output of ODPS MapReduce are ODPS tables or partitions (user-defined input/output formats are not allowed, and no file-system-like interface is provided).
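For example, a single partition rather than a whole table can be declared as the job input by attaching a partition spec to the TableInfo. A minimal sketch, assuming a hypothetical table t_log partitioned by ds (InputUtils and JobConf are introduced with the driver code below):

import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;

public class PartitionInputSketch {
    // Hypothetical helper: bind only partition ds=20240101 of table t_log
    // (names are illustrative) as the job's input.
    static void bindInput(JobConf job) {
        InputUtils.addTable(
                TableInfo.builder()
                        .tableName("t_log")
                        .partSpec("ds=20240101")
                        .build(),
                job);
    }
}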
Below, WordCount is used as an example to walk through the execution of an ODPS MapReduce job. The input table has a single column in which each record is a word; the job counts how many times each word occurs and writes the result to another table with two columns, one for the word and one for the count. For instance (contents illustrative), if the input holds the records "hello", "world", "hello", the output holds ("hello", 2) and ("world", 1).
Code implementation:
1. Mapper implementation
package mr.test;

import java.io.IOException;

import com.aliyun.odps.data.Record;
import com.aliyun.odps.mapred.MapperBase;

public class WCMapper extends MapperBase {

    private Record word;
    private Record one;

    @Override
    public void setup(TaskContext context) throws IOException {
        // Runs once per map task: create the reusable key/value Records.
        word = context.createMapOutputKeyRecord();
        one = context.createMapOutputValueRecord();
        // The value is always the constant 1.
        one.set(new Object[] { 1L });
    }

    @Override
    public void map(long recordNum, Record record, TaskContext context) throws IOException {
        System.out.println("recordNum:" + recordNum);
        for (int i = 0; i < record.getColumnCount(); i++) {
            // Split each column value on whitespace and emit a <word, 1> pair per word.
            String[] words = record.get(i).toString().split("\\s+");
            for (String w : words) {
                word.set(new Object[] { w });
                context.write(word, one);
            }
        }
    }

    @Override
    public void cleanup(TaskContext context) throws IOException {
    }
}
1) The Record instances are initialized in setup(), which is executed only once.
2) map() is called once for each input Record. Inside it, the code iterates over the Record's columns, splits each column value on whitespace with a regular expression (see the sketch after this list), and emits <word, one> key-value pairs.
3) Both setup() and map() receive a TaskContext instance, which carries the runtime context of the MapReduce task.
4) cleanup() performs any final teardown work.
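The "\\s+" pattern used in map() treats any run of whitespace as a single delimiter. A minimal, framework-free sketch of just that splitting step (plain Java; the sample string is illustrative):

public class SplitDemo {
    public static void main(String[] args) {
        String line = "hello  world hello";
        // Runs of spaces collapse into one delimiter, so this prints
        // "hello", "world", "hello" on separate lines.
        for (String w : line.split("\\s+")) {
            System.out.println(w);
        }
    }
}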
2. Reducer implementation
package mr.test;

import java.io.IOException;
import java.util.Iterator;

import com.aliyun.odps.data.Record;
import com.aliyun.odps.mapred.ReducerBase;

public class WCReducer extends ReducerBase {

    private Record result;

    @Override
    public void setup(TaskContext context) throws IOException {
        // Runs once per reduce task: create the reusable output Record.
        result = context.createOutputRecord();
    }

    @Override
    public void reduce(Record key, Iterator<Record> values, TaskContext context) throws IOException {
        long count = 0;
        // Sum the counts of all values grouped under the same key.
        while (values.hasNext()) {
            Record val = values.next();
            count += (Long) val.get(0);
        }
        result.set(0, key.get(0));
        result.set(1, count);
        context.write(result);
    }

    @Override
    public void cleanup(TaskContext context) throws IOException {
    }
}
1) The Reducer likewise provides setup() and cleanup() methods.
2) reduce() is called once per key. Inside it, the code iterates over all values that share that key, accumulates their counts, and then fills and writes the result Record (a framework-free sketch of this accumulation follows the list).
3) Note the difference in output between map and reduce: Reduce calls context.write(result) to emit a result Record, while Map calls context.write(word, one) to emit a key-value pair.
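Between the two phases, the framework sorts the map output and groups it by key, so reduce() sees each distinct word once, with an iterator over all of its 1s. A minimal, framework-free sketch of the accumulation (plain Long values stand in for the one-column value Records of the real job):

import java.util.Arrays;
import java.util.Iterator;

public class ReduceDemo {
    public static void main(String[] args) {
        // Suppose the word "hello" appeared three times in the input:
        // reduce() would receive the grouped values [1, 1, 1].
        Iterator<Long> values = Arrays.asList(1L, 1L, 1L).iterator();
        long count = 0;
        while (values.hasNext()) {
            count += values.next();
        }
        System.out.println("hello\t" + count); // prints: hello	3
    }
}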
3. Driver implementation
package mr.test;

import com.aliyun.odps.OdpsException;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.RunningJob;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;

public class WCDriver {

    public static void main(String[] args) throws OdpsException {
        if (args.length != 2) {
            System.out.println("Usage: WCDriver <input_table> <output_table>");
            System.exit(2);
        }
        JobConf job = new JobConf();
        // Declare the schemas of the map output key and value.
        job.setMapOutputKeySchema(SchemaUtils.fromString("word:string"));
        job.setMapOutputValueSchema(SchemaUtils.fromString("count:bigint"));
        // Bind the input and output tables to the job.
        InputUtils.addTable(TableInfo.builder().tableName(args[0]).build(), job);
        OutputUtils.addTable(TableInfo.builder().tableName(args[1]).build(), job);
        job.setMapperClass(WCMapper.class);
        job.setReducerClass(WCReducer.class);
        // Submit the job and block until it finishes.
        RunningJob rj = JobClient.runJob(job);
        rj.waitForCompletion();
    }
}
1) First, a JobConf instance is created.
2) Specify the Mapper and Reducer classes to run:
job.setMapperClass(WCMapper.class);
job.setReducerClass(WCReducer.class);
3) Set the schemas of the map output key and value:
job.setMapOutputKeySchema(SchemaUtils.fromString("word:string"));
job.setMapOutputValueSchema(SchemaUtils.fromString("count:bigint"));
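Finally, an ODPS MR job is not run locally: the compiled classes are uploaded as a jar resource and launched from the MaxCompute console (odpscmd). A sketch of what that might look like, assuming the classes above are packaged into a hypothetical wordcount.jar and the tables are named wc_in and wc_out:

add jar wordcount.jar;
jar -resources wordcount.jar -classpath wordcount.jar mr.test.WCDriver wc_in wc_out;

The two table names are passed through to main() as args[0] and args[1], matching the driver's argument check.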