经过以前的操做, html
http://www.cnblogs.com/wenbronk/p/6636926.htmljava
http://www.cnblogs.com/wenbronk/p/6659481.htmlapache
hadoop-HA的集群已经搭建完成了, 须要写个小程序来认识下hadoop了小程序
统计文本文件中, 每一个单词出现的次数数组
hadoop-2.5.1\share\hadoop\common 全部jar, hadoop-2.5.1\share\hadoop\common\lib 全部jar, hadoop-2.5.1\share\hadoop\hdfs 全部jar hadoop-2.5.1\share\hadoop\mapreduce 全部jar hadoop-2.5.1\share\hadoop\yarn 全部jar
package com.wenbronk.mapreduce; import java.io.IOException; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; /** * 测试mapreduce, 计算单词出现的次数 * @author wenbronk * KEYIN: split的键, 行坐在的下标 * VALUEIN: split的值, 行值 * KEYOUT: 需求, 输出给reduce * VALUEOUT: 需求, 输出给reduce */ public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { /** * 重写map方法, 循环调用 * 从split中读取一行调用一次, 以行所在下标为key, 行内容为value */ @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { // text 转string, toString(), 使用空格分隔为单词数组 String[] words = StringUtils.split(value.toString(), ' '); for (String word : words) { // 键值对输出, 输出给reduce context.write(new Text(word), new IntWritable(1)); } } }
package com.wenbronk.mapreduce; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; /** * shuffling 后传给 reduce * @author wenbronk * KEYIN: mapper的输出 * VALUEIN: mapper的输出 */ public class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable>{ /** * 循环调用 * 每组调用一次, key相同, value可能多个, 使用迭代器 */ @Override protected void reduce(Text arg0, Iterable<IntWritable> arg1, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { // 对值进行累加 int sum = 0; // 使用迭代器 for (IntWritable value : arg1) { sum += value.get(); } // 使用context输出 context.write(arg0 , new IntWritable(sum)); } }
package com.wenbronk.mapreduce; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * 执行mapreduce * 统计单词出新的次数 * @author wenbr * */ public class RunMapReduce { public static void main(String[] args) throws Exception { // 初始化时加载src或classpath下全部的hadoop配置文件 Configuration configuration = new Configuration(); // 获得执行的任务 Job job = Job.getInstance(config); // 入口类 job.setJarByClass(RunMapReduce.class); // job名字 job.setJobName("wordCount"); // job执行是map执行的类 job.setMapperClass(WordCountMapper.class); // job执行的reduce类 job.setReducerClass(WordCountReduce.class); // job输出的键类型 job.setMapOutputKeyClass(Text.class); // job输出的value类型 job.setMapOutputValueClass(IntWritable.class); //**** 使用插件上传data.txt到hdfs/root/usr/data.txt // 使用文件 FileInputFormat.addInputPath(job, new Path("/root/usr/")); // 使用一个不存在的目录进行 Path path = new Path("/root/usr/output"); // 若是存在删除 FileSystem fs = FileSystem.get(configuration); if (fs.exists(path)) { fs.delete(path, true); } // 输出 FileOutputFormat.setOutputPath(job, path); boolean forCompletion = job.waitForCompletion(true); if (forCompletion) { System.out.println("success"); } } }
全部的类编写好了, 接下来是上传文件服务器
我是用的插件为: app
这儿使用直接发布到服务器运行的方式eclipse
eclipse打包项目成jar包(只须要源码便可), 而后上传到服务器目录下, 使用hadoop命令执行
格式: hadoop jar jar路径 类全限定名ide
hadoop jar wc.jar com.wenbronk.mapreduce.RunMapReduce
以后在hadoop的目录下就能够看到统计后输出的文件了oop