Finding out which file a map task's input data comes from
The key is the Context parameter passed to the map-side setup method: from it you can find out which file the split handled by this map task came from.
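Stripped of the surrounding class, the lookup boils down to one call on the Context. A minimal sketch (it assumes a file-based input format, so the split can be cast to FileSplit):

    // Inside Mapper.setup(Context context); assumes a file-based InputFormat,
    // so getInputSplit() returns a FileSplit that knows its source path.
    String sourceFile = ((org.apache.hadoop.mapreduce.lib.input.FileSplit) context.getInputSplit())
            .getPath().getName();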
1. GetSourceMapper.class
package com.bigdata.surfilter.mapgetsource;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

/**
 * @Author liufu
 * @CreateTime 2016/7/26 16:44
 */
public class GetSourceMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    // A map task processes one split (one block by default), so all of its records come from the same file.
    String sourceFile = null;
    Text outPutText = null;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // The InputSplit held by the Context tells us which file this map task is reading.
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        Path path = inputSplit.getPath();
        sourceFile = path.getName();
        outPutText = new Text();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        outPutText.set(value.toString() + " received from " + sourceFile);
        context.write(outPutText, NullWritable.get());
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        outPutText = null;
    }
}
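The cast in setup assumes the split really is a FileSplit. With other input formats (for example CombineFileInputFormat, or mappers wired up through MultipleInputs) getInputSplit() may return a different split type, so a defensive variant of setup could look like the sketch below; the "unknown" fallback label is just an illustrative choice.

    // Requires: import org.apache.hadoop.mapreduce.InputSplit;
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        InputSplit split = context.getInputSplit();
        if (split instanceof FileSplit) {
            // Normal case: a file-based split that knows its source path.
            sourceFile = ((FileSplit) split).getPath().getName();
        } else {
            // Other split types do not expose a single source file directly.
            sourceFile = "unknown";
        }
        outPutText = new Text();
    }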
2. ApplicationRun.class
package com.bigdata.surfilter;

import com.bigdata.surfilter.mapgetsource.GetSourceMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;

/**
 * @Author liufu
 * @CreateTime 2016/7/25 15:55
 */
public class ApplicationRun {

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // When the job is launched on the cluster with the "hadoop jar" command, the two lines below are not needed,
        // because the configuration files under the Hadoop installation's etc/hadoop directory already provide them.
        // conf.set("fs.default.name", "hdfs://192.168.0.186:9000");
        // Impersonate the root user.
        // System.setProperty("HADOOP_USER_NAME", "root");

        Job job = new Job(conf, "getsourcemapper");
        // Locate the jar on the classpath via the main class.
        job.setJarByClass(ApplicationRun.class);

        // The map-side code of the job (no reducer is used).
        job.setMapperClass(GetSourceMapper.class);

        // Declare the map output types so the framework can instantiate them by reflection.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        // How the job reads its input and writes its output.
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Where the data comes from; bind the input directories with setInputPaths (addInputPaths works as well).
        FileInputFormat.setInputPaths(job, new Path("/mapgetsource/input1/"), new Path("/mapgetsource/input2/"));
        // Where the output goes.
        FileOutputFormat.setOutputPath(job, new Path("/mapgetsource/output/"));

        // No reduce phase is needed.
        job.setNumReduceTasks(0);

        try {
            boolean b = job.waitForCompletion(true);
            System.exit(b ? 0 : 1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
}
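One practical wrinkle when re-running this driver: FileOutputFormat refuses to start the job if the output directory already exists. A small sketch that could be placed before waitForCompletion clears it first (the path is the one used in the driver above; it assumes the user running the job may delete that directory):

    // Requires: import org.apache.hadoop.fs.FileSystem;
    Path output = new Path("/mapgetsource/output/");
    FileSystem fs = FileSystem.get(conf);
    if (fs.exists(output)) {
        // Recursively remove the old output so the job can be re-run.
        fs.delete(output, true);
    }
    FileOutputFormat.setOutputPath(job, output);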