Hadoop是一个由Apache基金会所开发的分布式系统基础架构,主要解决,海量数据的存储和海量数据的分析计算问题。java
向namenode询问是否须要checkpointnode
checkpoint触发条件:web
- 定时时间到了
- edits中数据满了
在$HADOOP_HOME/data/tmp/dfs/namesecondary/current
这个目录中查看SecondaryNameNode目录结构。
在主namenode发生故障时(假设没有及时备份数据),能够从SecondaryNameNode恢复数据:
方案一:将SecondaryNameNode中数据拷贝到namenode存储数据的目录。
方案二:使用-importCheckpoint选项启动namenode守护进程,从而将SecondaryNameNode中数据拷贝到namenode目录中。(极慢)数据库
1.在namenode主机的$HADOOP_HOME/etc/hadoop
目录下建立dfs.hosts文件,在该文件中添加全部主机名(包括新节点)e.g.apache
hadoop102 hadoop103 hadoop104 hadoop105
2.在namenode的hdfs-site.xml配置文件中增长dfs.hosts属性浏览器
<property> <name>dfs.hosts</name> <value>/opt/module/hadoop-2.7.2/etc/hadoop/dfs.hosts</value> </property>
3.刷新namenode:hdfs dfsadmin -refreshNodes
4.更新resourcemanager节点:yarn rmadmin -refreshNodes
5.在namenode的slaves文件中增长新主机名称
6.单独命令启动新的数据节点和节点管理器hadoop-daemon.sh start datanode
yarn-daemon.sh start nodemanager
缓存
1.在namenode的/opt/module/hadoop-2.7.2/etc/hadoop目录下建立dfs.hosts.exclude文件,向其中添加要退役的主机名称
2.在namenode的hdfs-site.xml配置文件中增长dfs.hosts.exclude属性安全
<property> <name>dfs.hosts.exclude</name> <value>/opt/module/hadoop-2.7.2/etc/hadoop/dfs.hosts.exclude</value> </property>
3.刷新namenode、刷新resourcemanagerhdfs dfsadmin -refreshNodes
yarn rmadmin -refreshNodes
4.在web浏览器端检查,退役节点的状态为decommission in progress(退役中),说明数据节点正在复制块到其余节点。
5.等待退役节点状态为decommissioned(全部块已经复制完成),中止该节点及节点资源管理器。hadoop-daemon.sh stop datanode
yarn-daemon.sh stop nodemanager
性能优化
PS:若是副本数是3,服役的节点小于等于3,是不能退役成功的,须要修改副本数后才能退役。
6.从include文件中删除退役节点,再运行刷新节点的命令
7.从namenode的slave文件中删除退役节点服务器
Java的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带不少额外的信息(各类校验信息,header,继承体系等),不便于在网络中高效传输。因此,hadoop本身开发了一套序列化机制(Writable),精简、高效。
boolean
--> BooleanWritable
byte
--> ByteWritable
int
--> IntWritable
float
--> FloatWritable
long
--> LongWritable
double
--> DoubleWritable
string
--> Text
map
--> MapWritable
array
--> ArrayWritable
maptask调用InputFormat,InputFormat又调用RecordReader从输入文件中解析出一个个K/V。
将解析出的K/V交给客户端map()方法处理,并产生新的K/V。
当map()方法处理完数据后,通常会调用OutputCollector.collect()输出结果,在该函数内部调用Partitioner对K/V进行分区,且根据K进行分区内排序,并写入一个环形缓冲区中。
当环形缓冲区达到80%时,会将数据写到本地磁盘上生成一个临时文件。
将数据写入本地磁盘以前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操做。
当全部数据处理完成后,MapTask对全部临时文件进行一次合并,以确保最终只会生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。
编写一个类继承FileInputFormat
public class WholeInputFormat extends FileInputFormat<NullWritable, BytesWritable> { @Override public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { // 自定义recordreader WholeRecordReader recordReader = new WholeRecordReader(); recordReader.initialize(split, context); return recordReader; } @Override protected boolean isSplitable(JobContext context, Path filename) { return false; } }
自定义一个RecordReader
public class WholeRecordReader extends RecordReader<NullWritable, BytesWritable> { private FileSplit split; private Configuration conf; private BytesWritable value = new BytesWritable(); private boolean processed = false; @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { this.split = (FileSplit) split; this.conf = context.getConfiguration(); } @Override public boolean nextKeyValue() throws IOException, InterruptedException { if (!processed) { byte[] contents = new byte[(int) split.getLength()]; Path path = split.getPath(); FileSystem fs = path.getFileSystem(conf); FSDataInputStream fis = null; try { fis = fs.open(path); IOUtils.readFully(fis, contents, 0, contents.length); value.set(contents, 0, contents.length); } finally { IOUtils.closeStream(fis); } processed = true; return true; } return false; } @Override public NullWritable getCurrentKey() throws IOException, InterruptedException { return NullWritable.get(); } @Override public BytesWritable getCurrentValue() throws IOException, InterruptedException { return this.value; } @Override public float getProgress() throws IOException, InterruptedException { return processed ? 1 : 0; } @Override public void close() throws IOException { } }
自定义RecordReader重点是nextKeyValue()
方法,它定义了如何封装向maptask输入的键值对,本例中是将每一个文件的全部内容做为value输入到maptask中。
// 若是不设置InputFormat,它默认用的是TextInputFormat.class job.setInputFormatClass(CombineTextInputFormat.class); CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m
通过以上设置,可有效减小切片数
MapReduce保证每一个reducer的输入都是按键有序排列的,系统执行排序的过程(即将map输出做为输入传给reducer)称为shuffle。
public int getPartition(K key, V value, int numReduceTasks) { return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; }
默认采用的是org.apache.hadoop.mapreduce.lib.partition.HashPartitioner
,当中的getPartition()
方法是根据key的hashCode对reduceTasks个数取模获得的。
1.继承Partitioner,重写getPartition()方法
public class MyPartitioner extends Partitioner<K, V> { @Override public int getPartition(Text key, FlowBean value, int numPartitions) { int partition=xxx; if (exp0) { partition = 0; }else if (exp1) { partition = 1; }else if (exp2) { partition = 2; }else if (exp3) { partition = 3; } return partition; } }
2.在job驱动中,设置自定义partitioner
job.setPartitionerClass(MyPartitioner.class)
3.根据自定义partitioner的逻辑设置相应数量的reduce task
job.setNumReduceTasks(5);
PS:
若是reduceTask的数量> getPartition的结果数,则会多产生几个空的输出文件part-r-000xx;
若是1<reduceTask的数量<getPartition的结果数,则有一部分分区数据无处安放,会产生异常;
若是reduceTask的数量=1,则无论mapTask端输出多少个分区文件,最终结果都交给这一个reduceTask,最终也就只会产生一个结果文件 part-r-00000。
bean对象实现WritableComparable接口重写compareTo方法,就能够实现排序
@Override public int compareTo(Bean o) { // 倒序排列,从大到小 return this.xxx > o.getXxx() ? -1 : 1; }
对reduce阶段的数据根据某一个或几个字段进行分组。
public class OrderGroupingComparator extends WritableComparator { protected OrderGroupingComparator() { // 记得此处设置true,不然会报空指针异常 super(OrderBean.class, true); } @Override public int compare(WritableComparable a, WritableComparable b) { OrderBean abean = (OrderBean) a; OrderBean bbean = (OrderBean) b; return abean.getOrder_id().compareTo(bbean.getOrder_id()); } }
combiner是MR程序中Mapper和Reducer以外的一种组件,combiner组件的父类就是Reducer,combiner和reducer的区别在于运行的位置:Combiner是在每个maptask所在的节点运行,Reducer是接收全局全部Mapper的输出结果。combiner的意义就是对每个maptask的输出进行局部汇总,以减少网络传输量。
1.自定义一个combiner继承Reducer,重写reduce方法
public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{ @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int count = 0; for(IntWritable v :values){ count = v.get(); } context.write(key, new IntWritable(count)); } }
2.在job驱动类中设置
job.setCombinerClass(WordcountCombiner.class);
combiner可以应用的前提是不能影响最终的业务逻辑,并且,combiner的输出kv应该跟reducer的输入kv类型要对应起来。
从控制台观察使用combiner先后变化:
若是是多张表的操做都是在reduce阶段完成,reduce端的处理压力太大,map节点的运算负载则很低,资源利用率不高,且在reduce阶段极易产生数据倾斜。
在map端缓存多张表,提早处理业务逻辑,这样增长map端业务,减小reduce端数据的压力,尽量的减小数据倾斜。
job.addCacheFile(new URI("file:/e:/cache/pd.txt")); // 缓存普通文件到task运行节点
1.自定义一个outputformat继承org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
public class FilterOutputFormat extends FileOutputFormat<Text, NullWritable>{ @Override public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException { // 建立一个RecordWriter return new FilterRecordWriter(job); } }
2.具体的写数据RecordWriter
public class FilterRecordWriter extends RecordWriter<Text, NullWritable> { FSDataOutputStream atguiguOut = null; FSDataOutputStream otherOut = null; public FilterRecordWriter(TaskAttemptContext job) { // 1 获取文件系统 FileSystem fs; try { fs = FileSystem.get(job.getConfiguration()); // 2 建立输出文件路径 Path atguiguPath = new Path("e:/atguigu.log"); Path otherPath = new Path("e:/other.log"); // 3 建立输出流 atguiguOut = fs.create(atguiguPath); otherOut = fs.create(otherPath); } catch (IOException e) { e.printStackTrace(); } } @Override public void write(Text key, NullWritable value) throws IOException, InterruptedException { // 判断是否包含“atguigu”输出到不一样文件 if (key.toString().contains("atguigu")) { atguiguOut.write(key.toString().getBytes()); } else { otherOut.write(key.toString().getBytes()); } } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { // 关闭资源 if (atguiguOut != null) { atguiguOut.close(); } if (otherOut != null) { otherOut.close(); } } }
// 开启map端输出压缩 configuration.setBoolean("mapreduce.map.output.compress", true); // 设置map端输出压缩方式 configuration.setClass("mapreduce.map.output.compress.codec", GzipCodec.class, CompressionCodec.class);
// 设置reduce端输出压缩开启 FileOutputFormat.setCompressOutput(job, true); // 设置压缩的方式 FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class); // FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); // FileOutputFormat.setOutputCompressorClass(job, Lz4Codec.class); // FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
ReduceTask从各个MapTask远程拷贝一片数据,若是某一片数据大小超过阈值,则写到磁盘上,不然直接放在内存中。
在远程拷贝数据时,reducetask后台启动了两个线程对内存和磁盘上的文件进行合并,以防止内存或硬盘使用过多。
用户编写的reduce()方法输入数据是按照key进行汇集的,为了将key相同的数据聚在一块儿,Hadoop采用了基于排序的策略。因为各个MapTask已经实现对本身的处理结果进行了局部排序,所以,ReduceTask只需对全部数据进行一次归并排序便可。
reduce()函数将计算结果写到HDFS上。
用户在mr应用程序中配置能够生效:
mapreduce.map.memory.mb
一个Map Task可以使用的资源上限(单位:MB),默认为1024。若是Map Task实际使用的资源量超过该值,则会被强制杀死。mapreduce.reduce.memory.mb
一个Reduce Task可以使用的资源上限(单位:MB),默认为1024。若是Reduce Task实际使用的资源量超过该值,则会被强制杀死。mapreduce.map.java.opts
Map Task的JVM参数,你能够在此配置默认的java heap size等参数, e.g."-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc" (@taskid@会被Hadoop框架自动换为相应的taskid), 默认值: ""mapreduce.reduce.java.opts
Reduce Task的JVM参数,你能够在此配置默认的java heap size等参数, e.g."-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc", 默认值: ""mapreduce.map.cpu.vcores
每一个Map task可以使用的最多cpu core数目, 默认值: 1mapreduce.reduce.cpu.vcores
每一个Reduce task可以使用的最多cpu core数目, 默认值: 1应该在yarn启动以前就配置在服务器的配置文件中才能生效:
yarn.scheduler.minimum-allocation-mb 1024
给应用程序container分配的最小内存yarn.scheduler.maximum-allocation-mb 8192
给应用程序container分配的最大内存shuffle性能优化的关键参数,应在yarn启动以前就配置好:
mapreduce.task.io.sort.mb 100
shuffle的环形缓冲区大小,默认100mmapreduce.map.sort.spill.percent 0.8
环形缓冲区溢出的阈值,默认80%mapreduce.map.maxattempts
每一个Map Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。mapreduce.reduce.maxattempts
每一个Reduce Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。mapreduce.map.failures.maxpercent
当失败的Map Task失败比例超过该值为,整个做业则失败,默认值为0. 若是你的应用程序容许丢弃部分输入数据,则该该值设为一个大于0的值,好比5,表示若是有低于5%的Map Task失败(若是一个Map Task重试次数超过mapreduce.map.maxattempts,则认为这个Map Task失败,其对应的输入数据将不会产生任何结果),整个做业扔认为成功。mapreduce.reduce.failures.maxpercent
当失败的Reduce Task失败比例超过该值为,整个做业则失败,默认值为0。mapreduce.task.timeout
Task超时时间,常常须要设置的一个参数,该参数表达的意思为:若是一个task在必定时间内没有任何进入,即不会读取新的数据,也没有输出数据,则认为该task处于block状态,多是卡住了,也许永远会卡主,为了防止由于用户程序永远block住不退出,则强制设置了一个该超时时间(单位毫秒),默认是300000。若是你的程序对每条输入数据的处理时间过长(好比会访问数据库,经过网络拉取数据等),建议将该参数调大,该参数太小常出现的错误提示是“AttemptID:attempt_14267829456721_123456_m_000224_0 Timed out after 300 secsContainer killed by the ApplicationMaster.”。Yarn是一个资源调度平台,负责为运算程序提供服务器运算资源,至关于一个分布式的操做系统平台,而mapreduce等运算程序则至关于运行于操做系统之上的应用程序。