Hadoop学习笔记(6) 算法
——从新认识Hadoop 数据库
以前,咱们把hadoop从下载包部署到编写了helloworld,看到告终果。现是得开始稍微更深刻地了解hadoop了。 编程
Hadoop包含了两大功能DFS和MapReduce, DFS能够理解为一个分布式文件系统,存储而已,因此这里暂时就不深刻研究了,等后面读了其源码后,再来深刻分析。 因此这里主要来研究一下MapReduce。 数组
这样,咱们先来看一下MapReduce的思想来源: 服务器
alert("I'd like some Spaghetti!"); 网络
alert("I'd like some ChocolateMousse!"); app
看到这代码,感受不顺眼,改之: 负载均衡
function SwedishChef(food) 框架
{ 分布式
alert("I'd like some " + food + " !");
}
SwedishChef("Spaghetti");
SwedishChef("Chocolate Mousse");
这类用函数取代重复代码的重构方式好处不少,优势不少,就很少说了!
再看这段:
alert("get the lobster");
PutInPot("lobster");
PutInPot("water");
alert("get the chicken");
BoomBoom("chicken");
BoomBoom("coconut");
看起来这段代码好像也有点重复的味道在里面,改之:
function Cook(i1,i2,f)
{
alert("get the " + i1);
f(i1);
f(i2);
}
Cook("lobster","water",PutInPot);
Cook("chicken","coconut",BoomBoom);
OK,这里面把一个函数当成了一个参数传来传去的。并且能够再省略点,把函数定义直接放入到函数调用者的地方:
Cook("lobster","water",
Function(x) { alert("pot " + x); } );
Cook("chicken","coconut",
Function(x) { alert("boom" + x); } );
嘿嘿,方便多了吧,这就是如今比较流程的匿名函数。
一旦你开始意识到能够将匿名函数做为参数,你就会发现有更多的应用了,来看:
var a=[1,2,3];
for(i = 0; I < a.length; i++)
{
a[i] = a[i] * 2;
}
for(i = 0; I < a.length; i++)
{
alert(a[i]);
}
遍历数组元素是一种很觉的操做,那好,既然很经常使用,咱们提取出来写成一个函数得了:
function map(fn,a)
{
for(i = 0; i < a.length; i++)
{
a[i] = fn(a[i]);
}
}
有了这个map函数后,上面的程序咱们就可来调用咯:
map(function (x) {return x*2; } , a);
map(alert, a);
除了遍历,通常咱们还会有一些经常使用的语句,如:
function sum(a)
{
var s = 0;
for( i = 0; i < a.length ;i ++)
s += a[i];
return s;
}
function join(a)
{
var s = "";
for( i = 0; i < a.length ;i ++)
s += a[i];
return s;
}
alert(sum([1,2,3]);
alert(join(["a","b","c"]);
彷佛sum和join这两个函数看起来很象,可否再抽象呢:
function reduce(fn, a, init)
{
var s = init;
for (i= 0;i < a.length; i++)
s = fn(s, a[i]);
return s;
}
function sum(a)
{
return reduce( function(a,b){return a+b;}, a, 0);
}
Function join(a)
{
return reduce( function(a,b){return a+b;}, a, "");
}
OK,这个函数都提取好了,那么,一个这么微小的函数,只能对数组中的元素进行遍历,到底能给你带来多大好处呢?
让咱们回头再来看map函数,当你须要对数组中每个元素为依次进行处理时,那么实际状况极可能是,到底按照哪种次序进行遍历是可有可无的,你能够从头到尾,也能够从尾到头遍历,都会获得相同的结果。事实上,若是你有两个CPU能够用,也许你能够写一些代码,使得每一个CPU来处理一半的元素,因而一瞬间这个map函数的运行速度就快了一倍。
进一步设想,你有几十万台服务器,分布在世界各地的机房中,而后你有一个超级庞大的数组,好比互联网的N多个网页信息,因而你让这些服务器来运行你的map函数,要快N多吧,每台机器实际上只处理很小的一部份运算。
OK,运算提升了很多,有了这个,你就敢去想互联网的搜索是怎么来的了。可是机器好像也是有极限的,不可能有这么多资源让你无限量的扩展,那咋办,不要紧,你能够找一个超级天才,让他写出可以在全世界庞大的服务器阵列上分布式运行的map函数和reduce函数。
简单的函数提出来了,加上函数指定的涉入,那这个高效的函数,可让咱们在各个地方去用,而后让咱们不一样的程序都能在阵列上运行,甚至能够不用告诉这位天才你真正在实现啥功能,只要让他作出这个map函数来。
这是函数式编程里的理念,如是不知道的函数式编程天然也就想不出mapReduce,正是这种算法使得Google的可扩展性达到如此巨大的规模。其中术语Map(映射)和Reduce(化简)分别来自Lisp语言和函数式编程。
再来看看Hadoop中的mapRecude。
Hadoop就是由天才想出来的,可以在N多庞大服务器群上运行你的map和reduce函数的框架。有了这个平台,咱们需要作的,仅仅要咱们实现传入的map和reduce中的处理函数而已。
MapReudue采用"分而治之"的思想,把对大规模数据集的操做,分发给一个主节点管理下的各分节点共同完成,而后经过整合各分节点中间结果,获得最终的结果。简单地说,MapRedue就是"任务的分解与结果的汇总"。上述处理过程被高度的抽象为两个函数:mapper和reducer,mapper负责把任务分解成多个任务,reducer负责把分解后多任务处理的结果汇总起来。至于在并行编程中的其余种种复杂问题,如分式。。。。 工做调度、负载均衡、容错处理、网络通信等,均由MapReduce框架负责处理。
在Mapper阶段,框架将任务的输入数据分割成固定大小的片断(split),随后将每一个split进一步分解成一批键值对<K1,V1>。Hadoop为每个split建立一个Map任务用于执行用户自定义的map函数,并将对应的split中的<K1,V1>对做为输入,获得计算的中间结果<K2,V2>。接着将中间结果按照K2进行排序,并将key值相同的value放在一块儿造成一个新的列表,开成<K2,list(V2)>元组。最后再根据key值的范围将这些元组进行分组,对应不一样的Reduce任务。
在Reducre阶段,Reducer把从不一样Mapper接收来的数据整合在一块儿并进行排序,而后调用用户自定义的reduce函数,对输入的<K2,list(V2)>对进行相应的处理,获得健值对<K3,V3>并输出到HD
框架为每一个split建立一个Mapper,那么谁来肯定Reducers的数量呢? 是配置文件,在mapred-site.xml中,有配置Reducers的数量,默认是1。通常要设置多少为宜呢?等后面更深刻了再找答案吧。
看看Hello World程序
看了前面的概念有点抽象,因此咱们仍是来看点实际的,看看咱们以前写的helloword程序。
咱们第一个helloworld程序是WordCount,——单词计数程序。这个程序的输入输出以下:
这是一个怎样的过程呢?咱们一步步分解,WordCount程序通过了如下过程:
为了好解释,咱们将输入数据中各增长了一行 bye world 和bye hadoop
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
System.out.println("key=" +key.toString());
System.out.println("Value=" + value.toString());
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
这段代码中,为了方便调试,每调用咱们的map函数,都会输入key和value值。因此在执行结果中,咱们能够看到程序的确有2次输入,与上面的值一至。StringTokenizer是一个单词切词的类,将字符串中的单词一个个切出来,而后while循环,往context中输出key-value,分为为切出来的单词,value为固定值1。
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}}
这段代码将收到的key和values值,将values进行累加,而后key不变输出到key-value里面,最终获得结果:
好了,这个Hello World就程序整个过程就这样。写了map和recude类,如何让它关联执行呢? 因此回到示例程序的main函数:
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
System.out.println("url:" + conf.get("fs.default.name"));
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
这里,咱们定义了一个Job类,而后把TokenizerMapper和IntSumReducer类指派给job,而后一切就交给job. waitForCompletion去执行,并等待其完成结果了。
OK了, 这个Hello World也总算是读懂了,不过。。。 仍是有好多疑问,中间那么多过程,能不有个性化呢?如:输入能不能从数据库?输出能不能到Excel?等等,好了,后面再来分析吧。
本文参考:
《软件随想录》Joel Spolsky著
《实践Hadoop》刘鹏著