GitChat 做者:潘国庆
原文:如何基于 Spark Streaming 构建实时计算平台
关注微信公众号:「GitChat 技术杂谈」 一本正经的讲技术html
###前言前端
随着互联网技术的迅速发展,用户对于数据处理的时效性、准确性与稳定性要求愈来愈高,如何构建一个稳定易用并提供齐备的监控与预警功能的实时计算平台也成了不少公司一个很大的挑战。java
自2015年携程实时计算平台搭建以来,通过两年多不断的技术演进,目前实时集群规模已达上百台,平台涵盖各个SBU与公共部门数百个实时应用,整年JStorm集群稳定性达到100%。目前实时平台主要基于JStorm与Spark Streaming构建而成,相信关注携程实时平台的朋友在去年已经看到一篇关于携程实时平台的分享:携程实时大数据平台实践分享。git
本次分享将着重于介绍携程如何基于Spark Streaming构建实时计算平台,文章将从如下几个方面分别阐述平台的构建与应用:github
Spark Streaming vs JStorm正则表达式
Spark Streaming设计与封装redis
Spark Streaming在携程的实践算法
曾经踩过的坑sql
将来展望数据库
###Spark Streaming vs JStorm
携程实时平台在接入Spark Streaming以前,JStorm已稳定运行有一年半,基本可以知足大部分的应用场景。接入Spark Streaming主要有如下几点考虑:首先携程使用的JStorm版本为2.1.1版本,此版本的JStorm封装与抽象程度较低,并无提供High Level抽象方法以及对窗口、状态和Sql等方面的功能支持,这大大的提升了用户使用JStorm实现实时应用的门槛以及开发复杂实时应用场景的难度。在这几个方面,Spark Streaming表现就相对好的多,不但提供了高度集成的抽象方法(各类算子),而且用户还能够与Spark SQL相结合直接使用SQL处理数据。
其次,用户在处理数据的过程当中每每须要维护两套数据处理逻辑,实时计算使用JStorm,离线计算使用Hive或Spark。为了下降开发和维护成本,实现流式与离线计算引擎的统一,Spark为此提供了良好的支撑。
最后,在引入Spark Streaming以前,咱们重点分析了Spark与Flink两套技术的引入成本。Flink当时的版本为1.2版本,Spark的版本为2.0.1。相比较于Spark,Flink在SQL与MLlib上的支持相对弱于Spark,而且公司许多部门都是基于Spark SQL与MLlib开发离线任务与算法模型,使得大大下降了用户使用Spark的学习成本。
下图简单的给出了当前咱们使用Spark Streaming与JStorm的对比:
###Spark Streaming设计与封装
在接入Spark Streaming的初期,首先须要考虑的是如何基于现有的实时平台无缝的嵌入Spark Streaming。原先的实时平台已经包含了许多功能:元数据管理、监控与告警等功能,因此第一步咱们先针对Spark Streaming进行了封装并提供了丰富的功能。整套体系总共包含了Muise Spark Core、Muise Portal以及外部系统。
####Muise Spark Core
Muise Spark Core是咱们基于Spark Streaming实现的二次封装,用于支持携程多种消息队列,其中Hermes Kafka与源生的Kafka基于Direct Approach的方式消费数据,Hermes Mysql与Qmq基于Receiver的方式消费数据。接下来将要讲的诸多特性主要是针对Kafka类型的数据源。
Muise spark core主要包含了如下特性:
Kafka Offset自动管理
支持Exactly Once与At Least Once语义
提供Metric注册系统,用户可注册自定义metric
基于系统与用户自定义metric进行预警
Long running on Yarn,提供容错机制
Kafka Offset自动管理
封装muise spark core的第一目标就是简单易用,让用户以最简单的方式可以上手使用Spark Streaming。首先咱们实现了帮助用户自动读取与存储Kafka Offset的功能,用户无需关心Offset是如何被处理的。其次咱们也对Kafka Offset的有效性进行了校验,有的用户的做业可能在中止了较长时间后从新运行会出现Offset失效的情形,咱们也对此做了对应的操做,目前的操做是将失效的Offset设置为当前有效的最老的Offset。下图展示了用户基于muise spark core编写一个Spark streaming做业的简单示例,用户只须要短短几行代码便可完成代码的初始化并建立好对应的DStream:
默认状况下,做业每次都是基于上次存储的Kafka Offset继续消费,可是用户也能够自行决定Offset的消费起点。下图中展现了设置消费起点的三种方式:
Exactly Once的实现
若是实时做业要实现端对端的exactly once则须要数据源、数据处理与数据存储的三个阶段都保证exactly once的语义。目前基于Kafka Direct API加上Spark RDD算子精确一次的保证可以实现端对端的exactly once的语义。在数据存储阶段通常实现exactly once须要保证存储的过程是幂等操做或事务操做。不少系统自己就支持了幂等操做,好比相同数据写hdfs同一个文件,这自己就是幂等操做,保证了屡次操做最终获取的值仍是相同;HBase、ElasticSearch与redis等都可以实现幂等操做。对于关系型数据库的操做通常都是可以支持事务性操做。
官方在建立DirectKafkaInputStream时只须要输入消费Kafka的From Offset,而后其自行获取本次消费的End Offset,也就是当前最新的Offset。保存的Offset是本批次的End Offset,下次消费从上次的End Offset开始消费。当程序宕机或重启任务后,这其中存在一些问题。若是在数据处理完成前存储Offset,则可能存在做业处理数据失败与做业宕机等状况,重启后会没法追溯上次处理的数据致使数据出现丢失。若是在数据处理完成后存储Offset,可是存储Offset过程当中发生失败或做业宕机等状况,则在重启后会重复消费上次已经消费过的数据。并且此时又没法保证重启后消费的数据与宕机前的数据量相同数据至关,这又会引入另一个问题,若是是基于聚合统计指标做更新操做,这会带来没法判断上次数据是否已经更新成功。
因此在muise spark core中咱们加入了本身的实现用以保证Exactly once的语义。具体的实现是咱们对Spark源码进行了改造,保证在建立DirectKafkaInputStream能够同时输入From Offset与End Offset,而且咱们在存储Kafka Offset的时候保存了每一个批次的起始Offset与结束Offset,具体格式以下:
如此作的用意在于可以确保不管是宕机仍是人为重启,重启后的第一个批次与重启前的最后一个批次数据如出一辙。这样的设计使得后面用户在后面对于第一个批次的数据处理很是灵活可变,若是用户直接忽略第一个批次的数据,那此时保证的是at most once的语义,由于咱们没法获知重启前的最后一个批次数据操做是否有成功完成;若是用户依照原有逻辑处理第一个批次的数据,不对其作去重操做,那此时保证的是at least once的语义,最终结果中可能存在重复数据;最后若是用户想要实现exactly once,muise spark core提供了根据topic、partition与offset生成UID的功能,只要确保两个批次消费的Offset相同,则最终生成的UID也相同,用户能够根据此UID做为判断上个批次数据是否有存储成功的依据。下面简单的给出了重启后第一个批次操做的行为。
Metrics系统
Musie spark core基于Spark自己的metrics系统进行了改造,添加了许多定制的metrics,而且向用户暴露了metrics注册接口,用户能够很是方便的注册本身的metrics并在程序中更新metrics的数值。最后全部的metrics会根据做业设定的批次间隔写入Graphite,基于公司定制的预警系统进行报警,前端能够经过Grafana展示各项metrics指标。
Muise spark core自己定制的metrics包含如下三种:
Fail 批次时间内spark task失败次数超过4次便报警,用于监控程序的运行状态。
Ack 批次时间内spark streaming处理的数据量小0便报警,用于监控程序是否在正常消费数据。
Lag 批次时间内数据消费延迟大于设定值便报警。
其中因为咱们大部分做业开启了Back Pressure功能,这就致使在Spark UI中看到每一个批次数据都能在正常时间内消费完成,然而可能此时kafka中已经积压了大量数据,故每一个批次咱们都会计算当前消费时间与数据自己时间的一个平均差值,若是这个差值大于批次时间,说明自己数据消费就已经存在了延迟。
下图展示了预警系统中,基于用户自定义注册的Metrics以及系统定制的Metrics进行预警。
容错
其实在上面Exactly Once一章中已经详细的描述了muise spark core如何在程序宕机后可以保证数据正确的处理。可是为了可以让Spark Sreaming可以长时间稳定的运行在Yarn集群上,还须要添加许多配置,感兴趣的朋友能够查看:Long running Spark Streaming Jobs on Yarn Cluster。
除了上述容错保证以外,Muise Portal(后面会讲)也提供了对Spark Streaming做业定时检测的功能。目前每过5分钟对当前全部数据库中状态标记为Running的Spark Streaming做业进行状态检测,经过Yarn提供的REST APIs能够根据每一个做业的Application Id查询做业在Yarn上的状态,若是状态处于非运行状态,则会尝试重启做业。
####Muise Portal
在封装完全部的Spark Streaming以后,咱们就须要有一个平台可以管理配置做业,Muise Portal就是这样的存在。Muise Portal目前主要支持了Storm与Spark Streaming两类做业,支持新建做业、Jar包发布、做业运行与中止等一系列功能。下图展示了新建做业的界面:
Spark Streaming做业基于Yarn Cluster模式运行,全部做业经过在Muise Portal上的Spark客户端提交到Yarn集群上运行。具体的一个做业运行流程以下图所示:
####总体架构
最后这边给出一下目前携程实时平台的总体架构。
###Spark Streaming在携程的实践
目前Spark Streaming在携程的业务场景主要能够分为如下几块:ETL、实时报表统计、个性化推荐类的营销场景以及风控与安全的应用。从抽象上来讲,主要能够分为数据过滤抽取、数据指标统计与模型算法的使用。
####ETL
现在市面上有形形色色的工具能够从Kafka实时消费数据并进行过滤清洗最终落地到对应的存储系统,如:Camus、Flume等。相比较于此类产品,Spark Streaming的优点首先在于能够支持更为复杂的处理逻辑,其次基于Yarn系统的资源调度使得Spark Streaming的资源配置更加灵活,最后用户能够将Spark RDD数据转换成Spark Dataframe数据,使得能够与Spark SQL相结合,而且最终将数据输出到HDFS和Alluxio等分布式文件系统时能够存储为Parquet之类的格式化数据,用户在后续使用Spark SQL处理数据时更为的简便。
目前在ETL使用场景中较为典型的是携程度假部门的Data Lake应用,度假部门使用Spark Streaming对数据作ETL操做最终将数据存储至Alluxio,期间基于muise-spark-core的自定义metric功能对数据的数据量、字段数、数据格式与重复数据进行了数据质量校验与监控,具体的监控预警已在上面说过。
####实时报表统计
实时报表统计与展示也是Spark Streaming使用较多的一个场景,数据能够基于Process Time统计,也能够基于Event Time统计。因为自己Spark Streaming不一样批次的job能够视为一个个的滚动窗口,某个独立的窗口中包含了多个时间段的数据,这使得使用Spark Streaming基于Event Time统计时存在必定的限制。通常较为经常使用的方式是统计每一个批次中不一样时间维度的累积值并导入到外部系统,如ES;而后在报表展示的时基于时间作二次聚合得到完整的累加值最终求得聚合值。下图展现了携程IBU基于Spark Streaming实现的实时看板。
####个性化推荐与风控安全
这两类应用的共同点莫过于它们都须要基于算法模型对用户的行为做出相对应的预测或分类,携程目前全部模型都是基于离线数据天天定时离线训练。在引入Spark Streaming以后,许多部门开始积极的尝试特征的实时提取、模型的在线训练。而且Spark Streaming能够很好的与Spark MLlib相结合,其中最为成功的案例为信安部门之前是基于各种过滤条件抓取攻击请求,后来他们采用离线模型训练,Spark Streaming加Spark MLlib对用户进行实时预测,性能上较JStorm(基于大量正则表达式匹配用户,十分消耗CPU)提升了十倍,漏报率下降了20%。
###曾经踩过的坑
目前携程的Spark Streaming做业运行的YARN集群与离线做业同属一个集群,这对做业不管是性能仍是稳定性都带来了诸多影响。尤为是当YARN或者Hadoop集群须要更新维护重启服务时,在很大程度上会致使Spark Streaming做业出现报错、挂掉等情况,虽然有诸多的容错保障,但也会致使数据积压数据处理延迟。后期将会独立部署Hadoop与Yarn集群,全部的实时做业都运行在独立的集群上,不受外部的影响,这也方便后期对于Flink做业的开发与维护。后期经过Alluxio实现主集群与子集群间的数据共享。
在使用过程当中,也遇到了形形色色不一样的Bug,这边简单的介绍几个较为严重的问题。首先第一个问题是,Spark Streaming每一个批次Job都会经过DirectKafkaInputStream的comput方法获取消费的Kafka Topic当前最新的offset,若是此时kafka集群因为某些缘由不稳定,就会致使 java.lang.RuntimeException: No leader found for partition xx的问题,因为此段代码运行在Driver端,若是没有作任何配置和处理的状况下,会致使程序直接挂掉。对应的解决方法是配置spark.streaming.kafka.maxRetries大于1,而且能够经过配置refresh.leader.backoff.ms参数设置每次重试的间隔时间。
其次在使用Spark Streaming与Spark Sql相结合的过程当中,也会有诸多问题。好比在使用过程当中可能出现out of memory:PermGen space,这是因为Spark sql使用code generator致使大量使用PermGen space,经过在spark.driver.extraJavaOptions中添加-XX:MaxPermSize=1024m -XX:PermSize=512m解决。还有Spark Sql须要建立Spark Warehouse,若是基于Yarn来运行,默承认能是在HDFS上建立相对应的目录,若是没有权限会报出Permission denied的问题,用户能够经过配置config("spark.sql.warehouse.dir", "file:${system:user.dir}/spark-warehouse")来解决。
###将来展望
上面主要针对Spark Streaming在携程实时平台中的运用作了详细的介绍,在使用Spark Streaming过程当中仍是存在一些痛点,好比窗口功能比较单1、基于Event Time统计指标过于繁琐以及官方在新的版本中基本没有新的特性加入等,这使得咱们更加倾向于尝试Flink。Flink基本实现了Google提出的各种实时处理的理念,引入了WaterMark的实现,感兴趣的朋友能够查看Google官方文档:The world beyond batch: Streaming 102。
目前Flink 1.4 release版本发布在即,Spark 2.2.0基于kafka数据源的Structured Streaming也支持了更多的特性。前期咱们已对Flink作了充分的调研,下半年主要工做将放在Flink的对接上。在提供了诸多实时计算框架的支持后,随之而来的是带来了更多的学习成本,从此咱们的重心将放在如何使用户更加容易的实现实时计算逻辑。其中Apache Beam对各类实时场景提供了良好的封装并对多种实时计算引擎作了支持,其次基于Stream Sql实现复杂的实时应用场景都将是咱们主要调研的方向。