本篇文章将介绍shuffle的过程以及MapReduce中的其余一些组件。java
Shuffle实际上是一个过程,并非MapperReducer的一个组件,这个过程是从map输出数据,到reduce接收处理数据以前,横跨Mapper和Reducer两端的,以下图:node
shuffle分为Mapper阶段和Reducer阶段,下面就两个阶段作具体分析。正则表达式
每一个MapperTask有一个环形内存缓冲区,用于存储map任务的输出。默认大小100MB(io.sort.mb属性),一旦达到阀值默认0.8(io.sort.spill.percent),就会启动一个后台线程把环形缓冲区中的内容写到(spill)磁盘的指定目录(mapred.local.dir)下的新建的一个溢出写文件。shell
这里有一个map()方法(简称m)写入的速度和spill溢出(简称s)的速度,m的数据在内存中移动,s是数据由内存到磁盘,虽然s是磁盘的连续写,可是也比不上m的内存速度,产生的现象是s在前面跑,m在后面追,m的速度>s的速度,那么m确定在一个时间节点上会追上s,那么当m追上s的时候,m写入环形缓冲区的线程就会被阻塞暂停,直到s将环形缓冲区中的数据所有写入到磁盘中,m的写入线程才会被启动。因此环形缓冲区大小和阀值的大小是能够根据业务进行调优的点。bash
写磁盘前,要partition、sort、Combiner。若是有后续的数据,将会继续写入环形缓冲区中,最终写入下一个溢出文件中。app
等最后记录写完,合并所有溢出写文件为一个分区且排序的文件。ide
若是在最终合并时,被合并的文件大于等于3个,则合并完会再执行一次Combiner,不然不会。工具
总体Mapper阶段的流程以下:oop
input->map->buffer->split(partition-sort-combiner)->merge(partition-sort-combiner(file>=3))->数据落地。性能
Reducer主动找Mapper获取本身负责的分区的数据,并不须要全部的Mapper都执行完成后再获取,哪一个Mapper执行完,当即就去复制。
复制后,来自多个Mapper的数据要进行merge合并操做。合并后进行分组、排序,造成k3v3,进入reduce处理,处理后产生的结果输出到目的地。
总体Reducer阶段流程以下:
fetch->merge(combiner)->grouping->sort->reduce->output。
Mapper的数量在默认状况下不可直接控制干预,Mapper的数量由输入的大小和个数决定。在默认状况下,最终input占据了多少block,就应该启动多少个Mapper。
此种状况下,若是有大量的小文件须要处理,则会形成Hadoop集群崩溃。大量的小文件,每一个小文件都独占一个Mapper处理线程,这样启动线程和关闭线程消耗的资源会很庞大,文件数量到达一个量级会直接致使集群崩溃。
鉴于以上状况,能够经过配置mapred.min.split.size来控制split的size的最小值。当每一个split的大小达不到设置的最小值,Hadoop会将这些达不到最小值的split拼接到一块儿,使用一个Mapper来处理这些文件,当大小超过最小值,才启动一个新Mapper进行处理。这样就能够避免Mapper线程过多致使集群崩溃的结果。
求一组数据的最大值或者最小值。
数据样例:
123 235345 234 654768 234 4545 324
public class MaxMapper extends Mapper<LongWritable, Text, IntWritable, NullWritable> { private int max = Integer.MIN_VALUE; @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, IntWritable, NullWritable>.Context context) throws IOException, InterruptedException { int num = Integer.parseInt(value.toString()); max = max < num ? num : max; } @Override protected void cleanup(Mapper<LongWritable, Text, IntWritable, NullWritable>.Context context) throws IOException, InterruptedException { context.write(new IntWritable(max), NullWritable.get()); } }
public class MaxReducer extends Reducer<IntWritable, NullWritable, IntWritable, NullWritable> { private int max=Integer.MIN_VALUE; @Override protected void reduce(IntWritable k3, Iterable<NullWritable> v3s, Reducer<IntWritable, NullWritable, IntWritable, NullWritable>.Context context) throws IOException, InterruptedException { int num=Integer.parseInt(k3.toString()); max=max>num?max:num; } @Override protected void cleanup(Reducer<IntWritable, NullWritable, IntWritable, NullWritable>.Context context) throws IOException, InterruptedException { context.write(new IntWritable(max), NullWritable.get()); } }
public class MaxDriver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "Max_Job"); job.setJarByClass(cn.tedu.max.MaxDriver.class); job.setMapperClass(cn.tedu.max.MaxMapper.class); job.setReducerClass(cn.tedu.max.MaxReducer.class); job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(NullWritable.class); FileInputFormat.setInputPaths(job, new Path("hdfs://hadoop:9000/maxdata")); FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop:9000/maxresult")); if (!job.waitForCompletion(true)) return; } }
按月份产生文件,统计每一个人的语数外及总分。
数据样例:
math.txt
1 zhang 85 2 zhang 59 3 zhang 95 1 wang 74 2 wang 67 3 wang 96 1 li 45 2 li 76 3 li 67
chinese.txt
1 zhang 89 2 zhang 73 3 zhang 67 1 wang 49 2 wang 83 3 wang 27 1 li 77 2 li 66 3 li 89
english.txt)
1 zhang 55 2 zhang 69 3 zhang 75 1 wang 44 2 wang 64 3 wang 86 1 li 76 2 li 84 3 li 93
//如下代码涉及到的重要方法 FileSplit inputSplit = (FileSplit) context.getInputSplit(); String fn = inputSplit.getPath().getName();
public class ScoreBean implements Writable { private String name; private String subject; private int month; private int score; //这里省去了如下方法,记得补上 //……get/set…… //……有参/无参构造…… //……read/write…… }
public class ScoreMapper extends Mapper<LongWritable, Text, Text, ScoreBean> { @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, ScoreBean>.Context context) throws IOException, InterruptedException { String line = value.toString(); String attr[] = line.split(" "); String name = attr[1]; FileSplit fs = (FileSplit) context.getInputSplit(); String path = fs.getPath().getName(); String subject = path.substring(0, path.lastIndexOf(".")); ScoreBean sb = new ScoreBean(attr[1], subject, Integer.parseInt(attr[0]), Integer.parseInt(attr[2])); context.write(new Text(name), sb); } }
public class ScoreReducer extends Reducer<Text, ScoreBean, Text, NullWritable> { @Override protected void reduce(Text k3, Iterable<ScoreBean> v3s, Reducer<Text, ScoreBean, Text, NullWritable>.Context context) throws IOException, InterruptedException { String name = k3.toString(); Map<String, Integer> map = new HashMap<String, Integer>(); int count = 0; Iterator<ScoreBean> it = v3s.iterator(); while (it.hasNext()) { ScoreBean sb = it.next(); map.put(sb.getSubject(), sb.getScore()); count += sb.getScore(); } String result = name + " " + map.get("chinese") + " " + map.get("math") + " " + map.get("english") + " " + count; context.write(new Text(result), NullWritable.get()); } }
public class ScoreMonthPartitioner extends Partitioner<Text, ScoreBean>{ @Override public int getPartition(Text key, ScoreBean value, int numPartitions) { return value.getMonth()-1; } }
public class ScoreDriver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "Scorec_Job"); job.setJarByClass(cn.tedu.score.ScoreDriver.class); job.setMapperClass(cn.tedu.score.ScoreMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(ScoreBean.class); job.setReducerClass(cn.tedu.score.ScoreReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); job.setPartitionerClass(ScoreMonthPartitioner.class); job.setNumReduceTasks(3); FileInputFormat.setInputPaths(job, new Path("hdfs://hadoop:9000/scoredata")); FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop:9000/scoresult")); if (!job.waitForCompletion(true)) return; } }
InputFormat:输入格式化器。
MapReduce开始阶段阶段,InputFormat类用来产生InputSplit,并把基于RecordReader它切分红record,造成Mapper的输入。
Hadoop自己提供了若干内置的InputFormat,其中若是不明确指定默认使用TextInputFormat。
做为默认的文件输入格式,用于读取纯文本文件,文件被分为一系列以LF或者CR结束的行,key是每一行的位置偏移量,是LongWritable类型的,value是每一行的内容,为Text类型。
一样用于读取文件,若是行被分隔符(缺省是tab)分割为两部分,第一部分为key,剩下的部分为value;若是没有分隔符,整行做为key,value为空。
用于读取sequence file。sequence file是Hadoop用于存储数据自定义格式的binary文件。它有两个子类:SequenceFileAsBinaryInputFormat,将key和value以BytesWritable的类型读出;SequenceFileAsTextInputFormat,将key和value以Text类型读出。
根据filter从sequence文件中取得部分知足条件的数据,经过setFilterClass指定Filter,内置了三种Filter,RegexFilter取key值知足指定的正则表达式的记录;PercentFilter经过指定参数f,取记录行数%f==0的记录;MD5Filter经过指定参数f,取MD5(key)%f==0的记录。
0.18.x版本新加入,能够将文件以行为单位进行split,好比文件的每一行对应一个mapper。获得的key是每一行的位置偏移量(LongWritable类型),value是每一行的内容,Text类型。适用于行少列多的文件。
用于多个数据源的join。
能够经过job.setInputFormatClass(XxxInputFormat.class);来设定选用哪一种InputFormat。
若是以上InputFormat不够用,咱们也能够本身定义InputFormat。
全部InputFormat都要直接或间接的继承InputFormat抽象类。
InputFormat抽象类中主要定义了以下两个方法:
① getSplits(JobContext context)
生产InputSplit集合的方法,此方法接受JobContext接受环境信息,获得要处理的文件信息后,进行逻辑切割,产生InputSplit集合返回。
List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException;
②createRecordReader(InputSplit split,TaskAttemptContext context)
此方法返回RecordReader对象。一个RecordReader包含方法描述如何从InputSplit切分出要送入Mapper的K一、V1对。
public abstract RecordReader<K,V> createRecordReader(InputSplit split,TaskAttemptContext context) throws IOException, InterruptedException;
咱们能够直接继承InputFormat,但更多的时候咱们会选择继承他的一个实现子类,好比:FileInputFormat。此类是全部来源为文件的InputFormat的基类,默认的TextInputFormat就继承自它。
FileInputFormat实现了InputFormat抽象类,实现了getSplits方法,根据配置去逻辑切割文件,返回FileSplit的集合,并提供了isSplitable()方法,子类能够经过在这个方法中返回boolean类型的值代表是否要对文件进行逻辑切割,若是返回false则不管文件是否超过一个Block大小都不会进行切割,而将这个文件做为一个逻辑块返回。而对createRecordReader方法则没有提供实现,设置为了抽象方法,要求子类实现。
若是想要更精细的改变逻辑切块规则能够覆盖getSplits方法本身编写代码实现。
而更多的时候,咱们直接使用父类中的方法而将精力放置在createRecordReader上,决定如何将InputSplit转换为一个个的Recoder。
读取score1.txt文件,从中每4行读取成绩,其中第一行为姓名,后3行为单科成绩,计算总分,最终输出为姓名:总分格式的文件。
文件内容样例:
张三 语文 97 数学 77 英语 69 李四 语文 87 数学 57 英语 63 王五 语文 47 数学 54 英语 39
分析:
在此例子中,须要按照每三行一次进行读取称为一个InputSplit,通过比较可使用NLineInputFormat,可是为了演示,使用自定义InputFormat来作这件事。
/** * 自定义InputFormat类,来实现每三行做为一个Recorder触发一次Mapper的效果 */ public class MyFileInputFormat extends FileInputFormat<Text, Text> { /** * 由于要每三行读取做为一个Recoder,因此若是切块形成块内数据行数不是3的倍数可能形成处理出问题 * 所以返回false,标识文件不要进行切块 */ @Override protected boolean isSplitable(JobContext context, Path filename) { return false; } /** * 返回自定义的MyRecordReader */ @Override public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { return new MyRecordReader(); } }
/** * 自定义的RecordReader,代表了如何将一个InputSplit切割出一个个的Recorder * */ public class MyRecordReader extends RecordReader<Text, Text> { private LineReader lineReader = null; private Text key = null; private Text value = null; private boolean hasMore = true; /** * 初始化的方法,将在InputSplit被切割前调用 */ @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { //获取文件路径 FileSplit fs = (FileSplit) split; Path path = fs.getPath(); //获取文件系统 Configuration conf = context.getConfiguration(); FileSystem fileSystem = path.getFileSystem(conf); //从文件系统中读取文件路径获得文件流 FSDataInputStream fin = fileSystem.open(path); //将文件流包装为行读取器 lineReader = new LineReader(fin); } /** * 下一个keyvalue方法 * 返回值表当前是否读取到了新的纪录 */ @Override public boolean nextKeyValue() throws IOException, InterruptedException { key = new Text(); value = new Text(); Text temp = new Text(); int count = 0; for (int i = 0; i < 4; i++) { int len = lineReader.readLine(temp); if (len == 0) { hasMore = false; break; } else { count++; value.append(temp.getBytes(), 0, temp.getLength()); value.append("\r\n".getBytes(), 0, "\r\n".length()); temp.clear(); } } key.set(count+""); return count != 0; } /** * 获取当前key方法 */ @Override public Text getCurrentKey() throws IOException, InterruptedException { return key; } /** * 获取当前value方法 */ @Override public Text getCurrentValue() throws IOException, InterruptedException { return value; } /** * 获取处理进度的方法,返回0.0f-1.0f之间的值表示进度 */ @Override public float getProgress() throws IOException, InterruptedException { if (hasMore) return 0.0f; else return 1.0f; } /** * 关闭资源的方法 * 当切割InputSplit结束会被调用用来释放资源 */ @Override public void close() throws IOException { lineReader.close(); } }
/** * 计算成绩的Mapper * */ public class ScoreMapper extends Mapper<Text, Text, Text, Text> { public void map(Text key, Text value, Context context) throws IOException, InterruptedException { String str = value.toString(); String lines [] = str.split("\r\n"); String name = lines[0]; for(int i = 1;i<lines.length;i++){ context.write(new Text(name), new Text(lines[i])); } } }
/** * 计算成绩的Reducer */ public class ScoreReducer extends Reducer<Text, Text, Text, IntWritable> { public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { String name = key.toString(); int countFen = 0; for(Text v : values){ String line = v.toString(); String subject = line.split(" ")[0]; int fen = Integer.parseInt(line.split(" ")[1]); countFen += fen; } context.write(new Text(name), new IntWritable(countFen)); } }
/** * ScoreDriver */ public class ScoreDriver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "score_job"); job.setJarByClass(ScoreDriver.class); job.setMapperClass(ScoreMapper.class); job.setReducerClass(ScoreReducer.class); //指定InputFormat为自定义的MyFileInputFormat job.setInputFormatClass(MyFileInputFormat.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.setInputPaths(job, new Path("hdfs://hadoop01:9000/score/score1.txt")); FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop01:9000/score/result")); if (!job.waitForCompletion(true)) return; } }
MultipleInputs能够将多个输入组装起来,同时为Mapper提供数据,当咱们但愿从多个来源读取数据时可使用。甚至,在指定来源时能够为不一样来源的数据指定不一样的InputFormat和Mapper以应对不一样格式的输入数据。
此类上提供的静态方法有:
/** * 指定数据来源及对应的InputFormat */ MultipleInputs.addInputPath(job, path, inputFormatClass); /** * 指定数据来源及对应的InputFormat 和 Mapper */ MultipleInputs.addInputPath(job, path, inputFormatClass, mapperClass);
改造上述案例,同时从另外一个文件score2.txt中读取数据统计成绩。score2.txt中的数据是一行为一个学生的成绩。
数据样例:
赵六 56 47 69 陈七 73 84 91 刘八 45 56 66
代码:
/** * 计算成绩的Mapper */ public class ScoreMapper2 extends Mapper<LongWritable, Text, Text, Text> { public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String attrs [] = line.split(" "); String name = attrs[0]; for(int i = 1;i<attrs.length;i++){ if(i == 1){ context.write(new Text(name), new Text("语文\t"+attrs[i])); }else if(i == 2){ context.write(new Text(name), new Text("数学\t"+attrs[i])); }else if(i == 3){ context.write(new Text(name), new Text("英语\t"+attrs[i])); } } } }
/** * ScoreDriver */ public class ScoreDriver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "score_job"); job.setJarByClass(ScoreDriver.class); //job.setMapperClass(ScoreMapper.class); job.setReducerClass(ScoreReducer.class); //指定InputFormat为自定义的MyFileInputFormat job.setInputFormatClass(MyFileInputFormat.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //FileInputFormat.setInputPaths(job, new Path("hdfs://hadoop01:9000/score/score1.txt")); MultipleInputs.addInputPath(job, new Path("hdfs://hadoop01:9000/score/score1.txt"), MyFileInputFormat.class,ScoreMapper.class); MultipleInputs.addInputPath(job, new Path("hdfs://hadoop01:9000/score/score2.txt"), TextInputFormat.class,ScoreMapper2.class); FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop01:9000/score/result")); if (!job.waitForCompletion(true)) return; } }
OutputFormat:输出格式化器。
MapReduce结束阶段,OutputFormat类决定了Reducer如何产生输出。
Hadoop自己提供了若干内置的OutputFormat,其中若是不明确指定默认使用TextOutputFormat。
(实现OutputFormat接口)- 全部输出到文件的OutputFormats的基类。
以行分隔、包含制表符定界的键值对的文本文件格式。
二进制键值数据的压缩格式。
原生二进制数据的压缩格式。
一种使用部分索引键的格式。
使用键值对参数写入文件的抽象类。
输出多个以标准行分割、制表符定界格式的文件。
输出多个压缩格式的文件。
能够经过job.setOutputFormatClass(XxxoutputFormat.class);来设定选用哪一种OutputFormat。
若是以上OutputFormat不够用,一样也能够本身定义OutputFormat。
全部的OutputFormat都要直接或间接的继承OutputFormat抽象类
OutputFormat抽象类中定义了以下的抽象方法:
①getRecordWriter(TaskAttemptContext context)
public abstract RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException; public abstract void checkOutputSpecs(JobContext context ) throws IOException,InterruptedException; public abstract OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException;
咱们能够直接继承OutputFormat,但更多的时候咱们会选择继承他的一个实现子类,好比FileOutputFormat。- 此类是全部目的地为文件的OutputFormat的基类,例如默认的TextOutputFormat就继承自它。
FileOutputFormat实现了OutputFormat接口,默认实现了checkOutputSpecs和getOutputCommitter方法,并将getRecordWriter()设置为抽象方法要求咱们去实现。
若是想要更精细的改变逻辑能够本身去编写getOutputCommitter和checkOutputSpecs方法。
而更多的时候,咱们直接使用父类中的方法而将精力放置在getRecordWriter上,决定如何产生输出。
编写wordcount案例,并将输出按照'#'进行分割,输出为一行。
数据样例:
hello tom hello joy hello rose hello joy hello jerry hello tom hello rose hello joy
代码以下:
public class MyRecordWriter<K,V> extends RecordWriter<K,V> { protected DataOutputStream out = null; private final byte[] keyValueSeparator; //public static final String NEW_LINE = System.getProperty("line.separator"); public static final String NEW_LINE = "#"; public MyRecordWriter(DataOutputStream out,String keyValueSeparator) { this.out = out; this.keyValueSeparator = keyValueSeparator.getBytes(); } @Override public void write(K key, V value) throws IOException, InterruptedException { if(key!=null){ out.write(key.toString().getBytes()); out.write(keyValueSeparator); } out.write(value.toString().getBytes()); out.write(NEW_LINE.getBytes()); } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { out.close(); } }
public class MyFileOutputFormat<K,V> extends FileOutputFormat<K,V> { @Override public RecordWriter<K,V> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException { Configuration conf = context.getConfiguration(); Path file = getDefaultWorkFile(context, ""); FileSystem fs = file.getFileSystem(conf); FSDataOutputStream fileOut = fs.create(file, false); return new MyRecordWriter<K,V>(fileOut, " "); } }
public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable> { public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String words [] = line.split(" "); for(String word : words){ context.write(new Text(word), new LongWritable(1)); } } }
public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable> { public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { Iterator<LongWritable> it = values.iterator(); long count = 0; while(it.hasNext()){ long c = it.next().get(); count += c; } context.write(key, new LongWritable(count)); } }
public class WCDriver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "WCJob"); job.setJarByClass(cn.tedu.wc.WCDriver.class); job.setMapperClass(cn.tedu.wc.WCMapper.class); job.setReducerClass(cn.tedu.wc.WCReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); job.setOutputFormatClass(MyFileOutputFormat.class); FileInputFormat.setInputPaths(job, new Path("hdfs://yun01:9000/park/words.txt")); FileOutputFormat.setOutputPath(job, new Path("hdfs://yun01:9000/park/result2")); if (!job.waitForCompletion(true)) return; } }
MultipleOutputs能够令一个Reducer产生多个输出文件。
为特定kv打上指定标记
MultipleOutputs<Text,LongWritable> mos = new MultipleOutputs<Text,LongWritable>(context); mos.write("flag", key, value); /** 为指定标记内容指定输出方式 能够指定多个 */ MultipleOutputs.addNamedOutput(job, "flag", XxxOutputFormat.class, Key.class, Value.class);
改造上面的wordcount案例,将首字母为a-j的输出到"small"中。其余输出到"big"中。
代码以下:
public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable> { public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String words [] = line.split(" "); for(String word : words){ context.write(new Text(word), new LongWritable(1)); } } }
public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable> { MultipleOutputs<Text,LongWritable> mos = null; @Override protected void setup(Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException { this.mos = new MultipleOutputs<Text,LongWritable>(context); } public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { Iterator<LongWritable> it = values.iterator(); long count = 0; while(it.hasNext()){ long c = it.next().get(); count += c; } if(key.toString().matches("[a-j][a-z]*")){ mos.write("small", key, new LongWritable(count)); }else{ mos.write("big", key, new LongWritable(count)); } } }
public class WCDriver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "WCJob"); job.setJarByClass(cn.tedu.wc.WCDriver.class); job.setMapperClass(cn.tedu.wc.WCMapper.class); job.setReducerClass(cn.tedu.wc.WCReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); MultipleOutputs.addNamedOutput(job, "small", MyFileOutputFormat.class, Text.class, IntWritable.class); MultipleOutputs.addNamedOutput(job, "big", MyFileOutputFormat.class, Text.class, IntWritable.class); FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.242.101:9000/park/words.txt")); FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.242.101:9000/park/result2")); if (!job.waitForCompletion(true)) return; } }
在reduce阶段进行,对mapper发送过来的数据会进行分组的操做,这个过程称为为Grouping。默认状况下会将k2相同的内容做为一组。
咱们能够经过job.setGroupingComparatorClass(MyGroupingComparator.class);方法本身指定Grouping规则。
改造WordCount案例,实现统计a-h 和 i-z开头的单词数量统计。
public class WCComparator extends WritableComparator { @Override public int compare(byte[] b1, int begin1, int len1, byte[] b2, int begin2, int len2) { try { DataInput in = new DataInputStream(new ByteArrayInputStream(b1,begin1,len1)); Text ta = new Text(); ta.readFields(in); in = new DataInputStream(new ByteArrayInputStream(b2,begin2,len2)); Text tb = new Text(); tb.readFields(in); if(ta.toString().matches("^[a-n][a-z]*$") && tb.toString().matches("^[a-n][a-z]*$")){ return 0; }else if(ta.toString().matches("^[o-z][a-z]*$") && tb.toString().matches("^[o-z][a-z]*$")){ return 0; }else{ return 1; } } catch (IOException e) { e.printStackTrace(); throw new RuntimeException(e); } } }
所谓二次排序就是首先按照第一字段排序,而后再对第一字段相同的行按照第二字段排序,注意不能破坏第一个字段的排序顺序。
/** * 开发bean封装数据 */ public class NumBean implements WritableComparable<NumBean> { private int lnum; private int rnum; public NumBean() { } public NumBean(int lnum, int rnum) { this.lnum = lnum; this.rnum = rnum; } public int getLnum() { return lnum; } public void setLnum(int lnum) { this.lnum = lnum; } public int getRnum() { return rnum; } public void setRnum(int rnum) { this.rnum = rnum; } @Override public void readFields(DataInput in) throws IOException { this.lnum = in.readInt(); this.rnum = in.readInt(); } @Override public void write(DataOutput out) throws IOException { out.writeInt(lnum); out.writeInt(rnum); } @Override public int compareTo(NumBean o) { if(this.lnum != o.getLnum()){ return this.lnum - o.getLnum(); }else{ return this.rnum - o.getRnum(); } } }
/** * 开发Mapper 用 NumBean做为k2 因为 NumBean覆盖了compareTo方法 能够实现自动二次排序 */ public class NumMapper extends Mapper<LongWritable, Text, NumBean, NullWritable> { @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, NumBean, NullWritable>.Context context) throws IOException, InterruptedException { //1.读取行 String line = value.toString(); //2.切分出lnum 和 rnum String [] attrs = line.split(" "); int lnum = Integer.parseInt(attrs[0]); int rnum = Integer.parseInt(attrs[1]); //3.封装数据到bean NumBean nb = new NumBean(lnum,rnum); //4.发送数据 context.write(nb, NullWritable.get()); } }
/** * 开发Reducer输出结果 shuffle阶段已经完成了二次排序 此处直接输出便可 */ public class NumReducer extends Reducer<NumBean, NullWritable, IntWritable, IntWritable> { @Override protected void reduce(NumBean key, Iterable<NullWritable> values, Reducer<NumBean, NullWritable, IntWritable, IntWritable>.Context context) throws IOException, InterruptedException { int lnum = key.getLnum(); int rnum = key.getRnum(); context.write(new IntWritable(lnum), new IntWritable(rnum)); } }
/** * 为了防止重复 数据被grouping成一条数据 形成结果丢失 自定义gourping过程 固定返回-1 表示不管什么状况都不合并数据 */ public class NumWritableComparator extends WritableComparator { @Override public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4, int arg5) { return -1; } }
/** * 开发Driver组装程序 */ public class NumDriver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "num_job"); job.setJarByClass(cn.tedu.mr.grouping.num.NumDriver.class); job.setMapperClass(cn.tedu.mr.grouping.num.NumMapper.class); job.setMapOutputKeyClass(NumBean.class); job.setMapOutputValueClass(NullWritable.class); job.setReducerClass(cn.tedu.mr.grouping.num.NumReducer.class); job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(IntWritable.class); job.setGroupingComparatorClass(NumWritableComparator.class); FileInputFormat.setInputPaths(job, new Path("hdfs://hadoop01:9000/ndata")); FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop01:9000/nresult")); if (!job.waitForCompletion(true)) return; } }
在开发MR程序时,可能遇到的数据分配不一致,形成程序性能降低的问题,这个问题称之为数据倾斜问题。
解决办法:
若是是由于shuffle分配数据不均匀形成数据倾斜,重写parition均匀分配数据便可。
若是是数据自己带有倾斜的特色没法经过修改parition来解决倾斜问题,那么能够经过如下几个方法尝试解决:
1.利用combiner减轻倾斜的状况。
2.将形成倾斜的数据拿出单独处理。
3.将一个mr拆分红多个mr下降倾斜形成的危害。
每一个小文件不管多小都会对应一个block,而每个block在NameNode中都要有元数据的记录,若是存在大量小文件,则NameNode中的大量空间都用来存放这些小文件的元数据信息,实际上是至关浪费的,对于NameNode的性能有比较大的影响。
当使用mapreduce处理大量小文件时,默认状况下mapreduce在进行切片操做时规则是和block切的规则同样,即一个block一个InputSplit,而一个InputSplit就对应一个Mapper,这样会形成开启大量的MapperTask,可是每一个MapperTask处理的数据量都颇有限。极端状况下开启大量Mapper耗费内存甚至可能形成程序的崩溃。
Hadoop Archive或者HAR,是一个高效地将小文件放入HDFS块中的文件存档工具,它可以将多个小文件打包成一个HAR文件,这样在减小namenode内存使用的同时,仍然容许对文件进行透明的访问。
HAR是在Hadoop file system之上的一个文件系统,所以全部fs shell命令对HAR文件都可用,只不过是文件路径格式不同,HAR的访问路径能够是如下两种格式:
har://scheme-hostname:port/archivepath/fileinarchive
har:///archivepath/fileinarchive(本节点)
第一,对小文件进行存档后,原文件并不会自动被删除,须要用户本身删除;
第二,建立HAR文件的过程其实是在运行一个mapreduce做业,于是须要有一个hadoop集群运行此命令。
第一,一旦建立,Archives便不可改变。要增长或移除里面的文件,必须从新建立归档文件。
第二,要归档的文件名中不能有空格,不然会抛出异常,能够将空格用其余符号替换(使用-Dhar.space.replacement.enable=true 和-Dhar.space.replacement参数)。
hadoop archive -archiveName <NAME>.har -p <parent path>[-r <replication factor>]<src>* <dest>
案例:将hdfs:///small中的内容压缩成small.har
将某个文件打成har:
hadoop archive -archiveName small.har -p /small/small1.txt /small
将多个small开头的文件打成har:
hadoop archive -archiveName small.har -p /small/small* /small
将某个文件夹下全部文件打成har:
hadoop archive -archiveName small.har -p /small /small
查看HAR文件存档中的文件
hadoop fs -ls har:///small/small.har
输出har文件内容到本地系统
hadoop fs -get har:///small/small.har/smallx
SequenceFile文件是Hadoop用来存储二进制形式的key-value对而设计的一种平面文件(Flat File)。目前,也有很多人在该文件的基础之上提出了一些HDFS中小文件存储的解决方案,他们的基本思路就是将小文件进行合并成一个大文件,同时对这些小文件的位置信息构建索引。
文件不支持复写操做,不能向已存在的SequenceFile(MapFile)追加存储记录。
当write流不关闭的时候,没有办法构造read流。也就是在执行文件写操做的时候,该文件是不可读取的。
@Test /** * SequenceFile 写操做 */ public void SequenceWriter() throws Exception{ final String INPUT_PATH= "hdfs://192.168.242.101:9000/big"; final String OUTPUT_PATH= "hdfs://192.168.242.101:9000/big2"; //获取文件系统 Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://192.168.242.101:9000"); FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf); //建立seq的输出流 Text key = new Text(); Text value = new Text(); SequenceFile.Writer writer = SequenceFile.createWriter(fileSystem, conf, new Path(OUTPUT_PATH), key.getClass(), value.getClass()); //写新的数据 System.out.println(writer.getLength()); key.set("small4.txt".getBytes()); value.set("ddddddd".getBytes()); writer.append(key, value); //关闭流 IOUtils.closeStream(writer); } @Test /** * SequenceFile 读操做 */ public void sequenceRead() throws Exception { final String INPUT_PATH= "hdfs://192.168.242.101:9000/big/big.seq"; //获取文件系统 Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://192.168.242.101:9000"); FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf); //准备读取seq的流 Path path = new Path(INPUT_PATH); SequenceFile.Reader reader = new SequenceFile.Reader(fileSystem, path, conf); //经过seq流得到key和value准备承载数据 Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf); Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf); //循环从流中读取key和value long position = reader.getPosition(); while(reader.next(key, value)){ //打印当前key value System.out.println(key+":"+value); //移动游标指向下一个key value position=reader.getPosition(); } //关闭流 IOUtils.closeStream(reader); } @Test /** * 多个小文件合并成大seq文件 * @throws Exception */ public void small2Big() throws Exception{ final String INPUT_PATH= "hdfs://192.168.242.101:9000/small"; final String OUTPUT_PATH= "hdfs://192.168.242.101:9000/big/big.seq"; //获取文件系统 Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://192.168.242.101:9000"); FileSystem fs = FileSystem.get(conf); //经过文件系统获取全部要处理的文件 FileStatus[] files = fs.listStatus(new Path(INPUT_PATH)); //建立能够输出seq文件的输出流 Text key = new Text(); Text value = new Text(); SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, new Path(OUTPUT_PATH), key.getClass(),value.getClass()); //循环处理每一个文件 for (int i = 0; i < files.length; i++) { //key设置为文件名 key.set(files[i].getPath().getName()); //读取文件内容 InputStream in = fs.open(files[i].getPath()); byte[] buffer = new byte[(int) files[i].getLen()]; IOUtils.readFully(in, buffer, 0, buffer.length); //值设置为文件内容 value.set(buffer); //关闭输入流 IOUtils.closeStream(in); //将key文件名value文件内容写入seq流中 writer.append(key, value); } //关闭seq流 IOUtils.closeStream(writer); }
用于多个数据源的join。
此类能够解决多个小文件在进行mr操做时map建立过多的问题。
此类的原理在于,它本质上市一个InputFormat,在其中的getSplits方法中,将他能读到的全部的文件生成一个InputSplit
使用此类须要配合自定义的RecordReader,须要本身开发一个RecordReader指定如何从InputSplit中读取数据。
也能够经过参数控制最大的InputSplit大小。
CombineTextInputFormat.setMaxInputSplitSize(job, 256*1024*1024);