如何用 Hadoop/Spark 构建七牛数据平台

数据平台在大部分公司都属于支撑性平台,作的很差马上会被吐槽,这点和运维部门很像。因此在技术选型上优先考虑现成的工具,快速出成果,不必去担忧有技术负担。早期,咱们走过弯路,认为没多少工做量,收集存储和计算都本身研发,发现是吃力不讨好。去年上半年开始,咱们全面拥抱开源工具,搭建本身的数据平台。html

一、数据平台设计理念

公司的主要数据来源是散落在各个业务服务器上的半结构化日志,好比系统日志、程序日志、访问日志、审计日志等。日志是最原始的数据记录,若是不是日志,确定会有信息上的丢失。说个简单的例子,需求是统计Nginx上每一个域名的的流量,这个彻底能够经过一个简单的Nginx模块去完成,可是若是统计的是不一样来源的流量就没法作了。因此须要原始的完整的日志。redis

有种手法是业务程序把日志经过网络直接发送出去,可是这并不可取,由于网络和接收端并不彻底可靠,当出问题时会对业务形成影响或者日志丢失。所以,对业务侵入最小最天然的方式是把日志落到本地硬盘上。数据库

二、数据平台设计架构

 

2.1 Agent设计需求

每台机器上会有一个Agent去同步这些日志,这是个典型的队列模型,业务进程在不断的push,Agent在不停的pop。Agent须要有记忆功能,用来保存同步的位置(offset),这样才尽量保证数据准确性,但不可能作到彻底准确。因为发送数据和保存offset是两个动做,不具备事务性,不可避免的会出现数据不一致性状况,一般是发送成功后保存offset,那么在Agent异常退出或机器断电时可能会形成多余的数据。安全

在这里,Agent须要足够轻,这主要体如今运维和逻辑两个方面。Agent在每台机器上都会部署,运维成本、接入成本是须要考虑的。Agent不该该有解析日志、过滤、统计等动做,这些逻辑应该给数据消费者。假若Agent有较多的逻辑,那它是不可完成的,不可避免的常常会有升级变动动做。性能优化

2.2 数据收集流程

数据收集这块的技术选择,Agent是用Go本身研发的,消息中间件Kafka,数据传输工具Flume。说到数据收集常常有人拿Flume和Kafka作比较,我看来这二者定位是不一样的,Flume更倾向于数据传输自己,Kakfa是典型的消息中间件用于解耦生产者消费者。服务器

具体架构上,Agent并没把数据直接发送到Kafka,在Kafka前面有层由Flume构成的forward。这样作有两个缘由:网络

1. Kafka的API对非JVM系的语言支持很不友好,forward对外提供更加通用的http接口。架构

2. forward层能够作路由、Kafka topic和Kafka partition key等逻辑,进一步减小Agent端的逻辑。并发

forward层不含状态,彻底能够作到水平扩展,不用担忧成为瓶颈。出于高可用考虑,forward一般不止一个实例,这会带来日志顺序问题,Agent按必定规则(round-robin、failover等)来选择forward实例,即便Kafka partition key同样,因为forward层的存在,最终落入Kafka的数据顺序和Agent发送的顺序可能会不同。咱们对乱序是容忍的,由于产生日志的业务基本是分布式的,保证单台机器的日志顺序意义不大。若是业务对顺序性有要求,那得把数据直接发到Kafka,并选择好partition key,Kafka只能保证partition级的顺序性。运维

2.3 跨机房收集要点

多机房的情形,经过上述流程,先把数据汇到本地机房Kafka 集群,而后汇聚到核心机房的Kafka,最终供消费者使用。因为Kafka的mirror对网络不友好,这里咱们选择更加的简单的Flume去完成跨机房的数据传送。Flume在不一样的数据源传输数据仍是比较灵活的,但有几个点须要注意:

1. memory-channel效率高但可能有丢数据的风险,file-channel安全性高但性能不高。咱们是用memory-channel,但把capacity设置的足够小,使内存中的数据尽量少,在乎外重启和断电时丢的数据不多。我的比较排斥file-channel,效率是一方面,另外一个是对Flume的指望是数据传输,引入file-channel时,它的角色会向存储转变,这在整个流程中是不合适的。一般Flume的sink端是Kafka和HDFS这种可用性和扩张性比较好的系统,不用担忧数据拥堵问题。

2. 默认的http souce 没有设置线程池,有性能问题,若是有用到,须要本身修改代码。

3. 单sink速度跟不上时,须要多个sink。像跨机房数据传输网络延迟高单rpc sink吞吐上不去和HDFS sink效率不高情形,咱们在一个channel后会配十多个sink。

笔者注:

七牛遇到的flume入hdfs的性能问题我们这边也遇到了。这边开始也是提升sink的数量,可是这没从根本解决问题,这个问题须要在flume收集日志的机器进行消息合并才是根本。单纯增长sink数量会形成hdfs小文件过多,增长nn内存压力。不是好的解决方案。

2.4 Kafka使用要点

Kafka在性能和扩展性很不错,如下几个点须要注意下:

1. topic的划分,大topic对生产者有利且维护成本低,小topic对消费者比较友好。若是是彻底不相关的相关数据源且topic数不是发散的,优先考虑分topic。

2. Kafka的并行单位是partition,partition数目直接关系总体的吞吐量,但parition数并非越大越高,3个partition就能吃满一块普通硬盘IO了。因此partition数是由数据规模决定,最终仍是须要硬盘来抗。

3. partition key选择不当,可能会形成数据倾斜。在对数据有顺序性要求才需使用partition key。Kafka的producer sdk在没指定partition key时,在必定时间内只会往一个partition写数据,这种状况下当producer数少于partition数也会形成数据倾斜,能够提升producer数目来解决这个问题。

2.5 数据离线和实时计算

数据到Kafka后,一路数据同步到HDFS,用于离线统计。另外一路用于实时计算。因为今天时间有限,接下来只能和你们分享下实时计算的一些经验。

实时计算咱们选择的Spark Streaming。咱们目前只有统计需求,没迭代计算的需求,因此Spark Streaming使用比较保守,从Kakfa读数据统计完落入mongo中,中间状态数据不多。带来的好处是系统吞吐量很大,但几乎没遇到内存相关问题

Spark Streaming对存储计算结果的数据库tps要求较高。好比有10万个域名须要统计流量,batch interval为10s,每一个域名有4个相关统计项,算下来平均是4万 tps,考虑到峰值可能更高,固态硬盘上的mongo也只能抗1万tps,后续咱们会考虑用redis来抗这么高的tps

有外部状态的task逻辑上不可重入的,当开启speculation参数时候,可能会形成计算的结果不许确。说个简单的例子。这个任务,若是被重作了,会形成落入mongo的结果比实际多。

有状态的对象生命周期很差管理,这种对象不可能作到每一个task都去new一个。咱们的策略是一个JVM内一个对象,同时在代码层面作好并发控制。相似下面。

 

在Spark1.3的后版本,引入了 Kafka Direct API试图来解决数据准确性问题,使用Direct在必定程序能缓解准确性问题,但不可避免还会有一致性问题。为何这样说呢?Direct API 把Kafka consumer offset的管理暴露出来(之前是异步存入ZooKeeper),当保存计算结果和保存offset在一个事务里,才能保证准确。

这个事务有两种手段作到,一是用MySQL这种支持事务的数据库保存计算结果offset,一是本身实现两阶段提交。这两种方法在流式计算里实现的成本都很大。其次Direct API 还有性能问题,由于它到计算的时候才实际从Kafka读数据,这对总体吞吐有很大影响。

三、七牛数据平台规模

 

要分享的就这些了,最后秀下咱们线上的规模:Flume + Kafka + Spark8台高配机器,日均500亿条数据,峰值80万tps。

Refer:

[1] 美团 Spark 性能优化指南——基础篇

http://tech.meituan.com/spark-tuning-basic.html

[2] 诸葛io基于Spark的用户行为路径分析的产品化实践

http://bit.ly/2n58OIA

相关文章
相关标签/搜索