Hadoop工做原理学习笔记

应用开发 java

主要知识点以下:node

Configuration类(支持overwrite,variable $)linux

测试(mock单元测试,本地测试,集群测试)web

    Tool, ToolRunner正则表达式

    集群测试(package, 启动job, Job web UI for namenode and jobtracker)数据库

    运程调试器(keep.failed.task.files = true, 使用ISolationRunner)apache

做业调优(HPROF)数组

MapReduce工做流 (oozie)缓存

 

1. 在本地运行测试数据tomcat

public class MaxTemperatureDriver extends Configured implements Tool {

    public int run(String[] args) throws Exception {

         Job job = new Job(getConf(), “compute max temperature”);

         job.setJarByClass();

         job.setMapperClass();

        job.setReducerClass();

        FileInputFormat.addInputPath(job, new Path(args[0]));

        FileOutputFormat.addOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true);

    }

    public static void main(String[] args) {

        int exitCode = ToolRunner.run(new MaxTemperatureDriver(), args);

        System.exit(exitCode);

    }

}

编译上面的代码,在根节点处运行hadoop命令(事先将hadoop进程在本地启动):

hadoop MaxTemperature –conf conf/hadoop-local.xml input/ncdc max-temp

2. 集群上运行

使用jar命令将class文件打包,而后使用jar命令上传并启动任务(事先将hadoop在集群中启动):

%hadoop jar job.jar MaxTempratureDriver –conf conf/hadoop-cluster.xml input output

3. Hadoop守护进程的地址和端口

RPC

    namenode RPC地址和端口 hdfs://localhost:8020 (fs.default.name)

    jobtracker RPC地址和端口 localhost:8021           (mapred.job.tracker)

    datanode TCP/IP服务器(块传输) 50010           (dfs.datanode.address)

    datanode RPC 地址和端口 localhost:50020         (dfs.datanode.ipc.address)

    tasktracker RPC 地址和端口 (mapred.task.tracker.report.address)

HTTP

    jobtracker     50030   (mapred.job.tracker.http.address)

    tasktracker    50060   (mapred.task.tracker.http.address)

    namenode    50070   (dfs.http.address)

    datanode      50075  (dfs.datanode.http.address)

    secondary     50090  (dfs.secondary.http.address)

4. 做业调试(计数器和状态)

在map/reduce程序中能够经过计数器和状态来记录数据中的一些状态,能够经过webUI或脚本指令来查看运行后的计数器或状态。

context.setStatus(“”);

context.incrCounter(String group, String counter, int num);

命令行查询计数器:

%hadoop job –counter job_201111160811_0003 ‘MaxTemperatureMaper$Temperature’ ENUM

 

远程调试器

在集群上运行做业很难调试,可是能够配置Hadoop保留做业运行期间产生的全部中间值,以便稍后在调试器上从新运行这些出错的任务。

1) 设置属性保留中间数据 keep.failed.task.files = true

2) 运行做业,在web界面上查看故障节点和task_attempt_ID;

3) 经过上面的ID来查找保存的中间数据文件。mapred.local.dir定义了本地缓存目录,在指定的一个或多个目录下寻找对应的job_id下的task_temp_id目录,下面存放着job.xml,map输入的序列化文件,map输出备份(在output目录下),和work目录(task_attempt的工做目录)。

4) 在脚本控制台cd到上面的work目录,设置运程调试器属性并启动hadoop进入debug模式:

%export HADOOP_OPTS=”-agentlib:jdwp=transfport=dt_socket,server=y,suspend=y,address=8787”

%hadoop org.apache.hadoop.mapred.IsolationRunner ../job.xml

5) 在运程客户端启动Java IDE如Eclipse远程链接上面主机的8787端口,在map/reduce源代码中设置断点等待。

上述调试技术不仅适用于失败的任务,还能够保留成功完成的任务数据来调试内部逻辑。这是,可将属性keep.task.files.pattern设置为一个正则表达式(与保留的任务ID匹配)。

其它一些调试的技巧:

在linux下dump Java thread stack trace

若是是在控制台中运行,则直接ctrl+\

若是是在后台运行,能够先找到运行java的pid,而后kill -QUIT PID,会将thread stack内容输出到该java进程的标准输出流里,例如tomcat就会写在catalina.out里。

jstack[-l]pid

若是java程序崩溃生成core文件,jstack工具能够用来得到core文件的javastack和nativestack的信息,从而能够轻松地知道java程序是如何崩溃和在程序何处发生问题。另外,jstack工具还能够附属到正在运行的java程序中,看到当时运行的java程序的javastack和nativestack的信息,若是如今运行的java程序呈现hung的状态,jstack是很是有用的。

5 做业调优

哪些因素影响做业的运行效率?

mapper的数量:尽可能将输入数据切分红数据块的整数倍。若有太多小文件,则考虑CombineFileInputFormat;

reducer的数量:为了达到最高性能,集群中reducer数应该略小于reducer的任务槽数。

combiner: 充分使用合并函数减小map和reduce之间传递的数据量,combiner在map后运行;

中间值的压缩:对map输出值进行压缩减小到reduce前的传递量(conf.setCompressMapOutput(true)和setMapOutputCompressorClass(GzipCodec.class));

自定义序列:若是使用自定义的Writable对象或自定义的comparator,则必须确保已实现RawComparator

调整shuffle:MapReduce的shuffle过程能够对一些内存管理的参数进行调整,以弥补性能不足;

另外一个有用的方法是启用JDK的HPROF分析来获取程序的CPU和堆栈使用状况。

conf.setProfileEnabled(true);  // “mapred.task.profile”

conf.setProfileParams(“-agentlib:hprof=cpu=samples,heap=sites,depth=6,force=n,thread=y,verbose=n,file=%s”);   // “mapred.task.profile.params”

conf.setProfileTaskRange(true, “0-2”); // 第一个参数表示map,false则分析reduce;第二个参数任务ID范围

将上述程序加入驱动程序后从新运行,分析结果将输出到做业日志的末尾。

MapReduce工做机制


知识点小结:

shuffle影响性能的因素

1 Map –>buffer –> partition, sort, spill to disk (输出缓冲区,溢出写磁盘比例,运行combiner最小溢出写文件数3, task tracker工做线程数)

2 Reduce

copy (5 threads) –> memory (buffer size) –> disk (threhold) –> merge –> reduce

 

1 剖析MapReduce做业运行机制

1.1 做业的提交

客户端经过JobClient.runJob()来提交一个做业到jobtracker,JobClient程序逻辑以下:

  a) 向Jobtracker请求一个新的job id (JobTracker.getNewJobId());

  b) 检查做业的输出说明,如已存在抛错误给客户端;计算做业的输入分片;

  c) 将运行做业所须要的资源(包括做业jar文件,配置文件和计算所得的输入分片)复制到jobtracker的文件系统中以job id命名的目录下。做业jar副本较多(mapred.submit.replication = 10);

  d) 告知jobtracker做业准备执行 (submit job)。

1.2 做业的初始化

job tracker接收到对其submitJob()方法的调用后,将其放入内部队列,交由job scheduler进行调度,并对其进行初始化,包括建立一个正在运行做业的对象(封装任务和记录信息)。

为了建立任务运行列表,job scheduler首先从共享文件系统中获取JobClient已计算好的输入分片信息,而后为每一个分片建立一个map任务;建立的reduce任务数量由JobConf的mapred.reduce.task属性决定,schedule建立相应数量的reduce任务。任务此时被执行ID。

1.3 任务的分配

jobtacker应该先选择哪一个job来运行?这个由job scheduler来决定,下面会详细讲到。

jobtracker如何选择tasktracker来运行选中做业的任务呢?

每一个tasktracker按期发送心跳给jobtracker,告知本身还活着,是否能够接受新的任务。jobtracker以此来决定将任务分配给谁(仍然使用心跳的返回值与tasktracker通讯)。每一个tasktracker会有固定数量的任务槽来处理map和reduce(好比2,表示tasktracker能够同时运行两个map和reduce),由机器内核的数量和内存大小来决定。job tracker会先将tasktracker的map槽填满,而后分配reduce任务到tasktracker。

jobtracker选择哪一个tasktracker来运行map任务须要考虑网络位置,它会选择一个离输入分片较近的tasktracker,优先级是数据本地化(data-local)–>机架本地化(rack-local)。

对于reduce任务,没有什么标准来选择哪一个tasktracker,由于没法考虑数据的本地化。map的输出始终是须要通过整理(切分排序合并)后经过网络传输到reduce的,可能多个map的输出会切分出一部分送给一个reduce,因此reduce任务没有必要选择和map相同或最近的机器上。

1.4 任务的执行

1. tasktracker分配到一个任务后,首先从HDFS中把做业的jar文件复制到tasktracker所在的本地文件系统(jar本地化用来启动JVM)。同时将应用程序所须要的所有文件从分布式缓存复制到本地磁盘。

2. 接下来tasktracker为任务新建一个本地工做目录work,并把jar文件的内容解压到这个文件夹下。

3. tasktracker新建一个taskRunner实例来运行该任务。TaskRunner启动一个新的JVM来运行每一个任务,以便客户的map/reduce不会影响tasktracker守护进程。但在不一样任务之间重用JVM仍是可能的。子进程经过umbilical接口(?什么含义,暂时未知)与父进程进行通讯。任务的子进程每隔几秒便告知父进程的进度,直到任务完成。

Streaming和Pipes是用来运行其它语言编写的map和reduce。Streaming任务特指任务使用标准输入输出steaming与进程通讯,能够是任何语言编写的。pipes特指C++语言编写的任务,其经过socket来通讯(persistent socket connection)。

1.5 进度和状态的更新

一个做业和每一个任务都有一个状态信息,包括:做业或任务的运行状态(running, successful, failed),map和reduce的进度,计数器值,状态消息或描述。

这些信息经过必定的时间间隔由child JVM –> task tracker –> job tracker汇聚。job tracker将产生一个代表全部运行做业及其任务状态的全局试图。你能够经过Web UI查看。同时JobClient经过每秒查询jobtracker来得到最新状态。

1.6 做业的完成

 

1.7 做业的失败

 

2. 做业的调度

默认调度器 – 基于队列的FIFO调度器

公平调度器(Fair Scheduler)- 每一个用户都有本身的做业池,用map和reduce的任务槽数来定制做业池的最小容量,也能够设置每一个池的权重。Fair Scheduler支持抢占,若是一个池在特定的一段时间内未获得公平的资源共享,它会停止运行池获得过多资源的任务,以便把任务槽让给运行资源不足的池。启动步骤:

1) 拷贝contrib/fairscheduler下的jar复制到lib下;

2) mapred.jobtracker.taskScheduler = org.apache.hadoop.mapred.FairScheduler

3) 重启节点hadoop

能力调度器(Capacity Scheduler)-

3. shuffle和排序

shuffle特指map输出后到reduce运行前获得输入的整个过程,它是MapReduce的心脏,属于不断被优化和改进的代码库的一部分,下面主要针对0.20版本。

Map端

1)Map输出首先放在内存缓冲区(io.sort.mb属性定义,默认100MB);

2)守护进程会将缓冲区的数据按照目标reducer划分红不一样的分区(partition),同时按键进行内排序;若是客户端定义了combiner,则combiner会在排序后运行,继续压缩缓存区的数据;

3)缓冲区上定义了一个阈值(io.sort.spill.percent,默认为0.8),当存储内容达到这个值时,缓冲区的值会被写到本地文件中(mapred.local.dir定义,能够是一个或多个目录);这种文件会有多个,每一个的内容都是按照reducer分区且局部排序的。这个过程简称spill to disk;

4)Map输出完毕前,这些中间的输出文件会合并成一个已分区且已排序的输出文件中,合并会分屡次,每次合并的中间文件个数有io.sort.factor来定义,默认是10;这个过程也会伴随着combiner的运行,min.num.spills.for.combine定义了运行combiner以前溢出写的次数;

5)写磁盘时能够压缩文件。mapred.compress.map.output设置为true,mapred.map.output.compression.codec指定压缩实现类;

map任务完成后,会通知父tasktracker状态已更新,而后tasktracker经过心跳通知jobtracker。下面的reduce所在的tasktracker有一个线程按期询问jobtracker以便得到map输出的位置,直到它得到全部输出的位置。

Reduce端

1)每一个map任务的完成时间可能不一样,但只要有一个任务完成,reduce任务得知后就开始复制对应它的输出,复制线程数由mapred.reduce.parallel.copies定义,默认为5;

2)若是map输出至关小,则不用复制到文件中,而是reduce tasktracker的内存中。缓冲区大小由mapred.job.shuffle.input.buffer.percent定义用于此用途的堆空间的百分比,默认0.7;一旦内存缓冲区达到阈值大小(由mapred.iob.shuffle.merge.percent,默认值为0.66)或达到reduce输出阈值(mapred.inmem.merge.threshold,默认值为1000),则合并后溢出写到磁盘中;

3)随着磁盘上副本的增多,后台线程会将它们合并为更大的排好序的文件。为了合并,压缩的map输出必须在内存中被解压缩;

4) 复制完全部的map输出后,reduce任务进入合并阶段(sort phase,合并多个文件,并按键排序)。io.sort.factor定义了每次合并数,默认为10,即每10个map输出合并一次。会有不少个合并后的中间文件。

5)最后直接把中间文件数据输入给reduce函数,对已排序输出中的每一个键都要调用reduce函数,此阶段的输出直接写到HDFS中。

配置的调优

总原则:给shuffle过程尽可能多提供内存空间,但也要确保map函数和reduce函数能获得足够的内存。

运行map和reduce任务的JVM内存大小有mapred.child.java.opts属性设置。

在map端,避免屡次溢出写磁盘来得到最佳性能。计数器spilled.records计算在做业运行整个阶段中溢出写磁盘的记录数,大则代表写磁盘太频繁;

在reduce端,中间数据所有驻留在内存中就能获得最佳性能。若是reduce函数的内存需求不大,那么把mapred.inmem.merg.threshold设置为0,把mapred.job.reduce.input.buffer.percent设置为1会带来性能的提高。

4. 任务的执行

Hadoop发现一个任务运行比预期慢的时候,它会尽可能检测,并启动另外一个相同的任务做为备份,即“推测执行”(speculative execution)。

推测执行是一种优化措施,并不能使做业运行更可靠。默认启用,但能够单独为map/reduce任务设置,mapred.map.tasks.speculative.execution和mapred.reduce.tasks.speculative.execution。开启此功能会减小整个吞吐量,在集群中倾向于关闭此选项,而让用户根据个别做业须要开启该功能。

Hadoop为每一个任务启动一个新JVM须要耗时1秒,对于大量超短任务若是重用JVM会提高性能。当启用JVM重用后,JVM不会同时运行多个任务,而是顺序执行。tasktracker能够一次启动多个JVM而后同时运行,接着重用这些JVM。控制任务重用JVM的属性是mapred.job.reuse.jvm.num.tasks,它指定给定做业每一个JVM运行的任务的最大数,默认为1,即无重用;-1表示无限制即该做业的全部的任务都是有一个JVM。

在map/reduce程序中,能够经过某些环境属性(Configuration)得知做业和任务的信息。

mapred.job.id              做业ID,如job_201104121233_0001

mapred.tip.id               任务ID,如task_201104121233_0001_m_000003

mapred.task.id             任务尝试ID,如attempt_201104121233_0001_m_000003_0

mapred.task.partition    做业中任务的ID,如3

mapred.task.is.map       此任务是否为map任务,如true


MapReduce类型和格式


1. MapReduce的类型

map(K1, V1) –> list (K2, V2)                // 对输入数据进行抽取过滤排序等操做

combine(K2, list(V2)) –> list(K2, V2)   // 为了减小reduce的输入,须要在map端对输出进行预处理,相似reduce。不是全部的reduce都在部分数据集上有效,好比求平均值就不能简单用于combine

partition(K2, V2) –> integer                //将中间键值对划分到一个reduce分区,返回分区索引号。分区内的键会排序,相同的键的全部值会合成一个组(list(V2))

reduce(K2, list(V2)) –> list(K3, V3)   //每一个reduce会处理具备某些特性的键,每一个键上都有值的序列,是经过对全部map输出的值进行统计得来的;当得到一个分区后,tasktracker会对每条记录调用reduce。

默认的map和reduce函数是IdentityMapper和IdentityReducer,均是泛型类型,简单的将全部输入写到输出中。默认的 partitioner是HashPartitioner,对天天记录的键进行哈希操做以决定该记录属于那个分区让reduce处理。

输入数据的类型有输入格式(InputFormat类)进行设置,其它的类型经过JobConf上的方法显示设置。这里显式设置中间和最终输出类型的缘由是由于Java语言的泛型实现是type erasure。另外若是K2和K3是相同类型,就不须要调用setMapOutputKeyClass(),由于它将调用setOutputKeyClass()来设置。

2. 输入格式

2.1输入分片与记录

一个输入分片(split)是由单个map处理的输入块(分片个数即map所需的tasktracker个数),每一个分片包含若干记录(key+value),map函数依次处理每条记录。输入分片表示为InputSplit接口,其包含一个以字节为单位的长度和一组存储位置,分片不包含数据自己,而是指向数据的引用。

InputSplit是由InputFormat建立的,通常无需应用开发人员处理。InputFormat负责产生输入分片并将它们分割成记录。

1) JobClient调用InputFormat.getSplites()方法,传入预期的map任务数(只是一个参考值);

2)InputFormat计算好分片数后,客户端将它们发送到jobtracker,jobtracker便使用其存储位置信息来调度map任务从而在tasktracker上处理这些分片数据。

3)在tasktracker上,map任务把输入分片传给InputFormat的getRecordReader()方法来得到这个分片的RecordReader;RecordReader基本上就是记录上的迭代器,map任务用一个RecordReader来生成记录的键值对,而后在传给map函数。

2.2 FileInputFormat

输入路径可由多个函数FileInputFormat.addInputPath()指定,还能够利用FileInputFormat.setInputPathFilter()设置过滤器。输入分片的大小有上个属性控制:分片最小字节数,分片最大字节数和HDFS数据块字节数。

mapred.min.split.size, mapred.max.split.size, dfs.block.size

计算公式是:

max(minSplitSize, min(maxSplitSize, blockSize))

没有特殊需求,应该尽可能让分片大小和数据块大小一致。若是HDFS中存在大批量的小文件,则须要使用CombineFileInputFormat将多个文件打包到一个分片中,以便mapper能够处理更多的数据。一个能够减小大量小文件的方法(适合于小文件在本地文件系统,在上传至HDFS以前将它们合并成大文件)是使用SequenceFile将小文件合并成一个或多个大文件,能够将文件名做为键,文件内容做为值。

有时候不但愿输入文件被切分,只需覆盖InputFormat的isSplitable()方法返回false便可。

有时候map程序想知道正在处理的分片信息,能够经过Configuration中的属性获得,包括map.input.file(正在处理的输入文件的路径),map.input.start(分片开始处的字节偏移量), map.input.length(分片的字节长度)。

有时候map想访问一个文件的全部内容,须要一个RecordReader来读取文件内容做为record的值。可行的方法是实现一个FileInputFormat的子类,将文件标记为不可切分,同时指定一个特定的RecordReader;该RecordReader只是在第一次next()时返回文件的内容。

2.3 文本输入

TextInputFormat是默认的InputFormat。每条记录是一行输入。键是LongWritable类型,存储该行在整个文件中的字节偏移量;值是这行的内容,不包括任何行终止符(换行符和回车符),Text类型。因为一行的长度不定,因此极易出现split分片会跨越HDFS的数据块。

KeyValueTextInputFormat将文件的每一行看做一个键值对,使用某个分界符进行分隔,好比制表符。Hadoop默认输出的TextOutputFormat格式即键值对为一行组成一个文件,处理这类文件就可使用键值文本输入格式。

NLineInputFormat能够保证map收到固定行数的输入分片,键是文件中行的字节偏移量,值是行内容。默认为1,即一行为一个分片,送给每一个map。

2.4 二进制输入

SequenceFileInputFormat存储二进制的键值对的序列。顺序文件SequenceFile是可分割的,也支持压缩,很符合MapReduce数据的格式。

2.5 多种输入

Hadoop也支持在一个做业中对不一样的数据集进行链接(join),即定义多个不一样的数据输入源,每一个源对应不一样的目录、输入格式和Map函数。

MultipleInputs.addInputpath(conf, inputPath, TextInputFormat.class, MaxTemperatureMapper.class);

2.6 数据库输入和输出

DBInputFormat用于使用JDBC从关系数据库中读取数据,但只适合少许的数据集。若是须要与来自HDFS的大数据集链接,要使用MultipleInputs。

在关系数据库和HDFS之间移动数据的另外一个方法是Sqoop

HBase和HDFS之间移动数据使用TableInputFormat和TableOutputFormat。

3. 输出格式

TextOutputFormat是默认的输出格式,它把每条记录写为文本行,键和值能够是任意类型。

SequenceFileOutputFormat将输出写入一个顺序文件,是二进制格式。MapFileOutputFormat把MapFile做为输出,键必须顺序添加,因此必须确保reducer输出的键已经排好序。

FileOutputFormat及其子类产生的文件放在输出目录下,每一个reducer一个文件而且文件由分区号命名,如part-00000,part-00001等。有时候须要对文件名进行控制,或让每一个reduce输出多个文件,则可以使用MultipleOutputFormat和MultipleOutputs类。

MultipleFileOuputFormat能够将数据写到多个文件,关键是如何控制输出文件的命名。它有两个子类:MultipleTextOutputFormat和MultipleSequenceFileOutputFormat。在使用多文件输出时,只需实现它们任何一个的子类,并覆盖generateFileNameForKeyValue()返回输出文件名。

MultipleOutputs类不一样的是,能够为不一样的输出产生不一样的类型。

MultipleOutputs.addMultiNameOutput(conf, “name”, TextOutputFormat.class, KeyClass, valueClass);

新版本Hadoop中上述两个多输出类也合并。

FileOutputFormat的子类会产生输出文件,即便文件是空的。可使用LazyOutputFormat来去除空文件。

MapReduce的特性


这章主要总结MapReduce的高级特性,包括计数器,数据集的排序和链接。

1. 计数器

计数器是一种收集做业统计信息的有效手段,因为质量控制或应用统计。计数器还可辅助诊断系统故障。

Hadoop为每一个做业维护若干内置计数器,以描述该做业的各项指标。计数器由关联任务维护,并按期(3秒)传到tasktracker,再由tasktracker传给jobtracker(5秒,心跳)。一个任务的计数器值每次都是完整传输的,而非增量值。

MapReduce容许用户编写程序定义计数器,通常是由一个Java枚举(enum)类型定义。枚举类型的名称即计数器组名称,枚举类型的字段即计数器名称。计数器在做业实例级别是全局的,MapReduce框架会跨全部的map和reduce来统计这些计数器,并在做业结束时产生一个最终的结果。

enum Temperature {

    MISSING, MAlFORMED

}

context.incrCounter(Temperature.MISSING, 1);

MapReduce同时支持非枚举类型的动态计数器。

context.incrContext(String group, String counter, int amount);

计数器能够经过不少方式获取,Web界面和命令行(hadoop job -counter指令)以外,用户能够用Java API获取计数器的值。

RunningJob job = jobClient.getJob(JobID.forName(id));

Counters counters = job.getCounters();

long missing = counters.getCounter(MaxTemperatue.Temperature.MISSING);

2. 排序

排序是MapReduce的核心技术,尽管应用程序自己不须要对数据排序,但可使用MapReduce的排序功能来组织数据。默认状况下,MapReduce根据输入记录的键对数据排序。键的排列顺序是由RawComparator控制的,规则以下:

1)若属性mapred.output.key.comparator.class已设置,则使用该类的实例;

2)不然键必须是WritableComparable的子类,并使用针对该键类的已登记的comparator;

3)若是尚未已登记的comparator,则使用RawComparator将字节流反序列化为一个对象,再由WritableComparable的compareTo()方法进行操做。

全排序

如何用Hadoop产生一个键全局排序的文件?(最好的回答是使用Pig或Hive,二者都可使用一条指令进行排序)

大体方法是,想办法建立一系列排好序的文件,并且这些文件直接也是排序的,比方说第一个文件的值都不第二个文件的值小,则简单的拼装这些文件就能够获得全局排序的结果。问题是如何划分这些文件,并把原始文件的值放入这些排序的文件中?可使用map的partition来将某一范围的键放入对于的reduce,每一个reduce的输入能够保证已排序(局部排序),默认直接输出到part-000×,那全部这些输出组合成一个文件就是全局排序的。为了获得合适的范围,须要对全部输入数据进行统计,实际作法是经过抽样,Hadoop提供InputSampler和IntervalSampler。使用抽样函数事先对input数据进行抽样,获得抽样范围,而后将范围写入分布式缓存,供集群上其它任务使用。

DistributedCache.addCacheFile(cacheFile, conf);

DistributedCache.createSymlink(conf);

辅助排序

MapReduce框架在记录达到reducer以前按键对记录排序,但键所对应的值并无排序。大多状况下不需考虑值在reduce函数中的出现顺序,可是,有时也须要经过对键进行排序和分组等以实现对值的排序。

例子:设计一个MapReduce程序以计算每一年最高气温。

1)使用组合键IntPair,将年份和睦温都做为键;

2)按照年份来分区和分组,但排序须要按照年份升序和睦温降序。

conf.setPartitionerClass();

conf.setOutputKeyComparatorClass();

conf.setOutputValueGroupingComparator();

3 链接

MapReduce能执行大型数据集间的“链接”操做。

Map端链接指在数据到达map函数以前就执行链接操做。为达到此目的,各map的输入数据必须先分区而且以特定方式排序。各个数据集被划分红相同数量的分区,而且均按相同的键(链接键)排序。同一键的全部记录均会放在同一分区之中。

map链接操做能够链接多个做业的输出,只要这些做业的reduce数量相同,键相同,而且输出文件是不可切分的(如小于HDFS块大小,或gzip压缩)。利用org.apache.mapred.join包中的CompositeInputFormat类来运行一个map端链接,其输入源和链接类型(内链接或外链接)能够经过一个链接表达式进行配置。

Reduce链接不要求数据集符合特定结构,所以比Map链接更为经常使用。可是,因为数据集均通过mapReduce的shuffle过程,因此reduce端链接的效率每每更低一些。基本思路是mapper为各个记录标记源,而且使用链接键做为map输出键,使键相同的记录放在同一个reducer中。

1)可使用MultipleInputs来解析和标注各个源;

2)先将某一个数据源传输到reduce。举天气数据为例,气象站信息(气象站id和名字)以气象站ID+“0”为组合键,名字为值,可是按照ID来分区和分组;气象站天气状况(气象站id,时间和睦温)以气象站ID+“1”为组合键,气温为值,可是按照ID来分区和分组。两组数据通过不一样的map以后,具备相同的ID的记录被合并做为一个记录输入reduce程序,值列表中的第一个是气象站名称,其他的记录都是温度信息。reduce程序只须要取出一个值,并将其做为后续每条输出记录的一部分写到输出文件便可。

conf.setPartitionerClass();

conf.setOutputValueGroupingComparator(Textpair.FirstComparator.class);

4 边数据分布(side data)

边数据是做业所需的额外的只读数据,已辅助处理主数据集。面临的挑战是如何让全部的map和reduce都能方便高效地使用边数据。

1)若是仅需向任务传递少许元数据,则能够经过Configuration来设置每一个job的属性,则map/reduce能够覆盖configure()方法来获取这些元数据值。若是你设置的值是复杂对象,则须要处理序列化工做。在几百个做业同在一个系统中运行的状况下,这种方法会增多内存开销,并且元数据信息在全部节点都缓存,即便在不须要它的jobtracker和tasktracker上。

2)针对小数据量边数据的经常使用办法是将在map/reduce数据缓存在内存中,并经过重用JVM使tasktracker上同一个做业的后续任务共享这些数据。

3)分布式缓存 (-files, -archives)

a)启动做业时,使用files或archives传入元数据文件路径,

%hadoop jar job.jar MaxTempratureSample –file input/metadata/stations-fixed-width.txt input/all output

b)当tasktracker得到任务后,首先将jobtracker中的上述文件复制到本地磁盘,具体在${mapred.local.dir}/taskTracker/archive,缓存的容量是有限的,默认10GB,能够经过local.cache.size来设置。

c)在map/reduce程序中,直接读取“stations-fixed-width.txt”文件。同时能够经过JobConf.getLocalCacheFiles()和JobConf.getLocalCacheArchives()来获取本地文件路径的数组。

5 MapReduce类库

Hadoop还提供了一个MapReduce类库,方便完成经常使用的功能。

ChainMapper, ChainReducer        在一个MapReduce中运行多个mapper或reducer。(M+RM*)

IntSumReducer, LongSumReducer 对各键的全部整数值进行求和操做的reducer

TokenCounterMapper                  输出各单词及其出现的次数

RegexMapper                             检查输入值是否匹配某正则表达式,输出匹配字符串和计数器值

相关文章
相关标签/搜索