友信金服公司推行全域的数据体系战略,经过打通和整合集团各个业务线数据,利用大数据、人工智能等技术构建统一的数据资产,如 ID-Mapping、用户标签等。友信金服用户画像项目正是以此为背景成立,旨在实现“数据驱动业务与运营”的集团战略。目前该系统支持日处理数据量超 10 亿,接入上百种合规数据源。算法
做者 | 杨毅,穆超峰,贺小兵,胡夕数据库
导读:当此生活节奏日益加快,企业面对不断增长的海量信息,其信息筛选和处理效率低下的困扰与日俱增。因为用户营销不够细化,企业 App 中许多不合时宜或不合偏好的消息推送很大程度上影响了用户体验,甚至引起了用户流失。在此背景下,友信金服公司推行全域的数据体系战略,经过打通和整合集团各个业务线数据,利用大数据、人工智能等技术构建统一的数据资产,如 ID-Mapping、用户标签等。友信金服用户画像项目正是以此为背景成立,旨在实现“数据驱动业务与运营”的集团战略。目前该系统支持日处理数据量超 10 亿,接入上百种合规数据源。
传统基于 Hadoop 生态的离线数据存储计算方案已在业界大规模应用,但受制于离线计算的高时延性,愈来愈多的数据应用场景已从离线转为实时。这里引用一张表格对目前主流的实时计算框架作个对比。后端
Apache Storm 的容错机制须要对每条数据进行应答(ACK),所以其吞吐量备受影响,在数据大吞吐量的场景下会有问题,所以不适用此项目的需求。浏览器
Apache Spark 整体生态更为完善,且在机器学习的集成和应用性暂时领先,但 Spark 底层仍是采用微批(Micro Batching)处理的形式。安全
Apache Flink 在流式计算上有明显优点:首先其流式计算属于真正意义上的单条处理,即每一条数据都会触发计算。在这一点上明显与 Spark 的微批流式处理方式不一样。其次,Flink 的容错机制较为轻量,对吞吐量影响较小,使得 Flink 能够达到很高的吞吐量。最后 Flink 还拥有易用性高,部署简单等优点。相比之下咱们最终决定采用基于 Flink 的架构方案。性能优化
用户画像系统目前为集团线上业务提供用户实时标签数据服务。为此咱们的服务须要打通多种数据源,对海量的数字信息进行实时不间断的数据清洗、聚类、分析,从而将它们抽象成标签,并最终为应用方提供高质量的标签服务。在此背景下,咱们设计用户画像系统的总体架构以下图所示:cookie
总体架构分为五层:数据结构
在总体架构设计方案设计完成以后,咱们针对数据也设计了详尽的处理方案。在数据处理阶段,鉴于 Kafka 高吞吐量、高稳定性的特色,咱们的用户画像系通通一采用 Kafka 做为分布式发布订阅消息系统。数据清洗阶段利用 Flink 来实现用户惟一性识别、行为数据的清洗等,去除冗余数据。这一过程支持交互计算和多种复杂算法,并支持数据实时 / 离线计算。目前咱们数据处理流程迭代了两版,具体方案以下:架构
总体数据来源包含两种:app
根据不一样业务的指标需求咱们直接从集团数据仓库抽取数据并落入 Kafka,或者直接从业务端以 CDC(Capture Data Change)的方式写入 Kafka。在计算层,数据被导入到 Flink 中,经过 DataStream 生成 ID-Mapping、用户标签碎片等数据,而后将生成数据存入 JanusGraph(JanusGraph 是以 HBase 做为后端存储的图数据库介质)与 Kafka,并由 Flink 消费落入 Kafka 的用户标签碎片数据,进行聚合生成最新的用户标签碎片(用户标签碎片是由用户画像系统获取来自多种渠道的碎片化数据块处理后生成的)。
服务层将存储层存储的用户标签碎片数据,经过 JanusGraph Spark On Yarn 模式,执行 TinkerPop OLAP 计算生成全量用户 Yids 列表文件。Yid 是用户画像系统中定义的集团级用户 ID 标识。结合 Yids 列表文件,在 Flink 中批量读取 HBase 聚合成完整用户画像数据,生成 HDFS 文件,再经过 Flink 批量操做新生成的数据生成用户评分预测标签,将用户评分预测标签落入 Phoenix,以后数据即可经过统一数据服务接口进行获取。下图完整地展现了这一流程。
为了实现用户标签的整合,用户 ID 之间的强打通,咱们将用户 ID 标识当作图的顶点、ID pair 关系看做图的边,好比已经识别浏览器 Cookie 的用户使用手机号登录了公司网站就造成了<cookie,mobile>对应关系。这样全部用户 ID 标识就构成了一张大图,其中每一个小的连通子图 / 连通分支就是一个用户的所有标识 ID 信息。
ID-Mapping 数据由图结构模型构建,图节点包含 UserKey、Device、IdCard、Phone 等类型,分别表示用户的业务 ID、设备 ID、身份证以及电话等信息。节点之间边的生成规则是经过解析数据流中包含的节点信息,以必定的优先级顺序进行节点之间的链接,从而生成节点之间的边。好比,识别了用户手机系统的 Android_ID,以后用户使用邮箱登录了公司 App,在系统中找到了业务线 UID 就造成了<Android_ID,mail>和<mail,UID>关系的 ID pair,而后系统根据节点类型进行优先级排序,生成 Android_ID、mail、UID 的关系图。数据图结构模型以下图所示:
<p style="text-align:center">Gephi</p>
1.0 版本数据处理流程在系统初期较好地知足了咱们的平常需求,但随着数据量的增加,该方案遇到了一些性能瓶颈:
<p style="text-align:center">Gephi</p>
鉴于这些问题,咱们提出了 2.0 版本的解决方案。在 2.0 版本中,咱们经过利用 HBase 列式存储、修改图数据结构等优化方案尝试解决以上三个问题。
以下图所示,2.0 版本数据处理流程大部分承袭了 1.0 版本。新版本数据处理流程在如下几个方面作了优化:
<p style="text-align:center">2.0 版本数据处理流程</p>
<p style="text-align:center">Gephi</p>
目前,线上部署的用户画像系统中的数据绝大部分是来自于 Kafka 的实时数据。随着数据量愈来愈多,系统的压力也愈来愈大,以致于出现了 Flink 背压与 Checkpoint 超时等问题,致使 Flink 提交 Kafka 位移失败,从而影响了数据一致性。这些线上出现的问题让咱们开始关注 Flink 的可靠性、稳定性以及性能。针对这些问题,咱们进行了详细的分析并结合自身的业务特色,探索并实践出了一些相应的解决方案。
下图展现了 Flink 中 checkpointing 执行流程图:
<p style="text-align:center">Flink 中 checkpointing 执行流程</p>
经过以上流程分析,咱们经过三种方式来提升 Checkpointing 性能。这些方案分别是:
CheckPoint 存储方式有 MemoryStateBackend、FsStateBackend 和 RocksDBStateBackend。由官方文档可知,不一样 StateBackend 之间的性能以及安全性是有很大差别的。一般状况下,MemoryStateBackend 适合应用于测试环境,线上环境则最好选择 RocksDBStateBackend。
这有两个缘由:首先,RocksDBStateBackend 是外部存储,其余两种 Checkpoint 存储方式都是 JVM 堆存储。受限于 JVM 堆内存的大小,Checkpoint 状态大小以及安全性可能会受到必定的制约;其次,RocksDBStateBackend 支持增量检查点。增量检查点机制(Incremental Checkpoints)仅仅记录对先前完成的检查点的更改,而不是生成完整的状态。与完整检查点相比,增量检查点能够显著缩短 checkpointing 时间,但代价是须要更长的恢复时间。
Checkpointing 须要对每一个 Task 进行数据状态采集。单个 Task 状态数据越多则 Checkpointing 越慢。因此咱们能够经过增长 Task 并行度,减小单个 Task 状态数据的数量来达到缩短 CheckPointing 时间的效果。
Flink 算子链(Operator Chains)越长,Task 也会越多,相应的状态数据也就更多,Checkpointing 也会越慢。经过缩短算子链长度,能够减小 Task 数量,从而减小系统中的状态数据总量,间接的达到优化 Checkpointing 的目的。下面展现了 Flink 算子链的合并规则:
基于以上这些规则,咱们在代码层面上合并了相关度较大的一些 Task,使得平均的操做算子链长度至少缩短了 60%~70%。
在 Flink 运行过程当中,每个操做算子都会消费一个中间 / 过渡状态的流,并对它们进行转换,而后生产一个新的流。这种机制能够类比为:Flink 使用阻塞队列做为有界的缓冲区。跟 Java 里阻塞队列同样,一旦队列达到容量上限,处理速度较慢的消费者会阻塞生产者向队列发送新的消息或事件。下图展现了 Flink 中两个操做算子之间的数据传输以及如何感知到背压的:
首先,Source 中的事件进入 Flink 并被操做算子 1 处理且被序列化到 Buffer 中,而后操做算子 2 从这个 Buffer 中读出该事件。当操做算子 2 处理能力不足的时候,操做算子 1 中的数据便没法放入 Buffer,从而造成背压。背压出现的缘由可能有如下两点:
实践中咱们经过如下方式解决背压问题。首先,缩短算子链会合理的合并算子,节省出资源。其次缩短算子链也会减小 Task(线程)之间的切换、消息的序列化 / 反序列化以及数据在缓冲区的交换次数,进而提升系统的总体吞吐量。最后,根据数据特性将不须要或者暂不须要的数据进行过滤,而后根据业务需求将数据分别处理,好比有些数据源须要实时的处理,有些数据是能够延迟的,最后经过使用 keyBy 关键字,控制 Flink 时间窗口大小,在上游算子处理逻辑中尽可能合并更多数据来达到下降下游算子的处理压力。
通过以上优化,在天天亿级数据量下,用户画像能够作到实时信息实时处理并没有持续背压,Checkpointing 平均时长稳定在 1 秒之内。
目前用户画像部分数据都是从 Hive 数据仓库拿到的,数据仓库自己是 T+1 模式,数据延时性较大,因此为了提升数据实时性,端到端的实时流处理颇有必要。
端到端是指一端采集原始数据,另外一端以报表 / 标签 / 接口的方式对这些对数进行呈现与应用,链接两端的是中间实时流。在后续的工做中,咱们计划将现有的非实时数据源所有切换到实时数据源,统一通过 Kafka 和 Flink 处理后再导入到 Phoenix/JanusGraph/HBase。强制全部数据源数据进入 Kafka 的一个好处在于它可以提升总体流程的稳定性与可用性:首先 Kafka 做为下游系统的缓冲,能够避免下游系统的异常影响实时流的计算,起到“削峰填谷”的做用;其次,Flink 自 1.4 版本开始正式支持与 Kafka 的端到端精确一次处理语义,在一致性方面上更有保证。
做者介绍:杨毅:友信金服计算平台部 JAVA 工程师穆超峰:友信金服计算平台部数据开发高级工程师贺小兵:友信金服计算平台部数据开发工程师胡夕:友信金服计算平台部技术总监