2011年小规模试水
前端
这一阶段的主要工做是创建了一个小的集群,并导入了少许用户进行测试。为了知足用户的需求,咱们还调研了任务调度系统和数据交换系统。git
咱们使用的版本是当时最新的稳定版,Hadoop 0.20.203和Hive 0.7.1。此后经历过屡次升级与Bugfix。如今使用的是Hadoop 1.0.3+自有Patch与Hive 0.9+自有Patch。考虑到人手不足及本身的Patch很少等问题,咱们采起的策略是,以Apache的稳定版本为基础,尽可能将本身的修改提交到社区,而且应用这些尚未被接受的Patch。由于如今Hadoop生态圈中尚未出现一个相似Red Hat地位的公司,咱们也不但愿被锁定在某个特定的发行版上,更重要的是Apache Jira与Maillist依然是获取Hadoop相关知识、解决Hadoop相关问题最好的地方(Cloudera为CDH创建了私有的Jira,但人气不足),因此没有采用Cloudera或者Hortonworks的发行版。目前咱们正对Hadoop 2.1.0进行测试。github
在前期,咱们团队的主要工做是ops+solution,如今DBA已接手了很大一部分ops的工做,咱们正在转向solution+dev的工做。数据库
咱们使用Puppet管理整个集群,用Ganglia和Zabbix作监控与报警。浏览器
集群搭建好,用户便开始使用,面临的第一个问题是须要任务级别的调度、报警和工做流服务。当用户的任务出现异常或其余状况时,须要以邮件或者短信的方式通知用户。并且用户的任务间可能有复杂的依赖关系,须要工做流系统来描述任务间的依赖关系。咱们首先将目光投向开源项目Apache Oozie。Oozie是Apache开发的工做流引擎,以XML的方式描述任务及任务间的依赖,功能强大。但在测试后,发现Oozie并非一个很好的选择。安全
Oozie采用XML做为任务的配置,特别是对于MapReduce Job,须要在XML里配置Map、Reduce类、输入输出路径、Distributed Cache和各类参数。在运行时,先由Oozie提交一个Map only的Job,在这个Job的Map里,再拼装用户的Job,经过JobClient提交给JobTracker。相对于Java编写的Job Runner,这种XML的方式缺少灵活性,并且难以调试和维 护。先提交一个Job,再由这个Job提交真正Job的设计,我我的认为至关不优雅。网络
另外一个问题在于,公司内的不少用户,但愿调度系统不只能够调度Hadoop任务,也能够调度单机任务,甚至Spring容器里的任务,而Oozie并不支持Hadoop集群以外的任务。多线程
因此咱们转而自行开发调度系统Taurus(https://github.com/dianping/taurus)。Taurus是一个调度系统, 经过时间依赖与任务依赖,触发任务的执行,并经过任务间的依赖管理将任务组织成工做流;支持Hadoop/Hive Job、Spring容器里的任务及通常性任务的调度/监控。架构
图1 Taurus的结构图并发
图1是Taurus的结构图,Taurus的主节点称为Master,Web界面与Master在一块儿。用户在Web界面上建立任务后,写入MySQL作持久化存储,当Master判断任务触发的条件知足时,则从MySQL中读出 任务信息,写入ZooKeeper;Agent部署在用户的机器上,观察ZooKeeper上的变化,得到任务信息,启动任务。Taurus在2012年 中上线。
另外一个迫切需求是数据交换系统。用户须要将MySQL、MongoDB甚至文件中的数据导入到HDFS上进行分析。另一些用户要将HDFS中生成的数据再导入MySQL做为报表展示或者供在线系统使用。
咱们首先调研了Apache Sqoop,它主要用于HDFS与关系型数据库间的数据传输。通过测试,发现Sqoop的主要问题在于数据的一致性。Sqoop采用MapReduce Job进行数据库的插入,而Hadoop自带Task的重试机制,当一个Task失败,会自动重启这个Task。这是一个很好的特性,大大提升了Hadoop的容错能力,但对于数据库插入操做,却带来了麻烦。
考虑有10个Map,每一个Map插入十分之一的数据,若是有一个Map插入到一半时failed,再经过Task rerun执行成功,那么fail那次插入的一半数据就重复了,这在不少应用场景下是不可接受的。 并且Sqoop不支持MongoDB和MySQL之间的数据交换,但公司内却有这需求。最终咱们参考淘宝的DataX,于2011年末开始设计并开发了Wormhole。之因此采用自行开发而没有直接使用DataX主要出于维护上的考虑,并且DataX并未造成良好的社区。
2012年大规模应用
2012年,出于成本、稳定性与源码级别维护性的考虑,公司的Data Warehouse系统由商业的OLAP数据库转向Hadoop/Hive。2012年初,Wormhole开发完成;以后Taurus也上线部署;大量应用接入到Hadoop平台上。为了保证数据的安全性,咱们开启了Hadoop的Security特性。为了提升数据的压缩率,咱们将默认存储格式替换为RCFile,并开发了Hive Web供公司内部使用。2012年末,咱们开始调研HBase。
图2 Wormhole的结构图
Wormhole(https://github.com /dianping/wormhole)是一个结构化数据传输工具,用于解决多种异构数据源间的数据交换,具备高效、易扩展等特色,由Reader、Storage、Writer三部分组成(如图2所示)。Reader是个线程池,能够启动多个Reader线程从数据源读出数据,写入Storage。Writer也是线程池,多线程的Writer不只用于提升吞吐量,还用于写入多个目的地。Storage是个双缓冲队列,若是使用一读多写,则每一个目的地都拥有本身的Storage。
当写入过程出错时,将自动执行用户配置的Rollback方法,消除错误状态,从而保证数据的完整性。经过开发不一样的Reader和Writer插件,如MySQL、MongoDB、Hive、HDFS、SFTP和Salesforce,咱们就能够支持多种数据源间的数据交换。Wormhole在大众点评内部获得了大量使用,得到了普遍好评。
随着愈来愈多的部门接入Hadoop,特别是数据仓库(DW)部门接入后,咱们对数据的安全性需求变得更为迫切。而Hadoop默认采用Simple的用户认证模式,具备很大的安全风险。
默认的Simple认证模式,会在Hadoop的客户端执行whoami命令,并以whoami命令的形式返回结果,做为访问Hadoop的用户名(准确地说,是以whoami的形式返回结果,做为Hadoop RPC的userGroupInformation参数发起RPC Call)。这样会产生如下三个问题。
(1)User Authentication。假设有帐号A和帐号B,分别在Host1和Host2上。若是恶意用户在Host2上创建了一个同名的帐号A,那么经过RPC Call得到的UGI就和真正的帐号A相同,伪造了帐号A的身份。用这种方式,恶意用户能够访问/修改其余用户的数据。
(2)Service Authentication。Hadoop采用主从结构,如NameNode-DataNode、JobTracker-Tasktracker。Slave节点启动时,主动链接Master节点。Slave到Master的链接过程,没有通过认证。假设某个用户在某台非Hadoop机器上,错误地启动了一个Slave实例,那么也会链接到Master;Master会为它分配任务/数据,可能会影响任务的执行。
(3)可管理性。任何能够连到Master节点的机器,均可以请求集群的服务,访问HDFS,运行Hadoop Job,没法对用户的访问进行控制。
从Hadoop 0.20.203开始,社区开发了Hadoop Security,实现了基于Kerberos的Authentication。任何访问Hadoop的用户,都必须持有KDC(Key Distribution Center)发布的Ticket或者Keytab File(准确地说,是Ticket Granting Ticket),才能调用Hadoop的服务。用户经过密码,获取Ticket,Hadoop Client在发起RPC Call时读取Ticket的内容,使用其中的Principal字段,做为RPC Call的UserGroupInformation参数,解决了问题(1)。Hadoop的任何Daemon进程在启动时,都须要使用Keytab File作Authentication。由于Keytab File的分发是由管理员控制的,因此解决了问题(2)。最后,不管是Ticket,仍是Keytab File,都由KDC管理/生成,而KDC由管理员控制,解决了问题(3)。
在使用了Hadoop Security以后,只有经过了身份认证的用户才能访问Hadoop,大大加强了数据的安全性和集群的可管理性。以后咱们基于Hadoop Secuirty,与DW部门一块儿开发了ACL系统,用户能够自助申请Hive上表的权限。在申请经过审批工做流以后,就能够访问了。
JDBC是一种很经常使用的数据访问接口,Hive自带了Hive Server,能够接受Hive JDBC Driver的链接。实际 上,Hive JDBC Driver是将JDBC的请求转化为Thrift Call发给Hive Server,再由Hive Server将Job启动起来。但Hive自带的Hive Server并不支持Security,默认会使用启动Hive Server的用户做为Job的owner提交到Hadoop,形成安全漏洞。所以,咱们本身开发了Hive Server的Security,解决了这个问题。
但在Hive Server的使用过程当中,咱们发现Hive Server并不稳定,并且存在内存泄漏。更严重的是因为Hive Server自身的设计缺陷,不能很好地应对并发访问的状况,因此咱们如今并不推荐使用Hive JDBC的访问方式。
社区后来从新开发了Hive Server 2,解决了并发的问题,咱们正在对Hive Server 2进行测试。
有一些同事,特别是BI的同事,不熟悉以CLI的方式使用Hive,但愿Hive能够有个GUI界面。在上线Hive Server以后,咱们调研了开源的SQL GUI Client——Squirrel,惋惜使用Squirrel访问Hive存在一些问题。
基于以上考虑,咱们本身开发了Hive Web,让用户经过浏览器就可使用Hive。Hive Web最初是做为大众点评第一届Hackathon的一个项目被开发出来的,技术上很简单,但得到了良好的反响。如今Hive Web已经发展成了一个RESTful的Service,称为Polestar(https://github.com/dianping /polestar)。
图3 Polestar的结构
图3是Polestar的结构图。目前Hive Web只是一个GWT的前端,经过HAProxy将RESTfull Call分发到执行引擎Worker执行。Worker将自身的状态保存在MySQL,将数据保存在HDFS,并使用JSON返回数据或数据在HDFS的 路径。咱们还将Shark与Hive Web集成到了一块儿,用户能够选择以Hive或者Shark执行Query。
一开始咱们使用LZO做为存储格式,使大文件能够在MapReduce处理中被切分,提升并行度。但LZO的压缩比不够高,按照咱们的测试,Lzo压缩的文件,压缩比基本只有Gz的一半。
通过调研,咱们将默认存储格式替换成RCFile,在RCFile内部再使用Gz压缩,这样既可保持文件可切分的特性,同时又可得到Gz的高压缩比,并且因 为RCFile是一种列存储的格式,因此对于不须要的字段就不用从I/O读入,从而提升了性能。图4显示了将Nginx数据分别用Lzo、RCFile+Gz、RCFfile+Lzo压缩,再不断增长Select的Column数,在Hive上消耗的CPU时间(越小越好)。
图4 几种压缩方式在Hive上消耗的CPU时间
但RCFile的读写须要知道数据的Schema,并且须要熟悉Hive的Ser/De接口。为了让MapReduce Job能方便地访问RCFile,咱们使用了Apache Hcatalog。
社区又针对Hive 0.11开发了ORCFile,咱们正在对ORCFile进行测试。
随着Facebook、淘宝等大公司成功地在生产环境应用HBase,HBase愈来愈受到你们的关注,咱们也开始对HBase进行测试。经过测试咱们发现HBase很是依赖参数的调整,在默认配置下,HBase能得到很好的写性能,但读性能不是特别出色。经过调整HBase的参数,在5台机器的HBase集群上,对于1KB大小的数据,也能得到5万左右的TPS。在HBase 0.94以后,HBase已经优化了默认配置。
原来咱们但愿HBase集群与主Hadoop集群共享HDFS,这样能够简化运维成本。但在测试中,发现即便主Hadoop集群上没有任何负载,HBase的性能也很糟糕。咱们认为,这是因为大量数据属于远程读写所引发的。因此咱们如今的HBase集群都是单独部署的。而且经过封装HBase Client与Master-Slave Replication,使用2套HBase集群实现了HBase的HA,用来支撑线上业务。
2013年持续演进
在创建了公司主要的大数据架构后,咱们上线了HBase的应用,并引入Spark/Shark以提升Ad Hoc Query的执行时间,并调研分布式日志收集系统,来取代手工脚本作日志导入。
如今HBase上线的应用主要有OpenAPI和手机团购推荐。OpenAPI相似于HBase的典型应用Click Stream,将开放平台开发者的访问日志记录在HBase中,经过Scan操做,查询开发者在一段时间内的Log,但这一功能目前尚未对外开放。手机 团购推荐是一个典型的KVDB用法,将用户的历史访问行为记录在HBase中,当用户使用手机端访问时,从HBase得到用户的历史行为数据,作团购推 荐。
当Hive大规模使用以后,特别是原来使用OLAP数据库的BI部门的同事转入后,一个愈来愈大的抱怨就是Hive的执行速度。对于离 线的ETL任务,Hadoop/Hive是一个良好的选择,但动辄分钟级的响应时间,使得Ad Hoc Query的用户难以忍受。为了提升Ad Hoc Query的响应时间,咱们将目光转向了Spark/Shark。
Spark是美国加州大学伯克利分校AMPLab开发的分布式计算系统,基于RDD(Resilient Distributed Dataset),主要使用内存而不是硬盘,能够很好地支持迭代计算。由于是一个基于Memory的系统,因此在数据量可以放进Memory的状况下,能 够大幅缩短响应时间。Shark相似于Hive,将SQL解析为Spark任务,而且Shark复用了大量Hive的已有代码。
在Shark接入以后,大大下降了Ad Hoc Query的执行时间。好比SQL语句:
select host, count(1) from HIPPOLOG where dt = '2013-08-28' group by host order by host desc;
在Hive执行的时间是352秒,而Shark只须要60~70秒。但对于Memory中放不下的大数据量,Shark反而会变慢。
目前用户须要在Hive Web中选择使用Hive仍是Shark,将来咱们会在Hive中添加Semantic-AnalysisHook,经过解析用户提交的Query,根据 数据量的大小,自动选择Hive或者Shark。另外,由于咱们目前使用的是Hadoop 1,不支持YARN,因此咱们单独部署了一个小集群用于Shark任务的执行。
Wormhole解决告终构化数据的交换问题,但对于非结构化数据,例如各类日志,并不适合。咱们一直采用脚本或用户程序直接写HDFS的方式将用户的Log导入HDFS。缺点是,须要必定的开发和维护成本。咱们 但愿使用Apache Flume解决这个问题,但在测试了Flume以后,发现了Flume存在一些问题:Flume不能保证端到端的数据完整性,数据可能丢失,也可能重复。
例如,Flume的HDFSsink在数据写入/读出Channel时,都有Transcation的保证。当Transaction失败时,会回滚,而后重试。但因为HDFS不可修改文件的内容,假设有1万行数据要写入HDFS,而在写入5000行时,网络出现问题致使写入失败,Transaction回滚,而后重写这10000条记录成功,就会致使第一次写入的5000行重复。咱们试图修正Flume的这些问题,但因为这些问题是设计上的,并不能经过简单的Bugfix来解决,因此咱们转而开发Blackhole系统将数据流导入HDFS。目前Blackhole正在开发中。
总结
图5是各系统整体结构图,深蓝部分为自行开发的系统。
图5 大众点评各系统整体结构图
在这2年多的Hadoop实践中,咱们获得了一些宝贵经验。
做者房明,大众点评网平台架构组高级工程师,Apache Contributor。2011年加入点评网,目前负责大数据处理的基础架构及全部Hadoop相关技术的研发。
出处:http://www.csdn.net/article/2013-12-18/2817838-big-data-practice-in-dianping