上一篇博客分享了mapreduce的编程思想,本节博主将带小伙伴们了解wordcount程序的原理和代码实现/运行细节。经过本节能够对mapreduce程序有一个大概的认识,其实hadoop中的map、reduce程序只是其中的两个组件,其他的组件(如input/output)也是能够重写的,默认状况下是使用默认组件。java
1、wordcount统计程序实现:linux
WordcountMapper (map task业务实现)shell
package com.empire.hadoop.mr.wcdemo; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; /** * KEYIN: 默认状况下,是mr框架所读到的一行文本的起始偏移量,Long, * 可是在hadoop中有本身的更精简的序列化接口,因此不直接用Long,而用LongWritable * VALUEIN:默认状况下,是mr框架所读到的一行文本的内容,String,同上,用Text * KEYOUT:是用户自定义逻辑处理完成以后输出数据中的key,在此处是单词,String,同上,用Text * VALUEOUT:是用户自定义逻辑处理完成以后输出数据中的value,在此处是单词次数,Integer,同上,用IntWritable * * @author */ public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { /** * map阶段的业务逻辑就写在自定义的map()方法中 maptask会对每一行输入数据调用一次咱们自定义的map()方法 */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //将maptask传给咱们的文本内容先转换成String String line = value.toString(); //根据空格将这一行切分红单词 String[] words = line.split(" "); //将单词输出为<单词,1> for (String word : words) { //将单词做为key,将次数1做为value,以便于后续的数据分发,能够根据单词分发,以便于相同单词会到相同的reduce task context.write(new Text(word), new IntWritable(1)); } } }
WordcountReducer(reduce业务代码实现)apache
package com.empire.hadoop.mr.wcdemo; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; /** * KEYIN, VALUEIN 对应 mapper输出的KEYOUT,VALUEOUT类型对应 KEYOUT, VALUEOUT * 是自定义reduce逻辑处理结果的输出数据类型 KEYOUT是单词 VLAUEOUT是总次数 * * @author */ public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> { /** * <angelababy,1><angelababy,1><angelababy,1><angelababy,1><angelababy,1> * <hello,1><hello,1><hello,1><hello,1><hello,1><hello,1> * <banana,1><banana,1><banana,1><banana,1><banana,1><banana,1> * 入参key,是一组相同单词kv对的key */ @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int count = 0; /* * Iterator<IntWritable> iterator = values.iterator(); * while(iterator.hasNext()){ count += iterator.next().get(); } */ for (IntWritable value : values) { count += value.get(); } context.write(key, new IntWritable(count)); } }
WordcountDriver (提交yarn的程序)编程
package com.empire.hadoop.mr.wcdemo; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * 至关于一个yarn集群的客户端 须要在此封装咱们的mr程序的相关运行参数,指定jar包 最后提交给yarn * * @author */ public class WordcountDriver { public static void main(String[] args) throws Exception { if (args == null || args.length == 0) { args = new String[2]; args[0] = "hdfs://master:9000/wordcount/input/wordcount.txt"; args[1] = "hdfs://master:9000/wordcount/output8"; } Configuration conf = new Configuration(); //设置的没有用! ?????? // conf.set("HADOOP_USER_NAME", "hadoop"); // conf.set("dfs.permissions.enabled", "false"); /* * conf.set("mapreduce.framework.name", "yarn"); * conf.set("yarn.resoucemanager.hostname", "mini1"); */ Job job = Job.getInstance(conf); /* job.setJar("/home/hadoop/wc.jar"); */ //指定本程序的jar包所在的本地路径 job.setJarByClass(WordcountDriver.class); //指定本业务job要使用的mapper/Reducer业务类 job.setMapperClass(WordcountMapper.class); job.setReducerClass(WordcountReducer.class); //指定mapper输出数据的kv类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //指定最终输出的数据的kv类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //指定job的输入原始文件所在目录 FileInputFormat.setInputPaths(job, new Path(args[0])); //指定job的输出结果所在目录 FileOutputFormat.setOutputPath(job, new Path(args[1])); //将job中配置的相关参数,以及job所用的java类所在的jar包,提交给yarn去运行 /* job.submit(); */ boolean res = job.waitForCompletion(true); //linux shell脚本中,上一条命令返回0表示成功,其它表示失败 System.exit(res ? 0 : 1); } }
2、运行mapreducecentos
(1)jar打包服务器
(2)上传到hadoop集群上,并运行app
#上传jar Alt+p lcd d:/ put wordcount_aaron.jar #准备hadoop处理的数据文件 cd /home/hadoop/apps/hadoop-2.9.1 hadoop fs -mkdir -p /wordcount/input hadoop fs -put LICENSE.txt NOTICE.txt /wordcount/input #运行wordcount程序 hadoop jar wordcount_aaron.jar com.empire.hadoop.mr.wcdemo.WordcountDriver /wordcount/input /wordcount/outputs
运行效果图:框架
[hadoop@centos-aaron-h1 ~]$ hadoop jar wordcount_aaron.jar com.empire.hadoop.mr.wcdemo.WordcountDriver /wordcount/input /wordcount/output 18/11/19 22:48:54 INFO client.RMProxy: Connecting to ResourceManager at centos-aaron-h1/192.168.29.144:8032 18/11/19 22:48:55 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this. 18/11/19 22:48:55 INFO input.FileInputFormat: Total input files to process : 2 18/11/19 22:48:55 WARN hdfs.DataStreamer: Caught exception java.lang.InterruptedException at java.lang.Object.wait(Native Method) at java.lang.Thread.join(Thread.java:1280) at java.lang.Thread.join(Thread.java:1354) at org.apache.hadoop.hdfs.DataStreamer.closeResponder(DataStreamer.java:980) at org.apache.hadoop.hdfs.DataStreamer.endBlock(DataStreamer.java:630) at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:807) 18/11/19 22:48:55 WARN hdfs.DataStreamer: Caught exception java.lang.InterruptedException at java.lang.Object.wait(Native Method) at java.lang.Thread.join(Thread.java:1280) at java.lang.Thread.join(Thread.java:1354) at org.apache.hadoop.hdfs.DataStreamer.closeResponder(DataStreamer.java:980) at org.apache.hadoop.hdfs.DataStreamer.endBlock(DataStreamer.java:630) at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:807) 18/11/19 22:48:55 INFO mapreduce.JobSubmitter: number of splits:2 18/11/19 22:48:55 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled 18/11/19 22:48:55 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1542637441480_0002 18/11/19 22:48:56 INFO impl.YarnClientImpl: Submitted application application_1542637441480_0002 18/11/19 22:48:56 INFO mapreduce.Job: The url to track the job: http://centos-aaron-h1:8088/proxy/application_1542637441480_0002/ 18/11/19 22:48:56 INFO mapreduce.Job: Running job: job_1542637441480_0002 18/11/19 22:49:03 INFO mapreduce.Job: Job job_1542637441480_0002 running in uber mode : false 18/11/19 22:49:03 INFO mapreduce.Job: map 0% reduce 0% 18/11/19 22:49:09 INFO mapreduce.Job: map 100% reduce 0% 18/11/19 22:49:14 INFO mapreduce.Job: map 100% reduce 100% 18/11/19 22:49:15 INFO mapreduce.Job: Job job_1542637441480_0002 completed successfully 18/11/19 22:49:15 INFO mapreduce.Job: Counters: 49 File System Counters FILE: Number of bytes read=241219 FILE: Number of bytes written=1074952 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read=122364 HDFS: Number of bytes written=35348 HDFS: Number of read operations=9 HDFS: Number of large read operations=0 HDFS: Number of write operations=2 Job Counters Launched map tasks=2 Launched reduce tasks=1 Data-local map tasks=2 Total time spent by all maps in occupied slots (ms)=7588 Total time spent by all reduces in occupied slots (ms)=3742 Total time spent by all map tasks (ms)=7588 Total time spent by all reduce tasks (ms)=3742 Total vcore-milliseconds taken by all map tasks=7588 Total vcore-milliseconds taken by all reduce tasks=3742 Total megabyte-milliseconds taken by all map tasks=7770112 Total megabyte-milliseconds taken by all reduce tasks=3831808 Map-Reduce Framework Map input records=2430 Map output records=19848 Map output bytes=201516 Map output materialized bytes=241225 Input split bytes=239 Combine input records=0 Combine output records=0 Reduce input groups=2794 Reduce shuffle bytes=241225 Reduce input records=19848 Reduce output records=2794 Spilled Records=39696 Shuffled Maps =2 Failed Shuffles=0 Merged Map outputs=2 GC time elapsed (ms)=332 CPU time spent (ms)=2830 Physical memory (bytes) snapshot=557314048 Virtual memory (bytes) snapshot=2538102784 Total committed heap usage (bytes)=259411968 Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0 File Input Format Counters Bytes Read=122125 File Output Format Counters Bytes Written=35348
运行结果:ide
#查看处理结果文件 hadoop fs -ls /wordcount/output hadoop fs -cat /wordcount/output/part-r-00000|more
问题处理:
18/11/19 22:48:55 WARN hdfs.DataStreamer: Caught exception java.lang.InterruptedException at java.lang.Object.wait(Native Method) at java.lang.Thread.join(Thread.java:1280) at java.lang.Thread.join(Thread.java:1354) at org.apache.hadoop.hdfs.DataStreamer.closeResponder(DataStreamer.java:980) at org.apache.hadoop.hdfs.DataStreamer.endBlock(DataStreamer.java:630) at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:807)
发生上面错误是由于咱们新建hdfs目录时未按照官方文档新建形成,问题不大;博主这边没影响正常使用;
解决方法:
#建立目录 hdfs dfs -mkdir -p /user/hadoop hdfs dfs -put NOTICE.txt LICENSE.txt /user/hadoop
总结:使用如下两种方式来执行并无区别,hadoop jar,底层就是调用的java -cp命令来执行。
hadoop jar wordcount_aaron.jar com.empire.hadoop.mr.wcdemo.WordcountDriver /wordcount/input /wordcount/outputs java -cp .:/home/hadoop/wordcount_aaron.jar:/home/hadoop/apps/hadoop-2.9.1....jar com.empire.hadoop.mr.wcdemo.WordcountDriver /user/hadoop/ /wordcount/outputs
最后寄语,以上是博主本次文章的所有内容,若是你们以为博主的文章还不错,请点赞;若是您对博主其它服务器大数据技术或者博主本人感兴趣,请关注博主博客,而且欢迎随时跟博主沟通交流。