MapReduce程式调用第三方包:我在使用过程当中须要用到hbase的jar包,若要使用,常规是添加到每台机器的classpath中,可是 经过DistributeCache,在初始化前加入就ok了。要不就要将这些jar包打成一个新jar,经过hadoop jar XXX.jar运行,可是不利于代码更新和维护。
html
咱们知道,在Hadoop中有一个叫作DistributedCache的东东,它是用来分发应用特定的只读文件和一个jar包的,以供Map- Reduce框架在启动任务和运行的时候使用这些缓冲的文件或者是把第三方jar包添加到其classpath路径中去,要注意的是 DistributedCache的使用是有一个前提的,就它会认为这些经过urls来表示的文件已经在hdfs文件系统里面,因此这里在使用的时候第一 步就是要把这些文件上传到HDFS中。
而后Hadoop框架会把这些应用所须要的文件复制到每一个准备启动的节点上去,它会把这些复制到mapred.temp.dir配置的目录中去,以供相应的Task节点使用。
这里要注意的DistriubtedCache分发的文件分红公有与私有文件,公有文件能够给HDFS中的全部用户使用,而私有文件只能被特定的用户所使用,用户能够配置上传文件的访问权限来达到这种效果。
public boolean run(Configuration conf, String inputPath, String outPath,String category)
throws Exception {
Job job = new Job(conf, "DIP_DIPLOGFILTER-"+category);
DistributedCache.addFileToClassPath(new Path("/libs/hbase-0.92.1-cdh4.0.0-security.jar"), job.getConfiguration());
job.setJarByClass(AnalysisLoader.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setMapperClass(AnalysisMapper.class);
job.setMapOutputKeyClass(ComplexKey.class);
job.setMapOutputValueClass(Text.class);
job.setPartitionerClass(ComplexKeyPartitioner.class);
// job.setCombinerClass(AnalysisReducer.class);
job.setReducerClass(AnalysisReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setNumReduceTasks(LogConfig.reduceCount);
String hdfs = ServerConfig.getHDFS();
String[] inputPaths =inputPath.split(",");
for (String p : inputPaths) {
if (!p.startsWith(hdfs)) {
p = hdfs + p;
}
MultipleInputs.addInputPath(job, new Path(p),TextInputFormat.class, AnalysisMapper.class);
}
FileOutputFormat.setOutputPath(job, new Path(outPath));
return(job.waitForCompletion(true));
}
DistributeCache的使用通常分红三步:
1. 配置应用程序的cache,把须要使用的文件上传到DFS中去
app
[html] view plaincopy框架
$ bin/hadoop fs -copyFromLocal lookup.dat /myapp/lookup.dat oop
$ bin/hadoop fs -copyFromLocal map.zip /myapp/map.zip url
$ bin/hadoop fs -copyFromLocal mylib.jar /myapp/mylib.jar spa
$ bin/hadoop fs -copyFromLocal mytar.tar /myapp/mytar.tar .net
$ bin/hadoop fs -copyFromLocal mytgz.tgz /myapp/mytgz.tgz orm
$ bin/hadoop fs -copyFromLocal mytargz.tar.gz /myapp/mytargz.tar.gz htm
2. 配置JobConf
对象
[html] view plaincopy
JobConf job = new JobConf();
DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"),job); // 这里的lookup.dat加了一个符号链接
DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job);
DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job); // 这里是把相应的jar包加到Task的启动路径上去
DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job);
DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job);
DistributedCache.addCacheArchive(new URI("/myapp/mytargz.tar.gz", job);
3. 在Mapper或者Reducer任务中使用这些文件
[html] view plaincopy
public static class MapClass extends MapReduceBase
implements Mapper<K, V, K, V> {
private Path[] localArchives;
private Path[] localFiles;
public void configure(JobConf job) {
// Get the cached archives/files
localArchives = DistributedCache.getLocalCacheArchives(job); // 获得本地打包的文件,通常是数据文件,如字典文件
localFiles = DistributedCache.getLocalCacheFiles(job); // 获得本地缓冲的文件,通常是配置文件等
}
public void map(K key, V value,
OutputCollector<K, V> output, Reporter reporter)
throws IOException {
// Use data from the cached archives/files here
// ...
// ...
output.collect(k, v);
}
}
1. 咱们知道,新的MP接口使用了Job这个类来对MP任务进行配置,这里使用的时候要注意一点 Configuration conf = new Configuration(); // 对conf加入配置信息 - 正确方法 Job job = new Job(conf,"word count"); // 对conf加入配置信息 - 这是有问题的,这些配置不会生效,由于这里生成Job的时候它会对conf进行复制,这个看一下Job的源代码就知道。 // 这里能够用job.getConfiguration()来获得其内部的conf对象,这样就不会有问题。2. 若是你在启动MP任务以前调用了第三方jar包的类,那这就会有问题,会在启动任务的时候找不到这个类。这个问题我尚未找到好的解决办法,一个办法就是 把这些类想办法移到MP任务中,若是有朋友知道更加好的办法,请告诉我一下,多谢了。我感受Nutch中也会有一样的问题,何时研究一下Nutch的 代码,说不定会有不少关于Hadoop方面的收获。