做者:张光辉前端
本文将为你们展现字节跳动公司怎么把Storm从Jstorm迁移到Flink的整个过程以及后续的计划。你能够借此了解字节跳动公司引入Flink的背景以及Flink集群的构建过程。字节跳动公司是如何兼容之前的Jstorm做业以及基于Flink作一个任务管理平台的呢?本文将一一为你揭开这些神秘的面纱。java
本文内容以下:node
下面这幅图展现的是字节跳动公司的业务场景python
首先,应用层有广告,也有AB测,也有推送和数据仓库的一些业务。而后在使用J storm的过程当中,增长了一层模板主要应用于storm的计算模型,使用的语言是python。因此说中间相对抽象了一个schema,跑在最下面一层J storm计算引擎的上面。网络
字节跳动公司有不少J-storm集群,在当时17年7月份的时候,也就是在计划迁移到Flink以前,J storm集群的规模大概是下图所示的规模级别,当时已经有5000台机器左右了。并发
接下来,介绍下迁移Flink的整个过程。先详细地介绍一下当时J-Storm是怎么用的。运维
上面是一个word count的例子:左边是一个目录结构,这个目录结构在resources下面,里面的Spout/Bolt的逻辑都是一些python脚本写的。而后在最外层还有一个topology_online.yaml配置文件。
这个配置文件是用来干什么的?就是把全部的Spout和Bolt串联起来构成一个有向无关图,也就是DAG图。这就是使用J storm时的整个目录结构,大部分用户都是这样用的。右边是Spout和Bolt的逻辑,实际上是抽象出来了一个函数,就在这里面写业务方面的函数,而后将tuple_batch也就是上游流下来的数据去作一些计算逻辑。maven
下面详细介绍一下配置文件的信息,其实咱们有整个拓扑结构拓扑的信息,好比说做业名叫什么,做业须要多少资源,须要多少work数。这里面会有单个的spout和Bolt的配置信息,好比是消费的topic仍是一些并发度?分布式
除了这些信息还有整个这个数据流的流转,好比说spout的输出,输出messsage的消息等等。最后还有整个的Spout到Bolt之间的shuffle逻辑。这就是咱们以前Jstorm的整个使用方式。最后会把整个目录结构里面的内容去解析出来,根据配置文件把整个storm的拓扑结构构建出来,而后提交到集群上面去跑。函数
使用Jstorm集群遇到了什么问题呢?第一个问题,由于咱们当时是用使用python写的代码,整个集群是没有内存隔离的,job和work之间是没有内存限制的。好比说在实际过程当中会常常遇到一个用户,他可能代码写的有问题致使一个work可能占了70G内存,把机器的内存占了1/3。第二个问题就是说业务团队之间没有扩大管理,预算和审核是无头绪的。咱们当时都是都是跑在一个大集群上面,而后个别业务是单独跑在一些小集群,可是咱们每次都是资源不足,也没办法梳理这个预算。
第三个问题就是集群过多,运维平台化作得不太好,都是靠人来运维的。这个时候集群多了基本上是管不过来的。
第四个问题就是说咱们用python写的代码,有些性能比较差。可是咱们在Storm的基础上面去推广这个Java也比较难,由于咱们部分同事其实是不承认Java的,由于他以为java开发速度太慢了。
咱们当时想解决上面的问题,一个思路是把Jstorm放在yarn上面,直接把Jstorm在yarn上面兼容作这一套。后来由于知道阿里在用Flink因此去调研Flink,发现了Flink的一些优点,因此想尝试用Flink解决存在的问题。
使用Flink首先第一个问题能够成功解决,由于Flink做业是跑在yarn上面的,这就解决了内存隔离的问题。而后Yarn也是支持队列的,咱们能够根据业务去划分队列,这样咱们的扩大预算审核的问题获得解决了。咱们也不须要本身运维一个集群了,由于有yarn去管理咱们的资源,这样也节省了运维成员。在此基础上还能够作一些物理隔离队列,其实物理隔离队列如今也遇到了问题。由于物理隔离队列只是说这个机器隔离了,可是至关因而机柜也没有隔离网络带宽也没有隔离,因此说即便是物理隔离队列,如今也遇到好比说和离线做业共用机柜的时候,这个机柜的出口带宽被打满的问题。针对这些问题,咱们后续可能想在这个离线离线集群上面作QOS这种流量级别的方式来解决这个问题。
Flink其实是能够兼容Storm的,好比说以前的历史做业是能够迁移过来的,不须要维护两套计算引擎。Flink支持一些高优先级的API好比说支持SQL以及窗口等特性包括说checkpoint。咱们头条的业务对exactly-once的需求不是特别的强烈。
以上就是Flink的优点,因而咱们就决定从J storm往Flink去迁移。
在迁移的过程当中,第一件事情是先把Flink集群创建起来。一开始确定都是追求稳定性,好比说把离线的yarn集群隔离开,而后不依赖于HDFS也能够把Hdfs线上的name node, name space隔离出来。而后咱们梳理了原来storm上面的做业,哪些做业属于不一样的业务,而后映射到不一样的队列里面去,最后把一些特殊的队列也隔离开来。这是咱们准备这个Fink集群的时候考虑的几个点。
下面就考虑Flink怎么兼容J storm,而后把它迁移过来。
咱们当时Flink用的是1.32版本,由于Flink有Flink-storm这个工程,它能把Storm做业转化成Flink做业,咱们就借鉴这些技术上实现了一个Flink –jstorm。至关于把一个J storm的拓扑结构转化成了一个Flink job。只作完这件事情是不够的,由于咱们有一系列的外围工具须要去对齐。好比说以前提交做业的时候是经过一个脚本提交的让用户去屏蔽一些其余的参数。使用 flink的话咱们一样也是须要构建这么一个脚本,而后去提交Flink Job,最后中止flink Job。第三点是构建flink job外围工具,自动注册报警,好比说消费延迟报警,自动注册这个Dashboard以及一些log service,全部的这些为外围工具都要和原来的服务去对齐。
对齐完以后,咱们须要构建一个迁移脚本,迁移的过程当中最困难的是资源配置这一块。由于原来Storm用了多少资源,Storm怎么配,这对于迁移的用户来讲,若是是第一次作确定是不了解这些东西。所以咱们写这么一个脚本,帮用户生成它Flink集群里面对应的资源使用状况。这些工做作完了以后,咱们就开始去迁移。到如今为止,总体迁移完了,还剩下十个左右的做业没有迁移完。如今集群规模达到了大概是6000多台。
在迁移的过程当中咱们有一些其余优化,好比说J storm是可以支持task和work维度的重启的,Flink这一块作得不是特别好。咱们在这方面作了一些优化实现了一个single task和single tm粒度的重启,这样就解决部分做业由于task重启致使整个做业所有重启。
迁移完以后,咱们又构建了一个流式管理平台。这个平台是为了解决实际过程当中遇到了一些问题,好比说整个机群挂了没法肯定哪些做业在上面跑着,也通知不到具体的用户,有些用户做业都不知道本身提交了哪些做业。咱们构建流式做业的时候目标实际上就是和其余的管理平台是同样的,好比说咱们提供一些界面操做,而后提供一个版本管理,就是为了方便方便用户升级和回滚的操做,咱们还提供了一站式的查问题的工具:把一些用户须要的信息都聚合在一个页面上面,防止用户不断跳来跳去以及避免不一样系统之间的切换。有一些历史记录以前不论是跑在yarn上面仍是跑到storm上面,我一个做业被别人kill到了,其实我都是不知道的。针对这个问题咱们提供了一些历史操做记录的一些目标。
设计这个管理平台的时候,咱们考虑到提供这么一个前端管理平台可能只是针对公司内部的一部分产品,其余的产品也作了本身的一套前端。他们能够用一个模板,根据本身的逻辑去生成一个storm任务。基于此,咱们把整个管理平台抽象了两层:最上一层实际上至关于一个面向用户或者说是相似于前端的一个产品。中间这一层其实是一个相似于提交做业调度任务,这一层只负责提任务,而后停任务,管理生命周期以及由于故障致使做业失败了,将做业从新拉起来。这是中间层TSS层作的事情。
这样,咱们就能够对接到全部的前端平台。经过一个RPC进行TSS通讯,就把全部的底层的服务和Filnk和Yarn还有HDFS这些交互的底层的逻辑彻底屏蔽开来了。
接下来,用户写一个做业就比较简单了,流程以下:
第一步用户先要生成本身的一个做业模板,咱们这边经过maven提供的脚本架去生成一些做业的schema,这个做业执行完以后,它会把帮你把一些porm文件,还有一些相似于kafkasource这种常规的组件都帮你准备好,而后你直接在这个模板里面填本身的主要逻辑就能够了。由于咱们写Java程序遇到最多的一个问题就是包冲突问题。因此porm文件帮助用户把一些可能冲突的一些jar包都给以exclude掉,这样包冲突的几率会愈来愈小。
咱们测试做业基本上是用IDEA或者local模式去测试,也提供了一个脚本去提交做业,经过这个脚本提交到stage环境上面。在提交注册在平台上面去注册这个做业,而后添加一些配置信息。
下面是一个代码版本管理的界面:
把整个做业提交以后以下图所示:
提交完一个做业以后,用户可能想看做业运行的状态怎么样,咱们经过四种方式去给用户展现他的做业运行状态的。
第一个是Flink UI,也就是官方自带的UI用户能够去看。第二个是Dashboard,咱们展现了做业里面的task维度,QPS以及task之间的网络buffer,这些重要的信息汇聚到一块儿建立了一个Dashboard,这样可能查问题的时候方便一些。第三个是错误日志,其实和你们的思路同样,把一个分布式的日志而后聚合起来,而后写到ES上面去。第四是作了一个Jobtrace的工具,就是咱们把Flink里面常见的一些异常匹配出来,而后直接给用户一个wiki的使用指南,告诉用户好比说你的做业OM了须要扩大内存。只要用户的做业出现了某些问题,咱们把已知的全部的异常都会匹配给用户。
下面是ES的kibana:
这是咱们Jobtrace的功能,咱们把Flink的这些常见的异常都匹配出来,每个异常其实对应了一个wiki而后去让用户去解决本身的问题。
最后分享下咱们的近期规划,前面的基本作完而且趋于稳定了,可是如今又遇到了一些新的问题。好比资源使用率这个问题,由于用户提交做业的时候,用户对资源不是特别敏感就随意把一个资源提上去了,可能他明明须要两个CPU,可是他提了四个CPU。咱们想经过一个工具可以监控到他须要多少资源,而后通知yarn去把这个资源给重置了。就是动态调整job资源,自动把资源重置。
第二个问题是优化做业重启速度。咱们这边好多业务是根据流式计算的指标来监控它业务的稳定性,若是最上游重启一个做业,底下一群人收到报警说线上出现一些问题了。缘由是最上游某一个做业再重启。咱们想把重启时间间隔去作到最短或者是无缝重启,这是下一阶段须要去探索探索的一个问题。
第四点:Flink SQL也刚上线,可能须要一些精力投入去推广。
最后一点,咱们但愿在此抽象出更多的模式做业模型来,由于咱们自己是有一些好比说kafka2ES,kafka2hdfs这些需求,能不能把他们抽象成一个schema,而后去对外提供一些服务。
以上就是我本次分享的主要内容,感谢Flink的举办者和参与者,感谢咱们的同事,由于以上的分享内容是我和咱们同事一块儿作的。
更多资讯请访问 Apache Flink 中文社区网站