Apache Flink在惟品会的实践

做者:王新春docker

整理:郭旭策安全

本文来自于王新春在2018年7月29日 Flink China社区线下 Meetup·上海站的分享。王新春目前在惟品会负责实时平台相关内容,主要包括实时计算框架和提供实时基础数据,以及机器学习平台的工做。以前在美团点评,也是负责大数据平台工做。他已经在大数据实时处理方向积累了丰富的工做经验。bash

本文主要内容主要包括如下几个方面:微信

  1. 惟品会实时平台现状网络

  2. Apache Flink(如下简称Flink)在惟品会的实践架构

  3. Flink On K8S框架

  4. 后续规划机器学习

1、惟品会实时平台现状

目前在惟品会实时平台并非一个统一的计算框架,而是包括 Storm,Spark,Flink 在内的三个主要计算框架。因为历史缘由,当前在 Storm 平台上的 job 数量是最多的,可是从去年开始,业务重心逐渐切换到 Flink 上面,因此今年在 Flink 上面的应用数量有了大幅增长。oop

实时平台的核心业务包含八大部分:实时推荐做为电商的重点业务,包含多个实时特征;大促看板,包含各类维度的统计指标(例如:各类维度的订单、UV、转化率、漏斗等),供领导层、运营、产品决策使用;实时数据清洗,从用户埋点收集来数据,进行实时清洗和关联,为下游的各个业务提供更好的数据;此外还有互联网金融、安全风控、与友商比价等业务,以及 Logview、Mercury、Titan 做为内部服务的监控系统、VDRC 实时数据同步系统等。性能

实时平台的职责主要包括实时计算平台和实时基础数据。实时计算平台在 Storm、Spark、Flink 等计算框架的基础上,为监控、稳定性提供了保障,为业务开发提供了数据的输入与输出。实时基础数据包含对上游埋点的定义和规范化,对用户行为数据、MySQL 的 Binlog 日志等数据进行清洗、打宽等处理,为下游提供质量保证的数据。

在架构设计上,包括两大数据源。一种是在App、微信、H5等应用上的埋点数据,原始数据收集后发送到在kafka中;另外一种是线上实时数据的 MySQL Binlog 日志。数据在计算框架里面作清洗关联,把原始的数据经过实时ETL为下游的业务应用(包括离线宽表等)提供更易于使用的数据。

2、Flink在惟品会的实践

场景一:Dataeye实时看板

Dataeye 实时看板是支持须要对全部的埋点数据、订单数据等进行实时计算时,具备数据量大的特色,而且须要统计的维度有不少,例如全站、二级平台、部类、档期、人群、活动、时间维度等,提升了计算的复杂程度,统计的数据输出指标每秒钟能够达到几十万。

以 UV 计算为例,首先对 Kafka 内的埋点数据进行清洗,而后与Redis数据进行关联,关联好的数据写入Kafka中;后续 Flink 计算任务消费 Kafka 的关联数据。一般任务的计算结果的量也很大(因为计算维度和指标特别多,能够达到上千万),数据输出经过也是经过 Kafka 做为缓冲,最终使用同步任务同步到 HBase 中,做为实时数据展现。同步任务会对写入 HBase 的数据限流和同类型的指标合并,保护 HBase。与此同时还有另外一路计算方案做为容灾。

在以 Storm 进行计算引擎中进行计算时,须要使用 Redis 做为中间状态的存储,而切换到 Flink 后,Flink 自身具有状态存储,节省了存储空间;因为不须要访问 Redis,也提高了性能,总体资源消耗下降到了原来的1/3。

在将计算任务从 Storm 逐步迁移到Flink的过程当中,对两路方案前后进行迁移,同时将计算任务和同步任务分离,缓解了数据写入 HBase 的压力。

切换到 Flink 后也须要对一些问题进行追踪和改进。对于 FlinkKafkaConsumer,因为业务缘由对 kafka 中的 Aotu Commit 进行修改,以及对 offset 的设定,须要本身实现支持 kafka 集群切换的功能。对不带 window 的state 数据须要手动清理。还有计算框架的通病——数据倾斜问题须要处理。同时对于同步任务追数问题,Storm能够从 Redis 中取值,Flink 只能等待。

场景二:Kafka数据落地HDFS

以前都是经过 Spark Streaming 的方式去实现,如今正在逐步切换到 Flink 上面,经过 OrcBucketingTableSink 将埋点数据落地到 HDFS上 的 Hive 表中。在 Flink 处理中单 Task Write 可达到3.5K/s左右,使用 Flink 后资源消耗下降了90%,同时将延迟30s下降到了3s之内。目前还在作 Flink 对 Spark Bucket Table 的支持。

场景三:实时的ETL

对于 ETL 处理工做而言,存在的一个痛点就是字典表存储在 HDFS 中,而且是不断变化的,而实时的数据流须要与字典表进行 join。字典表的变化是由离线批处理任务引发的,目前的作法是使用ContinuousFileMonitoringFunctionContinuousFileReaderOperator 定时监听 HDFS 数据变化,不断地将新数据刷入,使用最新的数据去作 join 实时数据。

咱们计划作更加通用的方式,去支持 Hive 表和 stream 的 join,实现Hive表数据变化以后,数据自动推送的效果。

3、Flink On K8S

在惟品会内部有一些不一样的计算框架,有实时计算的,有机器学习的,还有离线计算的,因此须要一个统一的底层框架来进行管理,所以将 Flink 迁移到了 K8S 上。

在 K8S 上使用了思科的网络组件,每一个docker容器都有独立的 ip,对外也是可见的。实时平台的融合器总体架构以下图所示。

惟品会在K8S上的实现方案与 Flink 社区提供的方案差别仍是很大的。惟品会使用 K8S StatefulSet 模式部署,内部实现了cluster相关的一些接口。一个job对应一个mini cluster,而且支持HA。对于Flink来讲,使用 StatefulSet 的最大的缘由是 pod 的 hostname 是有序的;这样潜在的好处有:

1.hostname为-0和-1的pod能够直接指定为jobmanager;可使用一个statefulset启动一个cluster,而deployment必须2个;Jobmanager和TaskManager分别独立的deployment。

  1. pod因为各类缘由fail后,因为StatefulSet从新拉起的pod的hostname不变,集群recover的速度理论上能够比deployment更快(deployment每次主机名随机)。

容器的entrypoint

因为要由主机名来判断是启动jobmanager仍是taskmanager,所以须要在entrypoint中去匹配设置的jobmanager的主机名是否有一致。 传入参数为:"cluster ha";则自动根据主机名判断启动那个角色;也能够直接指定角色名称 docker-entrypoint.sh的脚本内容以下:

#!/bin/sh 
# If unspecified, the hostname of the container is taken as the JobManager address

ACTION_CMD="$1"

# if use cluster model, pod ${JOB_CLUSTER_NAME}-0,${JOB_CLUSTER_NAME}-1 as jobmanager
if [ ${ACTION_CMD} == "cluster" ]; then
  jobmanagers=(${JOB_MANGER_HOSTS//,/ })
  ACTION_CMD="taskmanager"
  for i in ${!jobmanagers[@]}
  do
      if [ "$(hostname -s)" == "${jobmanagers[i]}" ]; then
          ACTION_CMD="jobmanager"
          echo "pod hostname match jobmanager config host, change action to jobmanager."
      fi
  done
fi

# if ha model, replace ha configuration
if [ "$2" == "ha" ]; then
  sed -i -e "s|high-availability.cluster-id: cluster-id|high-availability.cluster-id: ${FLINK_CLUSTER_IDENT}|g" "$FLINK_CONF_DIR/flink-conf.yaml"
  sed -i -e "s|high-availability.zookeeper.quorum: localhost:2181|high-availability.zookeeper.quorum: ${FLINK_ZK_QUORUM}|g" "$FLINK_CONF_DIR/flink-conf.yaml"
  sed -i -e "s|state.backend.fs.checkpointdir: checkpointdir|state.backend.fs.checkpointdir: hdfs:///user/flink/flink-checkpoints/${FLINK_CLUSTER_IDENT}|g" "$FLINK_CONF_DIR/flink-conf.yaml"
  sed -i -e "s|high-availability.storageDir: hdfs:///flink/ha/|high-availability.storageDir: hdfs:///user/flink/ha/${FLINK_CLUSTER_IDENT}|g" "$FLINK_CONF_DIR/flink-conf.yaml"
fi

if [ ${ACTION_CMD} == "help" ]; then
    echo "Usage: $(basename "$0") (cluster ha|jobmanager|taskmanager|local|help)"
    exit 0
elif [ ${ACTION_CMD} == "jobmanager" ]; then
    JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}
    echo "Starting Job Manager"
    sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: ${JOB_MANAGER_RPC_ADDRESS}/g" "$FLINK_CONF_DIR/flink-conf.yaml"
    sed -i -e "s/jobmanager.heap.mb: 1024/jobmanager.heap.mb: ${JOB_MANAGER_HEAP_MB}/g" "$FLINK_CONF_DIR/flink-conf.yaml"

    echo "config file: " && grep '^[^\n#]' "$FLINK_CONF_DIR/flink-conf.yaml"
    exec "$FLINK_HOME/bin/jobmanager.sh" start-foreground cluster

elif [ ${ACTION_CMD} == "taskmanager" ]; then
    TASK_MANAGER_NUMBER_OF_TASK_SLOTS=${TASK_MANAGER_NUMBER_OF_TASK_SLOTS:-$(grep -c ^processor /proc/cpuinfo)}
    echo "Starting Task Manager"

    sed -i -e "s/taskmanager.heap.mb: 1024/taskmanager.heap.mb: ${TASK_MANAGER_HEAP_MB}/g" "$FLINK_CONF_DIR/flink-conf.yaml"
    sed -i -e "s/taskmanager.numberOfTaskSlots: 1/taskmanager.numberOfTaskSlots: $TASK_MANAGER_NUMBER_OF_TASK_SLOTS/g" "$FLINK_CONF_DIR/flink-conf.yaml"

    echo "config file: " && grep '^[^\n#]' "$FLINK_CONF_DIR/flink-conf.yaml"
    exec "$FLINK_HOME/bin/taskmanager.sh" start-foreground
elif [ ${ACTION_CMD} == "local" ]; then
    echo "Starting local cluster"
    exec "$FLINK_HOME/bin/jobmanager.sh" start-foreground local
fi

exec "$@"
复制代码

entrypoint变量说明

镜像的docker entrypoint脚本里面须要设置的环境变量设置说明:

环境变量名称 参数 示例****内容 说明
JOB_MANGER_HOSTS StatefulSet.name-0,StatefulSet.name-1 flink-cluster-0,flink-cluster-1 JM的主机名,短主机名;能够不用FQDN
FLINK_CLUSTER_IDENT namespace/StatefulSet.name default/flink-cluster 用来作zk ha设置和hdfs checkpiont的根目录
TASK_MANAGER_NUMBER_OF_TASK_SLOTS containers.resources.cpu.limits 2 TM的slot数量,根据resources.cpu.limits来设置
FLINK_ZK_QUORUM env:FLINK_ZK_QUORUM ...:2181 HA ZK的地址
JOB_MANAGER_HEAP_MB env:JOB_MANAGER_HEAP_MBvalue:containers.resources.memory.limit -1024 4096 JM的Heap大小,因为存在堆外内存,须要小于container.resources.memory.limits;不然容易OOM kill
TASK_MANAGER_HEAP_MB env:TASK_MANAGER_HEAP_MB value: containers.resources.memory.limit -1024 4096 TM的Heap大小,因为存在Netty的堆外内存,须要小于container.resources.memory.limits;不然容易OOM kill

使用ConfigMap维护配置

对应 Flink 集群所依赖的 HDFS 等其余配置,则经过建立 configmap 来管理和维护。

kubectl create configmap hdfs-conf --from-file=hdfs-site.xml --from-file=core-site.xml

[hadoop@flink-jm-0 hadoop]$ ll /home/vipshop/conf/hadoop
total 0
lrwxrwxrwx. 1 root root 20 Apr  9 06:54 core-site.xml -> ..data/core-site.xml
lrwxrwxrwx. 1 root root 20 Apr  9 06:54 hdfs-site.xml -> ..data/hdfs-site.xml
复制代码

4、后续计划

当前实时系统,机器学习平台要处理的数据分布在各类数据存储组件中,如Kafka、Redis、Tair和HDFS等,如何方便高效的访问,处理,共享这些数据是一个很大的挑战,对于当前的数据访问和解析经常须要耗费不少的精力,主要的痛点包括:

  1. 对于Kafka,Redis,Tair中的 binary(PB/Avro等格式)数据,使用者没法快速直接的了解数据的 schema 与数据内容,采集数据内容及与写入者的沟通成本很高。

  2. 因为缺乏独立的统一数据系统服务,对Kafka,Redis,Tair等中的binary数据访问须要依赖写入者提供的信息,如proto生成类,数据格式wiki定义等,维护成本高,容易出错。

  3. 缺少 relational schema 使得使用者没法直接基于更高效易用的 SQL 或 LINQ 层 API 开发业务。

  4. 没法经过一个独立的服务方便的发布和共享数据。

  5. 实时数据没法直接提供给Batch SQL引擎使用。

  6. 此外,对于当前大部分的数据源的访问也缺乏审计,权限管理,访问监控,跟踪等特性。

UDM(统一数据管理系统) 包括 Location Manager, Schema Metastore 以及 Client Proxy 等模块,主要的功能包括:

  1. 提供从名字到地址的映射服务,使用者经过抽象名字而不是具体地址访问数据。

  2. 用户能够方便的经过Web GUI界面方便的查看数据Schema,探查数据内容。

  3. 提供支持审计,监控,溯源等附加功能的Client API Proxy。

  4. 在Spark/Flink/Storm等框架中,以最适合使用的形式提供这些数据源的封装。

UDM的总体架构以下图所示。

UDM的使用者包括实时,机器学习以及离线平台中数据的生产者和使用者。在使用Sql API或Table API的时候,首先完成Schema的注册,以后使用Sql进行开发,下降了开发代码量。

以Spark访问Kafka PB数据的时序图来讲明UDM的内部流程

在Flink中,使用UDMExternalCatalog来打通Flink计算框架和UDM之间的桥梁,经过实现ExternalCatalog的各个接口,以及实现各自数据源的TableSourceFactory,完成Schema和接入管控等各项功能。同时加强Flink的SQL Client的各项功能,能够经过调用API查询UDM的Schema,完成SQL任务的生成和提交。

更多资讯请访问 Apache Flink 中文社区网站

相关文章
相关标签/搜索