hadoop(03)、Hadoop Map/Reduce框架的简单实践

        本文源码码云地址:https://gitee.com/MaxBill/hadoophtml

       在上篇《hadoop(02)、使用JAVA API对HDFS进行基本操做》中,经过JAVA API链接HDFS系统进行了基本的操做实践,本文将使用Hadoop的Map/Reduce框架进行简单的实践操做。java

1、Hadoop Map/Reduce框架

       Hadoop Map/Reduce是一个使用简易的软件框架,基于它写出来的应用程序可以运行在由上千个商用机器组成的大型集群上,并以一种可靠容错的方式并行处理上T级别的数据集。git

      一个Map/Reduce 做业(job) 一般会把输入的数据集切分为若干独立的数据块,由 map任务(task)以彻底并行的方式处理它们。框架会对map的输出先进行排序, 而后把结果输入给reduce任务。一般做业的输入和输出都会被存储在文件系统中。 整个框架负责任务的调度和监控,以及从新执行已经失败的任务。apache

       一般,Map/Reduce框架和分布式文件系统是运行在一组相同的节点上的,也就是说,计算节点和存储节点一般在一块儿。这种配置容许框架在那些已经存好数据的节点上高效地调度任务,这可使整个集群的网络带宽被很是高效地利用。windows

      Map/Reduce框架由一个单独的master JobTracker 和每一个集群节点一个slave TaskTracker共同组成。master负责调度构成一个做业的全部任务,这些任务分布在不一样的slave上,master监控它们的执行,从新执行已经失败的任务。而slave仅负责执行由master指派的任务。网络

      应用程序至少应该指明输入/输出的位置(路径),并经过实现合适的接口或抽象类提供map和reduce函数。再加上其余做业的参数,就构成了做业配置(job configuration)。而后,Hadoop的 job client提交做业(jar包/可执行程序等)和配置信息给JobTracker,后者负责分发这些软件和配置信息给slave、调度任务并监控它们的执行,同时提供状态和诊断信息给job-client。app

注:以上Hadoop Map/Reduce摘自hadoop官方介绍,地址:http://hadoop.apache.org/docs/r1.0.4/cn/mapred_tutorial.html框架

2、环境准备

1.windows下hadoop开发环境:参见《hadoop(01)、windows平台下hadoop环境搭建dom

2.IDEA 开发编辑器编辑器

3.下载一个部小说(本文使用著名小说:三国演义)

4.上一篇中的项目基础,码云地址:本文源码码云地址:https://gitee.com/MaxBill/hadoop

3、开发编码

1.启动hdfs服务

2.编写WordCount程序(它能够计算出指定数据集中指定单词出现的次数)

官方的例子是统计单词的,比较简单,本文则使用分词器对三国演义的指定词频进行统计。在上篇《hadoop(02)、使用JAVA API对HDFS进行基本操做》的基础上,打开项目,在pom文件中添加分词的依赖:

<dependency>

         <groupId>cn.bestwu</groupId>

         <artifactId>ik-analyzers</artifactId>

         <version>5.1.0</version>

</dependency>

3.开始词频统计编码

主要有如下几个步骤:

<1>.上传三国演义小说(分词数据集)到HDFS中

<2>.编写统计词频代码

<3>.添加分词器

<4>.统计指定的词频

主要编码以下:

package com.maxbill.hadoop.reduce;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

import java.util.UUID;

/**
 * @功能
 * @做者 zuoshuai(MaxBill)
 * @日期 2017/11/17
 * @时间 14:39
 * @备注 WordCountV1
 */
public class WordCountV1 {

    private final static String userPath = "/user/Administrator/";


    /**
     * @功能 单词统计任务
     * @做者 zuoshuai(MaxBill)
     * @日期 2017/11/16
     * @时间 12:12
     */
    public static void wordCount(String jobName, String inputPath, String outputPath) throws Exception {
        JobConf jobConf = JobsUtils.getJobsConf(jobName);
        FileInputFormat.setInputPaths(jobConf, new Path(inputPath));
        FileOutputFormat.setOutputPath(jobConf, new Path(outputPath));
        JobClient.runJob(jobConf);
    }

    /**
     * @功能 主类测试
     * @做者 zuoshuai(MaxBill)
     * @日期 2017/11/17
     */
    public static void main(String[] args) throws Exception {
        String inputPath = userPath + "input/";
        String outputPath = userPath + "output/" + UUID.randomUUID().toString().toUpperCase();
        //1.建立输入输出目录
        //HdfsUtils.mkdir(inputPath);
        //2.上传三国演义到Administrator目录下
        //HdfsUtils.uploadFile("D:\\sgyy.txt", inputPath);
        //3.调用统计任务
        wordCount("wordCountV1", inputPath, outputPath);
    }

}

这是词频统计及测试主类。

package com.maxbill.hadoop.reduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

/**
 * @功能 Hadoop Map/Reduce操做工具类
 * @做者 zuoshuai(MaxBill)
 * @日期 2017/11/16
 * @时间 12:12
 */
public class JobsUtils {

    private final static String hdfsPath = "hdfs://127.0.0.1:10000";
    private final static String jobsPath = "hdfs://127.0.0.1:20000";

    /**
     * @功能 获取HDFS的配置信息
     * @做者 zuoshuai(MaxBill)
     * @日期 2017/11/16
     * @时间 12:12
     */
    public static Configuration getConfig() {
        Configuration config = new Configuration();
        config.set("fs.default.name", hdfsPath);
        config.set("mapred.job.tracker", jobsPath);
        return config;
    }

    /**
     * @功能 获取HDFS的job配置信息
     * @做者 zuoshuai(MaxBill)
     * @日期 2017/11/16
     * @时间 12:12
     */
    public static JobConf getJobsConf(String jobName) {
        JobConf jobConf = new JobConf(getConfig());
        jobConf.setJobName(jobName);
        jobConf.setOutputKeyClass(Text.class);
        jobConf.setOutputValueClass(IntWritable.class);
        jobConf.setMapperClass(MyMap.class);
        jobConf.setCombinerClass(MyReduce.class);
        jobConf.setReducerClass(MyReduce.class);
        jobConf.setInputFormat(TextInputFormat.class);
        jobConf.setOutputFormat(TextOutputFormat.class);
        return jobConf;
    }

}

这是 Hadoop Map/Reduce操做工具类及做业配置

package com.maxbill.hadoop.reduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;

import java.io.*;

/**
 * @功能
 * @做者 zuoshuai(MaxBill)
 * @日期 2017/11/17
 */
public class MyMap extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    /**
     * @做者 zuoshuai(MaxBill)
     * @日期 2017/11/17
     * @时间 14:46
     */
    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        //未使用分词器
        //String line = value.toString();
        //StringTokenizer tokenizer = new StringTokenizer(line);
        // hile (tokenizer.hasMoreTokens()) {
        //word.set(tokenizer.nextToken());
        //output.collect(word, one);
        //}
        //使用分词器
        byte[] btValue = value.getBytes();
        InputStream ip = new ByteArrayInputStream(btValue);
        Reader reader = new InputStreamReader(ip);
        IKSegmenter iks = new IKSegmenter(reader, true);
        Lexeme lexeme;
        while ((lexeme = iks.next()) != null) {
            //打印所有分词
            //System.err.println(lexeme.getLexemeText());
            word.set(lexeme.getLexemeText());
            output.collect(word, one);
        }
    }

}

MAP中使用分词器

package com.maxbill.hadoop.reduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * @功能
 * @做者 zuoshuai(MaxBill)
 * @日期 2017/11/17
 * @时间 14:46
 * @备注 Reduce
 */
public class MyReduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

    List<String> textList = null;

    public MyReduce() {
        textList = new ArrayList<>();
        textList.add("孙权");
        textList.add("姜维");
    }

    /**
     * @做者 zuoshuai(MaxBill)
     * @日期 2017/11/17
     * @时间 14:46
     */
    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        output.collect(key, new IntWritable(sum));
        String keyStr = new String(key.toString());
        boolean isHas = textList.contains(keyStr);
        if (isHas) {
            System.out.println(">>>>>" + keyStr + " [" + sum + "]");
        }
    }

}

过滤指定词频

4.运行测试主类

咱们从运行测试记过能够看到,已经统计出孙权和姜维在文中出现的次数。

4、本文总结

      本文经过使用一些Map/Reduce框架提供的功能,实现的是的利用hadoop hdfs存储一个数据集,而后使用Hadoop Map/Reduce框架对数据进行简单的分析处理的一个小实践。在实践过程当中也遇到了许多的问题,在查阅官方文档和网上资料中都一一解决了,对于Map/Reduce还有不少的内容,将在之后的内容中慢慢的学习补充。若有遗漏或者错误,欢迎提出!

交流QQ:1370581389

本文源码码云地址:https://gitee.com/MaxBill/hadoop

相关文章
相关标签/搜索