本文由京东搜索算法架构团队分享,主要介绍 Apache Flink 在京东商品搜索排序在线学习中的应用实践。文章的主要大纲以下:算法
一、背景缓存
二、京东搜索在线学习架构网络
三、实时样本生成多线程
四、Flink Online Learning架构
五、监控系统并发
六、规划总结框架
在京东的商品搜索排序中,常常会遇到搜索结果多样性不足致使系统非最优解的问题。为了解决数据马太效应带来的模型商品排序多样性的不足,咱们利用基于二项式汤普森采样建模,可是该算法仍存在对全部用户采用一致的策略,未有效考虑用户和商品的个性化信息。基于该现状,咱们采起在线学习,使深度学习和汤普森采样融合,实现个性化多样性排序方案,实时更新模型的关参数。机器学习
在该方案中,Flink 主要应用于实时样本的生成和 online learning 的实现。在在线学习过程当中,样本是模型训练的基石,在超大规模样本数据的处理上,咱们对比了 Flink、Storm 和 Spark Streaming 以后,最终选择用 Flink 做为实时样本流数据的生产以及迭代 online learning 参数的框架。在线学习的总体链路特别长,涉及在线端特征日志、流式特征处理、流式特征与用户行为标签关联、异常样本处理、模型动态参数实时训练与更新等环节,online learning 对样本处理和参数状态处理的准确性和稳定性要求较高,任何一个阶段都有可能出现问题,为此咱们接入京东的 observer 体系,拥有完整的全链路监控系统,保证各个阶段数据的稳定性和完整性;下面咱们首先介绍一下京东搜索在线学习架构。异步
京东搜索的排序模型系统架构主要包括如下几个部分:分布式
一、Predictor 是模型预估服务,在 load 模型中分为 static 部分和 dynamic 部分,static 部分由离线数据训练获得,主要学习 user 和 doc 的稠密特征表示,dynamic 部分主要包含 doc 粒度的权重向量,这部分由实时的 online learning 任务实时更新。 二、Rank 主要包括一些排序策略,在排序最终结果肯定以后,会实时落特征日志,将 doc 的特征按顺序写入特征数据流,做为后续实时样本的数据源(feature)。 三、Feature Collector 的任务是承接在线预估系统发出的特征数据,对下游屏蔽缓存、去重、筛选等在线系统特有逻辑,产出 Query+Doc 粒度的特征流。 四、Sample join 的任务将上面的 feature 数据、曝光、点击、加购、下单等用户行为标签数据做为数据源,经过 Flink 的 union + timer 数据模型关联成为符合业务要求的样本数据,算法可根据目标需求选择不一样的标签做为正负样本标记。 五、**Online learning **任务负责消费上游生成的实时样本作训练,负责更新 model 的 dynamic 部分。
Online Learning 对于在线样本生成的时效性和准确性都有很高的要求,同时也对做业的稳定性有很高的要求。在海量用户日志数据实时涌入的状况下,咱们不只要保证做业的数据延时低、样本关联率高且任务稳定,并且做业的吞吐不受影响、资源使用率达到最高。
京东搜索排序在线样本的主要流程以下:
一、数据源大体有曝光流、feature 流和用户行为流等做为实时样本的数据源,统一以 JDQ 管道流的形式,由京东实时计算平台提供平台支撑。 二、接到 feature 流和曝光流、label 流后,进行数据清洗,获得任务须要的数据格式。 三、拿到各个标准流后,对各个流进行 union 操做,以后进行 keyby。 四、咱们在 process function 里面添加 Flink timer 定时器,做为样本生成的实时窗口。 五、将生成的样本实时落入 jdq 和 HDFS,jdq 能够用做后面的 online learning 的 input,HDFS 持久存储样本数据,用于离线训练、增量学习和数据分析。
在线样本任务优化实践:
京东搜索样本数据吞吐量每秒达到 GB 规模,对分布式处理分片、超大状态和异常处理提出很高的优化要求。
使用 keyby 的时候,不免会有数据倾斜的状况,这里咱们假设 key 设计合理、 shuffle 方式选择正确、任务没有反压且资源足够使用,因为任务 parallelism 设置致使的数据倾斜的状况。咱们先看 Flink 里面 key 是如何被分发到 subtask 上面的。
keygroup = assignToKeyGroup(key, maxParallelism) subtaskNum = computeOperatorIndexForKeyGroup(maxParallelism, parallelism, keyGroupId)复制代码
假设咱们的并发设置的是 300,那么 maxParallelism 就是 512,如此设计,必然致使有的 subtask 分布 1 个 keygroup 有的分配两个,同时也致使了数据天然倾斜。针对上述问题,有两个解决方案:
● 设置并行度为 2 的 n 次方; ● 设置最大并行度为 并行度的 n 倍。
若是使用方案 1 ,调整并发的话只能调整 2 的幂次,建议使用方案 2,且假如 parallelism 为 300,maxParallelism 设置为 1200 的状况下假如数据仍是有倾斜,能够再相应的把 maxParallelism 设置大一些保证每一个 keygroup 的 key 少一些,如此也能够下降数据倾斜的发生。
在线样本用到了 Flink 的 state,咱们以前默认将 state 放到了内存里面,可是随着放量的增长,state 数据量激增,发现 GC 时间特别长,以后改变策略,将 state 放入了 RocksDB,GC 问题得以解决。咱们针对 checkpoint 作了以下配置:
● 开启增量 checkpoint; ● 合理设置 checkpoint 的超时时间、间隔时间和最小暂停时间。
● 让 Flink 本身管理 RocksDB 占用的内存,对 RocksDB 的 blockcache、writebuffer 等进行调优。 ● 优化 state 的数据使用,将 state 数据放入多个 state object 里面使用,下降序列化/反序列化的代价。
在任务调优的时候咱们发现咱们的任务访问 RocksDB 的时间很是长,查看 jstack 发现,不少线程都在等待数据的序列化和反序列化,随着算法特征的逐渐增多,样本中的特征个数超过 500 个,使得每条数据的量级愈来愈大。可是在作样本关联的时候实际上是不须要特征关联的,只须要相应的主键关联就能够了,所以,咱们用 ValueState 存储主键,用 MapState/ListState 存储特征等值。固然了还能够将这些特征值存储到外部存储里面,这里就须要对网络 io 和 本地 io 之间的选择作一个取舍。
● failure recovery 的时候开启本地恢复。
因为咱们的 checkpoint 数据达到了 TB 级别,一旦任务发生 failover,不论是针对 HDFS 仍是针对任务自己,压力都很是大,所以,咱们优先使用本地进行 recovery,这样,不只能够下降 HDFS 的压力还能够增长 recovery 的速度。
对于 online learning,咱们先介绍一下伯努利汤普森采样算法,假设每一个商品的 reward 几率服从 Beta 分布,所以给每一个商品维护两个参数成功次数 si 及失败次数 fi,及全部商品的公共先验参数成功次数 α 和失败次数 β。
每次根据商品相应的 Beta 分布采样为最优商品的指望 reward: Q(at) = θi,并选择指望 reward 最大的商品展示给用户。最后根据环境给出真实 reward,更新模型相应的参数达到 online learning 的效果。该参数表明一个商品特征,用一个 n 维向量表示,该向量由原始特征经过 MLP 网络预测获得。原始特征通过 DNN 网络获得一个 N 维向量做为该商品的个性化表征,采用 Logistic Regression 函数建模似然函数,利用 Flink 构建该表征和实时反馈所组成的实时样本,用于不断迭代近似更新参数分布。
从 jdq 接过实时样本以后,因为以前并无保证数据的有序性,这里采用 watermark 机制保证数据的有序性。
把只曝光无行为的商品看作负样本,有点击及后续行为的商品看作正样本,当窗口将达到必定正负比例或数据量时进行一次 batch 训练,迭代出新的参数向量,将商品 embedding 数据放到 Flink 的 state 里面,以后做为 model 的 dynamic 部分更新参数。
个性化 ee 参数在线学习采用异步更新方式的时候,存在参数更新顺序错乱问题,这会下降在线学习模型收敛速度,从而形成了流量的浪费,所以,参数异步更新方式更改成同步更新方式,避免参数读写错乱问题。在同步更新的方式下,存储在 status 中的参数向量须要在下一次训练迭代时使用,若参数发生丢失会使该商品的迭代过程当中断,为防止系统风险形成参数丢失,设计了参数双重保障。通常的任务异常或重启后参数可从 checkpoint 或 savepoint 中恢复,若是意外状况下参数没法恢复,从远程在线服务中取回上一版参数并记录到 state。
在线学习任务使用同一个 Flink 任务来支持多个版本模型在不一样实验桶下进行 AB 实验,经过版本号区分不一样的 AB 流量桶,对应的实时样本以 docid+version 做为 key 进行处理,迭代过程互不影响。
为了提升带宽利用率以及性能的需求,咱们内部采用 pb 格式传输数据,通过调研,pb 的传输格式优于 Flink 的兜底的 general class 的 kryo 序列化方式,所以咱们采用了 Flink 的 custom serialization 解决方案,直接用 pb 格式在 op 之间传输数据。
这里咱们区分业务全链路监控和任务稳定性相关监控,具体状况下面将详细介绍。
整个系统使用京东内部的 observer 平台来实现业务全链路监控,主要包括 predictor 服务相关的监控、feature dump 的 QPS 监控、特征和标签质量监控、关联状况监控、train 相关的监控以及 AB 指标相关的一些监控,以下:
任务稳定性监控这里主要是指 Flink 的任务稳定性监控,链路吞吐量达 GB/s规模,特征消息 QPS 达 10W 规模,且 online learning 的不可间断性,无论对于在线样本任务仍是 online learning 的任务,相关监控报警都是必不可少的。
■ 容器的内存、cpu 监控、thread 个数,gc 监控
■ 样本相关业务监控
Flink 在实时数据处理方面有优秀的性能、容灾、吞吐等表现、算子丰富易上手使用、天然支持批流一体化,且目前已有在线学习的框架开源,作在线学习是个不二的选择,随着机器学习数据规模的扩大和对数据时效性、模型时效性要求的提高,在线学习不只仅做为离线模型训练的补充,更成为模型系统效率发展的趋势。为此咱们作的规划以下:
做者致谢:感谢实时计算研发部、搜索排序算法团队的支持。