Big Data - MapReduce

Source code: https://github.com/hiszm/hadoop-train

MapReduce Overview

MapReduce is a distributed computing framework for writing batch-processing applications. Programs written with it can be submitted to a Hadoop cluster for parallel processing of large data sets. A MapReduce job splits the input data set into independent chunks, which are processed by map tasks in parallel; the framework sorts the map outputs, which then become the input to the reduce tasks.

  • Originated from Google's MapReduce paper, published in December 2004
  • Hadoop MapReduce is an open-source clone of Google's MapReduce
  • Strengths of MapReduce: offline processing of massive data sets, easy to develop, easy to run
  • Weakness of MapReduce: unsuitable for real-time and streaming computation

The MapReduce Programming Model


When writing MapReduce programs, we mainly focus on how the input is split and how it is reduced.
The MapReduce framework operates exclusively on <key, value> pairs: it views the input to a job as a set of <key, value> pairs and produces a set of <key, value> pairs as the output of the job.

MapReduce splits a job into a Map phase and a Reduce phase.

  1. input: read the text file;

  2. splitting: split the file by line; K1 here is the line number and V1 is the text content of that line;

  3. mapping: in parallel, split each line on spaces into List(K2, V2), where K2 is a single word; since this is a word-count example, V2 is 1, meaning the word appeared once;

  4. shuffling: because mapping may run in parallel on different machines, shuffling is needed to route records with the same key to the same node for merging, so the final result can be computed; after this stage K2 is a single word and List(V2) is an iterable collection whose elements are the V2 values emitted by mapping;

  5. reducing: this example counts the total occurrences of each word, so reducing sums over List(V2) and emits the final output.
(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)
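The word-count stages above can be sketched in plain Java without Hadoop, using an in-memory shuffle to group values by key. The class and variable names below are illustrative, not part of the Hadoop API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WordCountFlow {
    public static Map<String, Integer> run(List<String> lines) {
        // mapping: emit a (word, 1) pair for every word on every line
        List<Map.Entry<String, Integer>> mapped = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.split("\\s+")) {
                if (!word.isEmpty()) {
                    mapped.add(Map.entry(word, 1));
                }
            }
        }
        // shuffling: group values by key (TreeMap also sorts keys, as the framework does)
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> kv : mapped) {
            grouped.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
        }
        // reducing: sum the grouped counts per key
        Map<String, Integer> counts = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            int sum = 0;
            for (int v : e.getValue()) {
                sum += v;
            }
            counts.put(e.getKey(), sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("hello world", "hello hadoop")));
    }
}
```

In a real cluster, mapping runs on many machines and shuffling moves data over the network; here both collapse into local collections, but the per-stage contracts are the same.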

Mapper

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.apache.hadoop.mapreduce;

import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;

@Public
@Stable
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    public Mapper() {
    }

    protected void setup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
    }

    protected void map(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
        context.write(key, value);
    }

    protected void cleanup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
    }

    // Template method: setup() once, map() for every input record, cleanup() in a finally block.
    public void run(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
        this.setup(context);

        try {
            while(context.nextKeyValue()) {
                this.map(context.getCurrentKey(), context.getCurrentValue(), context);
            }
        } finally {
            this.cleanup(context);
        }

    }

    public abstract class Context implements MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
        public Context() {
        }
    }
}
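Mapper.run() above is a template method: setup() is called once, map() is called for every input record, and cleanup() runs in a finally block even if map() throws. A minimal plain-Java sketch of the same contract (no Hadoop types; all names here are illustrative):

```java
import java.util.ArrayList;
import java.util.List;

public class MiniMapper {
    private final List<String> log = new ArrayList<>();

    protected void setup() { log.add("setup"); }

    // Override this to implement the per-record logic, as with Hadoop's Mapper.map().
    protected void map(String record) { log.add("map:" + record); }

    protected void cleanup() { log.add("cleanup"); }

    // Mirrors Mapper.run(): setup once, map each record, cleanup in finally.
    public List<String> run(List<String> records) {
        setup();
        try {
            for (String r : records) {
                map(r);
            }
        } finally {
            cleanup();
        }
        return log;
    }
}
```

Subclasses normally override only map() (and occasionally setup()/cleanup()); the framework, not user code, calls run().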

Reducer

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.apache.hadoop.mapreduce;

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.mapreduce.ReduceContext.ValueIterator;
import org.apache.hadoop.mapreduce.task.annotation.Checkpointable;

@Checkpointable
@Public
@Stable
public class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    public Reducer() {
    }

    protected void setup(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
    }

    protected void reduce(KEYIN key, Iterable<VALUEIN> values, Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
        // Default implementation: identity reduce, writing every value through unchanged.
        for (VALUEIN value : values) {
            context.write(key, value);
        }
    }

    protected void cleanup(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
    }

    public void run(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
        this.setup(context);

        try {
            while(context.nextKey()) {
                this.reduce(context.getCurrentKey(), context.getValues(), context);
                // If values were backed up for re-iteration, reset the backup store
                // so the next key group starts from a clean state.
                Iterator<VALUEIN> iter = context.getValues().iterator();
                if (iter instanceof ValueIterator) {
                    ((ValueIterator)iter).resetBackupStore();
                }
            }
        } finally {
            this.cleanup(context);
        }

    }

    public abstract class Context implements ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
        public Context() {
        }
    }
}
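Reducer.run() above calls reduce() once per distinct key, handing it an Iterable over all values shuffled to that key. A plain-Java sketch of that per-key contract for word count (illustrative names, not the Hadoop API):

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class MiniReducer {
    // Sum all values for one key, like the classic word-count reducer
    // overriding the identity reduce() shown above.
    protected int reduce(String key, Iterable<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return sum;
    }

    // Mirrors Reducer.run(): invoke reduce() once per key group.
    public Map<String, Integer> run(Map<String, List<Integer>> grouped) {
        Map<String, Integer> out = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            out.put(e.getKey(), reduce(e.getKey(), e.getValue()));
        }
        return out;
    }
}
```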

MapReduce Programming Model: Execution Steps

  • Prepare the input data for map
  • Mapper processing
  • Shuffle
  • Reduce
  • Output the results

MapReduce Programming Model: Core Concepts

  • Split: the chunk of input data handed to a single map task
  • InputFormat: defines how the input is split and read as <key, value> pairs
  • OutputFormat: defines how the final <key, value> pairs are written out
  • Combiner: an optional map-side "mini reducer" that pre-aggregates map output locally to cut shuffle traffic
  • Partitioner: decides which reducer each map output key is sent to
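For example, Hadoop's default HashPartitioner routes each key to a reducer by hashing. The formula below mirrors its logic in a self-contained demo class (the demo class itself is illustrative):

```java
public class HashPartitionDemo {
    // Same formula as Hadoop's default HashPartitioner:
    // mask off the sign bit, then take the modulo over the reducer count.
    public static int getPartition(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        // Every occurrence of the same key lands in the same partition,
        // which is what lets shuffling bring equal keys to one reducer.
        System.out.println(getPartition("hello", 3));
    }
}
```

This determinism is what makes the shuffle stage correct: a custom Partitioner only needs to preserve the same property, mapping each key to a stable partition in [0, numReduceTasks).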

