这篇文章我以前是拜读过的,今天闲来没事,就想拿来当作MapReduce的练习。 java
MapReduce这把刀太大,刀大了问题就抵不住这刀锋了,事实上一开始我想着,这么多些题目,当是要花很多功夫的,但当我作完一题继续看下面的题目的时候,才发现这些题目在MapReduce模型下显得大同小异了,看来拿大刀的人是无论砍的是木头仍是人头的,而是直接抽象成柱形物而后抡起刀一刀就下去了。 程序员
直入主题: 面试
一、海量日志数据,提取出某日访问百度次数最多的前K个IP。[稍微改变] 算法
说明:每一次访问网页就在日志中记录1次访问者的IP,独占一行,一个小数据能够在这里下载。 apache
实在是想不出如何能在一个Job中解决这个问题,因此仍是把它拉扯成了两个Job来解决。
Job1:将相同IP的记录合并,造成<ip,count>形式,其中count是对这个ip的计数。
Job2:按count排序<ip,count>并选择前K个进行输出。
这里我写了一个可序列化的类IPAndCount,若是稍微熟悉MapReduce或者看明白我以前写的
关系型MapReduce模式:选择、分组和组内排序
你就知道这是为了排序而准备的。MapReduce有一个“Shuffle and sort”,这个阶段是利用key来对tuple进行排序的,而排序时调用的即是key的compareTo()方法。事实上若是job1输出的数据两足够小,咱们彻底能够在内存中进行排序而利用MapReduce框架,这样就能够省下一个Reduce阶段,可是对于这个问题显然不行。
IPAndCount很直白,就是包装了上述的<ip,count>。
- import java.io.DataInput;
- import java.io.DataOutput;
- import java.io.IOException;
-
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.io.WritableComparable;
-
-
- public class IPAndCount implements WritableComparable{
- Text ip;
- IntWritable count;
-
- public IPAndCount(){
- this.ip = new Text("");
- this.count = new IntWritable(1);
- }
-
- public IPAndCount(Text ip, IntWritable count){
- this.ip = ip;
- this.count = count;
- }
-
- public IPAndCount(String ip, int count){
- this.ip = new Text(ip);
- this.count = new IntWritable(count);
- }
-
- public void readFields(DataInput in) throws IOException {
- ip.readFields(in);
- count.readFields(in);
- }
-
- public void write(DataOutput out) throws IOException {
- ip.write(out);
- count.write(out);
- }
-
- public int compareTo(Object o) {
- return ((IPAndCount)o).count.compareTo(count) == 0?
- ip.compareTo(((IPAndCount)o).ip):((IPAndCount)o).count.compareTo(count);//若是只比较count会丢失数据,应该是suffle阶段的问题
- }
-
- public int hashCode(){
- return ip.hashCode();
- }
-
- public boolean equals(Object o){
- if(!(o instanceof IPAndCount))
- return false;
- IPAndCount other = (IPAndCount)o;
- return ip.equals(other.ip) && count.equals(other.count);
- }
-
- public String toString(){
- StringBuffer buf = new StringBuffer("[ip=");
- buf.append(ip.toString());
- buf.append(",count=");
- buf.append(count.toString());
- buf.append("]");
- return buf.toString();
- }
-
- public Text getIp() {
- return ip;
- }
- public void setIp(Text ip) {
- this.ip = ip;
- }
- public IntWritable getCount() {
- return count;
- }
- public void setCount(IntWritable count) {
- this.count = count;
- }
- }
下面对FindActiveIp进行说明:
SumUpIpMapper和SumUpIPReducer事实上就是一个MapReduce中最基础的词频统计程序WordCount。你能够加一个Combiner来优化一下,我遗漏了。
从配置中能够看见两个Job的配置:job 和job2。
依赖关系是job -> job2,代码中使用了JobControl来解决做业间的依赖关系,JobControl.run()方法会在做业都运行完后才返回。
Job2的输入路径是Job1的输出路径,从参数中能够看出这一点。
Job1的输出在输出文件中的表现是:ip,count
Job2再从文件中读入,使用的是KeyValueTextInputFormat,它对应的是TextOutputFormat,咱们能够从job1的配置中看出来。
BeforeSortIPMapper从job1的输出中读取数据并包装成IPAndCount类型,以便MapReduce框架在“shuffle and sort”阶段利用它来排序。
最后SelectTopKIPReducer选出前K个进行输出便可,在这里咱们设置最后的reduce只有一个reduce task,以使全部数据汇聚到一台机子上进行处理。
- import java.io.IOException;
-
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.conf.Configured;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapred.lib.ChainMapper;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
- import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
- import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
- import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
- import org.apache.hadoop.util.Tool;
- import org.apache.hadoop.util.ToolRunner;
-
-
- public class FindActiveIP extends Configured implements Tool{
-
- public static class SumUpIPMapper extends Mapper<LongWritable,Text,Text,IntWritable>{
- IntWritable one = new IntWritable(1);
- public void map(LongWritable key, Text value, Context context)
- throws IOException,InterruptedException{
- context.write(value, one);
- }
- }
-
- public static class SumUpIPReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
- //这里能够选择前k个进行输出以优化
- public void reduce(Text key, Iterable<IntWritable> values, Context context)
- throws IOException, InterruptedException{
- int sum = 0;
- for(IntWritable v : values){
- sum += v.get();
- }
- context.write(key, new IntWritable(sum));
- }
- }
-
-
- public static class BeforeSortIPMapper extends Mapper<Text,Text,IPAndCount,Text>{
- public void map(Text key, Text value, Context context)
- throws IOException,InterruptedException{
- IPAndCount tmp = new IPAndCount(key,new IntWritable(Integer.valueOf(value.toString())));
- System.out.println(tmp);
- context.write(tmp,new Text());
- }
- }
-
-
- //set num of this reducer to one
- public static class SelectTopKIPReducer extends Reducer<IPAndCount,Text,IPAndCount,Text>{
- int counter = 0;
- int K = 10;
- public void reduce(IPAndCount key, Iterable<Text> values, Context context)
- throws IOException, InterruptedException{
- System.out.println(key);
- if(counter < K){
- context.write(key, null);
-
- counter++;
- }
-
- }
- }
- public int run(String[] args) throws Exception {
- Configuration conf = new Configuration();
- Job job = new Job(conf,"SumUpIP");
- job.setJarByClass(FindActiveIP.class);
- job.setInputFormatClass(TextInputFormat.class);
- job.setOutputFormatClass(TextOutputFormat.class);
- job.getConfiguration().set("mapred.textoutputformat.separator", ",");
- Path in = new Path(args[0]);
- Path out = new Path(args[1]);
- FileInputFormat.setInputPaths(job, in);
- FileOutputFormat.setOutputPath(job, out);
- job.setMapperClass(SumUpIPMapper.class);
- job.setReducerClass(SumUpIPReducer.class);
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(IntWritable.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(IntWritable.class);
- job.setNumReduceTasks(7);
-
- Configuration conf2 = new Configuration();
- Job job2 = new Job(conf2,"SortAndFindTopK");
- job2.setJarByClass(FindActiveIP.class);
- job2.setInputFormatClass(KeyValueTextInputFormat.class);
- job2.getConfiguration().set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");
- job2.setOutputFormatClass(TextOutputFormat.class);
- Path in2 = new Path(args[1]);
- Path out2 = new Path(args[2]);
- FileInputFormat.setInputPaths(job2,in2);
- FileOutputFormat.setOutputPath(job2, out2);
- job2.setMapperClass(BeforeSortIPMapper.class);
- job2.setReducerClass(SelectTopKIPReducer.class);
- job2.setMapOutputKeyClass(IPAndCount.class);
- job2.setMapOutputValueClass(Text.class);
- job2.setOutputKeyClass(IPAndCount.class);
- job2.setOutputValueClass(Text.class);
- job2.setNumReduceTasks(1);
-
- JobControl jobControl = new JobControl("FindTopKIP");
- ControlledJob cJob1 = new ControlledJob(conf);
- cJob1.setJob(job);
- ControlledJob cJob2 = new ControlledJob(conf2);
- cJob2.setJob(job2);
- jobControl.addJob(cJob1);
- jobControl.addJob(cJob2);
- cJob2.addDependingJob(cJob1);
- jobControl.run();
- return 0;
- }
-
- public static void main(String args[]) throws Exception{
- int res = ToolRunner.run(new FindActiveIP(), args);
- System.exit(res);
- }
-
- }
大刀拿习惯了,从前的大刀就成了如今的绣花针,不是绣花针很差,只是用着不顺手。当你听着歌用java写着MapReduce,忽然有人在你耳边喊了一句:Pig~Pig~Pig~
你很难不心动!程序员爱偷懒堪比女人爱逛街,都是为了快乐啊~
下面是用Pig来处理上述的问题:
- grunt> records = LOAD 'input/ipdata' AS (ip:chararray);
- grunt> grouped_records = GROUP records BY ip;
- grunt> counted_records = FOREACH grouped_records GENERATE group, COUNT(records);
- grunt> sorted_records = ORDER counted_records BY $1 DESC;
- grunt> topK = LIMIT sorted_records 10;
- grunt> DUMP topK;
你看,数数,没晕数一数,6行!仅仅6行就解决了。
行1:将文件装入
行2:按ip分组
行3:组内计数
行4:组间按ip访问计数排序
行5:选择前10个数据
行6:运行并输出。
虽然咱们的Pig方法实际上跑了3个job才完成任务,相比于java写的MapReduce多了一个job,但Pig显然更愉快些。
这是最后结果:
(192.168.0.1,1559)
(192.168.0.21,7)
(192.168.0.14,4)
(192.168.0.10,4)
(192.168.0.12,4)
(192.168.0.32,4)
(192.168.0.13,3)
(192.168.0.3,3)
(192.168.0.2,2)
(192.168.0.11,1)
这个算法对带宽的压力仍是比较大的,除了加一个Combiner以外,代码中还提到了另外一个小小的优化在进入第一个Reduce阶段的一个reduce task中的数据足以装入内存时,这是很容易解决的。这不是多好的优化,应当有更加好的优化方式能过滤更多的数据,不然。。。这不科学~
二、寻找热门查询:搜索引擎会经过日志文件把用户每次检索使用的全部检索串都记录下来,每一个查询串的长度为1-255字节。
假设目前有100亿个记录(这些查询串的重复度比较高,虽然总数是100亿,但若是除去重复后,不超过10亿个。一个查询串的重复度越高,说明查询它的用户越多,也就是越热门),请你统计最热门的10个查询串。[我仍是稍微修改了题目]
看见这个题目,我以为我写下去会对不起July费了万千脑细胞辛苦的写做成果,尽管我心里十分但愿它跟上面那题同样,但这多么让人不甘心又不尽兴~
如今暂且不考虑优化的问题:这个题目无非就是统计查询串的计数,而后排序,而后取出前10个。事实上,这个问题在不考虑细节上彻底能够用上面的pig脚原本处理。
不写了,以如今的水平继续写实在是不优美:
3题:有一个1G大小的一个文件,里面每一行是一个词,词的大小不超过16字节,内存限制大小是1M。返回频数最高的100个词。
4题:海量数据分布在100台电脑中,想个办法高效统计出这批数据的TOP10。
5题:有10个文件,每一个文件1G,每一个文件的每一行存放的都是用户的query,每一个文件的query均可能重复。要求你按照query的频度排序。
7题:怎么在海量数据中找出重复次数最多的一个?
8题:上千万或上亿数据(有重复),统计其中出现次数最多的前N个数据。
9题:一个文本文件,大约有一万行,每行一个词,要求统计出其中最频繁出现的前10个词。
1,2,3,4,5,7,8,9题思路基本一致,值得注意的是,有时候咱们彻底能够肯定咱们须要的数据的一些特征,好比上面的热门查询中热门串必定被查询超过1000次,那么咱们就可使用FILTER来进行过滤以减小处理的数据(从而减小对带宽的压力)[filted_records = FILTER grouped_records BY SIZE(records) > 1000;]。
6题: 给定a、b两个文件,各存放50亿个url,每一个url各占64字节,内存限制是4G,让你找出a、b文件共同的url?
见
[Hadoop]使用DistributedCache进行复制联结
以及
使用hadoop的datajoin包进行关系型join操做,你也能够参考Data-Intensive Text Processing with MapReduce看看原生态的join操做是怎么进行的。
- grunt> A = LOAD 'input/url1' AS (url:chararray);
- grunt> B = LOAD 'input/url2' AS (url:chararray);
- grunt> grouped_A = GROUP A BY url;
- grunt> non_duplicated_A = FOREACH grouped_A GENERATE group; --去重
- grunt> grouped_B = GROUP B BY url;
- grunt> non_duplicated_B = FOREACH grouped_B GENERATE group; --B去重
- grunt> C = JOIN non_duplicated_B BY group, non_duplicated_A BY group; --A 、B 内联结
- grunt> D = FOREACH C GENERATE $0; //生成重复url
- grunt> DUMP D;
10题: 1000万字符串,其中有些是重复的,须要把重复的所有去掉,保留没有重复的字符串。
使用pig:
- grunt> records = LOAD 'input/retrived_strings' AS (str:chararray);
- grunt> grouped_records = GROUP records BY str;
- grunt> filted_records = FILTER grouped_records BY SIZE(records) <= 1;
- grunt> DUMP filted_records;
今日在CSDN看再次碰见July的这篇博文:教你如何迅速秒杀掉:99%的海量数据处理面试题。 app
这篇文章我以前是拜读过的,今天闲来没事,就想拿来当作MapReduce的练习。 框架
MapReduce这把刀太大,刀大了问题就抵不住这刀锋了,事实上一开始我想着,这么多些题目,当是要花很多功夫的,但当我作完一题继续看下面的题目的时候,才发现这些题目在MapReduce模型下显得大同小异了,看来拿大刀的人是无论砍的是木头仍是人头的,而是直接抽象成柱形物而后抡起刀一刀就下去了。 grunt
直入主题: oop
一、海量日志数据,提取出某日访问百度次数最多的前K个IP。[稍微改变] 优化
说明:每一次访问网页就在日志中记录1次访问者的IP,独占一行,一个小数据能够在这里下载。
实在是想不出如何能在一个Job中解决这个问题,因此仍是把它拉扯成了两个Job来解决。
Job1:将相同IP的记录合并,造成<ip,count>形式,其中count是对这个ip的计数。
Job2:按count排序<ip,count>并选择前K个进行输出。
这里我写了一个可序列化的类IPAndCount,若是稍微熟悉MapReduce或者看明白我以前写的
关系型MapReduce模式:选择、分组和组内排序
你就知道这是为了排序而准备的。MapReduce有一个“Shuffle and sort”,这个阶段是利用key来对tuple进行排序的,而排序时调用的即是key的compareTo()方法。事实上若是job1输出的数据两足够小,咱们彻底能够在内存中进行排序而利用MapReduce框架,这样就能够省下一个Reduce阶段,可是对于这个问题显然不行。
IPAndCount很直白,就是包装了上述的<ip,count>。
- import java.io.DataInput;
- import java.io.DataOutput;
- import java.io.IOException;
-
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.io.WritableComparable;
-
-
- public class IPAndCount implements WritableComparable{
- Text ip;
- IntWritable count;
-
- public IPAndCount(){
- this.ip = new Text("");
- this.count = new IntWritable(1);
- }
-
- public IPAndCount(Text ip, IntWritable count){
- this.ip = ip;
- this.count = count;
- }
-
- public IPAndCount(String ip, int count){
- this.ip = new Text(ip);
- this.count = new IntWritable(count);
- }
-
- public void readFields(DataInput in) throws IOException {
- ip.readFields(in);
- count.readFields(in);
- }
-
- public void write(DataOutput out) throws IOException {
- ip.write(out);
- count.write(out);
- }
-
- public int compareTo(Object o) {
- return ((IPAndCount)o).count.compareTo(count) == 0?
- ip.compareTo(((IPAndCount)o).ip):((IPAndCount)o).count.compareTo(count);//若是只比较count会丢失数据,应该是suffle阶段的问题
- }
-
- public int hashCode(){
- return ip.hashCode();
- }
-
- public boolean equals(Object o){
- if(!(o instanceof IPAndCount))
- return false;
- IPAndCount other = (IPAndCount)o;
- return ip.equals(other.ip) && count.equals(other.count);
- }
-
- public String toString(){
- StringBuffer buf = new StringBuffer("[ip=");
- buf.append(ip.toString());
- buf.append(",count=");
- buf.append(count.toString());
- buf.append("]");
- return buf.toString();
- }
-
- public Text getIp() {
- return ip;
- }
- public void setIp(Text ip) {
- this.ip = ip;
- }
- public IntWritable getCount() {
- return count;
- }
- public void setCount(IntWritable count) {
- this.count = count;
- }
- }
下面对FindActiveIp进行说明:
SumUpIpMapper和SumUpIPReducer事实上就是一个MapReduce中最基础的词频统计程序WordCount。你能够加一个Combiner来优化一下,我遗漏了。
从配置中能够看见两个Job的配置:job 和job2。
依赖关系是job -> job2,代码中使用了JobControl来解决做业间的依赖关系,JobControl.run()方法会在做业都运行完后才返回。
Job2的输入路径是Job1的输出路径,从参数中能够看出这一点。
Job1的输出在输出文件中的表现是:ip,count
Job2再从文件中读入,使用的是KeyValueTextInputFormat,它对应的是TextOutputFormat,咱们能够从job1的配置中看出来。
BeforeSortIPMapper从job1的输出中读取数据并包装成IPAndCount类型,以便MapReduce框架在“shuffle and sort”阶段利用它来排序。
最后SelectTopKIPReducer选出前K个进行输出便可,在这里咱们设置最后的reduce只有一个reduce task,以使全部数据汇聚到一台机子上进行处理。
- import java.io.IOException;
-
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.conf.Configured;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapred.lib.ChainMapper;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
- import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
- import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
- import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
- import org.apache.hadoop.util.Tool;
- import org.apache.hadoop.util.ToolRunner;
-
-
- public class FindActiveIP extends Configured implements Tool{
-
- public static class SumUpIPMapper extends Mapper<LongWritable,Text,Text,IntWritable>{
- IntWritable one = new IntWritable(1);
- public void map(LongWritable key, Text value, Context context)
- throws IOException,InterruptedException{
- context.write(value, one);
- }
- }
-
- public static class SumUpIPReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
- //这里能够选择前k个进行输出以优化
- public void reduce(Text key, Iterable<IntWritable> values, Context context)
- throws IOException, InterruptedException{
- int sum = 0;
- for(IntWritable v : values){
- sum += v.get();
- }
- context.write(key, new IntWritable(sum));
- }
- }
-
-
- public static class BeforeSortIPMapper extends Mapper<Text,Text,IPAndCount,Text>{
- public void map(Text key, Text value, Context context)
- throws IOException,InterruptedException{
- IPAndCount tmp = new IPAndCount(key,new IntWritable(Integer.valueOf(value.toString())));
- System.out.println(tmp);
- context.write(tmp,new Text());
- }
- }
-
-
- //set num of this reducer to one
- public static class SelectTopKIPReducer extends Reducer<IPAndCount,Text,IPAndCount,Text>{
- int counter = 0;
- int K = 10;
- public void reduce(IPAndCount key, Iterable<Text> values, Context context)
- throws IOException, InterruptedException{
- System.out.println(key);
- if(counter < K){
- context.write(key, null);
-
- counter++;
- }
-
- }
- }
- public int run(String[] args) throws Exception {
- Configuration conf = new Configuration();
- Job job = new Job(conf,"SumUpIP");
- job.setJarByClass(FindActiveIP.class);
- job.setInputFormatClass(TextInputFormat.class);
- job.setOutputFormatClass(TextOutputFormat.class);
- job.getConfiguration().set("mapred.textoutputformat.separator", ",");
- Path in = new Path(args[0]);
- Path out = new Path(args[1]);
- FileInputFormat.setInputPaths(job, in);
- FileOutputFormat.setOutputPath(job, out);
- job.setMapperClass(SumUpIPMapper.class);
- job.setReducerClass(SumUpIPReducer.class);
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(IntWritable.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(IntWritable.class);
- job.setNumReduceTasks(7);
-
- Configuration conf2 = new Configuration();
- Job job2 = new Job(conf2,"SortAndFindTopK");
- job2.setJarByClass(FindActiveIP.class);
- job2.setInputFormatClass(KeyValueTextInputFormat.class);
- job2.getConfiguration().set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");
- job2.setOutputFormatClass(TextOutputFormat.class);
- Path in2 = new Path(args[1]);
- Path out2 = new Path(args[2]);
- FileInputFormat.setInputPaths(job2,in2);
- FileOutputFormat.setOutputPath(job2, out2);
- job2.setMapperClass(BeforeSortIPMapper.class);
- job2.setReducerClass(SelectTopKIPReducer.class);
- job2.setMapOutputKeyClass(IPAndCount.class);
- job2.setMapOutputValueClass(Text.class);
- job2.setOutputKeyClass(IPAndCount.class);
- job2.setOutputValueClass(Text.class);
- job2.setNumReduceTasks(1);
-
- JobControl jobControl = new JobControl("FindTopKIP");
- ControlledJob cJob1 = new ControlledJob(conf);
- cJob1.setJob(job);
- ControlledJob cJob2 = new ControlledJob(conf2);
- cJob2.setJob(job2);
- jobControl.addJob(cJob1);
- jobControl.addJob(cJob2);
- cJob2.addDependingJob(cJob1);
- jobControl.run();
- return 0;
- }
-
- public static void main(String args[]) throws Exception{
- int res = ToolRunner.run(new FindActiveIP(), args);
- System.exit(res);
- }
-
- }
大刀拿习惯了,从前的大刀就成了如今的绣花针,不是绣花针很差,只是用着不顺手。当你听着歌用java写着MapReduce,忽然有人在你耳边喊了一句:Pig~Pig~Pig~
你很难不心动!程序员爱偷懒堪比女人爱逛街,都是为了快乐啊~
下面是用Pig来处理上述的问题:
- grunt> records = LOAD 'input/ipdata' AS (ip:chararray);
- grunt> grouped_records = GROUP records BY ip;
- grunt> counted_records = FOREACH grouped_records GENERATE group, COUNT(records);
- grunt> sorted_records = ORDER counted_records BY $1 DESC;
- grunt> topK = LIMIT sorted_records 10;
- grunt> DUMP topK;
你看,数数,没晕数一数,6行!仅仅6行就解决了。
行1:将文件装入
行2:按ip分组
行3:组内计数
行4:组间按ip访问计数排序
行5:选择前10个数据
行6:运行并输出。
虽然咱们的Pig方法实际上跑了3个job才完成任务,相比于java写的MapReduce多了一个job,但Pig显然更愉快些。
这是最后结果:
(192.168.0.1,1559)
(192.168.0.21,7)
(192.168.0.14,4)
(192.168.0.10,4)
(192.168.0.12,4)
(192.168.0.32,4)
(192.168.0.13,3)
(192.168.0.3,3)
(192.168.0.2,2)
(192.168.0.11,1)
这个算法对带宽的压力仍是比较大的,除了加一个Combiner以外,代码中还提到了另外一个小小的优化在进入第一个Reduce阶段的一个reduce task中的数据足以装入内存时,这是很容易解决的。这不是多好的优化,应当有更加好的优化方式能过滤更多的数据,不然。。。这不科学~
二、寻找热门查询:搜索引擎会经过日志文件把用户每次检索使用的全部检索串都记录下来,每一个查询串的长度为1-255字节。
假设目前有100亿个记录(这些查询串的重复度比较高,虽然总数是100亿,但若是除去重复后,不超过10亿个。一个查询串的重复度越高,说明查询它的用户越多,也就是越热门),请你统计最热门的10个查询串。[我仍是稍微修改了题目]
看见这个题目,我以为我写下去会对不起July费了万千脑细胞辛苦的写做成果,尽管我心里十分但愿它跟上面那题同样,但这多么让人不甘心又不尽兴~
如今暂且不考虑优化的问题:这个题目无非就是统计查询串的计数,而后排序,而后取出前10个。事实上,这个问题在不考虑细节上彻底能够用上面的pig脚原本处理。
不写了,以如今的水平继续写实在是不优美:
3题:有一个1G大小的一个文件,里面每一行是一个词,词的大小不超过16字节,内存限制大小是1M。返回频数最高的100个词。
4题:海量数据分布在100台电脑中,想个办法高效统计出这批数据的TOP10。
5题:有10个文件,每一个文件1G,每一个文件的每一行存放的都是用户的query,每一个文件的query均可能重复。要求你按照query的频度排序。
7题:怎么在海量数据中找出重复次数最多的一个?
8题:上千万或上亿数据(有重复),统计其中出现次数最多的前N个数据。
9题:一个文本文件,大约有一万行,每行一个词,要求统计出其中最频繁出现的前10个词。
1,2,3,4,5,7,8,9题思路基本一致,值得注意的是,有时候咱们彻底能够肯定咱们须要的数据的一些特征,好比上面的热门查询中热门串必定被查询超过1000次,那么咱们就可使用FILTER来进行过滤以减小处理的数据(从而减小对带宽的压力)[filted_records = FILTER grouped_records BY SIZE(records) > 1000;]。
6题: 给定a、b两个文件,各存放50亿个url,每一个url各占64字节,内存限制是4G,让你找出a、b文件共同的url?
见
[Hadoop]使用DistributedCache进行复制联结
以及
使用hadoop的datajoin包进行关系型join操做,你也能够参考Data-Intensive Text Processing with MapReduce看看原生态的join操做是怎么进行的。
- grunt> A = LOAD 'input/url1' AS (url:chararray);
- grunt> B = LOAD 'input/url2' AS (url:chararray);
- grunt> grouped_A = GROUP A BY url;
- grunt> non_duplicated_A = FOREACH grouped_A GENERATE group; --去重
- grunt> grouped_B = GROUP B BY url;
- grunt> non_duplicated_B = FOREACH grouped_B GENERATE group; --B去重
- grunt> C = JOIN non_duplicated_B BY group, non_duplicated_A BY group; --A 、B 内联结
- grunt> D = FOREACH C GENERATE $0; //生成重复url
- grunt> DUMP D;
10题: 1000万字符串,其中有些是重复的,须要把重复的所有去掉,保留没有重复的字符串。
使用pig:
- grunt> records = LOAD 'input/retrived_strings' AS (str:chararray);
- grunt> grouped_records = GROUP records BY str;
- grunt> filted_records = FILTER grouped_records BY SIZE(records) <= 1;
- grunt> DUMP filted_records;