以前运行过了hadoop官方自带的第一个例子wordcount,此次咱们本身手写一个,这个至关因而编程语言中的helloworld同样. 首先咱们了解一下咱们要写的MapReduce是处理的哪一个部分,咱们知道hadoop处理文件是先将要处理的文件拆分红不少个部分,分别处理完成,最后再将结果给汇聚起来, 造成最终的处理结果.(也就是分治法的思想)咱们接下来举个单词统计的例子,看看咱们写的代码是整个MapReduce过程当中的哪些部分.html
首先我们有这么一个文件,文件内容以下:java
hello world hello java
hello hadoop
复制代码
很简单的一个文件就两行.那么hadoop是怎么作单词统计的呢?咱们用步骤来描述下:
第一步:读取这个文件,按行来将这个文件每一行的单词给拆分出来,而后造成不少key/value的结果,处理完就是这样
<hello,1>
<world,1>
<hello,1>
<java,1>
<hello,1>
<hadoop,1>
第二步:排序
排序完会变成这样的结果
<hadoop,1>
<hello,1>
<hello,1>
<hello,1>
<java,1>
<world,1>
第三步:合并
合并后的结果以下
<hadoop,1>
<hello,1,1,1>
<java,1>
<world,1>
第四步:汇聚结果
<hadoop,1>
<hello,3>
<java,1>
<world,1>apache
到第四步完成,单词统计其实也就完成了.看完这个具体的实例,想必你们对mapreduce的处理过程有了一个比较清晰的理解. 而后咱们要知道第二步和第三步是hadoop框架帮助咱们完成的,咱们实际上须要写代码的地方是第一步和第四步. 第一步对应的就是Map的过程,第四步对应的是Reduce的过程.编程
如今咱们要作的就是完成第一步和第四步的代码 1.建立项目 bash
3.引入包完成之后,咱们建立一个叫WordCount的java文件,而后开始敲代码 这里直接贴一下代码,__要注意import的部分,是否是和我同样?__由于好些个名字同样的类,来自于不一样的jar,容易弄错.服务器
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.StringTokenizer;
/** * @author wxwwt * @since 2019-09-15 */
public class WordCount {
/** * Object : 输入文件的内容 * Text : 输入的每一行的数据 * Text : 输出的key的类型 * IntWritable : 输出value的类型 */
private static class WordCountMapper extends Mapper<Object, Text, Text, IntWritable> {
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
context.write(new Text(itr.nextToken()), new IntWritable(1));
}
}
}
/** * Text : Mapper输入的key * IntWritable : Mapper输入的value * Text : Reducer输出的key * IntWritable : Reducer输出的value */
private static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int count = 0;
for (IntWritable item : values) {
count += item.get();
}
context.write(key, new IntWritable(count));
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 建立配置
Configuration configuration = new Configuration();
// 设置hadoop的做业 jobName是WordCount
Job job = Job.getInstance(configuration, "WordCount");
// 设置jar
job.setJarByClass(WordCount.class);
// 设置Mapper的class
job.setMapperClass(WordCountMapper.class);
// 设置Reducer的class
job.setReducerClass(WordCountReducer.class);
// 设置输出的key和value类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 设置输入输出路径
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 待job执行完 程序退出
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
复制代码
Mapper程序:app
/** * Object : 输入文件的内容 * Text : 输入的每一行的数据 * Text : 输出的key的类型 * IntWritable : 输出value的类型 */
private static class WordCountMapper extends Mapper<Object, Text, Text, IntWritable> {
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
context.write(new Text(itr.nextToken()), new IntWritable(1));
}
}
}
复制代码
context是全局的上下文,先使用了StringTokenizer将value(也就是每行的数据)按照空格分红了不少份,StringTokenizer若是没有传入指定的分割符的话,默认会将 " \t\n\r\f" 空格制表符换行符等符号做为分隔符,而后使用nextToken()来遍历这个按照空格分割的字符串.context.write(new Text(itr.nextToken()), new IntWritable(1)); 的意思就是将key/value写入到上下文中.
注:在hadoop编程中String是Text,Integer是IntWritable.这是hadoop本身封装的类.记住就行了,使用起来和原来的类差很少
这里就是写入了key为Text的单词,和value为Writable的1(统计数量).框架
Reduce程序:编程语言
/** * Text : Mapper输入的key * IntWritable : Mapper输入的value * Text : Reducer输出的key * IntWritable : Reducer输出的value */
private static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int count = 0;
for (IntWritable item : values) {
count += item.get();
}
context.write(key, new IntWritable(count));
}
}
复制代码
reduce完成的是第四步的内容,咱们看看上面的实例过程就回知道此时的输入参数大概是这样
<hello,1,1,1>
因此这里会有一个遍历values的过程,就是将这三个1给累加起来了.ide
程序入口:
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 建立配置
Configuration configuration = new Configuration();
// 设置hadoop的做业 jobName是WordCount
Job job = Job.getInstance(configuration, "WordCount");
// 设置jar
job.setJarByClass(WordCount.class);
// 设置Mapper的class
job.setMapperClass(WordCountMapper.class);
// 设置Reducer的class
job.setReducerClass(WordCountReducer.class);
// 设置输出的key和value类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 设置输入输出路径
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 待job执行完 程序退出
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
复制代码
程序入口这里其实看注释就已经比较清楚了,都是设置一些mapreduce须要的参数和路径之类的, 照着写就好了.这里稍微要注意一点的就是
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
复制代码
咱们回顾一下以前运行hadoop的第一个程序的时候,命令大概是 hadoop jar WordCount.jar /input/wordcount/file1 /output/wcoutput 后面的两个参数就是文件的输入路径和输出路径,若是我们代码修改了参数的位置或者有其余参数的操做. 就要对应好args下标的位置.
4.指定jar包运行的入口 代码完成后我们就能够打包了 先选择File -> Project Structure -> Artifacts -> + -> JAR -> From modules with dependencies
有可能直接运行
hadoop jar WordCount.jar /input/wordcount/file1 /output/wcoutput
复制代码
会失败,报一个异常:
Exception in thread "main" java.io.IOException: Mkdirs failed to create /XXX/XXX
at org.apache.hadoop.util.RunJar.ensureDirectory(RunJar.java:106)
at org.apache.hadoop.util.RunJar.main(RunJar.java:150)
复制代码
相似上面这样的.
这时候须要删除掉jar包里面的License文件夹和里面的东西,能够参考这个连接:stackoverflow 查看下jar中license的文件和文件夹 jar tvf XXX.jar | grep -i license 而后删除掉 META-INF/LICENSE里面的内容 zip -d XXX.jar META-INF/LICENSE
1.了解了mapReduce的运行步骤,这样知道了咱们只须要写map和reduce的过程,中间步骤hadoop框架已经作了处理,之后其余的程序也能够参考这个步骤来写
2.hadoop中String是Text,Integer是IntWritable这个要记住,用错了会报异常的
3.报 Mkdirs failed to create /XXX/XXX异常的时候先检查是否是路径有问题,没有的话就删除掉jar包中的META-INF/LICENSE
1.hadoop.apache.org/docs/stable…
2.stackoverflow.com/questions/1…