For a plain Java program, reading a local file could hardly be more routine. For a MapReduce program, however, it is a common pitfall: the file is clearly there, you run the jar from that very machine, so why can't the job read it? The reason is that a MapReduce program does not actually run on the machine where you submit it; its tasks are executed in a distributed fashion across the nodes of the cluster, so of course they cannot see the file, and the so-called "local file" is not local to them at all. The only exception is when your Hadoop cluster is a pseudo-distributed (single-node) cluster.

Take the following example:

package test;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FileTest {
    public static void main(String args[]) {
        int mr = 0;
        try {
            mr = ToolRunner.run(new Configuration(), new FileTestDriver(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.exit(mr);
    }
}

class FileTestDriver extends Configured implements Tool {
    @Override
    public int run(String[] arg0) throws Exception {
        Configuration config = getConf();
        JobConf conf = new JobConf(config, FileTestDriver.class);

        String[] otherArgs = new GenericOptionsParser(config, arg0)
                .getRemainingArgs();
        String input = otherArgs[0];
        String output = otherArgs[1];

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);
        conf.set("mapred.task.timeout", "6000000");

        conf.setMapperClass(FileTestMapper.class);
        conf.setReducerClass(FileTestReducer.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(input));
        FileOutputFormat.setOutputPath(conf, new Path(output));

        JobClient.runJob(conf);
        return 0;
    }
}

class FileTestMapper extends MapReduceBase implements
        Mapper<LongWritable, Text, Text, Text> {

    private String filepath = "";

    // The file path is passed on the command line via -D files=...
    public void configure(JobConf job) {
        filepath = job.get("files");
    }

    public void map(LongWritable key, Text value,
            OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {
        String url = "qq.com";
        String host = getTop100DomainTest(url, filepath);
        output.collect(new Text(url + "\t" + host), new Text(""));
    }

    public String getTop100DomainTest(String url, String filepath) {
        try {
            BufferedReader reader = new BufferedReader(new FileReader(new File(
                    filepath)));
            String line = "";
            while ((line = reader.readLine()) != null) {
                // splitLine[0] is the host; the domains follow it on the same line
                line = line.replaceAll("( )+", " ");
                String[] splitLine = line.split(" ");
                for (int i = 1; i < splitLine.length; i++) {
                    String host = splitLine[i];
                    if (url.equals(host)) {
                        return splitLine[0];
                    }
                }
            }
            return "";
        } catch (FileNotFoundException e) {
            return "";
        } catch (IOException e) {
            return "";
        }
    }
}

class FileTestReducer extends MapReduceBase implements
        Reducer<Text, Text, Text, Text> {

    public void reduce(Text key, Iterator<Text> values,
            OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {
        output.collect(key, new Text(""));
    }
}
The method public String getTop100DomainTest(String url, String filepath) reads the file and returns the domain for the given url.

Package the program as test.jar and run:

hadoop jar test.jar test.FileTest -D files="/opt/top100.txt" /test/test /test/test1

If you are on a pseudo-distributed cluster, congratulations, the job runs successfully. If you are on a real distributed cluster, it will most likely fail.

Now that we understand why, how can we make this code succeed in the distributed case as well? One way is simply to copy top100.txt to /opt on every machine in the cluster. The job will then run, but this is a crude approach: with many nodes you have to copy the file to each one, and it must sit at exactly the same path everywhere. If you can live with that, go ahead.

In fact, MapReduce provides a caching mechanism for exactly this: DistributedCache. All you need to do is add the following during job configuration:

DistributedCache.addCacheFile(new URI("/test/top100.txt"), conf);

Note that "/test/top100.txt" here is an HDFS path, not a local one. Then, in the mapper's public void configure(JobConf job) method, add:

public void configure(JobConf job) {
    try {
        localFiles = DistributedCache.getLocalCacheFiles(job);
    } catch (IOException e) {
        e.printStackTrace();
    }
}

Inside map(), you can then open the file through its local path, obtained from the cached Path via path.toUri().getPath().
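Putting those pieces together, here is a minimal sketch of what the mapper might look like once it reads the file from the DistributedCache instead of from a hard-coded local path. It assumes it lives in the same source file as the listing above (so the same imports apply) and reuses getTop100DomainTest from FileTestMapper; the class name CachedFileTestMapper and the null check are illustrative additions, while addCacheFile, getLocalCacheFiles and toUri().getPath() are the calls described above.

// Driver side: register the HDFS file with the cache before submitting the job,
// e.g. right before JobClient.runJob(conf) in FileTestDriver.run():
//     DistributedCache.addCacheFile(new URI("/test/top100.txt"), conf);

// Mapper side: a sketch that reuses getTop100DomainTest from FileTestMapper above.
class CachedFileTestMapper extends FileTestMapper {

    private Path[] localFiles;

    public void configure(JobConf job) {
        try {
            // Local paths (on the task node's disk) of the cached files.
            localFiles = DistributedCache.getLocalCacheFiles(job);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void map(LongWritable key, Text value,
            OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {
        if (localFiles == null || localFiles.length == 0) {
            return; // cache file not available; nothing to look up
        }
        // The cached copy lives on the task node's local disk,
        // so plain java.io can read it.
        String localPath = localFiles[0].toUri().getPath();
        String url = "qq.com";
        String host = getTop100DomainTest(url, localPath);
        output.collect(new Text(url + "\t" + host), new Text(""));
    }
}

If you switch to a mapper like this, remember to register it in the driver with conf.setMapperClass(CachedFileTestMapper.class); the -D files=... property is then no longer needed, since the path comes from the cache instead.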