AreaPartitioner.java
package cn.itcast.hadoop.mr.areapartition;

import java.util.HashMap;

import org.apache.hadoop.mapreduce.Partitioner;

public class AreaPartitioner<KEY, VALUE> extends Partitioner<KEY, VALUE> {

    private static HashMap<String, Integer> areaMap = new HashMap<>();

    static {
        areaMap.put("135", 0);
        areaMap.put("136", 1);
        areaMap.put("137", 2);
        areaMap.put("138", 3);
        areaMap.put("139", 4);
    }

    @Override
    public int getPartition(KEY key, VALUE value, int numPartitions) {
        // Take the phone-number prefix from the key and look it up in the area dictionary;
        // different provinces get different partition numbers, unknown prefixes go to 5.
        Integer areaCoder = areaMap.get(key.toString().substring(0, 3));
        return areaCoder == null ? 5 : areaCoder;
    }
}
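With this mapping, phone numbers whose first three digits are 135-139 go to partitions 0-4, and any other prefix falls into the catch-all partition 5, so there are six partitions in total. A quick standalone check of that behavior (a sketch; the class name AreaPartitionerCheck and the sample numbers are made up for illustration):

package cn.itcast.hadoop.mr.areapartition;

import org.apache.hadoop.io.Text;

public class AreaPartitionerCheck {
    public static void main(String[] args) {
        AreaPartitioner<Text, Object> p = new AreaPartitioner<>();
        // Known prefixes map to fixed partition numbers
        System.out.println(p.getPartition(new Text("1350001111"), null, 6)); // 0
        System.out.println(p.getPartition(new Text("1390001111"), null, 6)); // 4
        // Anything else lands in the catch-all partition
        System.out.println(p.getPartition(new Text("1500001111"), null, 6)); // 5
    }
}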
FlowSumArea.java
package cn.itcast.hadoop.mr.areapartition;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import cn.itcast.hadoop.mr.flowsum.FlowBean;

/**
 * Aggregate traffic from the raw flow logs and write the results for
 * different provinces to different output files. Two mechanisms must
 * be customized:
 * 1. the partitioning logic, via a custom Partitioner;
 * 2. the number of concurrent reduce tasks.
 *
 * @author duanhaitao@itcast.cn
 */
public class FlowSumArea {

    public static class FlowSumAreaMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Take one line of input
            String line = value.toString();
            // Split it into fields
            String[] fields = StringUtils.split(line, "\t");
            // Pick out the fields we need
            String phoneNB = fields[1];
            long u_flow = Long.parseLong(fields[7]);
            long d_flow = Long.parseLong(fields[8]);
            // Wrap them as a key/value pair and emit
            context.write(new Text(phoneNB), new FlowBean(phoneNB, u_flow, d_flow));
        }
    }

    public static class FlowSumAreaReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context)
                throws IOException, InterruptedException {
            long up_flow_counter = 0;
            long d_flow_counter = 0;
            for (FlowBean bean : values) {
                up_flow_counter += bean.getUp_flow();
                d_flow_counter += bean.getD_flow();
            }
            context.write(key, new FlowBean(key.toString(), up_flow_counter, d_flow_counter));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(FlowSumArea.class);

        job.setMapperClass(FlowSumAreaMapper.class);
        job.setReducerClass(FlowSumAreaReducer.class);

        // Plug in our custom partitioning logic
        job.setPartitionerClass(AreaPartitioner.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Set the reduce parallelism; it should match the number of partitions
        job.setNumReduceTasks(6);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
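The FlowBean imported from cn.itcast.hadoop.mr.flowsum is not shown in this post. A minimal sketch of what it would have to look like, assuming the constructor and getters used above (the field names and the toString layout are assumptions; Hadoop requires Writable for the map output value and a no-arg constructor for deserialization):

package cn.itcast.hadoop.mr.flowsum;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class FlowBean implements Writable {

    private String phoneNB;
    private long up_flow;
    private long d_flow;

    // Hadoop instantiates Writables reflectively, so a no-arg constructor is required
    public FlowBean() {}

    public FlowBean(String phoneNB, long up_flow, long d_flow) {
        this.phoneNB = phoneNB;
        this.up_flow = up_flow;
        this.d_flow = d_flow;
    }

    public long getUp_flow() { return up_flow; }
    public long getD_flow() { return d_flow; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNB);
        out.writeLong(up_flow);
        out.writeLong(d_flow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        phoneNB = in.readUTF();
        up_flow = in.readLong();
        d_flow = in.readLong();
    }

    @Override
    public String toString() {
        // TextOutputFormat writes key + "\t" + value.toString() per line
        return up_flow + "\t" + d_flow + "\t" + (up_flow + d_flow);
    }
}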
Package the jar and upload it:
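A typical submission looks like the following (the jar name and HDFS paths are placeholders, not from the original post):

hadoop jar flowarea.jar cn.itcast.hadoop.mr.areapartition.FlowSumArea /flow/srcdata /flow/areaoutput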
When rerunning, append a 2 to the last argument above (the output path): FileOutputFormat refuses to start a job whose output directory already exists.
If the number of reduce tasks is smaller than the number of partitions, the job fails with an error; setting it to 1, however, does not fail, because with a single reduce task every record goes to that one reducer and the partitioner's return value is never actually used.
If the number of reduce tasks is greater than the number of partitions, the extra reduce tasks receive no data and just produce empty output files.
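To summarize how the reduce parallelism interacts with the six partitions produced by AreaPartitioner, here is the setNumReduceTasks call from main with the alternatives annotated (a sketch; the behavior notes reflect standard MapReduce semantics):

job.setNumReduceTasks(6);  // == partition count: one output file per area, as intended
job.setNumReduceTasks(1);  // always safe: a single reducer receives everything, the partition number is never used
job.setNumReduceTasks(3);  // 1 < n < 6: partition numbers 3-5 are out of range, tasks fail with "Illegal partition"
job.setNumReduceTasks(10); // > 6: runs fine, but four extra reducers just write empty part-r-* files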
The map stage is not tied to this business logic: with 10 map tasks, each simply processes 1/10 of the data, so map parallelism can be set freely.
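In practice the number of map tasks is driven by the input split size rather than set directly; one way to tune it with the new API (the 64 MB cap is just an example value):

// In the job setup in main(), before submission:
FileInputFormat.setMinInputSplitSize(job, 1);
// Cap each split at ~64 MB, yielding more (smaller) map tasks for large inputs
FileInputFormat.setMaxInputSplitSize(job, 64 * 1024 * 1024);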