现在,许多用于分析大型数据集的开源系统都是用 Java 或者是基于 JVM 的编程语言实现的。最着名的例子是 Apache Hadoop,还有较新的框架,如 Apache Spark、Apache Drill、Apache Flink。基于 JVM 的数据分析引擎面临的一个常见挑战就是如何在内存中存储大量的数据(包括缓存和高效处理)。合理的管理好 JVM 内存能够将 难以配置且不可预测的系统 与 少许配置且稳定运行的系统区分开来。html
在这篇文章中,咱们将讨论 Apache Flink 如何管理内存,讨论其自定义序列化与反序列化机制,以及它是如何操做二进制数据的。java
在 JVM 中处理大量数据最直接的方式就是将这些数据作为对象存储在堆内存中,而后直接在内存中操做这些数据,若是想进行排序则就是对对象列表进行排序。然而这种方法有一些明显的缺点,首先,在频繁的建立和销毁大量对象的时候,监视和控制堆内存的使用并非一件很简单的事情。若是对象分配过多的话,那么会致使内存过分使用,从而触发 OutOfMemoryError,致使 JVM 进程直接被杀死。另外一个方面就是由于这些对象大都是生存在新生代,当 JVM 进行垃圾回收时,垃圾收集的开销很容易达到 50% 甚至更多。最后就是 Java 对象具备必定的空间开销(具体取决于 JVM 和平台)。对于具备许多小对象的数据集,这能够显著减小有效可用的内存量。若是你精通系统设计和系统调优,你能够根据系统进行特定的参数调整,能够或多或少的控制出现 OutOfMemoryError 的次数和避免堆内存的过多使用,可是这种设置和调优的做用有限,尤为是在数据量较大和执行环境发生变化的状况下。git
Apache Flink 起源于一个研究项目,该项目旨在结合基于 MapReduce 的系统和并行数据库系统的最佳技术。在此背景下,Flink 一直有本身的内存数据处理方法。Flink 将对象序列化为固定数量的预先分配的内存段,而不是直接把对象放在堆内存上。它的 DBMS 风格的排序和链接算法尽量多地对这个二进制数据进行操做,以此将序列化和反序列化开销降到最低。若是须要处理的数据多于能够保存在内存中的数据,Flink 的运算符会将部分数据溢出到磁盘。事实上,不少Flink 的内部实现看起来更像是 C / C ++,而不是普通的 Java。下图概述了 Flink 如何在内存段中存储序列化数据并在必要时溢出到磁盘:github
Flink 的主动内存管理和操做二进制数据有几个好处:算法
一、内存安全执行和高效的核外算法 因为分配的内存段的数量是固定的,所以监控剩余的内存资源是很是简单的。在内存不足的状况下,处理操做符能够有效地将更大批的内存段写入磁盘,后面再将它们读回到内存。所以,OutOfMemoryError 就有效的防止了。sql
二、减小垃圾收集压力 由于全部长生命周期的数据都是在 Flink 的管理内存中以二进制表示的,因此全部数据对象都是短暂的,甚至是可变的,而且能够重用。短生命周期的对象能够更有效地进行垃圾收集,这大大下降了垃圾收集的压力。如今,预先分配的内存段是 JVM 堆上的长期存在的对象,为了下降垃圾收集的压力,Flink 社区正在积极地将其分配到堆外内存。这种努力将使得 JVM 堆变得更小,垃圾收集所消耗的时间将更少。数据库
三、节省空间的数据存储 Java 对象具备存储开销,若是数据以二进制的形式存储,则能够避免这种开销。apache
四、高效的二进制操做和缓存敏感性 在给定合适的二进制表示的状况下,能够有效地比较和操做二进制数据。此外,二进制表示能够将相关值、哈希码、键和指针等相邻地存储在内存中。这使得数据结构一般具备更高效的缓存访问模式。编程
主动内存管理的这些特性在用于大规模数据分析的数据处理系统中是很是可取的,可是要实现这些功能的代价也是高昂的。要实现对二进制数据的自动内存管理和操做并不是易事,使用 java.util.HashMap
比实现一个可溢出的 hash-table
(由字节数组和自定义序列化支持)。固然,Apache Flink 并非惟一一个基于 JVM 且对二进制数据进行操做的数据处理系统。例如 Apache Drill、Apache Ignite、Apache Geode 也有应用相似技术,最近 Apache Spark 也宣布将向这个方向演进。数组
下面咱们将详细讨论 Flink 如何分配内存、若是对对象进行序列化和反序列化以及若是对二进制数据进行操做。咱们还将经过一些性能表现数据来比较处理堆内存上的对象和对二进制数据的操做。
Flink TaskManager 是由几个内部组件组成的:actor 系统(负责与 Flink master 协调)、IOManager(负责将数据溢出到磁盘并将其读取回来)、MemoryManager(负责协调内存使用)。在本篇文章中,咱们主要讲解 MemoryManager。
MemoryManager 负责将 MemorySegments 分配、计算和分发给数据处理操做符,例如 sort 和 join 等操做符。MemorySegment 是 Flink 的内存分配单元,由常规 Java 字节数组支持(默认大小为 32 KB)。MemorySegment 经过使用 Java 的 unsafe 方法对其支持的字节数组提供很是有效的读写访问。你能够将 MemorySegment 看做是 Java 的 NIO ByteBuffer 的定制版本。为了在更大的连续内存块上操做多个 MemorySegment,Flink 使用了实现 Java 的 java.io.DataOutput 和 java.io.DataInput 接口的逻辑视图。
MemorySegments 在 TaskManager 启动时分配一次,并在 TaskManager 关闭时销毁。所以,在 TaskManager 的整个生命周期中,MemorySegment 是重用的,而不会被垃圾收集的。在初始化 TaskManager 的全部内部数据结构而且已启动全部核心服务以后,MemoryManager 开始建立 MemorySegments。默认状况下,服务初始化后,70% 可用的 JVM 堆内存由 MemoryManager 分配(也能够配置所有)。剩余的 JVM 堆内存用于在任务处理期间实例化的对象,包括由用户定义的函数建立的对象。下图显示了启动后 TaskManager JVM 中的内存分布:
Java 生态系统提供了几个库,能够将对象转换为二进制表示形式并返回。常见的替代方案是标准 Java 序列化,Kryo,Apache Avro,Apache Thrift 或 Google 的 Protobuf。Flink 包含本身的自定义序列化框架,以便控制数据的二进制表示。这一点很重要,由于对二进制数据进行操做须要对序列化布局有准确的了解。此外,根据在二进制数据上执行的操做配置序列化布局能够显著提高性能。Flink 的序列化机制利用了这一特性,即在执行程序以前,要序列化和反序列化的对象的类型是彻底已知的。
Flink 程序能够处理表示为任意 Java 或 Scala 对象的数据。在优化程序以前,须要识别程序数据流的每一个处理步骤中的数据类型。对于 Java 程序,Flink 提供了一个基于反射的类型提取组件,用于分析用户定义函数的返回类型。Scala 程序能够在 Scala 编译器的帮助下进行分析。Flink 使用 TypeInformation 表示每种数据类型。
注:该图选自董伟柯的文章《Apache Flink 类型和序列化机制简介》,侵删
Flink 有以下几种数据类型的 TypeInformations:
注:该图选自董伟柯的文章《Apache Flink 类型和序列化机制简介》,侵删
每一个 TypeInformation 都为它所表明的数据类型提供了一个序列化器。例如,BasicTypeInfo 返回一个序列化器,该序列化器写入相应的基本类型;WritableTypeInfo 的序列化器将序列化和反序列化委托给实现 Hadoop 的 Writable 接口的对象的 write() 和 readFields() 方法;GenericTypeInfo 返回一个序列化器,该序列化器将序列化委托给 Kryo。对象将自动经过 Java 中高效的 Unsafe 方法来序列化到 Flink MemorySegments 支持的 DataOutput。对于可用做键的数据类型,例如哈希值,TypeInformation 提供了 TypeComparators,TypeComparators 比较和哈希对象,而且能够根据具体的数据类型有效的比较二进制并提取固定长度的二进制 key 前缀。
Tuple,Pojo 和 CaseClass 类型是复合类型,它们可能嵌套一个或者多个数据类型。所以,它们的序列化和比较也都比较复杂,通常将其成员数据类型的序列化和比较都交给各自的 Serializers(序列化器) 和 Comparators(比较器)。下图说明了 Tuple3
对象的序列化,其中Person
是 POJO 并定义以下:
public class Person {
public int id;
public String name;
}复制代码
经过提供定制的 TypeInformations、Serializers(序列化器) 和 Comparators(比较器),能够方便地扩展 Flink 的类型系统,从而提升序列化和比较自定义数据类型的性能。
与其余的数据处理框架的 API(包括 SQL)相似,Flink 的 API 也提供了对数据集进行分组、排序和链接等转换操做。这些转换操做的数据集可能很是大。关系数据库系统具备很是高效的算法,好比 merge-sort、merge-join 和 hash-join。Flink 创建在这种技术的基础上,可是主要分为使用自定义序列化和自定义比较器来处理任意对象。在下面文章中咱们将经过 Flink 的内存排序算法示例演示 Flink 如何使用二进制数据进行操做。
Flink 为其数据处理操做符预先分配内存,初始化时,排序算法从 MemoryManager 请求内存预算,并接收一组相应的 MemorySegments。这些 MemorySegments 变成了缓冲区的内存池,缓冲区中收集要排序的数据。下图说明了如何将数据对象序列化到排序缓冲区中:
排序缓冲区在内部分为两个内存区域:第一个区域保存全部对象的完整二进制数据,第二个区域包含指向完整二进制对象数据的指针(取决于 key 的数据类型)。将对象添加到排序缓冲区时,它的二进制数据会追加到第一个区域,指针(可能还有一个 key)被追加到第二个区域。分离实际数据和指针以及固定长度的 key 有两个目的:它能够有效的交换固定长度的 entries(key 和指针),还能够减小排序时须要移动的数据。若是排序的 key 是可变长度的数据类型(好比 String),则固定长度的排序 key 必须是前缀 key,好比字符串的前 n 个字符。请注意:并不是全部数据类型都提供固定长度的前缀排序 key。将对象序列化到排序缓冲区时,两个内存区域都使用内存池中的 MemorySegments 进行扩展。一旦内存池为空且不能再添加对象时,则排序缓冲区将会被彻底填充并能够进行排序。Flink 的排序缓冲区提供了比较和交换元素的方法,这使得实际的排序算法是可插拔的。默认状况下, Flink 使用了 Quicksort(快速排序)实现,可使用 HeapSort(堆排序)。下图显示了如何比较两个对象:
排序缓冲区经过比较它们的二进制固定长度排序 key 来比较两个元素。若是元素的完整 key(不是前缀 key) 或者二进制前缀 key 不相等,则表明比较成功。若是前缀 key 相等(或者排序 key 的数据类型不提供二进制前缀 key),则排序缓冲区遵循指向实际对象数据的指针,对两个对象进行反序列化并比较对象。根据比较结果,排序算法决定是否交换比较的元素。排序缓冲区经过移动其固定长度 key 和指针来交换两个元素,实际数据不会移动,排序算法完成后,排序缓冲区中的指针被正确排序。下图演示了如何从排序缓冲区返回已排序的数据:
经过顺序读取排序缓冲区的指针区域,跳过排序 key 并按照实际数据的排序指针返回排序数据。此数据要么反序列化并做为对象返回,要么在外部合并排序的状况下复制二进制数据并将其写入磁盘。
那么,对二进制数据进行操做对性能意味着什么?咱们将运行一个基准测试,对 1000 万个Tuple2
对象进行排序以找出答案。整数字段的值从均匀分布中采样。String 字段值的长度为 12 个字符,并从长尾分布中进行采样。输入数据由返回可变对象的迭代器提供,即返回具备不一样字段值的相同 Tuple 对象实例。Flink 在从内存,网络或磁盘读取数据时使用此技术,以免没必要要的对象实例化。基准测试在具备 900 MB 堆大小的 JVM 中运行,在堆上存储和排序 1000 万个 Tuple 对象而且不会致使触发 OutOfMemoryError 大约须要这么大的内存。咱们使用三种排序方法在Integer 字段和 String 字段上对 Tuple 对象进行排序:
一、对象存在堆中:Tuple 对象存储在经常使用的 java.util.ArrayList
中,初始容量设置为 1000 万,并使用 Java 中经常使用的集合排序进行排序。
三、Kryo 序列化:使用 Kryo 序列化将 Tuple 字段序列化为 600 MB 大小的排序缓冲区,并在没有二进制排序 key 的状况下进行排序。这意味着每次比较须要对两个对象进行反序列化。
全部排序方法都使用单线程实现。结果的时间是十次运行结果的平均值。在每次运行以后,咱们调用System.gc()
请求垃圾收集运行,该运行不会进入测量的执行时间。下图显示了将输入数据存储在内存中,对其进行排序并将其做为对象读回的时间。
咱们看到 Flink 使用本身的序列化器对二进制数据进行排序明显优于其余两种方法。与存储在堆内存上相比,咱们看到将数据加载到内存中要快得多。由于咱们其实是在收集对象,没有机会重用对象实例,但必须从新建立每一个 Tuple。这比 Flink 的序列化器(或Kryo序列化)效率低。另外一方面,与反序列化相比,从堆中读取对象是无性能消耗的。在咱们的基准测试中,对象克隆比序列化和反序列化组合更耗性能。查看排序时间,咱们看到对二进制数据的排序也比 Java 的集合排序更快。使用没有二进制排序 key 的 Kryo 序列化的数据排序比其余方法慢得多。这是由于反序列化带来很大的开销。在String 字段上对 Tuple 进行排序比在 Integer 字段上排序更快,由于长尾值分布显着减小了成对比较的数量。为了更好地了解排序过程当中发生的情况,咱们使用 VisualVM 监控执行的 JVM。如下截图显示了执行 10次 运行时的堆内存使用状况、垃圾收集状况和 CPU 使用状况。
测试是在 8 核机器上运行单线程,所以一个核心的彻底利用仅对应 12.5% 的整体利用率。截图显示,对二进制数据进行操做可显著减小垃圾回收活动。对于对象存在堆中,垃圾收集器在排序缓冲区被填满时以很是短的时间间隔运行,而且即便对于单个处理线程也会致使大量 CPU 使用(排序自己不会触发垃圾收集器)。JVM 垃圾收集多个并行线程,解释了高CPU 整体利用率。另外一方面,对序列化数据进行操做的方法不多触发垃圾收集器而且 CPU 利用率低得多。实际上,若是使用 Flink 序列化的方式在 Integer 字段上对 Tuple 进行排序,则垃圾收集器根本不运行,由于对于成对比较,不须要反序列化任何对象。Kryo 序列化须要比较多的垃圾收集,由于它不使用二进制排序 key 而且每次排序都要反序列化两个对象。
内存使用状况上图显示 Flink 序列化和 Kryo 序列化不断的占用大量内存
存使用状况图表显示flink-serialized和kryo-serialized不断占用大量内存。这是因为 MemorySegments 的预分配。实际内存使用率要低得多,由于排序缓冲区并未彻底填充。下表显示了每种方法的内存消耗。1000 万条数据产生大约 280 MB 的二进制数据(对象数据、指针和排序 key),具体取决于使用的序列化程序以及二进制排序 key 的存在和大小。将其与数据存储在堆上的方法进行比较,咱们发现对二进制数据进行操做能够显著提升内存效率。在咱们的基准测试中,若是序列化为排序缓冲区而不是将其做为堆上的对象保存,则能够在内存中对两倍以上的数据进行排序。
占用内存 |
对象存在堆中 |
Flink 序列化 |
Kryo 序列化 |
---|---|---|---|
对 Integer 排序 | 约 700 MB(堆内存) | 277 MB(排序缓冲区) | 266 MB(排序缓冲区) |
对 String 排序 |
约 700 MB(堆内存) | 315 MB(排序缓冲区) | 266 MB(排序缓冲区) |
总而言之,测试验证了文章前面说的对二进制数据进行操做的好处。
Apache Flink 具备至关多的高级技术,能够经过有限的内存资源安全有效地处理大量数据。可是有几点可使 Flink 更有效率。Flink 社区正在努力将管理内存移动到堆外内存。这将容许更小的 JVM,更低的垃圾收集开销,以及更容易的系统配置。使用 Flink 的 Table API,全部操做(如 aggregation 和 projection)的语义都是已知的(与黑盒用户定义的函数相反)。所以,咱们能够为直接对二进制数据进行操做的 Table API 操做生成代码。进一步的改进包括序列化设计,这些设计针对应用于二进制数据的操做和针对序列化器和比较器的代码生成而定制。
本文地址: www.54tianzhisheng.cn/2019/03/24/…
本文翻译自:https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html
翻译:zhisheng,二次转载请注明地址,不然保留追究法律责任。
微信公众号:zhisheng
另外我本身整理了些 Flink 的学习资料,目前已经所有放到微信公众号了。你能够加个人微信:zhisheng_tian,而后回复关键字:Flink 便可无条件获取到。
更多私密资料请加入知识星球!
之后这个项目的全部代码都将放在这个仓库里,包含了本身学习 flink 的一些 demo 和博客。
一、Flink 从0到1学习 —— Apache Flink 介绍
二、Flink 从0到1学习 —— Mac 上搭建 Flink 1.6.0 环境并构建运行简单程序入门
三、Flink 从0到1学习 —— Flink 配置文件详解
四、Flink 从0到1学习 —— Data Source 介绍
五、Flink 从0到1学习 —— 如何自定义 Data Source ?
六、Flink 从0到1学习 —— Data Sink 介绍
七、Flink 从0到1学习 —— 如何自定义 Data Sink ?
八、Flink 从0到1学习 —— Flink Data transformation(转换)
九、Flink 从0到1学习 —— 介绍 Flink 中的 Stream Windows
十、Flink 从0到1学习 —— Flink 中的几种 Time 详解
十一、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 ElasticSearch
十二、Flink 从0到1学习 —— Flink 项目如何运行?
1三、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 Kafka
1四、Flink 从0到1学习 —— Flink JobManager 高可用性配置
1五、Flink 从0到1学习 —— Flink parallelism 和 Slot 介绍
1六、Flink 从0到1学习 —— Flink 读取 Kafka 数据批量写入到 MySQL
1七、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 RabbitMQ
1八、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 HBase
1九、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 HDFS
20、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 Redis
2一、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 Cassandra
2二、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 Flume
2三、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 InfluxDB
2四、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 RocketMQ
2五、Flink 从0到1学习 —— 你上传的 jar 包藏到哪里去了
2六、Flink 从0到1学习 —— 你的 Flink job 日志跑到哪里去了
2八、Flink 从0到1学习 —— Flink 中如何管理配置?
2九、Flink 从0到1学习—— Flink 不能够连续 Split(分流)?
30、Flink 从0到1学习—— 分享四本 Flink 国外的书和二十多篇 Paper 论文
3二、为何说流处理即将来?
3三、OPPO 数据中台之基石:基于 Flink SQL 构建实时数据仓库
3六、Apache Flink 结合 Kafka 构建端到端的 Exactly-Once 处理
3八、如何基于Flink+TensorFlow打造实时智能异常检测平台?只看这一篇就够了
40、Flink 全网最全资源(视频、博客、PPT、入门、实战、源码解析、问答等持续更新)
四、Flink 源码解析 —— standalone session 模式启动流程
五、Flink 源码解析 —— Standalone Session Cluster 启动流程深度分析之 Job Manager 启动
六、Flink 源码解析 —— Standalone Session Cluster 启动流程深度分析之 Task Manager 启动
七、Flink 源码解析 —— 分析 Batch WordCount 程序的执行过程
八、Flink 源码解析 —— 分析 Streaming WordCount 程序的执行过程
九、Flink 源码解析 —— 如何获取 JobGraph?
十、Flink 源码解析 —— 如何获取 StreamGraph?
十一、Flink 源码解析 —— Flink JobManager 有什么做用?
十二、Flink 源码解析 —— Flink TaskManager 有什么做用?
1三、Flink 源码解析 —— JobManager 处理 SubmitJob 的过程
1四、Flink 源码解析 —— TaskManager 处理 SubmitJob 的过程
1五、Flink 源码解析 —— 深度解析 Flink Checkpoint 机制
1六、Flink 源码解析 —— 深度解析 Flink 序列化机制
1七、Flink 源码解析 —— 深度解析 Flink 是如何管理好内存的?
1八、Flink Metrics 源码解析 —— Flink-metrics-core
1九、Flink Metrics 源码解析 —— Flink-metrics-datadog
20、Flink Metrics 源码解析 —— Flink-metrics-dropwizard
2一、Flink Metrics 源码解析 —— Flink-metrics-graphite
2二、Flink Metrics 源码解析 —— Flink-metrics-influxdb
2三、Flink Metrics 源码解析 —— Flink-metrics-jmx
2四、Flink Metrics 源码解析 —— Flink-metrics-slf4j
2五、Flink Metrics 源码解析 —— Flink-metrics-statsd
2六、Flink Metrics 源码解析 —— Flink-metrics-prometheus
2七、Flink 源码解析 —— 如何获取 ExecutionGraph ?
30、Flink Clients 源码解析原文出处:zhisheng的博客,欢迎关注个人公众号:zhisheng