本文转载自:https://ververica.cn/develope...
做者:伍翀(云邪)html
现在,大数据领域的开源框架(Hadoop,Spark,Storm)都使用的 JVM,固然也包括 Flink。基于 JVM 的数据分析引擎都须要面对将大量数据存到内存中,这就不得不面对 JVM 存在的几个问题:java
因此目前,愈来愈多的大数据项目开始本身管理 JVM 内存了,像 Spark、Flink、HBase,为的就是得到像 C 同样的性能以及避免 OOM 的发生。本文将会讨论 Flink 是如何解决上面的问题的,主要内容包括内存管理、定制的序列化工具、缓存友好的数据结构和算法、堆外内存、JIT 编译优化等。算法
Flink 并非将大量对象存在堆上,而是将对象都序列化到一个预分配的内存块上,这个内存块叫作 **MemorySegment**
,它表明了一段固定长度的内存(默认大小为 32KB),也是 Flink 中最小的内存分配单元,而且提供了很是高效的读写方法。你能够把 MemorySegment 想象成是为 Flink 定制的 **java.nio.ByteBuffer**
。它的底层能够是一个普通的 Java 字节数组(**byte[]**
),也能够是一个申请在堆外的 **ByteBuffer**
。每条记录都会以序列化的形式存储在一个或多个**MemorySegment**
中。apache
Flink 中的 Worker 名叫 TaskManager,是用来运行用户代码的 JVM 进程。TaskManager 的堆内存主要被分红了三个部分:数组
**taskmanager.network.numberOfBuffers**
来配置。(阅读这篇文章了解更多Network Buffer的管理)MemoryManager
管理的,由众多MemorySegment
组成的超大集合。Flink 中的算法(如 sort/shuffle/join)会向这个内存池申请 MemorySegment,将序列化后的数据存于其中,使用完后释放回内存池。默认状况下,池子占了堆内存的 70% 的大小。注意:Memory Manager Pool 主要在Batch模式下使用。在Steaming模式下,该池子不会预分配内存,也不会向该池子请求内存块。也就是说该部分的内存都是能够给用户代码使用的。不过社区是打算在 Streaming 模式下也能将该池子利用起来。缓存
Flink 采用相似 DBMS 的 sort 和 join 算法,直接操做二进制数据,从而使序列化/反序列化带来的开销达到最小。因此 Flink 的内部实现更像 C/C++ 而非 Java。若是须要处理的数据超出了内存限制,则会将部分数据存储到硬盘上。若是要操做多块MemorySegment就像操做一块大的连续内存同样,Flink会使用逻辑视图(**AbstractPagedInputView**
)来方便操做。下图描述了 Flink 如何存储序列化后的数据到内存块中,以及在须要的时候如何将数据存储到磁盘上。网络
从上面咱们可以得出 Flink 积极的内存管理以及直接操做二进制数据有如下几点好处:数据结构
MemoryManager
中,这些MemorySegment
一直呆在老年代而不会被GC回收。其余的数据对象基本上是由用户代码生成的短生命周期对象,这部分对象能够被 Minor GC 快速回收。只要用户不去建立大量相似缓存的常驻型对象,那么老年代的大小是不会变的,Major GC也就永远不会发生。从而有效地下降了垃圾回收的压力。另外,这里的内存块还能够是堆外内存,这可使得 JVM 内存更小,从而加速垃圾回收。**OutOfMemoryErrors**
能够有效地被避免。目前 Java 生态圈提供了众多的序列化框架:Java serialization, Kryo, Apache Avro 等等。可是 Flink 实现了本身的序列化框架。由于在 Flink 中处理的数据流一般是同一类型,因为数据集对象的类型固定,对于数据集能够只保存一份对象 Schema 信息,节省大量的存储空间。同时,对于固定大小的类型,也可经过固定的偏移位置存取。当咱们须要访问某个对象成员变量的时候,经过定制的序列化工具,并不须要反序列化整个 Java 对象,而是能够直接经过偏移量,只是反序列化特定的对象成员变量。若是对象的成员变量较多时,可以大大减小 Java 对象的建立开销,以及内存数据的拷贝大小。app
Flink 支持任意的 Java 或是 Scala 类型。Flink 在数据类型上有很大的进步,不须要实现一个特定的接口(像 Hadoop 中的**org.apache.hadoop.io.Writable**
),Flink 可以自动识别数据类型。Flink 经过 Java Reflection 框架分析基于 Java 的 Flink 程序 UDF (User Define Function)的返回类型的类型信息,经过 Scala Compiler 分析基于 Scala 的 Flink 程序 UDF 的返回类型的类型信息。类型信息由 **TypeInformation**
类表示,TypeInformation 支持如下几种类型:框架
BasicTypeInfo
: 任意Java 基本类型(装箱的)或 String 类型。BasicArrayTypeInfo
: 任意Java基本类型数组(装箱的)或 String 数组。**WritableTypeInfo**
: 任意 Hadoop Writable 接口的实现类。TupleTypeInfo
: 任意的 Flink Tuple 类型(支持Tuple1 to Tuple25)。Flink tuples 是固定长度固定类型的Java Tuple实现。CaseClassTypeInfo
: 任意的 Scala CaseClass(包括 Scala tuples)。PojoTypeInfo
: 任意的 POJO (Java or Scala),例如,Java对象的全部成员变量,要么是 public 修饰符定义,要么有 getter/setter 方法。GenericTypeInfo
: 任意没法匹配以前几种类型的类。前六种数据类型基本上能够知足绝大部分的 Flink 程序,针对前六种类型数据集,Flink 皆能够自动生成对应的TypeSerializer,能很是高效地对数据集进行序列化和反序列化。对于最后一种数据类型,Flink 会使用 Kryo 进行序列化和反序列化。每一个 TypeInformation 中,都包含了 serializer,类型会自动经过serializer进行序列化,而后用 Java Unsafe 接口写入 MemorySegments。对于能够用做 key 的数据类型,Flink 还同时自动生成 TypeComparator,用来辅助直接对序列化后的二进制数据进行 compare、hash 等操做。对于 Tuple、CaseClass、POJO 等组合类型,其 TypeSerializer 和 TypeComparator 也是组合的,序列化和比较时会委托给对应的 serializers 和 comparators。以下图展现 一个内嵌型的 Tuple3<Integer,Double,Person> 对象的序列化过程。
能够看出这种序列化方式存储密度是至关紧凑的。其中 int 占4字节,double 占8字节,POJO 多个一个字节的 header,PojoSerializer 只负责将 header序列化进去,并委托每一个字段对应的 serializer 对字段进行序列化。
Flink 的类型系统能够很轻松地扩展出自定义的TypeInformation、Serializer 以及 Comparator,来提高数据类型在序列化和比较时的性能。
Flink 提供了如 group、sort、join 等操做,这些操做都须要访问海量数据。这里,咱们以 sort 为例,这是一个在 Flink 中使用很是频繁的操做。
首先,Flink 会从 MemoryManager 中申请一批 MemorySegment,咱们把这批 MemorySegment 称做 sort buffer,用来存放排序的数据。
咱们会把 sort buffer 分红两块区域。一个区域是用来存放全部对象完整的二进制数据。另外一个区域用来存放指向完整二进制数据的指针以及定长的序列化后的key(key+pointer)。若是须要序列化的 key 是个变长类型,如 String,则会取其前缀序列化。如上图所示,当一个对象要加到 sort buffer 中时,它的二进制数据会被加到第一个区域,指针(可能还有 key)会被加到第二个区域。
将实际的数据和指针加定长 key 分开存放有两个目的。第一,交换定长块(key+pointer)更高效,不用交换真实的数据也不用移动其余 key 和 pointer。第二,这样作是缓存友好的,由于 key 都是连续存储在内存中的,能够大大减小 cache miss(后面会详细解释)。
排序的关键是比大小和交换。Flink 中,会先用 key 比大小,这样就能够直接用二进制的 key 比较而不须要反序列化出整个对象。由于 key 是定长的,因此若是 key 相同(或者没有提供二进制key),那就必须将真实的二进制数据反序列化出来,而后再作比较。以后,只须要交换 key+pointer 就能够达到排序的效果,真实的数据不用移动。
最后,访问排序后的数据,能够沿着排好序的key+pointer区域顺序访问,经过pointer找到对应的真实数据,并写到内存或外部(更多细节能够看这篇文章 Joins in Flink)。
随着磁盘 IO 和网络 IO 愈来愈快,CPU 逐渐成为了大数据领域的瓶颈。从 L1/L2/L3 缓存读取数据的速度比从主内存读取数据的速度快好几个量级。经过性能分析能够发现,CPU 时间中的很大一部分都是浪费在等待数据从主内存过来上。若是这些数据能够从 L1/L2/L3 缓存过来,那么这些等待时间能够极大地下降,而且全部的算法会所以而受益。
在上面讨论中咱们谈到的,Flink 经过定制的序列化框架将算法中须要操做的数据(如 sort 中的 key)连续存储,而完整数据存储在其余地方。由于对于完整的数据来讲,key+pointer 更容易装进缓存,这大大提升了缓存命中率,从而提升了基础算法的效率。这对于上层应用是彻底透明的,能够充分享受缓存友好带来的性能提高。
Flink 基于堆内存的内存管理机制已经能够解决不少 JVM 现存问题了,为何还要引入堆外内存?
可是强大的东西老是会有其负面的一面,否则为什么你们不都用堆外内存呢。
MemorySegment
,这个申请在堆上会更廉价。Flink用经过**ByteBuffer.allocateDirect(numBytes)**
来申请堆外内存,用 **sun.misc.Unsafe**
来操做堆外内存。
基于 Flink 优秀的设计,实现堆外内存是很方便的。Flink 将原来的 **MemorySegment**
变成了抽象类,并生成了两个子类。**HeapMemorySegment**
和 **HybridMemorySegment**
。从字面意思上也很容易理解,前者是用来分配堆内存的,后者是用来分配堆外内存和堆内存的。是的,你没有看错,后者既能够分配堆外内存又能够分配堆内存。为何要这样设计呢?
首先假设**HybridMemorySegment**
只提供分配堆外内存。在上述堆外内存的不足中的第二点谈到,Flink 有时须要分配短生命周期的 buffer,这些 buffer 用**HeapMemorySegment**
会更高效。那么当使用堆外内存时,为了也知足堆内存的需求,咱们须要同时加载两个子类。这就涉及到了 JIT 编译优化的问题。由于之前 **MemorySegment**
是一个单独的 final 类,没有子类。JIT 编译时,全部要调用的方法都是肯定的,全部的方法调用均可以被去虚化(de-virtualized)和内联(inlined),这能够极大地提升性能(MemroySegment的使用至关频繁)。然而若是同时加载两个子类,那么 JIT 编译器就只能在真正运行到的时候才知道是哪一个子类,这样就没法提早作优化。实际测试的性能差距在 2.7 被左右。
Flink 使用了两种方案:
方案1:只能有一种 MemorySegment 实现被加载
代码中全部的短生命周期和长生命周期的 MemorySegment 都实例化其中一个子类,另外一个子类根本没有实例化过(使用工厂模式来控制)。那么运行一段时间后,JIT 会意识到全部调用的方法都是肯定的,而后会作优化。
方案2:提供一种实现能同时处理堆内存和堆外内存
这就是 **HybridMemorySegment**
了,能同时处理堆与堆外内存,这样就不须要子类了。这里 Flink 优雅地实现了一份代码能同时操做堆和堆外内存。这主要归功于 **sun.misc.Unsafe**
提供的一系列方法,如 getLong方法:
sun.misc.Unsafe.getLong(Object reference, long offset)
这里咱们看下 **MemorySegment**
及其子类的实现。
public abstract class MemorySegment { // 堆内存引用 protected final byte[] heapMemory; // 堆外内存地址 protected long address; //堆内存的初始化 MemorySegment(byte[] buffer, Object owner) { //一些先验检查 ... this.heapMemory = buffer; this.address = BYTE_ARRAY_BASE_OFFSET; ... } //堆外内存的初始化 MemorySegment(long offHeapAddress, int size, Object owner) { //一些先验检查 ... this.heapMemory = null; this.address = offHeapAddress; ... } public final long getLong(int index) { final long pos = address + index; if (index >= 0 && pos <= addressLimit - 8) { // 这是咱们关注的地方,使用 Unsafe 来操做 on-heap & off-heap return UNSAFE.getLong(heapMemory, pos); } else if (address > addressLimit) { throw new IllegalStateException("segment has been freed"); } else { // index is in fact invalid throw new IndexOutOfBoundsException(); } } ... } public final class HeapMemorySegment extends MemorySegment { // 指向heapMemory的额外引用,用来如数组越界的检查 private byte[] memory; // 只能初始化堆内存 HeapMemorySegment(byte[] memory, Object owner) { super(Objects.requireNonNull(memory), owner); this.memory = memory; } ... } public final class HybridMemorySegment extends MemorySegment { private final ByteBuffer offHeapBuffer; // 堆外内存初始化 HybridMemorySegment(ByteBuffer buffer, Object owner) { super(checkBufferAndGetAddress(buffer), buffer.capacity(), owner); this.offHeapBuffer = buffer; } // 堆内存初始化 HybridMemorySegment(byte[] buffer, Object owner) { super(buffer, owner); this.offHeapBuffer = null; } ... }
能够发现,HybridMemorySegment 中的不少方法其实都下沉到了父类去实现。包括堆内堆外内存的初始化。**MemorySegment**
中的 **getXXX**
/**putXXX**
方法都是调用了 unsafe 方法,能够说**MemorySegment**
已经具备了些 Hybrid 的意思了。**HeapMemorySegment**
只调用了父类的**MemorySegment(byte[] buffer, Object owner)**
方法,也就只能申请堆内存。另外,阅读代码你会发现,许多方法(大量的 getXXX/putXXX)都被标记成了 final,两个子类也是 final 类型,为的也是优化 JIT 编译器,会提醒 JIT 这个方法是能够被去虚化和内联的。
对于堆外内存,使用 **HybridMemorySegment**
能同时用来表明堆和堆外内存。这样只须要一个类就能表明长生命周期的堆外内存和短生命周期的堆内存。既然**HybridMemorySegment**
已经这么全能,为何还要方案1呢?由于咱们须要工厂模式来保证只有一个子类被加载(为了更高的性能),并且 HeapMemorySegment 比 heap 模式的 HybridMemorySegment 要快。
下方是一些性能测试数据,更详细的数据请参考这篇文章。
Segment | Time |
---|---|
HeapMemorySegment, exclusive | 1,441 msecs |
HeapMemorySegment, mixed | 3,841 msecs |
HybridMemorySegment, heap, exclusive | 1,626 msecs |
HybridMemorySegment, off-heap, exclusive | 1,628 msecs |
HybridMemorySegment, heap, mixed | 3,848 msecs |
HybridMemorySegment, off-heap, mixed | 3,847 msecs |
本文主要总结了 Flink 面对 JVM 存在的问题,而在内存管理的道路上越走越深。从本身管理内存,到序列化框架,再到堆外内存。其实纵观大数据生态圈,其实会发现各个开源项目都有一样的趋势。好比最近炒的很火热的 Spark Tungsten 项目,与 Flink 在内存管理上的思想是及其类似的。