走进分布式系统

首先声明,文章架构及思路为我的原创。文章中涉及的内容有些是网络资源整理而来,都有附上参考网址。php

随着互联网的不断发展,愈来愈多的人涌入这片天地,在互联网这片广袤的土地上,不仅留下了人们踏过的脚印,还留下了人们生活中数不清的细节,假以时日,即可以成为时光机,虽然回不去,却也能够经过它回顾人们那已过往的人生。

有点说偏了,咱们仍是回到正题来。本文试图站在必定的高度,从全局高度纵览分布式系统的前世此生,而在一些节点上,也会结合细节描述来讲明问题。经过本文,你会对分布式系统的出现背景,致力于解决什么问题,存在哪些问题,有哪些可用的组件,以及它的发展历程有一个综合性的了解。

随着互联网基础设备的不断完善,世间万物均有被互联网记录跟踪的可能性,而现在,正是起点。互联网产生的数据量之大,已不是传统数据库能够处理得了的了。传统数据库数据量急剧增加致使数据库变慢,已不适合超大规模的数据分析操做。传统关系型数据库(RDBMS)很难扩展,一般是纵向扩展,即增长CPU核数和磁盘数目,但到达必定程度也只能横向扩展,横向扩展通常作法是主从复制(读写分离,数据延迟)、集群(如oracle的RAC,共享存储)、分片(分表分库,操做不便),但这些扩展并非能够一直进行下去,到达必定量就会出现瓶颈而且给使用者带来的阵痛并不是三言两语可道得尽的。RDBMS很是适合事务性操做,但不见长于超大规模的数据分析处理,这就为引入分布式系统带来了契机。分布式系统的目标就是在可扩展的方式下解决超过TB级别数据处理过程当中的问题,创建一个性能随机器数量增长而线性提高的系统。
(附:上文提到的RDBMS系统扩展方法可参见 http://blog.csdn.net/stevensxiao/article/details/51872795

说到分布式系统,就不得不提它最重要的理论——CAP定理。CAP是Consistency,Availability,Partition tolerance的首字母缩写,分别表示一致性,可用性,分区容忍性。CAP定理说的是:一个分布式系统最多只能知足一致性、可用性、分区容错性中的两项,不可能三项同时知足。

一致性说的是:更新数据完成后,全部节点的数据都保持一致。对于一致性,能够经过客户端和服务器两个视角来看,从客户端来看,说的是多并发访问时如何获取更新过的数据的问题;从服务器来看,说的是数据更新如何同步到全部节点的问题。

可用性说的是:服务一直可用,并且是正常的响应时间。

分区容错性说的是:分布式系统在遇到某节点或网络分区故障时,仍然能对外提供服务。

CAP定理说明了要知足分区容错性的分布式系统,只能在一致性和可用性二者中,选择其中一个。对于多数大型互联网应用的场景,主机众多、部署分散,并且如今的集群规模愈来愈大,因此节点故障、网络故障是常态,并且要保证服务可用性达到N个9,即保证P和A,舍弃C(退而求其次保证最终一致性)。虽然某些地方会影响客户体验,但还处于可接受范围内。
(附:上文提到的CAP理论的详细说明及简单证实能够参见 http://www.hollischuang.com/archives/666

了解分布式系统都存在的CAP问题,有助了后面咱们更深刻的学习分布式系统,包括它为何如此设计、架构等等,均可以追根溯源到此问题上。

在CAP定理上,咱们能够延伸出其余一些相关理论,好比BASE理论,ACID特征等。
同时知足CAP的是RDBMS的事务,它知足ACID特征,但它不是分布式系统。
A: Atomic 原子性:说的是要么整个事务都成功,要么整个事务都失败。
C: Consistency 一致性:说的是业务逻辑一致性,如两我的转帐无论成功与否,双方帐户余额必须保持不变。
I: Isolation 隔离性:说的是并发事务中对相同数据进行修改,事务不能修改该数据在另外一个事务的中间状态。
D: Durability 持久性:说的是事务成功后,更新的数据必须永久的保存下来。
(附:上文提到的ACID特征能够参见 http://www.cnblogs.com/wangchuanqi/p/5554708.html

BASE理论是对CAP理论的延伸,核心思想是即便没法作到强一致性(Strong Consistency, CAP的一致性就是强一致性),但应用能够采用适合的方式达到最终一致性(Eventual Consitency)。BASE是指基本可用(Basically Available)、软状态( Soft State)、最终一致性( Eventual Consistency)。

基本可用:是指分布式系统在出现故障的时候,容许损失部分可用性,即保证核心可用。电商大促时,为了应对访问量激增,部分用户可能会被引导到降级页面,服务层也可能只提供降级服务。这就是损失部分可用性的体现。

软状态:是指容许系统存在中间状态,而该中间状态不会影响系统总体可用性。分布式存储中通常一份数据至少会有三个副本,容许不一样节点间副本同步的延时就是软状态的体现。

最终一致性:是指系统中的全部数据副本通过必定时间后,最终可以达到一致的状态。
(附:上文中提到的BASE理论能够参见: https://my.oschina.net/foodon/blog/372703

分布式系统没法同时知足CAP的三项规定,但能够知足BASE理论。

在CAP定理中,咱们知道在分布式系统中,为了要保障分区容错性,只能在一致性和可用性之中选择其一,而咱们通常的作法是选择可用性,牺牲一致性。在实际实践中,为了知足从客户端角度来看数据是知足一致性的(从服务器端角度来看,仍是非一致性的),咱们就必须引用某种算法来实现,咱们这里要说的Quorum算法,就属于这一类。

Quorum 算法,是一种分布式系统中经常使用的,用来保证数据冗余和最终一致性的投票算法。通常而言,一个写操做必需要对全部的冗余数据都更新完成了,才能称为成功结束。好比一份数据在5台设备上有冗余,Quorum算法可让写操做只要写完3台就返回,剩下的由系统内部缓慢同步完成。而读操做,则须要也至少读3台,才能保证至少能够读到一个最新的数据。hadoop的namenode ha中就用到了该算法。每次namenode写editlog时,除了写入本地磁盘外,也会并行地向每个journalnode发送写请求,只要大多数的journalnode写入成功,就认为写操做成功了。
(附:上文提到的Quorum算法,具体原理能够参见: http://www.cnblogs.com/netfocus/p/3622184.html
其在namenode的应用场景能够参见: http://www.cnblogs.com/hapjin/p/5626889.html

经过上面的讨论,咱们知道,在分布式系统中,最须要解决的是一致性问题。业界围绕一致性问题,提出了不少模型,下面咱们从2PC,3PC谈起,而后谈Paxos,最后再谈下Zookeeper底层的zab协议,经过这些模型的了解,有助于咱们对分布式系统存在的一致性问题有更深入的认识。

2PC协议分为两个阶段:

准备阶段: 协调者给每一个参考者发送prepare消息,每一个参考者要么直接返回失败(如权限验证失败),要么在本地执行事务,写本地的redo和undo日志,但不提交,达到一种”万事俱备,只欠东风“的状态。

提交阶段:若是协调者收到了参考者的失败消息或超过,直接给每一个参考者发送回滚(rollback)消息,不然发送提交(commit)消息。参考者根据协调者的指令执行提交或回滚操做,释放锁资源。

2PC协议存在如下缺点:

1. 同步阻塞问题。执行过程当中,全部参与者节点都是事务型阻塞的。当参与者占有公共资源时,其余第三方节点尝试访问该公共资源时,不得不处于阻塞状态。

2. 脑裂问题。当协调者发送commit以后宕机,而惟一接收到这条消息的参与者也同时宕机了,那么即便协调者经过选举协议产生了新的协调者,这条事务的状态也是不肯定的,没人知道该事务是否已被提交。

3. 数据不一致。当协调者在第二阶段发送commit指示后发生局部网络异常致使有的参与者执行了commit,有的参与者没法收到commit而处于阻塞状态,致使数据不一致。

因为二阶段提交存在着诸如同步阻塞、脑裂等缺陷,因此,研究者们在二阶段提交的基础上作了改进,提出了三阶段提交3PC。

3PC很好理解,就是在2PC的两个步骤以前加上一个评估操做,并引入超时机制。各参与者先根据自身状况评估协调者发出的事务请求是否可以成功执行,并回复给协调者。若是有参与者评估后说没办法执行,那么协调者就能够直接停止该事务,不涉及任务资源占用的状况(由于还没执行事务,占用资源)。一句话就是评估阶段,不管什么异常,均可以直接停止事务。若是全部参与者评估后都说没问题,能够执行,那么才会进入与2PC同样的后两步操做。若是在后两步操做过程当中,遇到宕机的状况,在2PC因为没有事先评估,因此会进入不肯定状态,在3PC中因为事前已经评估好能够执行(不能够执行就不会进入后面的步骤了),因此参与者在等待其余节点反馈超时时,就进行commit操做。3PC主要解决了2PC中存在的同步阻塞、脑裂问题。固然评估成功并不表明后续执行时真的可以成功,但从策略上来说,成功的几率比较大,因此其余节点无反馈时,就默认执行了commit操做。因此这种机制也会致使数据一致性问题,由于,因为网络缘由,协调者发送的abort响应(评估与实际执行不一致了)没有及时被参与者接收到,那么参与者在等待超时以后执行了commit操做。这样就和其余接到abort命令并执行回滚的参与者之间存在数据不一致的状况。

了解了2PC和3PC以后,咱们能够发现,不管是二阶段提交仍是三阶段提交都没法完全解决分布式的一致性问题。Google Chubby的做者Mike Burrows说过, there is only one consensus protocol, and that’s Paxos” – all other approaches are just broken versions of Paxos. 意即世上只有一种一致性算法,那就是Paxos,全部其余一致性算法都是Paxos算法的变种版本。下面咱们就来认识一下这个传说中的Paxos算法。

Paxos算法解决的问题是在一个 分布式系统中如何就某个值达成一致,保证不论发生任何异常,都不会破坏决议的一致性。

一个典型的场景是,在一个分布式数据库系统中,若是各节点的初始状态一致,每一个节点都执行相同的操做序列,那么他们最后能获得一个一致的状态。为保证每一个节点执行相同的命令序列,须要在每一条指令(能够认为是下面提到的value)上执行一个“一致性算法”以保证每一个节点看到的指令一致。

Paxos的算法分为两个阶段:

一. prepare阶段:
1. proposer选择一个提案编号n并将prepare请求发送给acceptors中的一个多数派。
2. acceptor收到prepare消息后,若是提案的编号大于它已经回复的全部prepare消息,则acceptor将本身上次接受的提案回复给proposer,并承诺再也不回复编号小于n的提案。

2、批准阶段
1. 当一个proposer收到多数acceptors对prepare的回复后,就进入批准阶段。它要向回复prepare请求的acceptors发送accpet请求,包括编号n和根据P2c决定的value(若是根据P2c没有已经接受的value,那么它能够自由决定value)。
2. 在不违背本身向其余proposer的承诺的前提下,acceptor收到acceptor请求后即接受这个请求。

能够这么来理解Paxos背后的设计原理,就是经过不断的提升编号n去抢占提议权,可是为了总体目标(即实现一致性)考虑,一旦有人以前已经夺得提议权,那么就放弃本身的诉求(value),直接提议别人的诉求(value),使别人的诉求尽快获得大多数人的承认,最终实现一致性。
(附:上文提到的Pasox算法详细介绍能够参见维基百科: https://zh.wikipedia.org/zh-cn/Paxos%E7%AE%97%E6%B3%95
图解Pasox有两篇写得不错: http://codemacro.com/2014/10/15/explain-poxos/

说到一致性算法,就不得不提目前分布式系统中普遍使用的zookeeper的zab协议。

zab协议分为三个阶段:

选举阶段:
这个算法比较复杂,能够参考后面附上的连接地址查看。这里,咱们只从感性上来理解一下选举算法。
集群中的每一个节点都拥有选举权(观察者除外),他们一开始都选举本身,在接收到别人的选票时,再进行对比,而后修改选票,选举他认为比较合适的人(有一套判断标准,好比zxid,sid),如此反复,直到有人被超过一半以上的人选举为Leader。

恢复阶段:
一旦leader选举完成,就开始进入恢复阶段,就是follower要同步leader上的数据信息。这一阶段 follower 发送它们的 lastZixd 给 leader,leader 根据 lastZixd 决定如何同步数据。

广播阶段:
到了这个阶段,Zookeeper 集群才能正式对外提供事务服务,而且 leader 能够进行消息广播。同时若是有新的节点加入,还须要对新节点进行同步。
(附:上文提到的zab协议能够参见:
注:1有流程图,但关键名词没解释清楚,2有关键名词解释,且文字描述仔细,能够配合1一块儿看)

Paxos算法在出现竞争的状况下,其收敛速度很慢,甚至可能出现活锁的状况,例如当有三个及三个以上的proposer在发送prepare请求后,很难有一个proposer收到半数以上的回复而不断地执行第一阶段的协议。所以,为了不竞争,加快收敛的速度,在算法中引入了一个Leader这个角色,在正常状况下同时应该最多只能有一个参与者扮演Leader角色,而其它的参与者则扮演Acceptor的角色。

在这种优化算法中,只有Leader能够提出议案,从而避免了竞争使得算法可以快速地收敛而趋于一致;而为了保证Leader的健壮性,又引入了Leader选举,再考虑到同步的阶段,渐渐的你会发现对Paxos算法的简化和优化已经和上面介绍的zab协议很类似了。

在下一篇,我将经过hadoop生态圈的相关介绍,来看看目前分布式系统都是怎么应用的。

经过上面关于分布式存在的问题的讨论,及为此提出的模型探讨,咱们对分布式系统有了比较初步的认识。后面咱们将经过hadoop生态圈的相关介绍,来看看目前分布式系统都是怎么应用的。

hadoop生态圈经常使用的工具以下:

Hadoop是分布式系统的最佳实践。目前它已经构成了一个大数据处理生态圈,如上图所示。广义上的hadoop是一个大数据处理生态圈,狭义上的hadoop是一个分布式存储系统HDFS和MapReduce离线计算框架,在2.0以后还包括YARN。用户能够在不了解分布式底层细节的状况下,开发分布式程序,充分运用集群的威力进行高速运算和存储。

Hdfs是由NameNode,DataNode,Secondary NameNode组成。

NameNode: Master节点,在Hadoop1.X中只有一个,Namenode 管理者文件系统的Namespace。它维护着文件系统树以及文件树中全部的文件和文件夹的元数据(metadata)。在hadoop2.X中能够有两个,实现HA(High Available 高可用集群)。

DataNode: Slave节点,存储实际的数据,汇报存储信息给NameNode。

Secondary NameNode:辅助NameNode,紧急状况下,可辅助恢复NameNode,但Secondary NameNode并不是NameNode的热备。在Hadoop2.x,通常使用Hadoop HA 作为NameNode单点故障的解决方案。

MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算。分而治之,一个大任务分红多个小的子任务(map),并行执行后,合并结果(reduce)。MapReduce的经典案例是统计文章中单词出现的次数(WordCount),工做原理以下图:

上面的Shuffle过程比较复杂,Shuffle过程就是map输出端到reduce输入端中间的数据处理过程。官方图示以下:

map端Shuffle:
map输出结果放在内存缓冲中,在放入缓冲区以前,会先作Partition(默认对key hash后再根据配置的reduce task数量取模,平均分摊到各reducer),这与官方图示有所不一样(官方图示只是力求简洁,有些细节与实际不同,也是能够理解的)。当缓冲区中数据量达到必定阀值时,会将数据Spill(溢写)到磁盘,在Spill时,会作sort和combine操做,若是用户有设置Combiner,也是在这时候一块儿作的。最终磁盘上就会有一个或者多个文件,将这些文件合并成一个最终文件的过程叫作Merge,Merge后的最终文件是分区信息的,不一样的分区数据会被不一样的reduce task拉取。

reduce端Shuffle:
从不一样的map节点拉取map文件中对应本身分区的数据到缓冲区中,当缓冲区中数据量达到必定阀值时,会将数据溢写到磁盘,在溢写时,会作sort和combine操做,若是用户有设置Combiner,会在这时候进行操做。最后将这些文件Merge成一个最终文件,做为reducer的输入文件。
(附:上文提到的Shuffle过程详细说明能够参见: http://www.javashuo.com/article/p-nvmymfnt-hz.html

Hadoop1.x任务运行机制
在hadoop1.x中,由JobTracker负责资源管理和任务调度。用户程序提交到集群之后,JobTracker向各个节点发出请求,查看每一个节点中的task个数,根据这些信息计算资源状况,并为用户程序分配资源,将用户程序的task分配到指定节点上去执行,由TaskTracker负责执行这些task,并向JobTracker汇报任务执行状况。以下图所示:


从上面的讨论能够知道,hadoop1.x存在如下问题:
1. JobTracker是集中处理点,存在单点故障
2. JobTracker任务重,资源过分消耗
3. JobTracker的资源管理模型太过简单(直接使用task数量评估资源状况),未考虑CPU和内存,资源利用率低。

为了改进以上问题,Hadoop2.x YARN横空出世。

YARN的根据思想是:将JobTracker功能分离,将资源管理和任务调度分开。使用Resource Manager进行资源管理,使用Application master进行任务调度。

YARN经过将资源管理和任务调度分开解决MR V1中JobTracker任务重,资源过分消耗问题。Resource Manager经过zookeeper实现热备,解决单点故障问题。经过Container作为资源容器(内存,CPU等资源的封装)来解决MR V1中的资源利用低效问题。

用户向YRAN提交做业,RM为做业分配第一个Container,并与对应的NM通讯,要求它在这个Container中启动AM,AM再向RM注册本身,再向RM申请任务执行资源,而后AM通知相应的NM去执行任务,NM负责向AM汇报任务执行状况。
(附:上文提到的hadoop1.x和yarn详细说明能够参见: http://m.blog.csdn.net/article/details?id=46554057
YARN中任务执行流程能够参见: http://www.bubuko.com/infodetail-953186.html?from=timeline

咱们讲到Hadoop2.x可使用Zookeeper实现HA方案,下面咱们来讲说如何实现。

JournalNodes(JNs)进行就是为了主备NameNode间数据同步用的, 当active状态的NameNode的命名空间有任何修改时,会告知大部分的JournalNodes进程(采用的是Quorun算法,写入大部分的JNs就算写入成功)。standby状态的NameNode有能力读取JNs中的变动信息,而且一直监控edit log的变化,把变化应用于本身的命名空间。standby能够确保在集群出错时,命名空间状态已经彻底同步了 。HDFS集群中的两个NameNode都在ZooKeeper中注册,当active状态的NameNode出故障时,ZooKeeper能检测到这种状况(zkfc watch zk节点变化),它就会自动把standby状态的NameNode切换为active状态。
(附:具体ha方案实现能够参见: http://blog.csdn.net/dingchenxixi/article/details/51131493

前面咱们已经屡次提到Zookeeper,可见它在分布式系统中的重要地位。ZooKeeper的架构经过冗余服务实现 高可用性 ,主要用来解决分布式系统的一致性问题。它包括Leader节点,Follower节点,还有Observer节点。它的底层实现就是基于咱们前面提到的Zab协议。当前Leader挂掉,就会进入选举阶段,各Follower节点均可以竞选Leader,Observer不参与选举,提供读写服务,只用于系统扩展,提升读取速度。当Leader选举结束,即进入恢复阶段,进行各节点数据同步,最后才进入可提供对外服务的广播阶段。当有新节点加入时,也会先进行数据同步操做。

客户端链接到zk后,zk会根据各server的压力状况,把这个链接分配给合适的server,而后客户端若是读取数据,server会直接返回数据给客户端,若是客户端要写数据,则server会将该请求转发给leader,由Leader执行写操做,并通知全部server进行更新。
(附:zk读写流程能够参见: http://www.jianshu.com/p/e3a1637f79a1

Zookeeper提供了相似文件系统的节点配置,如/znode1/zonde1_1,在节点上面能够写入数据。应用程序能够经过建立zk节点,并在该节点上设置watch,而且实现回调方法,当该节点的数据发生变化时,zk便会调用回调方法通知应用程序。实现应用程序分布式节点对于数据的一致性要求。

zk的应用场景很广,比较常见的有如下这些:
1. 配置管理
将分布式系统中的全局配置信息放到ZK节点上进行管理,应用在启动的时候主动到zk节点上获取配置信息,同时,在节点上注册一个watcher,之后配置信息有更新时,zk都会通知应用来获取最新配置信息。

2. 统一命名服务
在分布式系统中,经过使用命名服务,客户端应用可以根据指定名字来获取资源或服务的地址。

3. 分布通知/协调
不一样系统都对zk上的同一个znode进行注册,监听znode的变化,其中一个系统update了znode,那么另外一个系统就会接收到通知,并作相应处理,大大下降系统之间的耦合度。

4. 分布式锁
因为zk能够保证数据的强一致性,因此客户端能够把znode看做是一把锁,全部客户端都试图去建立同一个znode,建立成功者得到锁。释放锁,只要删除它所建立的节点便可。

5. 集群管理
集群监控: 集群中全部机器启动时都去zk某个节点下建立ephmeral节点,而且监控其父节点的变化,当有机器宕机,那么它在父节点建立的ephemeral节点便会被删除,从而被其余机器感知到。

动态选举master:集群中全部机器启动时都去zk某个节点下建立ephemeral_sequential节点,它会自动编号,咱们默认编号最小的为master。当master挂掉以后,其对应的节点便会消失,从而被其余机器感知到,推举当前时刻编号最小的节点作为新的master。

集群管理,利用的是zk节点的临时性。

6. 队列管理
同步队列:即全部成员聚齐,队列才可用。全部成员都在某个目录下建立/member_i, 并监视这个目录,直到全部成员都到位,再进行下一步操做。

FIFO队列:先进先出队列。全部成员入队时,都在某个目录下建立sequential节点/queue_i,这样全部成员加入队伍时,都是有编号的,出队时,经过getChildren()方法返回队伍中的全部元素,而后消费其中最小的一个,就能保证FIFO。

队列管理,利用的是zk节点的序列性。
(附:zk应用场景能够参见: http://www.th7.cn/Program/java/201606/873719.shtml

上面提到的Hadoop, Yarn, Zookeeper是分布式系统基础的框架,通常都是必选的。下面咱们来谈谈其余一些根据须要选择使用的工具。

咱们首先来认识下分布式系统中的数据仓库Hive。

Hive是Hadoop生态系统中必不可少的一个工具,它提供了一种SQL方言,能够查询存储在Hadoop分布式文件系统(HDFS)中结构化的数据,Hive能够将查询转换为mapreduce任务,消除大部分的mr通用代码,下降了用户分析大数据的门槛。

Hive最适合数据仓库应用程序,使用该程序进行相关的静态数据分析,不须要快速响应给出结果,并且数据自己不会频繁变化。

要知道的是Hive只是查询HDFS上的数据的一个客户端工具,而不是一个数据库(由于它操纵的是hdfs上的数据,没有作进一步的封装)。Hive元数据存储在mysql中,Hive中的表对应hdfs目录下的文件。Hadoop以及HDFS的设计自己的约束和局限性限制了Hive所能胜任的工做。其中是大的限制是Hive不支持记录级别的更新,插入或者删除操做。用户能够经过查询生成新表或者将查询结果致使到文件中。因为Hive底层是mapredure处理,因此查询延时比较严重。另外,Hive也不支持事务。

经过上面对Hive的描述,咱们能够知道,Hive并不支持OLTP(联机事务处理)所需的关键功能,而更接近成为一个OLAP(联机分析)工具。从Hadoop视角来看,它是用于大规模数据的处理,mr开销很大,因此Hive并无知足OLAP中的”联机“部分。因些,Hive仍是最适合用于数据仓库应用程序,用它维护海量数据,进行数据挖掘,造成意见报告。若是用户须要对大规模数据使用OLTP功能的话,那么就应该选择使用一个NoSql数据库,如Hbase,这个咱们后面讨论到Hbase再详说。

讲完Hive,咱们顺便讲一下同为数据仓库的Pig。

有了Hive,为何还要Pig呢?咱们假设有以下的需求,用户的输入数据具备一个或多个源,而用户须要进行一组复杂的转换来生成一个或多个输出数据集。若是使用Hive,用户可能会使用嵌套语句来实现,但在某些时刻,太多的嵌套可能会很不方便,可能会须要从新保存到临时表来控制复杂度。Pig是一种数据流语言,而不是一种查询语言,在Pig中,用户能够声明一些关系,而这些关系都会执行新的数据转换过程,Pig会根据这些声明,建立一系列有次序的mr,来对这些数据进行转换,生成用户预期的输出结果。这种步进式的”流“比一组复杂的查询更加直观。因此Pig也经常使用于ETL(数据抽取,数据转换和数据装载)过程的一部份,也就是将外部的数据装载到Hadoop集群中,而后转换成指望的数据格式。

Pig的步进迭代式语法能够经过下面的举例窥探通常:
A = LOAD 'a.txt' AS (col1:chararray, col2:int, col3:int, col4:int, col5:double, col6:double);
B = GROUP A BY (col2, col3, col4);
C = FOREACH B GENERATE group, AVG(A.col5), AVG(A.col6);
DUMP C;

无论是Hive仍是Pig,咱们均可以根据须要作选择,或者结合使用。
(附:Hive和Pig详情能够参见: http://m.blog.csdn.net/article/details?id=52424490

说完数据仓库,下面咱们来讲说大规模数据处理中用于OLTP功能的NoSql数据库Hbase。

当数据量愈来愈大,RDBMS数据库撑不住了,就出现了读写分离策略,经过一个Master专门负责写操做,多个Slave负责读操做,服务器成本倍增。随着压力增长,Master撑不住了,这时就要分库了,把关联不大的数据分开部署,一些join查询不能用了,须要借助中间层。随着数据量的进一步增长,一个表的记录愈来愈大,查询就变得很慢,因而又得搞分表,好比按ID取模分红多个表以减小单个表的记录数。经历过这些事的人都知道过程是多么的折腾。采用HBase就简单了,只须要在集群中加入新的节点便可,HBase会自动水平切分扩展,跟Hadoop的无缝集成保障了数据的可靠性(HDFS)和海量数据分析的高性能(MapReduce)。

Hbase是一个面向列的分布式存储系统,支持数据随机访问和检索,弥补了Hdfs不一样随机访问的缺陷,适合实时性要求不是很是高的业务场景。适用于半结构化或非结构化数据,对于 数据结构 字段不够肯定或杂乱无章,很难按一个概念去进行抽取的数据适合用HBase。如随着业务发展须要存储更多的字段时,RDBMS须要停机维护更改表结构,而HBase支持动态增长。

Hbase由HMaster和HRegionServer组成,利用zookeeper管理集群。Hbase能够启动多个Hmaster,由Zookeeper进行选举出Leader,也就是Active Master,其余的就是Backup Master。集群中各节点信息状态都注册到zk中,使得HMaster能够随时感知HRegionServer的健康状态。

Hbase在逻辑上将表分红多个数据块即HRegion(逻辑表中行的子集),存储在HRegionServer中,且每个HRegion只会被一个HRegionServer维护。而当某个HRegionServer死机时,HMaster会把它负责的全部HRegion标记为未分配,而后再把它们分配到其余HRegionServer中。HMaster负责管理全部HRegionServer,它自己并不存储任何数据,而只是存储数据到HRegionServer的映射关系(元数据)。

HRegion由多个HStore组成,每一个HStore对应着逻辑表中列族的存储。这里能够看面向列数据库与面向行数据库的区别,面向列数据库是将同个列的数据一块儿存储在同个数据块中,而面向行数据库是将同个行的数据一块儿存储在同个数据块中。

HStore由MemStore和StoreFile两部分组成。MemStore是内存缓冲区,用户写入的数据首先会放在MemStore,当MemStore满了之后会Flush成一个StoreFile(底层实现是HFile),当StoreFile的文件数量增加到必定阀值后,会触发Compact合并操做(跟前面讲Shuffle时提到的Merge操做有点相似),将多个StoreFile合并成一个StoreFile,合并过程当中会进行版本合并和数据删除操做(所以,能够看出HBase其实只有增长数据,全部的更新和删除操做都是在后续的Compact过程当中进行的,这样使得用户的写操做只要进入内存就能够当即返回,保证了HBaseI/O的高性能)。随着StoreFile的屡次合并,会逐步造成愈来愈大的StoreFile,当其大小超过必定阀值时,会触发Split操做,同时把当前的HRegion Split成2个HRegion,父HRegion会下线,新分出的2个HRegion(等分为两个StoreFile)会被HMaster分配到其余相应的HRegionServer,使用原先1个HRegion的负载压力分流到2个HRegion上。Hbase会根据数据行健hash分散到不一样的HRegion的。

HLog其实就是对MemStore中数据的备份,当HRegionServer意外终止时,HMaster能够根据HLog从新分配HRegion。

HBase的全部HRegion元数据存储在.META.表,.META.也会不断增加,因此也会分裂成多个HRegion,HBase把.META.的HRegion的元数据保存在-ROOT-表中,-ROOT-永远只有一个HRegion。最后为zookeeper记录-ROOT-表的位置信息。

Hbase表由行健、时间戳、列族(列)组成。比较突出的特色有:
大:一个表能够有数十亿行,上百万列;
无模式:每行都有一个可排序的主键和任意多的列,列能够根据须要动态的增长,同一张表中不一样的行能够有大相径庭的列;
面向列:面向列(族)的存储和权限控制,列(族)独立检索;
稀疏:空(null)列并不占用存储空间,表能够设计的很是稀疏;
数据多版本:每一个单元中的数据能够有多个版本,默认状况下版本号自动分配,是单元格插入时的时间戳;
数据类型单一:Hbase中的数据都是字符串,没有类型。
(附:Hbase详细介绍能够参见: http://blog.csdn.net/carl810224/article/details/51970039

上面讲的Hbase,Hive在架构上都比较简单,这里要讲一个架构上比较复杂的Kafka。

首先先看下Kafka的拓扑结构图:

对照上图,咱们来解释一下kafka的内部运行机制。kafka使用zookeeper来管理集群,集群的meta信息都保存在zk上。

Producers是消息生产者,发布消息到kafka集群。发送的消息都有其归属的类别即topic。某个topic下的消息在物理上能够存储在不一样的目录下,也就是partition(用户能够本身指定消息的partition,若是没有指定,kafka则根据消息的key作hash选出partition,若是连key也没指定,那么则使用轮询机制选出一个partition)。用户在建立topic的时候,kafka就会为该topic设置它的一些属性,如它下面的消息存储在哪些brokers中,这些brokers中谁是leader,谁是follower(上图中的leader,follower是针对topic的,因此一个broker中能够存在多个leader或是follower)。producer在这个topic下发送消息时,它就发给了它的leader,它的follower主动跟leader保持数据同步。consumers从集群中消费消息,每条消息只能被consumer组内的一个consumer消息,但能够被多个组消费。

这里重点说下上图中的Controller。在topic建立时,就是Controller为topic设置的属性信息(如它的leader,follower),删除topic时,也是Controller通知它对应的broker再也不进行该topic的数据同步请求。当某个topic的leader挂掉,也是Controller为其从新选举leader。如broker宕机,也是Controller从新设置该broker上的全部topic的属性。 Controller为何能作这些,它是利用watch zk上的节点变化和读取zk上的meta数据来实现。能够看出来,Controller就是整个集群的管家,是他安排分配一切topic。若是Controller挂掉怎么办? 这时,全部的broker就会收到zk通知,试图在zk上建立/controller节点,固然,最终只有一个broker建立成功,并成为新的Controller。
(附:kafka相关文章:
原理与安装结合起来看,更容易快速熟悉一个事物)

从上面的介绍能够看了,kafka就是一个消息系统。咱们来看下它都有哪些优点:

1. 应用解耦、良好的扩展性
若是两个应用经过接口进行通讯,那么他们之间就会形成相互制约。若是使用消息系统,则两边均可以独立的扩展或修改相关的处理过程。

2. 消息冗余,保障消息不丢失。

3. 异步通讯
不少时候,用户不想也不须要当即处理消息,消息系统提供了异步处理机制,容许用户把消息放入消息队伍,想用的时候再来处理。

4. 消息的顺序保证。
(附:消息系统应用场景能够参考: http://www.cnblogs.com/stopfalling/p/5375492.html

讲到这里,也基本结束了,hadoop生态圈实在太庞大了,不少组件也没办法详细去研究,只有等真正须要用到时,再去深刻研究。下面就简单的说说各类组件对生态圈贡献的功能,知道有这么一种需求和解决这种需求的组件的存在便可,尽可能在全局上有个宏观的认识。

Flume
日志收集工具。Flume是一个高可靠的分布式海量日志采集,聚合和传输系统。

Sqoop
数据库ETL工具。Sqoop是一个Hadoop和关系型数据库之间的数据转移工具。可将关系型数据库中的数据导入到Hadoop的HDFS中,也可将HDFS中的数据导进到关系型数据库中。

Spark
spark基础的程序抽象称为弹性分布式数据集(RDDs),是一个能够并型操做、有容错机制的数据集合。 RDDs能够经过引用外部存储系统的数据集建立(例如:共享文件系统、HDFS、HBase或其余 Hadoop 数据格式的数据源)。或者是经过在现有RDDs的转换而建立(好比:map、filter、reduce、join等等)。Spark 和 Scala 可以紧密集成,其中的 Scala 能够像操做本地集合对象同样轻松地操做分布式数据集。

spark拥有Hadoop MapReduce所具备的优势;但不一样于MapReduce的是Job中间输出结果能够保存在内存中,从而再也不须要读写HDFS,所以Spark能更好地适用于数据挖掘与机器学习等须要迭代的MapReduce的算法。是hadoop mapreduce的替代方案,基于内存计算,速度比较mr快不少。spark会用的人相对比较少。还有spark对设备要求也相对比较高。

Spark Streaming
storm是实时数据流处理,而spark streaming是准实时数据流处理,它截取小批量的数据并对之运行RDD转换。

Spark Sql
Spark SQL的前身是Shark,是Spark的一个组件,用于结构化数据的计算。Spark SQL提供了一个称为DataFrames的编程抽象,DataFrames能够充当分布式SQL查询引擎。语法示例:df.filter(df("age") > 21).show()。

Oozie
Oozie是一个工做流调度系统,用于管理Hadoop里的job。它能够把多个Map/Reduce做业组合到一个逻辑工做单元来完成指定目标。好比从数据采集,数据清洗,数据转换,存储,而后到hive分析等等,能够合并为一个逻辑工做单元来执行。

ASmbari
安装、部署、配置和管理工具。Ambari是一个对Hadoop集群进行监控和管理的基于Web的系统。

Impala
与Hive相似,也是一种数据分析工具。与hive底层使用mr不一样,impala底层使用本身的impalad进程处理数据。从客户端使用来看Impala与Hive有不少的共同之处,如数据表元数据、ODBC/JDBC驱动、SQL语法、灵活的文件格式、存储资源池等。Hive适合于长时间的批处理查询分析,而Impala适合于实时交互式SQL查询,能够先使用hive进行数据转换处理,以后使用Impala在Hive处理后的结果数据集上进行快速的数据分析。

Hue
HUE=Hadoop User Experience
Hue是一个开源的Hadoop UI系统。经过使用Hue咱们能够在浏览器端的Web控制台上与Hadoop集群进行交互来分析处理数据,例如操做HDFS上的数据,运行MapReduce Job,执行Hive的SQL语句,浏览HBase数据库等等。

Avro
数据序列化系统。Avro是一个基于二进制数据高性能传输的中间件,Avro能够将数据结构或对象转化成便于存储或传输的格式,适合于远程或本地大规模数据的存储和交换。

Chukwa
当 1000+ 以上个节点的 hadoop 集群变得常见时,集群自身的相关信息如何收集和分析呢?针对这个问题, Apache 一样提出了相应的解决方案,那就是 chukwa。

chukwa 是一个开源的用于监控大型 分布式系统的数据收集系统。Chukwa 还包含了一个强大和灵活的工具集,可用于展现、监控和分析已收集的数据。

Docker
Docker 是一个开源的应用容器引擎,让开发者能够打包他们的应用以及依赖包到一个可移植的容器中,而后发布到任何流行的  Linux 机器上。
1. 简化配置 能够直接从已有镜像开始创建项目(保持文件系统,环境变量等等一致),使开发和生产保持一致的环境变得容易。
2. 升级回滚。
3. 隔离应用。容器之间互相独立(包括操做系统)。

Nutch
Nutch 是一个开源 Java实现的 搜索引擎。它提供了咱们运行本身的搜索引擎所需的所有工具。包括 全文搜索和Web 爬虫

Lucene
是一个 开放源代码的全文检索引擎工具包,但它不是一个完整的全文检索引擎,而是一个全文检索引擎的架构,提供了完整的查询引擎和索引引擎,部分 文本分析引擎。

Solr
Solr是一个独立的 企业级搜索应用服务器,它对外提供相似于Web-service的API接口。用户能够经过http请求,向搜索引擎服务器提交必定格式的XML文件,生成索引;也能够经过Http Get操做提出查找请求,并获得XML格式的返回结果。

Lily
Lily以NoSQL技术为主题,是创建在云计算上的内容仓库(content repository)。它是基于Apache的  HBase(存储)和 Solr(索引/搜索),并提供了大型内容集合存储与检索的解决方案。

Tez
Tez是Apache最新的支持DAG做业的开源计算框架,它能够将多个有依赖的做业转换为一个做业从而大幅提高DAG做业的性能,减小MR中间结果的磁盘 IO。Tez并不直接面向最终用户——事实上它容许开发者为最终用户构建性能更快、扩展性更好的应用程序。Tez计算框架的引入,至少能够解决现有MR框架在迭代计算(如PageRank计算)和交互式计算方面的不足。

Azkaban
任务调度系统,相比oozie,其配置更简单,比较利于开发。

Phoenix
这是一个 Java中间层,可让开发者在HBase上执行SQL查询。能够把Phoenix只当作一种代替HBase的语法的一个工具。

Zeppelin
 Zeppelin是一个Web笔记形式的交互式数据查询分析工具,能够在线用scala和SQL对数据进行查询分析并生成报表。

Ganglia
Ganglia 是一款为 HPC(高性能计算)集群而设计的可扩展的分布式监控系统,它能够监视和显示集群中的节点的各类状态信息,它由运行在各个节点上的 gmond 守护进程来采集 CPU 、内存、硬盘利用率、 I/O 负载、网络流量状况等方面的数据,而后汇总到 gmetad守护进程下,使用 rrdtool 存储数据,最后将历史数据以曲线方式经过 PHP 页面呈现。

chukwa是经过收集log文件来分析集群状况的,ganglia偏向于cpu,内存,磁盘,网络等使用状况的分析监控。

到此为止,咱们应该能对愈来愈火的分布式系统有一个总体的认识。在实际应用中,再去深刻评估各个工具的适用范围,实现原理。

全文终。