Flink 核心技术浅析(整理版)

1. Flink简介

Apache Flink是一个面向分布式数据流处理和批量数据处理的开源计算平台,它可以基于同一个Flink流执行引擎(streaming dataflow engine),提供支持流处理和批处理两种类型应用的功能。batch dataSet能够视做data Streaming的一种特例。基于流执行引擎,Flink提供了诸多更高抽象层的API以便用户编写分布式任务:html

  • DataSet API,对静态数据进行批处理操做,将静态数据抽象成分布式的数据集,用户能够方便地使用Flink提供的各类操做符对分布式数据集进行处理,支持Java、Scala和Python。
  • DataStream API,对数据流进行流处理操做,将流式的数据抽象成分布式的数据流,用户能够方便地对分布式数据流进行各类操做,支持Java和Scala。
  • Table API,对结构化数据进行查询操做,对结构化数据抽象成关系表,并经过类SQL的DSL对关系表进行各类查询操做,支持Java和Scala。
  • Flink ML,Flink的机器学习库,提供了机器学习Pipelines API并实现了多种机器学习算法。
  • Gelly,Flink的图计算库,提供了图计算的相关API以及多种图计算算法。

Flink的技术栈如图所示:java

 此外,Flink也能够方便和Hadoop生态圈中其余项目集成,例如Flink能够读取存储在HDFS或HBase中的静态数据,以Kafka做为流式的数据源,直接重用MapReduce或Store代码,或是经过YARN申请集群资源等。算法

2. Flink核心特色

2.1 统一的批处理和流处理系统

在执行引擎这一层,流处理系统与批处理系统最大不一样在于节点间的数据传输方式。对于一个流处理系统,其节点间数据传输的标准模型是:当一条数据被处理完成后,序列化到缓存中,而后马上经过网络传输到下一个节点,由下一个节点继续处理。而对于一个批处理系统,其节点间数据传输的标准模型是:当一条数据被处理完成后,序列化到缓存中,并不会马上经过网络传输到下一个节点,当缓存写满,就持久化到本地硬盘上,当全部数据都被处理完成后,才开始将处理后的数据经过网络传输到下一个节点。这两种数据传输模式是两个极端,对应的是流处理对低延迟的要求和批处理系统对高吞吐量的要求。apache

Flink的执行引擎采用了一种十分灵活的方式,同时支持了上述两种数据传输模型。Flink以固定的缓存块为单位进行网络数据传输,用户能够经过缓存块超时值指定缓存块的传输时机。若是缓存块的超时值为0,则Flink的数据传输方式相似上文所提到流处理系统的标准模型,此时系统能够得到最低的处理延迟。若是缓存块的超时值为无限大,则Flink的数据传输方式相似上文所提到批处理系统标准模型,此时系统能够得到最高的吞吐量。同时缓存块的超时值也能够设置为0到无限大之间的任意值。缓存块的超时阀值越小,则Flink流处理执行引擎的数据处理延迟越低,但吞吐量也会下降,反之亦然。经过调整缓存块的超时阀值,用户可根据需求灵活地权衡系统延迟和吞吐量。编程

在统一的流式执行引擎基础上,Flink同时支持了流计算和批处理,并对性能(延迟、吞吐量等)有所保障。相对于其余原生的流处理与批处理系统,并无由于统一执行引擎而受到影响,从而大幅度减轻了用户安装、部署、监控、维护等成本。数组

2.2 Flink流处理的容错机制

对于一个分布式系统来讲,单个进程或是节点崩溃致使整个Job失败是常常发生的事情,在异常发生时不会丢失用户数据并能自动恢复才是分布式系统必须支持的特性之一。本节主要介绍Flink流处理系统任务级别的容错机制。缓存

批处理系统比较容易实现容错机制,因为文件能够重复访问,当个某个任务失败后,重启该任务便可。可是到了流处理系统,因为数据源是无限的数据流,从而致使一个流处理任务执行几个月的状况,将全部数据缓存或是持久化,留待之后重复访问基本上是不可行的。Flink基于分布式快照与可部分重发的数据源实现了容错。用户可自定义对整个Job进行快照的时间间隔,当任务失败时,Flink会将整个Job恢复到最近一次快照,并从数据源重发快照以后的数据。网络

Flink的分布式快照实现借鉴了Chandy和Lamport在1985年发表的一篇关于分布式快照的论文,其实现的主要思想以下:按照用户自定义的分布式快照间隔时间,Flink会定时在全部数据源中插入一种特殊的快照标记消息,这些快照标记消息和其余消息同样在DAG中流动,可是不会被用户定义的业务逻辑所处理,每个快照标记消息都将其所在的数据流分红两部分:本次快照数据和下次快照数据数据结构

图3中Flink包含快照标记消息的消息流框架

快照标记消息沿着DAG流经各个操做符,当操做符处理到快照标记消息时,会对本身的状态进行快照,并存储起来。当一个操做符有多个输入的时候,Flink会将先抵达的快照标记消息及其以后的消息缓存起来,当全部的输入中对应该快照的快照标记消息所有抵达后,操做符对本身的状态快照并存储,以后处理全部快照标记消息以后的已缓存消息。操做符对本身的状态快照并存储能够是异步与增量的操做,并不须要阻塞消息的处理。分布式快照的流程如图4所示:

 图4 Flink分布式快照流程图

当全部的Data Sink(终点操做符)都收到快照标记信息并对本身的状态快照和存储后,整个分布式快照就完成了,同时通知数据源释放该快照标记消息以前的全部消息。若以后发生节点崩溃等异常状况时,只须要恢复以前存储的分布式快照状态,并从数据源重发该快照之后的消息就能够了。

Exactly-Once是流处理系统须要支持的一个很是重要的特性,它保证每一条消息只被流处理系统一次,许多流处理任务的业务逻辑都依赖于Exactly-Once特性。相对于At-Least-Once或是At-Most-Once,Exactly-Once特性对流处理系统的要求更为严格,实现也更加困难。Flink基于分布式快照实现了Exactly-Once特性。

相对于其余流处理系统的容错方案,Flink基于分布式快照的方案在功能和性能方面都具备不少优势,包括:

  • 低延迟。因为操做符状态的存储能够异步,因此进行快照的过程基本上不会阻塞消息的处理,所以不会对消息延迟产生负面影响。
  • 高吞吐量。当操做符状态较少时,对吞吐量基本没有影响。当操做符状态较多时,相对于其余的容错机制,分布式快照的时间间隔是用户自定义的,因此用户能够权衡错误恢复时间和吞吐量要求来调整分布式快照的时间间隔。

与业务逻辑的隔离。Flink的分布式快照机制与用户的业务逻辑是彻底隔离的,用户的业务逻辑不会依赖或是对分布式快照产生任何影响。

错误恢复代价。分布式快照的时间间隔越短,错误恢复的时间越少,与吞吐量负相关。

2.3 Flink流处理的时间窗口

对于流处理系统来讲,流入的消息不存在上限,因此对于聚合或是链接等操做,流处理系统须要对流入的消息进行分段,而后基于每一段数据进行聚合或是链接。消息的分段即称为窗口,流处理系统支持的窗口有不少类型,最多见的就是时间窗口,基于时间间隔对消息进行分段处理。本节主要介绍Flink流处理系统支持的各类时间窗口。

对于目前大部分流处理系统来讲,时间窗口通常是根据Task所在节点的本地时钟进行切分,这种方式实现起来比较容易,不会产生阻塞。可是可能没法知足某些应用需求,好比:①消息自己带有时间戳,用户但愿按照消息自己的时间特性进行分段处理;②因为不一样节点的时钟可能不一样,以及消息在流经各个节点的延迟不一样,在某个节点属于同一个时间窗口处理的消息,流到下一个节点时可能被切分到不一样的时间窗口中,从而产生不符合预期的结果。

Flink支持3种类型的时间窗口,分别适用于用户对时间窗口不一样类型的要求:

  • Operator Time。根据Task所在节点的本地时钟来切分的时间窗口。
  • Event Time。消息自带时间戳,根据消息的时间戳进行处理,确保时间戳在同一个时间窗口的全部消息必定会被正确处理。因为消息可能乱序流入Task,因此Task须要缓存当前时间窗口消息处理的状态,直到确认属于该时间窗口的全部消息都被处理,才能够释放,若是乱序的消息延迟很高会影响分布式系统的吞吐量和延迟。
  • Ingress Time。有时消息自己并不带时间戳信息,但用户依然但愿按照消息而不是节点时钟划分时间窗口,例如避免上面提到的第二个问题,此时能够在消息源流入Flink流处理系统时自动生成增量的时间戳赋予消息,以后处理的流程与Event Time相同。Ingress Time能够当作是Event Time的一个特例,因为其在消息源处时间戳必定是有序的,因此在流处理系统中,相对于Event Time,其乱序的消息延迟不会很高,所以对Flink分布式系统的吞吐量和延迟的影响也会更小。

2.4 定制的内存管理

Flink项目基于Java及Scala等JVM语言,JVM自己做为一个各类类型应用的执行平台,其对Java对象的管理也是基于通用的处理策略,其垃圾回收器经过估算Java对象的生命周期对Java对象进行有效率的管理。

JVM存在的问题

Java对象开销

相对于C/C++等更加接近底层的语言,Java对象的存储密度相对偏低,例如[1],"abcd"这样简单的字符串在UTF-8编码中须要4个字节存储,但采用了UTF-16编码存储字符串的Java须要8个字节,同时Java对象还有header等其余额外信息,一个4字节字符串对象在Java中须要48字节的空间来存储。对于大部分的大数据应用,内存都是稀缺资源,更有效率的内存存储,意味着CPU数据访问吐吞量更高,以及更少磁盘落地的存在。

对象存储结构引起的cache miss

为了缓解CPU处理速度与内存访问速度的差距,现代CPU数据访问通常都会有多级缓存。当从内存加载数据到缓存时,通常是以cache line为单位加载数据,因此当CPU访问的数据若是是在内存中连续存储的话,访问的效率会很是高。若是CPU要访问的数据不在当前缓存全部的cache line中,则须要从内存中加载对应的数据,这被称为一次cache miss。当cache miss很是高的时候,CPU大部分的时间都在等待数据加载,而不是真正的处理数据。Java对象并非连续的存储在内存上,同时不少的Java数据结构的数据汇集性也很差。

大数据的垃圾回收

Java的垃圾回收机制一直让Java开发者又爱又恨,一方面它免去了开发者本身回收资源的步骤,提升了开发效率,减小了内存泄漏的可能,另外一方面垃圾回收也是Java应用的不定时炸弹,有时秒级甚至是分钟级的垃圾回收极大影响了Java应用的性能和可用性。在时下数据中心,大容量内存获得了普遍的应用,甚至出现了单台机器配置TB内存的状况,同时,大数据分析一般会遍历整个源数据集,对数据进行转换、清洗、处理等步骤。在这个过程当中,会产生海量的Java对象,JVM的垃圾回收执行效率对性能有很大影响。经过JVM参数调优提升垃圾回收效率须要用户对应用和分布式计算框架以及JVM的各参数有深刻了解,并且有时候这也远远不够。

OOM问题

OutOfMemoryError是分布式计算框架常常会遇到的问题,当JVM中全部对象大小超过度配给JVM的内存大小时,就会出现OutOfMemoryError错误,JVM崩溃,分布式框架的健壮性和性能都会受到影响。经过JVM管理内存,同时试图解决OOM问题的应用,一般都须要检查Java对象的大小,并在某些存储Java对象特别多的数据结构中设置阈值进行控制。可是JVM并无提供官方检查Java对象大小的工具,第三方的工具类库可能没法准确通用地肯定Java对象大小[6]。侵入式的阈值检查也会为分布式计算框架的实现增长不少额外与业务逻辑无关的代码。

Flink的处理策略

为了解决以上提到的问题,高性能分布式计算框架一般须要如下技术:

定制的序列化工具

显式内存管理的前提步骤就是序列化,将Java对象序列化成二进制数据存储在内存上(on heap或是off-heap)。通用的序列化框架,如Java默认使用java.io.Serializable将Java对象及其成员变量的全部元信息做为其序列化数据的一部分,序列化后的数据包含了全部反序列化所需的信息。这在某些场景中十分必要,可是对于Flink这样的分布式计算框架来讲,这些元数据信息多是冗余数据。

分布式计算框架可使用定制序列化工具的前提是要待处理数据流一般是同一类型,因为数据集对象的类型固定,从而能够只保存一份对象Schema信息,节省大量的存储空间。同时,对于固定大小的类型,也可经过固定的偏移位置存取。在须要访问某个对象成员变量时,经过定制的序列化工具,并不须要反序列化整个Java对象,而是直接经过偏移量,从而只须要反序列化特定的对象成员变量。若是对象的成员变量较多时,可以大大减小Java对象的建立开销,以及内存数据的拷贝大小。Flink数据集都支持任意Java或是Scala类型,经过自动生成定制序列化工具,既保证了API接口对用户友好(不用像Hadoop那样数据类型须要继承实现org.apache.hadoop.io.Writable接口),也达到了和Hadoop相似的序列化效率。

Flink对数据集的类型信息进行分析,而后自动生成定制的序列化工具类。Flink支持任意的Java或是Scala类型,经过Java Reflection框架分析基于Java的Flink程序UDF(User Define Function)的返回类型的类型信息,经过Scala Compiler分析基于Scala的Flink程序UDF的返回类型的类型信息。类型信息由TypeInformation类表示,这个类有诸多具体实现类,例如:

  • BasicTypeInfo任意Java基本类型(装包或未装包)和String类型。
  • BasicArrayTypeInfo任意Java基本类型数组(装包或未装包)和String数组。
  • WritableTypeInfo任意Hadoop的Writable接口的实现类。
  • TupleTypeInfo任意的Flink tuple类型(支持Tuple1 to Tuple25)。 Flink tuples是固定长度固定类型的Java Tuple实现。
  • CaseClassTypeInfo任意的 Scala CaseClass(包括 Scala tuples)。
  • PojoTypeInfo任意的POJO (Java or Scala),例如Java对象的全部成员变量,要么是public修饰符定义,要么有getter/setter方法。
  • GenericTypeInfo任意没法匹配以前几种类型的类。

前6种类型数据集几乎覆盖了绝大部分的Flink程序,针对前6种类型数据集,Flink皆能够自动生成对应的TypeSerializer定制序列化工具,很是有效率地对数据集进行序列化和反序列化。对于第7种类型,Flink使用Kryo进行序列化和反序列化。此外,对于可被用做Key的类型,Flink还同时自动生成TypeComparator,用来辅助直接对序列化后的二进制数据直接进行compare、hash等操做。对于Tuple、CaseClass、Pojo等组合类型,Flink自动生成的TypeSerializer、TypeComparator一样是组合的,并把其成员的序列化/反序列化代理给其成员对应的TypeSerializer、TypeComparator,如图6所示:

图6组合类型序列化

此外若有须要,用户可经过集成TypeInformation接口定制实现本身的序列化工具。

显式的内存管理

 通常通用的作法是批量申请和释放内存,每一个JVM实例有一个统一的内存管理器,全部内存的申请和释放都经过该内存管理器进行。这能够避免常见的内存碎片问题,同时因为数据以二进制的方式存储,能够大大减轻垃圾回收压力。

垃圾回收是JVM内存管理回避不了的问题,JDK8的G1算法改善了JVM垃圾回收的效率和可用范围,但对于大数据处理实际环境还远远不够。这也和如今分布式框架的发展趋势有所冲突,愈来愈多的分布式计算框架但愿尽量多地将待处理数据集放入内存,而对于JVM垃圾回收来讲,内存中Java对象越少、存活时间越短,其效率越高。经过JVM进行内存管理的话,OutOfMemoryError也是一个很难解决的问题。同时,在JVM内存管理中,Java对象有潜在的碎片化存储问题(Java对象全部信息可能在内存中连续存储),也有可能在全部Java对象大小没有超过JVM分配内存时,出现OutOfMemoryError问题。Flink将内存分为3个部分,每一个部分都有不一样用途:

  • Network buffers: 一些以32KB Byte数组为单位的buffer,主要被网络模块用于数据的网络传输。
  • Memory Manager pool大量以32KB Byte数组为单位的内存池,全部的运行时算法(例如Sort/Shuffle/Join)都从这个内存池申请内存,并将序列化后的数据存储其中,结束后释放回内存池。
  • Remaining (Free) Heap主要留给UDF中用户本身建立的Java对象,由JVM管理。

Network buffers在Flink中主要基于Netty的网络传输,无需多讲。Remaining Heap用于UDF中用户本身建立的Java对象,在UDF中,用户一般是流式的处理数据,并不须要不少内存,同时Flink也不鼓励用户在UDF中缓存不少数据,由于这会引发前面提到的诸多问题。Memory Manager pool(之后之内存池代指)一般会配置为最大的一块内存,接下来会详细介绍。

在Flink中,内存池由多个MemorySegment组成,每一个MemorySegment表明一块连续的内存,底层存储是byte[],默认32KB大小。MemorySegment提供了根据偏移量访问数据的各类方法,如get/put int、long、float、double等,MemorySegment之间数据拷贝等方法和java.nio.ByteBuffer相似。对于Flink的数据结构,一般包括多个向内存池申请的MemeorySegment,全部要存入的对象经过TypeSerializer序列化以后,将二进制数据存储在MemorySegment中,在取出时经过TypeSerializer反序列化。数据结构经过MemorySegment提供的set/get方法访问具体的二进制数据。Flink这种看起来比较复杂的内存管理方式带来的好处主要有:

  • 二进制的数据存储大大提升了数据存储密度,节省了存储空间。
  • 全部的运行时数据结构和算法只能经过内存池申请内存,保证了其使用的内存大小是固定的,不会由于运行时数据结构和算法而发生OOM。对于大部分的分布式计算框架来讲,这部分因为要缓存大量数据最有可能致使OOM。
  • 内存池虽然占据了大部份内存,但其中的MemorySegment容量较大(默认32KB),因此内存池中的Java对象其实不多,并且一直被内存池引用,全部在垃圾回收时很快进入持久代,大大减轻了JVM垃圾回收的压力。
  • Remaining Heap的内存虽然由JVM管理,可是因为其主要用来存储用户处理的流式数据,生命周期很是短,速度很快的Minor GC就会所有回收掉,通常不会触发Full GC。

Flink当前的内存管理在最底层是基于byte[],因此数据最终仍是on-heap,最近Flink增长了off-heap的内存管理支持。Flink off-heap的内存管理相对于on-heap的优势主要在于:

  • 启动分配了大内存(例如100G)的JVM很耗费时间,垃圾回收也很慢。若是采用off-heap,剩下的Network buffer和Remaining heap都会很小,垃圾回收也不用考虑MemorySegment中的Java对象了。
  • 更有效率的IO操做。在off-heap下,将MemorySegment写到磁盘或是网络能够支持zeor-copy技术,而on-heap的话则至少须要一次内存拷贝。
  • off-heap可用于错误恢复,好比JVM崩溃,在on-heap时数据也随之丢失,但在off-heap下,off-heap的数据可能还在。此外,off-heap上的数据还能够和其余程序共享。

缓存友好的计算

对于计算密集的数据结构和算法,直接操做序列化后的二进制数据,而不是将对象反序列化后再进行操做。同时,只将操做相关的数据连续存储,能够最大化的利用L1/L2/L3缓存,减小Cache miss的几率,提高CPU计算的吞吐量。以排序为例,因为排序的主要操做是对Key进行对比,若是将全部排序数据的Key与Value分开并对Key连续存储,那么访问Key时的Cache命中率会大大提升。

磁盘IO和网络IO以前一直被认为是Hadoop系统的瓶颈,可是随着Spark、Flink等新一代分布式计算框架的发展,愈来愈多的趋势使得CPU/Memory逐渐成为瓶颈,这些趋势包括:

  • 更先进的IO硬件逐渐普及。10GB网络和SSD硬盘等已经被愈来愈多的数据中心使用。
  • 更高效的存储格式。Parquet,ORC等列式存储被愈来愈多的Hadoop项目支持,其很是高效的压缩性能大大减小了落地存储的数据量。
  • 更高效的执行计划。例如不少SQL系统执行计划优化器的Fliter-Push-Down优化会将过滤条件尽量的提早,甚至提早到Parquet的数据访问层,使得在不少实际的工做负载中并不须要不少的磁盘IO。

3. Flink vs Spark

经过比较spark,了解flink的做用和优缺点,主要从设计抽象、内存管理、语言实现,以及API和SQL等方面来描述。

3.1 设计抽象

接触过 Spark 的同窗,应该比较熟悉,在处理批处理任务,可使用 RDD,而对于流处理,可使用 Streaming,然其实际仍是 RDD,因此本质上仍是 RDD 抽象而来。可是,在 Flink 中,批处理用 DataSet,对于流处理,有 DataStreams。思想相似,但却有所不一样:其一,DataSet 在运行时表现为 Runtime Plans,而在 Spark 中,RDD 在运行时表现为 Java Objects。在 Flink 中有 Logical Plan ,这和 Spark 中的 DataFrames 相似。于是,在 Flink 中,如果使用这类 API ,会被优先来优化(即:自动优化迭代)。然而,在 Spark 中,RDD 就没有这块的相关优化。

另外,DataSet 和 DataStream 是相对独立的 API,在 Spark 中,全部不一样的 API,好比 Streaming,DataFrame 都是基于 RDD 抽象的。然而在 Flink 中,DataSet 和 DataStream 是同一个公用引擎之上的两个独立的抽象。因此,不能把这二者的行为合并在一块儿操做,目前官方正在处理这种问题。

3.2 内存

在以前的版本(1.5之前),Spark 延用 Java 的内存管理来作数据缓存,这样很容易致使 OOM 或者 GC。以后,Spark 开始转向另外更加友好和精准的控制内存,即:Tungsten 项目。然而,对于 Flink 来讲,从一开始就坚持使用本身控制内存。Flink 除把数据存在本身管理的内存以外,还直接操做二进制数据。在 Spark 1.5以后的版本开始,全部的 DataFrame 操做都是直接做用于 Tungsten 的二进制数据上。

  PS:Tungsten 项目将是 Spark 自诞生以来内核级别的最大改动,以大幅度提高 Spark 应用程序的内存和 CPU 利用率为目标,旨在最大程度上利用硬件性能。该项目包括了三个方面的改进:

  1. 内存管理和二进制处理:更加明确的管理内存,消除 JVM 对象模型和垃圾回收开销。
  2. 缓存友好计算:使用算法和数据结构来实现内存分级结构。
  3. 代码生成:使用代码生成来利用新型编译器和 CPU。

3.2 编程语言

Spark 使用 Scala 来实现的,它提供了 Java,Python 以及 R 语言的编程接口。而对于 Flink 来讲,它是使用 Java 实现的,提供 Scala 编程 API。从编程语言的角度来看,Spark 略显丰富一些。Spark 和 Flink 二者都倾向于使用 Scala 来实现对应的业务。

3.2 SQL

目前,Spark SQL 是其组件中较为活跃的一部分,它提供了相似于 Hive SQL 来查询结构化数据,API 依然很成熟。对于 Flink 来讲,支持 Flink Table API。

总结

参考资料:

https://www.cnblogs.com/smartloli/p/5580757.html

https://www.cnblogs.com/feiyudemeng/p/8998772.html

https://yq.aliyun.com/articles/600173

相关文章
相关标签/搜索