Hadoop浅度学习指南(HDFS、YARN、MapReduce)

大数据

  1. 概念:big data
  2. 5V特征:java

    1. Volume:量大
    2. Value:价值高,价值密度低
    3. Variety:多样性
    4. Velocity:速度快
    5. Veracity:准确性

hadoop

主要组成

GFS --> HDFS
MapReduce --> MapReduce
BigTable -- > HBasenode

模块

  • Hadoop Common: The common utilities that support the other Hadoop modules.支持hadoop其余模块的通常工具
  • Hadoop Distributed File System (HDFS™): A distributed file system that provides high-throughput access to application data.高吞吐分布式文件系统
  • Hadoop YARN: A framework for job scheduling and cluster resource management. 资源调度和任务管理
  • Hadoop MapReduce: A YARN-based system for parallel processing of large data sets.基于yarn的大数据并行处理系统

HDFS

组成

  • namenode:管理元数据,处理来自客户端的请求
    元数据:描述数据属性的数据、描述数据的数据
  • secondarynamenode:元数据的合并
  • datanode:具体数据的读写
  • client:文件读写请求的发起

HDFS机制

namenode

  • 负责元数据的管理,DataNode负责处理文件内容的读写请求
  • 处理client的读写的请求,负责管理文件系统的名字空间(namespace)以及客户端对文件的访问
  • 副本存放在哪些DataNode上由 NameNode来控制,根据全局状况作出块放置决定,读取文件时NameNode尽可能让用户先读取最近的副本,下降带块消耗和读取时延
  • 全权管理数据块的复制、它周期性地从集群中的每一个Datanode接收心跳信号和块状态报告(Blockreport),块状态报告包含了一个该Datanode上全部数据块的列表

datanode

  • 一个数据块在DataNode以文件存储在磁盘上,包括数据块自己、数据块的元数据(数据块的长度,块数据的校验和,以及时间戳)
  • DataNode启动后向NameNode注册,经过后,周期性(1小时)的向NameNode上报全部的块信息
  • 心跳是每3秒一次,心跳返回结果带有NameNode给该DataNode的命令如复制块数据到另外一台机器,或删除某个数据块。若是超过10分钟没有收到某个DataNode 的心跳,则认为该节点不可用。

文件

  • block 默认128M,每一个块有多个副本存储在不一样的机器上
  • NameNode 是主节点,存储文件的元数据如文件名,文件目录结构,文件属性(生成时间,副本数,文件权限),以及每一个文件的块列表以及块所在的DataNode等等
  • DataNode 在本地文件系统存储文件块数据,以及块数据的校验和
  • 能够建立、删除、移动或重命名文件,当文件建立、写入和关闭以后不能修改文件内容

namenode从datanode接受心跳和块报告

  • namenode启动后,datanode向namenode进行注册
  • 心跳

    心跳是每3秒一次,linux

    心跳返回结果带有NameNode给该DataNode的命令如删除块,
    复制块等apache

    若是超过10分钟没有收到某个DataNode 的心跳,则认为该
    节点不可用编程

  • 块报告

    DataNode启动后向NameNode注册,windows

    经过后,周期性(1小时)的向NameNode上报全部的块信息centos

  • 块损坏

    当DataNode读取block的时候,从新计算checksum,和建立
    时的对比缓存

    DataNode 在其文件建立后三周验证其checksum安全

  • HDFS有哪些进程

    NameNode服务器

    DataNode

    NodeManager

    ResourceManager

NameNode启动过程

  • NameNode元数据/命名空间持久化fsimage与edits
  • NameNode格式化,具体作什么事

    建立fsimage文件,存储fsimage信息

    建立edits文件

  • NameNode 启动过程

    加载fsimage和edits文件

    生成新的fsimage和edits文件

    等待DataNode注册与发送Block Report

  • DataNode 启动过程

    向NameNode注册、发送Block Report

  • NameNode SafeMode 安全模式

    namenode启动时会进入安全模式,此时只可读不可写


  1. Name启动的时候首先将fsimage(镜像)载入内存,并执行(replay)编辑日志editlog的的各项操做;
  2. 一旦在内存中创建文件系统元数据映射,则建立一个新的fsimage文件(这个过程不需SecondaryNameNode) 和一个空的editlog;
  3. 在安全模式下,各个datanode会向namenode发送块列表的最新状况;
  4. 此刻namenode运行在安全模式。即NameNode的文件系统对于客服端来讲是只读的。(显示目录,显示文件内容等。写、删除、重命名都会失败);
  5. NameNode开始监听RPC和HTTP请求
    解释RPC:RPC(Remote Procedure Call Protocol)——远程过程经过协议,它是一种经过网络从远程计算机程序上请求服务,而不须要了解底层网络技术的协议;
  6. 系统中数据块的位置并非由namenode维护的,而是以块列表形式存储在datanode中;
  7. 在系统的正常操做期间,namenode会在内存中保留全部块信息的映射信息。

HDFS启动流程及元数据的同步

  • 元数据的同步
    流程图:
    触发的阈值(hdfs-default.xml)
    dfs.namenode.checkpoint.period 3600
    dfs.namenode.checkpoint.txns 1百万个事务
  • NameNode 启动过程

    1. 加载fsimage和edits文件
    2. 合并生成新的fsimage,并生成edits文件
    3. 等待DataNode注册与发送心跳和Block Report
    4. NameNode 启动过程当中会进入SafeMode(安全模式)

安全模式

在安全模式下,文件系统不容许修改
目的,是在系统启动时检查各个datanode数据的有效性

进入安全模式的三种方式

  1. 手动进入

    $ bin/hdfs dfsadmin -safemode enter

    $ bin/hdfs dfsadmin -safemode leave

  2. namenode启动会自动进入
  3. 正常块的个数/总的块个数<0.999 也会进入安全模式
<property>
        <name>dfs.namenode.safemode.threshold-pct</name>
        <value>0.999f</value>
    </property>

HDFS特色

  • 优势

    1. 处理超大文件
    2. 一次写入,屡次读取
    3. 运行与廉价服务器
    4. 不移动数据到计算点,而是就地计算,减小网络阻塞
  • 缺点:

    1. 高延迟,不适合接入前台业务
    2. 不支持任意的修改

HDFS API

Java API

package com.ct.test;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;

import org.apache.commons.compress.utils.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.junit.Before;
import org.junit.Test;

public class TestDemo {
    
    FileSystem fs = null;
    
//    public static void main(String[] args) throws IOException, InterruptedException, URISyntaxException {
//        
////        FileSystem fs = FileSystem.get(new URI("hdfs://centos01:8020"),
////                new Configuration(),
////                "chen");
////        
////        boolean success = fs.mkdirs(new Path("/test"));
////        
////        System.out.println(success);
////        test.setUp();
////        test.testMkdir();
////        test.testDelete();
//        
//        
//        
//        
//    }
    @Before
    //获取文件对象
    public void setUp() {
        Configuration conf = new Configuration();
        conf.set("dfs.replication", "7");
        
        try {
            fs = FileSystem.get(new URI("hdfs://centos01:8020"), 
                    conf, 
                    "chen");
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (URISyntaxException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
    //建立文件夹
    @Test
    public void testMkdir() throws IllegalArgumentException, IOException {
        boolean success = fs.mkdirs(new Path("/result"));
        System.out.println(success);
    }
    
    
    //删除文件夹
    public void testDelete() throws IllegalArgumentException, IOException {
        fs.delete(new Path("/result"), true);
    }
    
    @Test
    //上传文件
    public void testUpload() throws IllegalArgumentException, IOException {
        FSDataOutputStream out = fs.create(new Path("/input/testUpload.log"));
        FileInputStream input = new FileInputStream("F:/test.txt");
        
        IOUtils.copy(input, out, 1024);
    }
    
    @Test
    public void testDownload() throws IllegalArgumentException, IOException {
        FSDataInputStream input = fs.open(new Path("/input/testUpload.log"));
        FileOutputStream out = new FileOutputStream("F:/test-copy.txt");
        
        IOUtils.copy(input, out, 1024);

    }
    
    @Test
    public void testList() throws FileNotFoundException, IllegalArgumentException, IOException {
        RemoteIterator<LocatedFileStatus> ri = fs.listFiles(new Path("/input"), true);
        
        while(ri.hasNext()) {
            LocatedFileStatus next = ri.next();
            next.getBlockLocations();
            String group = next.getGroup();
            long len = next.getLen();
            String owner = next.getOwner();
            FsPermission permission = next.getPermission();
            long blockSize = next.getBlockSize();
            short rep = next.getReplication();
            
            System.out.println(permission+"\t"+owner+"\t"+group);
            System.out.println(len+"\t"+blockSize+"\t"+rep);

            BlockLocation[] blockLocations = next.getBlockLocations();
            for (BlockLocation blktn : blockLocations) {
                System.out.println("length:"+blktn.getLength());
                System.out.println("offset:"+blktn.getOffset());
                System.out.println(Arrays.toString(blktn.getHosts()));
            }
            
        }
    }

}

HDFS读流程

read

  1. 打开分布式文件调用 分布式文件DistributedFileSystem.open()方法
  2. 从 NameNode 得到 DataNode 地址DistributedFileSystem 使用 RPC 调用 NameNode,NameNode返回存有该副本的 DataNode 地址,DistributedFileSystem 返回一个输入流 FSDataInputStream对象,该对象封存了输入流DFSInputStream
  3. 链接到DataNode调用 输入流 FSDataInputStream 的 read() 方法,从而 输入流DFSInputStream 链接 DataNodes
  4. 读取DataNode反复调用 read()方法,从而将数据从 DataNode 传输到客户端
  5. 读取另外的DataNode直到完成到达块的末端时候,输入流 DFSInputStream 关闭与DataNode链接, 寻找下一个 DataNode
  6. 完成读取,关闭链接,即调用输入流 FSDataInputStream.close()

HDFS写流程

write

  1. 发送建立文件请求:调用分布式文件系统DistributedFileSystem.create()方法
  2. NameNode中建立文件记录:分布式文件系统DistributedFileSystem 发送 RPC 请求给namenode,namenode 检查权限后建立一条记录,返回输出流 FSDataOutputStream,封装了输出流 DFSOutputDtream
  3. 客户端写入数据:输出流 DFSOutputDtream 将数据分红一个个的数据包,并写入内部队列。DataStreamer 根据 DataNode 列表来要求 namenode 分配适合的新块来存储数据备份。一组DataNode 构成管线(管线的 DataNode 之间使用 Socket 流式通讯)
  4. 使用管线传输数据:DataStreamer 将数据包流式传输到管线第一个DataNode,第一个DataNode 再传到第二个DataNode ,直到完成。
  5. 确认队列:DataNode 收到数据后发送确认,管线的DataNode全部的确认组成一个确认队列。全部DataNode 都确认,管线数据包删除。
  6. 关闭:客户端对数据量调用close()方法。将剩余全部数据写入DataNode管线,并联系NameNode且发送文件写入完成信息以前等待确认。
  7. NameNode确认
  8. 故障处理:若过程当中发生故障,则先关闭管线, 把队列中全部数据包添加回去队列,确保数据包不漏。为另外一个正常DataNode的当前数据块指定一个新的标识,并将该标识传送给NameNode, 一遍故障DataNode在恢复后删除上面的不完整数据块. 从管线中删除故障DataNode 并把余下的数据块写入余下正常的DataNode。NameNode发现复本两不足时,会在另外一个节点建立一个新的复本

YARN

组成

  • resourcemanger:负责全局的任务调度和资源管理(内存、CPU)、启动/监控applicationMaster 、监控NodeManager
  • nodemanger:单个节点的资源管理、处理来自resourcemanger和applicationmaster的任务请求
  • client:发起任务的请求
  • container:对环境的抽象,封装了CPU、内存、环境变量
  • applicationmaster:负责管理应用,为应用申请资源,任务的监控和容错

服务功能

  • ResourceManager

    • 处理客户端请求
    • 启动/监控ApplicationMaster
    • 监控NodeManager
    • 资源分配与调度
  • NodeManager

    • 单个节点上的资源管理
    • 处理来自ResourceManager的命令
    • 处理来自ApplicationMaster的命令
  • ApplicationMaster

    • 数据切分
    • 为应用程序申请资源,并分配给内部任务
    • 任务监控与容错
  • Container

    • 对任务运行环境的抽象,封装了CPU、内存等多维资源以及环境变量、启动命令等任务运行相关的信息

YARN工做流程

  1. 客户端向ResourceManager提交应用程序,其中包括ApplicationMaster、启动ApplicationMaster的命令、用户程序等;
  2. ResourceManager为该应用程序分配第一个Container,并与对应NodeManager通讯,要求它在这个Container中启动应用程序的ApplicationMaster;
  3. ApplicationMaster向ResourceManager注册本身,启动成功后与ResourceManager保持心跳;
  4. ApplicationMaster向ResourceManager申请资源;
  5. 申请资源成功后,由ApplicationMaster进行初始化,而后与NodeManager通讯,要求NodeManager启动Container。而后ApplicationMaster与NodeManager保持心跳,从而对NodeManager上运行的任务进行监控和管理;
  6. Container运行期间,向ApplicationMaster汇报本身的进度和状态信息,以便ApplicationMaster掌握任务运行状态,从而在任务失败是能够从新启动;
  7. 应用运行结束后,ApplicationMaster向ResourceManager注销本身,容许其所属的Container回收。

MapReduce

Map和Reduce 计算框架,编程模型 “分而治之”的思想, 分布式并行计算

Mapper

对一些独立元素组成的列表的每个元素进行制定的操做,可高度并行

// step 1: Map Class
    /**
     * Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
     * 
     */
    //TODO update paragram
    public static class ModuleMapper extends
            Mapper<LongWritable, Text, Text, IntWritable> {
 
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub
        }

Reducer

对一个列表元素进行合并

// step 2: Reduce Class
    /**
     * Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
     * 
     */
    //TODO
    public static class ModuleReducer extends
            Reducer<Text, IntWritable, Text, IntWritable> {
 
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            // TODO Auto-generated method stub
        }
    }

Job

// step 3: Driver ,component job, implements Tool
    public int run(String[] args) throws Exception {
        // 1: get configration
        Configuration configuration = getConf();
 
        // 2: create Job
        Job job = Job.getInstance(configuration, this.getClass()
                .getSimpleName());
        // run jar
        job.setJarByClass(this.getClass());
 
        // 3: set job
        // input -> map -> reduce -> output
        // 3.1 input
        Path inPath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inPath);
 
        // 3.2: map
        job.setMapperClass(ModuleMapper.class);
        //TODO update paragram
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
 
        // 3.3: reduce
        job.setReducerClass(ModuleReducer.class);
        //TODO
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
 
        // 3.4: output
        Path outPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outPath);
 
        // 4: submit job
        boolean isSuccess = job.waitForCompletion(true);
 
        return isSuccess ? 0 : 1;
    }

WordCount

package com.wordcount;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCountDemo extends Configured implements Tool {

    /**
     * map 任务的定义
     * Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
     * KEYIN    偏移量                    LongWritable
     * VALUEIN    一行文本                    Text
     * KEYOUT    单词                        Text
     * VALUEOUT    1                        IntWritable
     * 
     * map任务
     * 将一行文本拆分红单词
     * 
     *
     */
    
    public static class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        
        Text keyOut = new Text();
        IntWritable valueOut = new IntWritable();
        
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            
            System.out.println("keyIn:"+key+"\t\t"+"valueIn:"+value);
            //1. 单词拆分
            String[] vals = value.toString().split(" ");
            
            //2. 遍历输出
            for (String val : vals) {
                keyOut.set(val);
                valueOut.set(1);
                context.write(keyOut, valueOut);
                
                System.out.println("keyOut:"+keyOut+"\t\t"+"valueOut:"+valueOut);
            }
        }
    }
    
    
    /**
     * 
     * Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
     * KEYIN    单词                        Text
     * VALUEIN    单词次数的集合                list的元素    IntWritable
     * KEYOUT    单词                        Text
     * VALUEOUT    总次数                    IntWritable
     *
     */
    
    public static class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable>    {
        
        IntWritable valueOut = new IntWritable();
        
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            
            System.out.print("keyIn:"+key+"\t\t[");
            //1. 求次数综合
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
                
                System.out.print(value+",\t");
            }
            System.out.println("]");
            //2. 输出
            valueOut.set(sum);
            context.write(key, valueOut);
        }
    }
    
    
    
    @Override
    public int run(String[] args) throws Exception {
        //1 设置job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(this.getClass());
        job.setJobName(this.getClass().getSimpleName());
        
        //2. 设置map类和reduce类
        job.setMapperClass(WCMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        
        job.setReducerClass(WCReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        
        //3 设置输入输出路径
        FileInputFormat.setInputPaths(job, args[0]);
        
        Path out = new Path(args[1]);
        FileSystem fs = out.getFileSystem(conf);
        if(fs.exists(out)) {
            fs.delete(out, true);
        }
        FileOutputFormat.setOutputPath(job, out);
        boolean success = job.waitForCompletion(true);
        return success?1:0;
    }
    
    public static void main(String[] args) {
        try {
            int run = ToolRunner.run(new WordCountDemo(), args);
            System.out.println(run==1?"成功":"失败");
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}

MapReduce实现表的join

map join

适合大小表join,将小表缓存在内存中,join发生在map端

只缓存一次,在Mapper子类中重写setup方法,在setup方法中将小表文件装入内存中

Mapper子类中map方法读取大表

package com.join;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MapJoin extends Configured implements Tool {
    

    
    public static class MJMapper extends Mapper<LongWritable, Text, Text, Text> {
        
        HashMap<String, String> cacheMap = new HashMap<String, String>();
        
        // 首相将小表读入内存
        // 该方法只在每次任务开始时加载一次
        @Override
        protected void setup(Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String path = "F:\\input\\join\\dept.log";
            
            FileReader fr = new FileReader(path);
            BufferedReader br = new BufferedReader(fr);
            
            String line = null;
            while((line=br.readLine()) != null) {
                String[] vals = line.split("\t");
                cacheMap.put(vals[0], vals[1]);
            }
            
            br.close();
            fr.close();
        }
        
        // map端根据两张表的key进行合并
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String[] vals = value.toString().split("\t");
            
            String deptno = cacheMap.get(vals[2]);
            String dname = cacheMap.get(deptno);
            
            context.write(new Text(deptno), new Text(dname+"\t"+vals[0]+vals[1]));
        }
    }
    
    @Override
    public int run(String[] args) throws Exception {
        //1 设置job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(this.getClass());
        job.setJobName(this.getClass().getSimpleName());
        //2 设置map类和reduce
        job.setMapperClass(MJMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        
        
        //3 设置输入输出路径
        FileInputFormat.setInputPaths(job, args[0]);
        
        Path out = new Path(args[1]);
        FileSystem fs = out.getFileSystem(conf);
        if(fs.exists(out)) {
            fs.delete(out, true);
        }
        FileOutputFormat.setOutputPath(job, out);
        //4 提交
        boolean success = job.waitForCompletion(true);
        return success?1:0;
    }

    
    public static void main(String[] args) {
        try {
            int run = ToolRunner.run(new MapJoin(), args);
            System.out.println(run==1?"成功":"失败");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


}

reduce join

适合两张大表join

package com.join;

import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ReduceJoin extends Configured implements Tool {
    /*
     * 1    技术部
     * 1002    rose    1
     */
    
    public static class RJMapper extends Mapper<LongWritable, Text, Text, Text>{
        Text keyOut = new Text();
        Text valueOut = new Text();

        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String[] vals = value.toString().split("\t");
            
            if(vals.length == 2) {
                keyOut.set(vals[0]);
                valueOut.set(vals[1]);
            }else {
                keyOut.set(vals[2]);
                valueOut.set(vals[0]+"\t"+vals[1]);
            }
            context.write(keyOut, valueOut);
            
        }
    }
    
    /*
     * keyIn:1
     * valueIn    List{[1007    lily], [1002    rose], [1001    jack], [技术部]}
     */
     
    // reduce端合并是依靠MapReduce shuffle过程当中将相同key的行放入同一台机器
    
    public static class RJReducer extends Reducer<Text, Text, Text, Text> {
        ArrayList<String> employees = new ArrayList<String>();
        
        @Override
        protected void reduce(Text keyIn, Iterable<Text> valueIn, Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String department = null;
            employees.clear();    //这里要注意清空list
            
            for (Text tmp : valueIn) {
                String[] vals = tmp.toString().split("\t");
                // 根据length判断这是张什么表
                if(vals.length == 1) {
                    department = vals[0];
                }else if(vals.length == 2) {
                    employees.add(tmp.toString());
                }
            }
            
            for (String employee : employees) {
                context.write(keyIn, new Text(employee+"\t"+department));
            }
            
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        //1 设置job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(this.getClass());
        job.setJobName(this.getClass().getSimpleName());
        //2 设置map类和reduce
        job.setMapperClass(RJMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        
        job.setReducerClass(RJReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        //3 设置输入输出路径
        FileInputFormat.setInputPaths(job, args[0]);
        
        Path out = new Path(args[1]);
        FileSystem fs = out.getFileSystem(conf);
        if(fs.exists(out)) {
            fs.delete(out, true);
        }
        FileOutputFormat.setOutputPath(job, out);
        //4 提交
        boolean success = job.waitForCompletion(true);
        return success?1:0;
    }

    
    public static void main(String[] args) {
        try {
            int run = ToolRunner.run(new ReduceJoin(), args);
            System.out.println(run==1?"成功":"失败");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


}

Hadoop的安装模式

  1. 单机模式
  2. 伪分布模式(pseudo)
  3. 彻底分布模式

hadoop 开发环境搭建

maven环境搭建

  1. 安装maven

    1. 解压apache-maven-3.0.5.tar.gz
    2. 配置maven环境变量
      MAVEN_HOME=[maven的解压目录]
      %MAVEN_HOME%/bin;
    3. 命令提示符 mvn -version
  2. 解压repository.tar.gz到windows磁盘(如 E:toolsrepository)
  3. 修改settings.xml配置文件中指定的repository(修改apache-maven-3.0.5confsettings.xml)

    <localRepository>D:/repository</localRepository>
  4. 配置eclipse的maven环境
    windows->preferences->maven->

    ->installations->add->勾选本身安装的maven
    ->user settings->选择mave家目录/conf/settings
  5. 建立maven工程
  6. 将${hadoop_Home}/ect/hadoop/log4j.properties拷贝到项目的src目录
  7. 修改pom.xml

windows下搭建 hadoop开发环境

  1. Windows安装hadoop

    1. 解压hadoop-2.5.0.tar.gz到本地windows磁盘
    2. 配置hadoop的环境变量

      添加环境变量            HADOOP_HOME=hadoop解压目录
      在PATH环境变量中追加    %HADOOP_HOME%/bin;
    3. 测试

      hadoop -h
  2. eclipse安装插件

    1. 解压eclipse
    2. 将hadoop-eclipse-plugin-2.6.0.jar拷贝到${MyEclispe_HOME}/plugins
    3. 打开(重启)eclispe,菜单栏->windows->Preferneces->Hadoop MapReduce
  3. eclipse配置插件参数,链接HDFS

    1. 在linux中的hadoop安装目录下的etc/hadoop/hdfs-site.xml添加以下配置,重启HDFS的进程
    <!--关闭hdfs的文件权限控制-->
     <property>
         <name>dfs.permissions</name>
             <value>false</value>        
     </property>

    eclipse->windows->show views->other->输入MapReduce->点击map reduce locations
    右击->new hadoop locations

    1. Map/Reduce Master

      Mapreduce(V2) 
      host:[hostname]
      port:8032            //resourcemanager 的默认端口号
    2. DFS Master

      DFS Master
      host:[hostname]
      port:8020
  4. 拷贝winutils.exe 和hadoop.dll到${hadoop_HOME}/bin
  5. 单独拷贝hadoop.dll到C:WindowsSystem32
  6. 建立maven工程,经过pom.xml导包

    将lo4j.perperties文件拷贝到src/main/resources

打jar包,提交集群运行

  1. jar包时,指定主类

    yarn jar pv.jar /input/2015082818 /output

  2. jar包时,不指定主类

    yarn jar pv.jar 类的全限定名 /input/2015082818 /output
    不一样包中可能有相同类名,因此要指定类的全限定名

Shuffle

shuffle

MapReduce框架核心部分(设计精髓):内核

shuffle 定义

​ map() 输出开始 到 reduce()输入开始 此阶段是shuffle
​ input -> map -> shuffle -> reduce -> output

shuffle分为两个阶段

​ map shuffle phase

​ reduce shuffle phase

shuffle主要操做

​ partitioner - map

​ sorter - map & reduce

​ combiner: map phase局部聚合操做 不是全部的MapReduce程序均可以进行局部聚合的

​ compress:map phase的输出数据压缩 针对全部MapReduce程序均可以进行设置

​ group - reduce

shuffle详解

全部操做都是针对map()输出的<key, value>数据进行的

map shuffle phase

  1. 进入环形缓冲区(默认100MB)

    当达到环形缓冲区内存的80%默认状况下,将会将缓冲区中的数据spill到本地磁盘中(溢出到MapTask所运行的NodeManager机器的本地磁盘中)

  2. 溢写

    并非当即将缓冲区中的数据溢写到本地磁盘,而是须要通过一些操做

    1. 分区paritioner

      依据此MapReduce Job中Reduce Task个数进行分区决定map输出的数据被哪一个reduce任务进行处理分析默认状况下,依据key采用HashPartitioner

// 经过取余将数据分配到哪一个reduce处理
HashPartitioner
    int getParitition(key, value, numreducetask) {
        return ( key.hashCode&Integer.maxValue)%numreducetask;
    }
    1. 排序sorter

      会对每一个分区中的数据进行排序,默认状况下依据key进行排序

    2. spill溢写

      将分区排序后的数据写到本地磁盘的一个文件中

      反复上述的操做,产生多个小文件

    1. 当溢写结束后

      • 此时将spill到本地磁盘的小文件进行一次合并。
      • combiner: (可选)map端的reduce
      • compress:(可配置) 数据减小了, 减小网络IO; 但压缩消耗CPU性能,也须要时间

    reduce shuffle phase

    • merge 合并

      各个分区的数据合并在一块儿(当MapTask处理数据完成之后,告知AppMaster,而后AppMaster通知全部的ReduceTask,各个ReduceTask主动到已经完成的MapTask的本地磁盘,去拉取属于本身要处理的数据(分区中))

    • 排序 对各个分区中的数据进行排序

      最后每一个分区造成一个文件(map输出的数据最后在个文件中),分区的,而且各个分区的数据已经进行了排序。

    • 分组group

      将相同key的value值存入到list集合,造成新的key, list(value),将key/value对数据传递给reduce()函数进行处理。

    最后将(key, list(value))传给 reduce()

    map个数及reduce个数肯定

    map个数肯定

    FileInputFormat.setMaxInputSplitSize(job, size);        设置切片最大值
    FileInputFormat.setMinInputSplitSize(job, size);        设置切片最小值

    FileInputFormat
            public List<InputSplit> getSplits(JobContext job){。。。}
                
            protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
                        return Math.max(minSize, Math.min(maxSize, blockSize));
            }
        
            // minSize<=maxSize<blockSize    提升并发
            // minSize>blockSize            下降并发

    reduce个数肯定

    job.setNumReduceTasks(2);
    HashParitioner 决定map输出的类被哪一个reduce处理

    自定义shuffle

    自定义key

    • key 和 value 均可以使用自定义类
    • 自定义的类不使用 Java 自带的 serializable 接口,改用hadoop 提供的Writable 接口
    • 注意重写 toString、 write、readFields
    package com.flow;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.Writable;
    
    /**
     * 不用serializable
     * 
     * 用Hadoop的Writable
     *
     */
    
    public class Flow implements Writable {
        
        private long up;
        private long down;
        private long sum;
        
        
        public long getUp() {
            return up;
        }
        public void setUp(long up) {
            this.up = up;
        }
        public long getDown() {
            return down;
        }
        public void setDown(long down) {
            this.down = down;
        }
        public long getSum() {
            return sum;
        }
        public void setSum(long sum) {
            this.sum = sum;
        }
        
        
    
        @Override
        public String toString() {
            return up + "\t" + down + "\t" + sum;
        }
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(up);
            out.writeLong(down);
            out.writeLong(sum);
    
        }
        @Override
        public void readFields(DataInput in) throws IOException {
            up = in.readLong();
            down = in.readLong();
            sum = in.readLong();
        }
    }

    自定义分区

    • 调用 job 的 setNumReduceTasks 方法设置reduce 个数
    • setPartitionerClass 设置分区
    public static class MyPartitioner extends Partitioner<Text, Flow> {
    
            @Override
            public int getPartition(Text key, Flow value, int numPartitions) {
                if(value.getSum()<1024) {
                    return 0;
                }else if(value.getSum()<10*1024) {
                    return 1;
                }
                return 2;
            }    
        }

    排序

    只能按照key排序,若是须要多重排序,须要自定义key
    在shuffle过程当中自动排序,无需手动调用方法

    public class MyKey implements WritableComparable<MyKey>
    //要排序的类要实现WritableComparable接口
    
        @Override
        public int compareTo(MyKey o) {
            long result = o.getSum() - this.getSum();
            if(result>0) {
                return 1;
            }else if(result<0) {
                return -1;
            }
            return o.getPhone().compareTo(this.getPhone());
        }

    combiner

    map端的小reduce,对每一个map后的value进行reduce,减小数据传输

    能够经过设置job.setCombinerClass(WCReducer.class);设置combiner

    先后效果对比

    原始数据
    hello world
    hello hadoop
    
    hello world
    hello java
    
    keyIn:hadoop        [1,    ]
    keyIn:hello        [1,    1,    1,    1,    ]
    keyIn:java        [1,    ]
    keyIn:world        [1,    1,    ]
    
    
    
    keyIn:hadoop        [1,    ]
    keyIn:hello        [2,    2,    ]
    keyIn:java        [1,    ]
    keyIn:world        [1,    1,    ]

    分组

    根据需求将key中相同的字段做为同一个key以减小键值对,做为一种优化的手段

    重写 RawComparator 方法合并key中相同字段

    经过 job.setGroupingComparatorClass(Mygroup.class); 调用

    public static class Mygroup implements RawComparator<Person> {
    
            @Override
            public int compare(Person o1, Person o2) {
                // TODO Auto-generated method stub
                return 0;
            }
    
            @Override
            public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
                return WritableComparator.compareBytes(b1, 0, l1-4, b2, 0, l2-4);
            }
            
        }

    hadoop优化

    1. 能够设置block默认大小
    2. 设置map个数
    3. 调整环形缓冲区大小
    4. 自定义分区 --> 解决数据清倾斜问题
    5. 自定义 combiner --> map端的小reduce,减小网络传输损耗
    6. 自定义分组 --> 减小键值对
    7. 设置reduce个数 --> 加快处理速度
    8. CombinerFileInputFormat --> 合并小文件
    9. 根据业务自定义key和value

    Java MapReduce编程错误

    • org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.io.IntWritable

      map方法把文件的行号当成key,因此要用LongWritable。
    相关文章
    相关标签/搜索