优先级:算子层面>环境层面>客户端层面>系统层面java
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<String> text = [...] DataStream<Tuple2<String, Integer>> wordCounts = text .flatMap(new LineSplitter()) .keyBy(0) .timeWindow(Time.seconds(5)) .sum(1).setParallelism(5); wordCounts.print(); env.execute("Word Count Example"); 复制代码
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(3); DataStream<String> text = [...] DataStream<Tuple2<String, Integer>> wordCounts = [...] wordCounts.print(); env.execute("Word Count Example"); 复制代码
./bin/flink run -p 10 ../examples/*WordCount-java*.jar 复制代码
或者程序员
try { PackagedProgram program = new PackagedProgram(file, args); InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123"); Configuration config = new Configuration(); Client client = new Client(jobManagerAddress, config, program.getUserCodeClassLoader()); // set the parallelism to 10 here client.run(program, 10, true); } catch (ProgramInvocationException e) { e.printStackTrace(); } 复制代码
# The parallelism used for programs that did not specify and other parallelism. parallelism.default: 1 复制代码
在大数据领域,词频统计(WordCount)程序就像是一个编程语言的HelloWorld程序,它展现了一个大数据引擎的基本规范。麻雀虽小,五脏俱全,从这个样例中,咱们能够一窥Flink设计和运行原理。数据库
如图1所示,程序分为三大部分,第一部分读取数据源(Source),第二部分对数据作转换操做(Transformation),最后将转换结果输出到一个目的地(Sink)。代码中的方法被称为算子(Operator),是Flink提供给程序员的接口,程序员须要经过这些算子对数据进行操做。Source算子读取数据源中的数据,数据源能够是数据流、也能够存储在文件系统中的文件。Transformation算子对数据进行必要的计算处理。Sink算子将处理结果输出,数据通常被输出到数据库、文件系统或下一个数据流程序。apache
咱们能够把算子理解为1 + 2 运算中的加号,加号(+)是这个算子的一个符号表示,它表示对数字1和数字2作加法运算。一样,在Flink或Spark这样的大数据引擎中,算子对数据进行某种操做,程序员能够根据本身的需求调用合适的算子,完成所需计算任务。经常使用的算子有map
、flatMap
、keyBy
、timeWindow
等,它们分别对数据流执行不一样类型的操做。编程
在程序实际运行前,Flink会将用户编写的代码作一个简单处理,生成一个如图2所示的逻辑视图。图 2展现了WordCount程序中,数据从不一样算子间流动的状况。图中,圆圈表明算子,圆圈间的箭头表明数据流,数据流在Flink程序中通过不一样算子的计算,最终生成为目标数据。其中,keyBy
、timeWindow
和sum
共同组成了一个时间窗口上的聚合操做,被归结为一个算子。咱们能够在Flink的Web UI中,点击一个做业,查看这个做业的逻辑视图。api
对于词频统计这个案例,逻辑上来说无非是对数据流中的单词作提取,而后使用一个Key-Value结构对单词作词频计数,最后输出结果便可,这样的逻辑本能够用几行代码完成,改为使用算子形式,反而让新人看着一头雾水,为何必定要用算子的形式来写程序呢?实际上,算子进化成当前这个形态,就像人类从石块计数,到手指计数,到算盘计数,再到计算机计数这样的进化过程同样,尽管更低级的方式能够完成必定的计算任务,可是随着计算规模的增加,古老的计数方式存在着低效的弊端,没法完成更高级别和更大规模的计算需求。试想,若是咱们不使用大数据引擎提供的算子,而是本身实现一套上述的计算逻辑,尽管咱们能够快速完成当前的词频统计的任务,可是当面临一个新计算任务时,咱们须要从新编写程序,完成一整套计算任务。咱们本身编写代码的横向扩展性可能很低,当输入数据暴增时,咱们须要作很大改动,以部署在更多机器上。数据结构
大数据引擎的算子对计算作了一些抽象,对于新人来讲有必定学习成本,而一旦掌握这门技术,人们所能处理的数据规模将成倍增长。大数据引擎的算子出现,正是针对数据分布在多个节点的大数据场景下,须要一种统一的计算描述语言来对数据作计算而进化出的新计算形态。基于Flink的算子,咱们能够定义一个数据流的逻辑视图,以此完成对大数据的计算。剩下那些数据交换、横向扩展、故障恢复等问题全交由大数据引擎来解决。多线程
在绝大多数的大数据处理场景下,一台机器节点没法处理全部数据,数据被切分到多台节点上。在大数据领域,当数据量大到超过单台机器处理能力时,须要将一份数据切分到多个分区(Partition)上,每一个分区分布在一台虚拟机或物理机上。架构
前一小节已经提到,大数据引擎的算子提供了编程接口,咱们可使用算子构建数据流的逻辑视图。考虑到数据分布在多个节点的状况,逻辑视图只是一种抽象,须要将逻辑视图转化为物理执行图,才能在分布式环境下执行。并发
图 3为WordCount程序的物理执行图,这里数据流分布在2个分区上。箭头部分表示数据流分区,圆圈部分表示算子在分区上的算子子任务(Operator Subtask)。从逻辑视图变为物理执行图后,FlatMap算子在每一个分区都有一个算子子任务,以处理该分区上的数据:FlatMap[1/2]算子子任务处理第一个数据流分区上的数据,以此类推。
算子子任务又被称为算子实例,一个算子在并行执行时,会有多个算子实例。即便输入数据增多,咱们也能够经过部署更多的算子实例来进行横向扩展。从图 3中能够看到,除去Sink外的算子都被分红了2个算子实例,他们的并行度(Parallelism)为2,Sink算子的并行度为1。并行度是能够被设置的,当设置某个算子的并行度为2时,也就意味着有这个算子有2个算子子任务(或者说2个算子实例)并行执行。实际应用中通常根据输入数据量的大小,计算资源的多少等多方面的因素来设置并行度。
注意,在本例中,为了演示,咱们把全部算子的并行度设置为了2:env.setParallelism(2);
,把最后输出的并行度设置成了1:wordCount.print().setParallelism(1);
。若是不单独设置print
的并行度的话,它的并行度也是2。
算子子任务是Flink物理执行的基本单元,算子子任务之间是相互独立的,某个算子子任务有本身的线程,不一样算子子任务可能分布在不一样的节点上。后文在Flink的资源分配部分咱们还会重点介绍算子子任务。
了解了Flink的分布式架构和核心组件,这里咱们从更细粒度上来介绍从逻辑视图转化为物理执行图过程,该过程能够分红四层:StreamGraph
-> JobGraph
-> ExecutionGraph
-> 物理执行图。
StreamGraph
:是根据用户编写的代码生成的最初的图,用来表示一个Flink做业的拓扑结构。在StreamGraph
中,节点StreamNode
就是算子。JobGraph
:JobGraph
是提交给 JobManager 的数据结构。StreamGraph
通过优化后生成了JobGraph
,主要的优化为,将多个符合条件的节点连接在一块儿做为一个JobVertex
节点,这样能够减小数据交换所须要的传输开销。这个连接的过程叫作算子链(Operator Chain),会在下一小节继续介绍。JobVertex
通过算子链后,会包含一到多个算子,它输出是IntermediateDataSet
,是通过算子处理产生的数据集。ExecutionGraph
:JobManager将 JobGraph
转化为ExecutionGraph
。ExecutionGraph
是JobGraph
的并行化版本:假如某个JobVertex
的并行度是2,那么它将被划分为2个ExecutionVertex
,ExecutionVertex
表示一个算子子任务,它监控着单个子任务的执行状况。每一个ExecutionVertex
会输出一个IntermediateResultPartition
,这是单个子任务的输出,再通过ExecutionEdge
输出到下游节点。ExecutionJobVertex
是这些并行子任务的合集,它监控着整个算子的运行状况。ExecutionGraph
是调度层很是核心的数据结构。ExecutionGraph
对做业进行调度后,在各个TaskManager上部署具体的任务,物理执行图并非一个具体的数据结构。能够看到,Flink在数据流图上可谓煞费苦心,仅各种图就有四种之多。对于新人来讲,能够不用太关心这些很是细节的底层实现,只须要了解如下几个核心概念:
在构造物理执行图的过程当中,Flink会将一些算子子任务连接在一块儿,组成算子链。连接后以任务(Task)的形式被TaskManager调度执行。使用算子链是一个很是有效的优化,它能够有效下降算子子任务之间的传输开销。连接以后造成的Task是TaskManager中的一个线程。
例如,数据从Source前向传播到FlatMap,这中间没有发生跨分区的数据交换,所以,咱们彻底能够将Source、FlatMap这两个子任务组合在一块儿,造成一个Task。数据通过keyBy
发生了数据交换,数据会跨越分区,所以没法将keyBy
以及其后面的窗口聚合连接到一块儿。因为WindowAggregation的并行度是2,Sink的并行度为1,数据再次发生了交换,咱们不能把WindowAggregation和Sink两部分连接到一块儿。1.2节中提到,Sink的并行度是人为设置为1,若是咱们把Sink的并行度也设置为2,那么是可让这两个算子连接到一块儿的。
默认状况下,Flink会尽可能将更多的子任务连接在一块儿,这样能减小一些没必要要的数据传输开销。但一个子任务有超过一个输入或发生数据交换时,连接就没法创建。两个算子可以连接到一块儿是有一些规则的,感兴趣的读者能够阅读Flink源码中org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator
中的isChainable
方法。StreamingJobGraphGenerator
类的做用是将StreamGraph
转换为JobGraph
。
尽管将算子连接到一块儿会下降一些传输开销,可是也有一些状况并不须要太多连接。好比,有时候咱们须要将一个很是长的算子链拆开,这样咱们就能够将原来集中在一个线程中的计算拆分到多个线程中来并行计算。Flink容许开发者手动配置是否启用算子链,或者对哪些算子使用算子链。
根据前文的介绍,咱们已经了解到TaskManager负责具体的任务执行。TaskManager是一个JVM进程,在TaskManager中能够并行运行多个Task。在程序执行以前,通过优化,部分子任务被连接在一块儿,组成一个Task。每一个Task是一个线程,须要TaskManager为其分配相应的资源,TaskManager使用任务槽位给Task分配资源。
在解释Flink任务槽位的概念前,咱们先回顾一下进程与线程的概念。在操做系统层面,进程(Process)是进行资源分配和调度的一个独立单位,线程(Thread)是CPU调度的基本单位。好比,咱们经常使用的Office Word软件,在启动后就占用操做系统的一个进程。Windows上可使用任务管理器来查看当前活跃的进程,Linux上可使用top
命令来查看。线程是进程的一个子集,一个线程通常专一于处理一些特定任务,不独立拥有系统资源,只拥有一些运行中必要的资源,如程序计数器。一个进程至少有一个线程,也能够有多个线程。多线程场景下,每一个线程都处理一小个任务,多个线程以高并发的方式同时处理多个小任务,能够提升处理能力。
回到Flink的槽位分配机制上,一个TaskManager是一个进程,TaskManager能够管理一至多个Task,每一个Task是一个线程,占用一个槽位。每一个槽位的资源是整个TaskManager资源的子集,好比这里的TaskManager下有3个槽位,每一个槽位占用TaskManager所管理的1/3的内存,第一个槽位中的Task不会与第二个槽位中的Task互相争抢内存资源。注意,在分配资源时,Flink并无将CPU资源明确分配给各个槽位。
假设咱们给WordCount程序分配两个TaskManager,每一个TaskManager又分配3个槽位,因此总共是6个槽位。结合图 7中对这个做业的并行度设置,整个做业被划分为5个Task,使用5个线程,这5个线程能够按照图 8所示的方式分配到6个槽位中。
Flink容许用户设置TaskManager中槽位的数目,这样用户就能够肯定以怎样的粒度将任务作相互隔离。若是每一个TaskManager只包含一个槽位,那么运行在该槽位内的任务将独享JVM。若是TaskManager包含多个槽位,那么多个槽位内的任务能够共享JVM资源,好比共享TCP链接、心跳信息、部分数据结构等。官方建议将槽位数目设置为TaskManager下可用的CPU核心数,那么平均下来,每一个槽位都能平均得到1个CPU核心。