什么是Apache Flink

大数据计算引擎的发展

     这几年大数据的飞速发展,出现了不少热门的开源社区,其中著名的有 Hadoop、Storm,以及后来的 Spark,他们都有着各自专一的应用场景。Spark 掀开了内存计算的先河,也之内存为赌注,赢得了内存计算的飞速发展。Spark 的火热或多或少的掩盖了其余分布式计算的系统身影。就像 Flink,也就在这个时候默默的发展着。html

在国外一些社区,有不少人将大数据的计算引擎分红了 4 代,固然,也有不少人不会认同。咱们先姑且这么认为和讨论。web

首先第一代的计算引擎,无疑就是 Hadoop 承载的 MapReduce。这里你们应该都不会对 MapReduce 陌生,它将计算分为两个阶段,分别为 Map 和 Reduce。对于上层应用来讲,就不得不千方百计去拆分算法,甚至于不得不在上层应用实现多个 Job 的串联,以完成一个完整的算法,例如迭代计算。算法

因为这样的弊端,催生了支持 DAG 框架的产生。所以,支持 DAG 的框架被划分为第二代计算引擎。如 Tez 以及更上层的 Oozie。这里咱们不去细究各类 DAG 实现之间的区别,不过对于当时的 Tez 和 Oozie 来讲,大多仍是批处理的任务。apache

接下来就是以 Spark 为表明的第三代的计算引擎。第三代计算引擎的特色主要是 Job 内部的 DAG 支持(不跨越 Job),以及强调的实时计算。在这里,不少人也会认为第三代计算引擎也可以很好的运行批处理的 Job。浏览器

随着第三代计算引擎的出现,促进了上层应用快速发展,例如各类迭代计算的性能以及对流计算和 SQL 等的支持。Flink 的诞生就被归在了第四代。这应该主要表如今 Flink 对流计算的支持,以及更一步的实时性上面。固然 Flink 也能够支持 Batch 的任务,以及 DAG 的运算。session

或许会有人不一样意以上的分类,我以为其实这并不重要的,重要的是体会各个框架的差别,以及更适合的场景。并进行理解,没有哪个框架能够完美的支持全部的场景,也就不可能有任何一个框架能彻底取代另外一个,就像 Spark 没有彻底取代 Hadoop,固然 Flink 也不可能取代 Spark。本文将致力描述 Flink 的原理以及应用。架构

 

Flink 简介

不少人可能都是在 2015 年才听到 Flink 这个词,其实早在 2008 年,Flink 的前身已是柏林理工大学一个研究性项目, 在 2014 被 Apache 孵化器所接受,而后迅速地成为了 ASF(Apache Software Foundation)的顶级项目之一。Flink 的最新版本目前已经更新到了 0.10.0 了,在不少人感慨 Spark 的快速发展的同时,或许咱们也该为 Flink 的发展速度点个赞。app

Flink 是一个针对流数据和批数据的分布式处理引擎。它主要是由 Java 代码实现。目前主要仍是依靠开源社区的贡献而发展。对 Flink 而言,其所要处理的主要场景就是流数据,批数据只是流数据的一个极限特例而已。再换句话说,Flink 会把全部任务当成流来处理,这也是其最大的特色。Flink 能够支持本地的快速迭代,以及一些环形的迭代任务。而且 Flink 能够定制化内存管理。在这点,若是要对比 Flink 和 Spark 的话,Flink 并无将内存彻底交给应用层。这也是为何 Spark 相对于 Flink,更容易出现 OOM 的缘由(out of memory)。就框架自己与应用场景来讲,Flink 更类似与 Storm。若是以前了解过 Storm 或者 Flume 的读者,可能会更容易理解 Flink 的架构和不少概念。下面让咱们先来看下 Flink 的架构图。框架

图 1. Flink 架构图

图 1. Flink 架构图

如图 1 所示,咱们能够了解到 Flink 几个最基础的概念,Client、JobManager 和 TaskManager。Client 用来提交任务给 JobManager,JobManager 分发任务给 TaskManager 去执行,而后 TaskManager 会心跳的汇报任务状态。看到这里,有的人应该已经有种回到 Hadoop 一代的错觉。确实,从架构图去看,JobManager 很像当年的 JobTracker,TaskManager 也很像当年的 TaskTracker。然而有一个最重要的区别就是 TaskManager 之间是是流(Stream)。其次,Hadoop 一代中,只有 Map 和 Reduce 之间的 Shuffle,而对 Flink 而言,多是不少级,而且在 TaskManager 内部和 TaskManager 之间都会有数据传递,而不像 Hadoop,是固定的 Map 到 Reduce。curl

Flink 中的调度简述

在 Flink 集群中,计算资源被定义为 Task Slot。每一个 TaskManager 会拥有一个或多个 Slots。JobManager 会以 Slot 为单位调度 Task。可是这里的 Task 跟咱们在 Hadoop 中的理解是有区别的。对 Flink 的 JobManager 来讲,其调度的是一个 Pipeline 的 Task,而不是一个点。举个例子,在 Hadoop 中 Map 和 Reduce 是两个独立调度的 Task,而且都会去占用计算资源。对 Flink 来讲 MapReduce 是一个 Pipeline 的 Task,只占用一个计算资源。类同的,若是有一个 MRR 的 Pipeline Task,在 Flink 中其也是一个被总体调度的 Pipeline Task。在 TaskManager 中,根据其所拥有的 Slot 个数,同时会拥有多个 Pipeline。

在 Flink StandAlone 的部署模式中,这个还比较容易理解。由于 Flink 自身也须要简单的管理计算资源(Slot)。当 Flink 部署在 Yarn 上面以后,Flink 并无弱化资源管理。也就是说这时候的 Flink 在作一些 Yarn 该作的事情。从设计角度来说,我认为这是不太合理的。若是 Yarn 的 Container 没法彻底隔离 CPU 资源,这时候对 Flink 的 TaskManager 配置多个 Slot,应该会出现资源不公平利用的现象。Flink 若是想在数据中心更好的与其余计算框架共享计算资源,应该尽可能不要干预计算资源的分配和定义。

须要深度学习 Flink 调度读者,能够在 Flink 的源码目录中找到 flink-runtime 这个文件夹,JobManager 的 code 基本都在这里。

Flink 的生态圈

一个计算框架要有长远的发展,必须打造一个完整的 Stack。否则就跟纸上谈兵同样,没有任何意义。只有上层有了具体的应用,并能很好的发挥计算框架自己的优点,那么这个计算框架才能吸引更多的资源,才会更快的进步。因此 Flink 也在努力构建本身的 Stack。

Flink 首先支持了 Scala 和 Java 的 API,Python 也正在测试中。Flink 经过 Gelly 支持了图操做,还有机器学习的 FlinkML。Table 是一种接口化的 SQL 支持,也就是 API 支持,而不是文本化的 SQL 解析和执行。对于完整的 Stack 咱们能够参考下图。

图 2. Flink 的 Stack

图 2. Flink 的 Stack

Flink 为了更普遍的支持大数据的生态圈,其下也实现了不少 Connector 的子项目。最熟悉的,固然就是与 Hadoop HDFS 集成。其次,Flink 也宣布支持了 Tachyon、S3 以及 MapRFS。不过对于 Tachyon 以及 S3 的支持,都是经过 Hadoop HDFS 这层包装实现的,也就是说要使用 Tachyon 和 S3,就必须有 Hadoop,并且要更改 Hadoop 的配置(core-site.xml)。若是浏览 Flink 的代码目录,咱们就会看到更多 Connector 项目,例如 Flume 和 Kafka。

Flink 的部署

Flink 有三种部署模式,分别是 Local、Standalone Cluster 和 Yarn Cluster。对于 Local 模式来讲,JobManager 和 TaskManager 会公用一个 JVM 来完成 Workload。若是要验证一个简单的应用,Local 模式是最方便的。实际应用中大多使用 Standalone 或者 Yarn Cluster。下面我主要介绍下这两种模式。

Standalone 模式

在搭建 Standalone 模式的 Flink 集群以前,咱们须要先下载 Flink 安装包。这里咱们须要下载 Flink 针对 Hadoop 1.x 的包。下载并解压后,进到 Flink 的根目录,而后查看 conf 文件夹,以下图。

图 3. Flink 的目录结构

图 3. Flink 的目录结构

咱们须要指定 Master 和 Worker。Master 机器会启动 JobManager,Worker 则会启动 TaskManager。所以,咱们须要修改 conf 目录中的 master 和 slaves。在配置 master 文件时,须要指定 JobManager 的 UI 监听端口。通常状况下,JobManager 只需配置一个,Worker 则须配置一个或多个(以行为单位)。示例以下:

1
2
3
4
5
micledeMacBook-Pro:conf micle$ cat masters
localhost:8081
 
micledeMacBook-Pro:conf micle$ cat slaves
localhost

在 conf 目录中找到文件 flink-conf.yaml。在这个文件中定义了 Flink 各个模块的基本属性,如 RPC 的端口,JobManager 和 TaskManager 堆的大小等。在不考虑 HA 的状况下,通常只须要修改属性 taskmanager.numberOfTaskSlots,也就是每一个 Task Manager 所拥有的 Slot 个数。这个属性,通常设置成机器 CPU 的 core 数,用来平衡机器之间的运算性能。其默认值为 1。配置完成后,使用下图中的命令启动 JobManager 和 TaskManager(启动以前,须要确认 Java 的环境是否已经就绪)。

图 4. 启动 StandAlone 模式的 Flink

图 4. 启动 StandAlone 模式的 Flink

启动以后咱们就能够登录 Flink 的 GUI 页面。在页面中咱们能够看到 Flink 集群的基本属性,在 JobManager 和 TaskManager 的页面中,能够看到这两个模块的属性。目前 Flink 的 GUI,只提供了简单的查看功能,没法动态修改配置属性。通常在企业级应用中,这是很难被接受的。所以,一个企业真正要应用 Flink 的话,估计也不得不增强 WEB 的功能。

图 5. Flink 的 GUI 页面

图 5. Flink 的 GUI 页面

Yarn Cluster 模式

在一个企业中,为了最大化的利用集群资源,通常都会在一个集群中同时运行多种类型的 Workload。所以 Flink 也支持在 Yarn 上面运行。首先,让咱们经过下图了解下 Yarn 和 Flink 的关系。

图 6. Flink 与 Yarn 的关系

图 6. Flink 与 Yarn 的关系

在图中能够看出,Flink 与 Yarn 的关系与 MapReduce 和 Yarn 的关系是同样的。Flink 经过 Yarn 的接口实现了本身的 App Master。当在 Yarn 中部署了 Flink,Yarn 就会用本身的 Container 来启动 Flink 的 JobManager(也就是 App Master)和 TaskManager。

了解了 Flink 与 Yarn 的关系,咱们就简单看下部署的步骤。在这以前须要先部署好 Yarn 的集群,这里我就不作介绍了。咱们能够经过如下的命令查看 Yarn 中现有的 Application,而且来检查 Yarn 的状态。

1
yarn application –list

若是命令正确返回了,就说明 Yarn 的 RM 目前已经在启动状态。针对不一样的 Yarn 版本,Flink 有不一样的安装包。咱们能够在 Apache Flink 的下载页中找到对应的安装包。个人 Yarn 版本为 2.7.1。再介绍具体的步骤以前,咱们须要先了解 Flink 有两种在 Yarn 上面的运行模式。一种是让 Yarn 直接启动 JobManager 和 TaskManager,另外一种是在运行 Flink Workload 的时候启动 Flink 的模块。前者至关于让 Flink 的模块处于 Standby 的状态。这里,我也主要介绍下前者。

在下载和解压 Flink 的安装包以后,须要在环境中增长环境变量 HADOOP_CONF_DIR 或者 YARN_CONF_DIR,其指向 Yarn 的配置目录。如运行下面的命令:

1
export HADOOP_CONF_DIR=/etc/hadoop/conf

这是由于 Flink 实现了 Yarn 的 Client,所以须要 Yarn 的一些配置和 Jar 包。在配置好环境变量后,只需简单的运行以下的脚本,Yarn 就会启动 Flink 的 JobManager 和 TaskManager。

1
yarn-session.sh –d –s 2 –tm 800 –n 2

上面的命令的意思是,向 Yarn 申请 2 个 Container 启动 TaskManager(-n 2),每一个 TaskManager 拥有两个 Task Slot(-s 2),而且向每一个 TaskManager 的 Container 申请 800M 的内存。在上面的命令成功后,咱们就能够在 Yarn Application 页面看到 Flink 的纪录。以下图。

图 7. Flink on Yarn

图 7. Flink on Yarn

若是有些读者在虚拟机中测试,可能会遇到错误。这里须要注意内存的大小,Flink 向 Yarn 会申请多个 Container,可是 Yarn 的配置可能限制了 Container 所能申请的内存大小,甚至 Yarn 自己所管理的内存就很小。这样极可能没法正常启动 TaskManager,尤为当指定多个 TaskManager 的时候。所以,在启动 Flink 以后,须要去 Flink 的页面中检查下 Flink 的状态。这里能够从 RM 的页面中,直接跳转(点击 Tracking UI)。这时候 Flink 的页面如图 8。

图 8. Flink 的页面

图 8. Flink 的页面

对于 Flink 安装时的 Trouble-shooting,可能更多时候须要查看 Yarn 相关的 log 来分析。这里就很少作介绍,读者能够到 Yarn 相关的描述中查找。

Flink 的 HA

对于一个企业级的应用,稳定性是首要要考虑的问题,而后才是性能,所以 HA 机制是必不可少的。另外,对于已经了解 Flink 架构的读者,可能会更担忧 Flink 架构背后的单点问题。和 Hadoop 一代同样,从架构中咱们能够很明显的发现 JobManager 有明显的单点问题(SPOF,single point of failure)。 JobManager 肩负着任务调度以及资源分配,一旦 JobManager 出现意外,其后果可想而知。Flink 对 JobManager HA 的处理方式,原理上基本和 Hadoop 同样(一代和二代)。

首先,咱们须要知道 Flink 有两种部署的模式,分别是 Standalone 以及 Yarn Cluster 模式。对于 Standalone 来讲,Flink 必须依赖于 Zookeeper 来实现 JobManager 的 HA(Zookeeper 已经成为了大部分开源框架 HA 必不可少的模块)。在 Zookeeper 的帮助下,一个 Standalone 的 Flink 集群会同时有多个活着的 JobManager,其中只有一个处于工做状态,其余处于 Standby 状态。当工做中的 JobManager 失去链接后(如宕机或 Crash),Zookeeper 会从 Standby 中选举新的 JobManager 来接管 Flink 集群。

对于 Yarn Cluaster 模式来讲,Flink 就要依靠 Yarn 自己来对 JobManager 作 HA 了。其实这里彻底是 Yarn 的机制。对于 Yarn Cluster 模式来讲,JobManager 和 TaskManager 都是被 Yarn 启动在 Yarn 的 Container 中。此时的 JobManager,其实应该称之为 Flink Application Master。也就说它的故障恢复,就彻底依靠着 Yarn 中的 ResourceManager(和 MapReduce 的 AppMaster 同样)。因为彻底依赖了 Yarn,所以不一样版本的 Yarn 可能会有细微的差别。这里再也不作深究。

Flink 的 Rest API 介绍

Flink 和其余大多开源的框架同样,提供了不少有用的 Rest API。不过 Flink 的 RestAPI,目前还不是很强大,只能支持一些 Monitor 的功能。Flink Dashboard 自己也是经过其 Rest 来查询各项的结果数据。在 Flink RestAPI 基础上,能够比较容易的将 Flink 的 Monitor 功能和其余第三方工具相集成,这也是其设计的初衷。

在 Flink 的进程中,是由 JobManager 来提供 Rest API 的服务。所以在调用 Rest 以前,要肯定 JobManager 是否处于正常的状态。正常状况下,在发送一个 Rest 请求给 JobManager 以后,Client 就会收到一个 JSON 格式的返回结果。因为目前 Rest 提供的功能还很少,须要加强这块功能的读者能够在子项目 flink-runtime-web 中找到对应的代码。其中最关键一个类 WebRuntimeMonitor,就是用来对全部的 Rest 请求作分流的,若是须要添加一个新类型的请求,就须要在这里增长对应的处理代码。下面我例举几个经常使用 Rest API。

1.查询 Flink 集群的基本信息: /overview。示例命令行格式以及返回结果以下:

1
2
$ curl http://localhost:8081/overview{"taskmanagers":1,"slots-total":16,
"slots-available":16,"jobs-running":0,"jobs-finished":0,"jobs-cancelled":0,"jobs-failed":0}

2.查询当前 Flink 集群中的 Job 信息:/jobs。示例命令行格式以及返回结果以下:

1
2
$ curl http://localhost:8081/jobs{"jobs-running":[],"jobs-finished":
["f91d4dd4fdf99313d849c9c4d29f8977"],"jobs-cancelled":[],"jobs-failed":[]}

3.查询一个指定的 Job 信息: /jobs/jobid。这个查询的结果会返回特别多的详细的内容,这是我在浏览器中进行的测试,以下图:

图 9. Rest 查询具体的 Job 信息

图 9. Rest 查询具体的 Job 信息

想要了解更多 Rest 请求内容的读者,能够去 Apache Flink 的页面中查找。因为篇幅有限,这里就不一一列举。

运行 Flink 的 Workload

WordCount 的例子,就像是计算框架的 helloworld。这里我就以 WordCount 为例,介绍下如何在 Flink 中运行 workload。

在安装好 Flink 的环境中,找到 Flink 的目录。而后找到 bin/flink,它就是用来提交 Flink workload 的工具。对于 WordCount,咱们能够直接使用已有的示例 jar 包。如运行以下的命令:

1
./bin/flink run ./examples/WordCount.jar hdfs://user/root/test hdfs://user/root/out

上面的命令是在 HDFS 中运行 WordCount,若是没有 HDFS 用本地的文件系统也是能够的,只须要将“hdfs://”换成“file://”。这里咱们须要强调一种部署关系,就是 StandAlone 模式的 Flink,也是能够直接访问 HDFS 等分布式文件系统的。

结束语

Flink 是一个比 Spark 起步晚的项目,可是并不表明 Flink 的前途就会暗淡。Flink 和 Spark 有不少相似之处,但也有不少明显的差别。本文并无比较这二者之间的差别,这是将来我想与你们探讨的。例如 Flink 如何更高效的管理内存,如何进一步的避免用户程序的 OOM。在 Flink 的世界里一切都是流,它更专一处理流应用。因为其起步晚,加上社区的活跃度并无 Spark 那么热,因此其在一些细节的场景支持上,并无 Spark 那么完善。例如目前在 SQL 的支持上并无 Spark 那么平滑。在企业级应用中,Spark 已经开始落地,而 Flink 可能还须要一段时间的打磨。在后续文章中,我会详细介绍如何开发 Flink 的程序,以及更多有关 Flink 内部实现的内容。

 

内容来自:https://www.ibm.com/developerworks/cn/opensource/os-cn-apache-flink/

相关文章
相关标签/搜索