个推 Spark实践教你绕过开发那些“坑”

     Spark做为一个开源数据处理框架,它在数据计算过程当中把中间数据直接缓存到内存里,能大大提升处理速度,特别是复杂的迭代计算。Spark主要包括SparkSQL,SparkStreaming,Spark MLLib以及图计算。java

 

 

Spark核心概念简介python

 

一、RDD即弹性分布式数据集,经过RDD能够执行各类算子实现数据处理和计算。好比用Spark作统计词频,即拿到一串文字进行WordCount,能够把这个文字数据load到RDD以后,调用map、reducebyKey 算子,最后执行count动做触发真正的计算。算法

 

二、宽依赖和窄依赖。工厂里面有不少流水线,一款产品上游有一我的操做,下游有人进行第二个操做,窄依赖和这个很相似,下游依赖上游。而所谓宽依赖相似于有多条流水线,A流水线的一个操做是须要依赖一条流水线B,才能够继续执行,要求两条流水线之间要作材料运输,作协调,但效率低。缓存

 

      从上图能够看到,若是B只依赖A则是一种窄依赖。像图中这种reduceByKey的操做,就是刚刚举例的宽依赖,相似于多条流水线之间某一些操做相互依赖,如:F对E、B的依赖。宽依赖最大的问题是会致使洗牌过程。服务器

 

Spark Streaming介绍 架构

 

      流式计算,即数据生成后,实时对数据进行处理。Spark 是一个批处理框架,那它如何实现流式处理?Spark 是把数据裁成一段一段的处理,即一个数据流离散化成许多个连续批次,而后Spark对每一个批次进行处理。框架

 

 

 

个推为何选择Spark运维

 

一、Spark 比较适合迭代计算,解决咱们团队在以前使用hadoop mapreduce迭代数据计算这一块的瓶颈。分布式

 

二、Spark是一个技术栈,但能够作不少类型的数据处理:批处理,SQL,流式处理以及ML等,基本知足咱们团队当时的诉求。工具

 

三、它的API抽象层次很是高,经过使用map、reduce、groupby等多种算子可快速实现数据处理,极大下降开发成本,而且灵活。另外Spark框架对于多语言支持也是很是好,不少负责数据挖掘算法同窗对于python 熟悉,而工程开发的同窗熟悉java, 多语言支持能够把开发和分析的同窗快速地引入过来。

 

四、在2014年的时候,咱们用hadoop Yarn,而Spark能够在Yarn部署起来,使用Spark大大下降了切换成本,而且能够把以前的hadoop资源利用起来。

 

五、Spark在社区很火,找资料很是方便。

 

个推数据处理架构

 

 

      上图是一个典型的lambda架构。主要分三层。上面蓝色的框,是作离线批量处理,下面一层是实时数据处理这一块,中间这一层是对于结果数据作一些存储和检索。

 

       有两种方式导入数据到HDFS,一部分数据从业务平台日志收集写入到 Kafka,而后直接Linkedin Camus(咱们作过扩展) 准实时地传输到 HDFS,另外部分数经过运维那边的脚本定时导入到 HDFS 上。

 

       离线处理部分咱们仍是使用两个方式(Hadoop MR 和 Spark)。原有的hadoop MR没有放弃掉, 由于原来不少的工程已是用MR作的了,很是稳定,没有必要推倒重来,只有部分迭代任务使用Spark 从新实现。另外Hive是直接能够跟Spark作结合,Spark Sql中就可使用Hive的命令 。

 

个推Spark集群的部署情况

 

      个推最开始用Spark是1.3.1版本,用的是刀片服务器,就是刀框里面能够塞 16 个刀片服务器,单个内存大小192G, CPU 核数是24 核的。在Spark官方也推荐用万兆网卡,大内存设备。咱们权衡了需求和成本后,选择了就用刀片机器来搭建 Spark集群。刀框有个好处就是经过背板把刀片机器链接起来,传输速度快,相对成本小。部署模式上采用的是 Spark on Yarn,实现资源复用。

 

Spark 在个推业务上的具体使用

 

一、个推作用户画像、模型迭代以及一些推荐的时候直接用了MLLib,MLLib集成了不少算法,很是方便。

 

二、个推有一个BI工具箱,让一些运营人员提取数据,咱们是用Spark SQL+Parquet格式宽表实现,Parquet是列式存储格式,使用它你不用加载整个表,只会去加载关心那些字段,大大减小IO消耗。

 

三、实时统计分析这块:例如个推有款产品叫个图,就是使用Spark streaming 来实时统计。

 

四、复杂的 ETL 任务咱们也使用 Spark。例如:咱们个推推送报表这一块,天天须要作不少维度的推送报表统计。使用 Spark 经过 cache 中间结果缓存,而后再统计其余维度,大大地减小了 I/O 消耗,显著地提高了统计处理速度。

 

个推Spark实践案例分享

 

      上图是个推热力图的处理架构。左边这一侧利用业务平台获得设备的实时位置数据,经过Spark Streaming以及计算获得每个geohash格子上的人数,而后统计结果实时传输给业务服务层,在push到客户端地图上面去渲染,最终造成一个实时热力图。Spark Streaming 主要用于数据实时统计处理上。

 

 

个推教你绕过开发那些坑

 

一、数据处理常常出现数据倾斜,致使负载不均衡的问题,须要作统计分析找到倾斜数据特征,定散列策略。

 

二、使用Parquet列式存储,减小IO,提升Spark SQL效率。

 

三、实时处理方面:一方面要注意数据源(Kafka)topic须要多个partition,而且数据要散列均匀,使得Spark Streaming的Recevier可以多个并行,而且均衡地消费数据 。使用Spark Streaming,要多经过Spark History 排查DStream的操做中哪些处理慢,而后进行优化。另一方面咱们本身还作了实时处理的监控系统,用来监控处理状况如流 入、流出数据速度等。经过监控系统报警,可以方便地运维Spark Streaming 实时处理程序。这个小监控系统主要用了 influxdb+grafana 等实现。

 

四、咱们测试网常常出现找不到第三方jar的状况,若是是用CDH的同窗通常会遇到,就是在CDH 5.4开始,CDH的技术支持人员说他们去掉了hbase等一些jar,他们认那些jar已经不须要耦合在本身的classpath中,这个状况能够经过spark.executor.extraClassPath方式添加进来。

 

五、一些新入门的人会遇到搞不清transform和action,没有明白transform是lazy的,须要action触发,而且两个action先后调用效果可能不同。

 

六、你们使用过程中,对须要重复使用的RDD,必定要作cache,性能提高会很明显。

相关文章
相关标签/搜索