这篇主要由五个部分来组成:数据库
首先是有赞的实时平台架构。缓存
其次是在调研阶段咱们为何选择了 Flink。在这个部分,主要是 Flink 与 Spark 的 structured streaming 的一些对比和选择 Flink 的缘由。架构
第三个就是比较重点的内容,Flink 在有赞的实践。这其中包括了咱们在使用 Flink 的过程当中碰到的一些坑,也有一些具体的经验。异步
第四部分是将实时计算 SQL 化,界面化的一些实践。jvm
最后的话就是对 Flink 将来的一些展望。这块能够分为两个部分,一部分是咱们公司接下来会怎么去更深刻的使用 Flink,另外一部分就是 Flink 之后可能会有的的一些新的特性。分布式
有赞的实时平台架构呢有几个主要的组成部分。函数
首先,对于实时数据来讲,一个消息中间件确定是必不可少的。在有赞呢,除了业界经常使用的 Kafka 之外,还有 NSQ。与 Kafka 有别的是,NSQ 是使用 Go 开发的,因此公司封了一层 Java 的客户端是经过 push 和 ack 的模式去保证消息至少投递一次,因此 Connector 也会有比较大的差距,尤为是实现容错的部分。在实现的过程当中呢,参考了 Flink 官方提供的 Rabbit MQ 的链接器,结合 NSQ client 的特性作了一些改造。性能
接下来就是计算引擎了,最古老的就是 Storm 了,如今依然还有一些任务在 Storm 上面跑,至于新的任务基本已经不会基于它来开发了,由于除了开发成本高之外,语义的支持,SQL 的支持包括状态管理的支持都作得不太好,吞吐量还比较低,将 Storm 的任务迁移到 Flink 上也是咱们接下来的任务之一。还有呢就是 Spark Streaming 了,相对来讲 Spark 有一个比较好的生态,可是 Spark Streaming 是微批处理的,这给它带来了不少限制,除了延迟高之外还会比较依赖外部存储来保存中间状态。 Flink 在有赞是比较新的引擎,为何在有了 Spark 和 Storm 的状况下咱们还要引入 Flink 呢,下一个部分我会提到。大数据
存储引擎,除了传统的 MySQL 之外,咱们还使用 HBase ,ES 和 ZanKV。ZanKV 是咱们公司开发的一个兼容 Redis 协议的分布式 KV 数据库,因此姑且就把它当成 Redis 来理解好了。优化
实时 OLAP 引擎的话基于 Druid,在多维的统计上面有很是好的应用。
最后是咱们的实时平台。实时平台提供了集群管理,项目管理,任务管理和报警监控的功能。。
关于实时平台的架构就简单介绍到这里,接下来是 Flink 在有赞的探索阶段。在这个部分,我主要会对比的 Spark Structured Streaming。
至于为何和 Spark Structured Streaming(SSS) 进行对比呢?由于这是实时SQL化这个大背景下比较有表明性的两个引擎。
首先是性能上,从几个角度来比较一下。首先是延迟,毫无疑问,Flink 做为一个流式引擎是优于 SSS 的微批引擎的。虽说 Spark 也引入了一个连续的计算引擎,可是无论从语义的保证上,仍是从成熟度上,都是不如 Flink 的。据我所知,他们是经过将 rdd 长期分配到一个结点上来实现的。
其次比较直观的指标就是吞吐了,这一点在某些场景下 Flink 略逊于 Spark 。可是当涉及到中间状态比较大的任务呢,Flink 基于 RocksDB 的状态管理就显示出了它的优点。
Flink 在中间状态的管理上可使用纯内存,也可使用 RocksDB 。至于 RocksDB ,简单点理解的话就是一个带缓存的嵌入式数据库。借助持久化到磁盘的能力,Flink 相比 SSS 来讲能够保存的状态量大得多,而且不容易OOM。而且在作 checkpoint 中选用了增量模式,应该是只须要备份与上一次 checkpoint 时不一样的 sst 文件。使用过程当中,发现 RocksDB 做为状态管理性能也是能够知足咱们需求的。
聊完性能,接下来就说一说 SQL 化,这也是如今的一个大方向吧。我在开始尝试 SSS 的时候,尝试了一个 SQL 语句中有多个聚合操做,可是却抛了异常。 后面仔细看了文档,发现确实这在 SSS 中是不支持的。第二个是 distinct 也是不支持的。这两点 Flink 是远优于 SSS 的。因此从实时 SQL 的角度,Flink 又为本身赢得了一票。除此以外,Flink 有更灵活的窗口。还有输出的话,一样参考的是 DataFlow 模型,Flink 实现支持删除并更新的操做,SSS 仅支持更新的操做。(这边 SSS 是基于 Spark 的 2.3版本)
API 的灵活性。在 SSS 中,诚然 table 带来了比较大的方便,可是对于有一些操做依然会想经过 DStream 或者 rdd 的形式来操做,可是 SSS 并无提供这样的转换,只能编写一些 UDF。可是在 Flink 中,Table 和 DataStream 能够灵活地互相转换,以应对更复杂的场景。
在真正开始使用 Flink 以前呢,第一个要考虑的就是部署的问题。由于现有的技术栈,因此选择了部署在 Yarn 上,而且使用的是 Single Job 的模式,虽然会有更多的 ApplicationMaster,但无疑是增长了隔离性的。
在开始部署的时候我遇到了一个比较奇怪的问题。先讲一下背景吧,由于还处于调研阶段,因此使用的是 Yarn 的默认队列,优先级比较低,在资源紧张的时候也容易被抢占。
有一个上午,我起了一个任务,申请了5个 Container 来运行 TaskExecutor ,一个比较简单地带状态的流式任务,想多跑一段时间看看稳定不稳定。这个 Flink 任务最后占了100多个 container,还在不停增长,可是只有五个 Container 在工做,其余的 container 都注册了 slot,而且 slot 都处于闲置的状态。如下两张图分别表明正常状态下的任务,和出问题的任务。
出错后
在涉及到这个问题细节以前,我先介绍一下 Flink 是如何和 Yarn 整合到一块的。根据下图,咱们从下往上一个一个介绍这些组件是作什么的。
TaskExecutor 是实际任务的执行者,它可能有多个槽位,每一个槽位执行一个具体的子任务。每一个 TaskExecutor 会将本身的槽位注册到 SlotManager 上,并汇报本身的状态,是忙碌状态,仍是处于一个闲置的状态。
SlotManager 既是 Slot 的管理者,也负责给正在运行的任务提供符合需求的槽位。还记录了当前积压的槽位申请。当槽位不够的时候向Flink的ResourceManager申请容器。
Pending slots 积压的 Slot 申请及计数器
Flink 的 ResourceManager 则负责了与 Yarn 的 ResourceManager 进行交互,进行一系列例如申请容器,启动容器,处理容器的退出等等操做。由于采用的是异步申请的方式,因此还须要记录当前积压的容器申请,防止接收过多容器。
Pending container request 积压容器的计数器
AMRMClient 是异步申请的执行者,CallbackHandler 则在接收到容器和容器退出的时候通知 Flink 的 ResourceManager。
Yarn 的 ResourceManager 则像是一个资源的分发器,负责接收容器请求,并为 Client 准备好容器。
这边一会儿引入的概念有点多,下面我用一个简单地例子来描述一下这些组件在运行中起到的角色。
首先,咱们的配置是3个 TaskManager,每一个 TaskManager 有两个 Slot,也就是总共须要6个槽位。当前已经拥有了4个槽位,任务的调度器向 Slot 申请还须要两个槽位来运行子任务。
这时 SlotManager 发现全部的槽位都已经被占用了,因此它将这个 slot 的 request 放入了 pending slots 当中。因此能够看到 pending slots 的那个计数器从刚才的0跳转到了如今的2. 以后 SlotManager 就向 Flink 的 ResourceManager 申请一个新的 TaskExecutor,正好就能够知足这两个槽位的需求。因而 Flink 的 ResourceManager 将 pending container request 加1,并经过 AMRM Client 去向 Yarn 申请资源。
当 Yarn 将相应的 Container 准备好之后,经过 CallbackHandler 去通知 Flink 的 ResourceManager。Flink 就会根据在每个收到的 container 中启动一个 TaskExecutor ,而且将 pending container request 减1,当 pending container request 变为0以后,即便收到新的 container 也会立刻退回。
当 TaskExecutor 启动以后,会向 SlotManager 注册本身的两个 Slot 可用,SlotManager 便会将两个积压的 SlotRequest 完成,通知调度器这两个子任务能够到这个新的 TaskExecutor 上执行,而且 pending requests 也被置为0. 到这儿一切都符合预期。
那这个超发的问题又是如何出现的呢?首先咱们看一看这就是刚刚那个正常运行的任务。它占用了6个 Slot。
若是在这个时候,出现了一些缘由致使了 TaskExecutor 非正常退出,好比说 Yarn 将资源给抢占了。这时 Yarn 就会通知 Flink 的 ResourceManager 这三个 Container 已经异常退出。因此 Flink 的 ResourceManager 会当即申请三个新的 container。在这儿会讨论的是一个 worst case,由于这个问题其实也不是稳定复现的。
CallbackHandler 两次接收到回调发现 Container 是异常退出,因此当即申请新的 Container,pending container requests 也被置为了3.
若是在这时,任务重启,调度器会向 SlotManager 申请6个 Slot,SlotManager 中也没有可用 Slot,就会向 Flink 的 ResourceManager 申请3个 Container,这时 pending container requests 变为了6.
最后呢结果就如图所示,起了6个 TaskExecutor,总共12个 Slot,可是只有6个是被正常使用的,还有6个一直处于闲置的状态。
在修复这个问题的过程当中,我有两次尝试。第一次尝试,在 Container 异常退出之后,我不去当即申请新的 container。可是问题在于,若是 Container 在启动 TaskExecutor 的过程当中出错,那么失去了这种补偿的机制,有些 Slot Request 会被一直积压,由于 SlotManager 已经为它们申请了 Container。
第二次尝试是在 Flink 的 ResourceManager 申请新的 container 以前先去检查 pending slots,若是当前的积压 slots 已经能够被积压的 container 给知足,那就没有必要申请新的 container 了。
咱们使用过程当中踩到的第二个坑,实际上是跟延迟监控相关的。例子是一个很简单的任务,两个 source,两个除了 source 以外的 operator,并行度都是2. 每一个 source 和 operator 它都有两个子任务。
任务的逻辑是很简单,可是呢当咱们打开延时监控。即便是这么简单的一个任务,它会记录每个 source 的子任务到每个算子的子任务的延迟数据。这个延迟数据里还包含了平均延迟,最大延迟,百分之99的延迟等等等等。那咱们能够得出一个公式,延迟数据的数量是 source 的子任务数量乘以的 source 的数量乘以算子的并行度乘以算子的数量。N = n(subtasks per source) n(sources) n(subtasks per operator) * n(operator)
这边我作一个比较简单地假设,那就是 source 的子任务数量和算则的子任务数量都是 p - 并行度。从下面这个公式咱们能够看出,监控的数量随着并行度的上升呈平方增加。N = p^2 n(sources) n(operator)
若是咱们把上个任务提高到10个并行度,那么就会收到400份的延迟数据。这可能看起来尚未太大的问题,这貌似并不影响组件的正常运行。
可是,在 Flink 的 dev mailing list 当中,有一个用户反馈在开启了延迟监控以后,JobMaster 很快就会挂掉。他收到了24000+的监控数据,而且包含这些数据的 ConcurrentHashMap 在内存中占用了1.6 G 的内存。常规状况 Flink 的 JobMaster 时会给到多少内存,我通常会配1-2 g,最后会致使长期 FullGC 和 OOM 的状况。
那怎么去解决这个问题呢?当延迟监控已经开始影响到系统的正常工做的时候,最简单的办法就是把它给关掉。但是把延时监控关掉,一方面咱们没法得知当前任务的延时,另外一方面,又没有办法去针对延时作一些报警的功能。
因此另外一个解决方案就以下。首先是 Flink-10243,它提供了更多的延迟监控粒度的选项,从源头上减小数量。好比说咱们使用了 Single 模式去采集这些数据,那它只会记录每一个 operator 的子任务的延迟,忽略是从哪一个 source 或是 source 的子任务中来。这样就能够得出这样一个公式,也能将以前咱们提到的十个并行度的任务产生的400个延时监控下降到了40个。这个功能发布在了1.7.0中,而且 backport 回了1.5.5和1.6.2.
此外,Flink-10246 提出了改进 MetricQueryService。它包含了几个子任务,前三个子任务为监控服务创建了一个专有的低优先级的 ActorSystem,在这里能够简单的理解为一个独立的线程池提供低优先级的线程去处理相关任务。它的目的也是为了防止监控任务影响到主要的组件。这个功能发布在了1.7.0中。
还有一个就是 Flink-10252,它还依旧处于 review 和改进当中,目的是为了控制监控消息的大小。
接下来会谈一下 Flink 在有赞的一些具体应用。
首先是 Flink 结合 Spring。为何要将这二者作结合呢,首先在有赞有不少服务都只暴露了 Dubbo 的接口,而用户每每都是经过 Spring 去获取这个服务的 client,在实时计算的一些应用中也是如此。
另外,有很多数据应用的开发也是 Java 工程师,他们但愿能在 Flink 中使用 Spring 以及生态中的一些组件去简化他们的开发。用户的需求确定得获得知足。接下来我会讲一些错误的典型,以及最后是怎么去使用的。
第一个错误的典型就是在 Flink 的用户代码中启动一个 Spring 环境,而后在算子中取调用相关的 bean。可是事实上,最后这个 Spring Context 是启动在 client 端的,也就是提交任务的这一端,在图中有一个红色的方框中间写着 Spring Context 表示了它启动的位置。但是用户在实际调用时确实在 TaskManager 的 TaskSlot 中,它们都处在不一样的 jvm,这明显是不合理的。因此呢咱们又遇到了第二个错误。
第二个错误比第一个错误看起来要好多了,咱们在算子中使用了 RichFunction,而且在 open 方法中经过配置文件获取了一个 Spring Context。可是先不说一个 TaskManager 中启动几个 Spring Context 是否是浪费,一个 Jvm 中启动两个 Spring Context 就会出问题。可能有用户就以为,那还不简单,把 TaskSlot 设为1不就好了。但是还有 OperatorChain 这个机制将几个窄依赖的算子绑定到一块运行在一个 TaskSlot 中。那咱们关闭 OperatorChain 不就好了?仍是不行,Flink可能会作基于 CoLocationGroup 的优化,将多个 subtask 放到一个 TaskSlot 中轮番执行。
但其实最后的解决方案仍是比较容易的,无非是使用单例模式来封装 SpringContext,确保每一个jvm中只有一个,在算子函数的 open 方法中经过这个单例来获取相应的 Bean。
但是在调用 Dubbo 服务的时候,一次响应每每最少也要在10 ms 以上。一个 TaskSlot 最大的吞吐也就在一千,能够说对性能是大大的浪费。那么解决这个问题的话能够经过异步和缓存,对于屡次返回同一个值的调用可使用缓存,提高吞吐咱们可使用异步。
但是若是想同时使用异步和缓存呢?刚开始我以为这是一个挺容易实现的功能,但在实际写 RichAsyncFunction 的时候我发现并无办法使用 Flink 托管的 KeyedState。因此最初想到的方法就是作一个相似 LRU 的 Cache 去缓存数据。可是这彻底不能借助到 Flink 的状态管理的优点。因此我研究了一下实现。
为何不支持呢?
当一条记录进入算子的时候,Flink 会先将 key 提取出来并将 KeyedState 指向与这个 key 关联的存储空间,图上就指向了 key4 相关的存储空间。可是若是此时 key1 关联的异步操做完成了,但愿把内容缓存起来,会将内容写入到 key4 绑定的存储空间。当下一次 key1 相关的记录进入算子时,回去 key1 关联的存储空间查找,但是根本找不到数据,只好再次请求。
因此解决的方法是定制一个算子,每条记录进入系统,都让它指向同一个公用 key 的存储空间。在这个空间使用 MapState 来作缓存。最后算子运行的 function 继承 AbstractRichFunction 在 open 方法中来获取 KeyedState,实现 AsyncFunction 接口来作异步操做。
最先咱们使用 SDK 的方式来简化 SQL 实时任务的开发,可是这对用户来讲也不算很是友好,因此如今讲 SQL 实时任务界面化,用 Flink 做为底层引擎去执行这些任务。
在作 SQL 实时任务时,首先是外部系统的抽象,将数据源和数据池抽象为流资源,用户将它们数据的 Schema 信息和元信息注册到平台中,平台根据用户所在的项目组管理读写的权限。在这里消息源的格式若是能作到统一能下降不少复杂度。好比在有赞,想要接入的用户必须保证是 Json 格式的消息,经过一条样例消息能够直接生成 Schema 信息。
接下来是根据用户选择的数据源和数据池,获取相应的 Schema 信息和元信息,在 Flink 任务中注册相应的外部系统 Table 链接器,再执行相应的 SQL 语句。
在 SQL 语义不支持的功能上尽可能使用 UDF 的方式来拓展。
有数据源和数据池之间的元信息,还能够获取实时任务之间可能存在的依赖关系,而且能作到整个链路的监控
Flink 的批处理和 ML 模块的尝试,会跟 Spark 进行对比,分析优劣势。目前还处于调研阶段,目前比较关注的是 Flink 和 Hive的结合,对应 FLINK-10566 这个 issue。
从 Flink 的发展来说呢,我比较关注并参与接下来对于调度和资源管理的优化。如今 Flink 的调度和任务执行图是耦合在一块的,使用比较简单地调度机制。经过将调度器隔离出来,作成可插拔式的,能够应用更多的调度机制。此外,基于新的调度器,还能够去作更灵活的资源补充和减小机制,实现 Auto Scaling。这可能在接下来的版本中会是一个重要的特性。对应 FLINK-10404 和 FLINK-10429 这两个 issue。
最后打个小广告,有赞大数据团队基础设施团队,主要负责有赞的数据平台(DP), 实时计算(Storm, Spark Streaming, Flink),离线计算(HDFS,YARN,HIVE, SPARK SQL),在线存储(HBase),实时 OLAP(Druid) 等数个技术产品,欢迎感兴趣的小伙伴联系 yangshimin@youzan.com