Flink核心是一个流式的数据流执行引擎,其针对数据流的分布式计算提供了数据分布、数据通讯以及容错机制等功能java
Flink提供了诸多更高抽象层的API以便用户编写分布式任务:算法
DataSet API, 对静态数据进行批处理操做,将静态数据抽象成分布式的数据集,用户能够方便地使用Flink提供的各类操做符对分布式数据集进行处理,支持Java、Scala和Python。 DataStream API,对数据流进行流处理操做,将流式的数据抽象成分布式的数据流,用户能够方便地对分布式数据流进行各类操做,支持Java和Scala。apache
Table API,对结构化数据进行查询操做,将结构化数据抽象成关系表,并经过类SQL的DSL对关系表进行各类查询操做,支持Java和Scala。数组
对于一个流处理系统,其节点间数据传输的标准模型是:当一条数据被处理完成后,序列化到缓存中,而后马上经过网络传输到下一个节点,由下一个节点继续处理缓存
而对于一个批处理系统,其节点间数据传输的标准模型是:当一条数据被处理完成后,序列化到缓存中,并不会马上经过网络传输到下一个节点,当缓存写满,就持久化到本地硬盘上,当全部数据都被处理完成后,才开始将处理后的数据经过网络传输到下一个节点。网络
这两种数据传输模式是两个极端,对应的是流处理系统对低延迟的要求和批处理系统对高吞吐量的要求数据结构
Flink的执行引擎同时支持了这两种数据传输模型,Flink以固定的缓存块为单位进行网络数据传输,用户能够经过缓存块超时值指定缓存块的传输时机,超时值为0,则是流处理系统的标准模型,此时能够得到最低的处理延迟,缓存块的超时值为无限大,则Flink的数据传输方式相似上文所提到批处理系统的标准模型。并发
缓存块的超时阈值越小,则流处理数据的延迟越低,但吞吐量也会变低。根据超时阈值来灵活权衡系统延迟和吞吐量。Flink基于分布式快照与可部分重发的数据源实现了容错。框架
用户可自定义对整个Job进行快照的时间间隔,当任务失败时,Flink会将整个Job恢复到最近一次快照,并从数据源重发快照以后的数据。异步
按照用户自定义的快照间隔时间,flink会定时在数据源中插入快照标记的消息,快照消息和普通消息都在DAG中流动,但不会被用户定义的逻辑所处理,每个快照消息都将其所在的数据流分红2部分:本次快照数据和下次快照数据。当操做符处理到快照标记消息,对本身的状态进行快照标记并缓存。操做符对本身的快照和状态能够是异步,增量操做,并不阻塞消息处理。当全部的终点操做符都收到快照标记信息并对本身的状态快照和存储后,整个分布式快照就完成了。同时通知数据源释放该快照标记消息以前的全部消息。若以后的节点崩溃等异常,就能够恢复分布式快照状态。并从数据源重发该快照之后的消息。
flink基于分布式快照实现了一次性。
目前大部分流处理系统来讲,时间窗口通常是根据Task所在节点的本地时钟进行切分,
是可能没法知足某些应用需求,好比:
消息自己带有时间戳,用户但愿按照消息自己的时间特性进行分段处理。
因为不一样节点的时钟可能不一样,以及消息在流经各个节点的延迟不一样,在某个节点属于同一个时间窗口处理的消息,流到下一个节点时可能被切分到不一样的时间窗口中,从而产生不符合预期的结果
Flink支持3种类型的时间窗口:
1.Operator Time。根据Task所在节点的本地时钟来切分的时间窗口
2.Event Time。消息自带时间戳,根据消息的时间戳进行处理,确保时间戳在同一个时间窗口的全部消息必定会被正确处理。因为消息可能乱序流入Task,因此Task须要缓存当前时间窗口消息处理的状态,直到确认属于该时间窗口的全部消息都被处理,才能够释放,若是乱序的消息延迟很高会影响分布式系统的吞吐量和延迟。
3.ingress Time。有时消息自己并不带有时间戳信息,但用户依然但愿按照消息而不是节点时钟划分时间窗口,例如避免上面提到的第二个问题,此时能够在消息源流入Flink流处理系统时自动生成增量的时间戳赋予消息,以后处理的流程与Event Time相同。Ingress Time能够当作是Event Time的一个特例,因为其在消息源处时间戳必定是有序的,因此在流处理系统中,相对于Event Time,其乱序的消息延迟不会很高,所以对Flink分布式系统的吞吐量和延迟的影响也会更小。
当操做符经过基于Event Time的时间窗口来处理数据时,它必须在肯定全部属于该时间窗口的消息所有流入此操做符后才能开始数据处理。可是因为消息多是乱序的,因此操做符没法直接确认什么时候全部属于该时间窗口的消息所有流入此操做符。WaterMark包含一个时间戳,Flink使用WaterMark标记全部小于该时间戳的消息都已流入
一个可能的优化措施是,对于聚合类的操做符,能够提早对部分消息进行聚合操做,当有属于该时间窗口的新消息流入时,基于以前的部分聚合结果继续计算,这样的话,只需缓存中间计算结果便可,无需缓存该时间窗口的全部消息
flink基于watermark实现了基于时间戳的全局排序:
排序操做:排序操做符缓存全部流入的消息,当接收到watermark时,对时间戳小于该watermark的消息进行排序,并发送到下一个节点。在此排序操做符中释放全部时间戳小于该watermark的消息,继续缓存流入的消息。等待下一次watermark触发下一次排序。
watermark保证了其以后不会出现时间戳比它小的消息,所以能够保证排序的正确性。请注意:排序操做符有多个节点,只能保证每一个节点流出的消息有序,节点之间的消息不能有序,要实现全局有序,则只能有一个排序操做符节点。
Java对象的存储密度相对偏低,例如[1],“abcd”这样简单的字符串在UTF-8编码中须要4个字节存储
采用了UTF-16编码存储字符串的Java则须要8个字节,同时Java对象还有header等其余额外信息,一个4字节字符串对象在Java中须要48字节的空间来存储。对于大部分的大数据应用,内存都是稀缺资源,更有效率地内存存储,意味着CPU数据访问吞吐量更高,以及更少磁盘落地的存在。
垃圾回收也是Java应用的不定时炸弹,有时秒级甚至是分钟级的垃圾回收极大影响了Java应用的性能和可用性。
经过JVM参数调优提升垃圾回收效率须要用户对应用和分布式计算框架以及JVM的各参数有深刻了解,并且有时候这也远远不够:
为了解决以上提到的问题,高性能分布式计算框架一般须要如下技术:
Flink的处理策略:
定制的序列化工具,显式内存管理的前提步骤就是序列化,用的序列化框架,如Java默认使用java.io.Serializable
制的序列化框架,如Hadoop的org.apache.hadoop.io.Writable须要用户实现该接口并自定义类的序列化和反序列化方法。这种方式效率最高。
对于计算密集的数据结构和算法,直接操做序列化后的二进制数据,而不是将对象反序列化后再进行操做。
缓存友好的数据结构和算法。对于计算密集的数据结构和算法,直接操做序列化后的二进制数据,而不是将对象反序列化后再进行操做。同时,只将操做相关的数据连续存储,能够最大化的利用L1/L2/L3缓存,减小Cache miss的几率,提高CPU计算的吞吐量。以排序为例,因为排序的主要操做是对Key进行对比,若是将全部排序数据的Key与Value分开并对Key连续存储,那么访问Key时的Cache命中率会大大提升。
分布式计算框架可使用定制序列化工具的前提是要待处理数据流一般是同一类型,因为数据集对象的类型固定,从而能够只保存一份对象Schema信息,节省大量的存储空间
对于固定大小的类型,也可经过固定的偏移位置存取。在须要访问某个对象成员变量时,经过定制的序列化工具,并不须要反序列化整个Java对象,而是直接经过偏移量,从而只须要反序列化特定的对象成员变量。若是对象的成员变量较多时,可以大大减小Java对象的建立开销,以及内存数据的拷贝大小。Flink数据集都支持任意Java或是Scala类型,经过自动生成定制序列化工具,既保证了API接口对用户友好(不用像Hadoop那样数据类型需要继承实现org.apache.hadoop.io.Writable接口),也达到了和Hadoop相似的序列化效率。
Flink对数据集的类型信息进行分析,然后自动生成定制的序列化工具类。Flink支持任意的Java或是Scala类型,经过Java Reflection框架分析基于Java的Flink程序UDF(User Define Function)的返回类型的类型信息,经过Scala Compiler分析基于Scala的Flink程序UDF的返回类型的类型信息。类型信息由TypeInformation类表示,这个类有诸多具体实现类
例如
1.BasicTypeInfo任意Java基本类型(装包或未装包)和String类型
2.BasicArrayTypeInfo任意Java基本类型数组(装包或未装包)和String数组
3.WritableTypeInfo任意Hadoop的Writable接口的实现类
4.TupleTypeInfo任意的Flink tuple类型(支持Tuple1 to Tuple25)Flink tuples是固定长度固定类型的Java Tuple实现
5.CaseClassTypeInfo任意的 Scala CaseClass(包括 Scala tuples)
6.PojoTypeInfo任意的POJO (Java or Scala),Java对象的全部成员变量,要么是public修饰符定义,要么有getter/setter方法
7.GenericTypeInfo任意没法匹配以前几种类型的类。
前6种类型数据集几乎覆盖了绝大部分的Flink程序,针对前6种类型数据集,Flink皆能够自动生成对应的TypeSerializer定制序列化工具,很是有效率地对数据集进行序列化和反序列化
对于第7种类型,Flink使用Kryo进行序列化和反序列化
对于可被用做Key的类型,Flink还同时自动生成TypeComparator,用来辅助直接对序列化后的二进制数据直接进行compare、hash等操做
对于Tuple、CaseClass、Pojo等组合类型,Flink自动生成的TypeSerializer、TypeComparator一样是组合的,并把其成员的序列化/反序列化代理给其成员对应的TypeSerializer、TypeComparator,如图6所示:
此外若有须要,用户可经过集成TypeInformation接口定制实现本身的序列化工具。
JDK8的G1算法改善了JVM垃圾回收的效率和可用范围
经过JVM进行内存管理的话,OutOfMemoryError也是一个很难解决的问题
在JVM内存管理中,Java对象有潜在的碎片化存储问题
Flink将内存分为3个部分,每一个部分都有不一样用途:
1.Network buffers: 一些以32KB Byte数组为单位的buffer,主要被网络模块用于数据的网络传输,基于Netty的网络传输
2.Memory Manager pool大量以32KB Byte数组为单位的内存池,全部的运行时算法(例如Sort/Shuffle/Join)都从这个内存池申请内存并将序列化后的数据存储其中,结束后释放回内存池。一般会配置为最大的一块内存,
3. Remaining (Free) Heap主要留给UDF中用户本身建立的Java对象,由JVM管理。时Flink也不鼓励用户在UDF中缓存不少数据。。 Remaining Heap的内存虽然由JVM管理,可是因为其主要用来存储用户处理的流式数据,生命周期很是短,速度很快的Minor GC就会所有回收掉,通常不会触发Full GC
在Flink中,内存池由多个MemorySegment组成,每一个MemorySegment表明一块连续的内存,底层存储是byte[],默认32KB大小。
MemorySegment提供了根据偏移量访问数据的各类方法,如get/put int、long、float、double等,MemorySegment之间数据拷贝等方法和java.nio.ByteBuffer相似。
对于Flink的数据结构,一般包括多个向内存池申请的MemeorySegment,全部要存入的对象经过TypeSerializer序列化以后,将二进制数据存储在MemorySegment中,在取出时经过TypeSerializer反序列化
数据结构经过MemorySegment提供的set/get方法访问具体的二进制数据
Flink这种看起来比较复杂的内存管理方式带来的好处主要有:
1.二进制的数据存储大大提升了数据存储密度,节省了存储空间。全部的运行时数据结构和算法只能经过内存池申请内存,保证了其使用的内存大小是固定的,不会由于运行时数据结构和算法而发生OOM
Flink当前的内存管理在最底层是基于byte[],
flink排序算法的实现:
1.将待排序的数据通过序列化后存储在两个不一样的MemorySegment集中,数据所有的序列化值存放于其中一个MemorySegment集中。数据序列化后的Key和指向第一个MemorySegment集中值的指针存放于第二个MemorySegment集中。对第二个MemorySegment集中的Key进行排序,如需交换Key位置,只需交换对应的Key+Pointer的位置,第一个MemorySegment集中的数据无需改变。 当比较两个Key大小时,TypeComparator提供了直接基于二进制数据的对比方法,无需反序列化任何数据。排序完成后,访问数据时,按照第二个MemorySegment集中Key的顺序访问,并经过Pointer值找到数据在第一个MemorySegment集中的位置,经过TypeSerializer反序列化成Java对象返回。
经过Key和Full data分离存储的方式尽可能将被操做的数据最小化,提升Cache命中的几率,从而提升CPU的吞吐量。 移动数据时,只需移动Key+Pointer,而无须移动数据自己,大大减小了内存拷贝的数据量。 TypeComparator直接基于二进制数据进行操做,节省了反序列化的时间。
DataSet API级别的执行计划优化器,原生的迭代操做符等,