这几年大数据的飞速发展,出现了不少热门的开源社区,其中著名的有 Hadoop、Storm,以及后来的 Spark,他们都有着各自专一的应用场景。Spark 掀开了内存计算的先河,也之内存为赌注,赢得了内存计算的飞速发展。Spark 的火热或多或少的掩盖了其余分布式计算的系统身影。就像 Flink,也就在这个时候默默的发展着。html
在国外一些社区,有不少人将大数据的计算引擎分红了 4 代,固然,也有不少人不会认同。咱们先姑且这么认为和讨论。web
首先第一代的计算引擎,无疑就是 Hadoop 承载的 MapReduce。这里你们应该都不会对 MapReduce 陌生,它将计算分为两个阶段,分别为 Map 和 Reduce。对于上层应用来讲,就不得不千方百计去拆分算法,甚至于不得不在上层应用实现多个 Job 的串联,以完成一个完整的算法,例如迭代计算。算法
因为这样的弊端,催生了支持 DAG 框架的产生。所以,支持 DAG 的框架被划分为第二代计算引擎。如 Tez 以及更上层的 Oozie。这里咱们不去细究各类 DAG 实现之间的区别,不过对于当时的 Tez 和 Oozie 来讲,大多仍是批处理的任务。apache
接下来就是以 Spark 为表明的第三代的计算引擎。第三代计算引擎的特色主要是 Job 内部的 DAG 支持(不跨越 Job),以及强调的实时计算。在这里,不少人也会认为第三代计算引擎也可以很好的运行批处理的 Job。浏览器
随着第三代计算引擎的出现,促进了上层应用快速发展,例如各类迭代计算的性能以及对流计算和 SQL 等的支持。Flink 的诞生就被归在了第四代。这应该主要表如今 Flink 对流计算的支持,以及更一步的实时性上面。固然 Flink 也能够支持 Batch 的任务,以及 DAG 的运算。session
或许会有人不一样意以上的分类,我以为其实这并不重要的,重要的是体会各个框架的差别,以及更适合的场景。并进行理解,没有哪个框架能够完美的支持全部的场景,也就不可能有任何一个框架能彻底取代另外一个,就像 Spark 没有彻底取代 Hadoop,固然 Flink 也不可能取代 Spark。本文将致力描述 Flink 的原理以及应用。架构
不少人可能都是在 2015 年才听到 Flink 这个词,其实早在 2008 年,Flink 的前身已是柏林理工大学一个研究性项目, 在 2014 被 Apache 孵化器所接受,而后迅速地成为了 ASF(Apache Software Foundation)的顶级项目之一。Flink 的最新版本目前已经更新到了 0.10.0 了,在不少人感慨 Spark 的快速发展的同时,或许咱们也该为 Flink 的发展速度点个赞。app
Flink 是一个针对流数据和批数据的分布式处理引擎。它主要是由 Java 代码实现。目前主要仍是依靠开源社区的贡献而发展。对 Flink 而言,其所要处理的主要场景就是流数据,批数据只是流数据的一个极限特例而已。再换句话说,Flink 会把全部任务当成流来处理,这也是其最大的特色。Flink 能够支持本地的快速迭代,以及一些环形的迭代任务。而且 Flink 能够定制化内存管理。在这点,若是要对比 Flink 和 Spark 的话,Flink 并无将内存彻底交给应用层。这也是为何 Spark 相对于 Flink,更容易出现 OOM 的缘由(out of memory)。就框架自己与应用场景来讲,Flink 更类似与 Storm。若是以前了解过 Storm 或者 Flume 的读者,可能会更容易理解 Flink 的架构和不少概念。下面让咱们先来看下 Flink 的架构图。框架
如图 1 所示,咱们能够了解到 Flink 几个最基础的概念,Client、JobManager 和 TaskManager。Client 用来提交任务给 JobManager,JobManager 分发任务给 TaskManager 去执行,而后 TaskManager 会心跳的汇报任务状态。看到这里,有的人应该已经有种回到 Hadoop 一代的错觉。确实,从架构图去看,JobManager 很像当年的 JobTracker,TaskManager 也很像当年的 TaskTracker。然而有一个最重要的区别就是 TaskManager 之间是是流(Stream)。其次,Hadoop 一代中,只有 Map 和 Reduce 之间的 Shuffle,而对 Flink 而言,多是不少级,而且在 TaskManager 内部和 TaskManager 之间都会有数据传递,而不像 Hadoop,是固定的 Map 到 Reduce。curl
在 Flink 集群中,计算资源被定义为 Task Slot。每一个 TaskManager 会拥有一个或多个 Slots。JobManager 会以 Slot 为单位调度 Task。可是这里的 Task 跟咱们在 Hadoop 中的理解是有区别的。对 Flink 的 JobManager 来讲,其调度的是一个 Pipeline 的 Task,而不是一个点。举个例子,在 Hadoop 中 Map 和 Reduce 是两个独立调度的 Task,而且都会去占用计算资源。对 Flink 来讲 MapReduce 是一个 Pipeline 的 Task,只占用一个计算资源。类同的,若是有一个 MRR 的 Pipeline Task,在 Flink 中其也是一个被总体调度的 Pipeline Task。在 TaskManager 中,根据其所拥有的 Slot 个数,同时会拥有多个 Pipeline。
在 Flink StandAlone 的部署模式中,这个还比较容易理解。由于 Flink 自身也须要简单的管理计算资源(Slot)。当 Flink 部署在 Yarn 上面以后,Flink 并无弱化资源管理。也就是说这时候的 Flink 在作一些 Yarn 该作的事情。从设计角度来说,我认为这是不太合理的。若是 Yarn 的 Container 没法彻底隔离 CPU 资源,这时候对 Flink 的 TaskManager 配置多个 Slot,应该会出现资源不公平利用的现象。Flink 若是想在数据中心更好的与其余计算框架共享计算资源,应该尽可能不要干预计算资源的分配和定义。
须要深度学习 Flink 调度读者,能够在 Flink 的源码目录中找到 flink-runtime 这个文件夹,JobManager 的 code 基本都在这里。
一个计算框架要有长远的发展,必须打造一个完整的 Stack。否则就跟纸上谈兵同样,没有任何意义。只有上层有了具体的应用,并能很好的发挥计算框架自己的优点,那么这个计算框架才能吸引更多的资源,才会更快的进步。因此 Flink 也在努力构建本身的 Stack。
Flink 首先支持了 Scala 和 Java 的 API,Python 也正在测试中。Flink 经过 Gelly 支持了图操做,还有机器学习的 FlinkML。Table 是一种接口化的 SQL 支持,也就是 API 支持,而不是文本化的 SQL 解析和执行。对于完整的 Stack 咱们能够参考下图。
Flink 为了更普遍的支持大数据的生态圈,其下也实现了不少 Connector 的子项目。最熟悉的,固然就是与 Hadoop HDFS 集成。其次,Flink 也宣布支持了 Tachyon、S3 以及 MapRFS。不过对于 Tachyon 以及 S3 的支持,都是经过 Hadoop HDFS 这层包装实现的,也就是说要使用 Tachyon 和 S3,就必须有 Hadoop,并且要更改 Hadoop 的配置(core-site.xml)。若是浏览 Flink 的代码目录,咱们就会看到更多 Connector 项目,例如 Flume 和 Kafka。
Flink 有三种部署模式,分别是 Local、Standalone Cluster 和 Yarn Cluster。对于 Local 模式来讲,JobManager 和 TaskManager 会公用一个 JVM 来完成 Workload。若是要验证一个简单的应用,Local 模式是最方便的。实际应用中大多使用 Standalone 或者 Yarn Cluster。下面我主要介绍下这两种模式。
在搭建 Standalone 模式的 Flink 集群以前,咱们须要先下载 Flink 安装包。这里咱们须要下载 Flink 针对 Hadoop 1.x 的包。下载并解压后,进到 Flink 的根目录,而后查看 conf 文件夹,以下图。
咱们须要指定 Master 和 Worker。Master 机器会启动 JobManager,Worker 则会启动 TaskManager。所以,咱们须要修改 conf 目录中的 master 和 slaves。在配置 master 文件时,须要指定 JobManager 的 UI 监听端口。通常状况下,JobManager 只需配置一个,Worker 则须配置一个或多个(以行为单位)。示例以下:
1
2
3
4
5
|
micledeMacBook-Pro:conf micle$ cat masters
localhost:8081
micledeMacBook-Pro:conf micle$ cat slaves
localhost
|
在 conf 目录中找到文件 flink-conf.yaml。在这个文件中定义了 Flink 各个模块的基本属性,如 RPC 的端口,JobManager 和 TaskManager 堆的大小等。在不考虑 HA 的状况下,通常只须要修改属性 taskmanager.numberOfTaskSlots,也就是每一个 Task Manager 所拥有的 Slot 个数。这个属性,通常设置成机器 CPU 的 core 数,用来平衡机器之间的运算性能。其默认值为 1。配置完成后,使用下图中的命令启动 JobManager 和 TaskManager(启动以前,须要确认 Java 的环境是否已经就绪)。
启动以后咱们就能够登录 Flink 的 GUI 页面。在页面中咱们能够看到 Flink 集群的基本属性,在 JobManager 和 TaskManager 的页面中,能够看到这两个模块的属性。目前 Flink 的 GUI,只提供了简单的查看功能,没法动态修改配置属性。通常在企业级应用中,这是很难被接受的。所以,一个企业真正要应用 Flink 的话,估计也不得不增强 WEB 的功能。
在一个企业中,为了最大化的利用集群资源,通常都会在一个集群中同时运行多种类型的 Workload。所以 Flink 也支持在 Yarn 上面运行。首先,让咱们经过下图了解下 Yarn 和 Flink 的关系。
在图中能够看出,Flink 与 Yarn 的关系与 MapReduce 和 Yarn 的关系是同样的。Flink 经过 Yarn 的接口实现了本身的 App Master。当在 Yarn 中部署了 Flink,Yarn 就会用本身的 Container 来启动 Flink 的 JobManager(也就是 App Master)和 TaskManager。
了解了 Flink 与 Yarn 的关系,咱们就简单看下部署的步骤。在这以前须要先部署好 Yarn 的集群,这里我就不作介绍了。咱们能够经过如下的命令查看 Yarn 中现有的 Application,而且来检查 Yarn 的状态。
1
|
yarn application –list
|
若是命令正确返回了,就说明 Yarn 的 RM 目前已经在启动状态。针对不一样的 Yarn 版本,Flink 有不一样的安装包。咱们能够在 Apache Flink 的下载页中找到对应的安装包。个人 Yarn 版本为 2.7.1。再介绍具体的步骤以前,咱们须要先了解 Flink 有两种在 Yarn 上面的运行模式。一种是让 Yarn 直接启动 JobManager 和 TaskManager,另外一种是在运行 Flink Workload 的时候启动 Flink 的模块。前者至关于让 Flink 的模块处于 Standby 的状态。这里,我也主要介绍下前者。
在下载和解压 Flink 的安装包以后,须要在环境中增长环境变量 HADOOP_CONF_DIR 或者 YARN_CONF_DIR,其指向 Yarn 的配置目录。如运行下面的命令:
1
|
export HADOOP_CONF_DIR=/etc/hadoop/conf
|
这是由于 Flink 实现了 Yarn 的 Client,所以须要 Yarn 的一些配置和 Jar 包。在配置好环境变量后,只需简单的运行以下的脚本,Yarn 就会启动 Flink 的 JobManager 和 TaskManager。
1
|
yarn-session.sh –d –s 2 –tm 800 –n 2
|
上面的命令的意思是,向 Yarn 申请 2 个 Container 启动 TaskManager(-n 2),每一个 TaskManager 拥有两个 Task Slot(-s 2),而且向每一个 TaskManager 的 Container 申请 800M 的内存。在上面的命令成功后,咱们就能够在 Yarn Application 页面看到 Flink 的纪录。以下图。
若是有些读者在虚拟机中测试,可能会遇到错误。这里须要注意内存的大小,Flink 向 Yarn 会申请多个 Container,可是 Yarn 的配置可能限制了 Container 所能申请的内存大小,甚至 Yarn 自己所管理的内存就很小。这样极可能没法正常启动 TaskManager,尤为当指定多个 TaskManager 的时候。所以,在启动 Flink 以后,须要去 Flink 的页面中检查下 Flink 的状态。这里能够从 RM 的页面中,直接跳转(点击 Tracking UI)。这时候 Flink 的页面如图 8。
对于 Flink 安装时的 Trouble-shooting,可能更多时候须要查看 Yarn 相关的 log 来分析。这里就很少作介绍,读者能够到 Yarn 相关的描述中查找。
对于一个企业级的应用,稳定性是首要要考虑的问题,而后才是性能,所以 HA 机制是必不可少的。另外,对于已经了解 Flink 架构的读者,可能会更担忧 Flink 架构背后的单点问题。和 Hadoop 一代同样,从架构中咱们能够很明显的发现 JobManager 有明显的单点问题(SPOF,single point of failure)。 JobManager 肩负着任务调度以及资源分配,一旦 JobManager 出现意外,其后果可想而知。Flink 对 JobManager HA 的处理方式,原理上基本和 Hadoop 同样(一代和二代)。
首先,咱们须要知道 Flink 有两种部署的模式,分别是 Standalone 以及 Yarn Cluster 模式。对于 Standalone 来讲,Flink 必须依赖于 Zookeeper 来实现 JobManager 的 HA(Zookeeper 已经成为了大部分开源框架 HA 必不可少的模块)。在 Zookeeper 的帮助下,一个 Standalone 的 Flink 集群会同时有多个活着的 JobManager,其中只有一个处于工做状态,其余处于 Standby 状态。当工做中的 JobManager 失去链接后(如宕机或 Crash),Zookeeper 会从 Standby 中选举新的 JobManager 来接管 Flink 集群。
对于 Yarn Cluaster 模式来讲,Flink 就要依靠 Yarn 自己来对 JobManager 作 HA 了。其实这里彻底是 Yarn 的机制。对于 Yarn Cluster 模式来讲,JobManager 和 TaskManager 都是被 Yarn 启动在 Yarn 的 Container 中。此时的 JobManager,其实应该称之为 Flink Application Master。也就说它的故障恢复,就彻底依靠着 Yarn 中的 ResourceManager(和 MapReduce 的 AppMaster 同样)。因为彻底依赖了 Yarn,所以不一样版本的 Yarn 可能会有细微的差别。这里再也不作深究。
Flink 和其余大多开源的框架同样,提供了不少有用的 Rest API。不过 Flink 的 RestAPI,目前还不是很强大,只能支持一些 Monitor 的功能。Flink Dashboard 自己也是经过其 Rest 来查询各项的结果数据。在 Flink RestAPI 基础上,能够比较容易的将 Flink 的 Monitor 功能和其余第三方工具相集成,这也是其设计的初衷。
在 Flink 的进程中,是由 JobManager 来提供 Rest API 的服务。所以在调用 Rest 以前,要肯定 JobManager 是否处于正常的状态。正常状况下,在发送一个 Rest 请求给 JobManager 以后,Client 就会收到一个 JSON 格式的返回结果。因为目前 Rest 提供的功能还很少,须要加强这块功能的读者能够在子项目 flink-runtime-web 中找到对应的代码。其中最关键一个类 WebRuntimeMonitor,就是用来对全部的 Rest 请求作分流的,若是须要添加一个新类型的请求,就须要在这里增长对应的处理代码。下面我例举几个经常使用 Rest API。
1.查询 Flink 集群的基本信息: /overview。示例命令行格式以及返回结果以下:
1
2
|
$ curl http://localhost:8081/overview{"taskmanagers":1,"slots-total":16,
"slots-available":16,"jobs-running":0,"jobs-finished":0,"jobs-cancelled":0,"jobs-failed":0}
|
2.查询当前 Flink 集群中的 Job 信息:/jobs。示例命令行格式以及返回结果以下:
1
2
|
$ curl http://localhost:8081/jobs{"jobs-running":[],"jobs-finished":
["f91d4dd4fdf99313d849c9c4d29f8977"],"jobs-cancelled":[],"jobs-failed":[]}
|
3.查询一个指定的 Job 信息: /jobs/jobid。这个查询的结果会返回特别多的详细的内容,这是我在浏览器中进行的测试,以下图:
想要了解更多 Rest 请求内容的读者,能够去 Apache Flink 的页面中查找。因为篇幅有限,这里就不一一列举。
WordCount 的例子,就像是计算框架的 helloworld。这里我就以 WordCount 为例,介绍下如何在 Flink 中运行 workload。
在安装好 Flink 的环境中,找到 Flink 的目录。而后找到 bin/flink,它就是用来提交 Flink workload 的工具。对于 WordCount,咱们能够直接使用已有的示例 jar 包。如运行以下的命令:
1
|
./bin/flink run ./examples/WordCount.jar hdfs://user/root/test hdfs://user/root/out
|
上面的命令是在 HDFS 中运行 WordCount,若是没有 HDFS 用本地的文件系统也是能够的,只须要将“hdfs://”换成“file://”。这里咱们须要强调一种部署关系,就是 StandAlone 模式的 Flink,也是能够直接访问 HDFS 等分布式文件系统的。
Flink 是一个比 Spark 起步晚的项目,可是并不表明 Flink 的前途就会暗淡。Flink 和 Spark 有不少相似之处,但也有不少明显的差别。本文并无比较这二者之间的差别,这是将来我想与你们探讨的。例如 Flink 如何更高效的管理内存,如何进一步的避免用户程序的 OOM。在 Flink 的世界里一切都是流,它更专一处理流应用。因为其起步晚,加上社区的活跃度并无 Spark 那么热,因此其在一些细节的场景支持上,并无 Spark 那么完善。例如目前在 SQL 的支持上并无 Spark 那么平滑。在企业级应用中,Spark 已经开始落地,而 Flink 可能还须要一段时间的打磨。在后续文章中,我会详细介绍如何开发 Flink 的程序,以及更多有关 Flink 内部实现的内容。
内容来自:https://www.ibm.com/developerworks/cn/opensource/os-cn-apache-flink/