Flink 原理与实现:内存管理

摘要: 现在,大数据领域的开源框架(Hadoop,Spark,Storm)都使用的 JVM,固然也包括 Flink。基于 JVM 的数据分析引擎都须要面对将大量数据存到内存中,这就不得不面对 JVM 存在的几个问题: 1. Java 对象存储密度低。一个只包含 boolean 属性的对象占用了16个字节内存:对象头占了8个,boolean 属性占了1个,对齐填充占了7个。而实际上只须要一个bit(1html

现在,大数据领域的开源框架(Hadoop,Spark,Storm)都使用的 JVM,固然也包括 Flink。基于 JVM 的数据分析引擎都须要面对将大量数据存到内存中,这就不得不面对 JVM 存在的几个问题:java

  1. Java 对象存储密度低。一个只包含 boolean 属性的对象占用了16个字节内存:对象头占了8个,boolean 属性占了1个,对齐填充占了7个。而实际上只须要一个bit(1/8字节)就够了。
  2. Full GC 会极大地影响性能,尤为是为了处理更大数据而开了很大内存空间的JVM来讲,GC 会达到秒级甚至分钟级。
  3. OOM 问题影响稳定性。OutOfMemoryError是分布式计算框架常常会遇到的问题,当JVM中全部对象大小超过度配给JVM的内存大小时,就会发生OutOfMemoryError错误,致使JVM崩溃,分布式框架的健壮性和性能都会受到影响。

因此目前,愈来愈多的大数据项目开始本身管理JVM内存了,像 Spark、Flink、HBase,为的就是得到像 C 同样的性能以及避免 OOM 的发生。本文将会讨论 Flink 是如何解决上面的问题的,主要内容包括内存管理、定制的序列化工具、缓存友好的数据结构和算法、堆外内存、JIT编译优化等。算法

 

积极的内存管理

Flink 并非将大量对象存在堆上,而是将对象都序列化到一个预分配的内存块上,这个内存块叫作 MemorySegment,它表明了一段固定长度的内存(默认大小为 32KB),也是 Flink 中最小的内存分配单元,而且提供了很是高效的读写方法。你能够把 MemorySegment 想象成是为 Flink 定制的 java.nio.ByteBuffer。它的底层能够是一个普通的 Java 字节数组(byte[]),也能够是一个申请在堆外的 ByteBuffer。每条记录都会以序列化的形式存储在一个或多个MemorySegment中。apache

Flink 中的 Worker 名叫 TaskManager,是用来运行用户代码的 JVM 进程。TaskManager 的堆内存主要被分红了三个部分:数组

TB17qs5JpXXXXXhXpXXXXXXXXXX

  • Network Buffers: 必定数量的32KB大小的 buffer,主要用于数据的网络传输。在 TaskManager 启动的时候就会分配。默认数量是 2048 个,能够经过 taskmanager.network.numberOfBuffers 来配置。(阅读这篇文章了解更多Network Buffer的管理)
  • Memory Manager Pool: 这是一个由 MemoryManager 管理的,由众多MemorySegment组成的超大集合。Flink 中的算法(如 sort/shuffle/join)会向这个内存池申请 MemorySegment,将序列化后的数据存于其中,使用完后释放回内存池。默认状况下,池子占了堆内存的 70% 的大小。
  • Remaining (Free) Heap: 这部分的内存是留给用户代码以及 TaskManager 的数据结构使用的。由于这些数据结构通常都很小,因此基本上这些内存都是给用户代码使用的。从GC的角度来看,能够把这里当作的新生代,也就是说这里主要都是由用户代码生成的短时间对象。

注意:Memory Manager Pool 主要在Batch模式下使用。在Steaming模式下,该池子不会预分配内存,也不会向该池子请求内存块。也就是说该部分的内存都是能够给用户代码使用的。不过社区是打算在 Streaming 模式下也能将该池子利用起来。缓存

Flink 采用相似 DBMS 的 sort 和 join 算法,直接操做二进制数据,从而使序列化/反序列化带来的开销达到最小。因此 Flink 的内部实现更像 C/C++ 而非 Java。若是须要处理的数据超出了内存限制,则会将部分数据存储到硬盘上。若是要操做多块MemorySegment就像操做一块大的连续内存同样,Flink会使用逻辑视图(AbstractPagedInputView)来方便操做。下图描述了 Flink 如何存储序列化后的数据到内存块中,以及在须要的时候如何将数据存储到磁盘上。网络

从上面咱们可以得出 Flink 积极的内存管理以及直接操做二进制数据有如下几点好处:数据结构

  1. 减小GC压力。显而易见,由于全部常驻型数据都以二进制的形式存在 Flink 的MemoryManager中,这些MemorySegment一直呆在老年代而不会被GC回收。其余的数据对象基本上是由用户代码生成的短生命周期对象,这部分对象能够被 Minor GC 快速回收。只要用户不去建立大量相似缓存的常驻型对象,那么老年代的大小是不会变的,Major GC也就永远不会发生。从而有效地下降了垃圾回收的压力。另外,这里的内存块还能够是堆外内存,这可使得 JVM 内存更小,从而加速垃圾回收。
  2. 避免了OOM。全部的运行时数据结构和算法只能经过内存池申请内存,保证了其使用的内存大小是固定的,不会由于运行时数据结构和算法而发生OOM。在内存吃紧的状况下,算法(sort/join等)会高效地将一大批内存块写到磁盘,以后再读回来。所以,OutOfMemoryErrors能够有效地被避免。
  3. 节省内存空间。Java 对象在存储上有不少额外的消耗(如上一节所谈)。若是只存储实际数据的二进制内容,就能够避免这部分消耗。
  4. 高效的二进制操做 & 缓存友好的计算。二进制数据以定义好的格式存储,能够高效地比较与操做。另外,该二进制形式能够把相关的值,以及hash值,键值和指针等相邻地放进内存中。这使得数据结构能够对高速缓存更友好,能够从 L1/L2/L3 缓存得到性能的提高(下文会详细解释)。

为 Flink 量身定制的序列化框架

目前 Java 生态圈提供了众多的序列化框架:Java serialization, Kryo, Apache Avro 等等。可是 Flink 实现了本身的序列化框架。由于在 Flink 中处理的数据流一般是同一类型,因为数据集对象的类型固定,对于数据集能够只保存一份对象Schema信息,节省大量的存储空间。同时,对于固定大小的类型,也可经过固定的偏移位置存取。当咱们须要访问某个对象成员变量的时候,经过定制的序列化工具,并不须要反序列化整个Java对象,而是能够直接经过偏移量,只是反序列化特定的对象成员变量。若是对象的成员变量较多时,可以大大减小Java对象的建立开销,以及内存数据的拷贝大小。app

Flink支持任意的Java或是Scala类型。Flink 在数据类型上有很大的进步,不须要实现一个特定的接口(像Hadoop中的org.apache.hadoop.io.Writable),Flink 可以自动识别数据类型。Flink 经过 Java Reflection 框架分析基于 Java 的 Flink 程序 UDF (User Define Function)的返回类型的类型信息,经过 Scala Compiler 分析基于 Scala 的 Flink 程序 UDF 的返回类型的类型信息。类型信息由 TypeInformation 类表示,TypeInformation 支持如下几种类型:框架

  • BasicTypeInfo: 任意Java 基本类型(装箱的)或 String 类型。
  • BasicArrayTypeInfo: 任意Java基本类型数组(装箱的)或 String 数组。
  • WritableTypeInfo: 任意 Hadoop Writable 接口的实现类。
  • TupleTypeInfo: 任意的 Flink Tuple 类型(支持Tuple1 to Tuple25)。Flink tuples 是固定长度固定类型的Java Tuple实现。
  • CaseClassTypeInfo: 任意的 Scala CaseClass(包括 Scala tuples)。
  • PojoTypeInfo: 任意的 POJO (Java or Scala),例如,Java对象的全部成员变量,要么是 public 修饰符定义,要么有 getter/setter 方法。
  • GenericTypeInfo: 任意没法匹配以前几种类型的类。

前六种数据类型基本上能够知足绝大部分的Flink程序,针对前六种类型数据集,Flink皆能够自动生成对应的TypeSerializer,能很是高效地对数据集进行序列化和反序列化。对于最后一种数据类型,Flink会使用Kryo进行序列化和反序列化。每一个TypeInformation中,都包含了serializer,类型会自动经过serializer进行序列化,而后用Java Unsafe接口写入MemorySegments。对于能够用做key的数据类型,Flink还同时自动生成TypeComparator,用来辅助直接对序列化后的二进制数据进行compare、hash等操做。对于 Tuple、CaseClass、POJO 等组合类型,其TypeSerializer和TypeComparator也是组合的,序列化和比较时会委托给对应的serializers和comparators。以下图展现 一个内嵌型的Tuple3 对象的序列化过程。

能够看出这种序列化方式存储密度是至关紧凑的。其中 int 占4字节,double 占8字节,POJO多个一个字节的header,PojoSerializer只负责将header序列化进去,并委托每一个字段对应的serializer对字段进行序列化。

Flink 的类型系统能够很轻松地扩展出自定义的TypeInformation、Serializer以及Comparator,来提高数据类型在序列化和比较时的性能。

Flink 如何直接操做二进制数据

Flink 提供了如 group、sort、join 等操做,这些操做都须要访问海量数据。这里,咱们以sort为例,这是一个在 Flink 中使用很是频繁的操做。

首先,Flink 会从 MemoryManager 中申请一批 MemorySegment,咱们把这批 MemorySegment 称做 sort buffer,用来存放排序的数据。

咱们会把 sort buffer 分红两块区域。一个区域是用来存放全部对象完整的二进制数据。另外一个区域用来存放指向完整二进制数据的指针以及定长的序列化后的key(key+pointer)。若是须要序列化的key是个变长类型,如String,则会取其前缀序列化。如上图所示,当一个对象要加到 sort buffer 中时,它的二进制数据会被加到第一个区域,指针(可能还有key)会被加到第二个区域。

将实际的数据和指针加定长key分开存放有两个目的。第一,交换定长块(key+pointer)更高效,不用交换真实的数据也不用移动其余key和pointer。第二,这样作是缓存友好的,由于key都是连续存储在内存中的,能够大大减小 cache miss(后面会详细解释)。

排序的关键是比大小和交换。Flink 中,会先用 key 比大小,这样就能够直接用二进制的key比较而不须要反序列化出整个对象。由于key是定长的,因此若是key相同(或者没有提供二进制key),那就必须将真实的二进制数据反序列化出来,而后再作比较。以后,只须要交换key+pointer就能够达到排序的效果,真实的数据不用移动。

最后,访问排序后的数据,能够沿着排好序的key+pointer区域顺序访问,经过pointer找到对应的真实数据,并写到内存或外部(更多细节能够看这篇文章 Joins in Flink)。

缓存友好的数据结构和算法

随着磁盘IO和网络IO愈来愈快,CPU逐渐成为了大数据领域的瓶颈。从 L1/L2/L3 缓存读取数据的速度比从主内存读取数据的速度快好几个量级。经过性能分析能够发现,CPU时间中的很大一部分都是浪费在等待数据从主内存过来上。若是这些数据能够从 L1/L2/L3 缓存过来,那么这些等待时间能够极大地下降,而且全部的算法会所以而受益。

在上面讨论中咱们谈到的,Flink 经过定制的序列化框架将算法中须要操做的数据(如sort中的key)连续存储,而完整数据存储在其余地方。由于对于完整的数据来讲,key+pointer更容易装进缓存,这大大提升了缓存命中率,从而提升了基础算法的效率。这对于上层应用是彻底透明的,能够充分享受缓存友好带来的性能提高。

走向堆外内存

Flink 基于堆内存的内存管理机制已经能够解决不少JVM现存问题了,为何还要引入堆外内存?

  1. 启动超大内存(上百GB)的JVM须要很长时间,GC停留时间也会很长(分钟级)。使用堆外内存的话,能够极大地减少堆内存(只须要分配Remaining Heap那一块),使得 TaskManager 扩展到上百GB内存不是问题。
  2. 高效的 IO 操做。堆外内存在写磁盘或网络传输时是 zero-copy,而堆内存的话,至少须要 copy 一次。
  3. 堆外内存是进程间共享的。也就是说,即便JVM进程崩溃也不会丢失数据。这能够用来作故障恢复(Flink暂时没有利用起这个,不过将来极可能会去作)。

可是强大的东西老是会有其负面的一面,否则为什么你们不都用堆外内存呢。

  1. 堆内存的使用、监控、调试都要简单不少。堆外内存意味着更复杂更麻烦。
  2. Flink 有时须要分配短生命周期的 MemorySegment,这个申请在堆上会更廉价。
  3. 有些操做在堆内存上会快一点点。

Flink用经过ByteBuffer.allocateDirect(numBytes)来申请堆外内存,用 sun.misc.Unsafe 来操做堆外内存。

基于 Flink 优秀的设计,实现堆外内存是很方便的。Flink 将原来的 MemorySegment 变成了抽象类,并生成了两个子类。HeapMemorySegment 和 HybridMemorySegment。从字面意思上也很容易理解,前者是用来分配堆内存的,后者是用来分配堆外内存和堆内存的。是的,你没有看错,后者既能够分配堆外内存又能够分配堆内存。为何要这样设计呢?

首先假设HybridMemorySegment只提供分配堆外内存。在上述堆外内存的不足中的第二点谈到,Flink 有时须要分配短生命周期的 buffer,这些buffer用HeapMemorySegment会更高效。那么当使用堆外内存时,为了也知足堆内存的需求,咱们须要同时加载两个子类。这就涉及到了 JIT 编译优化的问题。由于之前 MemorySegment 是一个单独的 final 类,没有子类。JIT 编译时,全部要调用的方法都是肯定的,全部的方法调用均可以被去虚化(de-virtualized)和内联(inlined),这能够极大地提升性能(MemroySegment的使用至关频繁)。然而若是同时加载两个子类,那么 JIT 编译器就只能在真正运行到的时候才知道是哪一个子类,这样就没法提早作优化。实际测试的性能差距在 2.7 被左右。

Flink 使用了两种方案:

方案1:只能有一种 MemorySegment 实现被加载

代码中全部的短生命周期和长生命周期的MemorySegment都实例化其中一个子类,另外一个子类根本没有实例化过(使用工厂模式来控制)。那么运行一段时间后,JIT 会意识到全部调用的方法都是肯定的,而后会作优化。

方案2:提供一种实现能同时处理堆内存和堆外内存

这就是 HybridMemorySegment 了,能同时处理堆与堆外内存,这样就不须要子类了。这里 Flink 优雅地实现了一份代码能同时操做堆和堆外内存。这主要归功于 sun.misc.Unsafe提供的一系列方法,如getLong方法:

sun.misc.Unsafe.getLong(Object reference, long offset)
  • 若是reference不为空,则会取该对象的地址,加上后面的offset,从相对地址处取出8字节并获得 long。这对应了堆内存的场景。
  • 若是reference为空,则offset就是要操做的绝对地址,从该地址处取出数据。这对应了堆外内存的场景。

这里咱们看下 MemorySegment 及其子类的实现。

public abstract class MemorySegment {
  // 堆内存引用
  protected final byte[] heapMemory;
  // 堆外内存地址
  protected long address;
  
  //堆内存的初始化
  MemorySegment(byte[] buffer, Object owner) {
    //一些先验检查
    ...
    this.heapMemory = buffer;
    this.address = BYTE_ARRAY_BASE_OFFSET;
    ...
  }

  //堆外内存的初始化
  MemorySegment(long offHeapAddress, int size, Object owner) {
    //一些先验检查
    ...
    this.heapMemory = null;
    this.address = offHeapAddress;
    ...
  }
  
  public final long getLong(int index) {
    final long pos = address + index;
    if (index >= 0 && pos <= addressLimit - 8) {
      // 这是咱们关注的地方,使用 Unsafe 来操做 on-heap & off-heap
      return UNSAFE.getLong(heapMemory, pos);
    }
    else if (address > addressLimit) {
      throw new IllegalStateException("segment has been freed");
    }
    else {
      // index is in fact invalid
      throw new IndexOutOfBoundsException();
    }
  }
  ...
}

public final class HeapMemorySegment extends MemorySegment {
  // 指向heapMemory的额外引用,用来如数组越界的检查
  private byte[] memory;
  // 只能初始化堆内存
  HeapMemorySegment(byte[] memory, Object owner) {
    super(Objects.requireNonNull(memory), owner);
    this.memory = memory;
  }
  ...
}

public final class HybridMemorySegment extends MemorySegment {
  private final ByteBuffer offHeapBuffer;
  
  // 堆外内存初始化
  HybridMemorySegment(ByteBuffer buffer, Object owner) {
    super(checkBufferAndGetAddress(buffer), buffer.capacity(), owner);
    this.offHeapBuffer = buffer;
  }
  
  // 堆内存初始化
  HybridMemorySegment(byte[] buffer, Object owner) {
    super(buffer, owner);
    this.offHeapBuffer = null;
  }
  ...
}

能够发现,HybridMemorySegment 中的不少方法其实都下沉到了父类去实现。包括堆内堆外内存的初始化。MemorySegment 中的 getXXX/putXXX 方法都是调用了 unsafe 方法,能够说MemorySegment已经具备了些 Hybrid 的意思了。HeapMemorySegment只调用了父类的MemorySegment(byte[] buffer, Object owner)方法,也就只能申请堆内存。另外,阅读代码你会发现,许多方法(大量的 getXXX/putXXX)都被标记成了 final,两个子类也是 final 类型,为的也是优化 JIT 编译器,会提醒 JIT 这个方法是能够被去虚化和内联的。

对于堆外内存,使用 HybridMemorySegment 能同时用来表明堆和堆外内存。这样只须要一个类就能表明长生命周期的堆外内存和短生命周期的堆内存。既然HybridMemorySegment已经这么全能,为何还要方案1呢?由于咱们须要工厂模式来保证只有一个子类被加载(为了更高的性能),并且HeapMemorySegment比heap模式的HybridMemorySegment要快。

下方是一些性能测试数据,更详细的数据请参考这篇文章

Segment Time
HeapMemorySegment, exclusive 1,441 msecs
HeapMemorySegment, mixed 3,841 msecs
HybridMemorySegment, heap, exclusive 1,626 msecs
HybridMemorySegment, off-heap, exclusive 1,628 msecs
HybridMemorySegment, heap, mixed 3,848 msecs
HybridMemorySegment, off-heap, mixed 3,847 msecs

总结

本文主要总结了 Flink 面对 JVM 存在的问题,而在内存管理的道路上越走越深。从本身管理内存,到序列化框架,再到堆外内存。其实纵观大数据生态圈,其实会发现各个开源项目都有一样的趋势。好比最近炒的很火热的 Spark Tungsten 项目,与 Flink 在内存管理上的思想是及其类似的。

参考资料

相关文章
相关标签/搜索