既然有了Apache Spark,为何还要使用Apache Flink?css
由于Flink是一个纯流式计算引擎,而相似于Spark这种微批的引擎,只是Flink流式引擎的一个特例。其余的不一样点以后会陆续谈到。html
Flink起源于一个叫作Stratosphere的研究项目,目标是创建下一代大数据分析引擎,其在2014年4月16日成为Apache的孵化项目,从Stratosphere 0.6开始,正式改名为Flink。Flink 0.7中介绍了最重要的特性:Streaming API。最初只支持Java API,后来增长了Scala API。java
Flink 1.X版本的包含了各类各样的组件,包括部署、flink core(runtime)以及API和各类库。git
从部署上讲,Flink支持local模式、集群模式(standalone集群或者Yarn集群)、云端部署。Runtime是主要的数据处理引擎,它以JobGraph形式的API接收程序,JobGraph是一个简单的并行数据流,包含一系列的tasks,每一个task包含了输入和输出(source和sink例外)。github
DataStream API和DataSet API是流处理和批处理的应用程序接口,当程序在编译时,生成JobGraph。编译完成后,根据API的不一样,优化器(批或流)会生成不一样的执行计划。根据部署方式的不一样,优化后的JobGraph被提交给了executors去执行。算法
Flink分布式程序包含2个主要的进程:JobManager和TaskManager.当程序运行时,不一样的进程就会参与其中,包括Jobmanager、TaskManager和JobClient。apache
首先,Flink程序提交给JobClient,JobClient再提交到JobManager,JobManager负责资源的协调和Job的执行。一旦资源分配完成,task就会分配到不一样的TaskManager,TaskManager会初始化线程去执行task,并根据程序的执行状态向JobManager反馈,执行的状态包括starting、in progress、finished以及canceled和failing等。当Job执行完成,结果会返回给客户端。bootstrap
Master进程,负责Job的管理和资源的协调。包括任务调度,检查点管理,失败恢复等。windows
固然,对于集群HA模式,能够同时多个master进程,其中一个做为leader,其余做为standby。当leader失败时,会选出一个standby的master做为新的leader(经过zookeeper实现leader选举)。
JobManager包含了3个重要的组件:缓存
Flink内部使用Akka模型做为JobManager和TaskManager之间的通讯机制。
Actor系统是个容器,包含许多不一样的Actor,这些Actor扮演者不一样的角色。Actor系统提供相似于调度、配置、日志等服务,同时包含了全部actors初始化时的线程池。
全部的Actors存在着层级的关系。新加入的Actor会被分配一个父类的Actor。Actors之间的通讯采用一个消息系统,每一个Actor都有一个“邮箱”,用于读取消息。若是Actors是本地的,则消息在共享内存中共享;若是Actors是远程的,则消息经过RPC远程调用。
每一个父类的Actor都负责监控其子类Actor,当子类Actor出现错误时,本身先尝试重启并修复错误;若是子类Actor不能修复,则将问题升级并由父类Actor处理。
在Flink中,actor是一个有状态和行为的容器。Actor的线程持续的处理从“邮箱”中接收到的消息。Actor中的状态和行为则由收到的消息决定。
Flink中的Executors被定义为task slots(线程槽位)。每一个Task Manager须要管理一个或多个task slots。
Flink经过SlotSharingGroup和CoLocationGroup来决定哪些task须要被共享,哪些task须要被单独的slot使用。
Flink的检查点机制是保证其一致性容错功能的骨架。它持续的为分布式的数据流和有状态的operator生成一致性的快照。其改良自Chandy-Lamport算法,叫作ABS(轻量级异步Barrier快照),具体参见论文:
Lightweight Asynchronous Snapshots for Distributed Dataflows
Flink的容错机制持续的构建轻量级的分布式快照,所以负载很是低。一般这些有状态的快照都被放在HDFS中存储(state backend)。程序一旦失败,Flink将中止executor并从最近的完成了的检查点开始恢复(依赖可重发的数据源+快照)。
Barrier做为一种Event,是Flink快照中最主要的元素。它会随着data record一块儿被注入到流数据中,并且不会超越data record。每一个barrier都有一个惟一的ID,将data record分到不一样的检查点的范围中。下图展现了barrier是如何被注入到data record中的:
每一个快照中的状态都会报告给Job Manager的检查点协调器;快照发生时,flink会在某些有状态的operator上对data record进行对齐操做(alignment),目的是避免失败恢复时重复消费数据。这个过程也是exactly once的保证。一般对齐操做的时间仅是毫秒级的。可是对于某些极端的应用,在每一个operator上产生的毫秒级延迟也不能容许的话,则能够选择降级到at least once,即跳过对齐操做,当失败恢复时可能发生重复消费数据的状况。Flink默认采用exactly once意义的处理。
Task Managers是具体执行tasks的worker节点,执行发生在一个JVM中的一个或多个线程中。Task的并行度是由运行在Task Manager中的task slots的数量决定。若是一个Task Manager有4个slots,那么JVM的内存将分配给每一个task slot 25%的内存。一个Task slot中能够运行1个或多个线程,同一个slot中的线程又能够共享相同的JVM。在相同的JVM中的tasks,会共享TCP链接和心跳消息:
Job Client并非Flink程序执行中的内部组件,而是程序执行的入口。Job Client负责接收用户提交的程序,并建立一个data flow,而后将生成的data flow提交给Job Manager。一旦执行完成,Job Client将返回给用户结果。
Data flow就是执行计划,好比下面一个简单的word count的程序:
当用户将这段程序提交时,Job Client负责接收此程序,并根据operator生成一个data flow,那么这个程序生成的data flow也许看起来像是这个样子:
默认状况下,Flink的data flow都是分布式并行处理的,对于数据的并行处理,flink将operators和数据流进行partition。Operator partitions叫作sub-tasks。数据流又能够分为一对一的传输与重分布的状况。
咱们看到,从source到map的data flow,是一个一对一的关系,不必产生shuffle操做;而从map到groupBy操做,flink会根据key将数据重分布,即shuffle操做,目的是聚合数据,产生正确的结果。
Flink自己就被设计为高性能和低延迟的引擎。不像Spark这种框架,你没有必要作许多手动的配置,用以得到最佳性能,Flink管道式(pipeline)的数据处理方式已经给了你最佳的性能。
经过检查点+可重发的数据源,使得Flink对于stateful的operator,支持exactly once的计算。固然你能够选择降级到at least once。
Flink支持数据驱动的窗口,这意味着咱们能够基于时间(event time或processing time)、count和session来构建窗口;窗口同时能够定制化,经过特定的pattern实现。
经过轻量级、分布式快照实现。
Flink在JVM内部进行内存的自我管理,使得其独立于java自己的垃圾回收机制。当处理hash、index、caching和sorting时,Flink自个人内存管理方式使得这些操做很高效。可是,目前自个人内存管理只在批处理中实现,流处理程序并未使用。
为了不shuffle、sort等操做,Flink的批处理API进行了优化,它能够确保避免过分的磁盘IO而尽量使用缓存。
Flink中批和流有各自的API,你既能够开发批程序,也能够开发流处理程序。事实上,Flink中的流处理优先原则,认为批处理是流处理的一种特殊状况。
Flink提供了用于机器学习、图计算、Table API等库,同时Flink也支持复杂的CEP处理和警告。
Flink支持Event Time语义的处理,这有助于处理流计算中的乱序问题,有些数据也许会迟到,咱们能够经过基于event time、count、session的窗口来处于这样的场景。
直接参见官方文档:QuickStart
直接参见官方文档:Standalone Cluster
略去,可参见官方文档:Examples
Flink细节上的讨论和处理模型。下一章将介绍Flink Streaming API。
许多领域须要数据的实时处理,物联网驱动的应用程序在数据的存储、处理和分析上须要实时或准实时的进行。
Flink提供流处理的API叫作DataStream API,每一个Flink程序均可以按照下面的步骤进行开发:
咱们首先要得到已经存在的运行环境或者建立它。有3种方法获得运行环境:
Flink支持许多预约义的数据源,同时也支持自定义数据源。下面咱们看看有哪些预约义的数据源。
DataStream API支持从socket读取数据,有以下3个方法:
你可使用readTextFile(String path)来消费文件中的数据做为流数据的来源,默认状况下的格式是TextInputFormat。固然你也能够经过readFile(FileInputFormat inputFormat, String path)来指定FileInputFormat的格式。
Flink一样支持读取文件流:
关于基于文件的数据流,这里再也不过多介绍。
Transformation容许将数据从一种形式转换为另外一种形式,输入能够是1个源也能够是多个,输出则能够是0个、1个或者多个。下面咱们一一介绍这些Transformations。
输入1个元素,输出一个元素,Java API以下:
输入1个元素,输出0个、1个或多个元素,Java API以下:
条件过滤时使用,当结果为true时,输出记录;
逻辑上按照key分组,内部使用hash函数进行分组,返回keyedDataStream:
inputStream.keyBy("someKey");
keyedStream流上,将上一次reduce的结果和本次的进行操做,例如sum reduce的例子:
在keyedStream流上的记录进行链接操做,例如:
假如是一个(1,2,3,4,5)的流,那么结果将是:Start=1=2=3=4=5
在keyedStream上应用相似min、max等聚合操做:
窗口功能容许在keyedStream上应用时间或者其余条件(count或session),根据key分组作聚合操做。
流是无界的,为了处理无界的流,咱们能够将流切分到有界的窗口中去处理,根据指定的key,切分为不一样的窗口。咱们可使用Flink预约义的窗口分配器。固然你也能够经过继承WindowAssginer自定义分配器。
下面看看有哪些预约义的分配器。
Global window的范围是无限的,你须要指定触发器来触发窗口。一般来说,每一个数据按照指定的key分配到不一样的窗口中,若是不指定触发器,则窗口永远不会触发。
Tumbling窗口是基于特定时间建立的,他们的大小固定,窗口间不会发生重合。例如你想基于event timen每隔10分钟计算一次,这个窗口就很适合。
Sliding窗口的大小也是固定的,但窗口之间会发生重合,例如你想基于event time每隔1分钟,统一过去10分钟的数据时,这个窗口就很适合。
Session窗口容许咱们设置一个gap时间,来决定在关闭一个session以前,咱们要等待多长时间,是衡量用户活跃与否的标志。
WindowAll操做不是基于key的,是对全局数据进行的计算。因为不基于key,所以是非并行的,即并行度是1.在使用时性能会受些影响。
inputStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(10)));
Union功能就是在2个或多个DataStream上进行链接,成为一个新的DataStream。
inputStream. union(inputStream1, inputStream2, ...)
Join容许在2个DataStream上基于相同的key进行链接操做,计算的范围也是要基于一个window进行。
Split的功能是根据某些条件将一个流切分为2个或多个流。例如你有一个混合数据的流,根据数据自身的某些特征,将其划分到多个不一样的流单独处理。
DataStream根据选择的字段,将流转换为新的流。
Project功能容许你选择流中的一部分元素做为新的数据流中的字段,至关于作个映射。
Flink容许咱们在流上执行物理分片,固然你能够选择自定义partitioning。
根据某个具体的key,将DataStream中的元素按照key从新进行分片,将相同key的元素聚合到一个线程中执行。
不根据具体的key,而是随机将数据打散。
inputStream.shuffle();
内部使用round robin方法将数据均匀打散。这对于数据倾斜时是很好的选择。
inputStream.rebalance();
Rescaling是经过执行oepration算子来实现的。因为这种方式仅发生在一个单一的节点,所以没有跨网络的数据传输。
inputStream.rescale();
广播用于将dataStream全部数据发到每个partition。
inputStream.broadcast();
咱们最终须要将结果保存在某个地方,Flink提供了一些选项:
对于Flink中的connector以及自定义输出,后续的章节会讲到。
Flink Streaming API受到了Google DataFlow模型的启发,支持3种不一样类型的时间概念:
(1)Event Time
事件发生的时间,通常数据中自带时间戳。这就可能致使乱序的发生。
(2)Processing Time
Processing Time是机器的时间,这种时间跟数据自己没有关系,彻底依赖于机器的时间。
(3)Ingestion Time
是数据进入到Flink的时间。注入时间比processing time更加昂贵(多了一个assign timestamp的步骤),可是其准确性相比processing time的处理更好。因为是进入Flink才分配时间戳,所以没法处理乱序。
咱们经过在env中设置时间属性来选择不一样的时间概念:
Flink提供了预约义的时间戳抽取器和水位线生成器。参考:
Pre-defined Timestamp Extractors / Watermark Emitters
kafka是一个基于发布、订阅的分布式消息系统。Flink定义了kafka consumer做为数据源。咱们只须要引入特定的依赖便可(这里以kafka 0.9为例):
在使用时,咱们须要指定topic name以及反序列化器:
Flink默认支持String和Json的反序列化。
Kafka consumer在实现时实现了检查点功能,所以失败恢复时能够重发。
Kafka除了consumer外,咱们也能够将结果输出到kafka。即kafka producer。例如:
用twitter做为数据源,首先你须要用于twitter帐号。以后你须要建立twitter应用并认证。
这里有个帮助文档:https://dev.twitter.com/oauth/overview/application-owner-access-tokens
Pom中添加依赖:
API:
这3个connetor略过,壳参考官方文档:
https://flink.apache.org/ecosystem.html
这里能够参考OSCON的例子:
https://github.com/dataArtisans/oscon。
本章介绍了Flink的DataStream API,下一章将介绍DataSet API。